Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6295f58
rename
okJiang Nov 30, 2023
eae2e5e
save work
okJiang Dec 1, 2023
45e4b72
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 1, 2023
26e6254
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 1, 2023
66c50ed
rename GetSubtasksByStepAndStates
okJiang Dec 1, 2023
bfc0084
implement priority quere for task and slotManager
okJiang Dec 4, 2023
6df88d1
add ut
okJiang Dec 4, 2023
908b65c
fix ut
okJiang Dec 4, 2023
59db269
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 4, 2023
ff93eab
fix bazel
okJiang Dec 4, 2023
b69f291
add comment, fix bazel
okJiang Dec 5, 2023
f54e5d7
fix comment
okJiang Dec 5, 2023
8809905
add ut
okJiang Dec 5, 2023
b57efc6
fix comment
okJiang Dec 5, 2023
133f34b
add comment
okJiang Dec 5, 2023
e6006d1
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 13, 2023
0a72a54
delete priority queue
okJiang Dec 13, 2023
4a0ef07
remove useless convert
okJiang Dec 13, 2023
3c1beb9
fix bazel
okJiang Dec 13, 2023
8cc2e33
fix ut
okJiang Dec 13, 2023
c640046
Refactor slot management in task executor
okJiang Dec 14, 2023
c442754
Update pkg/disttask/framework/taskexecutor/slot.go
okJiang Dec 18, 2023
3781105
fix comment
okJiang Dec 18, 2023
6392faf
fix comment
okJiang Dec 19, 2023
7d9f45e
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 19, 2023
2003d2a
fix comment: refactor ut
okJiang Dec 20, 2023
a34651f
fix comment
okJiang Dec 20, 2023
06422f4
fix comment
okJiang Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ tools/bin/vfsgendev:
tools/bin/gotestsum:
GOBIN=$(shell pwd)/tools/bin $(GO) install gotest.tools/[email protected]

tools/bin/mockgen:
# [email protected] is imcompatible with v0.3.0, so install it always.
mockgen:
GOBIN=$(shell pwd)/tools/bin $(GO) install go.uber.org/mock/[email protected]

# Usage:
Expand Down Expand Up @@ -374,17 +375,17 @@ br_compatibility_test_prepare:
br_compatibility_test:
@cd br && tests/run_compatible.sh run

mock_s3iface: tools/bin/mockgen
mock_s3iface: mockgen
tools/bin/mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

# mock interface for lightning and IMPORT INTO
mock_lightning: tools/bin/mockgen
mock_lightning: mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
tools/bin/mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go

gen_mock: tools/bin/mockgen
gen_mock: mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,Pool,TaskExecutor,Extension > pkg/disttask/framework/mock/task_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/dispatcher Dispatcher,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/dispatcher_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/dispatcher Extension > pkg/disttask/framework/dispatcher/mock/dispatcher_mock.go
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
exec_id varchar(256),
exec_expired timestamp,
state varchar(64) not null,
priority int,
checkpoint longblob not null,
concurrency int,
create_time timestamp,
Expand All @@ -79,6 +80,7 @@ const (
exec_id varchar(256),
exec_expired timestamp,
state varchar(64) not null,
priority int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upgrade

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is useless, I will remove it

checkpoint longblob not null,
concurrency int,
create_time timestamp,
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "mock",
srcs = [
"alloctor_mock.go",
"dispatcher_mock.go",
"plan_mock.go",
"task_executor_mock.go",
Expand Down
78 changes: 78 additions & 0 deletions pkg/disttask/framework/mock/alloctor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions pkg/disttask/framework/mock/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions pkg/disttask/framework/proto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "proto",
srcs = ["task.go"],
srcs = [
"priority_queue.go",
"task.go",
],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/proto",
visibility = ["//visibility:public"],
)

go_test(
name = "proto_test",
timeout = "short",
srcs = ["task_test.go"],
srcs = [
"priority_queue_test.go",
"task_test.go",
],
embed = [":proto"],
flaky = True,
shard_count = 3,
deps = ["@com_github_stretchr_testify//require"],
)
65 changes: 65 additions & 0 deletions pkg/disttask/framework/proto/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proto

type TaskWrapper struct {
*Task

IndexInPriorityQueue int // Index of the task in the priority queue
}

// TaskPriorityQueue represents a priority queue of tasks.
type TaskPriorityQueue []*TaskWrapper

// TaskPriorityQueue implementation for heap.Interface
func (pq TaskPriorityQueue) Len() int { return len(pq) }

// Less returns true if the task at index i has higher priority than the task at index j.
func (pq TaskPriorityQueue) Less(i, j int) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌

// We want Pop to give us the highest priority, so we use greater than here.
return pq[i].Priority > pq[j].Priority ||
(pq[i].Priority == pq[j].Priority && pq[i].CreateTime.Before(pq[j].CreateTime)) ||
(pq[i].Priority == pq[j].Priority && pq[i].CreateTime.Equal(pq[j].CreateTime) && pq[i].ID < pq[j].ID)
}

