Skip to content

Commit 0abd132

Browse files
authored
br: retry to scatter the regions if status is timeout or cancel (#46471)
close #47236
1 parent 249a5fe commit 0abd132

File tree

3 files changed

+202
-38
lines changed

3 files changed

+202
-38
lines changed

br/pkg/restore/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ go_library(
3131
"//br/pkg/conn/util",
3232
"//br/pkg/errors",
3333
"//br/pkg/glue",
34+
"//br/pkg/lightning/common",
3435
"//br/pkg/logutil",
3536
"//br/pkg/metautil",
3637
"//br/pkg/pdutil",

br/pkg/restore/split.go

Lines changed: 89 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/pingcap/kvproto/pkg/pdpb"
1919
"github.com/pingcap/log"
2020
berrors "github.com/pingcap/tidb/br/pkg/errors"
21+
"github.com/pingcap/tidb/br/pkg/lightning/common"
2122
"github.com/pingcap/tidb/br/pkg/logutil"
2223
"github.com/pingcap/tidb/br/pkg/restore/split"
2324
"github.com/pingcap/tidb/br/pkg/rtree"
@@ -144,21 +145,15 @@ SplitRegions:
144145
}
145146
log.Info("start to wait for scattering regions",
146147
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
147-
startTime = time.Now()
148-
scatterCount := 0
149-
for _, region := range scatterRegions {
150-
rs.waitForScatterRegion(ctx, region)
151-
if time.Since(startTime) > split.ScatterWaitUpperInterval {
152-
break
153-
}
154-
scatterCount++
155-
}
156-
if scatterCount == len(scatterRegions) {
148+
149+
leftCnt := rs.WaitForScatterRegions(ctx, scatterRegions, split.ScatterWaitUpperInterval)
150+
if leftCnt == 0 {
157151
log.Info("waiting for scattering regions done",
158152
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
159153
} else {
160154
log.Warn("waiting for scattering regions timeout",
161-
zap.Int("scatterCount", scatterCount),
155+
zap.Int("NotScatterCount", leftCnt),
156+
zap.Int("TotalScatterCount", len(scatterRegions)),
162157
zap.Int("regions", len(scatterRegions)),
163158
zap.Duration("take", time.Since(startTime)))
164159
}
@@ -188,26 +183,48 @@ func (rs *RegionSplitter) hasHealthyRegion(ctx context.Context, regionID uint64)
188183
return len(regionInfo.PendingPeers) == 0, nil
189184
}
190185

191-
func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) {
186+
// isScatterRegionFinished check the latest successful operator and return the follow status:
187+
//
188+
// return (finished, needRescatter, error)
189+
//
190+
// if the latest operator is not `scatter-operator`, or its status is SUCCESS, it's likely that the
191+
// scatter region operator is finished.
192+
//
193+
// if the latest operator is `scatter-operator` and its status is TIMEOUT or CANCEL, the needRescatter
194+
// is true and the function caller needs to scatter this region again.
195+
func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, bool, error) {
192196
resp, err := rs.client.GetOperator(ctx, regionID)
193197
if err != nil {
194-
return false, errors.Trace(err)
198+
if common.IsRetryableError(err) {
199+
// retry in the next cycle
200+
return false, false, nil
201+
}
202+
return false, false, errors.Trace(err)
195203
}
196204
// Heartbeat may not be sent to PD
197205
if respErr := resp.GetHeader().GetError(); respErr != nil {
198206
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
199-
return true, nil
207+
return true, false, nil
200208
}
201-
return false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType())
209+
return false, false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType())
202210
}
203211
retryTimes := ctx.Value(retryTimes).(int)
204212
if retryTimes > 3 {
205213
log.Info("get operator", zap.Uint64("regionID", regionID), zap.Stringer("resp", resp))
206214
}
207215
// If the current operator of the region is not 'scatter-region', we could assume
208-
// that 'scatter-operator' has finished or timeout
209-
ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING
210-
return ok, nil
216+
// that 'scatter-operator' has finished
217+
if string(resp.GetDesc()) != "scatter-region" {
218+
return true, false, nil
219+
}
220+
switch resp.GetStatus() {
221+
case pdpb.OperatorStatus_SUCCESS:
222+
return true, false, nil
223+
case pdpb.OperatorStatus_RUNNING:
224+
return false, false, nil
225+
default:
226+
return false, true, nil
227+
}
211228
}
212229

