Skip to content

Commit 2de2950

Browse files
committed
ddl: prevent ddl job state being overridden during update
1 parent 100aa05 commit 2de2950

File tree

6 files changed

+78
-15
lines changed

6 files changed

+78
-15
lines changed

pkg/ddl/constant.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const (
4848
job_meta longblob,
4949
type int,
5050
processing int,
51+
job_state int,
5152
primary key(job_id))`
5253
// ReorgTableSQL is the CREATE TABLE SQL of `tidb_ddl_reorg`.
5354
ReorgTableSQL = "create table " + ReorgTable + `(

pkg/ddl/db_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,3 +1162,35 @@ func TestDDLJobErrEntrySizeTooLarge(t *testing.T) {
11621162
tk.MustExec("create table t1 (a int);")
11631163
tk.MustExec("alter table t add column b int;") // Should not block.
11641164
}
1165+
1166+
func TestDDLCancelJobOverridden(t *testing.T) {
1167+
store := testkit.CreateMockStore(t)
1168+
tk := testkit.NewTestKit(t, store)
1169+
1170+
tk.MustExec("use test")
1171+
tk.MustExec("create table t (a int);")
1172+
1173+
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/ddl/mockBeforePrepareTxn", `return(true)`)
1174+
var (
1175+
cancelErr error
1176+
jobID int64
1177+
once bool
1178+
)
1179+
ddl.BeforeWorkerPrepareTxn = func(job *model.Job) {
1180+
if job.Type == model.ActionTruncateTable && job.State == model.JobStateQueueing && !once {
1181+
once = true
1182+
jobID = job.ID
1183+
tk1 := testkit.NewTestKit(t, store)
1184+
cancelJob := fmt.Sprintf("admin cancel ddl jobs %d", jobID)
1185+
_, cancelErr = tk1.Exec(cancelJob)
1186+
}
1187+
}
1188+
1189+
tk.MustExec("truncate table t;")
1190+
require.True(t, once)
1191+
require.NoError(t, cancelErr)
1192+
rs := tk.MustQuery(fmt.Sprintf("admin show ddl jobs where job_id = %d", jobID)).Rows()
1193+
require.Len(t, rs, 1, fmt.Sprintf("jobID: %d", jobID))
1194+
jobState := rs[0][4].(string)
1195+
require.Equal(t, jobState, "cancelled")
1196+
}

pkg/ddl/ddl.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,10 +1755,12 @@ func resumePausedJob(_ *sess.Session, job *model.Job,
17551755
}
17561756

17571757
// processJobs runs the given process function on each DDL job identified by ids, on behalf of byWho, and records per-job errors in jobErrs.
1758-
func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
1758+
func processJobs(
1759+
process func(*sess.Session, *model.Job, model.AdminCommandOperator) error,
17591760
sessCtx sessionctx.Context,
17601761
ids []int64,
1761-
byWho model.AdminCommandOperator) (jobErrs []error, err error) {
1762+
byWho model.AdminCommandOperator,
1763+
) (jobErrs []error, err error) {
17621764
failpoint.Inject("mockFailedCommandOnConcurencyDDL", func(val failpoint.Value) {
17631765
if val.(bool) {
17641766
failpoint.Return(nil, errors.New("mock failed admin command on ddl jobs"))
@@ -1807,7 +1809,7 @@ func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOpera
18071809
continue
18081810
}
18091811

1810-
err = updateDDLJob2Table(ns, job, false)
1812+
err = updateDDLJob2Table(ns, job, job.State, false)
18111813
if err != nil {
18121814
jobErrs[i] = err
18131815
continue
@@ -1898,7 +1900,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp
18981900
continue
18991901
}
19001902

1901-
err = updateDDLJob2Table(ns, job, false)
1903+
err = updateDDLJob2Table(ns, job, job.State, false)
19021904
if err != nil {
19031905
jobErrs[job.ID] = err
19041906
continue

pkg/ddl/ddl_worker.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,7 @@ func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) e
580580

581581
// updateDDLJob updates the DDL job information.
582582
// This function must be called every time the job enters a new state other than the final state.
583-
func (w *worker) updateDDLJob(job *model.Job, meetErr bool) error {
583+
func (w *worker) updateDDLJob(job *model.Job, prevState model.JobState, meetErr bool) error {
584584
failpoint.Inject("mockErrEntrySizeTooLarge", func(val failpoint.Value) {
585585
if val.(bool) {
586586
failpoint.Return(kv.ErrEntryTooLarge)
@@ -591,7 +591,7 @@ func (w *worker) updateDDLJob(job *model.Job, meetErr bool) error {
591591
w.jobLogger(job).Info("meet something wrong before update DDL job, shouldn't update raw args",
592592
zap.String("job", job.String()))
593593
}
594-
return errors.Trace(updateDDLJob2Table(w.sess, job, updateRawArgs))
594+
return errors.Trace(updateDDLJob2Table(w.sess, job, prevState, updateRawArgs))
595595
}
596596

597597
// registerMDLInfo registers metadata lock info.
@@ -882,7 +882,15 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
882882
return nil
883883
}
884884

885+
// BeforeWorkerPrepareTxn is a hook called at the start of prepareTxn when the mockBeforePrepareTxn failpoint is enabled. It is only used in tests.
886+
var BeforeWorkerPrepareTxn func(*model.Job)
887+
885888
func (w *worker) prepareTxn(job *model.Job) (kv.Transaction, error) {
889+
failpoint.Inject("mockBeforePrepareTxn", func(val failpoint.Value) {
890+
if val.(bool) {
891+
BeforeWorkerPrepareTxn(job)
892+
}
893+
})
886894
err := w.sess.Begin()
887895
if err != nil {
888896
return nil, err
@@ -918,7 +926,9 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
918926
err error
919927
schemaVer int64
920928
runJobErr error
929+
prevState model.JobState
921930
)
931+
prevState = job.State
922932
defer func() {
923933
w.unlockSeqNum(err)
924934
}()
@@ -999,7 +1009,7 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
9991009
d.unlockSchemaVersion(job.ID)
10001010
return 0, err
10011011
}
1002-
err = w.updateDDLJob(job, runJobErr != nil)
1012+
err = w.updateDDLJob(job, prevState, runJobErr != nil)
10031013
if err = w.handleUpdateJobError(t, job, err); err != nil {
10041014
w.sess.Rollback()
10051015
d.unlockSchemaVersion(job.ID)

pkg/ddl/job_table.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -527,8 +527,8 @@ func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*mod
527527
}
528528

529529
const (
530-
addDDLJobSQL = "insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values"
531-
updateDDLJobSQL = "update mysql.tidb_ddl_job set job_meta = %s where job_id = %d"
530+
addDDLJobSQL = "insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing, job_state) values"
531+
updateDDLJobSQL = "update mysql.tidb_ddl_job set job_meta = %s, job_state = %d where job_id = %d and job_state = %d"
532532
)
533533

534534
func insertDDLJobs2Table(se *sess.Session, updateRawArgs bool, jobs ...*model.Job) error {
@@ -550,7 +550,16 @@ func insertDDLJobs2Table(se *sess.Session, updateRawArgs bool, jobs ...*model.Jo
550550
if i != 0 {
551551
sql.WriteString(",")
552552
}
553-
fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", job.ID, job.MayNeedReorg(), strconv.Quote(job2SchemaIDs(job)), strconv.Quote(job2TableIDs(job)), util.WrapKey2String(b), job.Type, !job.NotStarted())
553+
fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t, %d)",
554+
job.ID,
555+
job.MayNeedReorg(),
556+
strconv.Quote(job2SchemaIDs(job)),
557+
strconv.Quote(job2TableIDs(job)),
558+
util.WrapKey2String(b), // job_meta
559+
job.Type,
560+
!job.NotStarted(), // processing
561+
job.State,
562+
)
554563
}
555564
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
556565
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
@@ -605,14 +614,21 @@ func (w *worker) deleteDDLJob(job *model.Job) error {
605614
return errors.Trace(err)
606615
}
607616

608-
func updateDDLJob2Table(se *sess.Session, job *model.Job, updateRawArgs bool) error {
617+
func updateDDLJob2Table(se *sess.Session, job *model.Job, prevState model.JobState, updateRawArgs bool) error {
609618
b, err := job.Encode(updateRawArgs)
610619
if err != nil {
611620
return err
612621
}
613-
sql := fmt.Sprintf(updateDDLJobSQL, util.WrapKey2String(b), job.ID)
622+
sql := fmt.Sprintf(updateDDLJobSQL, util.WrapKey2String(b), job.State, job.ID, prevState)
614623
_, err = se.Execute(context.Background(), sql, "update_job")
615-
return errors.Trace(err)
624+
if err != nil {
625+
return errors.Trace(err)
626+
}
627+
affectedRows := se.GetSessionVars().StmtCtx.AffectedRows()
628+
if affectedRows == 0 {
629+
return errors.Errorf("job is updated concurrently")
630+
}
631+
return nil
616632
}
617633

618634
// getDDLReorgHandle gets DDL reorg handle.

pkg/ddl/partition.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2493,8 +2493,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
24932493
ptID int64
24942494
partName string
24952495
withValidation bool
2496+
prevState model.JobState
24962497
)
24972498

2499+
prevState = job.State
24982500
if err := job.DecodeArgs(&defID, &ptSchemaID, &ptID, &partName, &withValidation); err != nil {
24992501
job.State = model.JobStateCancelled
25002502
return ver, errors.Trace(err)
@@ -2560,7 +2562,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
25602562
zap.Stringer("job", job), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID))
25612563
job.Args[0] = partDef.ID
25622564
defID = partDef.ID
2563-
err = updateDDLJob2Table(w.sess, job, true)
2565+
err = updateDDLJob2Table(w.sess, job, prevState, true)
25642566
if err != nil {
25652567
return ver, errors.Trace(err)
25662568
}
@@ -2603,7 +2605,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
26032605
zap.Stringer("job", job), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID))
26042606
job.Args[0] = partDef.ID
26052607
defID = partDef.ID
2606-
err = updateDDLJob2Table(w.sess, job, true)
2608+
err = updateDDLJob2Table(w.sess, job, prevState, true)
26072609
if err != nil {
26082610
return ver, errors.Trace(err)
26092611
}

0 commit comments

Comments
 (0)