Skip to content

Commit 93d4efd

Browse files
3pointerti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#52106
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 69894ec commit 93d4efd

File tree

3 files changed

+65
-4
lines changed

3 files changed

+65
-4
lines changed

br/pkg/task/stream.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"encoding/binary"
2121
"fmt"
22+
"math"
2223
"net/http"
2324
"strings"
2425
"sync"
@@ -729,8 +730,8 @@ func RunStreamStop(
729730
if err := streamMgr.setGCSafePoint(ctx,
730731
utils.BRServiceSafePoint{
731732
ID: buildPauseSafePointName(ti.Info.Name),
732-
TTL: utils.DefaultStreamStartSafePointTTL,
733-
BackupTS: 0,
733+
TTL: 0, // 0 means remove this service safe point.
734+
BackupTS: math.MaxUint64,
734735
},
735736
); err != nil {
736737
log.Warn("failed to remove safe point", zap.String("error", err.Error()))

br/pkg/utils/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,11 @@ go_test(
9494
],
9595
embed = [":utils"],
9696
flaky = True,
97+
<<<<<<< HEAD
9798
shard_count = 38,
99+
=======
100+
shard_count = 32,
101+
>>>>>>> 8bba8261b0c (log backup: fix the issue that paused service safepoint not removed (#52106))
98102
deps = [
99103
"//br/pkg/errors",
100104
"//br/pkg/metautil",

br/pkg/utils/safe_point_test.go

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package utils_test
44

55
import (
66
"context"
7+
"math"
78
"sync"
89
"testing"
910

@@ -14,7 +15,7 @@ import (
1415

1516
func TestCheckGCSafepoint(t *testing.T) {
1617
ctx := context.Background()
17-
pdClient := &mockSafePoint{safepoint: 2333}
18+
pdClient := &mockSafePoint{safepoint: 2333, services: make(map[string]uint64)}
1819
{
1920
err := utils.CheckGCSafePoint(ctx, pdClient, 2333+1)
2021
require.NoError(t, err)
@@ -34,23 +35,78 @@ func TestCheckGCSafepoint(t *testing.T) {
3435
}
3536
}
3637

38+
func TestCheckUpdateServiceSafepoint(t *testing.T) {
39+
ctx := context.Background()
40+
pdClient := &mockSafePoint{safepoint: 2333, services: make(map[string]uint64)}
41+
{
42+
// nothing happened, because current safepoint is large than servicee safepoint.
43+
err := utils.UpdateServiceSafePoint(ctx, pdClient, utils.BRServiceSafePoint{
44+
"BR_SERVICE",
45+
1,
46+
1,
47+
})
48+
require.NoError(t, err)
49+
curSafePoint, err := pdClient.UpdateGCSafePoint(ctx, 0)
50+
require.NoError(t, err)
51+
require.Equal(t, uint64(2333), curSafePoint)
52+
}
53+
{
54+
// register br service safepoint
55+
err := utils.UpdateServiceSafePoint(ctx, pdClient, utils.BRServiceSafePoint{
56+
"BR_SERVICE",
57+
1,
58+
2334,
59+
})
60+
require.NoError(t, err)
61+
curSafePoint, find := pdClient.GetServiceSafePoint("BR_SERVICE")
62+
// update with new safepoint - 1.
63+
require.Equal(t, uint64(2333), curSafePoint)
64+
require.True(t, find)
65+
}
66+
{
67+
// remove br service safepoint
68+
err := utils.UpdateServiceSafePoint(ctx, pdClient, utils.BRServiceSafePoint{
69+
"BR_SERVICE",
70+
0,
71+
math.MaxUint64,
72+
})
73+
require.NoError(t, err)
74+
_, find := pdClient.GetServiceSafePoint("BR_SERVICE")
75+
require.False(t, find)
76+
}
77+
}
78+
3779
type mockSafePoint struct {
3880
sync.Mutex
3981
pd.Client
82+
services map[string]uint64
4083
safepoint uint64
4184
minServiceSafepoint uint64
4285
}
4386

87+
func (m *mockSafePoint) GetServiceSafePoint(serviceID string) (uint64, bool) {
88+
m.Lock()
89+
defer m.Unlock()
90+
safepoint, ok := m.services[serviceID]
91+
return safepoint, ok
92+
}
93+
4494
func (m *mockSafePoint) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
4595
m.Lock()
4696
defer m.Unlock()
4797

98+
if ttl <= 0 {
99+
delete(m.services, serviceID)
100+
return 0, nil
101+
}
102+
48103
if m.safepoint > safePoint {
49104
return m.safepoint, nil
50105
}
51106
if m.minServiceSafepoint == 0 || m.minServiceSafepoint > safePoint {
52107
m.minServiceSafepoint = safePoint
53108
}
109+
m.services[serviceID] = safePoint
54110
return m.minServiceSafepoint, nil
55111
}
56112

@@ -65,7 +121,7 @@ func (m *mockSafePoint) UpdateGCSafePoint(ctx context.Context, safePoint uint64)
65121
}
66122

67123
func TestStartServiceSafePointKeeper(t *testing.T) {
68-
pdClient := &mockSafePoint{safepoint: 2333}
124+
pdClient := &mockSafePoint{safepoint: 2333, services: make(map[string]uint64)}
69125

70126
cases := []struct {
71127
sp utils.BRServiceSafePoint

0 commit comments

Comments
 (0)