2 changes: 1 addition & 1 deletion Makefile
@@ -530,7 +530,7 @@ br_bins:

.PHONY: data_parsers
data_parsers: tools/bin/vfsgendev pkg/lightning/mydump/parser_generated.go lightning_web
PATH="$(GOPATH)/bin":"$(PATH)":"$(TOOLS)" protoc -I. -I"$(GOPATH)/src" pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto --gogofaster_out=.
PATH="$(GOPATH)/bin":"$(PATH)":"$(TOOLS)" protoc -I. -I"$(GOMODCACHE)" pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto --gogofaster_out=.
tools/bin/vfsgendev -source='"github.com/pingcap/tidb/lightning/pkg/web".Res' && mv res_vfsdata.go lightning/pkg/web/

.PHONY: build_dumpling
3 changes: 2 additions & 1 deletion Makefile.common
@@ -14,6 +14,7 @@

PROJECT=tidb
GOPATH ?= $(shell go env GOPATH)
GOMODCACHE ?= $(shell go env GOMODCACHE)
P=8

# Ensure GOPATH is set before running build process.
@@ -132,4 +133,4 @@ ifneq ("$(CI)", "")
endif
BAZEL_INSTRUMENTATION_FILTER := --instrument_test_targets --instrumentation_filter=//pkg/...,//br/...,//dumpling/...

NOGO_FLAG=true
NOGO_FLAG=true
2 changes: 1 addition & 1 deletion lightning/pkg/importer/get_pre_info.go
@@ -622,7 +622,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
if err != nil {
return 0.0, false, errors.Trace(err)
}
idAlloc := kv.NewPanickingAllocators(tableInfo.SepAutoInc(), 0)
idAlloc := kv.NewPanickingAllocators(tableInfo.SepAutoInc())
tbl, err := tables.TableFromMeta(idAlloc, tableInfo)
if err != nil {
return 0.0, false, errors.Trace(err)
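The `, 0` base argument dropped here corresponds to the constructor pair used across this PR: `kv.NewPanickingAllocators(sepAutoInc)` for fresh allocators and `kv.NewPanickingAllocatorsWithBase(sepAutoInc, autoRandBase, autoIncrBase, autoRowIDBase)` when resuming from a checkpoint (see table_import.go below). A minimal, self-contained sketch of how such a pair could fit together; the types and internals here are illustrative assumptions, not the actual `pkg/lightning/backend/kv` code:

```go
package kvsketch

// allocType stands in for autoid.AllocatorType.
type allocType int

const (
	autoRandomType allocType = iota
	autoIncrementType
	rowIDAllocType
)

// panickingAllocator is a stand-in for lightning's allocator; here it only
// tracks a base value.
type panickingAllocator struct {
	base int64
}

type allocators map[allocType]*panickingAllocator

// newPanickingAllocators mirrors the zero-base call sites in this diff:
// every allocator starts from base 0.
func newPanickingAllocators(sepAutoInc bool) allocators {
	return newPanickingAllocatorsWithBase(sepAutoInc, 0, 0, 0)
}

// newPanickingAllocatorsWithBase restores one base per allocator type, as the
// checkpoint now records AutoRandBase, AutoIncrBase and AutoRowIDBase
// separately.
func newPanickingAllocatorsWithBase(sepAutoInc bool, autoRandBase, autoIncrBase, autoRowIDBase int64) allocators {
	allocs := allocators{
		autoRandomType: {base: autoRandBase},
		rowIDAllocType: {base: autoRowIDBase},
	}
	if sepAutoInc {
		// AUTO_ID_CACHE=1: auto_increment has its own allocator.
		allocs[autoIncrementType] = &panickingAllocator{base: autoIncrBase}
	} else {
		// otherwise auto_increment shares the row-id allocator (an assumption
		// made for this sketch).
		allocs[autoIncrementType] = allocs[rowIDAllocType]
	}
	return allocs
}
```

The design point is that each of the three ID spaces keeps its own base, so restoring a checkpoint no longer conflates the auto_increment and _tidb_rowid sequences.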
19 changes: 8 additions & 11 deletions lightning/pkg/importer/import.go
@@ -1990,20 +1990,17 @@ type deliverResult struct {
}

func saveCheckpoint(rc *Controller, t *TableImporter, engineID int32, chunk *checkpoints.ChunkCheckpoint) {
// We need to update the AllocBase every time we've finished a file.
Contributor: this line of comment can be kept; maybe it reminds us to keep the "ID used for encoding data" consistent with the "ID saved in the checkpoint".
// The AllocBase is determined by the maximum of the "handle" (_tidb_rowid
// or integer primary key), which can only be obtained by reading all data.

var base int64
if t.tableInfo.Core.ContainsAutoRandomBits() {
base = t.alloc.Get(autoid.AutoRandomType).Base() + 1
} else {
base = t.alloc.Get(autoid.RowIDAllocType).Base() + 1
}
// we save the XXXBase every time a chunk is finished.
// Note: it's possible that a chunk with a larger autoID range finishes first,
// so the saved XXXBase is already larger; when chunks with smaller autoID
// ranges finish later, they may have no effect on the saved XXXBase, but
// that's OK, we only need the largest.
rc.saveCpCh <- saveCp{
tableName: t.tableName,
merger: &checkpoints.RebaseCheckpointMerger{
AllocBase: base,
AutoRandBase: t.alloc.Get(autoid.AutoRandomType).Base(),
AutoIncrBase: t.alloc.Get(autoid.AutoIncrementType).Base(),
AutoRowIDBase: t.alloc.Get(autoid.RowIDAllocType).Base(),
},
}
rc.saveCpCh <- saveCp{
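As the new comment notes, chunks can finish out of order, so the checkpoint merge has to be monotonic: it may only ever raise the stored bases. A hedged sketch of that max-merge idea, using the field names from the diff; the struct and method below are illustrative, not the real `checkpoints.RebaseCheckpointMerger`:

```go
package checkpointsketch

// tableCheckpoint holds the three per-allocator bases the new merger carries.
type tableCheckpoint struct {
	AutoRandBase  int64
	AutoIncrBase  int64
	AutoRowIDBase int64
}

type rebaseCheckpointMerger struct {
	AutoRandBase  int64
	AutoIncrBase  int64
	AutoRowIDBase int64
}

// mergeInto keeps the maximum of each base, so a chunk that finishes late with
// smaller bases never shrinks the saved checkpoint values.
func (m *rebaseCheckpointMerger) mergeInto(cp *tableCheckpoint) {
	cp.AutoRandBase = max(cp.AutoRandBase, m.AutoRandBase)
	cp.AutoIncrBase = max(cp.AutoIncrBase, m.AutoIncrBase)
	cp.AutoRowIDBase = max(cp.AutoRowIDBase, m.AutoRowIDBase)
}
```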
85 changes: 52 additions & 33 deletions lightning/pkg/importer/meta_manager.go
@@ -93,7 +93,7 @@ func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableImporter) tableMetaMgr {

type tableMetaMgr interface {
InitTableMeta(ctx context.Context) error
AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error)
AllocTableRowIDs(ctx context.Context, requiredRowIDCnt int64) (*verify.KVChecksum, int64, error)
UpdateTableStatus(ctx context.Context, status metaStatus) error
UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error
CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (
@@ -177,7 +177,7 @@ func parseMetaStatus(s string) (metaStatus, error) {
}
}

func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) {
func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, requiredRowIDCnt int64) (*verify.KVChecksum, int64, error) {
conn, err := m.session.Conn(ctx)
if err != nil {
return nil, 0, errors.Trace(err)
@@ -188,22 +188,31 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
DB: conn,
Logger: m.tr.logger,
}
var newRowIDBase, newRowIDMax int64
curStatus := metaStatusInitial
// (myStartRowID, myEndRowID] is the range of row_id that the current instance
Contributor: can we use [start, end)? This is a bit strange and we have to -1 in GetMaxAutoIDBase.

Contributor Author: that's how the auto ID allocator works now, both in TiDB and lightning; won't change it here.

// can use to encode the table.
var myStartRowID, myEndRowID int64
myStatus := metaStatusInitial
newStatus := metaStatusRowIDAllocated
var baseTotalKvs, baseTotalBytes, baseChecksum uint64
err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';")
if err != nil {
return nil, 0, errors.Annotate(err, "enable pessimistic transaction failed")
}

needAutoID := common.TableHasAutoID(m.tr.tableInfo.Core)
hasAutoID := common.TableHasAutoID(m.tr.tableInfo.Core)
tableChecksumingMsg := "Target table is calculating checksum. Please wait until the checksum is finished and try again."
doAllocTableRowIDsFn := func() error {
return exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error {
// lightning follows the calling sequence below, so at most one client
// can execute the code after the FOR UPDATE part for a given table,
// even though FOR UPDATE only locks rows that match the condition:
// - insert into table_meta with key (table_id, task_id)
// - try to lock with FOR UPDATE
rows, err := tx.QueryContext(
ctx,
common.SprintfWithIdentifiers("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status FROM %s.%s WHERE table_id = ? FOR UPDATE", m.schemaName, m.tableName),
common.SprintfWithIdentifiers(`
SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status
FROM %s.%s WHERE table_id = ? FOR UPDATE`, m.schemaName, m.tableName),
m.tr.tableInfo.ID,
)
if err != nil {
@@ -234,16 +243,16 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
}

if metaTaskID == m.taskID {
curStatus = status
myStatus = status
baseChecksum = checksum
baseTotalKvs = totalKvs
baseTotalBytes = totalBytes
if status >= metaStatusRowIDAllocated {
if rowIDMax-rowIDBase != rawRowIDMax {
return common.ErrAllocTableRowIDs.GenWithStack("verify allocator base failed. local: '%d', meta: '%d'", rawRowIDMax, rowIDMax-rowIDBase)
if rowIDMax-rowIDBase != requiredRowIDCnt {
return common.ErrAllocTableRowIDs.GenWithStack("verify allocator base failed. local: '%d', meta: '%d'", requiredRowIDCnt, rowIDMax-rowIDBase)
}
newRowIDBase = rowIDBase
newRowIDMax = rowIDMax
myStartRowID = rowIDBase
myEndRowID = rowIDMax
break
}
continue
@@ -263,36 +272,43 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
}

// no enough info are available, fetch row_id max for table
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
// TODO this is not right when AUTO_ID_CACHE=1 and have auto row id,
// the id allocators are separated in this case.
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if myStatus == metaStatusInitial {
// if the table doesn't have an auto ID, we still guarantee that the
// row ID is unique across all lightning instances;
// or, if someone has already allocated the auto ID, we can continue
// allocating from the previous maxRowIDMax.
if !hasAutoID || maxRowIDMax > 0 {
myStartRowID = maxRowIDMax
} else {
// we are the first one to allocate the auto ID, so we need to
// fetch the max auto ID base from the table and allocate
// from there.
// we only have one estimated requiredRowIDCnt, but the
// table might have multiple allocators, so we use the max
// of their bases.
maxAutoIDBase, err := common.GetMaxAutoIDBase(m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
} else {
// Though we don't need auto ID, we still guarantee that the row ID is unique across all lightning instances.
newRowIDBase = maxRowIDMax
newRowIDMax = newRowIDBase + rawRowIDMax
myStartRowID = maxAutoIDBase
}
myEndRowID = myStartRowID + requiredRowIDCnt

// table contains no data, can skip checksum
if needAutoID && newRowIDBase == 0 && newStatus < metaStatusRestoreStarted {
// if we are the first one to allocate, the table has an auto ID,
// and our start is 0, it means the table is empty, so we move
// the state to the next one directly without going through the
// checksum branch below.
if hasAutoID && myStartRowID == 0 && newStatus < metaStatusRestoreStarted {
newStatus = metaStatusRestoreStarted
}

query := common.SprintfWithIdentifiers("UPDATE %s.%s SET row_id_base = ?, row_id_max = ?, status = ? WHERE table_id = ? AND task_id = ?", m.schemaName, m.tableName)
_, err := tx.ExecContext(ctx, query, newRowIDBase, newRowIDMax, newStatus.String(), m.tr.tableInfo.ID, m.taskID)
_, err := tx.ExecContext(ctx, query, myStartRowID, myEndRowID, newStatus.String(), m.tr.tableInfo.ID, m.taskID)
if err != nil {
return errors.Trace(err)
}

curStatus = newStatus
myStatus = newStatus
}
return nil
})
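Stripped of the SQL and locking, the row-ID handout above boils down to a small computation: each instance claims a range (start, end] whose first usable ID is start+1 (which is also why the reviewer's [start, end) question comes up). A simplified sketch under those assumptions; the real code runs inside the FOR UPDATE transaction and propagates errors:

```go
package metasketch

// allocRowIDRange returns the (start, end] range this lightning instance may
// use for the table.
//   - hasAutoID: whether the target table has any auto ID column.
//   - maxRowIDMax: the largest row_id_max other instances recorded in the
//     table meta (0 if nobody has allocated yet).
//   - maxAutoIDBase: the table's current max auto-ID base, consulted only by
//     the first allocator of a table that has an auto ID.
//   - requiredRowIDCnt: the estimated number of row IDs this instance needs.
func allocRowIDRange(hasAutoID bool, maxRowIDMax, maxAutoIDBase, requiredRowIDCnt int64) (start, end int64) {
	if !hasAutoID || maxRowIDMax > 0 {
		// either row IDs only need to be unique across instances, or another
		// instance already allocated, so continue right after its range.
		start = maxRowIDMax
	} else {
		// first allocator for a table with an auto ID: continue after
		// whatever the table has already used.
		start = maxAutoIDBase
	}
	return start, start + requiredRowIDCnt
}
```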
@@ -325,9 +341,12 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64

var checksum *verify.KVChecksum
// need to do checksum and update checksum meta since we are the first one.
if curStatus < metaStatusRestoreStarted {
// table contains data but haven't do checksum yet
if (newRowIDBase > 0 || !needAutoID) && m.needChecksum && baseTotalKvs == 0 {
if myStatus < metaStatusRestoreStarted {
// the table might have data if our StartRowID is not 0, or if the table
// doesn't have any auto ID.
if (myStartRowID > 0 || !hasAutoID) && m.needChecksum && baseTotalKvs == 0 {
// if another instance finishes importing before the checksum logic below,
// it will cause a checksum mismatch, but it's very rare.
Contributor Author (on lines +348 to +349): see #57399
remoteCk, err := DoChecksum(ctx, m.tr.tableInfo)
if err != nil {
return nil, 0, errors.Trace(err)
@@ -354,11 +373,11 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
checksum = &ck
}
log.FromContext(ctx).Info("allocate table row_id base", zap.String("table", m.tr.tableName),
zap.Int64("row_id_base", newRowIDBase))
zap.Int64("startRowID", myStartRowID), zap.Int64("endRowID", myEndRowID))
if checksum != nil {
log.FromContext(ctx).Info("checksum base", zap.Any("checksum", checksum))
}
return checksum, newRowIDBase, nil
return checksum, myStartRowID, nil
}

func (m *dbTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error {
36 changes: 24 additions & 12 deletions lightning/pkg/importer/table_import.go
@@ -92,7 +92,7 @@ func NewTableImporter(
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableImporter, error) {
idAlloc := kv.NewPanickingAllocators(tableInfo.Core.SepAutoInc(), cp.AllocBase)
idAlloc := kv.NewPanickingAllocatorsWithBase(tableInfo.Core.SepAutoInc(), cp.AutoRandBase, cp.AutoIncrBase, cp.AutoRowIDBase)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
@@ -143,12 +143,15 @@ func (tr *TableImporter) importTable(
}

// fetch the max chunk row_id max value as the global max row_id
rowIDMax := int64(0)
requiredRowIDCnt := int64(0)
for _, engine := range cp.Engines {
if len(engine.Chunks) > 0 && engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax > rowIDMax {
rowIDMax = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax
if len(engine.Chunks) > 0 && engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax > requiredRowIDCnt {
requiredRowIDCnt = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax
}
}
tr.logger.Info("estimated required row id count",
zap.String("table", tr.tableName),
zap.Int64("count", requiredRowIDCnt))
versionStr, err := version.FetchVersion(ctx, rc.db)
if err != nil {
return false, errors.Trace(err)
@@ -163,7 +166,7 @@
return false, err
}

checksum, rowIDBase, err := metaMgr.AllocTableRowIDs(ctx, rowIDMax)
checksum, rowIDBase, err := metaMgr.AllocTableRowIDs(ctx, requiredRowIDCnt)
if err != nil {
return false, err
}
@@ -187,22 +190,31 @@ }
}
web.BroadcastTableCheckpoint(tr.tableName, cp)

// rebase the allocator so it exceeds the number of rows.
if tr.tableInfo.Core.ContainsAutoRandomBits() {
cp.AllocBase = max(cp.AllocBase, tr.tableInfo.Core.AutoRandID)
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
// rebase the allocator based on the max ID from table info.
ti := tr.tableInfo.Core
if ti.ContainsAutoRandomBits() {
cp.AutoRandBase = max(cp.AutoRandBase, ti.AutoRandID)
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AutoRandBase, false); err != nil {
return false, err
}
} else {
cp.AllocBase = max(cp.AllocBase, tr.tableInfo.Core.AutoIncID)
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
if ti.GetAutoIncrementColInfo() != nil && ti.SepAutoInc() {
cp.AutoIncrBase = max(cp.AutoIncrBase, ti.AutoIncID)
if err := tr.alloc.Get(autoid.AutoIncrementType).Rebase(context.Background(), cp.AutoIncrBase, false); err != nil {
return false, err
}
}
cp.AutoRowIDBase = max(cp.AutoRowIDBase, ti.AutoIncID)
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(context.Background(), cp.AutoRowIDBase, false); err != nil {
return false, err
}
}
rc.saveCpCh <- saveCp{
tableName: tr.tableName,
merger: &checkpoints.RebaseCheckpointMerger{
AllocBase: cp.AllocBase,
AutoRandBase: cp.AutoRandBase,
AutoIncrBase: cp.AutoIncrBase,
AutoRowIDBase: cp.AutoRowIDBase,
},
}
}
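The rebase branch above encodes a small decision table: auto_random tables rebase only the auto_random allocator from AutoRandID; all other tables always rebase the row-ID allocator from AutoIncID, plus the separate auto_increment allocator when AUTO_ID_CACHE=1. A condensed sketch of that logic; the types below are simplified assumptions, while the real code goes through autoid.Allocators and returns errors:

```go
package importsketch

// tableMeta is a simplified stand-in for *model.TableInfo.
type tableMeta struct {
	HasAutoRandom  bool // ContainsAutoRandomBits()
	HasAutoIncrCol bool // GetAutoIncrementColInfo() != nil
	SepAutoInc     bool // AUTO_ID_CACHE=1
	AutoRandID     int64
	AutoIncID      int64
}

// rebasedBases returns the checkpoint bases after raising them to the values
// recorded in the table info, mirroring the branch structure in importTable.
func rebasedBases(ti tableMeta, autoRandBase, autoIncrBase, autoRowIDBase int64) (int64, int64, int64) {
	if ti.HasAutoRandom {
		// auto_random tables only use the auto_random allocator.
		return max(autoRandBase, ti.AutoRandID), autoIncrBase, autoRowIDBase
	}
	if ti.HasAutoIncrCol && ti.SepAutoInc {
		// a separate auto_increment allocator exists only with AUTO_ID_CACHE=1.
		autoIncrBase = max(autoIncrBase, ti.AutoIncID)
	}
	// the row-id allocator is always rebased for non-auto_random tables.
	autoRowIDBase = max(autoRowIDBase, ti.AutoIncID)
	return autoRandBase, autoIncrBase, autoRowIDBase
}
```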
4 changes: 2 additions & 2 deletions lightning/pkg/importer/table_import_test.go
@@ -409,7 +409,7 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() {
mockEngineWriter.EXPECT().IsSynced().Return(true).AnyTimes()
mockEngineWriter.EXPECT().Close(gomock.Any()).Return(mockChunkFlushStatus, nil).AnyTimes()

tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0), s.tableInfo.Core)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc()), s.tableInfo.Core)
require.NoError(s.T(), err)
_, indexUUID := backend.MakeUUID("`db`.`table`", -1)
_, dataUUID := backend.MakeUUID("`db`.`table`", 0)
@@ -1445,7 +1445,7 @@ func (s *tableRestoreSuite) TestEstimate() {
controller := gomock.NewController(s.T())
defer controller.Finish()
mockEncBuilder := mock.NewMockEncodingBuilder(controller)
idAlloc := kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0)
idAlloc := kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc())
tbl, err := tables.TableFromMeta(idAlloc, s.tableInfo.Core)
require.NoError(s.T(), err)

15 changes: 15 additions & 0 deletions lightning/tests/lightning_checkpoint/config-file.toml
@@ -0,0 +1,15 @@
[lightning]
index-concurrency = 1
table-concurrency = 1

[tikv-importer]
backend = "local"
parallel-import = true

[checkpoint]
enable = true
driver = "file"

[mydumper]
read-block-size = 1
filter = ['cppk_tsr.tbl1', 'cppk_tsr.tbl2', 'cppk_tsr.tbl7', 'cppk_tsr.tbl8', 'cppk_tsr.tbl9']
1 change: 1 addition & 0 deletions lightning/tests/lightning_checkpoint/config.toml
@@ -4,6 +4,7 @@ table-concurrency = 1

[tikv-importer]
backend = "local"
parallel-import = true

[checkpoint]
enable = true