
Commit cc5e4e2

log backup: use global checkpoint ts as source of truth (#58135) (#59061)
close #58031
1 parent 57b9e4b commit cc5e4e2

3 files changed: 164 additions, 20 deletions


br/pkg/streamhelper/advancer.go

Lines changed: 14 additions & 5 deletions
@@ -431,8 +431,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 	c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
 	globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
 	if err != nil {
-		log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
-		return err
+		// ignore the error, just log it
+		log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err))
 	}
 	if globalCheckpointTs < c.task.StartTs {
 		globalCheckpointTs = c.task.StartTs
@@ -566,13 +566,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
 	if c.cfg.CheckPointLagLimit <= 0 {
 		return false, nil
 	}
+	globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name)
+	if err != nil {
+		return false, err
+	}
+	if globalTs < c.task.StartTs {
+		// unreachable.
+		return false, nil
+	}

 	now, err := c.env.FetchCurrentTS(ctx)
 	if err != nil {
 		return false, err
 	}

-	lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
+	lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs))
 	if lagDuration > c.cfg.CheckPointLagLimit {
 		log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
 			zap.Stringer("lag", lagDuration))
@@ -590,7 +598,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
 	}
 	isLagged, err := c.isCheckpointLagged(ctx)
 	if err != nil {
-		return errors.Annotate(err, "failed to check timestamp")
+		// ignore the error, just log it
+		log.Warn("failed to check timestamp", logutil.ShortError(err))
 	}
 	if isLagged {
 		err := c.env.PauseTask(ctx, c.task.Name)
@@ -640,7 +649,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
 	c.taskMu.Lock()
 	defer c.taskMu.Unlock()
 	if c.task == nil || c.isPaused.Load() {
-		log.Debug("No tasks yet, skipping advancing.")
+		log.Info("No tasks yet, skipping advancing.")
 		return nil
 	}

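To make the advancer change concrete, here is a small, self-contained sketch of the new lag check. It is an illustration only, not the BR implementation: the env interface, fakeEnv, and the TSO helpers are hypothetical stand-ins for the advancer's Env dependency and the tikv oracle package (whose timestamps place physical milliseconds above 18 logical bits). The point mirrors the hunk above: lag is measured against the global checkpoint ts fetched for the task, not against the advancer's locally cached checkpoint.

// Sketch only: standalone approximation of the post-commit lag check.
package main

import (
	"fmt"
	"time"
)

// Assumed TSO layout: physical milliseconds shifted above 18 logical bits,
// mirroring oracle.GoTimeToTS / oracle.GetTimeFromTS.
const logicalBits = 18

func goTimeToTS(t time.Time) uint64     { return uint64(t.UnixMilli()) << logicalBits }
func getTimeFromTS(ts uint64) time.Time { return time.UnixMilli(int64(ts >> logicalBits)) }

// env is a hypothetical stand-in for the advancer's environment.
type env interface {
	GetGlobalCheckpointForTask(task string) (uint64, error)
	FetchCurrentTS() (uint64, error)
}

// isCheckpointLagged treats the global checkpoint ts stored for the task as
// the source of truth, as the commit does.
func isCheckpointLagged(e env, task string, startTS uint64, lagLimit time.Duration) (bool, error) {
	if lagLimit <= 0 {
		return false, nil
	}
	globalTS, err := e.GetGlobalCheckpointForTask(task)
	if err != nil {
		return false, err
	}
	if globalTS < startTS {
		// unreachable in practice: the checkpoint should never precede the task start ts
		return false, nil
	}
	now, err := e.FetchCurrentTS()
	if err != nil {
		return false, err
	}
	lag := getTimeFromTS(now).Sub(getTimeFromTS(globalTS))
	return lag > lagLimit, nil
}

// fakeEnv serves fixed timestamps for the demonstration below.
type fakeEnv struct{ checkpoint, now uint64 }

func (f fakeEnv) GetGlobalCheckpointForTask(string) (uint64, error) { return f.checkpoint, nil }
func (f fakeEnv) FetchCurrentTS() (uint64, error)                   { return f.now, nil }

func main() {
	base := time.Now()
	e := fakeEnv{
		checkpoint: goTimeToTS(base),
		now:        goTimeToTS(base.Add(3 * time.Minute)),
	}
	lagged, _ := isCheckpointLagged(e, "whole", goTimeToTS(base), time.Minute)
	fmt.Println("lagged:", lagged) // lagged: true, because 3m exceeds the 1m limit
}
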
br/pkg/streamhelper/advancer_test.go

Lines changed: 136 additions & 13 deletions
@@ -6,7 +6,6 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -498,6 +497,85 @@ func TestEnableCheckPointLimit(t *testing.T) {
 	}
 }

+func TestOwnerChangeCheckPointLagged(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	env := newTestEnv(c, t)
+	rngs := env.ranges
+	if len(rngs) == 0 {
+		rngs = []kv.KeyRange{{}}
+	}
+	env.task = streamhelper.TaskEvent{
+		Type: streamhelper.EventAdd,
+		Name: "whole",
+		Info: &backup.StreamBackupTaskInfo{
+			Name:    "whole",
+			StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
+		},
+		Ranges: rngs,
+	}
+
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	ctx1, cancel1 := context.WithCancel(context.Background())
+	adv.OnStart(ctx1)
+	adv.OnBecomeOwner(ctx1)
+	log.Info("advancer1 become owner")
+	require.NoError(t, adv.OnTick(ctx1))
+
+	// another advancer but never advance checkpoint before
+	adv2 := streamhelper.NewCheckpointAdvancer(env)
+	adv2.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	ctx2, cancel2 := context.WithCancel(context.Background())
+	adv2.OnStart(ctx2)
+
+	for i := 0; i < 5; i++ {
+		c.advanceClusterTimeBy(2 * time.Minute)
+		c.advanceCheckpointBy(2 * time.Minute)
+		require.NoError(t, adv.OnTick(ctx1))
+	}
+	c.advanceClusterTimeBy(2 * time.Minute)
+	require.ErrorContains(t, adv.OnTick(ctx1), "lagged too large")
+
+	// resume task to make next tick normally
+	c.advanceCheckpointBy(2 * time.Minute)
+	env.ResumeTask(ctx)
+
+	// stop advancer1, and advancer2 should take over
+	cancel1()
+	log.Info("advancer1 owner canceled, and advancer2 become owner")
+	adv2.OnBecomeOwner(ctx2)
+	require.NoError(t, adv2.OnTick(ctx2))
+
+	// advancer2 should take over and tick normally
+	for i := 0; i < 10; i++ {
+		c.advanceClusterTimeBy(2 * time.Minute)
+		c.advanceCheckpointBy(2 * time.Minute)
+		require.NoError(t, adv2.OnTick(ctx2))
+	}
+	c.advanceClusterTimeBy(2 * time.Minute)
+	require.ErrorContains(t, adv2.OnTick(ctx2), "lagged too large")
+	// stop advancer2, and advancer1 should take over
+	c.advanceCheckpointBy(2 * time.Minute)
+	env.ResumeTask(ctx)
+	cancel2()
+	log.Info("advancer2 owner canceled, and advancer1 become owner")
+
+	adv.OnBecomeOwner(ctx)
+	// advancer1 should take over and tick normally when come back
+	require.NoError(t, adv.OnTick(ctx))
+}
+
 func TestCheckPointLagged(t *testing.T) {
 	c := createFakeCluster(t, 4, false)
 	defer func() {
@@ -528,8 +606,10 @@ func TestCheckPointLagged(t *testing.T) {
 	})
 	adv.StartTaskListener(ctx)
 	c.advanceClusterTimeBy(2 * time.Minute)
+	// if global ts is not advanced, the checkpoint will not be lagged
+	c.advanceCheckpointBy(2 * time.Minute)
 	require.NoError(t, adv.OnTick(ctx))
-	c.advanceClusterTimeBy(1 * time.Minute)
+	c.advanceClusterTimeBy(3 * time.Minute)
 	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
 	// after some times, the isPaused will be set and ticks are skipped
 	require.Eventually(t, func() bool {
@@ -553,8 +633,10 @@ func TestCheckPointResume(t *testing.T) {
 	})
 	adv.StartTaskListener(ctx)
 	c.advanceClusterTimeBy(1 * time.Minute)
+	// if global ts is not advanced, the checkpoint will not be lagged
+	c.advanceCheckpointBy(1 * time.Minute)
 	require.NoError(t, adv.OnTick(ctx))
-	c.advanceClusterTimeBy(1 * time.Minute)
+	c.advanceClusterTimeBy(2 * time.Minute)
 	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
 	require.Eventually(t, func() bool {
 		return assert.NoError(t, adv.OnTick(ctx))
@@ -584,18 +666,48 @@ func TestUnregisterAfterPause(t *testing.T) {
 		c.CheckPointLagLimit = 1 * time.Minute
 	})
 	adv.StartTaskListener(ctx)
+
+	// wait for the task to be added
+	require.Eventually(t, func() bool {
+		return adv.HasTask()
+	}, 5*time.Second, 100*time.Millisecond)
+
+	// task is should be paused when global checkpoint is laggeod
+	// even the global checkpoint is equal to task start ts(not advanced all the time)
 	c.advanceClusterTimeBy(1 * time.Minute)
 	require.NoError(t, adv.OnTick(ctx))
 	env.PauseTask(ctx, "whole")
-	time.Sleep(1 * time.Second)
 	c.advanceClusterTimeBy(1 * time.Minute)
+	require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
+	env.unregisterTask()
+	env.putTask()
+
+	// wait for the task to be added
+	require.Eventually(t, func() bool {
+		return adv.HasTask()
+	}, 5*time.Second, 100*time.Millisecond)
+
+	require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
+
+	env.unregisterTask()
+	// wait for the task to be deleted
+	require.Eventually(t, func() bool {
+		return !adv.HasTask()
+	}, 5*time.Second, 100*time.Millisecond)
+
+	// reset
+	c.advanceClusterTimeBy(-1 * time.Minute)
 	require.NoError(t, adv.OnTick(ctx))
+	env.PauseTask(ctx, "whole")
+	c.advanceClusterTimeBy(1 * time.Minute)
 	env.unregisterTask()
 	env.putTask()
+	// wait for the task to be add
 	require.Eventually(t, func() bool {
-		err := adv.OnTick(ctx)
-		return err != nil && strings.Contains(err.Error(), "check point lagged too large")
-	}, 5*time.Second, 300*time.Millisecond)
+		return adv.HasTask()
+	}, 5*time.Second, 100*time.Millisecond)
+
+	require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
 }

 // If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally.
@@ -707,13 +819,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) {
 	adv.UpdateConfigWith(func(c *config.Config) {
 		c.CheckPointLagLimit = 1 * time.Minute
 	})
+	adv.StartTaskListener(ctx)
 	c.advanceClusterTimeBy(3 * time.Minute)
 	c.advanceCheckpointBy(1 * time.Minute)
 	env.advanceCheckpointBy(2 * time.Minute)
 	env.mockPDConnectionError()
-	adv.StartTaskListener(ctx)
-	// Try update checkpoint
-	require.NoError(t, adv.OnTick(ctx))
+	// if cannot connect to pd, the checkpoint will be rolled back
+	// because at this point. the global ts is 2 minutes
+	// and the local checkpoint ts is 1 minute
+	require.Error(t, adv.OnTick(ctx), "checkpoint rollback")
+
+	// only when local checkpoint > global ts, the next tick will be normal
+	c.advanceCheckpointBy(12 * time.Minute)
 	// Verify no err raised
 	require.NoError(t, adv.OnTick(ctx))
 }
@@ -747,11 +864,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) {
 	adv.UpdateConfigWith(func(c *config.Config) {
 		c.CheckPointLagLimit = 1 * time.Minute
 	})
-	c.advanceClusterTimeBy(3 * time.Minute)
+	// advance cluster time to 4 minutes, and checkpoint to 1 minutes
+	// if start ts equals to checkpoint, the task will not be paused
+	adv.StartTaskListener(ctx)
+	c.advanceClusterTimeBy(2 * time.Minute)
+	c.advanceCheckpointBy(1 * time.Minute)
+	env.advanceCheckpointBy(1 * time.Minute)
+	require.NoError(t, adv.OnTick(ctx))
+
+	c.advanceClusterTimeBy(2 * time.Minute)
 	c.advanceCheckpointBy(1 * time.Minute)
 	env.advanceCheckpointBy(1 * time.Minute)
-	env.mockPDConnectionError()
-	adv.StartTaskListener(ctx)
 	// Try update checkpoint
 	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
 	// Verify no err raised after paused
br/pkg/streamhelper/basic_lib_for_test.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -681,7 +681,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
681681
defer t.mu.Unlock()
682682

683683
if checkpoint < t.checkpoint {
684-
t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
684+
log.Error("checkpoint rolling back",
685+
zap.Uint64("from", t.checkpoint),
686+
zap.Uint64("to", checkpoint),
687+
zap.Stack("stack"))
688+
// t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
689+
return errors.New("checkpoint rolling back")
685690
}
686691
t.checkpoint = checkpoint
687692
return nil
@@ -741,6 +746,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) {
741746
t.mu.Lock()
742747
defer t.mu.Unlock()
743748

749+
log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint))
750+
744751
t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration))
745752
}
746753

@@ -760,14 +767,17 @@ func (t *testEnv) putTask() {
760767
Type: streamhelper.EventAdd,
761768
Name: "whole",
762769
Info: &backup.StreamBackupTaskInfo{
763-
Name: "whole",
770+
Name: "whole",
771+
StartTs: 5,
764772
},
765773
Ranges: rngs,
766774
}
767775
t.taskCh <- tsk
768776
}
769777

770778
func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
779+
t.mu.Lock()
780+
defer t.mu.Unlock()
771781
if t.maxTs != maxVersion {
772782
return nil, nil, errors.Errorf("unexpect max version in scan lock, expected %d, actual %d", t.maxTs, maxVersion)
773783
}
@@ -789,6 +799,8 @@ func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersio
789799
}
790800

791801
func (t *testEnv) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) {
802+
t.mu.Lock()
803+
defer t.mu.Unlock()
792804
for _, r := range t.regions {
793805
if loc != nil && loc.Region.GetID() == r.id {
794806
// reset locks

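The rewritten UploadV3GlobalCheckpointForTask is what lets TestAddTaskWithLongRunTask2 assert on a rollback: the fake env now reports the rollback as an ordinary error instead of failing the whole test. A minimal sketch of that guard pattern follows, with hypothetical names; the real testEnv keeps more state and also logs the old and new checkpoint plus a stack trace before returning.

// Sketch only: rollback-guard pattern used by the fake test env, hypothetical names.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type fakeCheckpointStore struct {
	mu         sync.Mutex
	checkpoint uint64
}

// Upload rejects any attempt to move the global checkpoint backwards and
// returns an error instead of aborting the test, so callers can assert on it.
func (s *fakeCheckpointStore) Upload(checkpoint uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if checkpoint < s.checkpoint {
		return errors.New("checkpoint rolling back")
	}
	s.checkpoint = checkpoint
	return nil
}

func main() {
	s := &fakeCheckpointStore{checkpoint: 100}
	fmt.Println(s.Upload(200)) // <nil>
	fmt.Println(s.Upload(150)) // checkpoint rolling back
}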