Skip to content

Commit e6e8f7f

Browse files
authored
ddl/ingest: refactor checkpoint manager (#54747)
ref #54436
1 parent d6ee4b8 commit e6e8f7f

17 files changed

+231
-206
lines changed

pkg/ddl/backfilling.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/pingcap/tidb/pkg/ddl/logutil"
3131
sess "github.com/pingcap/tidb/pkg/ddl/session"
3232
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
33+
"github.com/pingcap/tidb/pkg/disttask/operator"
3334
"github.com/pingcap/tidb/pkg/expression"
3435
exprctx "github.com/pingcap/tidb/pkg/expression/context"
3536
"github.com/pingcap/tidb/pkg/kv"
@@ -689,6 +690,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
689690
cpMgr, err := ingest.NewCheckpointManager(
690691
ctx,
691692
sessPool,
693+
reorgInfo.PhysicalTableID,
692694
job.ID,
693695
indexIDs,
694696
ingest.LitBackCtxMgr.EncodeJobSortPath(job.ID),
@@ -700,7 +702,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
700702
zap.Error(err))
701703
} else {
702704
defer cpMgr.Close()
703-
cpMgr.Reset(t.GetPhysicalID(), reorgInfo.StartKey, reorgInfo.EndKey)
704705
bcCtx.AttachCheckpointManager(cpMgr)
705706
}
706707

@@ -723,10 +724,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
723724
zap.Int64s("index IDs", indexIDs))
724725
return errors.Trace(err)
725726
}
726-
// in happy path FinishAndUnregisterEngines will be called in pipe.Close. We can
727-
// ignore the error here.
728-
//nolint: errcheck
729-
defer bcCtx.FinishAndUnregisterEngines()
730727

731728
pipe, err := NewAddIndexIngestPipeline(
732729
opCtx,
@@ -748,13 +745,31 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
748745
if err != nil {
749746
return err
750747
}
751-
err = pipe.Execute()
748+
err = executeAndClosePipeline(opCtx, pipe)
749+
if err != nil {
750+
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
751+
if err1 != nil {
752+
logutil.DDLIngestLogger().Error("unregister engine failed",
753+
zap.Int64("jobID", job.ID),
754+
zap.Error(err1),
755+
zap.Int64s("index IDs", indexIDs))
756+
}
757+
return err
758+
}
759+
if cpMgr != nil {
760+
cpMgr.AdvanceWatermark(true, true)
761+
}
762+
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
763+
}
764+
765+
func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline) error {
766+
err := pipe.Execute()
752767
if err != nil {
753768
return err
754769
}
755770
err = pipe.Close()
756-
if opCtx.OperatorErr() != nil {
757-
return opCtx.OperatorErr()
771+
if opErr := ctx.OperatorErr(); opErr != nil {
772+
return opErr
758773
}
759774
return err
760775
}

pkg/ddl/backfilling_operators.go

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -340,13 +340,13 @@ func (src *TableScanTaskSource) Open() error {
340340

341341
// adjustStartKey adjusts the start key so that we can skip the ranges that have been processed
342342
// according to the information of checkpoint manager.
343-
func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) kv.Key {
343+
func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.Key, done bool) {
344344
if src.cpMgr == nil {
345-
return start
345+
return start, false
346346
}
347347
cpKey := src.cpMgr.LastProcessedKey()
348348
if len(cpKey) == 0 {
349-
return start
349+
return start, false
350350
}
351351
if cpKey.Cmp(start) < 0 || cpKey.Cmp(end) > 0 {
352352
logutil.Logger(src.ctx).Error("invalid checkpoint key",
@@ -357,16 +357,23 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) kv.Key {
357357
if intest.InTest {
358358
panic("invalid checkpoint key")
359359
}
360-
return start
360+
return start, false
361361
}
362-
return cpKey.Next()
362+
if cpKey.Cmp(end) == 0 {
363+
return cpKey, true
364+
}
365+
return cpKey.Next(), false
363366
}
364367

365368
func (src *TableScanTaskSource) generateTasks() error {
366369
taskIDAlloc := newTaskIDAllocator()
367370
defer src.sink.Finish()
368371

369-
startKey := src.adjustStartKey(src.startKey, src.endKey)
372+
startKey, done := src.adjustStartKey(src.startKey, src.endKey)
373+
if done {
374+
// All table data are done.
375+
return nil
376+
}
370377
for {
371378
kvRanges, err := loadTableRanges(
372379
src.ctx,
@@ -931,27 +938,23 @@ func (s *indexWriteResultSink) flush() error {
931938
failpoint.Return(errors.New("mock flush error"))
932939
})
933940
flushed, imported, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport)
934-
if err != nil {
935-
logutil.Logger(s.ctx).Error("flush error",
936-
zap.String("category", "ddl"), zap.Error(err))
937-
return err
938-
}
939941
if s.cpMgr != nil {
942+
// Try to advance watermark even if there is an error.
940943
s.cpMgr.AdvanceWatermark(flushed, imported)
941944
}
945+
if err != nil {
946+
msg := "flush error"
947+
if flushed {
948+
msg = "import error"
949+
}
950+
logutil.Logger(s.ctx).Error(msg, zap.String("category", "ddl"), zap.Error(err))
951+
return err
952+
}
942953
return nil
943954
}
944955

945956
func (s *indexWriteResultSink) Close() error {
946-
err := s.errGroup.Wait()
947-
// for local pipeline
948-
if bc := s.backendCtx; bc != nil {
949-
err2 := bc.FinishAndUnregisterEngines()
950-
if err == nil {
951-
err = err2
952-
}
953-
}
954-
return err
957+
return s.errGroup.Wait()
955958
}
956959

957960
func (*indexWriteResultSink) String() string {

pkg/ddl/backfilling_read_index.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -109,25 +109,29 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
109109
defer opCtx.Cancel()
110110
r.curRowCount.Store(0)
111111

112-
var pipe *operator.AsyncPipeline
113112
if len(r.cloudStorageURI) > 0 {
114-
pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sm, subtask.Concurrency)
115-
} else {
116-
pipe, err = r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
113+
pipe, err := r.buildExternalStorePipeline(opCtx, subtask.ID, sm, subtask.Concurrency)
114+
if err != nil {
115+
return err
116+
}
117+
return executeAndClosePipeline(opCtx, pipe)
117118
}
119+
120+
pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
118121
if err != nil {
119122
return err
120123
}
121-
122-
err = pipe.Execute()
124+
err = executeAndClosePipeline(opCtx, pipe)
123125
if err != nil {
126+
// For dist task local based ingest, checkpoint is unsupported.
127+
// If there is an error we should keep local sort dir clean.
128+
err1 := r.bc.FinishAndUnregisterEngines(ingest.OptCleanData)
129+
if err1 != nil {
130+
logutil.DDLLogger().Warn("read index executor unregister engine failed", zap.Error(err1))
131+
}
124132
return err
125133
}
126-
err = pipe.Close()
127-
if opCtx.OperatorErr() != nil {
128-
return opCtx.OperatorErr()
129-
}
130-
return err
134+
return r.bc.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
131135
}
132136

133137
func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {

pkg/ddl/index.go

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -960,28 +960,6 @@ func runIngestReorgJobDist(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
960960

961961
func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
962962
tbl table.Table, allIndexInfos []*model.IndexInfo) (done bool, ver int64, err error) {
963-
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
964-
if ok && bc.Done() {
965-
return true, 0, nil
966-
}
967-
ctx := tidblogutil.WithCategory(w.ctx, "ddl-ingest")
968-
var discovery pd.ServiceDiscovery
969-
if d != nil {
970-
//nolint:forcetypeassert
971-
discovery = d.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
972-
}
973-
hasUnique := false
974-
for _, indexInfo := range allIndexInfos {
975-
if indexInfo.Unique {
976-
hasUnique = true
977-
break
978-
}
979-
}
980-
bc, err = ingest.LitBackCtxMgr.Register(ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName)
981-
if err != nil {
982-
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), allIndexInfos, err)
983-
return false, ver, errors.Trace(err)
984-
}
985963
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, allIndexInfos, false)
986964
if err != nil {
987965
if kv.ErrKeyExists.Equal(err) {
@@ -996,11 +974,8 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
996974
}
997975
return false, ver, errors.Trace(err)
998976
}
999-
if !done {
1000-
return false, ver, nil
1001-
}
1002-
bc.SetDone()
1003-
return true, ver, nil
977+
failpoint.InjectCall("afterRunIngestReorgJob", job, done)
978+
return done, ver, nil
1004979
}
1005980

