Skip to content

Commit eacc493

Browse files
authored
br: refactor error handle mechanism to tolerant unexpect kv errors. (pingcap#48646) (pingcap#49276)
close pingcap#47656
1 parent 80f96ed commit eacc493

File tree

11 files changed

+311
-92
lines changed

11 files changed

+311
-92
lines changed

br/pkg/backup/client.go

Lines changed: 12 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,17 +1185,18 @@ func OnBackupResponse(
11851185
backupTS uint64,
11861186
lockResolver *txnlock.LockResolver,
11871187
resp *backuppb.BackupResponse,
1188+
errContext *utils.ErrorContext,
11881189
) (*backuppb.BackupResponse, int, error) {
11891190
log.Debug("OnBackupResponse", zap.Reflect("resp", resp))
11901191
if resp.Error == nil {
11911192
return resp, 0, nil
11921193
}
11931194
backoffMs := 0
1194-
switch v := resp.Error.Detail.(type) {
1195+
1196+
err := resp.Error
1197+
switch v := err.Detail.(type) {
11951198
case *backuppb.Error_KvError:
11961199
if lockErr := v.KvError.Locked; lockErr != nil {
1197-
// Try to resolve lock.
1198-
log.Warn("backup occur kv error", zap.Reflect("error", v))
11991200
msBeforeExpired, err1 := lockResolver.ResolveLocks(
12001201
bo, backupTS, []*txnlock.Lock{txnlock.NewLock(lockErr)})
12011202
if err1 != nil {
@@ -1206,44 +1207,16 @@ func OnBackupResponse(
12061207
}
12071208
return nil, backoffMs, nil
12081209
}
1209-
// Backup should not meet error other than KeyLocked.
1210-
log.Error("unexpect kv error", zap.Reflect("KvError", v.KvError))
1211-
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v)
1212-
1213-
case *backuppb.Error_RegionError:
1214-
regionErr := v.RegionError
1215-
// Ignore following errors.
1216-
if !(regionErr.EpochNotMatch != nil ||
1217-
regionErr.NotLeader != nil ||
1218-
regionErr.RegionNotFound != nil ||
1219-
regionErr.ServerIsBusy != nil ||
1220-
regionErr.StaleCommand != nil ||
1221-
regionErr.StoreNotMatch != nil ||
1222-
regionErr.ReadIndexNotReady != nil ||
1223-
regionErr.ProposalInMergingMode != nil) {
1224-
log.Error("unexpect region error", zap.Reflect("RegionError", regionErr))
1225-
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v)
1226-
}
1227-
log.Warn("backup occur region error",
1228-
zap.Reflect("RegionError", regionErr),
1229-
zap.Uint64("storeID", storeID))
1230-
// TODO: a better backoff.
1231-
backoffMs = 1000 /* 1s */
1232-
return nil, backoffMs, nil
1233-
case *backuppb.Error_ClusterIdError:
1234-
log.Error("backup occur cluster ID error", zap.Reflect("error", v), zap.Uint64("storeID", storeID))
1235-
return nil, 0, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v on storeID: %d", resp.Error, storeID)
12361210
default:
1237-
// UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error.
1238-
if utils.MessageIsRetryableStorageError(resp.GetError().GetMsg()) {
1239-
log.Warn("backup occur storage error", zap.String("error", resp.GetError().GetMsg()))
1240-
// back off 3000ms, for S3 is 99.99% available (i.e. the max outage time would less than 52.56mins per year),
1241-
// this time would be probably enough for s3 to resume.
1211+
res := errContext.HandleError(resp.Error, storeID)
1212+
switch res.Strategy {
1213+
case utils.GiveUpStrategy:
1214+
return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %s", storeID, res.Reason)
1215+
case utils.RetryStrategy:
12421216
return nil, 3000, nil
12431217
}
1244-
log.Error("backup occur unknown error", zap.String("error", resp.Error.GetMsg()), zap.Uint64("storeID", storeID))
1245-
return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "%v on storeID: %d", resp.Error, storeID)
12461218
}
1219+
return nil, 3000, errors.Annotatef(berrors.ErrKVUnknown, "unreachable")
12471220
}
12481221

12491222
func (bc *Client) handleFineGrained(
@@ -1273,12 +1246,13 @@ func (bc *Client) handleFineGrained(
12731246
}
12741247
hasProgress := false
12751248
backoffMill := 0
1249+
errContext := utils.NewErrorContext("handleFineGrainedBackup", 10)
12761250
err = SendBackup(
12771251
ctx, storeID, client, req,
12781252
// Handle responses with the same backoffer.
12791253
func(resp *backuppb.BackupResponse) error {
12801254
response, shouldBackoff, err1 :=
1281-
OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp)
1255+
OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp, errContext)
12821256
if err1 != nil {
12831257
return err1
12841258
}

br/pkg/backup/client_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/pingcap/tidb/br/pkg/mock"
2121
"github.com/pingcap/tidb/br/pkg/pdutil"
2222
"github.com/pingcap/tidb/br/pkg/storage"
23+
"github.com/pingcap/tidb/br/pkg/utils"
2324
"github.com/pingcap/tidb/kv"
2425
"github.com/pingcap/tidb/parser/model"
2526
"github.com/pingcap/tidb/tablecodec"
@@ -230,20 +231,20 @@ func TestOnBackupRegionErrorResponse(t *testing.T) {
230231
}
231232

232233
cases := []Case{
233-
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 1000, exceptedErr: false},
234-
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 1000, exceptedErr: false},
234+
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 3000, exceptedErr: false},
235+
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 3000, exceptedErr: false},
235236
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{KeyNotInRegion: &errorpb.KeyNotInRegion{}}), exceptedBackoffMs: 0, exceptedErr: true},
236-
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false},
237-
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 1000, exceptedErr: false},
238-
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 1000, exceptedErr: false},
239-
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false},
237+
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false},
238+
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 3000, exceptedErr: false},
239+
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 3000, exceptedErr: false},
240+
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false},
240241
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RaftEntryTooLarge: &errorpb.RaftEntryTooLarge{}}), exceptedBackoffMs: 0, exceptedErr: true},
241-
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 1000, exceptedErr: false},
242-
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 1000, exceptedErr: false},
242+
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 3000, exceptedErr: false},
243+
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 3000, exceptedErr: false},
243244
}
244245
for _, cs := range cases {
245246
t.Log(cs)
246-
_, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp)
247+
_, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp, utils.NewErrorContext("test", 1))
247248
require.Equal(t, cs.exceptedBackoffMs, backoffMs)
248249
if cs.exceptedErr {
249250
require.Error(t, err)

br/pkg/backup/push.go

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package backup
44

55
import (
66
"context"
7-
"fmt"
87
"sync"
98

109
"github.com/opentracing/opentracing-go"
@@ -73,6 +72,7 @@ func (push *pushDown) pushBackup(
7372
})
7473

7574
wg := new(sync.WaitGroup)
75+
errContext := utils.NewErrorContext("pushBackup", 10)
7676
for _, s := range stores {
7777
store := s
7878
storeID := s.GetId()
@@ -183,35 +183,10 @@ func (push *pushDown) pushBackup(
183183
progressCallBack(RegionUnit)
184184
} else {
185185
errPb := resp.GetError()
186-
switch v := errPb.Detail.(type) {
187-
case *backuppb.Error_KvError:
188-
logutil.CL(ctx).Warn("backup occur kv error", zap.Reflect("error", v))
189-
190-
case *backuppb.Error_RegionError:
191-
logutil.CL(ctx).Warn("backup occur region error", zap.Reflect("error", v))
192-
193-
case *backuppb.Error_ClusterIdError:
194-
logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v))
195-
return errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb)
196-
default:
197-
if utils.MessageIsRetryableStorageError(errPb.GetMsg()) {
198-
logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
199-
continue
200-
}
201-
var errMsg string
202-
if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) {
203-
errMsg = fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+
204-
"work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.",
205-
store.GetId(), redact.String(store.GetAddress()))
206-
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg))
207-
}
208-
if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) {
209-
errMsg = fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+
210-
"work around:please ensure tikv has permission to read from & write to the storage.",
211-
store.GetId(), redact.String(store.GetAddress()))
212-
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg))
213-
}
214-
186+
res := errContext.HandleIgnorableError(errPb, store.GetId())
187+
switch res.Strategy {
188+
case utils.GiveUpStrategy:
189+
errMsg := res.Reason
215190
if len(errMsg) <= 0 {
216191
errMsg = errPb.Msg
217192
}
@@ -220,6 +195,10 @@ func (push *pushDown) pushBackup(
220195
redact.String(store.GetAddress()),
221196
errMsg,
222197
)
198+
default:
199+
// other type just continue for next response
200+
// and finally handle the range in fineGrainedBackup
201+
continue
223202
}
224203
}
225204
case err := <-push.errCh:

