Skip to content

Commit b0d4953

Browse files
authored
lightning: add timeout for "write" RPC (#48355) (#48396)
close #46321, close #48352
1 parent 40f8cb8 commit b0d4953

File tree

3 files changed

+24
-3
lines changed

3 files changed

+24
-3
lines changed

br/pkg/lightning/backend/local/local.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package local
1717
import (
1818
"bytes"
1919
"context"
20+
goerrors "errors"
2021
"fmt"
2122
"io"
2223
"math"
@@ -907,17 +908,30 @@ type rangeStats struct {
907908
// we don't need to do cleanup for the pairs written to tikv if it encounters an error,
908909
// tikv will take the responsibility to do so.
909910
func (local *local) WriteToTiKV(
910-
ctx context.Context,
911+
pCtx context.Context,
911912
engine *Engine,
912913
region *split.RegionInfo,
913914
start, end []byte,
914915
regionSplitSize int64,
915916
regionSplitKeys int64,
916-
) ([]*sst.SSTMeta, Range, rangeStats, error) {
917+
) (s []*sst.SSTMeta, r Range, r2 rangeStats, errRet error) {
917918
failpoint.Inject("WriteToTiKVNotEnoughDiskSpace", func(_ failpoint.Value) {
918919
failpoint.Return(nil, Range{}, rangeStats{},
919920
errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d", "", 0, 0))
920921
})
922+
ctx, cancel := context.WithTimeout(pCtx, 15*time.Minute)
923+
defer cancel()
924+
defer func() {
925+
deadline, ok := ctx.Deadline()
926+
if !ok {
927+
// should not happen
928+
return
929+
}
930+
if goerrors.Is(errRet, context.DeadlineExceeded) && time.Now().After(deadline) {
931+
errRet = common.ErrWriteTooSlow
932+
}
933+
}()
934+
921935
if local.checkTiKVAvaliable {
922936
for _, peer := range region.Region.GetPeers() {
923937
var e error

br/pkg/lightning/common/retry.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,19 @@ var retryableErrorIDs = map[errors.ErrorID]struct{}{
8888
drivererr.ErrUnknown.ID(): {},
8989
}
9090

91+
// ErrWriteTooSlow is used to get rid of the gRPC blocking issue.
92+
// there are some strange blocking issues of gRPC like
93+
// https://github.com/pingcap/tidb/issues/48352
94+
// https://github.com/pingcap/tidb/issues/46321, whose root cause is currently unknown.
95+
var ErrWriteTooSlow = errors.New("write too slow, maybe gRPC is blocked forever")
96+
9197
func isSingleRetryableError(err error) bool {
9298
err = errors.Cause(err)
9399

94100
switch err {
95101
case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows:
96102
return false
97-
case mysql.ErrInvalidConn, driver.ErrBadConn:
103+
case mysql.ErrInvalidConn, driver.ErrBadConn, ErrWriteTooSlow:
98104
return true
99105
}
100106

br/pkg/lightning/common/retry_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
func TestIsRetryableError(t *testing.T) {
3535
require.False(t, IsRetryableError(context.Canceled))
3636
require.False(t, IsRetryableError(context.DeadlineExceeded))
37+
require.True(t, IsRetryableError(ErrWriteTooSlow))
3738
require.False(t, IsRetryableError(io.EOF))
3839
require.False(t, IsRetryableError(&net.AddrError{}))
3940
require.False(t, IsRetryableError(&net.DNSError{}))

0 commit comments

Comments
 (0)