Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
// we will never cancel the job once there is panic in bf.BackfillData.
// Because reorgRecordTask may run a long time,
// we should check whether this ddl job is still runnable.
err := d.isReorgRunnable(jobID, false)
err := d.isReorgRunnable(d.ctx, false)
if err != nil {
result.err = err
return result
Expand Down Expand Up @@ -677,8 +677,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
t table.PhysicalTable,
reorgInfo *reorgInfo,
) error {
// TODO(tangenta): support adjust worker count dynamically.
if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
if err := dc.isReorgRunnable(ctx, false); err != nil {
return errors.Trace(err)
}
job := reorgInfo.Job
Expand Down Expand Up @@ -921,7 +920,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
) (err error) {
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey

if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
if err := dc.isReorgRunnable(ctx, false); err != nil {
return errors.Trace(err)
}
defer func() {
Expand Down
5 changes: 0 additions & 5 deletions pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ type DDLForTest interface {
RemoveReorgCtx(id int64)
}

// IsReorgCanceled exports for testing.
func (rc *reorgCtx) IsReorgCanceled() bool {
return rc.isReorgCanceled()
}

// NewReorgCtx exports for testing.
func (d *ddl) NewReorgCtx(jobID int64, rowCount int64) *reorgCtx {
return d.newReorgCtx(jobID, rowCount)
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2362,7 +2362,7 @@ func (w *worker) addTableIndex(
// TODO: Support typeAddIndexMergeTmpWorker.
if reorgInfo.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
if reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
err := w.executeDistTask(t, reorgInfo)
err := w.executeDistTask(ctx, t, reorgInfo)
if err != nil {
return err
}
Expand Down Expand Up @@ -2443,7 +2443,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
return nil
}

func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgInfo *reorgInfo) error {
if reorgInfo.mergingTmpIdx {
return errors.New("do not support merge index")
}
Expand Down Expand Up @@ -2494,7 +2494,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
return err
}
err = handle.WaitTaskDoneOrPaused(ctx, task.ID)
if err := w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
if err := w.isReorgRunnable(stepCtx, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
logutil.DDLLogger().Warn("job paused by user", zap.Error(err))
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(reorgInfo.Job.ID)
Expand Down Expand Up @@ -2531,7 +2531,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
defer close(done)
err := submitAndWaitTask(ctx, taskKey, taskType, concurrency, reorgInfo.ReorgMeta.TargetScope, metaData)
failpoint.InjectCall("pauseAfterDistTaskFinished")
if err := w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
if err := w.isReorgRunnable(stepCtx, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
logutil.DDLLogger().Warn("job paused by user", zap.Error(err))
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(reorgInfo.Job.ID)
Expand All @@ -2552,7 +2552,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
w.updateDistTaskRowCount(taskKey, reorgInfo.Job.ID)
return nil
case <-checkFinishTk.C:
if err = w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
if err = w.isReorgRunnable(stepCtx, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
if err = handle.PauseTask(w.workCtx, taskKey); err != nil {
logutil.DDLLogger().Error("pause task error", zap.String("task_key", taskKey), zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ func (w *worker) runOneJobStep(
logutil.DDLLogger().Info("job is paused",
zap.Int64("job_id", job.ID),
zap.Stringer("state", latestJob.State))
cancelStep(dbterror.ErrPausedDDLJob)
cancelStep(dbterror.ErrPausedDDLJob.FastGenByArgs(job.ID))
return
case model.JobStateDone, model.JobStateSynced:
return
Expand Down
24 changes: 0 additions & 24 deletions pkg/ddl/job_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,27 +252,3 @@ func TestJobNeedGC(t *testing.T) {
}}}
require.True(t, ddl.JobNeedGC(job))
}

func TestUsingReorgCtx(t *testing.T) {
_, domain := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
d := domain.DDL()

wg := util.WaitGroupWrapper{}
wg.Run(func() {
jobID := int64(1)
for i := 0; i < 500; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
})
wg.Run(func() {
jobID := int64(1)
for i := 0; i < 500; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
})
wg.Wait()
}
35 changes: 3 additions & 32 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type reorgCtx struct {
doneCh chan reorgFnResult
// rowCount is used to simulate a job's row count.
rowCount int64
jobState model.JobState

mu struct {
sync.Mutex
Expand Down Expand Up @@ -275,20 +274,6 @@ func reorgTimeZoneWithTzLoc(tzLoc *model.TimeZoneLocation) (*time.Location, erro
return tzLoc.GetLocation()
}

func (rc *reorgCtx) notifyJobState(state model.JobState) {
atomic.StoreInt32((*int32)(&rc.jobState), int32(state))
}

func (rc *reorgCtx) isReorgCanceled() bool {
s := atomic.LoadInt32((*int32)(&rc.jobState))
return int32(model.JobStateCancelled) == s || int32(model.JobStateCancelling) == s
}

func (rc *reorgCtx) isReorgPaused() bool {
s := atomic.LoadInt32((*int32)(&rc.jobState))
return int32(model.JobStatePaused) == s || int32(model.JobStatePausing) == s
}

func (rc *reorgCtx) setRowCount(count int64) {
atomic.StoreInt64(&rc.rowCount, count)
}
Expand Down Expand Up @@ -566,28 +551,14 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
return rows[0].GetInt64(0)
}

func (dc *ddlCtx) isReorgCancelled(jobID int64) bool {
return dc.getReorgCtx(jobID).isReorgCanceled()
}
func (dc *ddlCtx) isReorgPaused(jobID int64) bool {
return dc.getReorgCtx(jobID).isReorgPaused()
}

func (dc *ddlCtx) isReorgRunnable(jobID int64, isDistReorg bool) error {
func (dc *ddlCtx) isReorgRunnable(ctx context.Context, isDistReorg bool) error {
if dc.ctx.Err() != nil {
// Worker is closed. So it can't do the reorganization.
return dbterror.ErrInvalidWorker.GenWithStack("worker is closed")
}

// TODO(lance6716): check ctx.Err?
if dc.isReorgCancelled(jobID) {
// Job is cancelled. So it can't be done.
return dbterror.ErrCancelledDDLJob
}

if dc.isReorgPaused(jobID) {
logutil.DDLLogger().Warn("job paused by user", zap.String("ID", dc.uuid))
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(jobID)
if ctx.Err() != nil {
return context.Cause(ctx)
}

// If isDistReorg is true, we needn't check if it is owner.
Expand Down
67 changes: 66 additions & 1 deletion tests/realtikvtest/addindextest1/disttask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package addindextest
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
Expand Down Expand Up @@ -109,7 +111,7 @@ func TestAddIndexDistBasic(t *testing.T) {
tk.MustExec(`set global tidb_enable_dist_task=0;`)
}

func TestAddIndexDistCancel(t *testing.T) {
func TestAddIndexDistCancelWithPartition(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
if store.Name() != "TiKV" {
t.Skip("TiKV store only")
Expand Down Expand Up @@ -150,6 +152,69 @@ func TestAddIndexDistCancel(t *testing.T) {
tk.MustExec(`set global tidb_enable_dist_task=0;`)
}

func TestAddIndexDistCancel(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use addindexlit;")
tk2.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk2.MustExec("create table t2 (a int, b int);")
tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")

wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
tk.MustExec("alter table t add index idx(a);")
wg.Done()
}()
go func() {
tk2.MustExec("alter table t2 add index idx_b(b);")
wg.Done()
}()
wg.Wait()
rows := tk.MustQuery("admin show ddl jobs 2;").Rows()
require.Len(t, rows, 2)
require.True(t, strings.Contains(rows[0][12].(string) /* comments */, "ingest"))
require.True(t, strings.Contains(rows[1][12].(string) /* comments */, "ingest"))
require.Equal(t, rows[0][7].(string) /* row_count */, "3")
require.Equal(t, rows[1][7].(string) /* row_count */, "3")

tk.MustExec("set @@global.tidb_enable_dist_task = 1;")

// test cancel is timely
enter := make(chan struct{})
testfailpoint.EnableCall(
t,
"github.com/pingcap/tidb/pkg/lightning/backend/local/beforeExecuteRegionJob",
func(ctx context.Context) {
close(enter)
select {
case <-time.After(time.Second * 30):
case <-ctx.Done():
}
})
wg.Add(1)
go func() {
defer wg.Done()
err := tk2.ExecToErr("alter table t add index idx_ba(b, a);")
require.ErrorContains(t, err, "Cancelled DDL job")
}()
<-enter
jobID := tk.MustQuery("admin show ddl jobs 1;").Rows()[0][0].(string)
now := time.Now()
tk.MustExec("admin cancel ddl jobs " + jobID)
wg.Wait()
// cancel should be timely
require.Less(t, time.Since(now).Seconds(), 20.0)
}

func TestAddIndexDistPauseAndResume(t *testing.T) {
t.Skip("unstable") // TODO(tangenta): fix this unstable test
store := realtikvtest.CreateMockStoreAndSetup(t)
Expand Down