Skip to content

Commit 3fbf496

Browse files
authored
log backup: use global checkpoint ts as source of truth (#58135) (#59369)
close #58031
1 parent e8a4d8e commit 3fbf496

File tree

4 files changed

+161
-21
lines changed

4 files changed

+161
-21
lines changed

br/pkg/streamhelper/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ go_test(
6969
],
7070
flaky = True,
7171
race = "on",
72-
shard_count = 33,
72+
shard_count = 34,
7373
deps = [
7474
":streamhelper",
7575
"//br/pkg/errors",

br/pkg/streamhelper/advancer.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -425,8 +425,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
425425
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
426426
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
427427
if err != nil {
428-
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
429-
return err
428+
// ignore the error, just log it
429+
log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err))
430430
}
431431
if globalCheckpointTs < c.task.StartTs {
432432
globalCheckpointTs = c.task.StartTs
@@ -567,13 +567,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
567567
if c.cfg.CheckPointLagLimit <= 0 {
568568
return false, nil
569569
}
570+
globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name)
571+
if err != nil {
572+
return false, err
573+
}
574+
if globalTs < c.task.StartTs {
575+
// unreachable.
576+
return false, nil
577+
}
570578

571579
now, err := c.env.FetchCurrentTS(ctx)
572580
if err != nil {
573581
return false, err
574582
}
575583

576-
lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
584+
lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs))
577585
if lagDuration > c.cfg.CheckPointLagLimit {
578586
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
579587
zap.Stringer("lag", lagDuration))
@@ -591,7 +599,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
591599
}
592600
isLagged, err := c.isCheckpointLagged(ctx)
593601
if err != nil {
594-
return errors.Annotate(err, "failed to check timestamp")
602+
// ignore the error, just log it
603+
log.Warn("failed to check timestamp", logutil.ShortError(err))
595604
}
596605
if isLagged {
597606
err := c.env.PauseTask(ctx, c.task.Name)
@@ -656,7 +665,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
656665
c.taskMu.Lock()
657666
defer c.taskMu.Unlock()
658667
if c.task == nil || c.isPaused.Load() {
659-
log.Debug("No tasks yet, skipping advancing.")
668+
log.Info("No tasks yet, skipping advancing.")
660669
return nil
661670
}
662671

br/pkg/streamhelper/advancer_test.go

Lines changed: 136 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"bytes"
77
"context"
88
"fmt"
9-
"strings"
109
"sync"
1110
"testing"
1211
"time"
@@ -519,6 +518,85 @@ func TestEnableCheckPointLimit(t *testing.T) {
519518
}
520519
}
521520

