Skip to content

Commit f52c9cc

Browse files
authored
ddl: fix job state overridden when concurrent updates don't overlap in time range (#58495) (#59728)
close #52747
1 parent 33a4052 commit f52c9cc

File tree

4 files changed

+79
-10
lines changed

4 files changed

+79
-10
lines changed

pkg/ddl/cancel_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,3 +366,26 @@ func TestSubmitJobAfterDDLIsClosed(t *testing.T) {
366366
require.Error(t, ddlErr)
367367
require.Equal(t, "context canceled", ddlErr.Error())
368368
}
369+
370+
func TestCancelJobBeforeRun(t *testing.T) {
371+
store := testkit.CreateMockStore(t)
372+
tk := testkit.NewTestKit(t, store)
373+
tkCancel := testkit.NewTestKit(t, store)
374+
375+
// Prepare schema.
376+
tk.MustExec("use test")
377+
tk.MustExec(`create table t (c1 int, c2 int, c3 int)`)
378+
tk.MustExec("insert into t values(1, 1, 1)")
379+
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))
380+
381+
counter := 0
382+
require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/beforeHandleDDLJobTable", func(jobID int64, tableName string) {
383+
if counter == 0 && tableName == "t" {
384+
tkCancel.MustExec(fmt.Sprintf("admin cancel ddl jobs %d", jobID))
385+
counter++
386+
}
387+
}))
388+
389+
tk.MustGetErrCode("truncate table t", errno.ErrCancelledDDLJob)
390+
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))
391+
}

pkg/ddl/ddl_worker.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package ddl
1616

1717
import (
18+
"bytes"
1819
"context"
1920
"fmt"
2021
"math/rand"
@@ -730,7 +731,8 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
730731
return nil
731732
}
732733

733-
func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
734+
func (w *worker) HandleDDLJobTable(d *ddlCtx, jobW *model.JobW) (int64, error) {
735+
failpoint.InjectCall("beforeHandleDDLJobTable", jobW.ID, jobW.TableName)
734736
var (
735737
err error
736738
schemaVer int64
@@ -740,6 +742,7 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
740742
w.unlockSeqNum(err)
741743
}()
742744

745+
job := jobW.Job
743746
err = w.sess.Begin()
744747
if err != nil {
745748
return 0, err
@@ -766,6 +769,21 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
766769
}
767770
txn.SetOption(kv.ResourceGroupName, jobContext.resourceGroupName)
768771

772+
// We use optimistic txns in nearly all DDL related transactions. If another
773+
// concurrent update of the job, such as 'cancel/pause' job or an owner change,
774+
// overlaps with us in time range, we will report 'write conflict'; but if they
775+
// don't overlap, we must query and check inside our txn to detect the conflict.
776+
currBytes, err := getJobBytesByIDWithSe(d.ctx, w.sess, job.ID)
777+
if err != nil {
778+
// TODO maybe we can unify where to rollback, they are scattered around.
779+
w.sess.Rollback()
780+
return 0, err
781+
}
782+
if !bytes.Equal(currBytes, jobW.Bytes) {
783+
w.sess.Rollback()
784+
return 0, errors.New("job meta changed by others")
785+
}
786+
769787
t := meta.NewMeta(txn)
770788
if job.IsDone() || job.IsRollbackDone() {
771789
if job.IsDone() {
@@ -862,6 +880,18 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
862880
return schemaVer, nil
863881
}
864882

883+
func getJobBytesByIDWithSe(ctx context.Context, se *sess.Session, jobID int64) ([]byte, error) {
884+
sql := fmt.Sprintf(`select job_meta from mysql.tidb_ddl_job where job_id = %d`, jobID)
885+
rows, err := se.Execute(ctx, sql, "get-job-by-id")
886+
if err != nil {
887+
return nil, errors.Trace(err)
888+
}
889+
if len(rows) == 0 {
890+
return nil, errors.New("not found")
891+
}
892+
return rows[0].GetBytes(0), nil
893+
}
894+
865895
func (w *worker) checkBeforeCommit() error {
866896
if !w.ddlCtx.isOwner() {
867897
// Since this TiDB instance is not a DDL owner anymore,

pkg/ddl/job_table.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ const (
7777
jobTypeReorg
7878
)
7979

80-
func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool, error)) (*model.Job, error) {
80+
func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool, error)) (*model.JobW, error) {
8181
not := "not"
8282
label := "get_job_general"
8383
if tp == jobTypeReorg {
@@ -116,7 +116,7 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool
116116

117117
// The job has already been picked up, just return to continue it.
118118
if isJobProcessing {
119-
return &job, nil
119+
return model.NewJobW(&job, jobBinary), nil
120120
}
121121

122122
b, err := filter(&job)
@@ -131,7 +131,7 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool
131131
zap.String("job", job.String()))
132132
return nil, errors.Trace(err)
133133
}
134-
return &job, nil
134+
return model.NewJobW(&job, jobBinary), nil
135135
}
136136
}
137137
return nil, nil
@@ -199,7 +199,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
199199
return true, nil
200200
}
201201

