From 08bd94c31cf5f93db418180b8c94088557c704a4 Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Tue, 18 Feb 2025 09:15:41 +0100 Subject: [PATCH 1/7] fix --- br/pkg/task/operator/prepare_snap.go | 8 ++-- tests/realtikvtest/brietest/operator_test.go | 39 ++++++++++++++++---- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/br/pkg/task/operator/prepare_snap.go b/br/pkg/task/operator/prepare_snap.go index 4bf6ed5b1b8e9..0d208f0aa07bd 100644 --- a/br/pkg/task/operator/prepare_snap.go +++ b/br/pkg/task/operator/prepare_snap.go @@ -123,7 +123,7 @@ func hintAllReady() { // AdaptEnvForSnapshotBackup blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config. // This function will block until the context being canceled. -func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { +func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig, spID string) error { utils.DumpGoroutineWhenExit.Store(true) mgr, err := dialPD(ctx, &cfg.Config) if err != nil { @@ -153,7 +153,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { defer cx.Close() initChan := make(chan struct{}) - cx.run(func() error { return pauseGCKeeper(cx) }) + cx.run(func() error { return pauseGCKeeper(cx, spID) }) cx.run(func() error { log.Info("Pause scheduler waiting all connections established.") select { @@ -217,10 +217,10 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectio return nil } -func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) { +func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext, spID string) (err error) { // Note: should we remove the service safepoint as soon as this exits? sp := utils.BRServiceSafePoint{ - ID: utils.MakeSafePointID(), + ID: spID, TTL: int64(cx.cfg.TTL.Seconds()), BackupTS: cx.cfg.SafePoint, } diff --git a/tests/realtikvtest/brietest/operator_test.go b/tests/realtikvtest/brietest/operator_test.go index d8746c0038e43..d5b570a3d8eb3 100644 --- a/tests/realtikvtest/brietest/operator_test.go +++ b/tests/realtikvtest/brietest/operator_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/br/pkg/task" "github.com/pingcap/tidb/br/pkg/task/operator" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" @@ -82,6 +83,27 @@ func verifyGCNotStopped(t *require.Assertions, cfg operator.PauseGcConfig) { } } +func verifyTargetGCSafePointExist(t *require.Assertions, cfg operator.PauseGcConfig, spID string) { + var result GcSafePoints + t.NoError(getJSON(pdAPI(cfg, serviceGCSafepointPrefix), &result)) + for _, sp := range result.SPs { + if sp.ServiceID == spID { + return + } + } + t.FailNowf("the service gc safepoint does not exist", "it is %#v", result) +} + +func verifyTargetGCSafePointNotExist(t *require.Assertions, cfg operator.PauseGcConfig, spID string) { + var result GcSafePoints + t.NoError(getJSON(pdAPI(cfg, serviceGCSafepointPrefix), &result)) + for _, sp := range result.SPs { + if sp.ServiceID == spID { + t.FailNowf("the service gc safepoint exists", "it is %#v", sp) + } + } +} + func verifyLightningStopped(t *require.Assertions, cfg operator.PauseGcConfig) { cx := context.Background() pdc, err := pd.NewClient(caller.TestComponent, cfg.Config.PD, pd.SecurityOption{}) @@ -192,13 +214,14 @@ func TestOperator(t *testing.T) { }, } - verifyGCNotStopped(req, cfg) + spID := utils.MakeSafePointID() + verifyTargetGCSafePointNotExist(req, cfg, spID) verifySchedulerNotStopped(req, cfg) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { - req.NoError(operator.AdaptEnvForSnapshotBackup(ctx, &cfg)) + req.NoError(operator.AdaptEnvForSnapshotBackup(ctx, &cfg, spID)) }() req.Eventually(func() bool { select { @@ -210,6 +233,7 @@ func TestOperator(t *testing.T) { }, 10*time.Second, time.Second) verifyGCStopped(req, cfg) + verifyTargetGCSafePointExist(req, cfg, spID) verifyLightningStopped(req, cfg) verifySchedulersStopped(req, cfg) cancel() @@ -223,8 +247,8 @@ func TestOperator(t *testing.T) { } }, 10*time.Second, time.Second) - verifySchedulerNotStopped(req, cfg) - verifyGCNotStopped(req, cfg) + verifyTargetGCSafePointNotExist(req, cfg, spID) + verifyTargetGCSafePointExist(req, cfg, spID) } func TestFailure(t *testing.T) { @@ -245,14 +269,15 @@ func TestFailure(t *testing.T) { SafePoint: oracle.GoTimeToTS(time.Now()), } - verifyGCNotStopped(req, cfg) + spID := utils.MakeSafePointID() + verifyTargetGCSafePointNotExist(req, cfg, spID) verifySchedulerNotStopped(req, cfg) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg) + err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg, spID) require.Error(t, err) - verifyGCNotStopped(req, cfg) + verifyTargetGCSafePointNotExist(req, cfg, spID) verifySchedulerNotStopped(req, cfg) } From f87055b3eb071ed4f292e8e1a85cae849b1f2269 Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Tue, 18 Feb 2025 09:19:48 +0100 Subject: [PATCH 2/7] fix --- tests/realtikvtest/brietest/operator_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/realtikvtest/brietest/operator_test.go b/tests/realtikvtest/brietest/operator_test.go index d5b570a3d8eb3..7f956025a9b08 100644 --- a/tests/realtikvtest/brietest/operator_test.go +++ b/tests/realtikvtest/brietest/operator_test.go @@ -247,8 +247,8 @@ func TestOperator(t *testing.T) { } }, 10*time.Second, time.Second) + verifySchedulerNotStopped(req, cfg) verifyTargetGCSafePointNotExist(req, cfg, spID) - verifyTargetGCSafePointExist(req, cfg, spID) } func TestFailure(t *testing.T) { @@ -278,6 +278,6 @@ func TestFailure(t *testing.T) { err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg, spID) require.Error(t, err) - verifyTargetGCSafePointNotExist(req, cfg, spID) verifySchedulerNotStopped(req, cfg) + verifyTargetGCSafePointNotExist(req, cfg, spID) } From 83fe591b95f489afe3b82b2b703f69f39a55d467 Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Tue, 18 Feb 2025 09:31:24 +0100 Subject: [PATCH 3/7] clear code --- br/pkg/task/operator/config.go | 1 + br/pkg/task/operator/prepare_snap.go | 4 +-- tests/realtikvtest/brietest/operator_test.go | 26 ++++++++++---------- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go index 8ccf1ef6266b5..16707eff423b4 100644 --- a/br/pkg/task/operator/config.go +++ b/br/pkg/task/operator/config.go @@ -33,6 +33,7 @@ type PauseGcConfig struct { task.Config SafePoint uint64 `json:"safepoint" yaml:"safepoint"` + SafePointID string `json:"safepoint-id" yaml:"safepoint-id"` TTL time.Duration `json:"ttl" yaml:"ttl"` OnAllReady func() `json:"-" yaml:"-"` diff --git a/br/pkg/task/operator/prepare_snap.go b/br/pkg/task/operator/prepare_snap.go index 0d208f0aa07bd..88eec8e162e96 100644 --- a/br/pkg/task/operator/prepare_snap.go +++ b/br/pkg/task/operator/prepare_snap.go @@ -123,7 +123,7 @@ func hintAllReady() { // AdaptEnvForSnapshotBackup blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config. // This function will block until the context being canceled. -func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig, spID string) error { +func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { utils.DumpGoroutineWhenExit.Store(true) mgr, err := dialPD(ctx, &cfg.Config) if err != nil { @@ -153,7 +153,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig, spID str defer cx.Close() initChan := make(chan struct{}) - cx.run(func() error { return pauseGCKeeper(cx, spID) }) + cx.run(func() error { return pauseGCKeeper(cx, cfg.SafePointID) }) cx.run(func() error { log.Info("Pause scheduler waiting all connections established.") select { diff --git a/tests/realtikvtest/brietest/operator_test.go b/tests/realtikvtest/brietest/operator_test.go index 7f956025a9b08..284139779af9b 100644 --- a/tests/realtikvtest/brietest/operator_test.go +++ b/tests/realtikvtest/brietest/operator_test.go @@ -83,22 +83,22 @@ func verifyGCNotStopped(t *require.Assertions, cfg operator.PauseGcConfig) { } } -func verifyTargetGCSafePointExist(t *require.Assertions, cfg operator.PauseGcConfig, spID string) { +func verifyTargetGCSafePointExist(t *require.Assertions, cfg operator.PauseGcConfig) { var result GcSafePoints t.NoError(getJSON(pdAPI(cfg, serviceGCSafepointPrefix), &result)) for _, sp := range result.SPs { - if sp.ServiceID == spID { + if sp.ServiceID == cfg.SafePointID { return } } t.FailNowf("the service gc safepoint does not exist", "it is %#v", result) } -func verifyTargetGCSafePointNotExist(t *require.Assertions, cfg operator.PauseGcConfig, spID string) { +func verifyTargetGCSafePointNotExist(t *require.Assertions, cfg operator.PauseGcConfig) { var result GcSafePoints t.NoError(getJSON(pdAPI(cfg, serviceGCSafepointPrefix), &result)) for _, sp := range result.SPs { - if sp.ServiceID == spID { + if sp.ServiceID == cfg.SafePointID { t.FailNowf("the service gc safepoint exists", "it is %#v", sp) } } @@ -206,6 +206,7 @@ func TestOperator(t *testing.T) { }, TTL: 5 * time.Minute, SafePoint: oracle.GoTimeToTS(time.Now()), + SafePointID: utils.MakeSafePointID(), OnAllReady: func() { close(rd) }, @@ -214,14 +215,13 @@ func TestOperator(t *testing.T) { }, } - spID := utils.MakeSafePointID() - verifyTargetGCSafePointNotExist(req, cfg, spID) + verifyTargetGCSafePointNotExist(req, cfg) verifySchedulerNotStopped(req, cfg) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { - req.NoError(operator.AdaptEnvForSnapshotBackup(ctx, &cfg, spID)) + req.NoError(operator.AdaptEnvForSnapshotBackup(ctx, &cfg)) }() req.Eventually(func() bool { select { @@ -233,7 +233,7 @@ func TestOperator(t *testing.T) { }, 10*time.Second, time.Second) verifyGCStopped(req, cfg) - verifyTargetGCSafePointExist(req, cfg, spID) + verifyTargetGCSafePointExist(req, cfg) verifyLightningStopped(req, cfg) verifySchedulersStopped(req, cfg) cancel() @@ -248,7 +248,7 @@ func TestOperator(t *testing.T) { }, 10*time.Second, time.Second) verifySchedulerNotStopped(req, cfg) - verifyTargetGCSafePointNotExist(req, cfg, spID) + verifyTargetGCSafePointNotExist(req, cfg) } func TestFailure(t *testing.T) { @@ -267,17 +267,17 @@ func TestFailure(t *testing.T) { }, TTL: 5 * time.Minute, SafePoint: oracle.GoTimeToTS(time.Now()), + SafePointID: utils.MakeSafePointID(), } - spID := utils.MakeSafePointID() - verifyTargetGCSafePointNotExist(req, cfg, spID) + verifyTargetGCSafePointNotExist(req, cfg) verifySchedulerNotStopped(req, cfg) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg, spID) + err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg) require.Error(t, err) verifySchedulerNotStopped(req, cfg) - verifyTargetGCSafePointNotExist(req, cfg, spID) + verifyTargetGCSafePointNotExist(req, cfg) } From 6a3b8372d93abe48e79a451042d60c96aeff80be Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Tue, 18 Feb 2025 09:41:24 +0100 Subject: [PATCH 4/7] lint --- br/pkg/task/operator/config.go | 6 +++--- tests/realtikvtest/brietest/operator_test.go | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go index 16707eff423b4..947a08fcd1cee 100644 --- a/br/pkg/task/operator/config.go +++ b/br/pkg/task/operator/config.go @@ -32,9 +32,9 @@ const ( type PauseGcConfig struct { task.Config - SafePoint uint64 `json:"safepoint" yaml:"safepoint"` - SafePointID string `json:"safepoint-id" yaml:"safepoint-id"` - TTL time.Duration `json:"ttl" yaml:"ttl"` + SafePoint uint64 `json:"safepoint" yaml:"safepoint"` + SafePointID string `json:"safepoint-id" yaml:"safepoint-id"` + TTL time.Duration `json:"ttl" yaml:"ttl"` OnAllReady func() `json:"-" yaml:"-"` OnExit func() `json:"-" yaml:"-"` diff --git a/tests/realtikvtest/brietest/operator_test.go b/tests/realtikvtest/brietest/operator_test.go index 284139779af9b..c9e3a1f0dd716 100644 --- a/tests/realtikvtest/brietest/operator_test.go +++ b/tests/realtikvtest/brietest/operator_test.go @@ -204,8 +204,8 @@ func TestOperator(t *testing.T) { Config: task.Config{ PD: []string{"127.0.0.1:2379"}, }, - TTL: 5 * time.Minute, - SafePoint: oracle.GoTimeToTS(time.Now()), + TTL: 5 * time.Minute, + SafePoint: oracle.GoTimeToTS(time.Now()), SafePointID: utils.MakeSafePointID(), OnAllReady: func() { close(rd) @@ -265,8 +265,8 @@ func TestFailure(t *testing.T) { Config: task.Config{ PD: []string{"127.0.0.1:2379"}, }, - TTL: 5 * time.Minute, - SafePoint: oracle.GoTimeToTS(time.Now()), + TTL: 5 * time.Minute, + SafePoint: oracle.GoTimeToTS(time.Now()), SafePointID: utils.MakeSafePointID(), } From 140b246878d52330d76b5ecd2ffe7b22e4fa70f7 Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Tue, 18 Feb 2025 10:12:22 +0100 Subject: [PATCH 5/7] fix bazel --- tests/realtikvtest/brietest/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/realtikvtest/brietest/BUILD.bazel b/tests/realtikvtest/brietest/BUILD.bazel index 5b9612e08a466..53b9de6ca3315 100644 --- a/tests/realtikvtest/brietest/BUILD.bazel +++ b/tests/realtikvtest/brietest/BUILD.bazel @@ -21,6 +21,7 @@ go_test( "//br/pkg/summary", "//br/pkg/task", "//br/pkg/task/operator", + "//br/pkg/utils", "//pkg/config", "//pkg/domain", "//pkg/executor", From 795a6ac1eed3e06a3f44f80e34e93f1ad3128dc1 Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Tue, 18 Feb 2025 11:19:33 +0100 Subject: [PATCH 6/7] add comment --- br/pkg/task/operator/config.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go index 947a08fcd1cee..30e349faccd0f 100644 --- a/br/pkg/task/operator/config.go +++ b/br/pkg/task/operator/config.go @@ -33,6 +33,8 @@ type PauseGcConfig struct { task.Config SafePoint uint64 `json:"safepoint" yaml:"safepoint"` + // SafePointID is used to identify a specific safepoint. + // This field is only used in ***TEST*** now, you shouldn't use it in the src codes. SafePointID string `json:"safepoint-id" yaml:"safepoint-id"` TTL time.Duration `json:"ttl" yaml:"ttl"` From 4474984cf4eda97bb71e6e35cd554bed331232e8 Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Tue, 18 Feb 2025 11:32:06 +0100 Subject: [PATCH 7/7] lint --- br/pkg/task/operator/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go index 30e349faccd0f..01dd27e6b4862 100644 --- a/br/pkg/task/operator/config.go +++ b/br/pkg/task/operator/config.go @@ -32,7 +32,7 @@ const ( type PauseGcConfig struct { task.Config - SafePoint uint64 `json:"safepoint" yaml:"safepoint"` + SafePoint uint64 `json:"safepoint" yaml:"safepoint"` // SafePointID is used to identify a specific safepoint. // This field is only used in ***TEST*** now, you shouldn't use it in the src codes. SafePointID string `json:"safepoint-id" yaml:"safepoint-id"`