Skip to content

Commit 64f5427

Browse files
authored
br: fix Log Backup unexpected paused when adding a already long-running task (#53695)
close #53561
1 parent 395edab commit 64f5427

File tree

5 files changed

+282
-34
lines changed

5 files changed

+282
-34
lines changed

br/pkg/streamhelper/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ go_test(
6969
],
7070
flaky = True,
7171
race = "on",
72-
shard_count = 28,
72+
shard_count = 32,
7373
deps = [
7474
":streamhelper",
7575
"//br/pkg/errors",

br/pkg/streamhelper/advancer.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,8 +424,17 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
424424
c.task = e.Info
425425
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
426426
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
427-
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
428-
p, err := c.env.BlockGCUntil(ctx, c.task.StartTs)
427+
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
428+
if err != nil {
429+
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
430+
return err
431+
}
432+
if globalCheckpointTs < c.task.StartTs {
433+
globalCheckpointTs = c.task.StartTs
434+
}
435+
log.Info("get global checkpoint", zap.Uint64("checkpoint", globalCheckpointTs))
436+
c.lastCheckpoint = newCheckpointWithTS(globalCheckpointTs)
437+
p, err := c.env.BlockGCUntil(ctx, globalCheckpointTs)
429438
if err != nil {
430439
log.Warn("failed to upload service GC safepoint, skipping.", logutil.ShortError(err))
431440
}

br/pkg/streamhelper/advancer_env.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ type StreamMeta interface {
172172
Begin(ctx context.Context, ch chan<- TaskEvent) error
173173
// UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store.
174174
UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
175+
// GetGlobalCheckpointForTask gets the global checkpoint from the meta store.
176+
GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)
175177
// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
176178
ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
177179
PauseTask(ctx context.Context, taskName string) error

0 commit comments

Comments
 (0)