Skip to content

Commit 0c2c600

Browse files
authored
ddl: fix job state overridden when concurrent updates don't overlap in time range (#58495) (#58755)
close #52747
1 parent 556654f commit 0c2c600

File tree

9 files changed

+117
-37
lines changed

9 files changed

+117
-37
lines changed

pkg/ddl/cancel_test.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func cancelSuccess(rs *testkit.Result) bool {
217217
func TestCancelVariousJobs(t *testing.T) {
218218
var enterCnt, exitCnt atomic.Int32
219219
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDeliveryJob", func(job *model.Job) { enterCnt.Add(1) })
220-
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) { exitCnt.Add(1) })
220+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.JobW) { exitCnt.Add(1) })
221221
waitDDLWorkerExited := func() {
222222
require.Eventually(t, func() bool {
223223
return enterCnt.Load() == exitCnt.Load()
@@ -378,3 +378,26 @@ func TestCancelForAddUniqueIndex(t *testing.T) {
378378
tbl = external.GetTableByName(t, tk, "test", "t")
379379
require.Equal(t, 0, len(tbl.Meta().Indices))
380380
}
381+
382+
func TestCancelJobBeforeRun(t *testing.T) {
383+
store := testkit.CreateMockStore(t)
384+
tk := testkit.NewTestKit(t, store)
385+
tkCancel := testkit.NewTestKit(t, store)
386+
387+
// Prepare schema.
388+
tk.MustExec("use test")
389+
tk.MustExec(`create table t (c1 int, c2 int, c3 int)`)
390+
tk.MustExec("insert into t values(1, 1, 1)")
391+
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))
392+
393+
counter := 0
394+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeTransitOneJobStep", func(jobW *model.JobW) {
395+
if counter == 0 && jobW.TableName == "t" {
396+
tkCancel.MustExec(fmt.Sprintf("admin cancel ddl jobs %d", jobW.ID))
397+
counter++
398+
}
399+
})
400+
401+
tk.MustGetErrCode("truncate table t", errno.ErrCancelledDDLJob)
402+
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))
403+
}

