Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/planner/memo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"implementation.go",
"pattern.go",
"task.go",
"task_scheduler.go",
],
importpath = "github.com/pingcap/tidb/pkg/planner/memo",
visibility = ["//visibility:public"],
Expand All @@ -28,11 +29,12 @@ go_test(
"group_test.go",
"main_test.go",
"pattern_test.go",
"task_scheduler_test.go",
"task_test.go",
],
embed = [":memo"],
flaky = True,
shard_count = 24,
shard_count = 25,
deps = [
"//pkg/domain",
"//pkg/expression",
Expand Down
12 changes: 12 additions & 0 deletions pkg/planner/memo/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package memo

import (
"strings"
"sync"
)

Expand Down Expand Up @@ -51,6 +52,17 @@ func (ts *TaskStack) Destroy() {
TaskStackPool.Put(ts)
}

// Desc is used to desc the detail info about current stack state.
// when use customized stack to drive the tasks, the call-chain state is dived in the stack.
func (ts *TaskStack) Desc() string {
var str strings.Builder
for _, one := range ts.tasks {
str.WriteString(one.desc())
str.WriteString("\n")
}
return str.String()
}

// Len indicates the length of current stack.
func (ts *TaskStack) Len() int {
return len(ts.tasks)
Expand Down
53 changes: 53 additions & 0 deletions pkg/planner/memo/task_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memo

var _ TaskScheduler = &SimpleTaskScheduler{}

// TaskScheduler is a scheduling interface defined for serializing(single thread)/concurrent(multi thread) running.
type TaskScheduler interface {
ExecuteTasks()
}

// SimpleTaskScheduler is defined for serializing scheduling of memo tasks.
type SimpleTaskScheduler struct {
Err error
SchedulerCtx TaskSchedulerContext
}

// ExecuteTasks implements the interface of TaskScheduler.
func (s *SimpleTaskScheduler) ExecuteTasks() {
stack := s.SchedulerCtx.getStack()
defer func() {
// when step output of the scheduler, if the stack is empty, clean and release it.
if !stack.Empty() {
stack.Destroy()
}
}()
for !stack.Empty() {
// when use customized stack to drive the tasks, the call-chain state is dived in the stack.
task := stack.Pop()
if err := task.execute(); err != nil {
s.Err = err
return
}
}
}

// TaskSchedulerContext is defined for scheduling logic calling, also facilitate interface-oriented coding and testing.
type TaskSchedulerContext interface {
getStack() *TaskStack
Copy link
Member

@winoros winoros Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this method to export the stack? Just a method popTask is enough?

Copy link
Contributor Author

@AilinKid AilinKid Mar 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not enough, TaskSchedulerContext is just a wrapper of Stack, the scheduler should be responsible for how tasks are taken out and how it runs (operate on stack elements)

pushTask(task Task)
}
70 changes: 70 additions & 0 deletions pkg/planner/memo/task_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memo

import (
"errors"
"strconv"
"testing"

"github.com/stretchr/testify/require"
)

// TestSchedulerContext is defined to test scheduling logic here.
type TestSchedulerContext struct {
ts *TaskStack
}

func (t *TestSchedulerContext) getStack() *TaskStack {
return t.ts
}

func (t *TestSchedulerContext) pushTask(task Task) {
t.ts.Push(task)
}

// TestSchedulerContext is defined to mock special error state in specified task.
type TestTaskImpl2 struct {
a int64
}

func (t *TestTaskImpl2) execute() error {
// mock error at special task
if t.a == 2 {
return errors.New("mock error at task id = 2")
}
return nil
}

func (t *TestTaskImpl2) desc() string {
return strconv.Itoa(int(t.a))
}

func TestSimpleTaskScheduler(t *testing.T) {
testSchedulerContext := &TestSchedulerContext{
newTaskStack(),
}
testScheduler := &SimpleTaskScheduler{
SchedulerCtx: testSchedulerContext,
}
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 1})
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 2})
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 3})

var testTaskScheduler TaskScheduler = testScheduler
testTaskScheduler.ExecuteTasks()
require.NotNil(t, testScheduler.Err)
require.Equal(t, testScheduler.Err.Error(), "mock error at task id = 2")
}