Skip to content

Commit 246a241

Browse files
authored
dxf: add manual recovery mode (#59641)
ref #59640
1 parent a116b4e commit 246a241

22 files changed

+250
-97
lines changed

pkg/ddl/backfilling_dist_scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
158158
ext.(*ddl.LitBackfillScheduler).GlobalSort = true
159159
sch.Extension = ext
160160

161-
taskID, err := mgr.CreateTask(ctx, task.Key, proto.Backfill, 1, "", 0, task.Meta)
161+
taskID, err := mgr.CreateTask(ctx, task.Key, proto.Backfill, 1, "", 0, proto.ExtraParams{}, task.Meta)
162162
require.NoError(t, err)
163163
task.ID = taskID
164164
execIDs := []string{":4000"}

pkg/disttask/framework/doc.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,14 @@
125125
// └─────────►│cancelling├────┘
126126
// └──────────┘
127127
//
128+
// Note: if ManualRecovery is enabled, when some subtask fails, the task will
129+
// move to `awaiting-resolution` state, and manual operation is needed for the
130+
// task to continue. This mechanism is used for debugging, as some bugs such as those
131+
// on global-sort are harder to investigate without the intermediate files, or to
132+
// manually recover from some error when importing a large amount of data using
133+
// global-sort where one round of import takes a lot of time; it might be more
134+
// flexible and efficient than retrying the whole task.
135+
//
128136
// pause/resume state transition:
129137
// we don't know the state of the task before `paused`, so the state after
130138
// `resuming` is always `running`.

pkg/disttask/framework/handle/handle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, co
6969
return nil, storage.ErrTaskAlreadyExists
7070
}
7171

72-
taskID, err := taskManager.CreateTask(ctx, taskKey, taskType, concurrency, targetScope, maxNodeCnt, taskMeta)
72+
taskID, err := taskManager.CreateTask(ctx, taskKey, taskType, concurrency, targetScope, maxNodeCnt, proto.ExtraParams{}, taskMeta)
7373
if err != nil {
7474
return nil, err
7575
}

