
Commit baed58f

tangenta authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#60828
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 71d59db commit baed58f

File tree

9 files changed: +148, -25 lines changed


pkg/ddl/backfilling.go

Lines changed: 2 additions & 1 deletion
@@ -394,6 +394,8 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
             break
         }
     }
+    failpoint.InjectCall("afterHandleBackfillTask", task.jobID)
+
     logutil.DDLLogger().Info("backfill worker finish task",
         zap.Stringer("worker", w), zap.Stringer("task", task),
         zap.Int("added count", result.addedCount),
@@ -1034,7 +1036,6 @@ func (dc *ddlCtx) writePhysicalTableRecord(
         } else {
             totalAddedCount += int64(result.addedCount)
         }
-        dc.getReorgCtx(reorgInfo.Job.ID).setRowCount(totalAddedCount)
 
         keeper.updateNextKey(result.taskID, result.nextKey)
 
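The hunks above add a test hook right after each backfill task finishes and drop the per-batch setRowCount call in writePhysicalTableRecord; per the PR title, the job's row count is instead refreshed periodically while the reorg job runs (see the pkg/ddl/reorg.go hunks below). Conceptually, such a hook is just an optional callback that is a no-op unless a test installs one. A minimal stand-alone sketch of that idea (not the pingcap/failpoint implementation; afterHandleBackfillTask here is a hypothetical package variable):

```go
package main

import "fmt"

// afterHandleBackfillTask is a hypothetical, nil-able test hook: production
// code fires it if set; a test installs a callback to observe or pause the worker.
var afterHandleBackfillTask func(jobID int64)

func handleBackfillTask(jobID int64) {
	// ... the real backfill work would happen here ...
	if afterHandleBackfillTask != nil {
		afterHandleBackfillTask(jobID) // a test can block here until it has checked progress
	}
	fmt.Println("backfill task finished for job", jobID)
}

func main() {
	// A test registers the hook before running the DDL statement.
	afterHandleBackfillTask = func(jobID int64) {
		fmt.Println("test hook fired for job", jobID)
	}
	handleBackfillTask(42)
}
```
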
pkg/ddl/index.go

Lines changed: 1 addition & 1 deletion
@@ -1518,7 +1518,7 @@ func runReorgJobAndHandleErr(
     if err != nil {
         return false, ver, errors.Trace(err)
     }
-    err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (addIndexErr error) {
+    err = w.runReorgJob(jobCtx, reorgInfo, tbl.Meta(), func() (addIndexErr error) {
         defer util.Recover(metrics.LabelDDL, "onCreateIndex",
             func() {
                 addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, allIndexInfos[0].Name)

pkg/ddl/index_change_test.go

Lines changed: 42 additions & 0 deletions
@@ -16,6 +16,7 @@ package ddl_test
 
 import (
     "context"
+    "strconv"
     "sync/atomic"
     "testing"
     "time"
@@ -390,3 +391,44 @@ func checkDropDeleteOnly(ctx sessionctx.Context, writeTbl, delTbl table.Table) e
     }
     return txn.Commit(context.Background())
 }
+
+func TestAddIndexRowCountUpdate(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("create table t (c1 int primary key, c2 int)")
+    tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);")
+    tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
+    tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
+    tk.MustExec("set global tidb_enable_dist_task = 0;")
+
+    var jobID int64
+    rowCntUpdated := make(chan struct{})
+    backfillDone := make(chan struct{})
+    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/updateProgressIntervalInMs", "return(50)")
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterHandleBackfillTask", func(id int64) {
+        jobID = id
+        backfillDone <- struct{}{}
+        <-rowCntUpdated
+    })
+    go func() {
+        defer func() {
+            rowCntUpdated <- struct{}{}
+        }()
+        <-backfillDone
+        tk2 := testkit.NewTestKit(t, store)
+        tk2.MustExec("use test")
+        require.Eventually(t, func() bool {
+            rs := tk2.MustQuery("admin show ddl jobs 1;").Rows()
+            idStr := rs[0][0].(string)
+            id, err := strconv.Atoi(idStr)
+            require.NoError(t, err)
+            require.Equal(t, int64(id), jobID)
+            rcStr := rs[0][7].(string)
+            rc, err := strconv.Atoi(rcStr)
+            require.NoError(t, err)
+            return rc > 0
+        }, 2*time.Minute, 60*time.Millisecond)
+    }()
+    tk.MustExec("alter table t add index idx(c2);")
+}
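
TestAddIndexRowCountUpdate pauses the backfill worker via the afterHandleBackfillTask hook until a second goroutine has observed a non-zero row count in `admin show ddl jobs`. Stripped of the testkit details, the synchronization is a two-channel handshake; the sketch below uses placeholder goroutines to show the shape of it (a rough illustration, not the test itself):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	backfillDone := make(chan struct{})
	rowCntUpdated := make(chan struct{})

	// Stand-in for the DDL worker: after finishing a task it signals the
	// checker and waits until the checker has verified the progress.
	go func() {
		fmt.Println("worker: task done, waiting for checker")
		backfillDone <- struct{}{}
		<-rowCntUpdated
		fmt.Println("worker: resuming")
	}()

	// Stand-in for the checking goroutine: wait for the worker, pretend to
	// poll "admin show ddl jobs" until progress is visible, then release it.
	<-backfillDone
	for i := 0; i < 3; i++ {
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("checker: row count observed")
	rowCntUpdated <- struct{}{}
	time.Sleep(50 * time.Millisecond) // let the worker print before exiting
}
```
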

pkg/ddl/job_worker.go

Lines changed: 59 additions & 15 deletions
@@ -96,8 +96,13 @@ type jobContext struct {
     logger *zap.Logger
 
     // per job step fields, they will be changed on each call of transitOneJobStep.
+    // stepCtx is initilaized and destroyed for each job step except reorg job,
+    // which returns timeout error periodically.
+    stepCtx              context.Context
+    stepCtxCancel        context.CancelCauseFunc
+    reorgTimeoutOccurred bool
+    inInnerRunOneJobStep bool // Only used for multi-schema change DDL job.
 
-    stepCtx context.Context
     metaMut *meta.Mutator
     // decoded JobArgs, we store it here to avoid decoding it multiple times and
     // pass some runtime info specific to some job type.
@@ -107,6 +112,32 @@ type jobContext struct {
     oldDDLCtx *ddlCtx
 }
 
+func (c *jobContext) shouldPollDDLJob() bool {
+    // If we are in multi-schema change DDL and this is not the outermost
+    // runOneJobStep, we should not start a goroutine to poll the ddl job.
+    return !c.inInnerRunOneJobStep
+}
+
+func (c *jobContext) initStepCtx() {
+    if c.stepCtx == nil {
+        stepCtx, cancel := context.WithCancelCause(c.ctx)
+        c.stepCtx = stepCtx
+        c.stepCtxCancel = cancel
+    }
+}
+
+func (c *jobContext) cleanStepCtx() {
+    // reorgTimeoutOccurred indicates whether the current reorg process
+    // was temporarily exit due to a timeout condition. When set to true,
+    // it prevents premature cleanup of step context.
+    if c.reorgTimeoutOccurred {
+        c.reorgTimeoutOccurred = false // reset flag
+        return
+    }
+    c.stepCtxCancel(context.Canceled)
+    c.stepCtx = nil // unset stepCtx for the next step initialization
+}
+
 func (c *jobContext) getAutoIDRequirement() autoid.Requirement {
     return &asAutoIDRequirement{
         store: c.store,
@@ -581,7 +612,11 @@ func (w *worker) transitOneJobStep(
 
     // If running job meets error, we will save this error in job Error and retry
     // later if the job is not cancelled.
+<<<<<<< HEAD
     schemaVer, updateRawArgs, runJobErr := w.runOneJobStep(jobCtx, job, sysTblMgr)
+=======
+    schemaVer, updateRawArgs, runJobErr := w.runOneJobStep(jobCtx, job)
+>>>>>>> e8fb24a20d4 (ddl: update row count periodically when running reorg job (#60828))
 
     failpoint.InjectCall("onJobRunAfter", job)
 
@@ -784,7 +819,6 @@ func (*worker) processJobPausingRequest(jobCtx *jobContext, job *model.Job) (isR
 func (w *worker) runOneJobStep(
     jobCtx *jobContext,
     job *model.Job,
-    sysTblMgr systable.Manager,
 ) (ver int64, updateRawArgs bool, err error) {
     defer tidbutil.Recover(metrics.LabelDDLWorker, fmt.Sprintf("%s runOneJobStep", w),
         func() {
@@ -794,8 +828,14 @@ func (w *worker) runOneJobStep(
     // Mock for run ddl job panic.
     failpoint.Inject("mockPanicInRunDDLJob", func(failpoint.Value) {})
 
+    failpoint.InjectCall("onRunOneJobStep")
     if job.Type != model.ActionMultiSchemaChange {
+<<<<<<< HEAD
         jobCtx.logger.Info("run DDL job", zap.String("job", job.String()))
+=======
+        jobCtx.logger.Info("run one job step", zap.String("job", job.String()))
+        failpoint.InjectCall("onRunOneJobStep")
+>>>>>>> e8fb24a20d4 (ddl: update row count periodically when running reorg job (#60828))
     }
     timeStart := time.Now()
     if job.RealStartTS == 0 {
@@ -819,27 +859,20 @@
         return ver, false, err
     }
 
-    // when sysTblMgr is nil, clean up the job step context just for clearness.
-    // Otherwise, we are in multi-schema change DDL and this is not the outermost
-    // runOneJobStep, we should keep the job step context.
-    if sysTblMgr != nil {
-        jobCtx.stepCtx = nil
-    }
-
     // It would be better to do the positive check, but no idea to list all valid states here now.
     if job.IsRollingback() {
         // when rolling back, we use worker context to process.
         jobCtx.stepCtx = w.workCtx
     } else {
         job.State = model.JobStateRunning
 
-        if sysTblMgr != nil {
+        if jobCtx.shouldPollDDLJob() {
+            failpoint.InjectCall("beforePollDDLJob")
             stopCheckingJobCancelled := make(chan struct{})
             defer close(stopCheckingJobCancelled)
 
-            var cancelStep context.CancelCauseFunc
-            jobCtx.stepCtx, cancelStep = context.WithCancelCause(jobCtx.ctx)
-            defer cancelStep(context.Canceled)
+            jobCtx.initStepCtx()
+            defer jobCtx.cleanStepCtx()
             w.wg.Run(func() {
                 ticker := time.NewTicker(2 * time.Second)
                 defer ticker.Stop()
@@ -849,8 +882,14 @@
                     case <-stopCheckingJobCancelled:
                         return
                     case <-ticker.C:
+<<<<<<< HEAD
                         latestJob, err := sysTblMgr.GetJobByID(w.workCtx, job.ID)
                         if err == systable.ErrNotFound {
+=======
+                        failpoint.InjectCall("checkJobCancelled", job)
+                        latestJob, err := jobCtx.sysTblMgr.GetJobByID(w.workCtx, job.ID)
+                        if goerrors.Is(err, systable.ErrNotFound) {
+>>>>>>> e8fb24a20d4 (ddl: update row count periodically when running reorg job (#60828))
                             logutil.DDLLogger().Info(
                                 "job not found, might already finished",
                                 zap.Int64("job_id", job.ID))
@@ -867,13 +906,13 @@
                             logutil.DDLLogger().Info("job is cancelled",
                                 zap.Int64("job_id", job.ID),
                                 zap.Stringer("state", latestJob.State))
-                            cancelStep(dbterror.ErrCancelledDDLJob)
+                            jobCtx.stepCtxCancel(dbterror.ErrCancelledDDLJob)
                             return
                         case model.JobStatePausing, model.JobStatePaused:
                             logutil.DDLLogger().Info("job is paused",
                                 zap.Int64("job_id", job.ID),
                                 zap.Stringer("state", latestJob.State))
-                            cancelStep(dbterror.ErrPausedDDLJob.FastGenByArgs(job.ID))
+                            jobCtx.stepCtxCancel(dbterror.ErrPausedDDLJob.FastGenByArgs(job.ID))
                             return
                         case model.JobStateDone, model.JobStateSynced:
                             return
@@ -1089,6 +1128,11 @@ func updateGlobalVersionAndWaitSynced(
     var err error
 
     if latestSchemaVersion == 0 {
+        // If the DDL step is still in progress (e.g., during reorg timeout),
+        // skip logging to avoid generating redundant entries.
+        if jobCtx.stepCtx != nil {
+            return nil
+        }
         logutil.DDLLogger().Info("schema version doesn't change", zap.Int64("jobID", job.ID))
         return nil
     }
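
The heart of this file's change is that the per-step context now lives on jobContext and survives a reorg timeout: initStepCtx only creates the cancellable context when none exists, and cleanStepCtx skips teardown while reorgTimeoutOccurred is set, so the cancel/pause polling keeps targeting the same context when the reorg step re-enters after ErrWaitReorgTimeout. A condensed, self-contained sketch of that lifecycle, using simplified stand-in types rather than the real jobContext:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errWaitReorgTimeout = errors.New("wait reorg timeout")

// stepContext is a made-up stand-in for the step-scoped fields on jobContext.
type stepContext struct {
	ctx             context.Context
	cancel          context.CancelCauseFunc
	timeoutOccurred bool
}

// initStepCtx lazily creates the cancellable context, so a step that re-enters
// after a timeout keeps using the context created on its first attempt.
func (s *stepContext) initStepCtx(parent context.Context) {
	if s.ctx == nil {
		s.ctx, s.cancel = context.WithCancelCause(parent)
	}
}

// cleanStepCtx tears the context down only when the step really finished;
// a timeout keeps it alive for the next retry.
func (s *stepContext) cleanStepCtx() {
	if s.timeoutOccurred {
		s.timeoutOccurred = false
		return
	}
	s.cancel(context.Canceled)
	s.ctx = nil
}

func runStep(s *stepContext, attempt int) error {
	s.initStepCtx(context.Background())
	defer s.cleanStepCtx()
	if attempt == 0 {
		s.timeoutOccurred = true // the reorg did not finish within this step
		return errWaitReorgTimeout
	}
	return nil
}

func main() {
	s := &stepContext{}
	fmt.Println(runStep(s, 0)) // times out, context survives for the retry
	fmt.Println(runStep(s, 1)) // finishes, context cancelled and cleared
}
```
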

pkg/ddl/modify_column.go

Lines changed: 1 addition & 1 deletion
@@ -635,7 +635,7 @@ func doReorgWorkForModifyColumn(w *worker, jobCtx *jobContext, job *model.Job, t
     // enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/pkg/ddl/mockDelayInModifyColumnTypeWithData".
     // disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/pkg/ddl/mockDelayInModifyColumnTypeWithData"
     failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {})
-    err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (addIndexErr error) {
+    err = w.runReorgJob(jobCtx, reorgInfo, tbl.Meta(), func() (addIndexErr error) {
         defer util.Recover(metrics.LabelDDL, "onModifyColumn",
             func() {
                 addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tbl.Meta().Name, oldCol.Name)

pkg/ddl/multi_schema_change.go

Lines changed: 8 additions & 4 deletions
@@ -27,6 +27,10 @@ import (
 )
 
 func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int64, err error) {
+    jobCtx.inInnerRunOneJobStep = true
+    defer func() {
+        jobCtx.inInnerRunOneJobStep = false
+    }()
     metaMut := jobCtx.metaMut
     if job.MultiSchemaInfo.Revertible {
         // Handle the rolling back job.
@@ -38,7 +42,7 @@ func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int
                 continue
             }
             proxyJob := sub.ToProxyJob(job, i)
-            ver, _, err = w.runOneJobStep(jobCtx, &proxyJob, nil)
+            ver, _, err = w.runOneJobStep(jobCtx, &proxyJob)
             err = handleRollbackException(err, proxyJob.Error)
             if err != nil {
                 return ver, err
@@ -61,7 +65,7 @@ func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int
                 continue
             }
             proxyJob := sub.ToProxyJob(job, i)
-            ver, _, err = w.runOneJobStep(jobCtx, &proxyJob, nil)
+            ver, _, err = w.runOneJobStep(jobCtx, &proxyJob)
             sub.FromProxyJob(&proxyJob, ver)
             handleRevertibleException(job, sub, proxyJob.Error)
             return ver, err
@@ -87,7 +91,7 @@ func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int
         if schemaVersionGenerated {
             proxyJob.MultiSchemaInfo.SkipVersion = true
         }
-        proxyJobVer, _, err := w.runOneJobStep(jobCtx, &proxyJob, nil)
+        proxyJobVer, _, err := w.runOneJobStep(jobCtx, &proxyJob)
         if !schemaVersionGenerated && proxyJobVer != 0 {
             schemaVersionGenerated = true
             ver = proxyJobVer
@@ -136,7 +140,7 @@ func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int
             continue
         }
         proxyJob := sub.ToProxyJob(job, i)
-        ver, _, err = w.runOneJobStep(jobCtx, &proxyJob, nil)
+        ver, _, err = w.runOneJobStep(jobCtx, &proxyJob)
         sub.FromProxyJob(&proxyJob, ver)
         return ver, err
     }
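
onMultiSchemaChange now flags jobCtx.inInnerRunOneJobStep around its nested runOneJobStep calls, so only the outermost step (checked via shouldPollDDLJob) starts the goroutine that polls for a cancelled or paused job. A toy sketch of that re-entrancy guard, with simplified names and types:

```go
package main

import "fmt"

// jobContext here is a simplified stand-in, not the DDL worker's type.
type jobContext struct {
	inInnerRunOneJobStep bool
}

func (c *jobContext) shouldPollDDLJob() bool { return !c.inInnerRunOneJobStep }

func runOneJobStep(c *jobContext, name string, subSteps []string) {
	if c.shouldPollDDLJob() {
		fmt.Printf("%s: outermost step, start the cancellation poller\n", name)
	} else {
		fmt.Printf("%s: nested step, reuse the outer poller\n", name)
	}
	if len(subSteps) == 0 {
		return
	}
	// A multi-schema change fans out into sub-jobs; mark the context so the
	// nested calls do not start a second poller, and restore it on exit.
	c.inInnerRunOneJobStep = true
	defer func() { c.inInnerRunOneJobStep = false }()
	for _, sub := range subSteps {
		runOneJobStep(c, sub, nil)
	}
}

func main() {
	runOneJobStep(&jobContext{}, "multi-schema change",
		[]string{"add column b", "modify column a", "add column c"})
}
```
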

pkg/ddl/multi_schema_change_test.go

Lines changed: 22 additions & 0 deletions
@@ -797,6 +797,28 @@ func TestMultiSchemaChangeBlockedByRowLevelChecksum(t *testing.T) {
     tk.MustGetErrCode("alter table t add (c1 int, c2 int)", errno.ErrUnsupportedDDLOperation)
 }
 
+func TestMultiSchemaChangePollJobCount(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("create table t (a int)")
+    tk.MustExec("insert into t values (1);")
+    tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
+    tk.MustExec("set global tidb_enable_dist_task = 0;")
+    runOneJobCounter := 0
+    pollJobCounter := 0
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onRunOneJobStep", func() {
+        runOneJobCounter++
+    })
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforePollDDLJob", func() {
+        pollJobCounter++
+    })
+    // Should not test reorg DDL because the result can be unstable.
+    tk.MustExec("alter table t add column b int, modify column a bigint, add column c char(10);")
+    require.Equal(t, 29, runOneJobCounter)
+    require.Equal(t, 9, pollJobCounter)
+}
+
 type cancelOnceHook struct {
     store     kv.Storage
     triggered bool

pkg/ddl/partition.go

Lines changed: 2 additions & 2 deletions
@@ -2441,7 +2441,7 @@ func (w *worker) cleanGlobalIndexEntriesFromDroppedPartitions(jobCtx *jobContext
         // and then run the reorg next time.
         return false, errors.Trace(err)
     }
-    err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (dropIndexErr error) {
+    err = w.runReorgJob(jobCtx, reorgInfo, tbl.Meta(), func() (dropIndexErr error) {
         defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition",
             func() {
                 dropIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("drop partition panic")
@@ -3733,7 +3733,7 @@ func doPartitionReorgWork(w *worker, jobCtx *jobContext, job *model.Job, tbl tab
         return false, ver, errors.Trace(err)
     }
     reorgInfo, err := getReorgInfoFromPartitions(jobCtx.oldDDLCtx.jobContext(job.ID, job.ReorgMeta), jobCtx, rh, job, dbInfo, partTbl, physTblIDs, elements)
-    err = w.runReorgJob(reorgInfo, reorgTbl.Meta(), func() (reorgErr error) {
+    err = w.runReorgJob(jobCtx, reorgInfo, reorgTbl.Meta(), func() (reorgErr error) {
         defer tidbutil.Recover(metrics.LabelDDL, "doPartitionReorgWork",
             func() {
                 reorgErr = dbterror.ErrCancelledDDLJob.GenWithStack("reorganize partition for table `%v` panic", tbl.Meta().Name)

pkg/ddl/reorg.go

Lines changed: 11 additions & 1 deletion
@@ -339,6 +339,7 @@ func (rc *reorgCtx) getRowCount() int64 {
 //
 // After that, we can make sure that the worker goroutine is correctly shut down.
 func (w *worker) runReorgJob(
+    jobCtx *jobContext,
     reorgInfo *reorgInfo,
     tblInfo *model.TableInfo,
     reorgFn func() error,
@@ -380,7 +381,13 @@
         })
     }
 
-    updateProcessTicker := time.NewTicker(5 * time.Second)
+    updateProgressInverval := 5 * time.Second
+    failpoint.Inject("updateProgressIntervalInMs", func(val failpoint.Value) {
+        if v, ok := val.(int); ok {
+            updateProgressInverval = time.Duration(v) * time.Millisecond
+        }
+    })
+    updateProcessTicker := time.NewTicker(updateProgressInverval)
     defer updateProcessTicker.Stop()
     for {
         select {
@@ -392,6 +399,7 @@
                 logutil.DDLLogger().Warn("owner ts mismatch, return timeout error and retry",
                     zap.Int64("prevTS", res.ownerTS),
                     zap.Int64("curTS", curTS))
+                jobCtx.reorgTimeoutOccurred = true
                 return dbterror.ErrWaitReorgTimeout
             }
             // Since job is cancelled,we don't care about its partial counts.
@@ -427,6 +435,8 @@
             w.mergeWarningsIntoJob(job)
 
             rc.resetWarnings()
+            jobCtx.reorgTimeoutOccurred = true
+            return dbterror.ErrWaitReorgTimeout
         }
     }
 }
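
runReorgJob now drives progress updates from a ticker whose interval defaults to 5 seconds and can be shortened through the updateProgressIntervalInMs failpoint (which TestAddIndexRowCountUpdate sets to 50ms), and it marks jobCtx.reorgTimeoutOccurred before returning ErrWaitReorgTimeout. The sketch below shows the general shape of such a periodic progress loop in plain Go; reportProgress, doneCh, and the worker goroutine are placeholders, not TiDB APIs:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func reportProgress(rows int64) { fmt.Println("row count:", rows) }

func main() {
	updateProgressInterval := 5 * time.Second
	// In a test this interval could be overridden (the real code uses a failpoint).
	updateProgressInterval = 50 * time.Millisecond

	var rowCount atomic.Int64
	doneCh := make(chan struct{})

	// Stand-in for the reorg workers adding rows in the background.
	go func() {
		for i := 0; i < 5; i++ {
			rowCount.Add(1000)
			time.Sleep(30 * time.Millisecond)
		}
		close(doneCh)
	}()

	ticker := time.NewTicker(updateProgressInterval)
	defer ticker.Stop()
	for {
		select {
		case <-doneCh:
			reportProgress(rowCount.Load())
			fmt.Println("reorg finished")
			return
		case <-ticker.C:
			// Periodic update: this is where the job's row count would be
			// refreshed so that progress is visible before the step ends.
			reportProgress(rowCount.Load())
		}
	}
}
```
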