213230
func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) {
@@ -233,26 +250,66 @@ type retryTimeKey struct{}
233250

234251
var retryTimes = new(retryTimeKey)
235252

236-
func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
237-
interval := split.ScatterWaitInterval
238-
regionID := regionInfo.Region.GetId()
239-
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
240-
ctx1 := context.WithValue(ctx, retryTimes, i)
241-
ok, err := rs.isScatterRegionFinished(ctx1, regionID)
242-
if err != nil {
243-
log.Warn("scatter region failed: do not have the region",
244-
logutil.Region(regionInfo.Region))
245-
return
253+
func mapRegionInfoSlice(regionInfos []*split.RegionInfo) map[uint64]*split.RegionInfo {
254+
regionInfoMap := make(map[uint64]*split.RegionInfo)
255+
for _, info := range regionInfos {
256+
regionID := info.Region.GetId()
257+
regionInfoMap[regionID] = info
258+
}
259+
return regionInfoMap
260+
}
261+
262+
func (rs *RegionSplitter) WaitForScatterRegions(ctx context.Context, regionInfos []*split.RegionInfo, timeout time.Duration) int {
263+
var (
264+
startTime = time.Now()
265+
interval = split.ScatterWaitInterval
266+
leftRegions = mapRegionInfoSlice(regionInfos)
267+
retryCnt = 0
268+
269+
reScatterRegions = make([]*split.RegionInfo, 0, len(regionInfos))
270+
)
271+
for {
272+
ctx1 := context.WithValue(ctx, retryTimes, retryCnt)
273+
reScatterRegions = reScatterRegions[:0]
274+
for regionID, regionInfo := range leftRegions {
275+
ok, rescatter, err := rs.isScatterRegionFinished(ctx1, regionID)
276+
if err != nil {
277+
log.Warn("scatter region failed: do not have the region",
278+
logutil.Region(regionInfo.Region), zap.Error(err))
279+
delete(leftRegions, regionID)
280+
continue
281+
}
282+
if ok {
283+
delete(leftRegions, regionID)
284+
continue
285+
}
286+
if rescatter {
287+
reScatterRegions = append(reScatterRegions, regionInfo)
288+
}
289+
// RUNNING_STATUS, just wait and check it in the next loop
246290
}
247-
if ok {
291+
292+
if len(leftRegions) == 0 {
293+
return 0
294+
}
295+
296+
if len(reScatterRegions) > 0 {
297+
rs.ScatterRegions(ctx1, reScatterRegions)
298+
}
299+
300+
if time.Since(startTime) > timeout {
248301
break
249302
}
303+
304+
retryCnt += 1
250305
interval = 2 * interval
251306
if interval > split.ScatterMaxWaitInterval {
252307
interval = split.ScatterMaxWaitInterval
253308
}
254309
time.Sleep(interval)
255310
}
311+
312+
return len(leftRegions)
256313
}
257314

258315
func (rs *RegionSplitter) splitAndScatterRegions(
@@ -780,16 +837,10 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error {
780837
}
781838
}
782839

783-
startTime := time.Now()
784840
regionSplitter := NewRegionSplitter(helper.client)
785-
for _, region := range scatterRegions {
786-
regionSplitter.waitForScatterRegion(ctx, region)
787-
// It is too expensive to stop recovery and wait for a small number of regions
788-
// to complete scatter, so the maximum waiting time is reduced to 1 minute.
789-
if time.Since(startTime) > time.Minute {
790-
break
791-
}
792-
}
841+
// It is too expensive to stop recovery and wait for a small number of regions
842+
// to complete scatter, so the maximum waiting time is reduced to 1 minute.
843+
_ = regionSplitter.WaitForScatterRegions(ctx, scatterRegions, time.Minute)
793844
}()
794845

795846
iter := helper.iterator()

br/pkg/restore/split_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type TestClient struct {
4141
regionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions
4242
nextRegionID uint64
4343
injectInScatter func(*split.RegionInfo) error
44+
injectInOperator func(uint64) (*pdpb.GetOperatorResponse, error)
4445
supportBatchScatter bool
4546

4647
scattered map[uint64]bool
@@ -215,6 +216,9 @@ func (c *TestClient) ScatterRegion(ctx context.Context, regionInfo *split.Region
215216
}
216217

217218
func (c *TestClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
219+
if c.injectInOperator != nil {
220+
return c.injectInOperator(regionID)
221+
}
218222
return &pdpb.GetOperatorResponse{
219223
Header: new(pdpb.ResponseHeader),
220224
}, nil
@@ -337,6 +341,114 @@ func TestSplitAndScatter(t *testing.T) {
337341
client := initTestClient(false)
338342
runTestSplitAndScatterWith(t, client)
339343
})
344+
t.Run("WaitScatter", func(t *testing.T) {
345+
client := initTestClient(false)
346+
client.InstallBatchScatterSupport()
347+
runWaitScatter(t, client)
348+
})
349+
}
350+
351+
func TestXXX(t *testing.T) {
352+
client := initTestClient(false)
353+
client.InstallBatchScatterSupport()
354+
runWaitScatter(t, client)
355+
}
356+
357+
// +------------+----------------------------
358+
// | region | states
359+
// +------------+----------------------------
360+
// | [ , aay) | SUCCESS
361+
// +------------+----------------------------
362+
// | [aay, bba) | CANCEL, SUCCESS
363+
// +------------+----------------------------
364+
// | [bba, bbh) | RUNNING, TIMEOUT, SUCCESS
365+
// +------------+----------------------------
366+
// | [bbh, cca) | <NOT_SCATTER_OPEARTOR>
367+
// +------------+----------------------------
368+
// | [cca, ) | CANCEL, RUNNING, SUCCESS
369+
// +------------+----------------------------
370+
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
371+
// states:
372+
func runWaitScatter(t *testing.T, client *TestClient) {
373+
// configuration
374+
type Operatorstates struct {
375+
index int
376+
status []pdpb.OperatorStatus
377+
}
378+
results := map[string]*Operatorstates{
379+
"": {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_SUCCESS}},
380+
string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_CANCEL, pdpb.OperatorStatus_SUCCESS}},
381+
string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_RUNNING, pdpb.OperatorStatus_TIMEOUT, pdpb.OperatorStatus_SUCCESS}},
382+
string(codec.EncodeBytesExt([]byte{}, []byte("bbh"), false)): {},
383+
string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_CANCEL, pdpb.OperatorStatus_RUNNING, pdpb.OperatorStatus_SUCCESS}},
384+
}
385+
// after test done, the `leftScatterCount` should be empty
386+
leftScatterCount := map[string]int{
387+
string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): 1,
388+
string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): 1,
389+
string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): 1,
390+
}
391+
client.injectInScatter = func(ri *split.RegionInfo) error {
392+
states, ok := results[string(ri.Region.StartKey)]
393+
require.True(t, ok)
394+
require.NotEqual(t, 0, len(states.status))
395+
require.NotEqual(t, pdpb.OperatorStatus_SUCCESS, states.status[states.index])
396+
states.index += 1
397+
cnt, ok := leftScatterCount[string(ri.Region.StartKey)]
398+
require.True(t, ok)
399+
if cnt == 1 {
400+
delete(leftScatterCount, string(ri.Region.StartKey))
401+
} else {
402+
leftScatterCount[string(ri.Region.StartKey)] = cnt - 1
403+
}
404+
return nil
405+
}
406+
regionsMap := client.GetAllRegions()
407+
leftOperatorCount := map[string]int{
408+
"": 1,
409+
string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): 2,
410+
string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): 3,
411+
string(codec.EncodeBytesExt([]byte{}, []byte("bbh"), false)): 1,
412+
string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): 3,
413+
}
414+
client.injectInOperator = func(u uint64) (*pdpb.GetOperatorResponse, error) {
415+
ri := regionsMap[u]
416+
cnt, ok := leftOperatorCount[string(ri.Region.StartKey)]
417+
require.True(t, ok)
418+
if cnt == 1 {
419+
delete(leftOperatorCount, string(ri.Region.StartKey))
420+
} else {
421+
leftOperatorCount[string(ri.Region.StartKey)] = cnt - 1
422+
}
423+
states, ok := results[string(ri.Region.StartKey)]
424+
require.True(t, ok)
425+
if len(states.status) == 0 {
426+
return &pdpb.GetOperatorResponse{
427+
Desc: []byte("other"),
428+
}, nil
429+
}
430+
if states.status[states.index] == pdpb.OperatorStatus_RUNNING {
431+
states.index += 1
432+
return &pdpb.GetOperatorResponse{
433+
Desc: []byte("scatter-region"),
434+
Status: states.status[states.index-1],
435+
}, nil
436+
}
437+
return &pdpb.GetOperatorResponse{
438+
Desc: []byte("scatter-region"),
439+
Status: states.status[states.index],
440+
}, nil
441+
}
442+
443+
// begin to test
444+
ctx := context.Background()
445+
regions := make([]*split.RegionInfo, 0, len(regionsMap))
446+
for _, info := range regionsMap {
447+
regions = append(regions, info)
448+
}
449+
regionSplitter := restore.NewRegionSplitter(client)
450+
leftCnt := regionSplitter.WaitForScatterRegions(ctx, regions, 2000*time.Second)
451+
require.Equal(t, leftCnt, 0)
340452
}
341453

342454
func runTestSplitAndScatterWith(t *testing.T, client *TestClient) {

0 commit comments

Comments
 (0)