Skip to content

Commit 6b41f81

Browse files
RidRisRti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#53695
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 3c02c2a commit 6b41f81

File tree

5 files changed

+388
-28
lines changed

5 files changed

+388
-28
lines changed

br/pkg/streamhelper/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,11 @@ go_test(
6868
],
6969
flaky = True,
7070
race = "on",
71+
<<<<<<< HEAD
7172
shard_count = 24,
73+
=======
74+
shard_count = 32,
75+
>>>>>>> 64f5427448b (br: fix Log Backup unexpected paused when adding a already long-running task (#53695))
7276
deps = [
7377
":streamhelper",
7478
"//br/pkg/errors",

br/pkg/streamhelper/advancer.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -422,8 +422,17 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
422422
c.task = e.Info
423423
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
424424
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
425-
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
426-
p, err := c.env.BlockGCUntil(ctx, c.task.StartTs)
425+
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
426+
if err != nil {
427+
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
428+
return err
429+
}
430+
if globalCheckpointTs < c.task.StartTs {
431+
globalCheckpointTs = c.task.StartTs
432+
}
433+
log.Info("get global checkpoint", zap.Uint64("checkpoint", globalCheckpointTs))
434+
c.lastCheckpoint = newCheckpointWithTS(globalCheckpointTs)
435+
p, err := c.env.BlockGCUntil(ctx, globalCheckpointTs)
427436
if err != nil {
428437
log.Warn("failed to upload service GC safepoint, skipping.", logutil.ShortError(err))
429438
}

br/pkg/streamhelper/advancer_env.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ type StreamMeta interface {
150150
Begin(ctx context.Context, ch chan<- TaskEvent) error
151151
// UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store.
152152
UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
153+
// GetGlobalCheckpointForTask gets the global checkpoint from the meta store.
154+
GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)
153155
// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
154156
ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
155157
}

0 commit comments

Comments
 (0)