pkg/ddl/executor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func TestCreateViewConcurrently(t *testing.T) {
175175
return
176176
}
177177
})
178-
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) {
178+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.JobW) {
179179
if job.Type == model.ActionCreateView {
180180
counter--
181181
}

pkg/ddl/job_scheduler.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -436,8 +436,7 @@ func (s *jobScheduler) loadAndDeliverJobs(se *sess.Session) error {
436436
continue
437437
}
438438

439-
s.deliveryJob(wk, targetPool, &job)
440-
439+
s.deliveryJob(wk, targetPool, model.NewJobW(&job, jobBinary))
441440
if s.generalDDLWorkerPool.available() == 0 && s.reorgWorkerPool.available() == 0 {
442441
break
443442
}
@@ -467,44 +466,44 @@ func (s *jobScheduler) mustReloadSchemas() {
467466
// deliveryJob deliver the job to the worker to run it asynchronously.
468467
// the worker will run the job until it's finished, paused or another owner takes
469468
// over and finished it.
470-
func (s *jobScheduler) deliveryJob(wk *worker, pool *workerPool, job *model.Job) {
471-
failpoint.InjectCall("beforeDeliveryJob", job)
472-
injectFailPointForGetJob(job)
473-
jobID, involvedSchemaInfos := job.ID, job.GetInvolvingSchemaInfo()
469+
func (s *jobScheduler) deliveryJob(wk *worker, pool *workerPool, jobW *model.JobW) {
470+
failpoint.InjectCall("beforeDeliveryJob", jobW.Job)
471+
injectFailPointForGetJob(jobW.Job)
472+
jobID, involvedSchemaInfos := jobW.ID, jobW.GetInvolvingSchemaInfo()
474473
s.runningJobs.addRunning(jobID, involvedSchemaInfos)
475474
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc()
476-
jobCtx := s.getJobRunCtx(job.ID, job.TraceInfo)
475+
jobCtx := s.getJobRunCtx(jobW.ID, jobW.TraceInfo)
477476

478477
s.wg.Run(func() {
479478
defer func() {
480479
r := recover()
481480
if r != nil {
482481
logutil.DDLLogger().Error("panic in deliveryJob", zap.Any("recover", r), zap.Stack("stack"))
483482
}
484-
failpoint.InjectCall("afterDeliveryJob", job)
483+
failpoint.InjectCall("afterDeliveryJob", jobW)
485484
// Because there is a gap between `allIDs()` and `checkRunnable()`,
486485
// we append unfinished job to pending atomically to prevent `getJob()`
487-
// chosing another runnable job that involves the same schema object.
488-
moveRunningJobsToPending := r != nil || (job != nil && !job.IsFinished())
486+
// choosing another runnable job that involves the same schema object.
487+
moveRunningJobsToPending := r != nil || (jobW != nil && !jobW.IsFinished())
489488
s.runningJobs.finishOrPendJob(jobID, involvedSchemaInfos, moveRunningJobsToPending)
490489
asyncNotify(s.ddlJobNotifyCh)
491490
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
492491
pool.put(wk)
493492
}()
494493
for {
495-
err := s.transitOneJobStepAndWaitSync(wk, jobCtx, job)
494+
err := s.transitOneJobStepAndWaitSync(wk, jobCtx, jobW)
496495
if err != nil {
497-
logutil.DDLLogger().Info("run job failed", zap.Error(err), zap.Stringer("job", job))
498-
} else if job.InFinalState() {
496+
logutil.DDLLogger().Info("run job failed", zap.Error(err), zap.Stringer("job", jobW))
497+
} else if jobW.InFinalState() {
499498
return
500499
}
501500
// we have to refresh the job, to handle cases like job cancel or pause
502501
// or the job is finished by another owner.
503502
// TODO for JobStateRollbackDone we have to query 1 additional time when the
504503
// job is already moved to history.
505-
failpoint.InjectCall("beforeRefreshJob", job)
504+
failpoint.InjectCall("beforeRefreshJob", jobW.Job)
506505
for {
507-
job, err = s.sysTblMgr.GetJobByID(s.schCtx, jobID)
506+
jobW, err = s.sysTblMgr.GetJobByID(s.schCtx, jobID)
508507
failpoint.InjectCall("mockGetJobByIDFail", &err)
509508
if err == nil {
510509
break
@@ -551,14 +550,15 @@ func (s *jobScheduler) getJobRunCtx(jobID int64, traceInfo *model.TraceInfo) *jo
551550

552551
// transitOneJobStepAndWaitSync runs one step of the DDL job, persist it and
553552
// waits for other TiDB node to synchronize.
554-
func (s *jobScheduler) transitOneJobStepAndWaitSync(wk *worker, jobCtx *jobContext, job *model.Job) error {
553+
func (s *jobScheduler) transitOneJobStepAndWaitSync(wk *worker, jobCtx *jobContext, jobW *model.JobW) error {
555554
failpoint.InjectCall("beforeRunOneJobStep")
556555
ownerID := s.ownerManager.ID()
557556
// suppose we failed to sync version last time, we need to check and sync it
558557
// before run to maintain the 2-version invariant.
559558
// if owner not change, we need try to sync when it's un-synced.
560559
// if owner changed, we need to try sync it if the job is not started by
561560
// current owner.
561+
job := jobW.Job
562562
if jobCtx.isUnSynced(job.ID) || (job.Started() && !jobCtx.maybeAlreadyRunOnce(job.ID)) {
563563
if variable.EnableMDL.Load() {
564564
version, err := s.sysTblMgr.GetMDLVer(s.schCtx, job.ID)
@@ -582,7 +582,7 @@ func (s *jobScheduler) transitOneJobStepAndWaitSync(wk *worker, jobCtx *jobConte
582582
jobCtx.setAlreadyRunOnce(job.ID)
583583
}
584584

585-
schemaVer, err := wk.transitOneJobStep(jobCtx, job, s.sysTblMgr)
585+
schemaVer, err := wk.transitOneJobStep(jobCtx, jobW, s.sysTblMgr)
586586
if err != nil {
587587
jobCtx.logger.Info("handle ddl job failed", zap.Error(err), zap.Stringer("job", job))
588588
return err

pkg/ddl/job_worker.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package ddl
1616

1717
import (
18+
"bytes"
1819
"context"
1920
"fmt"
2021
"math/rand"
@@ -529,19 +530,32 @@ func (w *worker) prepareTxn(job *model.Job) (kv.Transaction, error) {
529530
// non-zero, caller should wait for other nodes to catch up.
530531
func (w *worker) transitOneJobStep(
531532
jobCtx *jobContext,
532-
job *model.Job,
533+
jobW *model.JobW,
533534
sysTblMgr systable.Manager,
534535
) (int64, error) {
535-
var (
536-
err error
537-
)
538-
536+
failpoint.InjectCall("beforeTransitOneJobStep", jobW)
537+
job := jobW.Job
539538
txn, err := w.prepareTxn(job)
540539
if err != nil {
541540
return 0, err
542541
}
543542
jobCtx.metaMut = meta.NewMutator(txn)
544543

544+
// we are using optimistic txn in nearly all DDL related transactions, if
545+
// time range of another concurrent job updates, such as 'cancel/pause' job
546+
// or on owner change, overlap with us, we will report 'write conflict', but
547+
// if they don't overlap, we query and check inside our txn to detect the conflict.
548+
currBytes, err := sysTblMgr.GetJobBytesByIDWithSe(jobCtx.ctx, w.sess, job.ID)
549+
if err != nil {
550+
// TODO maybe we can unify where to rollback, they are scatting around.
551+
w.sess.Rollback()
552+
return 0, err
553+
}
554+
if !bytes.Equal(currBytes, jobW.Bytes) {
555+
w.sess.Rollback()
556+
return 0, errors.New("job meta changed by others")
557+
}
558+
545559
if job.IsDone() || job.IsRollbackDone() || job.IsCancelled() {
546560
if job.IsDone() {
547561
job.State = model.JobStateSynced

pkg/ddl/mock/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
importpath = "github.com/pingcap/tidb/pkg/ddl/mock",
1010
visibility = ["//visibility:public"],
1111
deps = [
12+
"//pkg/ddl/session",
1213
"//pkg/meta/model",
1314
"@org_uber_go_mock//gomock",
1415
],

pkg/ddl/mock/systable_manager_mock.go

Lines changed: 18 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/ddl/systable/manager.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ var (
3535
// to access the job/MDL related tables.
3636
type Manager interface {
3737
// GetJobByID gets the job by ID, returns ErrNotFound if the job does not exist.
38-
GetJobByID(ctx context.Context, jobID int64) (*model.Job, error)
38+
GetJobByID(ctx context.Context, jobID int64) (*model.JobW, error)
39+
// GetJobBytesByIDWithSe gets the job binary by ID with the given session.
40+
GetJobBytesByIDWithSe(ctx context.Context, se *session.Session, jobID int64) ([]byte, error)
3941
// GetMDLVer gets the MDL version by job ID, returns ErrNotFound if the MDL info does not exist.
4042
GetMDLVer(ctx context.Context, jobID int64) (int64, error)
4143
// GetMinJobID gets current minimum job ID in the job table for job_id >= prevMinJobID,
@@ -71,27 +73,36 @@ func (mgr *manager) withNewSession(fn func(se *session.Session) error) error {
7173
return fn(ddlse)
7274
}
7375

74-
func (mgr *manager) GetJobByID(ctx context.Context, jobID int64) (*model.Job, error) {
76+
func (mgr *manager) GetJobByID(ctx context.Context, jobID int64) (*model.JobW, error) {
7577
job := model.Job{}
78+
var jobBytes []byte
7679
if err := mgr.withNewSession(func(se *session.Session) error {
77-
sql := fmt.Sprintf(`select job_meta from mysql.tidb_ddl_job where job_id = %d`, jobID)
78-
rows, err := se.Execute(ctx, sql, "get-job-by-id")
80+
bytes, err := mgr.GetJobBytesByIDWithSe(ctx, se, jobID)
7981
if err != nil {
8082
return errors.Trace(err)
8183
}
82-
if len(rows) == 0 {
83-
return ErrNotFound
84-
}
85-
jobBinary := rows[0].GetBytes(0)
86-
err = job.Decode(jobBinary)
84+
err = job.Decode(bytes)
8785
if err != nil {
8886
return errors.Trace(err)
8987
}
88+
jobBytes = bytes
9089
return nil
9190
}); err != nil {
9291
return nil, err
9392
}
94-
return &job, nil
93+
return model.NewJobW(&job, jobBytes), nil
94+
}
95+
96+
func (*manager) GetJobBytesByIDWithSe(ctx context.Context, se *session.Session, jobID int64) ([]byte, error) {
97+
sql := fmt.Sprintf(`select job_meta from mysql.tidb_ddl_job where job_id = %d`, jobID)
98+
rows, err := se.Execute(ctx, sql, "get-job-by-id")
99+
if err != nil {
100+
return nil, errors.Trace(err)
101+
}
102+
if len(rows) == 0 {
103+
return nil, ErrNotFound
104+
}
105+
return rows[0].GetBytes(0), nil
95106
}
96107

97108
func (mgr *manager) GetMDLVer(ctx context.Context, jobID int64) (int64, error) {

pkg/meta/model/job.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1288,6 +1288,21 @@ type TraceInfo struct {
12881288
SessionAlias string `json:"session_alias"`
12891289
}
12901290

1291+
// JobW is a wrapper of model.Job, it contains the job and the binary representation
1292+
// of the job.
1293+
type JobW struct {
1294+
*Job
1295+
Bytes []byte
1296+
}
1297+
1298+
// NewJobW creates a new JobW.
1299+
func NewJobW(job *Job, bytes []byte) *JobW {
1300+
return &JobW{
1301+
Job: job,
1302+
Bytes: bytes,
1303+
}
1304+
}
1305+
12911306
func init() {
12921307
SetJobVerInUse(JobVersion1)
12931308
}

pkg/session/bootstraptest/bootstrap_upgrade_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
519519
}
520520
})
521521
var once sync.Once
522-
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) {
522+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.JobW) {
523523
if job != nil && job.ID == jobID {
524524
once.Do(func() { ch <- struct{}{} })
525525
}

0 commit comments

Comments
 (0)