pkg/disttask/framework/integrationtests/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ go_test(
1717
],
1818
flaky = True,
1919
race = "off",
20-
shard_count = 23,
20+
shard_count = 22,
2121
deps = [
2222
"//pkg/config",
2323
"//pkg/ddl",

pkg/disttask/framework/integrationtests/framework_err_handling_test.go

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,84 @@
1515
package integrationtests
1616

1717
import (
18+
"context"
19+
"fmt"
20+
"sync/atomic"
1821
"testing"
22+
"time"
1923

24+
"github.com/pingcap/errors"
2025
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
26+
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
27+
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
2128
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
29+
"github.com/pingcap/tidb/pkg/testkit"
2230
"github.com/stretchr/testify/require"
2331
)
2432

25-
func TestRetryErrOnNextSubtasksBatch(t *testing.T) {
33+
func TestOnTaskError(t *testing.T) {
2634
c := testutil.NewTestDXFContext(t, 2, 16, true)
27-
registerExampleTask(t, c.MockCtrl, testutil.GetPlanErrSchedulerExt(c.MockCtrl, c.TestContext), c.TestContext, nil)
28-
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
29-
}
3035

31-
func TestPlanNotRetryableOnNextSubtasksBatchErr(t *testing.T) {
32-
c := testutil.NewTestDXFContext(t, 2, 16, true)
36+
t.Run("retryable error on OnNextSubtasksBatch", func(t *testing.T) {
37+
registerExampleTask(t, c.MockCtrl, testutil.GetPlanErrSchedulerExt(c.MockCtrl, c.TestContext), c.TestContext, nil)
38+
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
39+
})
40+
41+
t.Run("non retryable error on OnNextSubtasksBatch", func(t *testing.T) {
42+
registerExampleTask(t, c.MockCtrl, testutil.GetPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil)
43+
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key2-1", "", 1)
44+
require.Equal(t, proto.TaskStateReverted, task.State)
45+
registerExampleTask(t, c.MockCtrl, testutil.GetStepTwoPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil)
46+
task = testutil.SubmitAndWaitTask(c.Ctx, t, "key2-2", "", 1)
47+
require.Equal(t, proto.TaskStateReverted, task.State)
48+
})
49+
50+
prepareForAwaitingResolutionTestFn := func(t *testing.T, taskKey string) int64 {
51+
subtaskErrRetryable := atomic.Bool{}
52+
executorExt := testutil.GetTaskExecutorExt(c.MockCtrl,
53+
func(task *proto.Task) (execute.StepExecutor, error) {
54+
return testutil.GetCommonStepExecutor(c.MockCtrl, task.Step, func(ctx context.Context, subtask *proto.Subtask) error {
55+
if !subtaskErrRetryable.Load() {
56+
return errors.New("non retryable subtask error")
57+
}
58+
return nil
59+
}), nil
60+
},
61+
func(error) bool {
62+
return subtaskErrRetryable.Load()
63+
},
64+
)
65+
testutil.RegisterExampleTask(t, testutil.GetPlanErrSchedulerExt(c.MockCtrl, c.TestContext),
66+
executorExt, testutil.GetCommonCleanUpRoutine(c.MockCtrl))
67+
tm, err := storage.GetTaskManager()
68+
require.NoError(t, err)
69+
taskID, err := tm.CreateTask(c.Ctx, taskKey, proto.TaskTypeExample, 1, "",
70+
2, proto.ExtraParams{ManualRecovery: true}, nil)
71+
require.NoError(t, err)
72+
require.Eventually(t, func() bool {
73+
task, err := tm.GetTaskByID(c.Ctx, taskID)
74+
require.NoError(t, err)
75+
return task.State == proto.TaskStateAwaitingResolution
76+
}, 10*time.Second, 100*time.Millisecond)
77+
subtaskErrRetryable.Store(true)
78+
return taskID
79+
}
80+
81+
t.Run("task enter awaiting-resolution state if ManualRecovery set, success after manual recover", func(t *testing.T) {
82+
taskKey := "key3-1"
83+
taskID := prepareForAwaitingResolutionTestFn(t, taskKey)
84+
tk := testkit.NewTestKit(t, c.Store)
85+
tk.MustExec(fmt.Sprintf("update mysql.tidb_background_subtask set state='pending' where state='failed' and task_key= %d", taskID))
86+
tk.MustExec(fmt.Sprintf("update mysql.tidb_global_task set state='running' where id = %d", taskID))
87+
task := testutil.WaitTaskDone(c.Ctx, t, taskKey)
88+
require.Equal(t, proto.TaskStateSucceed, task.State)
89+
})
3390

34-
registerExampleTask(t, c.MockCtrl, testutil.GetPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil)
35-
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1)
36-
require.Equal(t, proto.TaskStateReverted, task.State)
37-
registerExampleTask(t, c.MockCtrl, testutil.GetStepTwoPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil)
38-
task = testutil.SubmitAndWaitTask(c.Ctx, t, "key2", "", 1)
39-
require.Equal(t, proto.TaskStateReverted, task.State)
91+
t.Run("task enter awaiting-resolution state if ManualRecovery set, cancel also works", func(t *testing.T) {
92+
taskKey := "key4-1"
93+
taskID := prepareForAwaitingResolutionTestFn(t, taskKey)
94+
require.NoError(t, c.TaskMgr.CancelTask(c.Ctx, taskID))
95+
task := testutil.WaitTaskDone(c.Ctx, t, taskKey)
96+
require.Equal(t, proto.TaskStateReverted, task.State)
97+
})
4098
}

pkg/disttask/framework/mock/scheduler_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/disttask/framework/planner/planner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package planner
1616

1717
import (
1818
"github.com/pingcap/tidb/pkg/config"
19+
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
1920
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
2021
)
2122

@@ -47,6 +48,7 @@ func (*Planner) Run(planCtx PlanCtx, plan LogicalPlan) (int64, error) {
4748
planCtx.ThreadCnt,
4849
config.GetGlobalConfig().Instance.TiDBServiceScope,
4950
planCtx.MaxNodeCnt,
51+
proto.ExtraParams{},
5052
taskMeta,
5153
)
5254
}

