Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/disttask/framework/integrationtests/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestFrameworkCancelTask(t *testing.T) {
registerExampleTask(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
var counter atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterRunSubtask",
func(e taskexecutor.TaskExecutor, _ *error) {
func(e taskexecutor.TaskExecutor, _ *error, _ context.Context) {
if counter.Add(1) == 1 {
require.NoError(t, c.TaskMgr.CancelTask(c.Ctx, e.GetTaskBase().ID))
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestFrameworkRunSubtaskCancelOrFailed(t *testing.T) {
t.Run("meet cancel on run subtask", func(t *testing.T) {
var counter atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterRunSubtask",
func(e taskexecutor.TaskExecutor, errP *error) {
func(e taskexecutor.TaskExecutor, errP *error, _ context.Context) {
if counter.Add(1) == 1 {
e.CancelRunningSubtask()
*errP = taskexecutor.ErrCancelSubtask
Expand All @@ -259,7 +259,7 @@ func TestFrameworkRunSubtaskCancelOrFailed(t *testing.T) {
t.Run("meet some error on run subtask", func(t *testing.T) {
var counter atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterRunSubtask",
func(_ taskexecutor.TaskExecutor, errP *error) {
func(_ taskexecutor.TaskExecutor, errP *error, _ context.Context) {
if counter.Add(1) == 1 {
*errP = errors.New("MockExecutorRunErr")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/scheduler/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (b *balancer) balanceLoop(ctx context.Context, sm *Manager) {
func (b *balancer) balance(ctx context.Context, sm *Manager) {
// we will use currUsedSlots to calculate adjusted eligible nodes during balance,
// it's initial value depends on the managed nodes, to have a consistent view,
// DO NOT call getManagedNodes twice during 1 balance.
// DO NOT call getNodes twice during 1 balance.
managedNodes := b.nodeMgr.getNodes()
b.currUsedSlots = make(map[string]int, len(managedNodes))
for _, n := range managedNodes {
Expand Down
10 changes: 5 additions & 5 deletions pkg/disttask/framework/taskexecutor/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,14 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {

logger := e.logger.With(zap.Int64("subtaskID", subtask.ID), zap.String("step", proto.Step2Str(subtask.Type, subtask.Step)))
logTask := llog.BeginTask(logger, "run subtask")
subtaskCtx, subtaskCancel := context.WithCancel(e.stepCtx)
subtaskErr := func() error {
e.currSubtaskID.Store(subtask.ID)
subtaskCtx, subtaskCtxCancel := context.WithCancel(e.stepCtx)

var wg util.WaitGroupWrapper
checkCtx, checkCancel := context.WithCancel(subtaskCtx)
wg.RunWithLog(func() {
e.checkBalanceSubtask(checkCtx, subtaskCtxCancel)
e.checkBalanceSubtask(checkCtx, subtaskCancel)
})

if e.hasRealtimeSummary(e.stepExec) {
Expand All @@ -455,17 +455,17 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
defer func() {
checkCancel()
wg.Wait()
subtaskCtxCancel()
}()
return e.stepExec.RunSubtask(subtaskCtx, subtask)
}()
failpoint.InjectCall("afterRunSubtask", e, &subtaskErr)
defer subtaskCancel()
failpoint.InjectCall("afterRunSubtask", e, &subtaskErr, subtaskCtx)
logTask.End2(zap.InfoLevel, subtaskErr)

failpoint.InjectCall("mockTiDBShutdown", e, e.execID, e.GetTaskBase())

if subtaskErr != nil {
if err := e.markSubTaskCanceledOrFailed(e.stepCtx, subtask, subtaskErr); err != nil {
if err := e.markSubTaskCanceledOrFailed(subtaskCtx, subtask, subtaskErr); err != nil {
logger.Error("failed to handle subtask error", zap.Error(err))
}
return subtaskErr
Expand Down
1 change: 0 additions & 1 deletion pkg/disttask/framework/taskexecutor/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ func TestTaskExecutorRun(t *testing.T) {
<-ctx.Done()
return ctx.Err()
})
e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(true)
// keep running next subtask
nextSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{
ID: 2, Type: e.task1.Type, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}
Expand Down
8 changes: 7 additions & 1 deletion tests/realtikvtest/addindextest1/disttask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestAddIndexDistBasic(t *testing.T) {

var counter atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterRunSubtask",
func(e taskexecutor.TaskExecutor, errP *error) {
func(e taskexecutor.TaskExecutor, errP *error, _ context.Context) {
if counter.Add(1) == 1 {
*errP = context.Canceled
}
Expand Down Expand Up @@ -433,6 +433,12 @@ func TestAddIndexScheduleAway(t *testing.T) {
tk1.MustExec(updateExecID)
})
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterRunSubtask",
func(_ taskexecutor.TaskExecutor, _ *error, ctx context.Context) {
require.Error(t, ctx.Err())
require.Equal(t, context.Canceled, context.Cause(ctx))
},
)
tk.MustExec("alter table t add index idx(b);")
require.NotEqual(t, int64(0), jobID.Load())
}