br/pkg/errors/errors.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ var (
4949
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
5050
ErrBackupNoLeader = errors.Normalize("backup no leader", errors.RFCCodeText("BR:Backup:ErrBackupNoLeader"))
5151
ErrBackupGCSafepointExceeded = errors.Normalize("backup GC safepoint exceeded", errors.RFCCodeText("BR:Backup:ErrBackupGCSafepointExceeded"))
52+
ErrBackupKeyIsLocked = errors.Normalize("backup key is locked", errors.RFCCodeText("BR:Backup:ErrBackupKeyIsLocked"))
53+
ErrBackupRegion = errors.Normalize("backup region error", errors.RFCCodeText("BR:Backup:ErrBackupRegion"))
5254

5355
ErrRestoreModeMismatch = errors.Normalize("restore mode mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreModeMismatch"))
5456
ErrRestoreRangeMismatch = errors.Normalize("restore range mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreRangeMismatch"))

br/pkg/utils/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ go_test(
9494
],
9595
embed = [":utils"],
9696
flaky = True,
97-
shard_count = 33,
97+
shard_count = 36,
9898
deps = [
9999
"//br/pkg/errors",
100100
"//br/pkg/metautil",
@@ -115,6 +115,7 @@ go_test(
115115
"@com_github_pingcap_failpoint//:failpoint",
116116
"@com_github_pingcap_kvproto//pkg/brpb",
117117
"@com_github_pingcap_kvproto//pkg/encryptionpb",
118+
"@com_github_pingcap_kvproto//pkg/errorpb",
118119
"@com_github_pingcap_kvproto//pkg/import_sstpb",
119120
"@com_github_pingcap_kvproto//pkg/metapb",
120121
"@com_github_stretchr_testify//require",

br/pkg/utils/backoff.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,28 +120,34 @@ type importerBackoffer struct {
120120
attempt int
121121
delayTime time.Duration
122122
maxDelayTime time.Duration
123+
errContext *ErrorContext
123124
}
124125

125126
// NewBackoffer creates a new controller regulating a truncated exponential backoff.
126-
func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration) Backoffer {
127+
func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext) Backoffer {
127128
return &importerBackoffer{
128129
attempt: attempt,
129130
delayTime: delayTime,
130131
maxDelayTime: maxDelayTime,
132+
errContext: errContext,
131133
}
132134
}
133135

134136
func NewImportSSTBackoffer() Backoffer {
135-
return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval)
137+
errContext := NewErrorContext("import sst", 3)
138+
return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval, errContext)
136139
}
137140

