Skip to content

Commit 82376c7

Browse files
authored
lightning: fix id too large after parallel import (#57398) (#57516)
close #56814
1 parent 5cf2682 commit 82376c7

33 files changed

+587
-325
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ br_bins:
530530

531531
.PHONY: data_parsers
532532
data_parsers: tools/bin/vfsgendev pkg/lightning/mydump/parser_generated.go lightning_web
533-
PATH="$(GOPATH)/bin":"$(PATH)":"$(TOOLS)" protoc -I. -I"$(GOPATH)/src" pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto --gogofaster_out=.
533+
PATH="$(GOPATH)/bin":"$(PATH)":"$(TOOLS)" protoc -I. -I"$(GOMODCACHE)" pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto --gogofaster_out=.
534534
tools/bin/vfsgendev -source='"github.com/pingcap/tidb/lightning/pkg/web".Res' && mv res_vfsdata.go lightning/pkg/web/
535535

536536
.PHONY: build_dumpling

Makefile.common

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
PROJECT=tidb
1616
GOPATH ?= $(shell go env GOPATH)
17+
GOMODCACHE ?= $(shell go env GOMODCACHE)
1718
P=8
1819

1920
# Ensure GOPATH is set before running build process.
@@ -132,4 +133,4 @@ ifneq ("$(CI)", "")
132133
endif
133134
BAZEL_INSTRUMENTATION_FILTER := --instrument_test_targets --instrumentation_filter=//pkg/...,//br/...,//dumpling/...
134135

135-
NOGO_FLAG=true
136+
NOGO_FLAG=true

lightning/pkg/importer/get_pre_info.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
618618
if err != nil {
619619
return 0.0, false, errors.Trace(err)
620620
}
621-
idAlloc := kv.NewPanickingAllocators(tableInfo.SepAutoInc(), 0)
621+
idAlloc := kv.NewPanickingAllocators(tableInfo.SepAutoInc())
622622
tbl, err := tables.TableFromMeta(idAlloc, tableInfo)
623623
if err != nil {
624624
return 0.0, false, errors.Trace(err)

lightning/pkg/importer/import.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1979,20 +1979,17 @@ type deliverResult struct {
19791979
}
19801980

19811981
func saveCheckpoint(rc *Controller, t *TableImporter, engineID int32, chunk *checkpoints.ChunkCheckpoint) {
1982-
// We need to update the AllocBase every time we've finished a file.
1983-
// The AllocBase is determined by the maximum of the "handle" (_tidb_rowid
1984-
// or integer primary key), which can only be obtained by reading all data.
1985-
1986-
var base int64
1987-
if t.tableInfo.Core.ContainsAutoRandomBits() {
1988-
base = t.alloc.Get(autoid.AutoRandomType).Base() + 1
1989-
} else {
1990-
base = t.alloc.Get(autoid.RowIDAllocType).Base() + 1
1991-
}
1982+
// we save the XXXBase every time a chunk is finished.
1983+
// Note, it's possible some chunk with larger autoID range finished first, so
1984+
// the saved XXXBase is larger, when chunks with smaller autoID range finished
1985+
// it might have no effect on the saved XXXBase, but it's OK, we only need
1986+
// the largest.
19921987
rc.saveCpCh <- saveCp{
19931988
tableName: t.tableName,
19941989
merger: &checkpoints.RebaseCheckpointMerger{
1995-
AllocBase: base,
1990+
AutoRandBase: t.alloc.Get(autoid.AutoRandomType).Base(),
1991+
AutoIncrBase: t.alloc.Get(autoid.AutoIncrementType).Base(),
1992+
AutoRowIDBase: t.alloc.Get(autoid.RowIDAllocType).Base(),
19961993
},
19971994
}
19981995
rc.saveCpCh <- saveCp{

lightning/pkg/importer/meta_manager.go

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableImporter) tableMetaMgr {
9393

9494
type tableMetaMgr interface {
9595
InitTableMeta(ctx context.Context) error
96-
AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error)
96+
AllocTableRowIDs(ctx context.Context, requiredRowIDCnt int64) (*verify.KVChecksum, int64, error)
9797
UpdateTableStatus(ctx context.Context, status metaStatus) error
9898
UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error
9999
CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (
@@ -177,7 +177,7 @@ func parseMetaStatus(s string) (metaStatus, error) {
177177
}
178178
}
179179

180-
func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) {
180+
func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, requiredRowIDCnt int64) (*verify.KVChecksum, int64, error) {
181181
conn, err := m.session.Conn(ctx)
182182
if err != nil {
183183
return nil, 0, errors.Trace(err)
@@ -188,22 +188,31 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
188188
DB: conn,
189189
Logger: m.tr.logger,
190190
}
191-
var newRowIDBase, newRowIDMax int64
192-
curStatus := metaStatusInitial
191+
// (myStartRowID, myEndRowID] is the range of row_id that current instance
192+
// can use to encode the table.
193+
var myStartRowID, myEndRowID int64
194+
myStatus := metaStatusInitial
193195
newStatus := metaStatusRowIDAllocated
194196
var baseTotalKvs, baseTotalBytes, baseChecksum uint64
195197
err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';")
196198
if err != nil {
197199
return nil, 0, errors.Annotate(err, "enable pessimistic transaction failed")
198200
}
199201

200-
needAutoID := common.TableHasAutoID(m.tr.tableInfo.Core)
202+
hasAutoID := common.TableHasAutoID(m.tr.tableInfo.Core)
201203
tableChecksumingMsg := "Target table is calculating checksum. Please wait until the checksum is finished and try again."
202204
doAllocTableRowIDsFn := func() error {
203205
return exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error {
206+
// lightning follows below calling sequence, so at most one client
207+
// can execute the code after the FOR UPDATE part for some table,
208+
// even though FOR UPDATE only lock rows that matches the condition:
209+
// - insert into table_meta with key (table_id, task_id)
210+
// - try lock with FOR UPDATE
204211
rows, err := tx.QueryContext(
205212
ctx,
206-
common.SprintfWithIdentifiers("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status FROM %s.%s WHERE table_id = ? FOR UPDATE", m.schemaName, m.tableName),
213+
common.SprintfWithIdentifiers(`
214+
SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status
215+
FROM %s.%s WHERE table_id = ? FOR UPDATE`, m.schemaName, m.tableName),
207216
m.tr.tableInfo.ID,
208217
)
209218
if err != nil {
@@ -234,16 +243,16 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
234243
}
235244

236245
if metaTaskID == m.taskID {
237-
curStatus = status
246+
myStatus = status
238247
baseChecksum = checksum
239248
baseTotalKvs = totalKvs
240249
baseTotalBytes = totalBytes
241250
if status >= metaStatusRowIDAllocated {
242-
if rowIDMax-rowIDBase != rawRowIDMax {
243-
return common.ErrAllocTableRowIDs.GenWithStack("verify allocator base failed. local: '%d', meta: '%d'", rawRowIDMax, rowIDMax-rowIDBase)
251+
if rowIDMax-rowIDBase != requiredRowIDCnt {
252+
return common.ErrAllocTableRowIDs.GenWithStack("verify allocator base failed. local: '%d', meta: '%d'", requiredRowIDCnt, rowIDMax-rowIDBase)
244253
}
245-
newRowIDBase = rowIDBase
246-
newRowIDMax = rowIDMax
254+
myStartRowID = rowIDBase
255+
myEndRowID = rowIDMax
247256
break
248257
}
249258
continue
@@ -263,36 +272,43 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
263272
}
264273

265274
// no enough info are available, fetch row_id max for table
266-
if curStatus == metaStatusInitial {
267-
if needAutoID {
268-
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
269-
// TODO this is not right when AUTO_ID_CACHE=1 and have auto row id,
270-
// the id allocators are separated in this case.
271-
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
272-
return errors.Trace(err)
273-
}
274-
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
275+
if myStatus == metaStatusInitial {
276+
// if the table don't have auto id, we still guarantee that the
277+
// row ID is unique across all lightning instances.
278+
// or if someone have already allocated the auto id, we can continue
279+
// allocating from previous maxRowIDMax.
280+
if !hasAutoID || maxRowIDMax > 0 {
281+
myStartRowID = maxRowIDMax
282+
} else {
283+
// we are the first one to allocate the auto id, we need to
284+
// fetch the max auto id base from the table, and allocate
285+
// from there.
286+
// as we only have one estimated requiredRowIDCount, but the
287+
// table might have multiple allocators, so we use the max
288+
// of them.
289+
maxAutoIDBase, err := common.GetMaxAutoIDBase(m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
275290
if err != nil {
276291
return errors.Trace(err)
277292
}
278-
} else {
279-
// Though we don't need auto ID, we still guarantee that the row ID is unique across all lightning instances.
280-
newRowIDBase = maxRowIDMax
281-
newRowIDMax = newRowIDBase + rawRowIDMax
293+
myStartRowID = maxAutoIDBase
282294
}
295+
myEndRowID = myStartRowID + requiredRowIDCnt
283296

284-
// table contains no data, can skip checksum
285-
if needAutoID && newRowIDBase == 0 && newStatus < metaStatusRestoreStarted {
297+
// if we are the first one to allocate, the table has auto-id,
298+
// and our start is 0, it means the table is empty, so we move
299+
// the state to next one directly without going through below
300+
// checksum branch.
301+
if hasAutoID && myStartRowID == 0 && newStatus < metaStatusRestoreStarted {
286302
newStatus = metaStatusRestoreStarted
287303
}
288304

289305
query := common.SprintfWithIdentifiers("UPDATE %s.%s SET row_id_base = ?, row_id_max = ?, status = ? WHERE table_id = ? AND task_id = ?", m.schemaName, m.tableName)
290-
_, err := tx.ExecContext(ctx, query, newRowIDBase, newRowIDMax, newStatus.String(), m.tr.tableInfo.ID, m.taskID)
306+
_, err := tx.ExecContext(ctx, query, myStartRowID, myEndRowID, newStatus.String(), m.tr.tableInfo.ID, m.taskID)
291307
if err != nil {
292308
return errors.Trace(err)
293309
}
294310

295-
curStatus = newStatus
311+
myStatus = newStatus
296312
}
297313
return nil
298314
})
@@ -325,9 +341,12 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
325341

326342
var checksum *verify.KVChecksum
327343
// need to do checksum and update checksum meta since we are the first one.
328-
if curStatus < metaStatusRestoreStarted {
329-
// table contains data but haven't do checksum yet
330-
if (newRowIDBase > 0 || !needAutoID) && m.needChecksum && baseTotalKvs == 0 {
344+
if myStatus < metaStatusRestoreStarted {
345+
// the table might have data if our StartRowID is not 0, or if the table
346+
// don't have any auto id.
347+
if (myStartRowID > 0 || !hasAutoID) && m.needChecksum && baseTotalKvs == 0 {
348+
// if another instance finished import before below checksum logic,
349+
// it will cause checksum mismatch, but it's very rare.
331350
remoteCk, err := DoChecksum(ctx, m.tr.tableInfo)
332351
if err != nil {
333352
return nil, 0, errors.Trace(err)
@@ -354,11 +373,11 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
354373
checksum = &ck
355374
}
356375
log.FromContext(ctx).Info("allocate table row_id base", zap.String("table", m.tr.tableName),
357-
zap.Int64("row_id_base", newRowIDBase))
376+
zap.Int64("startRowID", myStartRowID), zap.Int64("endRowID", myEndRowID))
358377
if checksum != nil {
359378
log.FromContext(ctx).Info("checksum base", zap.Any("checksum", checksum))
360379
}
361-
return checksum, newRowIDBase, nil
380+
return checksum, myStartRowID, nil
362381
}
363382

364383
func (m *dbTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error {

lightning/pkg/importer/table_import.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func NewTableImporter(
9292
etcdCli *clientv3.Client,
9393
logger log.Logger,
9494
) (*TableImporter, error) {
95-
idAlloc := kv.NewPanickingAllocators(tableInfo.Core.SepAutoInc(), cp.AllocBase)
95+
idAlloc := kv.NewPanickingAllocatorsWithBase(tableInfo.Core.SepAutoInc(), cp.AutoRandBase, cp.AutoIncrBase, cp.AutoRowIDBase)
9696
tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
9797
if err != nil {
9898
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
@@ -143,12 +143,15 @@ func (tr *TableImporter) importTable(
143143
}
144144

145145
// fetch the max chunk row_id max value as the global max row_id
146-
rowIDMax := int64(0)
146+
requiredRowIDCnt := int64(0)
147147
for _, engine := range cp.Engines {
148-
if len(engine.Chunks) > 0 && engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax > rowIDMax {
149-
rowIDMax = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax
148+
if len(engine.Chunks) > 0 && engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax > requiredRowIDCnt {
149+
requiredRowIDCnt = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax
150150
}
151151
}
152+
tr.logger.Info("estimated required row id count",
153+
zap.String("table", tr.tableName),
154+
zap.Int64("count", requiredRowIDCnt))
152155
versionStr, err := version.FetchVersion(ctx, rc.db)
153156
if err != nil {
154157
return false, errors.Trace(err)
@@ -163,7 +166,7 @@ func (tr *TableImporter) importTable(
163166
return false, err
164167
}
165168

166-
checksum, rowIDBase, err := metaMgr.AllocTableRowIDs(ctx, rowIDMax)
169+
checksum, rowIDBase, err := metaMgr.AllocTableRowIDs(ctx, requiredRowIDCnt)
167170
if err != nil {
168171
return false, err
169172
}
@@ -187,22 +190,31 @@ func (tr *TableImporter) importTable(
187190
}
188191
web.BroadcastTableCheckpoint(tr.tableName, cp)
189192

190-
// rebase the allocator so it exceeds the number of rows.
191-
if tr.tableInfo.Core.ContainsAutoRandomBits() {
192-
cp.AllocBase = max(cp.AllocBase, tr.tableInfo.Core.AutoRandID)
193-
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
193+
// rebase the allocator based on the max ID from table info.
194+
ti := tr.tableInfo.Core
195+
if ti.ContainsAutoRandomBits() {
196+
cp.AutoRandBase = max(cp.AutoRandBase, ti.AutoRandID)
197+
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AutoRandBase, false); err != nil {
194198
return false, err
195199
}
196200
} else {
197-
cp.AllocBase = max(cp.AllocBase, tr.tableInfo.Core.AutoIncID)
198-
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
201+
if ti.GetAutoIncrementColInfo() != nil && ti.SepAutoInc() {
202+
cp.AutoIncrBase = max(cp.AutoIncrBase, ti.AutoIncID)
203+
if err := tr.alloc.Get(autoid.AutoIncrementType).Rebase(context.Background(), cp.AutoIncrBase, false); err != nil {
204+
return false, err
205+
}
206+
}
207+
cp.AutoRowIDBase = max(cp.AutoRowIDBase, ti.AutoIncID)
208+
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(context.Background(), cp.AutoRowIDBase, false); err != nil {
199209
return false, err
200210
}
201211
}
202212
rc.saveCpCh <- saveCp{
203213
tableName: tr.tableName,
204214
merger: &checkpoints.RebaseCheckpointMerger{
205-
AllocBase: cp.AllocBase,
215+
AutoRandBase: cp.AutoRandBase,
216+
AutoIncrBase: cp.AutoIncrBase,
217+
AutoRowIDBase: cp.AutoRowIDBase,
206218
},
207219
}
208220
}

lightning/pkg/importer/table_import_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() {
409409
mockEngineWriter.EXPECT().IsSynced().Return(true).AnyTimes()
410410
mockEngineWriter.EXPECT().Close(gomock.Any()).Return(mockChunkFlushStatus, nil).AnyTimes()
411411

412-
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0), s.tableInfo.Core)
412+
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc()), s.tableInfo.Core)
413413
require.NoError(s.T(), err)
414414
_, indexUUID := backend.MakeUUID("`db`.`table`", -1)
415415
_, dataUUID := backend.MakeUUID("`db`.`table`", 0)
@@ -1445,7 +1445,7 @@ func (s *tableRestoreSuite) TestEstimate() {
14451445
controller := gomock.NewController(s.T())
14461446
defer controller.Finish()
14471447
mockEncBuilder := mock.NewMockEncodingBuilder(controller)
1448-
idAlloc := kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0)
1448+
idAlloc := kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc())
14491449
tbl, err := tables.TableFromMeta(idAlloc, s.tableInfo.Core)
14501450
require.NoError(s.T(), err)
14511451

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[lightning]
2+
index-concurrency = 1
3+
table-concurrency = 1
4+
5+
[tikv-importer]
6+
backend = "local"
7+
parallel-import = true
8+
9+
[checkpoint]
10+
enable = true
11+
driver = "file"
12+
13+
[mydumper]
14+
read-block-size = 1
15+
filter = ['cppk_tsr.tbl1', 'cppk_tsr.tbl2', 'cppk_tsr.tbl7', 'cppk_tsr.tbl8', 'cppk_tsr.tbl9']

lightning/tests/lightning_checkpoint/config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ table-concurrency = 1
44

55
[tikv-importer]
66
backend = "local"
7+
parallel-import = true
78

89
[checkpoint]
910
enable = true

0 commit comments

Comments
 (0)