Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion pkg/ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func cancelSuccess(rs *testkit.Result) bool {
func TestCancelVariousJobs(t *testing.T) {
var enterCnt, exitCnt atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDeliveryJob", func(job *model.Job) { enterCnt.Add(1) })
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) { exitCnt.Add(1) })
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.JobW) { exitCnt.Add(1) })
waitDDLWorkerExited := func() {
require.Eventually(t, func() bool {
return enterCnt.Load() == exitCnt.Load()
Expand Down Expand Up @@ -378,3 +378,26 @@ func TestCancelForAddUniqueIndex(t *testing.T) {
tbl = external.GetTableByName(t, tk, "test", "t")
require.Equal(t, 0, len(tbl.Meta().Indices))
}

func TestCancelJobBeforeRun(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tkCancel := testkit.NewTestKit(t, store)

// Prepare schema.
tk.MustExec("use test")
tk.MustExec(`create table t (c1 int, c2 int, c3 int)`)
tk.MustExec("insert into t values(1, 1, 1)")
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))

counter := 0
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeTransitOneJobStep", func(jobW *model.JobW) {
if counter == 0 && jobW.TableName == "t" {
tkCancel.MustExec(fmt.Sprintf("admin cancel ddl jobs %d", jobW.ID))
counter++
}
})

