diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel
index cfea7cf900721..f2079793514a8 100644
--- a/br/pkg/streamhelper/BUILD.bazel
+++ b/br/pkg/streamhelper/BUILD.bazel
@@ -70,7 +70,7 @@ go_test(
     ],
     flaky = True,
     race = "on",
-    shard_count = 35,
+    shard_count = 36,
     deps = [
         ":streamhelper",
         "//br/pkg/errors",
diff --git a/br/pkg/streamhelper/flush_subscriber.go b/br/pkg/streamhelper/flush_subscriber.go
index f1310b0212372..fdab936ad5f97 100644
--- a/br/pkg/streamhelper/flush_subscriber.go
+++ b/br/pkg/streamhelper/flush_subscriber.go
@@ -4,7 +4,6 @@ package streamhelper
 
 import (
 	"context"
-	"io"
 	"strconv"
 	"sync"
 	"time"
@@ -278,12 +277,10 @@ func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
 		// Shall we use RecvMsg for better performance?
 		// Note that the spans.Full requires the input slice be immutable.
 		msg, err := cli.Recv()
+		failpoint.InjectCall("listen_flush_stream", s.storeID, &err)
 		if err != nil {
 			logutil.CL(ctx).Info("Listen stopped.",
 				zap.Uint64("store", storeID), logutil.ShortError(err))
-			if err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled {
-				return
-			}
 			s.emitError(errors.Annotatef(err, "while receiving from store id %d", storeID))
 			return
 		}
diff --git a/br/pkg/streamhelper/subscription_test.go b/br/pkg/streamhelper/subscription_test.go
index da7aa627eabd0..4025e96ba6d4f 100644
--- a/br/pkg/streamhelper/subscription_test.go
+++ b/br/pkg/streamhelper/subscription_test.go
@@ -9,6 +9,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/br/pkg/streamhelper"
 	"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
 	"github.com/stretchr/testify/require"
@@ -240,3 +241,37 @@ func TestSomeOfStoreUnsupported(t *testing.T) {
 	req.NoError(err)
 	req.Equal(cp, s.MinValue())
 }
+
+// TestEncounterError checks that an error hit while listening to a store's
+// flush event stream (even context.Canceled) is recorded as a pending error,
+// and that HandleErrors clears it afterwards.
+func TestEncounterError(t *testing.T) {
+	req := require.New(t)
+	ctx := context.Background()
+	c := createFakeCluster(t, 4, true)
+	c.splitAndScatter("0001", "0002", "0003", "0008", "0009", "0010", "0100", "0956", "1000")
+
+	sub := streamhelper.NewSubscriber(c, c)
+	installSubscribeSupport(c)
+	req.NoError(sub.UpdateStoreTopology(ctx))
+
+	const fp = "github.com/pingcap/tidb/br/pkg/streamhelper/listen_flush_stream"
+	// Inject the error only once so that later receives are unaffected.
+	o := new(sync.Once)
+	req.NoError(failpoint.EnableCall(fp, func(storeID uint64, err *error) {
+		o.Do(func() {
+			*err = context.Canceled
+		})
+	}))
+	// Always disable the failpoint so the hook does not leak into other tests.
+	t.Cleanup(func() {
+		req.NoError(failpoint.Disable(fp))
+	})
+
+	c.flushAll()
+	req.Eventually(func() bool {
+		return sub.PendingErrors() != nil
+	}, 3*time.Second, 100*time.Millisecond)
+	sub.HandleErrors(ctx)
+	req.NoError(sub.PendingErrors())
+}