Skip to content

Commit 366ed7d

Browse files
authored
br: fix Log Backup unexpected paused when adding a already long-running task (#53695) (#53947)
close #53561
1 parent 34687d9 commit 366ed7d

File tree

5 files changed

+280
-32
lines changed

5 files changed

+280
-32
lines changed

br/pkg/streamhelper/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ go_test(
6868
],
6969
flaky = True,
7070
race = "on",
71-
shard_count = 27,
71+
shard_count = 28,
7272
deps = [
7373
":streamhelper",
7474
"//br/pkg/errors",

br/pkg/streamhelper/advancer.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,16 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
428428
c.task = e.Info
429429
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
430430
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
431-
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
431+
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
432+
if err != nil {
433+
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
434+
return err
435+
}
436+
if globalCheckpointTs < c.task.StartTs {
437+
globalCheckpointTs = c.task.StartTs
438+
}
439+
log.Info("get global checkpoint", zap.Uint64("checkpoint", globalCheckpointTs))
440+
c.lastCheckpoint = newCheckpointWithTS(globalCheckpointTs)
432441
log.Info("added event", zap.Stringer("task", e.Info),
433442
zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
434443
case EventDel:

br/pkg/streamhelper/advancer_env.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ type StreamMeta interface {
144144
Begin(ctx context.Context, ch chan<- TaskEvent) error
145145
// UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store.
146146
UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
147+
// GetGlobalCheckpointForTask gets the global checkpoint from the meta store.
148+
GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)
147149
// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
148150
ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
149151
PauseTask(ctx context.Context, taskName string) error

0 commit comments

Comments
 (0)