Skip to content

Commit 8645a66

Browse files
YuJuncenti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#59559
Signed-off-by: ti-chi-bot <[email protected]>
1 parent aa94e26 commit 8645a66

File tree

3 files changed

+31
-4
lines changed

3 files changed

+31
-4
lines changed

br/pkg/streamhelper/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ go_test(
6969
],
7070
flaky = True,
7171
race = "on",
72+
<<<<<<< HEAD
7273
shard_count = 34,
74+
=======
75+
shard_count = 36,
76+
>>>>>>> 8e233a32215 (streamhelper: don't cut down connection when subscription stream closed (#59559))
7377
deps = [
7478
":streamhelper",
7579
"//br/pkg/errors",

br/pkg/streamhelper/flush_subscriber.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package streamhelper
44

55
import (
66
"context"
7-
"io"
87
"strconv"
98
"sync"
109
"time"
@@ -278,12 +277,10 @@ func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
278277
// Shall we use RecvMsg for better performance?
279278
// Note that the spans.Full requires the input slice be immutable.
280279
msg, err := cli.Recv()
280+
failpoint.InjectCall("listen_flush_stream", s.storeID, &err)
281281
if err != nil {
282282
logutil.CL(ctx).Info("Listen stopped.",
283283
zap.Uint64("store", storeID), logutil.ShortError(err))
284-
if err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled {
285-
return
286-
}
287284
s.emitError(errors.Annotatef(err, "while receiving from store id %d", storeID))
288285
return
289286
}

br/pkg/streamhelper/subscription_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/pingcap/failpoint"
1213
"github.com/pingcap/tidb/br/pkg/streamhelper"
1314
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
1415
"github.com/stretchr/testify/require"
@@ -240,3 +241,28 @@ func TestSomeOfStoreUnsupported(t *testing.T) {
240241
req.NoError(err)
241242
req.Equal(cp, s.MinValue())
242243
}
244+
245+
func TestEncounterError(t *testing.T) {
246+
req := require.New(t)
247+
ctx := context.Background()
248+
c := createFakeCluster(t, 4, true)
249+
c.splitAndScatter("0001", "0002", "0003", "0008", "0009", "0010", "0100", "0956", "1000")
250+
251+
sub := streamhelper.NewSubscriber(c, c)
252+
installSubscribeSupport(c)
253+
req.NoError(sub.UpdateStoreTopology(ctx))
254+
255+
o := new(sync.Once)
256+
failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/streamhelper/listen_flush_stream", func(storeID uint64, err *error) {
257+
o.Do(func() {
258+
*err = context.Canceled
259+
})
260+
})
261+
262+
c.flushAll()
263+
require.Eventually(t, func() bool {
264+
return sub.PendingErrors() != nil
265+
}, 3*time.Second, 100*time.Millisecond)
266+
sub.HandleErrors(context.Background())
267+
require.NoError(t, sub.PendingErrors())
268+
}

0 commit comments

Comments
 (0)