Skip to content

Commit 3a378c8

Browse files
authored
br: add retry for raw kv client put (#58963)
close #58845
1 parent c34a6b6 commit 3a378c8

File tree

4 files changed

+96
-1
lines changed

4 files changed

+96
-1
lines changed

br/pkg/restore/log_client/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ go_test(
101101
"//br/pkg/mock",
102102
"//br/pkg/restore",
103103
"//br/pkg/restore/internal/import_client",
104+
"//br/pkg/restore/internal/rawkv",
104105
"//br/pkg/restore/split",
105106
"//br/pkg/restore/utils",
106107
"//br/pkg/storage",
@@ -132,6 +133,7 @@ go_test(
132133
"@com_github_pingcap_kvproto//pkg/metapb",
133134
"@com_github_pingcap_log//:log",
134135
"@com_github_stretchr_testify//require",
136+
"@com_github_tikv_client_go_v2//rawkv",
135137
"@org_golang_google_grpc//codes",
136138
"@org_golang_google_grpc//keepalive",
137139
"@org_golang_google_grpc//status",

br/pkg/restore/log_client/client.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1493,7 +1493,7 @@ func (rc *LogClient) restoreMetaKvEntries(
14931493
failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) {
14941494
failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv"))
14951495
})
1496-
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.Ts); err != nil {
1496+
if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.Ts); err != nil {
14971497
return 0, 0, errors.Trace(err)
14981498
}
14991499
// for failpoint, we need to flush the cache in rawKVClient every time
@@ -2053,3 +2053,13 @@ func (rc *LogClient) FailpointDoChecksumForLogRestore(
20532053

20542054
return eg.Wait()
20552055
}
2056+
2057+
func PutRawKvWithRetry(ctx context.Context, client *rawkv.RawKVBatchClient, key, value []byte, originTs uint64) error {
2058+
err := utils.WithRetry(ctx, func() error {
2059+
return client.Put(ctx, key, value, originTs)
2060+
}, utils.NewRawClientBackoffStrategy())
2061+
if err != nil {
2062+
return errors.Errorf("failed to put raw kv after retry")
2063+
}
2064+
return nil
2065+
}

br/pkg/restore/log_client/client_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/pingcap/tidb/br/pkg/gluetidb"
3232
"github.com/pingcap/tidb/br/pkg/mock"
3333
"github.com/pingcap/tidb/br/pkg/restore"
34+
rawclient "github.com/pingcap/tidb/br/pkg/restore/internal/rawkv"
3435
logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
3536
"github.com/pingcap/tidb/br/pkg/restore/split"
3637
"github.com/pingcap/tidb/br/pkg/restore/utils"
@@ -49,6 +50,7 @@ import (
4950
"github.com/pingcap/tidb/pkg/util/sqlexec"
5051
filter "github.com/pingcap/tidb/pkg/util/table-filter"
5152
"github.com/stretchr/testify/require"
53+
"github.com/tikv/client-go/v2/rawkv"
5254
"google.golang.org/grpc/keepalive"
5355
)
5456

@@ -1986,3 +1988,69 @@ func fakeRowKey(tableID, rowID int64) kv.Key {
19861988
// fakeRowRawKey builds a record key for rowID (as an integer handle) under the
// table record prefix generated for tableID.
func fakeRowRawKey(tableID, rowID int64) kv.Key {
	recordPrefix := tablecodec.GenTableRecordPrefix(tableID)
	handle := kv.IntHandle(rowID)
	return tablecodec.EncodeRecordKey(recordPrefix, handle)
}
1991+
1992+
type mockRawKVClient struct {
1993+
rawkv.Client
1994+
putCount int
1995+
errThreshold int
1996+
}
1997+
1998+
func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
1999+
m.putCount += 1
2000+
if m.errThreshold >= m.putCount {
2001+
return errors.New("rpcClient is idle")
2002+
}
2003+
return nil
2004+
}
2005+
2006+
func TestPutRawKvWithRetry(t *testing.T) {
2007+
tests := []struct {
2008+
name string
2009+
errThreshold int
2010+
cancelAfter time.Duration
2011+
wantErr string
2012+
wantPuts int
2013+
}{
2014+
{
2015+
name: "success on first try",
2016+
errThreshold: 0,
2017+
wantPuts: 1,
2018+
},
2019+
{
2020+
name: "success on after failure",
2021+
errThreshold: 2,
2022+
wantPuts: 3,
2023+
},
2024+
{
2025+
name: "fails all retries",
2026+
errThreshold: 5,
2027+
wantErr: "failed to put raw kv after retry",
2028+
wantPuts: 5,
2029+
},
2030+
}
2031+
2032+
for _, tt := range tests {
2033+
t.Run(tt.name, func(t *testing.T) {
2034+
mockRawClient := &mockRawKVClient{
2035+
errThreshold: tt.errThreshold,
2036+
}
2037+
client := rawclient.NewRawKVBatchClient(mockRawClient, 1)
2038+
2039+
ctx := context.Background()
2040+
if tt.cancelAfter > 0 {
2041+
var cancel context.CancelFunc
2042+
ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter)
2043+
defer cancel()
2044+
}
2045+
2046+
err := logclient.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1)
2047+
2048+
if tt.wantErr != "" {
2049+
require.ErrorContains(t, err, tt.wantErr)
2050+
} else {
2051+
require.NoError(t, err)
2052+
}
2053+
require.Equal(t, tt.wantPuts, mockRawClient.putCount)
2054+
})
2055+
}
2056+
}

br/pkg/utils/backoff.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ const (
5353
recoveryMaxAttempts = 16
5454
recoveryDelayTime = 30 * time.Second
5555
recoveryMaxDelayTime = 4 * time.Minute
56+
57+
rawClientMaxAttempts = 5
58+
rawClientDelayTime = 500 * time.Millisecond
59+
rawClientMaxDelayTime = 5 * time.Second
5660
)
5761

5862
// BackoffStrategy implements a backoff strategy for retry operations.
@@ -379,6 +383,17 @@ func NewChecksumBackoffStrategy() BackoffStrategy {
379383
)
380384
}
381385

386+
func NewRawClientBackoffStrategy() BackoffStrategy {
387+
return NewBackoffStrategy(
388+
WithRemainingAttempts(rawClientMaxAttempts),
389+
WithDelayTime(rawClientDelayTime),
390+
WithMaxDelayTime(rawClientMaxDelayTime),
391+
WithErrorContext(NewZeroRetryContext("raw client")),
392+
WithRetryErrorFunc(alwaysTrueFunc()),
393+
WithNonRetryErrorFunc(alwaysFalseFunc()),
394+
)
395+
}
396+
382397
func (bo *backoffStrategyImpl) NextBackoff(err error) time.Duration {
383398
errs := multierr.Errors(err)
384399
lastErr := errs[len(errs)-1]

0 commit comments

Comments
 (0)