Skip to content

Commit 923334d

Browse files
authored
ddl/ingest: set minCommitTS when detecting remote duplicate keys (#55588) (#62290)
close #55587
1 parent 954d306 commit 923334d

File tree

8 files changed

+35
-18
lines changed

8 files changed

+35
-18
lines changed

pkg/ddl/backfilling_dist_executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (s *backfillDistExecutor) getBackendCtx(ctx context.Context) (ingest.Backen
141141
ddlObj := s.d
142142
discovery := ddlObj.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
143143

144-
return ingest.LitBackCtxMgr.Register(ctx, job.ID, unique, ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
144+
return ingest.LitBackCtxMgr.Register(ctx, job.ID, unique, ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, job.RealStartTS)
145145
}
146146

147147
func decodeIndexUniqueness(job *model.Job) (bool, error) {

pkg/ddl/index.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,8 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
942942
metrics.UnregisterLightningCommonMetricsForDDL(job.ID, m)
943943
}
944944
}()
945-
bc, err = ingest.LitBackCtxMgr.Register(ctx, job.ID, allIndexInfos[0].Unique, nil, discovery, job.ReorgMeta.ResourceGroupName)
945+
bc, err = ingest.LitBackCtxMgr.Register(ctx, job.ID, allIndexInfos[0].Unique,
946+
nil, discovery, job.ReorgMeta.ResourceGroupName, job.RealStartTS)
946947
if err != nil {
947948
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), allIndexInfos, err)
948949
return false, ver, errors.Trace(err)
@@ -2018,7 +2019,8 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
20182019
if indexInfo.Unique {
20192020
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
20202021
if bc == nil {
2021-
bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName)
2022+
bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil,
2023+
discovery, reorgInfo.ReorgMeta.ResourceGroupName, reorgInfo.RealStartTS)
20222024
if err != nil {
20232025
return err
20242026
}

pkg/ddl/ingest/backend.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ type litBackendCtx struct {
9393
updateInterval time.Duration
9494
checkpointMgr *CheckpointManager
9595
etcdClient *clientv3.Client
96+
initTS uint64
9697
}
9798

9899
func (bc *litBackendCtx) handleErrorAfterCollectRemoteDuplicateRows(err error, indexID int64, tbl table.Table, hasDupe bool) error {
@@ -128,9 +129,10 @@ func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Tab
128129
// backend must be a local backend.
129130
dupeController := bc.backend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
130131
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
131-
SQLMode: mysql.ModeStrictAllTables,
132-
SysVars: bc.sysVars,
133-
IndexID: indexID,
132+
SQLMode: mysql.ModeStrictAllTables,
133+
SysVars: bc.sysVars,
134+
IndexID: indexID,
135+
MinCommitTS: bc.initTS,
134136
}, lightning.ErrorOnDup)
135137
return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe)
136138
}
@@ -160,9 +162,10 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl
160162
//nolint:forcetypeassert
161163
dupeController := bc.backend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
162164
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
163-
SQLMode: mysql.ModeStrictAllTables,
164-
SysVars: bc.sysVars,
165-
IndexID: ei.indexID,
165+
SQLMode: mysql.ModeStrictAllTables,
166+
SysVars: bc.sysVars,
167+
IndexID: ei.indexID,
168+
MinCommitTS: bc.initTS,
166169
}, lightning.ErrorOnDup)
167170
return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe)
168171
}

