Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/executor/executor_failpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,25 @@ func TestCoprocessorOOMTiCase(t *testing.T) {
*/
}

func TestCoprocessorBlockIssues56916(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/issue56916", `return`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/issue56916")) }()

tk.MustExec("use test")
tk.MustExec("drop table if exists t_cooldown")
tk.MustExec("create table t_cooldown (id int auto_increment, k int, unique index(id));")
tk.MustExec("insert into t_cooldown (k) values (1);")
tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;")
tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;")
tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;")
tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;")
tk.MustExec("split table t_cooldown by (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);")
tk.MustQuery("select * from t_cooldown use index(id) where id > 0 and id < 10").CheckContain("1")
tk.MustQuery("select * from t_cooldown use index(id) where id between 1 and 10 or id between 124660 and 132790;").CheckContain("1")
}

func TestIssue21441(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/union/issue21441", `return`))
defer func() {
Expand Down
23 changes: 16 additions & 7 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
it.concurrency = 1
}

// issue56916 is about the cooldown of the runaway checker may block the SQL execution.
failpoint.Inject("issue56916", func(_ failpoint.Value) {
it.concurrency = 1
it.smallTaskConcurrency = 0
})

// if the request is triggered cool down by the runaway checker, we need to adjust the concurrency, let the sql run slowly.
if req.RunawayChecker != nil && req.RunawayChecker.CheckAction() == rmpb.RunawayAction_CoolDown {
it.concurrency = 1
Expand Down Expand Up @@ -835,15 +841,16 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
// open starts workers and sender goroutines.
func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableCollectExecutionInfo bool) {
taskCh := make(chan *copTask, 1)
smallTaskCh := make(chan *copTask, 1)
it.unconsumedStats = &unconsumedCopRuntimeStats{}
it.wg.Add(it.concurrency + it.smallTaskConcurrency)
var smallTaskCh chan *copTask
if it.smallTaskConcurrency > 0 {
smallTaskCh = make(chan *copTask, 1)
}
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency+it.smallTaskConcurrency; i++ {
var ch chan *copTask
if i < it.concurrency {
ch = taskCh
} else {
ch := taskCh
if i >= it.concurrency && smallTaskCh != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: this change seems unnecessary as smallTaskCh can only be not nil when smallTaskConcurrency > 0

ch = smallTaskCh
}
worker := &copIteratorWorker{
Expand Down Expand Up @@ -897,7 +904,7 @@ func (sender *copIteratorTaskSender) run(connID uint64, checker resourcegroup.Ru
break
}
var sendTo chan<- *copTask
if isSmallTask(t) {
if isSmallTask(t) && sender.smallTaskCh != nil {
sendTo = sender.smallTaskCh
} else {
sendTo = sender.taskCh
Expand All @@ -911,7 +918,9 @@ func (sender *copIteratorTaskSender) run(connID uint64, checker resourcegroup.Ru
}
}
close(sender.taskCh)
close(sender.smallTaskCh)
if sender.smallTaskCh != nil {
close(sender.smallTaskCh)
}

// Wait for worker goroutines to exit.
sender.wg.Wait()
Expand Down