Skip to content

Commit 87ed1fe

Browse files
authored
log backup: use global checkpoint ts as source of truth (#58135) (#58265)
close #58031
1 parent 6ec3d04 commit 87ed1fe

File tree

4 files changed

+161
-21
lines changed

4 files changed

+161
-21
lines changed

br/pkg/streamhelper/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ go_test(
6969
],
7070
flaky = True,
7171
race = "on",
72-
shard_count = 34,
72+
shard_count = 35,
7373
deps = [
7474
":streamhelper",
7575
"//br/pkg/errors",

br/pkg/streamhelper/advancer.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -425,8 +425,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
425425
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
426426
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
427427
if err != nil {
428-
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
429-
return err
428+
// ignore the error, just log it
429+
log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err))
430430
}
431431
if globalCheckpointTs < c.task.StartTs {
432432
globalCheckpointTs = c.task.StartTs
@@ -567,13 +567,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
567567
if c.cfg.CheckPointLagLimit <= 0 {
568568
return false, nil
569569
}
570+
globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name)
571+
if err != nil {
572+
return false, err
573+
}
574+
if globalTs < c.task.StartTs {
575+
// unreachable.
576+
return false, nil
577+
}
570578

571579
now, err := c.env.FetchCurrentTS(ctx)
572580
if err != nil {
573581
return false, err
574582
}
575583

576-
lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
584+
lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs))
577585
if lagDuration > c.cfg.CheckPointLagLimit {
578586
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
579587
zap.Stringer("lag", lagDuration))
@@ -591,7 +599,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
591599
}
592600
isLagged, err := c.isCheckpointLagged(ctx)
593601
if err != nil {
594-
return errors.Annotate(err, "failed to check timestamp")
602+
// ignore the error, just log it
603+
log.Warn("failed to check timestamp", logutil.ShortError(err))
595604
}
596605
if isLagged {
597606
err := c.env.PauseTask(ctx, c.task.Name)
@@ -656,7 +665,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
656665
c.taskMu.Lock()
657666
defer c.taskMu.Unlock()
658667
if c.task == nil || c.isPaused.Load() {
659-
log.Debug("No tasks yet, skipping advancing.")
668+
log.Info("No tasks yet, skipping advancing.")
660669
return nil
661670
}
662671

br/pkg/streamhelper/advancer_test.go

Lines changed: 136 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"bytes"
77
"context"
88
"fmt"
9-
"strings"
109
"sync"
1110
"testing"
1211
"time"
@@ -518,6 +517,85 @@ func TestEnableCheckPointLimit(t *testing.T) {
518517
}
519518
}
520519