pkg/ddl/ingest/backend_mgr.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type BackendCtxMgr interface {
5252
etcdClient *clientv3.Client,
5353
pdSvcDiscovery pd.ServiceDiscovery,
5454
resourceGroupName string,
55+
initTS uint64,
5556
) (BackendCtx, error)
5657
Unregister(jobID int64)
5758
// Load returns the registered BackendCtx with the given jobID.
@@ -123,6 +124,7 @@ func (m *litBackendCtxMgr) Register(
123124
etcdClient *clientv3.Client,
124125
pdSvcDiscovery pd.ServiceDiscovery,
125126
resourceGroupName string,
127+
initTS uint64,
126128
) (BackendCtx, error) {
127129
bc, exist := m.Load(jobID)
128130
if exist {
@@ -153,7 +155,7 @@ func (m *litBackendCtxMgr) Register(
153155
return nil, err
154156
}
155157

156-
bcCtx := newBackendContext(ctx, jobID, bd, cfg.lightning, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient)
158+
bcCtx := newBackendContext(ctx, jobID, bd, cfg.lightning, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient, initTS)
157159
m.backends.m[jobID] = bcCtx
158160
m.memRoot.Consume(StructSizeBackendCtx)
159161
m.backends.mu.Unlock()
@@ -266,6 +268,7 @@ func newBackendContext(
266268
memRoot MemRoot,
267269
diskRoot DiskRoot,
268270
etcdClient *clientv3.Client,
271+
initTS uint64,
269272
) *litBackendCtx {
270273
bCtx := &litBackendCtx{
271274
SyncMap: generic.NewSyncMap[int64, *engineInfo](10),
@@ -279,6 +282,7 @@ func newBackendContext(
279282
diskRoot: diskRoot,
280283
updateInterval: checkpointUpdateInterval,
281284
etcdClient: etcdClient,
285+
initTS: initTS,
282286
}
283287
bCtx.timeOfLastFlush.Store(time.Now())
284288
return bCtx

pkg/ddl/ingest/mock.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable(context.Context) (bool, erro
5252
}
5353

5454
// Register implements BackendCtxMgr.Register interface.
55-
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string) (BackendCtx, error) {
55+
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client,
56+
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, initTS uint64) (BackendCtx, error) {
5657
logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
5758
if mockCtx, ok := m.runningJobs[jobID]; ok {
5859
return mockCtx, nil

pkg/lightning/backend/encode/encode.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ type SessionOptions struct {
5757
SysVars map[string]string
5858
// a seed used for tableKvEncoder's auto random bits value
5959
AutoRandomSeed int64
60-
// IndexID is used by the DuplicateManager. Only the key range with the specified index ID is scanned.
60+
// IndexID is used by the dupeDetector. Only the key range with the specified index ID is scanned.
6161
IndexID int64
62+
// MinCommitTS is used by the dupeDetector. Only records whose commit TS is larger than MinCommitTS are considered.
63+
MinCommitTS uint64
6264
}
6365

6466
// Rows represents a collection of encoded rows.

pkg/lightning/backend/local/duplicate.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ func getDupDetectClient(
310310
importClientFactory ImportClientFactory,
311311
resourceGroupName string,
312312
taskType string,
313+
minCommitTS uint64,
313314
) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
314315
leader := region.Leader
315316
if leader == nil {
@@ -330,9 +331,10 @@ func getDupDetectClient(
330331
RequestSource: kvutil.BuildRequestSource(true, tidbkv.InternalTxnLightning, taskType),
331332
}
332333
req := &import_sstpb.DuplicateDetectRequest{
333-
Context: reqCtx,
334-
StartKey: keyRange.StartKey,
335-
EndKey: keyRange.EndKey,
334+
Context: reqCtx,
335+
StartKey: keyRange.StartKey,
336+
EndKey: keyRange.EndKey,
337+
MinCommitTs: minCommitTS,
336338
}
337339
cli, err := importClient.DuplicateDetect(ctx, req)
338340
if err != nil {
@@ -349,9 +351,10 @@ func NewRemoteDupKVStream(
349351
importClientFactory ImportClientFactory,
350352
resourceGroupName string,
351353
taskType string,
354+
minCommitTS uint64,
352355
) (*RemoteDupKVStream, error) {
353356
subCtx, cancel := context.WithCancel(ctx)
354-
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType)
357+
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType, minCommitTS)
355358
if err != nil {
356359
cancel()
357360
return nil, errors.Trace(err)
@@ -422,6 +425,7 @@ type DupeDetector struct {
422425
indexID int64
423426
resourceGroupName string
424427
taskType string
428+
minCommitTS uint64
425429
}
426430

427431
// NewDupeDetector creates a new DupeDetector.
@@ -456,6 +460,7 @@ func NewDupeDetector(
456460
indexID: sessOpts.IndexID,
457461
resourceGroupName: resourceGroupName,
458462
taskType: taskType,
463+
minCommitTS: sessOpts.MinCommitTS,
459464
}, nil
460465
}
461466

@@ -933,7 +938,7 @@ func (m *DupeDetector) processRemoteDupTaskOnce(
933938
logutil.Key("dupDetectEndKey", kr.EndKey),
934939
)
935940
err := func() error {
936-
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType)
941+
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType, m.minCommitTS)
937942
if err != nil {
938943
return errors.Annotatef(err, "failed to create remote duplicate kv stream")
939944
}

tests/realtikvtest/addindextest/add_index_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func TestLitBackendCtxMgr(t *testing.T) {
186186

187187
jobID := int64(102)
188188
discovery := store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
189-
backendCtx, err := mgr.Register(ctx, jobID, false, nil, discovery, "TestLitBackendCtxMgr")
189+
backendCtx, err := mgr.Register(ctx, jobID, false, nil, discovery, "TestLitBackendCtxMgr", 0)
190190
require.NoError(t, err)
191191
require.NotNil(t, backendCtx)
192192

0 commit comments

Comments
 (0)