tk.MustGetErrCode("truncate table t", errno.ErrCancelledDDLJob)
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))
}
2 changes: 1 addition & 1 deletion pkg/ddl/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestCreateViewConcurrently(t *testing.T) {
return
}
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.JobW) {
if job.Type == model.ActionCreateView {
counter--
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (s *jobScheduler) loadAndDeliverJobs(se *sess.Session) error {
continue
}

s.deliveryJob(wk, targetPool, &job)
s.deliveryJob(wk, targetPool, model.NewJobW(&job, jobBinary))
if s.generalDDLWorkerPool.available() == 0 && s.reorgWorkerPool.available() == 0 {
break
}
Expand Down Expand Up @@ -466,13 +466,13 @@ func (s *jobScheduler) mustReloadSchemas() {
// deliveryJob deliver the job to the worker to run it asynchronously.
// the worker will run the job until it's finished, paused or another owner takes
// over and finished it.
func (s *jobScheduler) deliveryJob(wk *worker, pool *workerPool, job *model.Job) {
failpoint.InjectCall("beforeDeliveryJob", job)
injectFailPointForGetJob(job)
jobID, involvedSchemaInfos := job.ID, job.GetInvolvingSchemaInfo()
func (s *jobScheduler) deliveryJob(wk *worker, pool *workerPool, jobW *model.JobW) {
failpoint.InjectCall("beforeDeliveryJob", jobW.Job)
injectFailPointForGetJob(jobW.Job)
jobID, involvedSchemaInfos := jobW.ID, jobW.GetInvolvingSchemaInfo()
s.runningJobs.addRunning(jobID, involvedSchemaInfos)
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc()
jobCtx := s.getJobRunCtx(job.ID, job.TraceInfo)
jobCtx := s.getJobRunCtx(jobW.ID, jobW.TraceInfo)

s.wg.Run(func() {
start := time.Now()
Expand All @@ -484,30 +484,30 @@ func (s *jobScheduler) deliveryJob(wk *worker, pool *workerPool, job *model.Job)
if r != nil {
logutil.DDLLogger().Error("panic in deliveryJob", zap.Any("recover", r), zap.Stack("stack"))
}
failpoint.InjectCall("afterDeliveryJob", job)
failpoint.InjectCall("afterDeliveryJob", jobW)
// Because there is a gap between `allIDs()` and `checkRunnable()`,
// we append unfinished job to pending atomically to prevent `getJob()`
// chosing another runnable job that involves the same schema object.
moveRunningJobsToPending := r != nil || (job != nil && !job.IsFinished())
// choosing another runnable job that involves the same schema object.
moveRunningJobsToPending := r != nil || (jobW != nil && !jobW.IsFinished())
s.runningJobs.finishOrPendJob(jobID, involvedSchemaInfos, moveRunningJobsToPending)
asyncNotify(s.ddlJobNotifyCh)
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
pool.put(wk)
}()
for {
err := s.transitOneJobStepAndWaitSync(wk, jobCtx, job)
err := s.transitOneJobStepAndWaitSync(wk, jobCtx, jobW)
if err != nil {
logutil.DDLLogger().Info("run job failed", zap.Error(err), zap.Stringer("job", job))
} else if job.InFinalState() {
logutil.DDLLogger().Info("run job failed", zap.Error(err), zap.Stringer("job", jobW))
} else if jobW.InFinalState() {
return
}
// we have to refresh the job, to handle cases like job cancel or pause
// or the job is finished by another owner.
// TODO for JobStateRollbackDone we have to query 1 additional time when the
// job is already moved to history.
failpoint.InjectCall("beforeRefreshJob", job)
failpoint.InjectCall("beforeRefreshJob", jobW.Job)
for {
job, err = s.sysTblMgr.GetJobByID(s.schCtx, jobID)
jobW, err = s.sysTblMgr.GetJobByID(s.schCtx, jobID)
failpoint.InjectCall("mockGetJobByIDFail", &err)
if err == nil {
break
Expand Down Expand Up @@ -554,14 +554,15 @@ func (s *jobScheduler) getJobRunCtx(jobID int64, traceInfo *model.TraceInfo) *jo

// transitOneJobStepAndWaitSync runs one step of the DDL job, persist it and
// waits for other TiDB node to synchronize.
func (s *jobScheduler) transitOneJobStepAndWaitSync(wk *worker, jobCtx *jobContext, job *model.Job) error {
func (s *jobScheduler) transitOneJobStepAndWaitSync(wk *worker, jobCtx *jobContext, jobW *model.JobW) error {
failpoint.InjectCall("beforeTransitOneJobStepAndWaitSync")
ownerID := s.ownerManager.ID()
// suppose we failed to sync version last time, we need to check and sync it
// before run to maintain the 2-version invariant.
// if owner not change, we need try to sync when it's un-synced.
// if owner changed, we need to try sync it if the job is not started by
// current owner.
job := jobW.Job
if jobCtx.isUnSynced(job.ID) || (job.Started() && !jobCtx.maybeAlreadyRunOnce(job.ID)) {
if variable.EnableMDL.Load() {
version, err := s.sysTblMgr.GetMDLVer(s.schCtx, job.ID)
Expand All @@ -585,7 +586,7 @@ func (s *jobScheduler) transitOneJobStepAndWaitSync(wk *worker, jobCtx *jobConte
jobCtx.setAlreadyRunOnce(job.ID)
}

schemaVer, err := wk.transitOneJobStep(jobCtx, job, s.sysTblMgr)
schemaVer, err := wk.transitOneJobStep(jobCtx, jobW, s.sysTblMgr)
if err != nil {
jobCtx.logger.Info("handle ddl job failed", zap.Error(err), zap.Stringer("job", job))
return err
Expand Down
24 changes: 19 additions & 5 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ddl

import (
"bytes"
"context"
"fmt"
"math/rand"
Expand Down Expand Up @@ -529,19 +530,32 @@ func (w *worker) prepareTxn(job *model.Job) (kv.Transaction, error) {
// non-zero, caller should wait for other nodes to catch up.
func (w *worker) transitOneJobStep(
jobCtx *jobContext,
job *model.Job,
jobW *model.JobW,
sysTblMgr systable.Manager,
) (int64, error) {
var (
err error
)

failpoint.InjectCall("beforeTransitOneJobStep", jobW)
job := jobW.Job
txn, err := w.prepareTxn(job)
if err != nil {
return 0, err
}
jobCtx.metaMut = meta.NewMutator(txn)

// we are using optimistic txn in nearly all DDL related transactions, if
// time range of another concurrent job updates, such as 'cancel/pause' job
// or on owner change, overlap with us, we will report 'write conflict', but
// if they don't overlap, we query and check inside our txn to detect the conflict.
currBytes, err := sysTblMgr.GetJobBytesByIDWithSe(jobCtx.ctx, w.sess, job.ID)
if err != nil {
// TODO maybe we can unify where to rollback, they are scatting around.
w.sess.Rollback()
return 0, err
}
if !bytes.Equal(currBytes, jobW.Bytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what's the duration of comparing two 6M bytes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worst case, about 210us

goos: darwin
goarch: arm64
pkg: try-out/test/misc
cpu: Apple M1
BenchmarkCompareBytes-8   	    6459	    211581 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	try-out/test/misc	1.408s
func BenchmarkCompareBytes(b *testing.B) {
	size := 6 << 20
	l := make([]byte, 0, size)
	for i := 0; i < size; i++ {
		l = append(l, 'a')
	}
	r := make([]byte, 0, size)
	for i := 0; i < size; i++ {
		r = append(r, 'a')
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = bytes.Equal(l, r)
	}
}

w.sess.Rollback()
return 0, errors.New("job meta changed by others")
}

if job.IsDone() || job.IsRollbackDone() || job.IsCancelled() {
if job.IsDone() {
job.State = model.JobStateSynced
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/ddl/mock",
visibility = ["//visibility:public"],
deps = [
"//pkg/ddl/session",
"//pkg/meta/model",
"@org_uber_go_mock//gomock",
],
Expand Down
20 changes: 18 additions & 2 deletions pkg/ddl/mock/systable_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 21 additions & 10 deletions pkg/ddl/systable/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ var (
// to access the job/MDL related tables.
type Manager interface {
// GetJobByID gets the job by ID, returns ErrNotFound if the job does not exist.
GetJobByID(ctx context.Context, jobID int64) (*model.Job, error)
GetJobByID(ctx context.Context, jobID int64) (*model.JobW, error)
// GetJobBytesByIDWithSe gets the job binary by ID with the given session.
GetJobBytesByIDWithSe(ctx context.Context, se *session.Session, jobID int64) ([]byte, error)
// GetMDLVer gets the MDL version by job ID, returns ErrNotFound if the MDL info does not exist.
GetMDLVer(ctx context.Context, jobID int64) (int64, error)
// GetMinJobID gets current minimum job ID in the job table for job_id >= prevMinJobID,
Expand Down Expand Up @@ -71,27 +73,36 @@ func (mgr *manager) withNewSession(fn func(se *session.Session) error) error {
return fn(ddlse)
}

func (mgr *manager) GetJobByID(ctx context.Context, jobID int64) (*model.Job, error) {
func (mgr *manager) GetJobByID(ctx context.Context, jobID int64) (*model.JobW, error) {
job := model.Job{}
var jobBytes []byte
if err := mgr.withNewSession(func(se *session.Session) error {
sql := fmt.Sprintf(`select job_meta from mysql.tidb_ddl_job where job_id = %d`, jobID)
rows, err := se.Execute(ctx, sql, "get-job-by-id")
bytes, err := mgr.GetJobBytesByIDWithSe(ctx, se, jobID)
if err != nil {
return errors.Trace(err)
}
if len(rows) == 0 {
return ErrNotFound
}
jobBinary := rows[0].GetBytes(0)
err = job.Decode(jobBinary)
err = job.Decode(bytes)
if err != nil {
return errors.Trace(err)
}
jobBytes = bytes
return nil
}); err != nil {
return nil, err
}
return &job, nil
return model.NewJobW(&job, jobBytes), nil
}

func (*manager) GetJobBytesByIDWithSe(ctx context.Context, se *session.Session, jobID int64) ([]byte, error) {
sql := fmt.Sprintf(`select job_meta from mysql.tidb_ddl_job where job_id = %d`, jobID)
rows, err := se.Execute(ctx, sql, "get-job-by-id")
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, ErrNotFound
}
return rows[0].GetBytes(0), nil
}

func (mgr *manager) GetMDLVer(ctx context.Context, jobID int64) (int64, error) {
Expand Down
15 changes: 15 additions & 0 deletions pkg/meta/model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,21 @@ type TraceInfo struct {
SessionAlias string `json:"session_alias"`
}

// JobW is a wrapper of model.Job, it contains the job and the binary representation
// of the job.
type JobW struct {
*Job
Bytes []byte
}

// NewJobW creates a new JobW.
func NewJobW(job *Job, bytes []byte) *JobW {
return &JobW{
Job: job,
Bytes: bytes,
}
}

func init() {
SetJobVerInUse(JobVersion1)
}
2 changes: 1 addition & 1 deletion pkg/session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
}
})
var once sync.Once
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.JobW) {
if job != nil && job.ID == jobID {
once.Do(func() { ch <- struct{}{} })
}
Expand Down