Skip to content

Commit 44f100f

Browse files
authored
disttask: fix flaky test TestParallelCancel (#55668)
close #55658
1 parent ac9916a commit 44f100f

File tree

1 file changed

+27
-22
lines changed

1 file changed

+27
-22
lines changed

pkg/disttask/framework/scheduler/scheduler_test.go

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,6 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
184184
}()
185185

186186
// 3s
187-
cnt := 60
188187
checkGetRunningTaskCnt := func(expected int) {
189188
require.Eventually(t, func() bool {
190189
return sch.GetRunningTaskCnt() == expected
@@ -236,21 +235,16 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
236235

237236
// test DetectTaskLoop
238237
checkGetTaskState := func(expectedState proto.TaskState) {
239-
i := 0
240-
for ; i < cnt; i++ {
238+
require.Eventually(t, func() bool {
241239
tasks, err := mgr.GetTasksInStates(ctx, expectedState)
242240
require.NoError(t, err)
243241
if len(tasks) == taskCnt {
244-
break
242+
return true
245243
}
246244
historyTasks, err := testutil.GetTasksFromHistoryInStates(ctx, mgr, expectedState)
247245
require.NoError(t, err)
248-
if len(tasks)+len(historyTasks) == taskCnt {
249-
break
250-
}
251-
time.Sleep(time.Millisecond * 50)
252-
}
253-
require.Less(t, i, cnt)
246+
return len(tasks)+len(historyTasks) == taskCnt
247+
}, 10*time.Second, 100*time.Millisecond)
254248
}
255249
// Test all subtasks are successful.
256250
var err error
@@ -267,9 +261,16 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
267261
return
268262
}
269263

264+
subtasksMap := make(map[int64][]*proto.SubtaskBase, len(taskIDs))
265+
for _, taskID := range taskIDs {
266+
subtasks, err := mgr.GetActiveSubtasks(ctx, taskID)
267+
require.NoError(t, err)
268+
subtasksMap[taskID] = subtasks
269+
}
270+
270271
if isCancel {
271-
for i := 1; i <= taskCnt; i++ {
272-
err = mgr.CancelTask(ctx, int64(i))
272+
for _, taskID := range taskIDs {
273+
err = mgr.CancelTask(ctx, taskID)
273274
require.NoError(t, err)
274275
}
275276
} else if isPauseAndResume {
@@ -278,9 +279,11 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
278279
require.True(t, found)
279280
require.NoError(t, err)
280281
}
281-
for i := 1; i <= subtaskCnt*taskCnt; i++ {
282-
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStatePaused, nil)
283-
require.NoError(t, err)
282+
for _, sts := range subtasksMap {
283+
for _, st := range sts {
284+
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", st.ID, proto.SubtaskStatePaused, nil)
285+
require.NoError(t, err)
286+
}
284287
}
285288
checkGetTaskState(proto.TaskStatePaused)
286289
for i := 0; i < taskCnt; i++ {
@@ -290,23 +293,25 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
290293
}
291294

292295
// Mock subtasks succeed.
293-
for i := 1; i <= subtaskCnt*taskCnt; i++ {
294-
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStateSucceed, nil)
295-
require.NoError(t, err)
296+
for _, sts := range subtasksMap {
297+
for _, st := range sts {
298+
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", st.ID, proto.SubtaskStateSucceed, nil)
299+
require.NoError(t, err)
300+
}
296301
}
297302
checkGetTaskState(proto.TaskStateSucceed)
298303
return
299304
} else {
300305
if isSubtaskCancel {
301306
// Mock a subtask canceled
302-
for i := 1; i <= subtaskCnt*taskCnt; i += subtaskCnt {
303-
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStateCanceled, nil)
307+
for _, sts := range subtasksMap {
308+
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", sts[0].ID, proto.SubtaskStateCanceled, nil)
304309
require.NoError(t, err)
305310
}
306311
} else {
307312
// Mock a subtask fails.
308-
for i := 1; i <= subtaskCnt*taskCnt; i += subtaskCnt {
309-
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStateFailed, nil)
313+
for _, sts := range subtasksMap {
314+
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", sts[0].ID, proto.SubtaskStateFailed, nil)
310315
require.NoError(t, err)
311316
}
312317
}

0 commit comments

Comments
 (0)