Skip to content

Commit 431781f

Browse files
authored
br: fix Log Backup unexpected paused when adding a already long-running task (#53695) (#53945)
close #53561
1 parent b8bfc5f commit 431781f

File tree

5 files changed

+281
-34
lines changed

5 files changed

+281
-34
lines changed

br/pkg/streamhelper/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ go_test(
6868
],
6969
flaky = True,
7070
race = "on",
71-
shard_count = 28,
71+
shard_count = 32,
7272
deps = [
7373
":streamhelper",
7474
"//br/pkg/errors",

br/pkg/streamhelper/advancer.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,8 +423,17 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
423423
c.task = e.Info
424424
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
425425
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
426-
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
427-
p, err := c.env.BlockGCUntil(ctx, c.task.StartTs)
426+
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
427+
if err != nil {
428+
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
429+
return err
430+
}
431+
if globalCheckpointTs < c.task.StartTs {
432+
globalCheckpointTs = c.task.StartTs
433+
}
434+
log.Info("get global checkpoint", zap.Uint64("checkpoint", globalCheckpointTs))
435+
c.lastCheckpoint = newCheckpointWithTS(globalCheckpointTs)
436+
p, err := c.env.BlockGCUntil(ctx, globalCheckpointTs)
428437
if err != nil {
429438
log.Warn("failed to upload service GC safepoint, skipping.", logutil.ShortError(err))
430439
}

br/pkg/streamhelper/advancer_env.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ type StreamMeta interface {
156156
Begin(ctx context.Context, ch chan<- TaskEvent) error
157157
// UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store.
158158
UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
159+
// GetGlobalCheckpointForTask gets the global checkpoint from the meta store.
160+
GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)
159161
// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
160162
ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
161163
PauseTask(ctx context.Context, taskName string) error

0 commit comments

Comments
 (0)