Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6295f58
rename
okJiang Nov 30, 2023
eae2e5e
save work
okJiang Dec 1, 2023
45e4b72
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 1, 2023
26e6254
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 1, 2023
66c50ed
rename GetSubtasksByStepAndStates
okJiang Dec 1, 2023
bfc0084
implement priority quere for task and slotManager
okJiang Dec 4, 2023
6df88d1
add ut
okJiang Dec 4, 2023
908b65c
fix ut
okJiang Dec 4, 2023
59db269
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 4, 2023
ff93eab
fix bazel
okJiang Dec 4, 2023
b69f291
add comment, fix bazel
okJiang Dec 5, 2023
f54e5d7
fix comment
okJiang Dec 5, 2023
8809905
add ut
okJiang Dec 5, 2023
b57efc6
fix comment
okJiang Dec 5, 2023
133f34b
add comment
okJiang Dec 5, 2023
e6006d1
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 13, 2023
0a72a54
delete priority queue
okJiang Dec 13, 2023
4a0ef07
remove useless convert
okJiang Dec 13, 2023
3c1beb9
fix bazel
okJiang Dec 13, 2023
8cc2e33
fix ut
okJiang Dec 13, 2023
c640046
Refactor slot management in task executor
okJiang Dec 14, 2023
c442754
Update pkg/disttask/framework/taskexecutor/slot.go
okJiang Dec 18, 2023
3781105
fix comment
okJiang Dec 18, 2023
6392faf
fix comment
okJiang Dec 19, 2023
7d9f45e
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 19, 2023
2003d2a
fix comment: refactor ut
okJiang Dec 20, 2023
a34651f
fix comment
okJiang Dec 20, 2023
06422f4
fix comment
okJiang Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions pkg/disttask/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ var (
recoverMetaInterval = 90 * time.Second
retrySQLTimes = 30
retrySQLInterval = 500 * time.Millisecond

// for test
onRunnableTasksTick = make(chan struct{})
onRunnableTaskTick = make(chan struct{})
)

// ManagerBuilder is used to build a Manager.
Expand Down Expand Up @@ -223,15 +219,13 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))

if !m.slotManager.canAlloc(task) {
failpoint.Inject("taskTick", func() {
<-onRunnableTasksTick
})
logutil.Logger(m.logCtx).Warn("subtask has been rejected", zap.Int64("task-id", task.ID))
continue
}
m.addHandlingTask(task.ID)
m.slotManager.alloc(task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's moved out, we have to free on error

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t := task
err = m.executorPool.Run(func() {
m.slotManager.alloc(t)
defer m.slotManager.free(t.ID)
m.onRunnableTask(t)
m.removeHandlingTask(t.ID)
Expand All @@ -242,10 +236,6 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
m.logErr(err)
return
}

failpoint.Inject("taskTick", func() {
<-onRunnableTasksTick
})
}
}

Expand Down Expand Up @@ -422,10 +412,6 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
if err != nil {
logutil.Logger(m.logCtx).Error("failed to handle task", zap.Error(err))
}

failpoint.Inject("taskTick", func() {
<-onRunnableTaskTick
})
}
}

Expand Down
111 changes: 33 additions & 78 deletions pkg/disttask/framework/taskexecutor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
Expand Down Expand Up @@ -295,90 +294,46 @@ func TestSlotManagerInManager(t *testing.T) {
Type: "type",
}

ch := make(chan struct{})

wg, runFn := getPoolRunFn()

// task1 is prior to task2
{
// mock in manager
mockTaskTable.EXPECT().GetTasksInStates(m.ctx, proto.TaskStateRunning, proto.TaskStateReverting).
Return([]*proto.Task{task1, task2}, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
}
{
// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
// task1 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).Return(nil)
}
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)

failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/taskTick", "return()")
defer func() {
failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/taskTick")
}()
go m.fetchAndHandleRunnableTasksLoop()

{
// mock inside onRunnableTask
time.Sleep(2 * time.Second)
// task2 has been blocked by slot manager
require.Equal(t, 0, m.slotManager.available)

{
// mock in manager, task2 has been rejected by slot manager
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
onRunnableTasksTick <- struct{}{}
time.Sleep(time.Second)
}

// task1 succeed
task1.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockInternalExecutor.EXPECT().Close()
onRunnableTaskTick <- struct{}{}
time.Sleep(time.Second)
require.Equal(t, 10, m.slotManager.available)
}
// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
// task1 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
<-ch
return nil
})

// task2 is after task1
{
// mock in manager
mockTaskTable.EXPECT().GetTasksInStates(m.ctx, proto.TaskStateRunning, proto.TaskStateReverting).
Return([]*proto.Task{task2}, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
}
{
// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
// task2 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task2).Return(nil)
}
onRunnableTasksTick <- struct{}{}
m.onRunnableTasks([]*proto.Task{task1, task2})
time.Sleep(2 * time.Second)

require.Equal(t, 9, m.slotManager.available)
// task2 succeed
task2.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
// task1 alloc resource success
require.Equal(t, 0, m.slotManager.available)
require.Equal(t, map[int64]*slotInfo{
taskID1: {taskID: int(taskID1), slotCount: 10},
}, m.slotManager.executorSlotInfos)
ch <- struct{}{}

// task1 succeed
task1.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockInternalExecutor.EXPECT().Close()
onRunnableTaskTick <- struct{}{}
time.Sleep(time.Second)

require.Equal(t, 10, m.slotManager.available)
wg.Wait()
require.Equal(t, 10, m.slotManager.available)
require.Equal(t, 0, len(m.slotManager.executorSlotInfos))
}