Skip to content

Commit f3038bc

Browse files
authored
copr: fix the issue that busy threshold may redirect batch copr to followers (pingcap#58193) (pingcap#58414)
close pingcap#58001
1 parent a0a8adb commit f3038bc

File tree

4 files changed

+70
-46
lines changed

4 files changed

+70
-46
lines changed

pkg/store/copr/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ go_test(
8383
embed = [":copr"],
8484
flaky = True,
8585
race = "on",
86-
shard_count = 30,
86+
shard_count = 31,
8787
deps = [
8888
"//pkg/kv",
8989
"//pkg/store/driver/backoff",

pkg/store/copr/coprocessor.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,10 +534,17 @@ func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) {
534534
// disable paging for batched task.
535535
b.tasks[idx].paging = false
536536
b.tasks[idx].pagingSize = 0
537+
// The task and it's batched can be served only in the store we chose.
538+
// If the task is redirected to other replica, the batched task may not meet region-miss or store-not-match error.
539+
// So disable busy threshold for the task which carries batched tasks.
540+
b.tasks[idx].busyThreshold = 0
537541
}
538542
if task.RowCountHint > 0 {
539543
b.tasks[idx].RowCountHint += task.RowCountHint
540544
}
545+
batchedTask.task.paging = false
546+
batchedTask.task.pagingSize = 0
547+
batchedTask.task.busyThreshold = 0
541548
b.tasks[idx].batchTaskList[task.taskID] = batchedTask
542549
}
543550
handled = true

pkg/store/copr/coprocessor_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package copr
1717
import (
1818
"context"
1919
"testing"
20+
"time"
2021

2122
"github.com/pingcap/kvproto/pkg/coprocessor"
2223
"github.com/pingcap/tidb/pkg/kv"
@@ -881,3 +882,53 @@ func TestSmallTaskConcurrencyLimit(t *testing.T) {
881882
require.Equal(t, smallConcPerCore, conc)
882883
require.Equal(t, smallTaskCount, count)
883884
}
885+
886+
func TestBatchStoreCoprOnlySendToLeader(t *testing.T) {
887+
// nil --- 'g' --- 'n' --- 't' --- nil
888+
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
889+
mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
890+
require.NoError(t, err)
891+
defer func() {
892+
pdClient.Close()
893+
err = mockClient.Close()
894+
require.NoError(t, err)
895+
}()
896+
_, _, _ = testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
897+
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
898+
defer pdCli.Close()
899+
cache := NewRegionCache(tikv.NewRegionCache(pdCli))
900+
defer cache.Close()
901+
902+
bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)
903+
req := &kv.Request{
904+
StoreBatchSize: 3,
905+
StoreBusyThreshold: time.Second,
906+
}
907+
ranges := buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z")
908+
tasks, err := buildCopTasks(bo, ranges, &buildCopTaskOpt{
909+
req: req,
910+
cache: cache,
911+
rowHints: []int{1, 1, 3, 3},
912+
})
913+
require.Len(t, tasks, 1)
914+
require.Zero(t, tasks[0].busyThreshold)
915+
batched := tasks[0].batchTaskList
916+
require.Len(t, batched, 3)
917+
for _, task := range batched {
918+
require.Zero(t, task.task.busyThreshold)
919+
}
920+
921+
req = &kv.Request{
922+
StoreBatchSize: 0,
923+
StoreBusyThreshold: time.Second,
924+
}
925+
tasks, err = buildCopTasks(bo, ranges, &buildCopTaskOpt{
926+
req: req,
927+
cache: cache,
928+
rowHints: []int{1, 1, 3, 3},
929+
})
930+
require.Len(t, tasks, 4)
931+
for _, task := range tasks {
932+
require.Equal(t, task.busyThreshold, time.Second)
933+
}
934+
}

pkg/store/copr/region_cache.go

Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ package copr
1616

1717
import (
1818
"bytes"
19-
"math"
2019
"strconv"
2120
"sync/atomic"
22-
"time"
2321

2422
"github.com/pingcap/errors"
2523
"github.com/pingcap/failpoint"
@@ -327,57 +325,25 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *tikv.Store
327325

328326
// BuildBatchTask fetches store and peer info for cop task, wrap it as `batchedCopTask`.
329327
func (c *RegionCache) BuildBatchTask(bo *Backoffer, req *kv.Request, task *copTask, replicaRead kv.ReplicaReadType) (*batchedCopTask, error) {
330-
var (
331-
rpcContext *tikv.RPCContext
332-
err error
333-
)
334-
if replicaRead == kv.ReplicaReadFollower {
335-
followerStoreSeed := uint32(0)
336-
leastEstWaitTime := time.Duration(math.MaxInt64)
337-
var (
338-
firstFollowerPeer *uint64
339-
followerContext *tikv.RPCContext
340-
)
341-
for {
342-
followerContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), followerStoreSeed)
343-
if err != nil {
344-
return nil, err
345-
}
346-
if firstFollowerPeer == nil {
347-
firstFollowerPeer = &rpcContext.Peer.Id
348-
} else if *firstFollowerPeer == rpcContext.Peer.Id {
349-
break
350-
}
351-
estWaitTime := followerContext.Store.EstimatedWaitTime()
352-
// the wait time of this follower is under given threshold, choose it.
353-
if estWaitTime > req.StoreBusyThreshold {
354-
continue
355-
}
356-
if rpcContext == nil {
357-
rpcContext = followerContext
358-
} else if estWaitTime < leastEstWaitTime {
359-
leastEstWaitTime = estWaitTime
360-
rpcContext = followerContext
361-
}
362-
followerStoreSeed++
363-
}
364-
// all replicas are busy, fallback to leader.
365-
if rpcContext == nil {
366-
replicaRead = kv.ReplicaReadLeader
367-
}
328+
if replicaRead != kv.ReplicaReadLeader {
329+
return nil, nil
368330
}
369331

370-
if replicaRead == kv.ReplicaReadLeader {
371-
rpcContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0)
372-
if err != nil {
373-
return nil, err
374-
}
332+
rpcContext, err := c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0)
333+
if err != nil {
334+
return nil, err
375335
}
376336

377337
// fallback to non-batch path
378338
if rpcContext == nil {
379339
return nil, nil
380340
}
341+
342+
// when leader is busy, we don't batch the cop task to allow the load balance to work.
343+
if rpcContext.Store.EstimatedWaitTime() > req.StoreBusyThreshold {
344+
return nil, nil
345+
}
346+
381347
return &batchedCopTask{
382348
task: task,
383349
region: coprocessor.RegionInfo{

0 commit comments

Comments
 (0)