138141
func NewDownloadSSTBackoffer() Backoffer {
139-
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval)
142+
errContext := NewErrorContext("download sst", 3)
143+
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext)
140144
}
141145

142146
func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
143147
log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err))
144-
if MessageIsRetryableStorageError(err.Error()) {
148+
// we don't care storeID here.
149+
res := bo.errContext.HandleErrorMsg(err.Error(), 0)
150+
if res.Strategy == RetryStrategy {
145151
bo.delayTime = 2 * bo.delayTime
146152
bo.attempt--
147153
} else {
@@ -151,7 +157,7 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
151157
bo.delayTime = 2 * bo.delayTime
152158
bo.attempt--
153159
case berrors.ErrKVRangeIsEmpty, berrors.ErrKVRewriteRuleNotFound:
154-
// Excepted error, finish the operation
160+
// Expected error, finish the operation
155161
bo.delayTime = 0
156162
bo.attempt = 0
157163
default:
@@ -160,10 +166,10 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
160166
bo.delayTime = 2 * bo.delayTime
161167
bo.attempt--
162168
default:
163-
// Unexcepted error
169+
// Unexpected error
164170
bo.delayTime = 0
165171
bo.attempt = 0
166-
log.Warn("unexcepted error, stop to retry", zap.Error(err))
172+
log.Warn("unexpected error, stop retrying", zap.Error(err))
167173
}
168174
}
169175
}

