
Commit 8f74db6

YuJuncen authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#46094
Signed-off-by: ti-chi-bot <[email protected]>
1 parent f1f62f7 commit 8f74db6

3 files changed: +126 −8 lines changed


br/pkg/restore/data.go

Lines changed: 89 additions & 7 deletions
@@ -27,6 +27,75 @@ import (
 	"google.golang.org/grpc/backoff"
 )

+type RecoveryStage int
+
+const (
+	StageUnknown RecoveryStage = iota
+	StageCollectingMeta
+	StageMakingRecoveryPlan
+	StageResetPDAllocateID
+	StageRecovering
+	StageFlashback
+)
+
+func (s RecoveryStage) String() string {
+	switch s {
+	case StageCollectingMeta:
+		return "collecting meta"
+	case StageMakingRecoveryPlan:
+		return "making recovery plan"
+	case StageResetPDAllocateID:
+		return "resetting PD allocate ID"
+	case StageRecovering:
+		return "recovering"
+	case StageFlashback:
+		return "flashback"
+	default:
+		return "unknown"
+	}
+}
+
+type recoveryError struct {
+	error
+	atStage RecoveryStage
+}
+
+func FailedAt(err error) RecoveryStage {
+	if rerr, ok := err.(recoveryError); ok {
+		return rerr.atStage
+	}
+	return StageUnknown
+}
+
+type recoveryBackoffer struct {
+	state utils.RetryState
+}
+
+func newRecoveryBackoffer() *recoveryBackoffer {
+	return &recoveryBackoffer{
+		state: utils.InitialRetryState(16, 30*time.Second, 4*time.Minute),
+	}
+}
+
+func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration {
+	s := FailedAt(err)
+	switch s {
+	case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering:
+		log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s))
+		return bo.state.ExponentialBackoff()
+	case StageFlashback:
+		log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s))
+		bo.state.GiveUp()
+		return 0
+	}
+	log.Warn("unknown stage of backing off.", zap.Int("val", int(s)))
+	return bo.state.ExponentialBackoff()
+}
+
+func (bo *recoveryBackoffer) Attempt() int {
+	return bo.state.Attempt()
+}
+
 // RecoverData recover the tikv cluster
 // 1. read all meta data from tikvs
 // 2. make recovery plan and then recovery max allocate ID firstly
@@ -35,39 +104,52 @@ import (
 // 5. prepare the flashback
 // 6. flashback to resolveTS
 func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
+	// Roughly handle the case that some TiKVs are rebooted during making plan.
+	// Generally, retry the whole procedure will be fine for most cases. But perhaps we can do finer-grained retry,
+	// say, we may reuse the recovery plan, and probably no need to rebase PD allocation ID once we have done it.
+	return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) {
+		return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency)
+	})
+}
+
+func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
+	var cancel context.CancelFunc
+	ctx, cancel = context.WithCancel(ctx)
+	defer cancel()
+
 	var recovery = NewRecovery(allStores, mgr, progress, concurrency)
 	if err := recovery.ReadRegionMeta(ctx); err != nil {
-		return 0, errors.Trace(err)
+		return 0, recoveryError{error: err, atStage: StageCollectingMeta}
 	}

 	totalRegions := recovery.GetTotalRegions()

 	if err := recovery.MakeRecoveryPlan(); err != nil {
-		return totalRegions, errors.Trace(err)
+		return totalRegions, recoveryError{error: err, atStage: StageMakingRecoveryPlan}
 	}

 	log.Info("recover the alloc id to pd", zap.Uint64("max alloc id", recovery.MaxAllocID))
 	if err := recovery.mgr.RecoverBaseAllocID(ctx, recovery.MaxAllocID); err != nil {
-		return totalRegions, errors.Trace(err)
+		return totalRegions, recoveryError{error: err, atStage: StageResetPDAllocateID}
 	}

 	// Once TiKV shuts down and reboot then, it may be left with no leader because of the recovery mode.
 	// This wathcher will retrigger `RecoveryRegions` for those stores.
 	recovery.SpawnTiKVShutDownWatchers(ctx)
 	if err := recovery.RecoverRegions(ctx); err != nil {
-		return totalRegions, errors.Trace(err)
+		return totalRegions, recoveryError{error: err, atStage: StageRecovering}
 	}

 	if err := recovery.WaitApply(ctx); err != nil {
-		return totalRegions, errors.Trace(err)
+		return totalRegions, recoveryError{error: err, atStage: StageRecovering}
 	}

 	if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
-		return totalRegions, errors.Trace(err)
+		return totalRegions, recoveryError{error: err, atStage: StageFlashback}
 	}

 	if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil {
-		return totalRegions, errors.Trace(err)
+		return totalRegions, recoveryError{error: err, atStage: StageFlashback}
 	}

 	return totalRegions, nil
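The retry decision above hinges on tagging each failure with the stage it occurred at: doRecoveryData wraps errors in recoveryError, and recoveryBackoffer.NextBackoff calls FailedAt to decide whether that stage is safe to retry. Below is a minimal, self-contained sketch of the same pattern outside the BR packages; the names stage, stagedError, failedAt, and runRecovery are illustrative, not part of the codebase.

package main

import (
	"errors"
	"fmt"
)

// stage identifies which step of a multi-step procedure failed.
type stage int

const (
	stageUnknown stage = iota
	stagePlanning   // safe to retry the whole procedure
	stageFinalizing // past the point of no return, do not retry
)

// stagedError carries the failing stage alongside the underlying error,
// mirroring recoveryError above.
type stagedError struct {
	error
	at stage
}

// failedAt recovers the stage from an error, defaulting to stageUnknown.
func failedAt(err error) stage {
	if serr, ok := err.(stagedError); ok {
		return serr.at
	}
	return stageUnknown
}

func runRecovery() error {
	// Pretend the planning step failed; tag the error with its stage.
	return stagedError{error: errors.New("pd unreachable"), at: stagePlanning}
}

func main() {
	if err := runRecovery(); err != nil {
		switch failedAt(err) {
		case stageFinalizing:
			fmt.Println("give up:", err)
		default:
			fmt.Println("retry the whole procedure:", err)
		}
	}
}

Note that a plain type assertion, as used in FailedAt, only matches when the staged error is the outermost error; wrapping it in another layer before returning would hide the stage and fall back to StageUnknown.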

br/pkg/utils/backoff.go

Lines changed: 4 additions & 0 deletions
@@ -72,6 +72,10 @@ func (rs *RetryState) ExponentialBackoff() time.Duration {
 	return backoff
 }

+func (rs *RetryState) GiveUp() {
+	rs.retryTimes = rs.maxRetry
+}
+
 // InitialRetryState make the initial state for retrying.
 func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState {
 	return RetryState{
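GiveUp works by exhausting the attempt budget: once retryTimes reaches maxRetry, the remaining-attempt count drops to zero and the `for backoffer.Attempt() > 0` loop in WithRetryV2 stops after the current failure. A rough sketch of that relationship, assuming Attempt is derived from the two counters (the real RetryState fields and method bodies may differ):

// Illustrative reimplementation, not the actual br/pkg/utils type.
type retryState struct {
	maxRetry   int
	retryTimes int
}

// Attempt reports how many attempts remain.
func (rs *retryState) Attempt() int { return rs.maxRetry - rs.retryTimes }

// GiveUp burns the remaining attempts so the retry loop exits after the
// current iteration, mirroring RetryState.GiveUp above.
func (rs *retryState) GiveUp() { rs.retryTimes = rs.maxRetry }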

br/pkg/utils/retry.go

Lines changed: 33 additions & 1 deletion
@@ -34,6 +34,8 @@ var retryableServerError = []string{
 // RetryableFunc presents a retryable operation.
 type RetryableFunc func() error

+type RetryableFuncV2[T any] func(context.Context) (T, error)
+
 // Backoffer implements a backoff policy for retrying operations.
 type Backoffer interface {
 	// NextBackoff returns a duration to wait before retrying again
@@ -51,8 +53,26 @@ func WithRetry(
 	retryableFunc RetryableFunc,
 	backoffer Backoffer,
 ) error {
+	_, err := WithRetryV2[struct{}](ctx, backoffer, func(ctx context.Context) (struct{}, error) {
+		innerErr := retryableFunc()
+		return struct{}{}, innerErr
+	})
+	return err
+}
+
+// WithRetryV2 retries a given operation with a backoff policy.
+//
+// Returns the returned value if `retryableFunc` succeeded at least once. Otherwise, returns a
+// multierr that containing all errors encountered.
+// Comparing with `WithRetry`, this function reordered the argument order and supports catching the return value.
+func WithRetryV2[T any](
+	ctx context.Context,
+	backoffer Backoffer,
+	fn RetryableFuncV2[T],
+) (T, error) {
 	var allErrors error
 	for backoffer.Attempt() > 0 {
+<<<<<<< HEAD
 		err := retryableFunc()
 		if err != nil {
 			allErrors = multierr.Append(allErrors, err)
@@ -64,8 +84,20 @@
 		} else {
 			return nil
 		}
+=======
+		res, err := fn(ctx)
+		if err == nil {
+			return res, nil
+		}
+		allErrors = multierr.Append(allErrors, err)
+		select {
+		case <-ctx.Done():
+			return *new(T), allErrors
+		case <-time.After(backoffer.NextBackoff(err)):
+		}
+>>>>>>> 2ac191a1379 (snap_restore: added retry for recovery (#46094))
 	}
-	return allErrors // nolint:wrapcheck
+	return *new(T), allErrors // nolint:wrapcheck
 }

 // MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.
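Note that the second hunk still carries unresolved conflict markers (`<<<<<<< HEAD` … `>>>>>>>`) left by the automated cherry-pick, so retry.go will not compile until the loop body is resolved, presumably in favor of the generic `fn(ctx)` branch. Assuming that resolution, a hypothetical caller of WithRetryV2 could look like the sketch below; constantBackoffer and the toy closure are made up for illustration, and the import path is assumed to follow this branch's module layout.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/pingcap/tidb/br/pkg/utils"
)

// constantBackoffer is a toy Backoffer: a fixed delay and a fixed attempt budget.
type constantBackoffer struct {
	remaining int
	interval  time.Duration
}

// NextBackoff consumes one attempt and always waits the same interval.
func (b *constantBackoffer) NextBackoff(err error) time.Duration {
	b.remaining--
	return b.interval
}

// Attempt reports how many attempts remain.
func (b *constantBackoffer) Attempt() int { return b.remaining }

func main() {
	calls := 0
	// The type parameter lets the retried closure hand its result back directly,
	// which is what RecoverData relies on to return the region count.
	regions, err := utils.WithRetryV2[int](context.Background(),
		&constantBackoffer{remaining: 3, interval: 50 * time.Millisecond},
		func(ctx context.Context) (int, error) {
			calls++
			if calls < 2 {
				return 0, errors.New("transient failure")
			}
			return 42, nil
		})
	fmt.Println(regions, err) // 42 <nil>: succeeded on the second attempt
}

If every attempt fails, the returned error is a multierr collecting each attempt's failure, matching the documented contract of WithRetryV2.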

0 commit comments