520+
func TestOwnerChangeCheckPointLagged(t *testing.T) {
521+
c := createFakeCluster(t, 4, false)
522+
defer func() {
523+
fmt.Println(c)
524+
}()
525+
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
526+
ctx, cancel := context.WithCancel(context.Background())
527+
defer cancel()
528+
529+
env := newTestEnv(c, t)
530+
rngs := env.ranges
531+
if len(rngs) == 0 {
532+
rngs = []kv.KeyRange{{}}
533+
}
534+
env.task = streamhelper.TaskEvent{
535+
Type: streamhelper.EventAdd,
536+
Name: "whole",
537+
Info: &backup.StreamBackupTaskInfo{
538+
Name: "whole",
539+
StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
540+
},
541+
Ranges: rngs,
542+
}
543+
544+
adv := streamhelper.NewCheckpointAdvancer(env)
545+
adv.UpdateConfigWith(func(c *config.Config) {
546+
c.CheckPointLagLimit = 1 * time.Minute
547+
})
548+
ctx1, cancel1 := context.WithCancel(context.Background())
549+
adv.OnStart(ctx1)
550+
adv.OnBecomeOwner(ctx1)
551+
log.Info("advancer1 become owner")
552+
require.NoError(t, adv.OnTick(ctx1))
553+
554+
// another advancer but never advance checkpoint before
555+
adv2 := streamhelper.NewCheckpointAdvancer(env)
556+
adv2.UpdateConfigWith(func(c *config.Config) {
557+
c.CheckPointLagLimit = 1 * time.Minute
558+
})
559+
ctx2, cancel2 := context.WithCancel(context.Background())
560+
adv2.OnStart(ctx2)
561+
562+
for i := 0; i < 5; i++ {
563+
c.advanceClusterTimeBy(2 * time.Minute)
564+
c.advanceCheckpointBy(2 * time.Minute)
565+
require.NoError(t, adv.OnTick(ctx1))
566+
}
567+
c.advanceClusterTimeBy(2 * time.Minute)
568+
require.ErrorContains(t, adv.OnTick(ctx1), "lagged too large")
569+
570+
// resume task to make next tick normally
571+
c.advanceCheckpointBy(2 * time.Minute)
572+
env.ResumeTask(ctx)
573+
574+
// stop advancer1, and advancer2 should take over
575+
cancel1()
576+
log.Info("advancer1 owner canceled, and advancer2 become owner")
577+
adv2.OnBecomeOwner(ctx2)
578+
require.NoError(t, adv2.OnTick(ctx2))
579+
580+
// advancer2 should take over and tick normally
581+
for i := 0; i < 10; i++ {
582+
c.advanceClusterTimeBy(2 * time.Minute)
583+
c.advanceCheckpointBy(2 * time.Minute)
584+
require.NoError(t, adv2.OnTick(ctx2))
585+
}
586+
c.advanceClusterTimeBy(2 * time.Minute)
587+
require.ErrorContains(t, adv2.OnTick(ctx2), "lagged too large")
588+
// stop advancer2, and advancer1 should take over
589+
c.advanceCheckpointBy(2 * time.Minute)
590+
env.ResumeTask(ctx)
591+
cancel2()
592+
log.Info("advancer2 owner canceled, and advancer1 become owner")
593+
594+
adv.OnBecomeOwner(ctx)
595+
// advancer1 should take over and tick normally when come back
596+
require.NoError(t, adv.OnTick(ctx))
597+
}
598+
521599
func TestCheckPointLagged(t *testing.T) {
522600
c := createFakeCluster(t, 4, false)
523601
defer func() {
@@ -548,8 +626,10 @@ func TestCheckPointLagged(t *testing.T) {
548626
})
549627
adv.StartTaskListener(ctx)
550628
c.advanceClusterTimeBy(2 * time.Minute)
629+
// if global ts is not advanced, the checkpoint will not be lagged
630+
c.advanceCheckpointBy(2 * time.Minute)
551631
require.NoError(t, adv.OnTick(ctx))
552-
c.advanceClusterTimeBy(1 * time.Minute)
632+
c.advanceClusterTimeBy(3 * time.Minute)
553633
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
554634
// after some times, the isPaused will be set and ticks are skipped
555635
require.Eventually(t, func() bool {
@@ -573,8 +653,10 @@ func TestCheckPointResume(t *testing.T) {
573653
})
574654
adv.StartTaskListener(ctx)
575655
c.advanceClusterTimeBy(1 * time.Minute)
656+
// if global ts is not advanced, the checkpoint will not be lagged
657+
c.advanceCheckpointBy(1 * time.Minute)
576658
require.NoError(t, adv.OnTick(ctx))
577-
c.advanceClusterTimeBy(1 * time.Minute)
659+
c.advanceClusterTimeBy(2 * time.Minute)
578660
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
579661
require.Eventually(t, func() bool {
580662
return assert.NoError(t, adv.OnTick(ctx))
@@ -604,18 +686,48 @@ func TestUnregisterAfterPause(t *testing.T) {
604686
c.CheckPointLagLimit = 1 * time.Minute
605687
})
606688
adv.StartTaskListener(ctx)
689+
690+
// wait for the task to be added
691+
require.Eventually(t, func() bool {
692+
return adv.HasTask()
693+
}, 5*time.Second, 100*time.Millisecond)
694+
695+
// task is should be paused when global checkpoint is laggeod
696+
// even the global checkpoint is equal to task start ts(not advanced all the time)
607697
c.advanceClusterTimeBy(1 * time.Minute)
608698
require.NoError(t, adv.OnTick(ctx))
609699
env.PauseTask(ctx, "whole")
610-
time.Sleep(1 * time.Second)
611700
c.advanceClusterTimeBy(1 * time.Minute)
701+
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
702+
env.unregisterTask()
703+
env.putTask()
704+
705+
// wait for the task to be added
706+
require.Eventually(t, func() bool {
707+
return adv.HasTask()
708+
}, 5*time.Second, 100*time.Millisecond)
709+
710+
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
711+
712+
env.unregisterTask()
713+
// wait for the task to be deleted
714+
require.Eventually(t, func() bool {
715+
return !adv.HasTask()
716+
}, 5*time.Second, 100*time.Millisecond)
717+
718+
// reset
719+
c.advanceClusterTimeBy(-1 * time.Minute)
612720
require.NoError(t, adv.OnTick(ctx))
721+
env.PauseTask(ctx, "whole")
722+
c.advanceClusterTimeBy(1 * time.Minute)
613723
env.unregisterTask()
614724
env.putTask()
725+
// wait for the task to be add
615726
require.Eventually(t, func() bool {
616-
err := adv.OnTick(ctx)
617-
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
618-
}, 5*time.Second, 300*time.Millisecond)
727+
return adv.HasTask()
728+
}, 5*time.Second, 100*time.Millisecond)
729+
730+
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
619731
}
620732

