Commit c199ddf

disttask: cancel subtask context if scheduled away (#58615)
close #58450
1 parent bf939fa commit c199ddf


4 files changed: +74, -14 lines


pkg/disttask/framework/scheduler/balancer.go

Lines changed: 4 additions & 0 deletions
@@ -19,6 +19,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
 	llog "github.com/pingcap/tidb/pkg/lightning/log"
 	"github.com/pingcap/tidb/pkg/util/intest"
@@ -116,6 +117,9 @@ func (b *balancer) doBalanceSubtasks(ctx context.Context, taskID int64, eligible
 	// managed nodes, subtasks of task might not be balanced.
 	adjustedNodes := filterNodesWithEnoughSlots(b.currUsedSlots, b.slotMgr.getCapacity(),
 		eligibleNodes, subtasks[0].Concurrency)
+	failpoint.Inject("mockNoEnoughSlots", func(_ failpoint.Value) {
+		adjustedNodes = []string{}
+	})
 	if len(adjustedNodes) == 0 {
 		// no node has enough slots to run the subtasks, skip balance and skip
 		// update used slots.
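
For reference, a standalone sketch of how such a failpoint flips a code path at run time, assuming direct use of the github.com/pingcap/failpoint runtime API (in the TiDB tree the failpoint.Inject marker above is rewritten by failpoint-ctl; the failpoint name and pickNodes helper below are illustrative only):

// Standalone sketch, not TiDB code: the failpoint runtime toggles the
// "no node has enough slots" branch on and off at run time.
package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

const fpName = "example/mockNoEnoughSlots" // illustrative name

// pickNodes mimics the shape of doBalanceSubtasks' node filtering.
func pickNodes(eligible []string) []string {
	adjusted := eligible // pretend every eligible node has enough slots
	if _, err := failpoint.Eval(fpName); err == nil {
		// failpoint enabled: behave as if no node has spare capacity
		adjusted = []string{}
	}
	return adjusted
}

func main() {
	fmt.Println(pickNodes([]string{"tidb1", "tidb2"})) // [tidb1 tidb2]
	if err := failpoint.Enable(fpName, "return"); err != nil {
		panic(err)
	}
	defer failpoint.Disable(fpName)
	fmt.Println(pickNodes([]string{"tidb1", "tidb2"})) // []
}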

pkg/disttask/framework/taskexecutor/task_executor.go

Lines changed: 17 additions & 6 deletions
@@ -122,7 +122,7 @@ func NewBaseTaskExecutor(ctx context.Context, task *proto.Task, param Param) *Ba
 // `pending` state, to make sure subtasks can be balanced later when node scale out.
 // - If current running subtask are scheduled away from this node, i.e. this node
 // is taken as down, cancel running.
-func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
+func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCancel context.CancelFunc) {
 	ticker := time.NewTicker(checkBalanceSubtaskInterval)
 	defer ticker.Stop()
 	for {
@@ -143,7 +143,10 @@
 			e.logger.Info("subtask is scheduled away, cancel running",
 				zap.Int64("subtaskID", e.currSubtaskID.Load()))
 			// cancels runStep, but leave the subtask state unchanged.
-			e.cancelRunStepWith(nil)
+			if subtaskCtxCancel != nil {
+				subtaskCtxCancel()
+			}
+			failpoint.InjectCall("afterCancelSubtaskExec")
 			return
 		}
 
@@ -317,6 +320,12 @@ func (e *BaseTaskExecutor) Run() {
 				continue
 			}
 		}
+		if err := e.stepCtx.Err(); err != nil {
+			e.logger.Error("step executor context is done, the task should have been reverted",
+				zap.String("step", proto.Step2Str(task.Type, task.Step)),
+				zap.Error(err))
+			continue
+		}
 		err = e.runSubtask(subtask)
 		if err != nil {
 			// task executor keeps running its subtasks even though some subtask
@@ -418,23 +427,25 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
 	logTask := llog.BeginTask(logger, "run subtask")
 	subtaskErr := func() error {
 		e.currSubtaskID.Store(subtask.ID)
+		subtaskCtx, subtaskCtxCancel := context.WithCancel(e.stepCtx)
 
 		var wg util.WaitGroupWrapper
-		checkCtx, checkCancel := context.WithCancel(e.stepCtx)
+		checkCtx, checkCancel := context.WithCancel(subtaskCtx)
 		wg.RunWithLog(func() {
-			e.checkBalanceSubtask(checkCtx)
+			e.checkBalanceSubtask(checkCtx, subtaskCtxCancel)
 		})
 
 		if e.hasRealtimeSummary(e.stepExec) {
 			wg.RunWithLog(func() {
-				e.updateSubtaskSummaryLoop(checkCtx, e.stepCtx, e.stepExec)
+				e.updateSubtaskSummaryLoop(checkCtx, subtaskCtx, e.stepExec)
			})
 		}
 		defer func() {
 			checkCancel()
 			wg.Wait()
+			subtaskCtxCancel()
 		}()
-		return e.stepExec.RunSubtask(e.stepCtx, subtask)
+		return e.stepExec.RunSubtask(subtaskCtx, subtask)
 	}()
 	failpoint.InjectCall("afterRunSubtask", e, &subtaskErr)
 	logTask.End2(zap.InfoLevel, subtaskErr)
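
This is the core of the fix: each subtask now runs under its own child context of the step context, and the balance checker cancels only that child, so the executor's step context stays alive for the next subtask. A minimal standard-library sketch of the pattern, with runSubtask and watchBalance as illustrative stand-ins rather than the TiDB APIs:

package main

import (
	"context"
	"fmt"
	"time"
)

// watchBalance cancels only the current subtask when it is scheduled away,
// leaving the parent (step) context untouched.
func watchBalance(ctx context.Context, scheduledAway <-chan struct{}, cancelSubtask context.CancelFunc) {
	select {
	case <-ctx.Done():
	case <-scheduledAway:
		cancelSubtask()
	}
}

func runSubtask(stepCtx context.Context, scheduledAway <-chan struct{}) error {
	// per-subtask context derived from the step context
	subtaskCtx, subtaskCancel := context.WithCancel(stepCtx)
	defer subtaskCancel()

	checkCtx, checkCancel := context.WithCancel(subtaskCtx)
	defer checkCancel()
	go watchBalance(checkCtx, scheduledAway, subtaskCancel)

	select { // stand-in for the real subtask work
	case <-subtaskCtx.Done():
		return subtaskCtx.Err()
	case <-time.After(time.Second):
		return nil
	}
}

func main() {
	stepCtx := context.Background()
	away := make(chan struct{})
	close(away) // pretend the subtask was scheduled to another node
	fmt.Println(runSubtask(stepCtx, away)) // context canceled
	fmt.Println(stepCtx.Err())             // <nil>: step context still usable
}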

pkg/disttask/framework/taskexecutor/task_executor_test.go

Lines changed: 8 additions & 8 deletions
@@ -315,7 +315,7 @@ func TestTaskExecutorRun(t *testing.T) {
 	// mock for checkBalanceSubtask, returns empty subtask list
 	e.taskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id",
 		e.task1.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil)
-	// this subtask is scheduled awsy during running
+	// this subtask is scheduled away during running
 	e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil)
 	e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne,
 		unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil)
@@ -326,6 +326,7 @@
 			<-ctx.Done()
 			return ctx.Err()
 		})
+	e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(true)
 	// keep running next subtask
 	nextSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{
 		ID: 2, Type: e.task1.Type, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}
@@ -889,18 +890,17 @@ func TestCheckBalanceSubtask(t *testing.T) {
 		// context canceled
 		canceledCtx, cancel := context.WithCancel(ctx)
 		cancel()
-		taskExecutor.checkBalanceSubtask(canceledCtx)
+		taskExecutor.checkBalanceSubtask(canceledCtx, nil)
 	})
 
 	t.Run("subtask scheduled away", func(t *testing.T) {
 		mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
 			task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, errors.New("error"))
 		mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
 			task.ID, task.Step, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil)
-		runCtx, cancelCause := context.WithCancelCause(ctx)
-		taskExecutor.mu.runtimeCancel = cancelCause
+		runCtx, cancel := context.WithCancel(ctx)
 		require.NoError(t, runCtx.Err())
-		taskExecutor.checkBalanceSubtask(ctx)
+		taskExecutor.checkBalanceSubtask(ctx, cancel)
 		require.ErrorIs(t, runCtx.Err(), context.Canceled)
 		require.True(t, ctrl.Satisfied())
 	})
@@ -913,7 +913,7 @@
 		mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false)
 		mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1",
 			subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil)
-		taskExecutor.checkBalanceSubtask(ctx)
+		taskExecutor.checkBalanceSubtask(ctx, nil)
 		require.True(t, ctrl.Satisfied())
 
 		// if we failed to change state of non-idempotent subtask, will retry
@@ -930,7 +930,7 @@
 		mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false)
 		mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1",
 			subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil)
-		taskExecutor.checkBalanceSubtask(ctx)
+		taskExecutor.checkBalanceSubtask(ctx, nil)
 		require.True(t, ctrl.Satisfied())
 	})
 
@@ -945,7 +945,7 @@
 		// used to break the loop
 		mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
 			task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, nil)
-		taskExecutor.checkBalanceSubtask(ctx)
+		taskExecutor.checkBalanceSubtask(ctx, nil)
 		require.True(t, ctrl.Satisfied())
 	})
 }
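
The updated unit tests hand checkBalanceSubtask either nil or a real context.CancelFunc and then assert on the paired context. A tiny standalone sketch of that assertion pattern (illustrative names, standard library only):

package example

import (
	"context"
	"errors"
	"testing"
)

// cancelWhenScheduledAway stands in for checkBalanceSubtask: it invokes the
// cancel func it was handed, if any.
func cancelWhenScheduledAway(cancel context.CancelFunc) {
	if cancel != nil {
		cancel()
	}
}

func TestCancelIsPropagated(t *testing.T) {
	runCtx, cancel := context.WithCancel(context.Background())
	cancelWhenScheduledAway(cancel)
	if !errors.Is(runCtx.Err(), context.Canceled) {
		t.Fatalf("expected canceled context, got %v", runCtx.Err())
	}
	// passing nil must be safe as well
	cancelWhenScheduledAway(nil)
}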

tests/realtikvtest/addindextest1/disttask_test.go

Lines changed: 45 additions & 0 deletions
@@ -391,3 +391,48 @@ func TestAddIndexDistLockAcquireFailed(t *testing.T) {
 	testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/owner/mockAcquireDistLockFailed", "1*return(true)")
 	tk.MustExec("alter table t add index idx(b);")
 }
+
+func TestAddIndexScheduleAway(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("set global tidb_enable_dist_task = on;")
+	t.Cleanup(func() {
+		tk.MustExec("set global tidb_enable_dist_task = off;")
+	})
+	tk.MustExec("create table t (a int, b int);")
+	tk.MustExec("insert into t values (1, 1);")
+
+	var jobID atomic.Int64
+	// Acquire the job ID.
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
+		if job.Type == model.ActionAddIndex {
+			jobID.Store(job.ID)
+		}
+	})
+	// Do not balance subtasks automatically.
+	testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockNoEnoughSlots", "return")
+	afterCancel := make(chan struct{})
+	// Capture the cancel operation from checkBalanceLoop.
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterCancelSubtaskExec", func() {
+		close(afterCancel)
+	})
+	var once sync.Once
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish", func() {
+		once.Do(func() {
+			tk1 := testkit.NewTestKit(t, store)
+			tk1.MustExec("use test")
+			updateExecID := fmt.Sprintf(`
+				update mysql.tidb_background_subtask set exec_id = 'other' where task_key in
+					(select id from mysql.tidb_global_task where task_key like '%%%d')`, jobID.Load())
+			tk1.MustExec(updateExecID)
+			<-afterCancel
+			updateExecID = fmt.Sprintf(`
+				update mysql.tidb_background_subtask set exec_id = ':4000' where task_key in
+					(select id from mysql.tidb_global_task where task_key like '%%%d')`, jobID.Load())
+			tk1.MustExec(updateExecID)
+		})
+	})
+	tk.MustExec("alter table t add index idx(b);")
+	require.NotEqual(t, int64(0), jobID.Load())
+}
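
The integration test sequences the schedule-away, wait-for-cancel, schedule-back dance through a channel closed by the afterCancelSubtaskExec failpoint plus a sync.Once. A stripped-down, standard-library-only sketch of that coordination, with all names illustrative:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	afterCancel := make(chan struct{})
	var once sync.Once

	// Stand-in for the afterCancelSubtaskExec failpoint: the executor signals
	// that it has canceled the running subtask.
	go func() {
		time.Sleep(50 * time.Millisecond) // pretend cancellation takes a moment
		close(afterCancel)
	}()

	// Stand-in for the mockDMLExecutionAddIndexSubTaskFinish hook: move the
	// subtask away, wait for the cancel signal, then move it back. sync.Once
	// makes sure the dance happens only for the first finished subtask.
	once.Do(func() {
		fmt.Println("update exec_id to 'other'") // schedule the subtask away
		<-afterCancel                            // wait until the executor reacted
		fmt.Println("update exec_id back to ':4000'")
	})
}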
