Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6295f58
rename
okJiang Nov 30, 2023
eae2e5e
save work
okJiang Dec 1, 2023
45e4b72
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 1, 2023
26e6254
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 1, 2023
66c50ed
rename GetSubtasksByStepAndStates
okJiang Dec 1, 2023
bfc0084
implement priority quere for task and slotManager
okJiang Dec 4, 2023
6df88d1
add ut
okJiang Dec 4, 2023
908b65c
fix ut
okJiang Dec 4, 2023
59db269
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 4, 2023
ff93eab
fix bazel
okJiang Dec 4, 2023
b69f291
add comment, fix bazel
okJiang Dec 5, 2023
f54e5d7
fix comment
okJiang Dec 5, 2023
8809905
add ut
okJiang Dec 5, 2023
b57efc6
fix comment
okJiang Dec 5, 2023
133f34b
add comment
okJiang Dec 5, 2023
e6006d1
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 13, 2023
0a72a54
delete priority queue
okJiang Dec 13, 2023
4a0ef07
remove useless convert
okJiang Dec 13, 2023
3c1beb9
fix bazel
okJiang Dec 13, 2023
8cc2e33
fix ut
okJiang Dec 13, 2023
c640046
Refactor slot management in task executor
okJiang Dec 14, 2023
c442754
Update pkg/disttask/framework/taskexecutor/slot.go
okJiang Dec 18, 2023
3781105
fix comment
okJiang Dec 18, 2023
6392faf
fix comment
okJiang Dec 19, 2023
7d9f45e
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 19, 2023
2003d2a
fix comment: refactor ut
okJiang Dec 20, 2023
a34651f
fix comment
okJiang Dec 20, 2023
06422f4
fix comment
okJiang Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ tools/bin/vfsgendev:
tools/bin/gotestsum:
GOBIN=$(shell pwd)/tools/bin $(GO) install gotest.tools/[email protected]

tools/bin/mockgen:
# [email protected] is imcompatible with v0.3.0, so install it always.
mockgen:
GOBIN=$(shell pwd)/tools/bin $(GO) install go.uber.org/mock/[email protected]

# Usage:
Expand Down Expand Up @@ -375,17 +376,17 @@ br_compatibility_test_prepare:
br_compatibility_test:
@cd br && tests/run_compatible.sh run

mock_s3iface: tools/bin/mockgen
mock_s3iface: mockgen
tools/bin/mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

# mock interface for lightning and IMPORT INTO
mock_lightning: tools/bin/mockgen
mock_lightning: mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
tools/bin/mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go

gen_mock: tools/bin/mockgen
gen_mock: mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,Pool,TaskExecutor,Extension > pkg/disttask/framework/mock/task_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Scheduler,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension > pkg/disttask/framework/scheduler/mock/scheduler_mock.go
Expand Down
12 changes: 6 additions & 6 deletions pkg/disttask/framework/mock/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,16 @@ func (stm *TaskManager) GetTopUnfinishedTasks(ctx context.Context) (task []*prot
return task, nil
}

