Skip to content

Commit ef4cba4

Browse files
tangentati-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#57813
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 1ad553d commit ef4cba4

File tree

7 files changed

+1284
-29
lines changed

7 files changed

+1284
-29
lines changed

pkg/ddl/backfilling.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
278278
// we will never cancel the job once there is panic in bf.BackfillData.
279279
// Because reorgRecordTask may run a long time,
280280
// we should check whether this ddl job is still runnable.
281-
err := d.isReorgRunnable(jobID, false)
281+
err := d.isReorgRunnable(d.ctx, false)
282282
if err != nil {
283283
result.err = err
284284
return result
@@ -588,11 +588,73 @@ func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.
588588
return decodeColMap, nil
589589
}
590590

591+
<<<<<<< HEAD
591592
func setSessCtxLocation(sctx sessionctx.Context, tzLocation *model.TimeZoneLocation) error {
592593
// It is set to SystemLocation to be compatible with nil LocationInfo.
593594
tz := *timeutil.SystemLocation()
594595
if sctx.GetSessionVars().TimeZone == nil {
595596
sctx.GetSessionVars().TimeZone = &tz
597+
=======
598+
const backfillTaskChanSize = 128
599+
600+
func (dc *ddlCtx) runAddIndexInLocalIngestMode(
601+
ctx context.Context,
602+
sessPool *sess.Pool,
603+
t table.PhysicalTable,
604+
reorgInfo *reorgInfo,
605+
) error {
606+
if err := dc.isReorgRunnable(ctx, false); err != nil {
607+
return errors.Trace(err)
608+
}
609+
job := reorgInfo.Job
610+
opCtx, cancel := NewLocalOperatorCtx(ctx, job.ID)
611+
defer cancel()
612+
613+
idxCnt := len(reorgInfo.elements)
614+
indexIDs := make([]int64, 0, idxCnt)
615+
indexInfos := make([]*model.IndexInfo, 0, idxCnt)
616+
uniques := make([]bool, 0, idxCnt)
617+
hasUnique := false
618+
for _, e := range reorgInfo.elements {
619+
indexIDs = append(indexIDs, e.ID)
620+
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, e.ID)
621+
if indexInfo == nil {
622+
logutil.DDLIngestLogger().Warn("index info not found",
623+
zap.Int64("jobID", job.ID),
624+
zap.Int64("tableID", t.Meta().ID),
625+
zap.Int64("indexID", e.ID))
626+
return errors.Errorf("index info not found: %d", e.ID)
627+
}
628+
indexInfos = append(indexInfos, indexInfo)
629+
uniques = append(uniques, indexInfo.Unique)
630+
hasUnique = hasUnique || indexInfo.Unique
631+
}
632+
633+
//nolint: forcetypeassert
634+
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
635+
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
636+
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault()
637+
bcCtx, err := ingest.LitBackCtxMgr.Register(
638+
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, maxWriteSpeed, job.RealStartTS)
639+
if err != nil {
640+
return errors.Trace(err)
641+
}
642+
defer ingest.LitBackCtxMgr.Unregister(job.ID)
643+
644+
cpMgr, err := ingest.NewCheckpointManager(
645+
ctx,
646+
sessPool,
647+
reorgInfo.PhysicalTableID,
648+
job.ID,
649+
indexIDs,
650+
ingest.LitBackCtxMgr.EncodeJobSortPath(job.ID),
651+
dc.store.(kv.StorageWithPD).GetPDClient(),
652+
)
653+
if err != nil {
654+
logutil.DDLIngestLogger().Warn("create checkpoint manager failed",
655+
zap.Int64("jobID", job.ID),
656+
zap.Error(err))
657+
>>>>>>> 575310677da (ddl: check context done in isReorgRunnable function (#57813))
596658
} else {
597659
*sctx.GetSessionVars().TimeZone = tz
598660
}
@@ -636,7 +698,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
636698
) error {
637699
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
638700

639-
if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
701+
if err := dc.isReorgRunnable(ctx, false); err != nil {
640702
return errors.Trace(err)
641703
}
642704

pkg/ddl/ddl_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type DDLForTest interface {
4545
RemoveReorgCtx(id int64)
4646
}
4747

48+
<<<<<<< HEAD
4849
// SetInterceptor implements DDL.SetInterceptor interface.
4950
func (d *ddl) SetInterceptor(i Interceptor) {
5051
d.mu.Lock()
@@ -58,6 +59,8 @@ func (rc *reorgCtx) IsReorgCanceled() bool {
5859
return rc.isReorgCanceled()
5960
}
6061

62+
=======
63+
>>>>>>> 575310677da (ddl: check context done in isReorgRunnable function (#57813))
6164
// NewReorgCtx exports for testing.
6265
func (d *ddl) NewReorgCtx(jobID int64, rowCount int64) *reorgCtx {
6366
return d.newReorgCtx(jobID, rowCount)

pkg/ddl/ddl_worker_test.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -304,27 +304,3 @@ func TestJobNeedGC(t *testing.T) {
304304
}}}
305305
require.True(t, ddl.JobNeedGCForTest(job))
306306
}
307-
308-
func TestUsingReorgCtx(t *testing.T) {
309-
_, domain := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
310-
d := domain.DDL()
311-
312-
wg := util.WaitGroupWrapper{}
313-
wg.Run(func() {
314-
jobID := int64(1)
315-
for i := 0; i < 500; i++ {
316-
d.(ddl.DDLForTest).NewReorgCtx(jobID, 0)
317-
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
318-
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
319-
}
320-
})
321-
wg.Run(func() {
322-
jobID := int64(1)
323-
for i := 0; i < 500; i++ {
324-
d.(ddl.DDLForTest).NewReorgCtx(jobID, 0)
325-
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
326-
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
327-
}
328-
})
329-
wg.Wait()
330-
}

pkg/ddl/index.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2009,7 +2009,11 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
20092009
// TODO: Support typeAddIndexMergeTmpWorker.
20102010
if reorgInfo.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
20112011
if reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
2012+
<<<<<<< HEAD
20122013
err := w.executeDistGlobalTask(reorgInfo)
2014+
=======
2015+
err := w.executeDistTask(ctx, t, reorgInfo)
2016+
>>>>>>> 575310677da (ddl: check context done in isReorgRunnable function (#57813))
20132017
if err != nil {
20142018
return err
20152019
}
@@ -2082,6 +2086,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
20822086
return nil
20832087
}
20842088

2089+
<<<<<<< HEAD
20852090
// MockDMLExecutionOnTaskFinished is used to mock DML execution when tasks finished.
20862091
var MockDMLExecutionOnTaskFinished func()
20872092

@@ -2092,6 +2097,9 @@ var MockDMLExecutionOnDDLPaused func()
20922097
var TestSyncChan = make(chan struct{})
20932098

20942099
func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
2100+
=======
2101+
func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgInfo *reorgInfo) error {
2102+
>>>>>>> 575310677da (ddl: check context done in isReorgRunnable function (#57813))
20952103
if reorgInfo.mergingTmpIdx {
20962104
return errors.New("do not support merge index")
20972105
}
@@ -2142,8 +2150,13 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
21422150
if err != nil {
21432151
return err
21442152
}
2153+
<<<<<<< HEAD
21452154
err = handle.WaitGlobalTask(ctx, task.ID)
21462155
if err := w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
2156+
=======
2157+
err = handle.WaitTaskDoneOrPaused(ctx, task.ID)
2158+
if err := w.isReorgRunnable(stepCtx, true); err != nil {
2159+
>>>>>>> 575310677da (ddl: check context done in isReorgRunnable function (#57813))
21472160
if dbterror.ErrPausedDDLJob.Equal(err) {
21482161
logutil.BgLogger().Warn("job paused by user", zap.String("category", "ddl"), zap.Error(err))
21492162
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(reorgInfo.Job.ID)
@@ -2167,11 +2180,17 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
21672180

21682181
g.Go(func() error {
21692182
defer close(done)
2183+
<<<<<<< HEAD
21702184
err := handle.SubmitAndRunGlobalTask(ctx, taskKey, taskType, distPhysicalTableConcurrency, metaData)
21712185
failpoint.Inject("pauseAfterDistTaskFinished", func() {
21722186
MockDMLExecutionOnTaskFinished()
21732187
})
21742188
if err := w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
2189+
=======
2190+
err := submitAndWaitTask(ctx, taskKey, taskType, concurrency, reorgInfo.ReorgMeta.TargetScope, metaData)
2191+
failpoint.InjectCall("pauseAfterDistTaskFinished")
2192+
if err := w.isReorgRunnable(stepCtx, true); err != nil {
2193+
>>>>>>> 575310677da (ddl: check context done in isReorgRunnable function (#57813))
21752194
if dbterror.ErrPausedDDLJob.Equal(err) {
21762195
logutil.BgLogger().Warn("job paused by user", zap.String("category", "ddl"), zap.Error(err))
21772196
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(reorgInfo.Job.ID)
@@ -2192,7 +2211,7 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
21922211
w.updateJobRowCount(taskKey, reorgInfo.Job.ID)
21932212
return nil
21942213
case <-checkFinishTk.C:
2195-
if err = w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
2214+
if err = w.isReorgRunnable(stepCtx, true); err != nil {
21962215
if dbterror.ErrPausedDDLJob.Equal(err) {
21972216
if err = handle.PauseTask(w.ctx, taskKey); err != nil {
21982217
logutil.BgLogger().Error("pause global task error", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))

0 commit comments

Comments
 (0)