Skip to content

Commit 8e233a3

Browse files
authored
streamhelper: don't cut down connection when subscription stream closed (#59559)
close #58691
1 parent 21f9bb7 commit 8e233a3

File tree

3 files changed

+28
-5
lines changed

3 files changed

+28
-5
lines changed

br/pkg/streamhelper/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ go_test(
7070
],
7171
flaky = True,
7272
race = "on",
73-
shard_count = 35,
73+
shard_count = 36,
7474
deps = [
7575
":streamhelper",
7676
"//br/pkg/errors",

br/pkg/streamhelper/flush_subscriber.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package streamhelper
44

55
import (
66
"context"
7-
"io"
87
"strconv"
98
"sync"
109
"time"
@@ -278,12 +277,10 @@ func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
278277
// Shall we use RecvMsg for better performance?
279278
// Note that the spans.Full requires the input slice be immutable.
280279
msg, err := cli.Recv()
280+
failpoint.InjectCall("listen_flush_stream", s.storeID, &err)
281281
if err != nil {
282282
logutil.CL(ctx).Info("Listen stopped.",
283283
zap.Uint64("store", storeID), logutil.ShortError(err))
284-
if err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled {
285-
return
286-
}
287284
s.emitError(errors.Annotatef(err, "while receiving from store id %d", storeID))
288285
return
289286
}

br/pkg/streamhelper/subscription_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/pingcap/failpoint"
1213
"github.com/pingcap/tidb/br/pkg/streamhelper"
1314
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
1415
"github.com/stretchr/testify/require"
@@ -240,3 +241,28 @@ func TestSomeOfStoreUnsupported(t *testing.T) {
240241
req.NoError(err)
241242
req.Equal(cp, s.MinValue())
242243
}
244+
245+
func TestEncounterError(t *testing.T) {
246+
req := require.New(t)
247+
ctx := context.Background()
248+
c := createFakeCluster(t, 4, true)
249+
c.splitAndScatter("0001", "0002", "0003", "0008", "0009", "0010", "0100", "0956", "1000")
250+
251+
sub := streamhelper.NewSubscriber(c, c)
252+
installSubscribeSupport(c)
253+
req.NoError(sub.UpdateStoreTopology(ctx))
254+
255+
o := new(sync.Once)
256+
failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/streamhelper/listen_flush_stream", func(storeID uint64, err *error) {
257+
o.Do(func() {
258+
*err = context.Canceled
259+
})
260+
})
261+
262+
c.flushAll()
263+
require.Eventually(t, func() bool {
264+
return sub.PendingErrors() != nil
265+
}, 3*time.Second, 100*time.Millisecond)
266+
sub.HandleErrors(context.Background())
267+
require.NoError(t, sub.PendingErrors())
268+
}

0 commit comments

Comments
 (0)