-
Notifications
You must be signed in to change notification settings - Fork 6k
lightning: fix id too large after parallel import #57398
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
6061a6e
74b83cb
a7d8625
c516fdb
613e9c2
9c02c2c
03e16f7
c04f322
c833bcf
105f401
4b485e0
3889d53
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1990,20 +1990,12 @@ type deliverResult struct { | |
} | ||
|
||
func saveCheckpoint(rc *Controller, t *TableImporter, engineID int32, chunk *checkpoints.ChunkCheckpoint) { | ||
// We need to update the AllocBase every time we've finished a file. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This line of comment can be kept. Maybe it reminds us to keep the consistency of "ID used for encoding data" and "ID saved in checkpoint" |
||
// The AllocBase is determined by the maximum of the "handle" (_tidb_rowid | ||
// or integer primary key), which can only be obtained by reading all data. | ||
|
||
var base int64 | ||
if t.tableInfo.Core.ContainsAutoRandomBits() { | ||
base = t.alloc.Get(autoid.AutoRandomType).Base() + 1 | ||
} else { | ||
base = t.alloc.Get(autoid.RowIDAllocType).Base() + 1 | ||
} | ||
rc.saveCpCh <- saveCp{ | ||
tableName: t.tableName, | ||
merger: &checkpoints.RebaseCheckpointMerger{ | ||
AllocBase: base, | ||
AutoRandBase: t.alloc.Get(autoid.AutoRandomType).Base(), | ||
AutoIncrBase: t.alloc.Get(autoid.AutoIncrementType).Base(), | ||
AutoRowIDBase: t.alloc.Get(autoid.RowIDAllocType).Base(), | ||
}, | ||
} | ||
rc.saveCpCh <- saveCp{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,7 +93,7 @@ func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableImporter) tableMetaMgr { | |
|
||
type tableMetaMgr interface { | ||
InitTableMeta(ctx context.Context) error | ||
AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) | ||
AllocTableRowIDs(ctx context.Context, requiredRowIDCnt int64) (*verify.KVChecksum, int64, error) | ||
UpdateTableStatus(ctx context.Context, status metaStatus) error | ||
UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error | ||
CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) ( | ||
|
@@ -177,7 +177,7 @@ func parseMetaStatus(s string) (metaStatus, error) { | |
} | ||
} | ||
|
||
func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) { | ||
func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, requiredRowIDCnt int64) (*verify.KVChecksum, int64, error) { | ||
conn, err := m.session.Conn(ctx) | ||
if err != nil { | ||
return nil, 0, errors.Trace(err) | ||
|
@@ -188,8 +188,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 | |
DB: conn, | ||
Logger: m.tr.logger, | ||
} | ||
var newRowIDBase, newRowIDMax int64 | ||
curStatus := metaStatusInitial | ||
// (myStartRowID, myEndRowID] is the range of row_id that current instance | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we use [start, end)? This is a bit strange and we have to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that's how auto id allocator works now, both in TiDB and lightning, won't do it here |
||
// can use to encode the table. | ||
var myStartRowID, myEndRowID int64 | ||
myStatus := metaStatusInitial | ||
newStatus := metaStatusRowIDAllocated | ||
var baseTotalKvs, baseTotalBytes, baseChecksum uint64 | ||
err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") | ||
|
@@ -201,9 +203,16 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 | |
tableChecksumingMsg := "Target table is calculating checksum. Please wait until the checksum is finished and try again." | ||
doAllocTableRowIDsFn := func() error { | ||
return exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error { | ||
// lightning follows below calling sequence, so at most one client | ||
// can execute the code after the FOR UPDATE part for some table, | ||
// even though FOR UPDATE only lock rows that matches the condition: | ||
// - insert into table_meta with key (table_id, task_id) | ||
// - try lock with FOR UPDATE | ||
rows, err := tx.QueryContext( | ||
ctx, | ||
common.SprintfWithIdentifiers("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status FROM %s.%s WHERE table_id = ? FOR UPDATE", m.schemaName, m.tableName), | ||
common.SprintfWithIdentifiers(` | ||
SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status | ||
FROM %s.%s WHERE table_id = ? FOR UPDATE`, m.schemaName, m.tableName), | ||
m.tr.tableInfo.ID, | ||
) | ||
if err != nil { | ||
|
@@ -234,16 +243,16 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 | |
} | ||
|
||
if metaTaskID == m.taskID { | ||
curStatus = status | ||
myStatus = status | ||
baseChecksum = checksum | ||
baseTotalKvs = totalKvs | ||
baseTotalBytes = totalBytes | ||
if status >= metaStatusRowIDAllocated { | ||
if rowIDMax-rowIDBase != rawRowIDMax { | ||
return common.ErrAllocTableRowIDs.GenWithStack("verify allocator base failed. local: '%d', meta: '%d'", rawRowIDMax, rowIDMax-rowIDBase) | ||
if rowIDMax-rowIDBase != requiredRowIDCnt { | ||
return common.ErrAllocTableRowIDs.GenWithStack("verify allocator base failed. local: '%d', meta: '%d'", requiredRowIDCnt, rowIDMax-rowIDBase) | ||
} | ||
newRowIDBase = rowIDBase | ||
newRowIDMax = rowIDMax | ||
myStartRowID = rowIDBase | ||
myEndRowID = rowIDMax | ||
break | ||
} | ||
continue | ||
|
@@ -263,36 +272,21 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 | |
} | ||
|
||
// no enough info are available, fetch row_id max for table | ||
if curStatus == metaStatusInitial { | ||
if needAutoID { | ||
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first. | ||
// TODO this is not right when AUTO_ID_CACHE=1 and have auto row id, | ||
// the id allocators are separated in this case. | ||
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil { | ||
return errors.Trace(err) | ||
} | ||
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
} else { | ||
// Though we don't need auto ID, we still guarantee that the row ID is unique across all lightning instances. | ||
newRowIDBase = maxRowIDMax | ||
newRowIDMax = newRowIDBase + rawRowIDMax | ||
} | ||
if myStatus == metaStatusInitial { | ||
myStartRowID = maxRowIDMax | ||
myEndRowID = myStartRowID + requiredRowIDCnt | ||
|
||
// table contains no data, can skip checksum | ||
if needAutoID && newRowIDBase == 0 && newStatus < metaStatusRestoreStarted { | ||
if needAutoID && myStartRowID == 0 && newStatus < metaStatusRestoreStarted { | ||
newStatus = metaStatusRestoreStarted | ||
} | ||
|
||
query := common.SprintfWithIdentifiers("UPDATE %s.%s SET row_id_base = ?, row_id_max = ?, status = ? WHERE table_id = ? AND task_id = ?", m.schemaName, m.tableName) | ||
_, err := tx.ExecContext(ctx, query, newRowIDBase, newRowIDMax, newStatus.String(), m.tr.tableInfo.ID, m.taskID) | ||
_, err := tx.ExecContext(ctx, query, myStartRowID, myEndRowID, newStatus.String(), m.tr.tableInfo.ID, m.taskID) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
curStatus = newStatus | ||
myStatus = newStatus | ||
} | ||
return nil | ||
}) | ||
|
@@ -325,9 +319,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 | |
|
||
var checksum *verify.KVChecksum | ||
// need to do checksum and update checksum meta since we are the first one. | ||
if curStatus < metaStatusRestoreStarted { | ||
// table contains data but haven't do checksum yet | ||
if (newRowIDBase > 0 || !needAutoID) && m.needChecksum && baseTotalKvs == 0 { | ||
if myStatus < metaStatusRestoreStarted { | ||
if m.needChecksum && baseTotalKvs == 0 { | ||
// if another instance finished import before below checksum logic, | ||
// it will cause checksum mismatch, but it's very rare. | ||
Comment on lines
+348
to
+349
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see #57399 |
||
remoteCk, err := DoChecksum(ctx, m.tr.tableInfo) | ||
if err != nil { | ||
return nil, 0, errors.Trace(err) | ||
|
@@ -354,11 +349,11 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 | |
checksum = &ck | ||
} | ||
log.FromContext(ctx).Info("allocate table row_id base", zap.String("table", m.tr.tableName), | ||
zap.Int64("row_id_base", newRowIDBase)) | ||
zap.Int64("startRowID", myStartRowID), zap.Int64("endRowID", myEndRowID)) | ||
if checksum != nil { | ||
log.FromContext(ctx).Info("checksum base", zap.Any("checksum", checksum)) | ||
} | ||
return checksum, newRowIDBase, nil | ||
return checksum, myStartRowID, nil | ||
} | ||
|
||
func (m *dbTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
[lightning] | ||
index-concurrency = 1 | ||
table-concurrency = 1 | ||
|
||
[tikv-importer] | ||
backend = "local" | ||
parallel-import = true | ||
|
||
[checkpoint] | ||
enable = true | ||
driver = "file" | ||
|
||
[mydumper] | ||
read-block-size = 1 | ||
filter = ['cppk_tsr.tbl1', 'cppk_tsr.tbl2', 'cppk_tsr.tbl7', 'cppk_tsr.tbl8', 'cppk_tsr.tbl9'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe use the
GOMODCACHE
value from `go env`
https://go.dev/ref/mod#environment-variables