Skip to content

Commit 6b8201d

Browse files
authored
advancer: fix the incorrect gc safepoint behaviours (pingcap#52835) (pingcap#56698)
ref pingcap#52082
1 parent f3038bc commit 6b8201d

File tree

6 files changed

+38
-11
lines changed

6 files changed

+38
-11
lines changed

br/pkg/streamhelper/advancer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
453453
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
454454
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
455455
}
456-
if _, err := c.env.BlockGCUntil(ctx, 0); err != nil {
456+
if err := c.env.UnblockGC(ctx); err != nil {
457457
log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
458458
}
459459
metrics.LastCheckpoint.DeleteLabelValues(e.Name)

br/pkg/streamhelper/advancer_env.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ package streamhelper
44

55
import (
66
"context"
7+
"math"
78
"time"
89

10+
"github.com/pingcap/errors"
911
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
1012
"github.com/pingcap/tidb/br/pkg/utils"
1113
"github.com/pingcap/tidb/pkg/config"
@@ -46,7 +48,21 @@ type PDRegionScanner struct {
4648
// Returns the minimal service GC safe point across all services.
4749
// If the argument is `0`, this would remove the service safe point.
4850
func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) {
49-
return c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
51+
minimalSafePoint, err := c.UpdateServiceGCSafePoint(
52+
ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
53+
if err != nil {
54+
return 0, errors.Annotate(err, "failed to block gc until")
55+
}
56+
if minimalSafePoint > at {
57+
return 0, errors.Errorf("minimal safe point %d is greater than the target %d", minimalSafePoint, at)
58+
}
59+
return at, nil
60+
}
61+
62+
func (c PDRegionScanner) UnblockGC(ctx context.Context) error {
63+
// Setting the TTL to 0 removes the safe point.
64+
_, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, math.MaxUint64)
65+
return err
5066
}
5167

5268
// TODO: It should be able to synchronize the current TS with the PD.

br/pkg/streamhelper/advancer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func TestGCServiceSafePoint(t *testing.T) {
214214
req.Eventually(func() bool {
215215
env.fakeCluster.mu.Lock()
216216
defer env.fakeCluster.mu.Unlock()
217-
return env.serviceGCSafePoint == 0
217+
return env.serviceGCSafePoint != 0 && env.serviceGCSafePointDeleted
218218
}, 3*time.Second, 100*time.Millisecond)
219219
}
220220

br/pkg/streamhelper/basic_lib_for_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,11 @@ type fakeCluster struct {
102102
maxTs uint64
103103
testCtx *testing.T
104104

105-
onGetClient func(uint64) error
106-
onClearCache func(uint64) error
107-
serviceGCSafePoint uint64
108-
currentTS uint64
105+
onGetClient func(uint64) error
106+
onClearCache func(uint64) error
107+
serviceGCSafePoint uint64
108+
serviceGCSafePointDeleted bool
109+
currentTS uint64
109110
}
110111

111112
func (r *region) splitAt(newID uint64, k string) *region {
@@ -266,17 +267,20 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge
266267
func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) {
267268
f.mu.Lock()
268269
defer f.mu.Unlock()
269-
if at == 0 {
270-
f.serviceGCSafePoint = at
271-
return at, nil
272-
}
273270
if f.serviceGCSafePoint > at {
274271
return f.serviceGCSafePoint, nil
275272
}
276273
f.serviceGCSafePoint = at
277274
return at, nil
278275
}
279276

277+
func (f *fakeCluster) UnblockGC(ctx context.Context) error {
278+
f.mu.Lock()
279+
defer f.mu.Unlock()
280+
f.serviceGCSafePointDeleted = true
281+
return nil
282+
}
283+
280284
func (f *fakeCluster) FetchCurrentTS(ctx context.Context) (uint64, error) {
281285
return f.currentTS, nil
282286
}

br/pkg/streamhelper/regioniter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ type TiKVClusterMeta interface {
4343
// For now, all tasks (exactly one task in fact) use the same checkpoint.
4444
BlockGCUntil(ctx context.Context, at uint64) (uint64, error)
4545

46+
// UnblockGC is used to remove the service GC safe point in PD.
47+
UnblockGC(ctx context.Context) error
48+
4649
FetchCurrentTS(ctx context.Context) (uint64, error)
4750
}
4851

br/pkg/streamhelper/regioniter_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
8383
return 0, status.Error(codes.Unimplemented, "Unsupported operation")
8484
}
8585

86+
func (c constantRegions) UnblockGC(ctx context.Context) error {
87+
return status.Error(codes.Unimplemented, "Unsupported operation")
88+
}
89+
8690
// TODO: It should be able to synchronize the current TS with the PD.
8791
func (c constantRegions) FetchCurrentTS(ctx context.Context) (uint64, error) {
8892
return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil

0 commit comments

Comments
 (0)