store/copr: handle region error from client #39838
@@ -140,7 +140,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
// disable batch copr for follower read
req.StoreBatchSize = 0
}
// disable paging for batch copr
// disable batch copr when paging is enabled.
if req.Paging.Enable {
req.StoreBatchSize = 0
}
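The two guards above make store batching yield to both follower reads and paging. A minimal sketch of that rule, using toy field names (`FollowerRead`, `PagingEnabled`, and the `applyBatchRules` helper are illustrative stand-ins, not the real `kv.Request` API):

```go
package main

import "fmt"

// Toy stand-in for the kv.Request fields touched in BuildCopIterator.
type req struct {
	StoreBatchSize int
	FollowerRead   bool
	PagingEnabled  bool
}

// applyBatchRules mirrors the two guards in the hunk above:
// follower reads and paging both switch store batching off.
func applyBatchRules(r *req) {
	if r.FollowerRead {
		r.StoreBatchSize = 0
	}
	if r.PagingEnabled {
		r.StoreBatchSize = 0
	}
}

func main() {
	for _, r := range []req{
		{StoreBatchSize: 4},                      // leader read, no paging: batching kept
		{StoreBatchSize: 4, PagingEnabled: true}, // paging wins, batching off
		{StoreBatchSize: 4, FollowerRead: true},  // follower read, batching off
	} {
		applyBatchRules(&r)
		fmt.Println(r.StoreBatchSize)
	}
}
```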
@@ -315,13 +315,13 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
chanSize = 18
}

tasks := make([]*copTask, 0, len(locs))
origRangeIdx := 0
taskID := uint64(0)
var store2Idx map[uint64]int
var builder taskBuilder
if req.StoreBatchSize > 0 {
store2Idx = make(map[uint64]int, 16)
builder = newBatchTaskBuilder(bo, req, cache)
} else {
builder = newLegacyTaskBuilder(len(locs))
}
origRangeIdx := 0
for _, loc := range locs {
// TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice
// to make sure the message can be sent successfully.
@@ -357,7 +357,6 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
}
}
task := &copTask{
taskID: taskID,
region: loc.Location.Region,
bucketsVer: loc.getBucketVersion(),
ranges: loc.Ranges.Slice(i, nextI),
@@ -370,50 +369,138 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
requestSource: req.RequestSource,
RowCountHint: hint,
}
if req.StoreBatchSize > 0 {
batchedTask, err := cache.BuildBatchTask(bo, task, req.ReplicaRead)
if err != nil {
return nil, err
}
if idx, ok := store2Idx[batchedTask.storeID]; !ok || len(tasks[idx].batchTaskList) >= req.StoreBatchSize {
tasks = append(tasks, batchedTask.task)
store2Idx[batchedTask.storeID] = len(tasks) - 1
} else {
if tasks[idx].batchTaskList == nil {
tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, req.StoreBatchSize)
// disable paging for batched task.
tasks[idx].paging = false
tasks[idx].pagingSize = 0
}
if task.RowCountHint > 0 {
tasks[idx].RowCountHint += task.RowCountHint
}
tasks[idx].batchTaskList[taskID] = batchedTask
}
} else {
tasks = append(tasks, task)
if err = builder.handle(task); err != nil {
return nil, err
}
i = nextI
if req.Paging.Enable {
pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize)
}
taskID++
}
}
if req.Desc {
reverseTasks(tasks)
builder.reverse()
}
tasks := builder.build()
if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildCopTasks takes too much time",
zap.Duration("elapsed", elapsed),
zap.Int("range len", rangesLen),
zap.Int("task len", len(tasks)))
}
metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(len(tasks)))
metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum()))
return tasks, nil
}
type taskBuilder interface {
handle(*copTask) error
reverse()
build() []*copTask
regionNum() int
}

type legacyTaskBuilder struct {
tasks []*copTask
}

func newLegacyTaskBuilder(hint int) *legacyTaskBuilder {
return &legacyTaskBuilder{
tasks: make([]*copTask, 0, hint),
}
}

func (b *legacyTaskBuilder) handle(task *copTask) error {
b.tasks = append(b.tasks, task)
return nil
}

func (b *legacyTaskBuilder) regionNum() int {
return len(b.tasks)
}

func (b *legacyTaskBuilder) reverse() {
reverseTasks(b.tasks)
}

func (b *legacyTaskBuilder) build() []*copTask {
return b.tasks
}
type batchStoreTaskBuilder struct {
bo *Backoffer
req *kv.Request
cache *RegionCache
taskID uint64
limit int
store2Idx map[uint64]int
tasks []*copTask
}

func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache) *batchStoreTaskBuilder {
return &batchStoreTaskBuilder{
bo: bo,
req: req,
cache: cache,
taskID: 0,
limit: req.StoreBatchSize,
store2Idx: make(map[uint64]int, 16),
tasks: make([]*copTask, 0, 16),
}
}
func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) {
b.taskID++
task.taskID = b.taskID
handled := false
defer func() {
if !handled && err == nil {
// fallback to non-batch way. It's mainly caused by region miss.
b.tasks = append(b.tasks, task)
}
}()
if b.limit <= 0 {
return nil
}
batchedTask, err := b.cache.BuildBatchTask(b.bo, task, b.req.ReplicaRead)
if err != nil {
return err
}
if batchedTask == nil {
return nil
}
if idx, ok := b.store2Idx[batchedTask.storeID]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
b.tasks = append(b.tasks, batchedTask.task)
b.store2Idx[batchedTask.storeID] = len(b.tasks) - 1
} else {
if b.tasks[idx].batchTaskList == nil {
b.tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, b.limit)
// disable paging for batched task.
b.tasks[idx].paging = false
b.tasks[idx].pagingSize = 0
}
if task.RowCountHint > 0 {
b.tasks[idx].RowCountHint += task.RowCountHint
}
b.tasks[idx].batchTaskList[task.taskID] = batchedTask
}
handled = true
return nil
}
func (b *batchStoreTaskBuilder) regionNum() int {
// we allocate b.taskID for each region task, so the final b.taskID is equal to the related region number.
return int(b.taskID)
}

func (b *batchStoreTaskBuilder) reverse() {
reverseTasks(b.tasks)
}

func (b *batchStoreTaskBuilder) build() []*copTask {
return b.tasks
}
func buildTiDBMemCopTasks(ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
servers, err := infosync.GetAllServerInfo(context.Background())
if err != nil {
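The hunk above replaces the inline batching logic in buildCopTasks with a small strategy-style interface: the loop only calls handle/reverse/build/regionNum, while the legacy one-task-per-region behavior and the per-store batching behavior live in separate builder types. A minimal, self-contained sketch of that shape (toy types only; `task`, `batched`, and the builder names here are illustrative stand-ins, not the real copTask/RegionCache plumbing):

```go
package main

import "fmt"

// task is a toy stand-in for copTask: just an ID and the store it maps to.
type task struct {
	id      uint64
	storeID uint64
	batched []uint64 // IDs folded into this task when batching is on
}

// builder mirrors the taskBuilder interface from the diff.
type builder interface {
	handle(*task) error
	build() []*task
}

// legacy: every region task becomes its own cop task.
type legacyBuilder struct{ tasks []*task }

func (b *legacyBuilder) handle(t *task) error { b.tasks = append(b.tasks, t); return nil }
func (b *legacyBuilder) build() []*task       { return b.tasks }

// batch: tasks for the same store are folded into one "primary" task,
// up to a per-store limit, after which a new primary is started.
type batchBuilder struct {
	limit     int
	store2Idx map[uint64]int
	tasks     []*task
}

func (b *batchBuilder) handle(t *task) error {
	idx, ok := b.store2Idx[t.storeID]
	if !ok || len(b.tasks[idx].batched) >= b.limit {
		b.tasks = append(b.tasks, t)
		b.store2Idx[t.storeID] = len(b.tasks) - 1
		return nil
	}
	b.tasks[idx].batched = append(b.tasks[idx].batched, t.id)
	return nil
}
func (b *batchBuilder) build() []*task { return b.tasks }

func main() {
	var bld builder = &batchBuilder{limit: 2, store2Idx: map[uint64]int{}}
	for i, store := range []uint64{1, 1, 1, 2, 1} {
		_ = bld.handle(&task{id: uint64(i + 1), storeID: store})
	}
	for _, t := range bld.build() {
		fmt.Printf("primary=%d store=%d batched=%v\n", t.id, t.storeID, t.batched)
	}
	// With limit=2: task 1 carries tasks 2 and 3; task 4 (store 2) and task 5 start new primaries.
}
```

The design choice is that the caller no longer cares whether batching is enabled; swapping newLegacyTaskBuilder for newBatchTaskBuilder is the only branch left in buildCopTasks.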
@@ -1138,13 +1225,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
if err != nil {
return remains, err
}
return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.BatchResponses, task, ch)
return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.GetBatchResponses(), task, ch)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, lockErr, task); err != nil {
return nil, err
}
return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.BatchResponses, task, ch)
return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch)
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
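The switch from resp.pbResp.BatchResponses to resp.pbResp.GetBatchResponses() presumably leans on the usual convention for generated Go protobuf code: getters are nil-receiver safe, so the call returns a zero value instead of panicking when the message pointer is nil. A rough illustration of that generated-getter shape (hand-written stand-ins, not the actual coprocessor pb code):

```go
package main

import "fmt"

// Hand-written stand-ins for the generated coprocessor messages,
// showing the nil-safe getter pattern protoc emits for Go.
type StoreBatchTaskResponse struct{ TaskId uint64 }

type Response struct {
	BatchResponses []*StoreBatchTaskResponse
}

// GetBatchResponses mirrors a generated getter: it tolerates a nil receiver.
func (m *Response) GetBatchResponses() []*StoreBatchTaskResponse {
	if m != nil {
		return m.BatchResponses
	}
	return nil
}

func main() {
	var resp *Response // e.g. a response that was never filled in
	// Direct field access (resp.BatchResponses) would panic here.
	fmt.Println(len(resp.GetBatchResponses())) // 0, no panic
}
```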
@@ -1250,16 +1337,26 @@ func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains
}

// handle the batched cop response.
// tasks will be changed, so the input tasks should not be used after calling this function.
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) {
if len(tasks) == 0 {
return nil, nil
}
var remainTasks []*copTask
appendRemainTasks := func(tasks ...*copTask) {
if remainTasks == nil {
// allocate size for remain length
remainTasks = make([]*copTask, 0, len(tasks))
}
remainTasks = append(remainTasks, tasks...)
}
for _, batchResp := range batchResps {
batchedTask, ok := tasks[batchResp.GetTaskId()]
taskID := batchResp.GetTaskId()
batchedTask, ok := tasks[taskID]
if !ok {
return nil, errors.Errorf("task id %d not found", batchResp.GetTaskId())
}
delete(tasks, taskID)
resp := &copResponse{
pbResp: &coprocessor.Response{
Data: batchResp.Data,
@@ -1276,15 +1373,15 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
if err != nil {
return nil, err
}
remainTasks = append(remainTasks, remains...)
appendRemainTasks(remains...)
continue
}
//TODO: handle locks in batch
if lockErr := batchResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, resp.pbResp.GetLocked(), task); err != nil {
return nil, err
}
remainTasks = append(remainTasks, task)
appendRemainTasks(task)
continue
}
if otherErr := batchResp.GetOtherError(); otherErr != "" {
@@ -1312,6 +1409,24 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
// TODO: check OOM
worker.sendToRespCh(resp, ch, false)
}
for _, t := range tasks {
task := t.task
// when the error is generated by client, response is empty, skip warning for this case.
if len(batchResps) != 0 {
firstRangeStartKey := task.ranges.At(0).StartKey
lastRangeEndKey := task.ranges.At(task.ranges.Len() - 1).EndKey
logutil.Logger(bo.GetCtx()).Error("response of batched task missing",
zap.Uint64("id", task.taskID),
zap.Uint64("txnStartTS", worker.req.StartTs),
zap.Uint64("regionID", task.region.GetID()),
zap.Uint64("bucketsVer", task.bucketsVer),
zap.Int("rangeNums", task.ranges.Len()),
zap.ByteString("firstRangeStartKey", firstRangeStartKey),
zap.ByteString("lastRangeEndKey", lastRangeEndKey),
zap.String("storeAddr", task.storeAddr))
}
appendRemainTasks(t.task)
}
return remainTasks, nil
}

Review thread on lines +1424 to +1425 (the firstRangeStartKey/lastRangeEndKey fields):

Reviewer: What's the meaning of printing the start key of the first range and the end key of the last range?

Author: It's the start key and end key for one batched task (it might be in the same region); I just copied the log from here.

Reviewer: Okay.
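The core of the client-side region-error handling in the hunk above is bookkeeping by deletion: every batched response that arrives removes its entry from the tasks map, so whatever is left after the loop never got a response (for example, the client hit a region error before the sub-task was sent) and is re-queued via appendRemainTasks. A stripped-down sketch of that shape, with toy types in place of batchedCopTask and StoreBatchTaskResponse:

```go
package main

import "fmt"

// Toy stand-ins for batchedCopTask and StoreBatchTaskResponse.
type pendingTask struct{ id uint64 }
type batchResp struct{ taskID uint64 }

// collectRemains mirrors the bookkeeping in handleBatchCopResponse:
// delete every task that got a response; whatever survives the loop
// is missing a response and must be retried.
func collectRemains(pending map[uint64]*pendingTask, resps []*batchResp) ([]*pendingTask, error) {
	for _, r := range resps {
		t, ok := pending[r.taskID]
		if !ok {
			return nil, fmt.Errorf("task id %d not found", r.taskID)
		}
		delete(pending, r.taskID)
		_ = t // in the real code the response is forwarded to the result channel here
	}
	var remains []*pendingTask
	for _, t := range pending {
		remains = append(remains, t) // no response: schedule a retry
	}
	return remains, nil
}

func main() {
	pending := map[uint64]*pendingTask{1: {id: 1}, 2: {id: 2}, 3: {id: 3}}
	remains, _ := collectRemains(pending, []*batchResp{{taskID: 1}, {taskID: 3}})
	for _, t := range remains {
		fmt.Println("retry task", t.id) // only task 2 is left over
	}
}
```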