Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions br/pkg/task/operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ const (
type PauseGcConfig struct {
task.Config

SafePoint uint64 `json:"safepoint" yaml:"safepoint"`
TTL time.Duration `json:"ttl" yaml:"ttl"`
SafePoint uint64 `json:"safepoint" yaml:"safepoint"`
SafePointID string `json:"safepoint-id" yaml:"safepoint-id"`
TTL time.Duration `json:"ttl" yaml:"ttl"`

OnAllReady func() `json:"-" yaml:"-"`
OnExit func() `json:"-" yaml:"-"`
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/task/operator/prepare_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
defer cx.Close()

initChan := make(chan struct{})
cx.run(func() error { return pauseGCKeeper(cx) })
cx.run(func() error { return pauseGCKeeper(cx, cfg.SafePointID) })
cx.run(func() error {
log.Info("Pause scheduler waiting all connections established.")
select {
Expand Down Expand Up @@ -217,10 +217,10 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectio
return nil
}

func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) {
func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext, spID string) (err error) {
// Note: should we remove the service safepoint as soon as this exits?
sp := utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
ID: spID,
TTL: int64(cx.cfg.TTL.Seconds()),
BackupTS: cx.cfg.SafePoint,
}
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/brietest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_test(
"//br/pkg/summary",
"//br/pkg/task",
"//br/pkg/task/operator",
"//br/pkg/utils",
"//pkg/config",
"//pkg/domain",
"//pkg/executor",
Expand Down
41 changes: 33 additions & 8 deletions tests/realtikvtest/brietest/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/task/operator"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -82,6 +83,27 @@ func verifyGCNotStopped(t *require.Assertions, cfg operator.PauseGcConfig) {
}
}

// verifyTargetGCSafePointExist asserts that the service GC safepoint list
// fetched via getJSON(pdAPI(cfg, serviceGCSafepointPrefix)) contains an entry
// whose ServiceID equals cfg.SafePointID.
//
// The assertion fails immediately if the fetch returns an error, or if no
// entry with the expected ID is present; in the latter case the whole fetched
// list is included in the failure message.
func verifyTargetGCSafePointExist(t *require.Assertions, cfg operator.PauseGcConfig) {
	var listed GcSafePoints
	t.NoError(getJSON(pdAPI(cfg, serviceGCSafepointPrefix), &listed))

	// Scan for the safepoint registered under the configured ID.
	found := false
	for _, entry := range listed.SPs {
		if entry.ServiceID == cfg.SafePointID {
			found = true
			break
		}
	}
	if !found {
		t.FailNowf("the service gc safepoint does not exist", "it is %#v", listed)
	}
}

// verifyTargetGCSafePointNotExist asserts that the service GC safepoint list
// fetched via getJSON(pdAPI(cfg, serviceGCSafepointPrefix)) has no entry whose
// ServiceID equals cfg.SafePointID.
//
// The assertion fails immediately if the fetch returns an error, or as soon as
// a matching entry is seen; the offending entry is included in the failure
// message.
func verifyTargetGCSafePointNotExist(t *require.Assertions, cfg operator.PauseGcConfig) {
	var listed GcSafePoints
	t.NoError(getJSON(pdAPI(cfg, serviceGCSafepointPrefix), &listed))

	for _, entry := range listed.SPs {
		// Entries registered under other service IDs are irrelevant here.
		if entry.ServiceID != cfg.SafePointID {
			continue
		}
		t.FailNowf("the service gc safepoint exists", "it is %#v", entry)
	}
}

func verifyLightningStopped(t *require.Assertions, cfg operator.PauseGcConfig) {
cx := context.Background()
pdc, err := pd.NewClient(caller.TestComponent, cfg.Config.PD, pd.SecurityOption{})
Expand Down Expand Up @@ -182,8 +204,9 @@ func TestOperator(t *testing.T) {
Config: task.Config{
PD: []string{"127.0.0.1:2379"},
},
TTL: 5 * time.Minute,
SafePoint: oracle.GoTimeToTS(time.Now()),
TTL: 5 * time.Minute,
SafePoint: oracle.GoTimeToTS(time.Now()),
SafePointID: utils.MakeSafePointID(),
OnAllReady: func() {
close(rd)
},
Expand All @@ -192,7 +215,7 @@ func TestOperator(t *testing.T) {
},
}

verifyGCNotStopped(req, cfg)
verifyTargetGCSafePointNotExist(req, cfg)
verifySchedulerNotStopped(req, cfg)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -210,6 +233,7 @@ func TestOperator(t *testing.T) {
}, 10*time.Second, time.Second)

verifyGCStopped(req, cfg)
verifyTargetGCSafePointExist(req, cfg)
verifyLightningStopped(req, cfg)
verifySchedulersStopped(req, cfg)
cancel()
Expand All @@ -224,7 +248,7 @@ func TestOperator(t *testing.T) {
}, 10*time.Second, time.Second)

verifySchedulerNotStopped(req, cfg)
verifyGCNotStopped(req, cfg)
verifyTargetGCSafePointNotExist(req, cfg)
}

func TestFailure(t *testing.T) {
Expand All @@ -241,18 +265,19 @@ func TestFailure(t *testing.T) {
Config: task.Config{
PD: []string{"127.0.0.1:2379"},
},
TTL: 5 * time.Minute,
SafePoint: oracle.GoTimeToTS(time.Now()),
TTL: 5 * time.Minute,
SafePoint: oracle.GoTimeToTS(time.Now()),
SafePointID: utils.MakeSafePointID(),
}

verifyGCNotStopped(req, cfg)
verifyTargetGCSafePointNotExist(req, cfg)
verifySchedulerNotStopped(req, cfg)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg)
require.Error(t, err)

verifyGCNotStopped(req, cfg)
verifySchedulerNotStopped(req, cfg)
verifyTargetGCSafePointNotExist(req, cfg)
}