521+
func TestOwnerChangeCheckPointLagged(t *testing.T) {
522+
c := createFakeCluster(t, 4, false)
523+
defer func() {
524+
fmt.Println(c)
525+
}()
526+
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
527+
ctx, cancel := context.WithCancel(context.Background())
528+
defer cancel()
529+
530+
env := newTestEnv(c, t)
531+
rngs := env.ranges
532+
if len(rngs) == 0 {
533+
rngs = []kv.KeyRange{{}}
534+
}
535+
env.task = streamhelper.TaskEvent{
536+
Type: streamhelper.EventAdd,
537+
Name: "whole",
538+
Info: &backup.StreamBackupTaskInfo{
539+
Name: "whole",
540+
StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
541+
},
542+
Ranges: rngs,
543+
}
544+
545+
adv := streamhelper.NewCheckpointAdvancer(env)
546+
adv.UpdateConfigWith(func(c *config.Config) {
547+
c.CheckPointLagLimit = 1 * time.Minute
548+
})
549+
ctx1, cancel1 := context.WithCancel(context.Background())
550+
adv.OnStart(ctx1)
551+
adv.OnBecomeOwner(ctx1)
552+
log.Info("advancer1 become owner")
553+
require.NoError(t, adv.OnTick(ctx1))
554+
555+
// another advancer but never advance checkpoint before
556+
adv2 := streamhelper.NewCheckpointAdvancer(env)
557+
adv2.UpdateConfigWith(func(c *config.Config) {
558+
c.CheckPointLagLimit = 1 * time.Minute
559+
})
560+
ctx2, cancel2 := context.WithCancel(context.Background())
561+
adv2.OnStart(ctx2)
562+
563+
for i := 0; i < 5; i++ {
564+
c.advanceClusterTimeBy(2 * time.Minute)
565+
c.advanceCheckpointBy(2 * time.Minute)
566+
require.NoError(t, adv.OnTick(ctx1))
567+
}
568+
c.advanceClusterTimeBy(2 * time.Minute)
569+
require.ErrorContains(t, adv.OnTick(ctx1), "lagged too large")
570+
571+
// resume task to make next tick normally
572+
c.advanceCheckpointBy(2 * time.Minute)
573+
env.ResumeTask(ctx)
574+
575+
// stop advancer1, and advancer2 should take over
576+
cancel1()
577+
log.Info("advancer1 owner canceled, and advancer2 become owner")
578+
adv2.OnBecomeOwner(ctx2)
579+
require.NoError(t, adv2.OnTick(ctx2))
580+
581+
// advancer2 should take over and tick normally
582+
for i := 0; i < 10; i++ {
583+
c.advanceClusterTimeBy(2 * time.Minute)
584+
c.advanceCheckpointBy(2 * time.Minute)
585+
require.NoError(t, adv2.OnTick(ctx2))
586+
}
587+
c.advanceClusterTimeBy(2 * time.Minute)
588+
require.ErrorContains(t, adv2.OnTick(ctx2), "lagged too large")
589+
// stop advancer2, and advancer1 should take over
590+
c.advanceCheckpointBy(2 * time.Minute)
591+
env.ResumeTask(ctx)
592+
cancel2()
593+
log.Info("advancer2 owner canceled, and advancer1 become owner")
594+
595+
adv.OnBecomeOwner(ctx)
596+
// advancer1 should take over and tick normally when come back
597+
require.NoError(t, adv.OnTick(ctx))
598+
}
599+
522600
func TestCheckPointLagged(t *testing.T) {
523601
c := createFakeCluster(t, 4, false)
524602
defer func() {
@@ -548,8 +626,10 @@ func TestCheckPointLagged(t *testing.T) {
548626
})
549627
adv.StartTaskListener(ctx)
550628
c.advanceClusterTimeBy(2 * time.Minute)
629+
// if global ts is not advanced, the checkpoint will not be lagged
630+
c.advanceCheckpointBy(2 * time.Minute)
551631
require.NoError(t, adv.OnTick(ctx))
552-
c.advanceClusterTimeBy(1 * time.Minute)
632+
c.advanceClusterTimeBy(3 * time.Minute)
553633
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
554634
// after some times, the isPaused will be set and ticks are skipped
555635
require.Eventually(t, func() bool {
@@ -573,8 +653,10 @@ func TestCheckPointResume(t *testing.T) {
573653
})
574654
adv.StartTaskListener(ctx)
575655
c.advanceClusterTimeBy(1 * time.Minute)
656+
// if global ts is not advanced, the checkpoint will not be lagged
657+
c.advanceCheckpointBy(1 * time.Minute)
576658
require.NoError(t, adv.OnTick(ctx))
577-
c.advanceClusterTimeBy(1 * time.Minute)
659+
c.advanceClusterTimeBy(2 * time.Minute)
578660
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
579661
require.Eventually(t, func() bool {
580662
return assert.NoError(t, adv.OnTick(ctx))
@@ -604,18 +686,48 @@ func TestUnregisterAfterPause(t *testing.T) {
604686
c.CheckPointLagLimit = 1 * time.Minute
605687
})
606688
adv.StartTaskListener(ctx)
689+
690+
// wait for the task to be added
691+
require.Eventually(t, func() bool {
692+
return adv.HasTask()
693+
}, 5*time.Second, 100*time.Millisecond)
694+
695+
// task is should be paused when global checkpoint is laggeod
696+
// even the global checkpoint is equal to task start ts(not advanced all the time)
607697
c.advanceClusterTimeBy(1 * time.Minute)
608698
require.NoError(t, adv.OnTick(ctx))
609699
env.PauseTask(ctx, "whole")
610-
time.Sleep(1 * time.Second)
611700
c.advanceClusterTimeBy(1 * time.Minute)
701+
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
702+
env.unregisterTask()
703+
env.putTask()
704+
705+
// wait for the task to be added
706+
require.Eventually(t, func() bool {
707+
return adv.HasTask()
708+
}, 5*time.Second, 100*time.Millisecond)
709+
710+
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
711+
712+
env.unregisterTask()
713+
// wait for the task to be deleted
714+
require.Eventually(t, func() bool {
715+
return !adv.HasTask()
716+
}, 5*time.Second, 100*time.Millisecond)
717+
718+
// reset
719+
c.advanceClusterTimeBy(-1 * time.Minute)
612720
require.NoError(t, adv.OnTick(ctx))
721+
env.PauseTask(ctx, "whole")
722+
c.advanceClusterTimeBy(1 * time.Minute)
613723
env.unregisterTask()
614724
env.putTask()
725+
// wait for the task to be add
615726
require.Eventually(t, func() bool {
616-
err := adv.OnTick(ctx)
617-
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
618-
}, 5*time.Second, 300*time.Millisecond)
727+
return adv.HasTask()
728+
}, 5*time.Second, 100*time.Millisecond)
729+
730+
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
619731
}
620732