1006981
func errorIsRetryable(err error, job *model.Job) bool {

pkg/ddl/ingest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ go_test(
7373
embed = [":ingest"],
7474
flaky = True,
7575
race = "on",
76-
shard_count = 18,
76+
shard_count = 20,
7777
deps = [
7878
"//pkg/config",
7979
"//pkg/ddl",

pkg/ddl/ingest/backend.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,9 @@ type BackendCtx interface {
5858
// are Register-ed before. It's safe to call it multiple times.
5959
//
6060
// FinishAndUnregisterEngines is only used in local disk based ingest.
61-
FinishAndUnregisterEngines() error
61+
FinishAndUnregisterEngines(opt UnregisterOpt) error
6262

6363
FlushController
64-
Done() bool
65-
SetDone()
6664

6765
AttachCheckpointManager(*CheckpointManager)
6866
GetCheckpointManager() *CheckpointManager
@@ -99,7 +97,6 @@ type litBackendCtx struct {
9997
ctx context.Context
10098
cfg *lightning.Config
10199
sysVars map[string]string
102-
done bool
103100

104101
flushing atomic.Bool
105102
timeOfLastFlush atomicutil.Time
@@ -300,7 +297,6 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
300297
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
301298
}
302299
ei.openedEngine = nil
303-
ei.closedEngine = nil
304300
return err
305301
}
306302
return nil
@@ -331,16 +327,6 @@ func (bc *litBackendCtx) checkFlush(mode FlushMode) (shouldFlush bool, shouldImp
331327
return shouldFlush, shouldImport
332328
}
333329

334-
// Done returns true if the lightning backfill is done.
335-
func (bc *litBackendCtx) Done() bool {
336-
return bc.done
337-
}
338-
339-
// SetDone sets the done flag.
340-
func (bc *litBackendCtx) SetDone() {
341-
bc.done = true
342-
}
343-
344330
// AttachCheckpointManager attaches a checkpoint manager to the backend context.
345331
func (bc *litBackendCtx) AttachCheckpointManager(mgr *CheckpointManager) {
346332
bc.checkpointMgr = mgr

pkg/ddl/ingest/backend_mgr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func (m *litBackendCtxMgr) Unregister(jobID int64) {
231231
if !exist {
232232
return
233233
}
234-
_ = bc.FinishAndUnregisterEngines()
234+
_ = bc.FinishAndUnregisterEngines(OptCloseEngines)
235235
bc.backend.Close()
236236
m.memRoot.Release(structSizeBackendCtx)
237237
m.memRoot.ReleaseWithTag(encodeBackendTag(jobID))

0 commit comments

Comments
 (0)