23 changes: 23 additions & 0 deletions pkg/ddl/cancel_test.go
@@ -366,3 +366,26 @@ func TestSubmitJobAfterDDLIsClosed(t *testing.T) {
require.Error(t, ddlErr)
require.Equal(t, "context canceled", ddlErr.Error())
}

func TestCancelJobBeforeRun(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tkCancel := testkit.NewTestKit(t, store)

// Prepare schema.
tk.MustExec("use test")
tk.MustExec(`create table t (c1 int, c2 int, c3 int)`)
tk.MustExec("insert into t values(1, 1, 1)")
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))

// Cancel the job the first time it is picked up, before the worker runs it.
counter := 0
require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/beforeHandleDDLJobTable", func(jobID int64, tableName string) {
if counter == 0 && tableName == "t" {
tkCancel.MustExec(fmt.Sprintf("admin cancel ddl jobs %d", jobID))
counter++
}
}))

tk.MustGetErrCode("truncate table t", errno.ErrCancelledDDLJob)
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))
}
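
The test drives the race deterministically: the worker now announces each job through the `beforeHandleDDLJobTable` failpoint (see pkg/ddl/ddl_worker.go below), and the test's callback cancels the job the first time it fires, before the worker has started running it. A minimal self-contained sketch of that hook pattern, using a toy registry rather than the real pingcap/failpoint API:

```go
package main

import "fmt"

// Toy stand-in for the EnableCall/InjectCall pair: tests register a callback
// under a name, and production code invokes it if one is present.
var hooks = map[string]func(jobID int64, table string){}

func injectCall(name string, jobID int64, table string) {
	if fn, ok := hooks[name]; ok {
		fn(jobID, table)
	}
}

func handleJob(jobID int64, table string) {
	injectCall("beforeHandleDDLJobTable", jobID, table)
	fmt.Println("handling job", jobID)
}

func main() {
	fired := false
	hooks["beforeHandleDDLJobTable"] = func(jobID int64, table string) {
		if !fired && table == "t" {
			fmt.Println("cancelling job", jobID) // stands in for `admin cancel ddl jobs`
			fired = true
		}
	}
	handleJob(1, "t")
}
```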
32 changes: 31 additions & 1 deletion pkg/ddl/ddl_worker.go
@@ -15,6 +15,7 @@
package ddl

import (
"bytes"
"context"
"fmt"
"math/rand"
@@ -730,7 +731,8 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
return nil
}

func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
func (w *worker) HandleDDLJobTable(d *ddlCtx, jobW *model.JobW) (int64, error) {
failpoint.InjectCall("beforeHandleDDLJobTable", jobW.ID, jobW.TableName)
var (
err error
schemaVer int64
@@ -740,6 +742,7 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
w.unlockSeqNum(err)
}()

job := jobW.Job
err = w.sess.Begin()
if err != nil {
return 0, err
@@ -766,6 +769,21 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
}
txn.SetOption(kv.ResourceGroupName, jobContext.resourceGroupName)

// We use optimistic transactions for nearly all DDL-related operations. If
// the time window of another concurrent update to this job, such as a
// 'cancel/pause' statement or an owner change, overlaps with ours, we will
// report a 'write conflict'. If the windows don't overlap, we query and check
// the job meta inside our own transaction to detect the conflict.
currBytes, err := getJobBytesByIDWithSe(d.ctx, w.sess, job.ID)
if err != nil {
// TODO: maybe we can unify where to rollback; such calls are scattered around.
w.sess.Rollback()
return 0, err
}
if !bytes.Equal(currBytes, jobW.Bytes) {
w.sess.Rollback()
return 0, errors.New("job meta changed by others")
}

t := meta.NewMeta(txn)
if job.IsDone() || job.IsRollbackDone() {
if job.IsDone() {
@@ -862,6 +880,18 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
return schemaVer, nil
}

// getJobBytesByIDWithSe fetches the raw job_meta bytes of the job with jobID
// using the given session.
func getJobBytesByIDWithSe(ctx context.Context, se *sess.Session, jobID int64) ([]byte, error) {
sql := fmt.Sprintf(`select job_meta from mysql.tidb_ddl_job where job_id = %d`, jobID)
rows, err := se.Execute(ctx, sql, "get-job-by-id")
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, errors.New("not found")
}
return rows[0].GetBytes(0), nil
}

func (w *worker) checkBeforeCommit() error {
if !w.ddlCtx.isOwner() {
// Since this TiDB instance is not a DDL owner anymore,
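
The new check in `HandleDDLJobTable` is a classic optimistic-concurrency guard: the scheduler snapshots the row's serialized form when it picks the job up, and the worker re-reads the row inside its own transaction and aborts if the bytes differ. A self-contained sketch of the same idea (the `fetch`/`run` callbacks are illustrative, not TiDB's API):

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// runIfUnchanged re-reads the stored meta inside the transaction and refuses
// to proceed if it no longer matches the snapshot taken at scheduling time.
func runIfUnchanged(snapshot []byte, fetch func() ([]byte, error), run func() error) error {
	current, err := fetch()
	if err != nil {
		return err
	}
	if !bytes.Equal(current, snapshot) {
		return errors.New("job meta changed by others")
	}
	return run()
}

func main() {
	stored := []byte(`{"id":1,"state":"queueing"}`)
	snapshot := append([]byte(nil), stored...)

	// A concurrent `admin cancel ddl jobs` rewrites the stored meta.
	stored = []byte(`{"id":1,"state":"cancelling"}`)

	err := runIfUnchanged(snapshot,
		func() ([]byte, error) { return stored, nil },
		func() error { fmt.Println("running job"); return nil })
	fmt.Println(err) // job meta changed by others
}
```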
19 changes: 10 additions & 9 deletions pkg/ddl/job_table.go
@@ -77,7 +77,7 @@
jobTypeReorg
)

func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool, error)) (*model.Job, error) {
func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool, error)) (*model.JobW, error) {
not := "not"
label := "get_job_general"
if tp == jobTypeReorg {
@@ -116,7 +116,7 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool

// The job has already been picked up; just return to continue it.
if isJobProcessing {
return &job, nil
return model.NewJobW(&job, jobBinary), nil
}

b, err := filter(&job)
@@ -131,7 +131,7 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool
zap.String("job", job.String()))
return nil, errors.Trace(err)
}
return &job, nil
return model.NewJobW(&job, jobBinary), nil
}
}
return nil, nil
@@ -199,7 +199,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
return true, nil
}

func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) {
func (d *ddl) getGeneralJob(sess *sess.Session) (*model.JobW, error) {
return d.getJob(sess, jobTypeGeneral, func(job *model.Job) (bool, error) {
if !d.runningJobs.checkRunnable(job) {
return false, nil
@@ -219,7 +219,7 @@ func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) {
})
}

func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
func (d *ddl) getReorgJob(sess *sess.Session) (*model.JobW, error) {
return d.getJob(sess, jobTypeReorg, func(job *model.Job) (bool, error) {
if !d.runningJobs.checkRunnable(job) {
return false, nil
@@ -330,7 +330,7 @@ func (d *ddl) checkAndUpdateClusterState(needUpdate bool) error {
return nil
}

func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*sess.Session) (*model.Job, error)) {
func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*sess.Session) (*model.JobW, error)) {
wk, err := pool.get()
if err != nil || wk == nil {
logutil.BgLogger().Debug(fmt.Sprintf("[ddl] no %v worker available now", pool.tp()), zap.Error(err))
@@ -350,7 +350,7 @@ func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*
return
}
d.mu.RLock()
d.mu.hook.OnGetJobAfter(pool.tp().String(), job)
d.mu.hook.OnGetJobAfter(pool.tp().String(), job.Job)
d.mu.RUnlock()

d.delivery2worker(wk, pool, job)
@@ -360,7 +360,8 @@ func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*
var AfterDeliverToWorkerForTest func(*model.Job)

// delivery2worker owns the worker; it must be put back into the pool inside this function.
func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
func (d *ddl) delivery2worker(wk *worker, pool *workerPool, jobW *model.JobW) {
job := jobW.Job
injectFailPointForGetJob(job)
d.runningJobs.add(job)
d.wg.Run(func() {
@@ -407,7 +408,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
}
}

schemaVer, err := wk.HandleDDLJobTable(d.ddlCtx, job)
schemaVer, err := wk.HandleDDLJobTable(d.ddlCtx, jobW)
pool.put(wk)
if err != nil {
logutil.BgLogger().Info("handle ddl job failed", zap.String("category", "ddl"), zap.Error(err), zap.String("job", job.String()))
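
Threading `*model.JobW` from `getJob` through `delivery2worker` into `HandleDDLJobTable` keeps the original `job_meta` bytes alongside the decoded job. That matters because re-marshaling the decoded struct is not guaranteed to reproduce the stored bytes, so a remarshal-and-compare check could report spurious conflicts. A small illustration (toy `job` type, hypothetical field layout):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type job struct {
	ID    int64  `json:"id"`
	State string `json:"state"`
}

func main() {
	// Bytes as stored, with whitespace some earlier writer produced.
	stored := []byte(`{ "id": 1, "state": "queueing" }`)

	var j job
	if err := json.Unmarshal(stored, &j); err != nil {
		panic(err)
	}
	remarshaled, _ := json.Marshal(j)

	// Not byte-identical, so comparing a remarshaled job against the stored
	// row would flag a conflict even though nothing changed.
	fmt.Println(bytes.Equal(remarshaled, stored)) // false
}
```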
15 changes: 15 additions & 0 deletions pkg/parser/model/model.go
@@ -2231,3 +2231,18 @@ type TraceInfo struct {
// SessionAlias is the alias of session
SessionAlias string `json:"session_alias"`
}

// JobW is a wrapper of model.Job. It carries the job together with the raw
// binary representation it was decoded from.
type JobW struct {
*Job
Bytes []byte
}

// NewJobW creates a new JobW.
func NewJobW(job *Job, bytes []byte) *JobW {
return &JobW{
Job: job,
Bytes: bytes,
}
}
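
A sketch of the intended life cycle, pieced together from the call sites in this diff (`Decode` is model.Job's existing JSON decoder; error handling abbreviated):

```go
// At scheduling time (pkg/ddl/job_table.go): decode the row and keep the
// raw bytes as the snapshot.
job := &model.Job{}
if err := job.Decode(jobBinary); err != nil {
	return nil, errors.Trace(err)
}
jobW := model.NewJobW(job, jobBinary)

// At run time (pkg/ddl/ddl_worker.go): re-read the row in the worker's
// transaction and compare against the snapshot.
if !bytes.Equal(currBytes, jobW.Bytes) {
	return 0, errors.New("job meta changed by others")
}
```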