Skip to content

Commit b003558

Browse files
authored
log_backup: Fix owner transfer panic (pingcap#47537) (pingcap#47558)
close pingcap#47533
1 parent a56af1b commit b003558

File tree

5 files changed

+52
-11
lines changed

5 files changed

+52
-11
lines changed

br/pkg/streamhelper/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ go_test(
6868
],
6969
flaky = True,
7070
race = "on",
71-
shard_count = 19,
71+
shard_count = 20,
7272
deps = [
7373
":streamhelper",
7474
"//br/pkg/errors",

br/pkg/streamhelper/advancer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ func (c *CheckpointAdvancer) stopSubscriber() {
443443
c.subscriber = nil
444444
}
445445

446-
func (c *CheckpointAdvancer) spawnSubscriptionHandler(ctx context.Context) {
446+
func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
447447
c.subscriberMu.Lock()
448448
defer c.subscriberMu.Unlock()
449449
c.subscriber = NewSubscriber(c.env, c.env, WithMasterContext(ctx))
@@ -470,9 +470,12 @@ func (c *CheckpointAdvancer) spawnSubscriptionHandler(ctx context.Context) {
470470
}
471471

472472
func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
473+
c.subscriberMu.Lock()
474+
defer c.subscriberMu.Unlock()
473475
if c.subscriber == nil {
474476
return nil
475477
}
478+
failpoint.Inject("get_subscriber", nil)
476479
if err := c.subscriber.UpdateStoreTopology(ctx); err != nil {
477480
log.Warn("[log backup advancer] Error when updating store topology.", logutil.ShortError(err))
478481
}

br/pkg/streamhelper/advancer_daemon.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ func (c *CheckpointAdvancer) OnStart(ctx context.Context) {
3434
// OnBecomeOwner implements daemon.Interface. If the tidb-server become owner, this function will be called.
3535
func (c *CheckpointAdvancer) OnBecomeOwner(ctx context.Context) {
3636
metrics.AdvancerOwner.Set(1.0)
37-
c.spawnSubscriptionHandler(ctx)
37+
c.SpawnSubscriptionHandler(ctx)
3838
go func() {
3939
<-ctx.Done()
40-
c.onStop()
40+
c.OnStop()
4141
}()
4242
}
4343

@@ -46,7 +46,7 @@ func (c *CheckpointAdvancer) Name() string {
4646
return "LogBackup::Advancer"
4747
}
4848

49-
func (c *CheckpointAdvancer) onStop() {
49+
func (c *CheckpointAdvancer) OnStop() {
5050
metrics.AdvancerOwner.Set(0.0)
5151
c.stopSubscriber()
5252
}

br/pkg/streamhelper/advancer_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,3 +361,38 @@ func TestResolveLock(t *testing.T) {
361361
require.Len(t, r.FailureSubRanges, 0)
362362
require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint)
363363
}
364+
365+
func TestOwnerDropped(t *testing.T) {
366+
ctx := context.Background()
367+
c := createFakeCluster(t, 4, false)
368+
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
369+
installSubscribeSupport(c)
370+
env := &testEnv{testCtx: t, fakeCluster: c}
371+
fp := "github.com/pingcap/tidb/br/pkg/streamhelper/get_subscriber"
372+
defer func() {
373+
if t.Failed() {
374+
fmt.Println(c)
375+
}
376+
}()
377+
378+
adv := streamhelper.NewCheckpointAdvancer(env)
379+
adv.OnStart(ctx)
380+
adv.SpawnSubscriptionHandler(ctx)
381+
require.NoError(t, adv.OnTick(ctx))
382+
failpoint.Enable(fp, "pause")
383+
ch := make(chan struct{})
384+
go func() {
385+
defer close(ch)
386+
require.NoError(t, adv.OnTick(ctx))
387+
}()
388+
adv.OnStop()
389+
failpoint.Disable(fp)
390+
391+
cp := c.advanceCheckpoints()
392+
c.flushAll()
393+
<-ch
394+
adv.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
395+
// Advancer will manually poll the checkpoint...
396+
require.Equal(t, vsf.MinValue(), cp)
397+
})
398+
}

br/pkg/streamhelper/flush_subscriber.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,10 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) e
230230
cancel()
231231
return errors.Annotate(err, "failed to subscribe events")
232232
}
233+
lcx := logutil.ContextWithField(cx, zap.Uint64("store-id", s.storeID),
234+
zap.String("category", "log backup flush subscriber"))
233235
s.cancel = cancel
234-
s.background = spawnJoinable(func() { s.listenOver(cli) })
236+
s.background = spawnJoinable(func() { s.listenOver(lcx, cli) })
235237
return nil
236238
}
237239

@@ -244,15 +246,16 @@ func (s *subscription) close() {
244246
// because it is a ever-sharing channel.
245247
}
246248

247-
func (s *subscription) listenOver(cli eventStream) {
249+
func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
248250
storeID := s.storeID
249-
log.Info("[log backup flush subscriber] Listen starting.", zap.Uint64("store", storeID))
251+
logutil.CL(ctx).Info("Listen starting.", zap.Uint64("store", storeID))
250252
for {
251253
// Shall we use RecvMsg for better performance?
252254
// Note that the spans.Full requires the input slice be immutable.
253255
msg, err := cli.Recv()
254256
if err != nil {
255-
log.Info("[log backup flush subscriber] Listen stopped.", zap.Uint64("store", storeID), logutil.ShortError(err))
257+
logutil.CL(ctx).Info("Listen stopped.",
258+
zap.Uint64("store", storeID), logutil.ShortError(err))
256259
if err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled {
257260
return
258261
}
@@ -263,13 +266,13 @@ func (s *subscription) listenOver(cli eventStream) {
263266
for _, m := range msg.Events {
264267
start, err := decodeKey(m.StartKey)
265268
if err != nil {
266-
log.Warn("start key not encoded, skipping",
269+
logutil.CL(ctx).Warn("start key not encoded, skipping",
267270
logutil.Key("event", m.StartKey), logutil.ShortError(err))
268271
continue
269272
}
270273
end, err := decodeKey(m.EndKey)
271274
if err != nil {
272-
log.Warn("end key not encoded, skipping",
275+
logutil.CL(ctx).Warn("end key not encoded, skipping",
273276
logutil.Key("event", m.EndKey), logutil.ShortError(err))
274277
continue
275278
}

0 commit comments

Comments
 (0)