// GetTasksInStates gets the tasks in the states.
// GetTasksInStates gets the tasks in the states(order by priority asc, create_time acs, id asc).
func (stm *TaskManager) GetTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error) {
if len(states) == 0 {
return task, nil
}

rs, err := stm.executeSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where state in ("+strings.Repeat("%?,", len(states)-1)+"%?)", states...)
rs, err := stm.executeSQLWithNewSession(ctx,
"select "+taskColumns+" from mysql.tidb_global_task "+
"where state in ("+strings.Repeat("%?,", len(states)-1)+"%?)"+
" order by priority asc, create_time asc, id asc", states...)
if err != nil {
return task, err
}
Expand Down Expand Up @@ -486,8 +489,8 @@ func row2SubTask(r chunk.Row) *proto.Subtask {
return subtask
}

// GetSubtasksInStates gets all subtasks by given states.
func (stm *TaskManager) GetSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error) {
// GetSubtasksByStepAndStates gets all subtasks by given states.
func (stm *TaskManager) GetSubtasksByStepAndStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error) {
args := []interface{}{tidbID, taskID, step}
args = append(args, states...)
rs, err := stm.executeSQLWithNewSession(ctx, `select `+subtaskColumns+` from mysql.tidb_background_subtask
Expand Down
5 changes: 4 additions & 1 deletion pkg/disttask/framework/taskexecutor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"interface.go",
"manager.go",
"register.go",
"slot.go",
"task_executor.go",
],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor",
Expand All @@ -25,6 +26,7 @@ go_library(
"//pkg/resourcemanager/util",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/cpu",
"//pkg/util/gctuner",
"//pkg/util/logutil",
"//pkg/util/memory",
Expand All @@ -40,12 +42,13 @@ go_test(
srcs = [
"manager_test.go",
"register_test.go",
"slot_test.go",
"task_executor_test.go",
"task_executor_testkit_test.go",
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 9,
shard_count = 11,
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/mock/execute",
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/taskexecutor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type TaskTable interface {
GetTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)

GetSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error)
GetSubtasksByStepAndStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error)
GetFirstSubtaskInStates(ctx context.Context, instanceID string, taskID int64, step proto.Step, states ...interface{}) (*proto.Subtask, error)
StartManager(ctx context.Context, tidbID string, role string) error
StartSubtask(ctx context.Context, subtaskID int64) error
Expand Down
26 changes: 20 additions & 6 deletions pkg/disttask/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -72,12 +73,13 @@ type Manager struct {
handlingTasks map[int64]context.CancelCauseFunc
}
// id, it's the same as server id now, i.e. host:port.
id string
wg tidbutil.WaitGroupWrapper
ctx context.Context
cancel context.CancelFunc
logCtx context.Context
newPool func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error)
id string
wg tidbutil.WaitGroupWrapper
ctx context.Context
cancel context.CancelFunc
logCtx context.Context
newPool func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error)
slotManager *slotManager
}

// BuildManager builds a Manager.
Expand All @@ -87,6 +89,10 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
taskTable: taskTable,
logCtx: logutil.WithFields(context.Background()),
newPool: b.newPool,
slotManager: &slotManager{
executorSlotInfos: make(map[int64]*slotInfo),
available: cpu.GetCPUCount(),
},
}
m.ctx, m.cancel = context.WithCancel(ctx)
m.mu.handlingTasks = make(map[int64]context.CancelCauseFunc)
Expand Down Expand Up @@ -196,6 +202,7 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
return
}
tasks = m.filterAlreadyHandlingTasks(tasks)

for _, task := range tasks {
exist, err := m.taskTable.HasSubtasksInStates(m.ctx, m.id, task.ID, task.Step,
proto.TaskStatePending, proto.TaskStateRevertPending,
Expand All @@ -210,9 +217,16 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
continue
}
logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))

if !m.slotManager.canAlloc(task) {
logutil.Logger(m.logCtx).Warn("subtask has been rejected", zap.Int64("task-id", task.ID))
continue
}
m.addHandlingTask(task.ID)
m.slotManager.alloc(task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's moved out, we have to free on error

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t := task
err = m.executorPool.Run(func() {
defer m.slotManager.free(t.ID)
m.onRunnableTask(t)
m.removeHandlingTask(t.ID)
})
Expand Down
89 changes: 88 additions & 1 deletion pkg/disttask/framework/taskexecutor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ func getPoolRunFn() (*sync.WaitGroup, func(f func()) error) {
}

func TestManageTask(t *testing.T) {
b := NewManagerBuilder()
ctrl := gomock.NewController(t)
defer ctrl.Finish()

b := NewManagerBuilder()
mockTaskTable := mock.NewMockTaskTable(ctrl)
m, err := b.BuildManager(context.Background(), "test", mockTaskTable)
require.NoError(t, err)
Expand Down Expand Up @@ -250,3 +251,89 @@ func TestManager(t *testing.T) {
time.Sleep(5 * time.Second)
m.Stop()
}

func TestSlotManagerInManager(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockTaskTable := mock.NewMockTaskTable(ctrl)
mockInternalExecutor := mock.NewMockTaskExecutor(ctrl)
mockPool := mock.NewMockPool(ctrl)
b := NewManagerBuilder()
b.setPoolFactory(func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error) {
return mockPool, nil
})
RegisterTaskType("type",
func(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) TaskExecutor {
return mockInternalExecutor
})
id := "test"

m, err := b.BuildManager(context.Background(), id, mockTaskTable)
require.NoError(t, err)
m.slotManager.available = 10

taskID1 := int64(1)
taskID2 := int64(2)

now := time.Now()

task1 := &proto.Task{
ID: taskID1,
State: proto.TaskStateRunning,
CreateTime: now,
Concurrency: 10,
Step: proto.StepOne,
Type: "type",
}
task2 := &proto.Task{
ID: taskID2,
State: proto.TaskStateRunning,
CreateTime: now,
Concurrency: 1,
Step: proto.StepOne,
Type: "type",
}

ch := make(chan struct{})

wg, runFn := getPoolRunFn()

mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)

// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
// task1 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
<-ch
return nil
})

m.onRunnableTasks([]*proto.Task{task1, task2})
time.Sleep(2 * time.Second)

// task1 alloc resource success
require.Equal(t, 0, m.slotManager.available)
require.Equal(t, map[int64]*slotInfo{
taskID1: {taskID: int(taskID1), slotCount: 10},
}, m.slotManager.executorSlotInfos)
ch <- struct{}{}

// task1 succeed
task1.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockInternalExecutor.EXPECT().Close()

wg.Wait()
require.Equal(t, 10, m.slotManager.available)
require.Equal(t, 0, len(m.slotManager.executorSlotInfos))
}
70 changes: 70 additions & 0 deletions pkg/disttask/framework/taskexecutor/slot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package taskexecutor

import (
"sync"

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
)

// slotManager is used to manage the slots of the executor.
type slotManager struct {
sync.RWMutex
// taskID -> slotInfo
executorSlotInfos map[int64]*slotInfo

// The number of slots that can be used by the executor.
// It is always equal to CPU cores of the instance.
available int
}

type slotInfo struct {
taskID int
// priority will be used in future
priority int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better store task itself, priority itself cannot determine task order. ok to do it later

slotCount int
}

func (sm *slotManager) alloc(task *proto.Task) {
sm.Lock()
defer sm.Unlock()
sm.executorSlotInfos[task.ID] = &slotInfo{
taskID: int(task.ID),
priority: task.Priority,
slotCount: task.Concurrency,
}

sm.available -= task.Concurrency
}

func (sm *slotManager) free(taskID int64) {
sm.Lock()
defer sm.Unlock()

slotInfo, ok := sm.executorSlotInfos[taskID]
if ok {
delete(sm.executorSlotInfos, taskID)
sm.available += slotInfo.slotCount
}
}

// canReserve is used to check whether the instance has enough slots to run the task.
func (sm *slotManager) canAlloc(task *proto.Task) bool {
sm.RLock()
defer sm.RUnlock()

return sm.available >= task.Concurrency
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sm.RLock?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
Loading