pkg/disttask/framework/proto/task.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,18 @@ import (
2222

2323
// see doc.go for more details.
2424
const (
25-
TaskStatePending TaskState = "pending"
26-
TaskStateRunning TaskState = "running"
27-
TaskStateSucceed TaskState = "succeed"
28-
TaskStateFailed TaskState = "failed"
29-
TaskStateReverting TaskState = "reverting"
30-
TaskStateReverted TaskState = "reverted"
31-
TaskStateCancelling TaskState = "cancelling"
32-
TaskStatePausing TaskState = "pausing"
33-
TaskStatePaused TaskState = "paused"
34-
TaskStateResuming TaskState = "resuming"
35-
TaskStateModifying TaskState = "modifying"
25+
TaskStatePending TaskState = "pending"
26+
TaskStateRunning TaskState = "running"
27+
TaskStateSucceed TaskState = "succeed"
28+
TaskStateFailed TaskState = "failed"
29+
TaskStateReverting TaskState = "reverting"
30+
TaskStateAwaitingResolution TaskState = "awaiting-resolution"
31+
TaskStateReverted TaskState = "reverted"
32+
TaskStateCancelling TaskState = "cancelling"
33+
TaskStatePausing TaskState = "pausing"
34+
TaskStatePaused TaskState = "paused"
35+
TaskStateResuming TaskState = "resuming"
36+
TaskStateModifying TaskState = "modifying"
3637
)
3738

3839
type (
@@ -66,6 +67,15 @@ const (
6667
// TODO: remove this limit later.
6768
var MaxConcurrentTask = 16
6869

70+
// ExtraParams is the extra params of task.
71+
// Note: only store params that are not used for filtering or sorting in this struct.
72+
type ExtraParams struct {
73+
// ManualRecovery indicates whether the task can be recovered manually.
74+
// if enabled, the task will enter 'awaiting-resolution' state when it fails,
75+
// then the user can recover the task manually or fail it if it's not recoverable.
76+
ManualRecovery bool `json:"manual_recovery"`
77+
}
78+
6979
// TaskBase contains the basic information of a task.
7080
// we define this to avoid load task meta which might be very large into memory.
7181
type TaskBase struct {
@@ -87,6 +97,7 @@ type TaskBase struct {
8797
TargetScope string
8898
CreateTime time.Time
8999
MaxNodeCount int
100+
ExtraParams
90101
}
91102

92103
// IsDone checks if the task is done.

pkg/disttask/framework/scheduler/interface.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type TaskManager interface {
4545
FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error
4646
// RevertTask updates task state to reverting, and task error.
4747
RevertTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error
48+
// AwaitingResolveTask updates task state to awaiting-resolution, and also sets the task error.
49+
AwaitingResolveTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error
4850
// RevertedTask updates task state to reverted.
4951
RevertedTask(ctx context.Context, taskID int64) error
5052
// PauseTask updates task state to pausing.

pkg/disttask/framework/scheduler/scheduler.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ func (s *BaseScheduler) onRunning() error {
388388
if len(subTaskErrs) > 0 {
389389
s.logger.Warn("subtasks encounter errors", zap.Errors("subtask-errs", subTaskErrs))
390390
// we only store the first error as task error.
391-
return s.revertTask(subTaskErrs[0])
391+
return s.revertTaskOrManualRecover(subTaskErrs[0])
392392
}
393393
} else if s.isStepSucceed(cntByStates) {
394394
return s.switch2NextStep()
@@ -586,6 +586,20 @@ func (s *BaseScheduler) revertTask(taskErr error) error {
586586
return nil
587587
}
588588

589+
func (s *BaseScheduler) revertTaskOrManualRecover(taskErr error) error {
590+
task := s.getTaskClone()
591+
if task.ManualRecovery {
592+
if err := s.taskMgr.AwaitingResolveTask(s.ctx, task.ID, task.State, taskErr); err != nil {
593+
return err
594+
}
595+
task.State = proto.TaskStateAwaitingResolution
596+
task.Error = taskErr
597+
s.task.Store(task)
598+
return nil
599+
}
600+
return s.revertTask(taskErr)
601+
}
602+
589603
// MockServerInfo exported for scheduler_test.go
590604
var MockServerInfo atomic.Pointer[[]string]
591605

0 commit comments

Comments
 (0)