202-
func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) {
202+
func (d *ddl) getGeneralJob(sess *sess.Session) (*model.JobW, error) {
203203
return d.getJob(sess, jobTypeGeneral, func(job *model.Job) (bool, error) {
204204
if !d.runningJobs.checkRunnable(job) {
205205
return false, nil
@@ -219,7 +219,7 @@ func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) {
219219
})
220220
}
221221

222-
func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
222+
func (d *ddl) getReorgJob(sess *sess.Session) (*model.JobW, error) {
223223
return d.getJob(sess, jobTypeReorg, func(job *model.Job) (bool, error) {
224224
if !d.runningJobs.checkRunnable(job) {
225225
return false, nil
@@ -330,7 +330,7 @@ func (d *ddl) checkAndUpdateClusterState(needUpdate bool) error {
330330
return nil
331331
}
332332

333-
func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*sess.Session) (*model.Job, error)) {
333+
func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*sess.Session) (*model.JobW, error)) {
334334
wk, err := pool.get()
335335
if err != nil || wk == nil {
336336
logutil.BgLogger().Debug(fmt.Sprintf("[ddl] no %v worker available now", pool.tp()), zap.Error(err))
@@ -350,7 +350,7 @@ func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*
350350
return
351351
}
352352
d.mu.RLock()
353-
d.mu.hook.OnGetJobAfter(pool.tp().String(), job)
353+
d.mu.hook.OnGetJobAfter(pool.tp().String(), job.Job)
354354
d.mu.RUnlock()
355355

356356
d.delivery2worker(wk, pool, job)
@@ -360,7 +360,8 @@ func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*
360360
var AfterDeliverToWorkerForTest func(*model.Job)
361361

362362
// delivery2worker owns the worker, need to put it back to the pool in this function.
363-
func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
363+
func (d *ddl) delivery2worker(wk *worker, pool *workerPool, jobW *model.JobW) {
364+
job := jobW.Job
364365
injectFailPointForGetJob(job)
365366
d.runningJobs.add(job)
366367
d.wg.Run(func() {
@@ -407,7 +408,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
407408
}
408409
}
409410

410-
schemaVer, err := wk.HandleDDLJobTable(d.ddlCtx, job)
411+
schemaVer, err := wk.HandleDDLJobTable(d.ddlCtx, jobW)
411412
pool.put(wk)
412413
if err != nil {
413414
logutil.BgLogger().Info("handle ddl job failed", zap.String("category", "ddl"), zap.Error(err), zap.String("job", job.String()))

pkg/parser/model/model.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2231,3 +2231,18 @@ type TraceInfo struct {
22312231
// SessionAlias is the alias of session
22322232
SessionAlias string `json:"session_alias"`
22332233
}
2234+
2235+
// JobW is a wrapper of model.Job, it contains the job and the binary representation
2236+
// of the job.
2237+
type JobW struct {
2238+
*Job
2239+
Bytes []byte
2240+
}
2241+
2242+
// NewJobW creates a new JobW.
2243+
func NewJobW(job *Job, bytes []byte) *JobW {
2244+
return &JobW{
2245+
Job: job,
2246+
Bytes: bytes,
2247+
}
2248+
}

0 commit comments

Comments
 (0)