621733
// If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally.
@@ -727,13 +839,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) {
727839
adv.UpdateConfigWith(func(c *config.Config) {
728840
c.CheckPointLagLimit = 1 * time.Minute
729841
})
842+
adv.StartTaskListener(ctx)
730843
c.advanceClusterTimeBy(3 * time.Minute)
731844
c.advanceCheckpointBy(1 * time.Minute)
732845
env.advanceCheckpointBy(2 * time.Minute)
733846
env.mockPDConnectionError()
734-
adv.StartTaskListener(ctx)
735-
// Try update checkpoint
736-
require.NoError(t, adv.OnTick(ctx))
847+
// if cannot connect to pd, the checkpoint will be rolled back
848+
// because at this point. the global ts is 2 minutes
849+
// and the local checkpoint ts is 1 minute
850+
require.Error(t, adv.OnTick(ctx), "checkpoint rollback")
851+
852+
// only when local checkpoint > global ts, the next tick will be normal
853+
c.advanceCheckpointBy(12 * time.Minute)
737854
// Verify no err raised
738855
require.NoError(t, adv.OnTick(ctx))
739856
}
@@ -767,11 +884,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) {
767884
adv.UpdateConfigWith(func(c *config.Config) {
768885
c.CheckPointLagLimit = 1 * time.Minute
769886
})
770-
c.advanceClusterTimeBy(3 * time.Minute)
887+
// advance cluster time to 4 minutes, and checkpoint to 1 minutes
888+
// if start ts equals to checkpoint, the task will not be paused
889+
adv.StartTaskListener(ctx)
890+
c.advanceClusterTimeBy(2 * time.Minute)
891+
c.advanceCheckpointBy(1 * time.Minute)
892+
env.advanceCheckpointBy(1 * time.Minute)
893+
require.NoError(t, adv.OnTick(ctx))
894+
895+
c.advanceClusterTimeBy(2 * time.Minute)
771896
c.advanceCheckpointBy(1 * time.Minute)
772897
env.advanceCheckpointBy(1 * time.Minute)
773-
env.mockPDConnectionError()
774-
adv.StartTaskListener(ctx)
775898
// Try update checkpoint
776899
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
777900
// Verify no err raised after paused

br/pkg/streamhelper/basic_lib_for_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
687687
defer t.mu.Unlock()
688688

689689
if checkpoint < t.checkpoint {
690-
t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
690+
log.Error("checkpoint rolling back",
691+
zap.Uint64("from", t.checkpoint),
692+
zap.Uint64("to", checkpoint),
693+
zap.Stack("stack"))
694+
// t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
695+
return errors.New("checkpoint rolling back")
691696
}
692697
t.checkpoint = checkpoint
693698
return nil
@@ -747,6 +752,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) {
747752
t.mu.Lock()
748753
defer t.mu.Unlock()
749754

755+
log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint))
756+
750757
t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration))
751758
}
752759

@@ -766,7 +773,8 @@ func (t *testEnv) putTask() {
766773
Type: streamhelper.EventAdd,
767774
Name: "whole",
768775
Info: &backup.StreamBackupTaskInfo{
769-
Name: "whole",
776+
Name: "whole",
777+
StartTs: 5,
770778
},
771779
Ranges: rngs,
772780
}

0 commit comments

Comments
 (0)