3 changes: 2 additions & 1 deletion pkg/ddl/backfilling.go
@@ -395,6 +395,8 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
break
}
}
failpoint.InjectCall("afterHandleBackfillTask", task.jobID)

logutil.DDLLogger().Info("backfill worker finish task",
zap.Stringer("worker", w), zap.Stringer("task", task),
zap.Int("added count", result.addedCount),
@@ -1022,7 +1024,6 @@ func (dc *ddlCtx) writePhysicalTableRecord(
} else {
totalAddedCount += int64(result.addedCount)
}
dc.getReorgCtx(reorgInfo.Job.ID).setRowCount(totalAddedCount)

keeper.updateNextKey(result.taskID, result.nextKey)

2 changes: 1 addition & 1 deletion pkg/ddl/index.go
@@ -1568,7 +1568,7 @@ func runReorgJobAndHandleErr(
if err != nil {
return false, ver, errors.Trace(err)
}
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (addIndexErr error) {
err = w.runReorgJob(jobCtx, reorgInfo, tbl.Meta(), func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onCreateIndex",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, allIndexInfos[0].Name)
42 changes: 42 additions & 0 deletions pkg/ddl/index_change_test.go
@@ -16,6 +16,7 @@ package ddl_test

import (
"context"
"strconv"
"sync/atomic"
"testing"
"time"
@@ -390,3 +391,44 @@ func checkDropDeleteOnly(ctx sessionctx.Context, writeTbl, delTbl table.Table) e
}
return txn.Commit(context.Background())
}

func TestAddIndexRowCountUpdate(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (c1 int primary key, c2 int)")
tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);")
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("set global tidb_enable_dist_task = 0;")

var jobID int64
rowCntUpdated := make(chan struct{})
backfillDone := make(chan struct{})
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/updateProgressIntervalInMs", "return(50)")
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterHandleBackfillTask", func(id int64) {
jobID = id
backfillDone <- struct{}{}
<-rowCntUpdated
})
go func() {
defer func() {
rowCntUpdated <- struct{}{}
}()
<-backfillDone
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
require.Eventually(t, func() bool {
rs := tk2.MustQuery("admin show ddl jobs 1;").Rows()
idStr := rs[0][0].(string)
id, err := strconv.Atoi(idStr)
require.NoError(t, err)
require.Equal(t, int64(id), jobID)
rcStr := rs[0][7].(string)
rc, err := strconv.Atoi(rcStr)
require.NoError(t, err)
return rc > 0
}, 2*time.Minute, 60*time.Millisecond)
}()
tk.MustExec("alter table t add index idx(c2);")
}
64 changes: 47 additions & 17 deletions pkg/ddl/job_worker.go
@@ -96,8 +96,13 @@ type jobContext struct {
logger *zap.Logger

// per job step fields, they will be changed on each call of transitOneJobStep.
// stepCtx is initialized and destroyed for each job step, except for reorg jobs,
// which return a timeout error periodically.
stepCtx context.Context
stepCtxCancel context.CancelCauseFunc
reorgTimeoutOccurred bool
inInnerRunOneJobStep bool // Only used for multi-schema change DDL job.

stepCtx context.Context
metaMut *meta.Mutator
// decoded JobArgs, we store it here to avoid decoding it multiple times and
// pass some runtime info specific to some job type.
@@ -108,6 +113,32 @@ type jobContext struct {
lockStartTime time.Time
}

func (c *jobContext) shouldPollDDLJob() bool {
// If we are in multi-schema change DDL and this is not the outermost
// runOneJobStep, we should not start a goroutine to poll the ddl job.
return !c.inInnerRunOneJobStep
}

func (c *jobContext) initStepCtx() {
if c.stepCtx == nil {
stepCtx, cancel := context.WithCancelCause(c.ctx)
c.stepCtx = stepCtx
c.stepCtxCancel = cancel
}
}

func (c *jobContext) cleanStepCtx() {
// reorgTimeoutOccurred indicates whether the current reorg process
// exited temporarily because of a timeout. When set to true, it
// prevents premature cleanup of the step context.
if c.reorgTimeoutOccurred {
c.reorgTimeoutOccurred = false // reset flag
return
}
c.stepCtxCancel(context.Canceled)
c.stepCtx = nil // unset stepCtx for the next step initialization
}

func (c *jobContext) getAutoIDRequirement() autoid.Requirement {
return &asAutoIDRequirement{
store: c.store,
@@ -584,7 +615,7 @@ func (w *worker) transitOneJobStep(
}()
// If running job meets error, we will save this error in job Error and retry
// later if the job is not cancelled.
schemaVer, updateRawArgs, runJobErr := w.runOneJobStep(jobCtx, job, jobCtx.sysTblMgr)
schemaVer, updateRawArgs, runJobErr := w.runOneJobStep(jobCtx, job)

failpoint.InjectCall("afterRunOneJobStep", job)

@@ -774,7 +805,6 @@ func (*worker) processJobPausingRequest(jobCtx *jobContext, job *model.Job) (isR
func (w *worker) runOneJobStep(
jobCtx *jobContext,
job *model.Job,
sysTblMgr systable.Manager,
) (ver int64, updateRawArgs bool, err error) {
defer tidbutil.Recover(metrics.LabelDDLWorker, fmt.Sprintf("%s runOneJobStep", w),
func() {
@@ -784,8 +814,10 @@ func (w *worker) runOneJobStep(
// Mock for run ddl job panic.
failpoint.Inject("mockPanicInRunDDLJob", func(failpoint.Value) {})

failpoint.InjectCall("onRunOneJobStep")
if job.Type != model.ActionMultiSchemaChange {
jobCtx.logger.Info("run one job step", zap.String("job", job.String()))
failpoint.InjectCall("onRunOneJobStep")
}
timeStart := time.Now()
if job.RealStartTS == 0 {
@@ -809,27 +841,20 @@ func (w *worker) runOneJobStep(
return ver, false, err
}

// when sysTblMgr is nil, clean up the job step context just for clearness.
// Otherwise, we are in multi-schema change DDL and this is not the outermost
// runOneJobStep, we should keep the job step context.
if sysTblMgr != nil {
jobCtx.stepCtx = nil
}

// It would be better to do the positive check, but no idea to list all valid states here now.
if job.IsRollingback() {
// when rolling back, we use worker context to process.
jobCtx.stepCtx = w.workCtx
} else {
job.State = model.JobStateRunning

if sysTblMgr != nil {
if jobCtx.shouldPollDDLJob() {
failpoint.InjectCall("beforePollDDLJob")
stopCheckingJobCancelled := make(chan struct{})
defer close(stopCheckingJobCancelled)

var cancelStep context.CancelCauseFunc
jobCtx.stepCtx, cancelStep = context.WithCancelCause(jobCtx.ctx)
defer cancelStep(context.Canceled)
jobCtx.initStepCtx()
defer jobCtx.cleanStepCtx()
w.wg.Run(func() {
Review comment (Contributor):
every time we enter runOneJobStep, we will refresh the job, so if we change back to the old behavior, there is no need to keep this routine

but if we do remove this routine, will cancel be blocked? as fixed in #56404 cc @lance6716

Review comment (Contributor):
> but if we do remove this routine, will cancel be blocked?

In this PR, I see that line 886 still calls stepCtxCancel, so cancel should still be seen and processed timely.
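For readers following this thread, here is a minimal, self-contained Go sketch of the pattern being discussed: a watcher goroutine polls the job state and cancels the step context with a cause, while the long-running step only waits on that context, so cancellation is not blocked. The names (pollAndCancel, the stand-in error) are illustrative only; the PR's actual loop is in the hunk below.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errCancelledDDLJob is an illustrative stand-in for dbterror.ErrCancelledDDLJob.
var errCancelledDDLJob = errors.New("cancelled DDL job")

// pollAndCancel is an illustrative stand-in for the watcher goroutine in
// runOneJobStep: it periodically checks the job state and cancels the step
// context with a cause once the job has been cancelled elsewhere.
func pollAndCancel(stepCtx context.Context, cancel context.CancelCauseFunc, cancelled *atomic.Bool) {
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stepCtx.Done():
			return
		case <-ticker.C:
			if cancelled.Load() {
				cancel(errCancelledDDLJob)
				return
			}
		}
	}
}

func main() {
	stepCtx, cancel := context.WithCancelCause(context.Background())
	defer cancel(context.Canceled)

	var cancelled atomic.Bool
	go pollAndCancel(stepCtx, cancel, &cancelled)

	// Simulate the job being cancelled from another session after a delay.
	go func() {
		time.Sleep(60 * time.Millisecond)
		cancelled.Store(true)
	}()

	// The long-running step only watches its step context, so it observes the
	// cancellation promptly instead of blocking.
	<-stepCtx.Done()
	fmt.Println("step stopped:", context.Cause(stepCtx))
}
```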

ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
@@ -840,7 +865,7 @@
return
case <-ticker.C:
failpoint.InjectCall("checkJobCancelled", job)
latestJob, err := sysTblMgr.GetJobByID(w.workCtx, job.ID)
latestJob, err := jobCtx.sysTblMgr.GetJobByID(w.workCtx, job.ID)
if goerrors.Is(err, systable.ErrNotFound) {
logutil.DDLLogger().Info(
"job not found, might already finished",
@@ -858,13 +883,13 @@
logutil.DDLLogger().Info("job is cancelled",
zap.Int64("job_id", job.ID),
zap.Stringer("state", latestJob.State))
cancelStep(dbterror.ErrCancelledDDLJob)
jobCtx.stepCtxCancel(dbterror.ErrCancelledDDLJob)
return
case model.JobStatePausing, model.JobStatePaused:
logutil.DDLLogger().Info("job is paused",
zap.Int64("job_id", job.ID),
zap.Stringer("state", latestJob.State))
cancelStep(dbterror.ErrPausedDDLJob.FastGenByArgs(job.ID))
jobCtx.stepCtxCancel(dbterror.ErrPausedDDLJob.FastGenByArgs(job.ID))
return
case model.JobStateDone, model.JobStateSynced:
return
@@ -1082,6 +1107,11 @@ func updateGlobalVersionAndWaitSynced(
var err error

if latestSchemaVersion == 0 {
// If the DDL step is still in progress (e.g., during reorg timeout),
// skip logging to avoid generating redundant entries.
if jobCtx.stepCtx != nil {
return nil
}
logutil.DDLLogger().Info("schema version doesn't change", zap.Int64("jobID", job.ID))
return nil
}
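Taken together, the job_worker.go hunks above turn the step context from a per-call local into a jobContext field that survives reorg timeouts. The following compact sketch of that lifecycle is paraphrased from the diff; the simplified types, the runStep helper, and the main function are illustrative, not the PR's exact code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errWaitReorgTimeout is a stand-in for dbterror.ErrWaitReorgTimeout.
var errWaitReorgTimeout = errors.New("wait reorg timeout")

type jobContext struct {
	ctx                  context.Context
	stepCtx              context.Context
	stepCtxCancel        context.CancelCauseFunc
	reorgTimeoutOccurred bool
}

func (c *jobContext) initStepCtx() {
	// Reuse the existing step context if the previous reorg round only timed out.
	if c.stepCtx == nil {
		c.stepCtx, c.stepCtxCancel = context.WithCancelCause(c.ctx)
	}
}

func (c *jobContext) cleanStepCtx() {
	// A reorg step that timed out will be re-entered, so keep its context alive.
	if c.reorgTimeoutOccurred {
		c.reorgTimeoutOccurred = false
		return
	}
	c.stepCtxCancel(context.Canceled)
	c.stepCtx = nil
}

// runStep mimics one call of runOneJobStep for a reorg job; stepErr stands in
// for the error returned by runReorgJob.
func runStep(c *jobContext, stepErr error) {
	c.initStepCtx()
	defer c.cleanStepCtx()
	if errors.Is(stepErr, errWaitReorgTimeout) {
		c.reorgTimeoutOccurred = true // set by runReorgJob in the real code
	}
}

func main() {
	c := &jobContext{ctx: context.Background()}

	runStep(c, errWaitReorgTimeout)
	fmt.Println("step context kept across reorg timeout:", c.stepCtx != nil) // true

	runStep(c, nil)
	fmt.Println("step context released after the step finishes:", c.stepCtx == nil) // true
}
```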
2 changes: 1 addition & 1 deletion pkg/ddl/modify_column.go
@@ -634,7 +634,7 @@ func doReorgWorkForModifyColumn(w *worker, jobCtx *jobContext, job *model.Job, t
// enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/pkg/ddl/mockDelayInModifyColumnTypeWithData".
// disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/pkg/ddl/mockDelayInModifyColumnTypeWithData"
failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {})
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (addIndexErr error) {
err = w.runReorgJob(jobCtx, reorgInfo, tbl.Meta(), func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onModifyColumn",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tbl.Meta().Name, oldCol.Name)
12 changes: 8 additions & 4 deletions pkg/ddl/multi_schema_change.go
@@ -26,6 +26,10 @@ import (
)

func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int64, err error) {
jobCtx.inInnerRunOneJobStep = true
defer func() {
jobCtx.inInnerRunOneJobStep = false
}()
metaMut := jobCtx.metaMut
if job.MultiSchemaInfo.Revertible {
// Handle the rolling back job.
@@ -37,7 +41,7 @@ func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int
continue
}
proxyJob := sub.ToProxyJob(job, i)
ver, _, err = w.runOneJobStep(jobCtx, &proxyJob, nil)
ver, _, err = w.runOneJobStep(jobCtx, &proxyJob)
err = handleRollbackException(err, proxyJob.Error)
if err != nil {
return ver, err
@@ -60,7 +64,7 @@ func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int
continue
}
proxyJob := sub.ToProxyJob(job, i)
ver, _, err = w.runOneJobStep(jobCtx, &proxyJob, nil)
ver, _, err = w.runOneJobStep(jobCtx, &proxyJob)
sub.FromProxyJob(&proxyJob, ver)
handleRevertibleException(job, sub, proxyJob.Error)
return ver, err
@@ -86,7 +90,7 @@ func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int
if schemaVersionGenerated {
proxyJob.MultiSchemaInfo.SkipVersion = true
}
proxyJobVer, _, err := w.runOneJobStep(jobCtx, &proxyJob, nil)
proxyJobVer, _, err := w.runOneJobStep(jobCtx, &proxyJob)
if !schemaVersionGenerated && proxyJobVer != 0 {
schemaVersionGenerated = true
ver = proxyJobVer
@@ -135,7 +139,7 @@ func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int
continue
}
proxyJob := sub.ToProxyJob(job, i)
ver, _, err = w.runOneJobStep(jobCtx, &proxyJob, nil)
ver, _, err = w.runOneJobStep(jobCtx, &proxyJob)
sub.FromProxyJob(&proxyJob, ver)
return ver, err
}
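The inInnerRunOneJobStep flag set in onMultiSchemaChange pairs with shouldPollDDLJob in job_worker.go so that only the outermost runOneJobStep starts a cancellation-polling goroutine. A small illustrative sketch of that interplay follows (simplified control flow, not the PR's exact code):

```go
package main

import "fmt"

type jobContext struct {
	inInnerRunOneJobStep bool
}

func (c *jobContext) shouldPollDDLJob() bool {
	// Only the outermost runOneJobStep should poll the job state.
	return !c.inInnerRunOneJobStep
}

func runOneJobStep(c *jobContext, isMultiSchemaChange bool) {
	if c.shouldPollDDLJob() {
		fmt.Println("start cancellation-polling goroutine")
	}
	if isMultiSchemaChange {
		onMultiSchemaChange(c)
	}
}

func onMultiSchemaChange(c *jobContext) {
	// Mark that the nested runOneJobStep calls below are inner calls.
	c.inInnerRunOneJobStep = true
	defer func() { c.inInnerRunOneJobStep = false }()
	for i := 0; i < 3; i++ {
		runOneJobStep(c, false) // sub-jobs: shouldPollDDLJob() returns false
	}
}

func main() {
	// Prints "start cancellation-polling goroutine" exactly once.
	runOneJobStep(&jobContext{}, true)
}
```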
22 changes: 22 additions & 0 deletions pkg/ddl/multi_schema_change_test.go
@@ -801,6 +801,28 @@ func TestMultiSchemaChangeBlockedByRowLevelChecksum(t *testing.T) {
tk.MustGetErrCode("alter table t add (c1 int, c2 int)", errno.ErrUnsupportedDDLOperation)
}

func TestMultiSchemaChangePollJobCount(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int)")
tk.MustExec("insert into t values (1);")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("set global tidb_enable_dist_task = 0;")
runOneJobCounter := 0
pollJobCounter := 0
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onRunOneJobStep", func() {
runOneJobCounter++
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforePollDDLJob", func() {
pollJobCounter++
})
// Should not test reorg DDL because the result can be unstable.
tk.MustExec("alter table t add column b int, modify column a bigint, add column c char(10);")
require.Equal(t, 29, runOneJobCounter)
require.Equal(t, 9, pollJobCounter)
}

type cancelOnceHook struct {
store kv.Storage
triggered bool
4 changes: 2 additions & 2 deletions pkg/ddl/partition.go
@@ -2436,7 +2436,7 @@ func (w *worker) cleanGlobalIndexEntriesFromDroppedPartitions(jobCtx *jobContext
// and then run the reorg next time.
return false, errors.Trace(err)
}
err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (dropIndexErr error) {
err = w.runReorgJob(jobCtx, reorgInfo, tbl.Meta(), func() (dropIndexErr error) {
defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition",
func() {
dropIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("drop partition panic")
@@ -3715,7 +3715,7 @@ func doPartitionReorgWork(w *worker, jobCtx *jobContext, job *model.Job, tbl tab
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfoFromPartitions(jobCtx.oldDDLCtx.jobContext(job.ID, job.ReorgMeta), jobCtx, rh, job, dbInfo, partTbl, physTblIDs, elements)
err = w.runReorgJob(reorgInfo, reorgTbl.Meta(), func() (reorgErr error) {
err = w.runReorgJob(jobCtx, reorgInfo, reorgTbl.Meta(), func() (reorgErr error) {
defer tidbutil.Recover(metrics.LabelDDL, "doPartitionReorgWork",
func() {
reorgErr = dbterror.ErrCancelledDDLJob.GenWithStack("reorganize partition for table `%v` panic", tbl.Meta().Name)
12 changes: 11 additions & 1 deletion pkg/ddl/reorg.go
@@ -340,6 +340,7 @@ func (rc *reorgCtx) getRowCount() int64 {
//
// After that, we can make sure that the worker goroutine is correctly shut down.
func (w *worker) runReorgJob(
jobCtx *jobContext,
reorgInfo *reorgInfo,
tblInfo *model.TableInfo,
reorgFn func() error,
@@ -381,7 +382,13 @@
})
}

updateProcessTicker := time.NewTicker(5 * time.Second)
updateProgressInterval := 5 * time.Second
failpoint.Inject("updateProgressIntervalInMs", func(val failpoint.Value) {
if v, ok := val.(int); ok {
updateProgressInterval = time.Duration(v) * time.Millisecond
}
})
updateProcessTicker := time.NewTicker(updateProgressInterval)
defer updateProcessTicker.Stop()
for {
select {
@@ -393,6 +400,7 @@
logutil.DDLLogger().Warn("owner ts mismatch, return timeout error and retry",
zap.Int64("prevTS", res.ownerTS),
zap.Int64("curTS", curTS))
jobCtx.reorgTimeoutOccurred = true
return dbterror.ErrWaitReorgTimeout
}
// Since job is cancelled,we don't care about its partial counts.
@@ -428,6 +436,8 @@
w.mergeWarningsIntoJob(job)

rc.resetWarnings()
jobCtx.reorgTimeoutOccurred = true
return dbterror.ErrWaitReorgTimeout
}
}
}