621733
// If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally.
@@ -727,13 +839,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) {
727839
adv.UpdateConfigWith(func(c *config.Config) {
728840
c.CheckPointLagLimit = 1 * time.Minute
729841
})
842+
adv.StartTaskListener(ctx)
730843
c.advanceClusterTimeBy(3 * time.Minute)
731844
c.advanceCheckpointBy(1 * time.Minute)
732845
env.advanceCheckpointBy(2 * time.Minute)
733846
env.mockPDConnectionError()
734-
adv.StartTaskListener(ctx)
735-
// Try update checkpoint
736-
require.NoError(t, adv.OnTick(ctx))
847+
// if cannot connect to pd, the checkpoint will be rolled back
848+
// because at this point. the global ts is 2 minutes
849+
// and the local checkpoint ts is 1 minute
850+
require.Error(t, adv.OnTick(ctx), "checkpoint rollback")
851+
852+
// only when local checkpoint > global ts, the next tick will be normal
853+
c.advanceCheckpointBy(12 * time.Minute)
737854
// Verify no err raised
738855
require.NoError(t, adv.OnTick(ctx))
739856
}
@@ -767,11 +884,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) {
767884
adv.UpdateConfigWith(func(c *config.Config) {
768885
c.CheckPointLagLimit = 1 * time.Minute
769886
})
770-
c.advanceClusterTimeBy(3 * time.Minute)
887+
// advance cluster time to 4 minutes, and checkpoint to 1 minutes
888+
// if start ts equals to checkpoint, the task will not be paused
889+
adv.StartTaskListener(ctx)
890+
c.advanceClusterTimeBy(2 * time.Minute)
891+
c.advanceCheckpointBy(1 * time.Minute)
892+
env.advanceCheckpointBy(1 * time.Minute)
893+
require.NoError(t, adv.OnTick(ctx))
894+
895+
c.advanceClusterTimeBy(2 * time.Minute)
771896
c.advanceCheckpointBy(1 * time.Minute)
772897
env.advanceCheckpointBy(1 * time.Minute)
773-
env.mockPDConnectionError()
774-
adv.StartTaskListener(ctx)
775898
// Try update checkpoint
776899
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
777900
// Verify no err raised after paused

br/pkg/streamhelper/basic_lib_for_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
685685
defer t.mu.Unlock()
686686

687687
if checkpoint < t.checkpoint {
688-
t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
688+
log.Error("checkpoint rolling back",
689+
zap.Uint64("from", t.checkpoint),
690+
zap.Uint64("to", checkpoint),
691+
zap.Stack("stack"))
692+
// t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
693+
return errors.New("checkpoint rolling back")
689694
}
690695
t.checkpoint = checkpoint
691696
return nil
@@ -745,6 +750,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) {
745750
t.mu.Lock()
746751
defer t.mu.Unlock()
747752

753+
log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint))
754+
748755
t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration))
749756
}
750757

@@ -764,7 +771,8 @@ func (t *testEnv) putTask() {
764771
Type: streamhelper.EventAdd,
765772
Name: "whole",
766773
Info: &backup.StreamBackupTaskInfo{
767-
Name: "whole",
774+
Name: "whole",
775+
StartTs: 5,
768776
},
769777
Ranges: rngs,
770778
}

0 commit comments

Comments
 (0)