// Swap swaps the tasks at the given indices.
func (pq TaskPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].IndexInPriorityQueue = i
pq[j].IndexInPriorityQueue = j
}

// Push adds x as element Len().
func (pq *TaskPriorityQueue) Push(x interface{}) {
task := x.(*TaskWrapper)
task.IndexInPriorityQueue = len(*pq)
*pq = append(*pq, task)
}

// Pop removes and returns element Len() - 1.
func (pq *TaskPriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
task := old[n-1]
task.IndexInPriorityQueue = -1 // for safety
*pq = old[0 : n-1]
return task
}

func WrapPriorityQueue(task *Task) *TaskWrapper {
return &TaskWrapper{
Task: task,
}
}
48 changes: 48 additions & 0 deletions pkg/disttask/framework/proto/priority_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proto

import (
"container/heap"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestTaskPriorityQueue(t *testing.T) {
priorityQueue := make(TaskPriorityQueue, 0)
heap.Init(&priorityQueue)

task1 := &TaskWrapper{Task: &Task{ID: 1, Priority: 1, CreateTime: time.Now()}}
task2 := &TaskWrapper{Task: &Task{ID: 2, Priority: 1, CreateTime: time.Now()}}
task3 := &TaskWrapper{Task: &Task{ID: 3, Priority: 1, CreateTime: time.Now().Add(-2 * time.Second)}}
task4 := &TaskWrapper{Task: &Task{ID: 4, Priority: 2, CreateTime: time.Now().Add(2 * time.Second)}}

heap.Push(&priorityQueue, task1)
heap.Push(&priorityQueue, task2)
heap.Push(&priorityQueue, task3)
heap.Push(&priorityQueue, task4)

expected := []int64{4, 3, 1, 2}

i := 0
// Pop tasks in priority order
for priorityQueue.Len() > 0 {
task := heap.Pop(&priorityQueue).(*TaskWrapper)
require.Equal(t, expected[i], task.ID)
i++
}
}
4 changes: 2 additions & 2 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ func (stm *TaskManager) CreateSubTask(ctx context.Context, taskID int64, step pr
return nil
}

// GetSubtasksInStates gets all subtasks by given states.
func (stm *TaskManager) GetSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error) {
// GetSubtasksByStepAndStates gets all subtasks by given step and states.
func (stm *TaskManager) GetSubtasksByStepAndStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error) {
args := []interface{}{tidbID, taskID, step}
args = append(args, states...)
rs, err := stm.executeSQLWithNewSession(ctx, `select `+subtaskColumns+` from mysql.tidb_background_subtask
Expand Down
4 changes: 3 additions & 1 deletion pkg/disttask/framework/taskexecutor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"interface.go",
"manager.go",
"register.go",
"slot_manager.go",
"task_executor.go",
],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor",
Expand Down Expand Up @@ -40,12 +41,13 @@ go_test(
srcs = [
"manager_test.go",
"register_test.go",
"slot_manager_test.go",
"task_executor_test.go",
"task_executor_testkit_test.go",
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/mock/execute",
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/taskexecutor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type TaskTable interface {
GetTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)

GetSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error)
GetSubtasksByStepAndStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error)
GetFirstSubtaskInStates(ctx context.Context, instanceID string, taskID int64, step proto.Step, states ...interface{}) (*proto.Subtask, error)
StartManager(ctx context.Context, tidbID string, role string) error
StartSubtask(ctx context.Context, subtaskID int64) error
Expand Down
Loading