br/pkg/utils/backoff_test.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/pingcap/errors"
1112
berrors "github.com/pingcap/tidb/br/pkg/errors"
1213
"github.com/pingcap/tidb/br/pkg/utils"
1314
"github.com/stretchr/testify/require"
@@ -18,7 +19,7 @@ import (
1819

1920
func TestBackoffWithSuccess(t *testing.T) {
2021
var counter int
21-
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
22+
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
2223
err := utils.WithRetry(context.Background(), func() error {
2324
defer func() { counter++ }()
2425
switch counter {
@@ -35,9 +36,26 @@ func TestBackoffWithSuccess(t *testing.T) {
3536
require.NoError(t, err)
3637
}
3738

39+
func TestBackoffWithUnknowneErrorSuccess(t *testing.T) {
40+
var counter int
41+
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
42+
err := utils.WithRetry(context.Background(), func() error {
43+
defer func() { counter++ }()
44+
switch counter {
45+
case 0:
46+
return errors.New("unknown error: not in the allow list")
47+
case 1:
48+
return berrors.ErrKVEpochNotMatch
49+
}
50+
return nil
51+
}, backoffer)
52+
require.Equal(t, 3, counter)
53+
require.NoError(t, err)
54+
}
55+
3856
func TestBackoffWithFatalError(t *testing.T) {
3957
var counter int
40-
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
58+
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
4159
gRPCError := status.Error(codes.Unavailable, "transport is closing")
4260
err := utils.WithRetry(context.Background(), func() error {
4361
defer func() { counter++ }()
@@ -65,7 +83,7 @@ func TestBackoffWithFatalError(t *testing.T) {
6583
func TestBackoffWithFatalRawGRPCError(t *testing.T) {
6684
var counter int
6785
canceledError := status.Error(codes.Canceled, "context canceled")
68-
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
86+
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
6987
err := utils.WithRetry(context.Background(), func() error {
7088
defer func() { counter++ }()
7189
return canceledError // nolint:wrapcheck
@@ -76,7 +94,7 @@ func TestBackoffWithFatalRawGRPCError(t *testing.T) {
7694

7795
func TestBackoffWithRetryableError(t *testing.T) {
7896
var counter int
79-
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
97+
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
8098
err := utils.WithRetry(context.Background(), func() error {
8199
defer func() { counter++ }()
82100
return berrors.ErrKVEpochNotMatch

br/pkg/utils/permission.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ var (
77
permissionDeniedMsg = "permissiondenied"
88
)
99

10-
// MessageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error
11-
func MessageIsNotFoundStorageError(msg string) bool {
10+
// messageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error
11+
func messageIsNotFoundStorageError(msg string) bool {
1212
msgLower := strings.ToLower(msg)
1313
return strings.Contains(msgLower, "io") && strings.Contains(msgLower, ioNotFoundMsg)
1414
}
1515

1616
// MessageIsPermissionDeniedStorageError checks whether the message returning from TiKV is "PermissionDenied" storage I/O error
17-
func MessageIsPermissionDeniedStorageError(msg string) bool {
17+
func messageIsPermissionDeniedStorageError(msg string) bool {
1818
msgLower := strings.ToLower(msg)
1919
return strings.Contains(msgLower, permissionDeniedMsg)
2020
}

0 commit comments

Comments
 (0)