diff --git a/br/pkg/checkpoint/BUILD.bazel b/br/pkg/checkpoint/BUILD.bazel index d31e5e01fe19d..511b2595807c3 100644 --- a/br/pkg/checkpoint/BUILD.bazel +++ b/br/pkg/checkpoint/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "checkpoint.go", "external_storage.go", "log_restore.go", + "manager.go", "restore.go", "storage.go", "ticker.go", @@ -45,7 +46,7 @@ go_test( srcs = ["checkpoint_test.go"], flaky = True, race = "on", - shard_count = 9, + shard_count = 13, deps = [ ":checkpoint", "//br/pkg/gluetidb", diff --git a/br/pkg/checkpoint/backup.go b/br/pkg/checkpoint/backup.go index 4d517009937d3..90323f9148e6c 100644 --- a/br/pkg/checkpoint/backup.go +++ b/br/pkg/checkpoint/backup.go @@ -37,8 +37,8 @@ const ( CheckpointLockPathForBackup = CheckpointBackupDir + "/checkpoint.lock" ) -func flushPositionForBackup() flushPosition { - return flushPosition{ +func flushPathForBackup() flushPath { + return flushPath{ CheckpointDataDir: CheckpointDataDirForBackup, CheckpointChecksumDir: CheckpointChecksumDirForBackup, CheckpointLockPath: CheckpointLockPathForBackup, @@ -57,7 +57,7 @@ func StartCheckpointBackupRunnerForTest( tick time.Duration, timer GlobalTimer, ) (*CheckpointRunner[BackupKeyType, BackupValueType], error) { - checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer) + checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer, flushPathForBackup()) if err != nil { return nil, errors.Trace(err) } @@ -74,7 +74,7 @@ func StartCheckpointRunnerForBackup( cipher *backuppb.CipherInfo, timer GlobalTimer, ) (*CheckpointRunner[BackupKeyType, BackupValueType], error) { - checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer) + checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer, flushPathForBackup()) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/checkpoint/checkpoint.go b/br/pkg/checkpoint/checkpoint.go index 78f9a7587d255..075561da27f39 100644 --- a/br/pkg/checkpoint/checkpoint.go +++ b/br/pkg/checkpoint/checkpoint.go @@ -41,7 +41,7 @@ import ( const CheckpointDir = "checkpoints" -type flushPosition struct { +type flushPath struct { CheckpointDataDir string CheckpointChecksumDir string CheckpointLockPath string diff --git a/br/pkg/checkpoint/checkpoint_test.go b/br/pkg/checkpoint/checkpoint_test.go index c7412c40b269e..b629f23b475bd 100644 --- a/br/pkg/checkpoint/checkpoint_test.go +++ b/br/pkg/checkpoint/checkpoint_test.go @@ -56,13 +56,35 @@ func TestCheckpointMetaForBackup(t *testing.T) { require.Equal(t, checkpointMeta.BackupTS, checkpointMeta2.BackupTS) } -func TestCheckpointMetaForRestore(t *testing.T) { - ctx := context.Background() +func TestCheckpointMetaForRestoreOnStorage(t *testing.T) { + base := t.TempDir() + s, err := storage.NewLocalStorage(base) + require.NoError(t, err) + snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot") + defer snapshotMetaManager.Close() + logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log") + defer logMetaManager.Close() + testCheckpointMetaForRestore(t, snapshotMetaManager, logMetaManager) +} + +func TestCheckpointMetaForRestoreOnTable(t *testing.T) { s := utiltest.CreateRestoreSchemaSuite(t) - dom := s.Mock.Domain g := gluetidb.New() - se, err := g.CreateSession(s.Mock.Storage) + snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName) + require.NoError(t, err) + defer snapshotMetaManager.Close() + logMetaManager, err := 
checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName) require.NoError(t, err) + defer logMetaManager.Close() + testCheckpointMetaForRestore(t, snapshotMetaManager, logMetaManager) +} + +func testCheckpointMetaForRestore( + t *testing.T, + snapshotMetaManager checkpoint.SnapshotMetaManagerT, + logMetaManager checkpoint.LogMetaManagerT, +) { + ctx := context.Background() checkpointMetaForSnapshotRestore := &checkpoint.CheckpointMetadataForSnapshotRestore{ UpstreamClusterID: 123, @@ -75,9 +97,9 @@ func TestCheckpointMetaForRestore(t *testing.T) { }, }, } - err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, checkpointMetaForSnapshotRestore) + err := snapshotMetaManager.SaveCheckpointMetadata(ctx, checkpointMetaForSnapshotRestore) require.NoError(t, err) - checkpointMetaForSnapshotRestore2, err := checkpoint.LoadCheckpointMetadataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor()) + checkpointMetaForSnapshotRestore2, err := snapshotMetaManager.LoadCheckpointMetadata(ctx) require.NoError(t, err) require.Equal(t, checkpointMetaForSnapshotRestore.SchedulersConfig, checkpointMetaForSnapshotRestore2.SchedulersConfig) require.Equal(t, checkpointMetaForSnapshotRestore.UpstreamClusterID, checkpointMetaForSnapshotRestore2.UpstreamClusterID) @@ -91,9 +113,10 @@ func TestCheckpointMetaForRestore(t *testing.T) { GcRatio: "1.0", TiFlashItems: map[int64]model.TiFlashReplicaInfo{1: {Count: 1}}, } - err = checkpoint.SaveCheckpointMetadataForLogRestore(ctx, se, checkpointMetaForLogRestore) + + err = logMetaManager.SaveCheckpointMetadata(ctx, checkpointMetaForLogRestore) require.NoError(t, err) - checkpointMetaForLogRestore2, err := checkpoint.LoadCheckpointMetadataForLogRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor()) + checkpointMetaForLogRestore2, err := logMetaManager.LoadCheckpointMetadata(ctx) require.NoError(t, err) require.Equal(t, checkpointMetaForLogRestore.UpstreamClusterID, checkpointMetaForLogRestore2.UpstreamClusterID) require.Equal(t, checkpointMetaForLogRestore.RestoredTS, checkpointMetaForLogRestore2.RestoredTS) @@ -102,17 +125,18 @@ func TestCheckpointMetaForRestore(t *testing.T) { require.Equal(t, checkpointMetaForLogRestore.GcRatio, checkpointMetaForLogRestore2.GcRatio) require.Equal(t, checkpointMetaForLogRestore.TiFlashItems, checkpointMetaForLogRestore2.TiFlashItems) - exists := checkpoint.ExistsCheckpointProgress(ctx, dom) + exists, err := logMetaManager.ExistsCheckpointProgress(ctx) + require.NoError(t, err) require.False(t, exists) - err = checkpoint.SaveCheckpointProgress(ctx, se, &checkpoint.CheckpointProgress{ + err = logMetaManager.SaveCheckpointProgress(ctx, &checkpoint.CheckpointProgress{ Progress: checkpoint.InLogRestoreAndIdMapPersisted, }) require.NoError(t, err) - progress, err := checkpoint.LoadCheckpointProgress(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor()) + progress, err := logMetaManager.LoadCheckpointProgress(ctx) require.NoError(t, err) require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress) - taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, s.Mock.Domain, se.GetSessionCtx().GetRestrictedSQLExecutor()) + taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, snapshotMetaManager, logMetaManager) require.NoError(t, err) require.Equal(t, uint64(123), taskInfo.Metadata.UpstreamClusterID) require.Equal(t, uint64(222), taskInfo.Metadata.RestoredTS) @@ -122,9 +146,10 @@ func 
TestCheckpointMetaForRestore(t *testing.T) { require.Equal(t, true, taskInfo.HasSnapshotMetadata) require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, taskInfo.Progress) - exists = checkpoint.ExistsCheckpointIngestIndexRepairSQLs(ctx, dom) + exists, err = logMetaManager.ExistsCheckpointIngestIndexRepairSQLs(ctx) + require.NoError(t, err) require.False(t, exists) - err = checkpoint.SaveCheckpointIngestIndexRepairSQLs(ctx, se, &checkpoint.CheckpointIngestIndexRepairSQLs{ + err = logMetaManager.SaveCheckpointIngestIndexRepairSQLs(ctx, &checkpoint.CheckpointIngestIndexRepairSQLs{ SQLs: []checkpoint.CheckpointIngestIndexRepairSQL{ { IndexID: 1, @@ -137,7 +162,7 @@ func TestCheckpointMetaForRestore(t *testing.T) { }, }) require.NoError(t, err) - repairSQLs, err := checkpoint.LoadCheckpointIngestIndexRepairSQLs(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor()) + repairSQLs, err := logMetaManager.LoadCheckpointIngestIndexRepairSQLs(ctx) require.NoError(t, err) require.Equal(t, repairSQLs.SQLs[0].IndexID, int64(1)) require.Equal(t, repairSQLs.SQLs[0].SchemaName, ast.NewCIStr("2")) @@ -271,16 +296,33 @@ func TestCheckpointBackupRunner(t *testing.T) { } } -func TestCheckpointRestoreRunner(t *testing.T) { - ctx := context.Background() +func TestCheckpointRestoreRunnerOnStorage(t *testing.T) { + base := t.TempDir() + s, err := storage.NewLocalStorage(base) + require.NoError(t, err) + snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot") + defer snapshotMetaManager.Close() + testCheckpointRestoreRunner(t, snapshotMetaManager) +} + +func TestCheckpointRestoreRunnerOnTable(t *testing.T) { s := utiltest.CreateRestoreSchemaSuite(t) g := gluetidb.New() - se, err := g.CreateSession(s.Mock.Storage) + snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName) require.NoError(t, err) + defer snapshotMetaManager.Close() + testCheckpointRestoreRunner(t, snapshotMetaManager) +} + +func testCheckpointRestoreRunner( + t *testing.T, + snapshotMetaManager checkpoint.SnapshotMetaManagerT, +) { + ctx := context.Background() - err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{}) + err := snapshotMetaManager.SaveCheckpointMetadata(ctx, &checkpoint.CheckpointMetadataForSnapshotRestore{}) require.NoError(t, err) - checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 5*time.Second, 3*time.Second) + checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, 5*time.Second, 3*time.Second, snapshotMetaManager) require.NoError(t, err) data := map[string]struct { @@ -326,7 +368,6 @@ func TestCheckpointRestoreRunner(t *testing.T) { checkpointRunner.WaitForFinish(ctx, true) - se, err = g.CreateSession(s.Mock.Storage) require.NoError(t, err) respCount := 0 checker := func(tableID int64, resp checkpoint.RestoreValueType) { @@ -343,11 +384,11 @@ func TestCheckpointRestoreRunner(t *testing.T) { respCount += 1 } - _, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.SnapshotRestoreCheckpointDatabaseName, checker) + _, err = snapshotMetaManager.LoadCheckpointData(ctx, checker) require.NoError(t, err) require.Equal(t, 4, respCount) - checksum, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor()) + checksum, _, 
err := snapshotMetaManager.LoadCheckpointChecksum(ctx) require.NoError(t, err) var i int64 @@ -355,25 +396,41 @@ func TestCheckpointRestoreRunner(t *testing.T) { require.Equal(t, checksum[i].Crc64xor, uint64(i)) } - err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.SnapshotRestoreCheckpointDatabaseName) + err = snapshotMetaManager.RemoveCheckpointData(ctx) require.NoError(t, err) - exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName) - require.False(t, exists) - exists = s.Mock.Domain.InfoSchema().SchemaExists(ast.NewCIStr(checkpoint.SnapshotRestoreCheckpointDatabaseName)) + exists, err := snapshotMetaManager.ExistsCheckpointMetadata(ctx) + require.NoError(t, err) require.False(t, exists) } -func TestCheckpointRunnerRetry(t *testing.T) { - ctx := context.Background() +func TestCheckpointRunnerRetryOnStorage(t *testing.T) { + base := t.TempDir() + s, err := storage.NewLocalStorage(base) + require.NoError(t, err) + snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot") + defer snapshotMetaManager.Close() + testCheckpointRunnerRetry(t, snapshotMetaManager) +} + +func TestCheckpointRunnerRetryOnTable(t *testing.T) { s := utiltest.CreateRestoreSchemaSuite(t) g := gluetidb.New() - se, err := g.CreateSession(s.Mock.Storage) + snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName) require.NoError(t, err) + defer snapshotMetaManager.Close() + testCheckpointRunnerRetry(t, snapshotMetaManager) +} - err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{}) +func testCheckpointRunnerRetry( + t *testing.T, + snapshotMetaManager checkpoint.SnapshotMetaManagerT, +) { + ctx := context.Background() + + err := snapshotMetaManager.SaveCheckpointMetadata(ctx, &checkpoint.CheckpointMetadataForSnapshotRestore{}) require.NoError(t, err) - checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond) + checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, 100*time.Millisecond, 300*time.Millisecond, snapshotMetaManager) require.NoError(t, err) err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes", "return(true)") @@ -397,35 +454,49 @@ func TestCheckpointRunnerRetry(t *testing.T) { err = checkpointRunner.FlushChecksum(ctx, 3, 3, 3, 3) require.NoError(t, err) checkpointRunner.WaitForFinish(ctx, true) - se, err = g.CreateSession(s.Mock.Storage) - require.NoError(t, err) + recordSet := make(map[string]int) - _, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), - checkpoint.SnapshotRestoreCheckpointDatabaseName, - func(tableID int64, v checkpoint.RestoreValueType) { - recordSet[fmt.Sprintf("%d_%s", tableID, v.RangeKey)] += 1 - }) + _, err = snapshotMetaManager.LoadCheckpointData(ctx, func(tableID int64, v checkpoint.RestoreValueType) { + recordSet[fmt.Sprintf("%d_%s", tableID, v.RangeKey)] += 1 + }) require.NoError(t, err) require.LessOrEqual(t, 1, recordSet["1_123"]) require.LessOrEqual(t, 1, recordSet["2_456"]) require.LessOrEqual(t, 1, recordSet["3_789"]) - items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor()) 
+ items, _, err := snapshotMetaManager.LoadCheckpointChecksum(ctx) require.NoError(t, err) require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1") require.Equal(t, fmt.Sprintf("%d_%d_%d", items[2].Crc64xor, items[2].TotalBytes, items[2].TotalKvs), "2_2_2") require.Equal(t, fmt.Sprintf("%d_%d_%d", items[3].Crc64xor, items[3].TotalBytes, items[3].TotalKvs), "3_3_3") } -func TestCheckpointRunnerNoRetry(t *testing.T) { - ctx := context.Background() +func TestCheckpointRunnerNoRetryOnStorage(t *testing.T) { + base := t.TempDir() + s, err := storage.NewLocalStorage(base) + require.NoError(t, err) + snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot") + defer snapshotMetaManager.Close() + testCheckpointRunnerNoRetry(t, snapshotMetaManager) +} + +func TestCheckpointRunnerNoRetryOnTable(t *testing.T) { s := utiltest.CreateRestoreSchemaSuite(t) g := gluetidb.New() - se, err := g.CreateSession(s.Mock.Storage) + snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName) require.NoError(t, err) + defer snapshotMetaManager.Close() + testCheckpointRunnerNoRetry(t, snapshotMetaManager) +} + +func testCheckpointRunnerNoRetry( + t *testing.T, + snapshotMetaManager checkpoint.SnapshotMetaManagerT, +) { + ctx := context.Background() - err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{}) + err := snapshotMetaManager.SaveCheckpointMetadata(ctx, &checkpoint.CheckpointMetadataForSnapshotRestore{}) require.NoError(t, err) - checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond) + checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, 100*time.Millisecond, 300*time.Millisecond, snapshotMetaManager) require.NoError(t, err) err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(1, "123")) @@ -438,33 +509,48 @@ func TestCheckpointRunnerNoRetry(t *testing.T) { require.NoError(t, err) time.Sleep(time.Second) checkpointRunner.WaitForFinish(ctx, true) - se, err = g.CreateSession(s.Mock.Storage) + require.NoError(t, err) recordSet := make(map[string]int) - _, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), - checkpoint.SnapshotRestoreCheckpointDatabaseName, - func(tableID int64, v checkpoint.RestoreValueType) { - recordSet[fmt.Sprintf("%d_%s", tableID, v.RangeKey)] += 1 - }) + _, err = snapshotMetaManager.LoadCheckpointData(ctx, func(tableID int64, v checkpoint.RestoreValueType) { + recordSet[fmt.Sprintf("%d_%s", tableID, v.RangeKey)] += 1 + }) require.NoError(t, err) require.Equal(t, 1, recordSet["1_123"]) require.Equal(t, 1, recordSet["2_456"]) - items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor()) + items, _, err := snapshotMetaManager.LoadCheckpointChecksum(ctx) require.NoError(t, err) require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1") require.Equal(t, fmt.Sprintf("%d_%d_%d", items[2].Crc64xor, items[2].TotalBytes, items[2].TotalKvs), "2_2_2") } -func TestCheckpointLogRestoreRunner(t *testing.T) { - ctx := context.Background() +func TestCheckpointLogRestoreRunnerOnStorage(t *testing.T) { + base := 
t.TempDir() + s, err := storage.NewLocalStorage(base) + require.NoError(t, err) + logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log") + defer logMetaManager.Close() + testCheckpointLogRestoreRunner(t, logMetaManager) +} + +func TestCheckpointLogRestoreRunnerOnTable(t *testing.T) { s := utiltest.CreateRestoreSchemaSuite(t) g := gluetidb.New() - se, err := g.CreateSession(s.Mock.Storage) + logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName) require.NoError(t, err) + defer logMetaManager.Close() + testCheckpointLogRestoreRunner(t, logMetaManager) +} + +func testCheckpointLogRestoreRunner( + t *testing.T, + logMetaManager checkpoint.LogMetaManagerT, +) { + ctx := context.Background() - err = checkpoint.SaveCheckpointMetadataForLogRestore(ctx, se, &checkpoint.CheckpointMetadataForLogRestore{}) + err := logMetaManager.SaveCheckpointMetadata(ctx, &checkpoint.CheckpointMetadataForLogRestore{}) require.NoError(t, err) - checkpointRunner, err := checkpoint.StartCheckpointLogRestoreRunnerForTest(ctx, se, 5*time.Second) + checkpointRunner, err := checkpoint.StartCheckpointLogRestoreRunnerForTest(ctx, 5*time.Second, logMetaManager) require.NoError(t, err) data := map[string]map[int][]struct { @@ -509,7 +595,6 @@ func TestCheckpointLogRestoreRunner(t *testing.T) { checkpointRunner.WaitForFinish(ctx, true) - se, err = g.CreateSession(s.Mock.Storage) require.NoError(t, err) respCount := 0 checker := func(metaKey string, resp checkpoint.LogRestoreValueMarshaled) { @@ -536,16 +621,15 @@ func TestCheckpointLogRestoreRunner(t *testing.T) { require.FailNow(t, "not found in the original data") } - _, err = checkpoint.LoadCheckpointDataForLogRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checker) + _, err = logMetaManager.LoadCheckpointData(ctx, checker) require.NoError(t, err) require.Equal(t, 4, respCount) - err = checkpoint.RemoveCheckpointDataForLogRestore(ctx, s.Mock.Domain, se) + err = logMetaManager.RemoveCheckpointData(ctx) require.NoError(t, err) - exists := checkpoint.ExistsLogRestoreCheckpointMetadata(ctx, s.Mock.Domain) - require.False(t, exists) - exists = s.Mock.Domain.InfoSchema().SchemaExists(ast.NewCIStr(checkpoint.LogRestoreCheckpointDatabaseName)) + exists, err := logMetaManager.ExistsCheckpointMetadata(ctx) + require.NoError(t, err) require.False(t, exists) } @@ -586,60 +670,3 @@ func TestCheckpointRunnerLock(t *testing.T) { runner.WaitForFinish(ctx, true) } - -func TestCheckpointCompactedRestoreRunner(t *testing.T) { - ctx := context.Background() - s := utiltest.CreateRestoreSchemaSuite(t) - g := gluetidb.New() - se, err := g.CreateSession(s.Mock.Storage) - require.NoError(t, err) - - err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName, nil) - require.NoError(t, err) - checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName, 500*time.Millisecond, time.Second) - require.NoError(t, err) - - data := map[string]struct { - Name string - }{ - "a": {Name: "a"}, - "A": {Name: "A"}, - "1": {Name: "1"}, - } - - for _, d := range data { - err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointFileItem(1, d.Name)) - require.NoError(t, err) - } - - checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1) - checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2) - - checkpointRunner.WaitForFinish(ctx, true) - - se, err = g.CreateSession(s.Mock.Storage) - 
require.NoError(t, err) - respCount := 0 - checker := func(tableID int64, resp checkpoint.RestoreValueType) { - require.NotNil(t, resp) - d, ok := data[resp.Name] - require.True(t, ok) - require.Equal(t, d.Name, resp.Name) - respCount++ - } - - exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName) - require.True(t, exists) - - _, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.CustomSSTRestoreCheckpointDatabaseName, checker) - require.NoError(t, err) - require.Equal(t, 3, respCount) - - err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName) - require.NoError(t, err) - - exists = checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName) - require.False(t, exists) - exists = s.Mock.Domain.InfoSchema().SchemaExists(ast.NewCIStr(checkpoint.CustomSSTRestoreCheckpointDatabaseName)) - require.False(t, exists) -} diff --git a/br/pkg/checkpoint/external_storage.go b/br/pkg/checkpoint/external_storage.go index f7d365068e6cf..7e9e070ee5627 100644 --- a/br/pkg/checkpoint/external_storage.go +++ b/br/pkg/checkpoint/external_storage.go @@ -31,8 +31,44 @@ import ( "go.uber.org/zap" ) +const ( + CheckpointRestoreDirFormat = CheckpointDir + "/restore-%s" + CheckpointDataDirForRestoreFormat = CheckpointRestoreDirFormat + "/data" + CheckpointChecksumDirForRestoreFormat = CheckpointRestoreDirFormat + "/checksum" + CheckpointMetaPathForRestoreFormat = CheckpointRestoreDirFormat + "/checkpoint.meta" + CheckpointProgressPathForRestoreFormat = CheckpointRestoreDirFormat + "/progress.meta" + CheckpointIngestIndexPathForRestoreFormat = CheckpointRestoreDirFormat + "/ingest_index.meta" +) + +func flushPathForRestore(taskName string) flushPath { + return flushPath{ + CheckpointDataDir: getCheckpointDataDirByName(taskName), + CheckpointChecksumDir: getCheckpointChecksumDirByName(taskName), + } +} + +func getCheckpointMetaPathByName(taskName string) string { + return fmt.Sprintf(CheckpointMetaPathForRestoreFormat, taskName) +} + +func getCheckpointDataDirByName(taskName string) string { + return fmt.Sprintf(CheckpointDataDirForRestoreFormat, taskName) +} + +func getCheckpointChecksumDirByName(taskName string) string { + return fmt.Sprintf(CheckpointChecksumDirForRestoreFormat, taskName) +} + +func getCheckpointProgressPathByName(taskName string) string { + return fmt.Sprintf(CheckpointProgressPathForRestoreFormat, taskName) +} + +func getCheckpointIngestIndexPathByName(taskName string) string { + return fmt.Sprintf(CheckpointIngestIndexPathForRestoreFormat, taskName) +} + type externalCheckpointStorage struct { - flushPosition + flushPath storage storage.ExternalStorage lockId uint64 @@ -43,11 +79,12 @@ func newExternalCheckpointStorage( ctx context.Context, s storage.ExternalStorage, timer GlobalTimer, + flushPath flushPath, ) (*externalCheckpointStorage, error) { checkpointStorage := &externalCheckpointStorage{ - flushPosition: flushPositionForBackup(), - storage: s, - timer: timer, + flushPath: flushPath, + storage: s, + timer: timer, } if timer != nil { if err := checkpointStorage.initialLock(ctx); err != nil { diff --git a/br/pkg/checkpoint/log_restore.go b/br/pkg/checkpoint/log_restore.go index ff7b25b0a355e..3dae3d4c32f1e 100644 --- a/br/pkg/checkpoint/log_restore.go +++ b/br/pkg/checkpoint/log_restore.go @@ -21,10 +21,8 @@ import ( 
"github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/glue" - "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/util/sqlexec" ) type LogRestoreKeyType = string @@ -98,31 +96,21 @@ func newTableCheckpointStorage(se glue.Session, checkpointDBName string) *tableC // only for test func StartCheckpointLogRestoreRunnerForTest( ctx context.Context, - se glue.Session, tick time.Duration, + manager LogMetaManagerT, ) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) { - runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType]( - newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName), - nil, valueMarshalerForLogRestore) - - runner.startCheckpointMainLoop(ctx, tick, tick, 0, defaultRetryDuration) - return runner, nil + cfg := DefaultTickDurationConfig() + cfg.tickDurationForChecksum = tick + cfg.tickDurationForFlush = tick + return manager.StartCheckpointRunner(ctx, cfg, valueMarshalerForLogRestore) } // Notice that the session is owned by the checkpoint runner, and it will be also closed by it. func StartCheckpointRunnerForLogRestore( ctx context.Context, - se glue.Session, + manager LogMetaManagerT, ) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) { - runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType]( - newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName), - nil, valueMarshalerForLogRestore) - - // for restore, no need to set lock - runner.startCheckpointMainLoop( - ctx, - defaultTickDurationForFlush, defaultTickDurationForChecksum, 0, defaultRetryDuration) - return runner, nil + return manager.StartCheckpointRunner(ctx, DefaultTickDurationConfig(), valueMarshalerForLogRestore) } func AppendRangeForLogRestore( @@ -145,16 +133,6 @@ func AppendRangeForLogRestore( }) } -// load the whole checkpoint range data and retrieve the metadata of restored ranges -// and return the total time cost in the past executions -func LoadCheckpointDataForLogRestore[K KeyType, V ValueType]( - ctx context.Context, - execCtx sqlexec.RestrictedSQLExecutor, - fn func(K, V), -) (time.Duration, error) { - return selectCheckpointData(ctx, execCtx, LogRestoreCheckpointDatabaseName, fn) -} - type CheckpointMetadataForLogRestore struct { UpstreamClusterID uint64 `json:"upstream-cluster-id"` RestoredTS uint64 `json:"restored-ts"` @@ -165,35 +143,6 @@ type CheckpointMetadataForLogRestore struct { TiFlashItems map[int64]model.TiFlashReplicaInfo `json:"tiflash-recorder,omitempty"` } -func LoadCheckpointMetadataForLogRestore( - ctx context.Context, - execCtx sqlexec.RestrictedSQLExecutor, -) (*CheckpointMetadataForLogRestore, error) { - m := &CheckpointMetadataForLogRestore{} - err := selectCheckpointMeta(ctx, execCtx, LogRestoreCheckpointDatabaseName, checkpointMetaTableName, m) - return m, err -} - -func SaveCheckpointMetadataForLogRestore( - ctx context.Context, - se glue.Session, - meta *CheckpointMetadataForLogRestore, -) error { - err := initCheckpointTable(ctx, se, LogRestoreCheckpointDatabaseName, []string{checkpointDataTableName}) - if err != nil { - return errors.Trace(err) - } - return insertCheckpointMeta(ctx, se, LogRestoreCheckpointDatabaseName, checkpointMetaTableName, meta) -} - -func ExistsLogRestoreCheckpointMetadata( - ctx context.Context, - dom *domain.Domain, -) bool { - return dom.InfoSchema(). 
- TableExists(ast.NewCIStr(LogRestoreCheckpointDatabaseName), ast.NewCIStr(checkpointMetaTableName)) -} - // RestoreProgress is a progress type for snapshot + log restore. // // Before the id-maps is persisted into external storage, the snapshot restore and @@ -229,34 +178,9 @@ type CheckpointProgress struct { Progress RestoreProgress `json:"progress"` } -func LoadCheckpointProgress( - ctx context.Context, - execCtx sqlexec.RestrictedSQLExecutor, -) (*CheckpointProgress, error) { - m := &CheckpointProgress{} - err := selectCheckpointMeta(ctx, execCtx, LogRestoreCheckpointDatabaseName, checkpointProgressTableName, m) - return m, errors.Trace(err) -} - -func SaveCheckpointProgress( - ctx context.Context, - se glue.Session, - meta *CheckpointProgress, -) error { - return insertCheckpointMeta(ctx, se, LogRestoreCheckpointDatabaseName, checkpointProgressTableName, meta) -} - -func ExistsCheckpointProgress( - ctx context.Context, - dom *domain.Domain, -) bool { - return dom.InfoSchema(). - TableExists(ast.NewCIStr(LogRestoreCheckpointDatabaseName), ast.NewCIStr(checkpointProgressTableName)) -} - // CheckpointTaskInfoForLogRestore is tied to a specific cluster. // It represents the last restore task executed in this cluster. -type CheckpointTaskInfoForLogRestore struct { +type TaskInfoForLogRestore struct { Metadata *CheckpointMetadataForLogRestore HasSnapshotMetadata bool // the progress for this task @@ -265,32 +189,44 @@ type CheckpointTaskInfoForLogRestore struct { func TryToGetCheckpointTaskInfo( ctx context.Context, - dom *domain.Domain, - execCtx sqlexec.RestrictedSQLExecutor, -) (*CheckpointTaskInfoForLogRestore, error) { + snapshotManager SnapshotMetaManagerT, + logManager LogMetaManagerT, +) (*TaskInfoForLogRestore, error) { var ( metadata *CheckpointMetadataForLogRestore progress RestoreProgress - err error + + hasSnapshotMetadata bool = false ) // get the progress - if ExistsCheckpointProgress(ctx, dom) { - checkpointProgress, err := LoadCheckpointProgress(ctx, execCtx) + if exists, err := logManager.ExistsCheckpointProgress(ctx); err != nil { + return nil, errors.Trace(err) + } else if exists { + checkpointProgress, err := logManager.LoadCheckpointProgress(ctx) if err != nil { return nil, errors.Trace(err) } progress = checkpointProgress.Progress } // get the checkpoint metadata - if ExistsLogRestoreCheckpointMetadata(ctx, dom) { - metadata, err = LoadCheckpointMetadataForLogRestore(ctx, execCtx) + if exists, err := logManager.ExistsCheckpointMetadata(ctx); err != nil { + return nil, errors.Trace(err) + } else if exists { + metadata, err = logManager.LoadCheckpointMetadata(ctx) if err != nil { return nil, errors.Trace(err) } } - hasSnapshotMetadata := ExistsSstRestoreCheckpoint(ctx, dom, SnapshotRestoreCheckpointDatabaseName) + // exists the snapshot checkpoint metadata + if snapshotManager != nil { + existsSnapshotMetadata, err := snapshotManager.ExistsCheckpointMetadata(ctx) + if err != nil { + return nil, errors.Trace(err) + } + hasSnapshotMetadata = existsSnapshotMetadata + } - return &CheckpointTaskInfoForLogRestore{ + return &TaskInfoForLogRestore{ Metadata: metadata, HasSnapshotMetadata: hasSnapshotMetadata, Progress: progress, @@ -312,30 +248,3 @@ type CheckpointIngestIndexRepairSQL struct { type CheckpointIngestIndexRepairSQLs struct { SQLs []CheckpointIngestIndexRepairSQL } - -func LoadCheckpointIngestIndexRepairSQLs( - ctx context.Context, - execCtx sqlexec.RestrictedSQLExecutor, -) (*CheckpointIngestIndexRepairSQLs, error) { - m := &CheckpointIngestIndexRepairSQLs{} - err 
:= selectCheckpointMeta(ctx, execCtx, LogRestoreCheckpointDatabaseName, checkpointIngestTableName, m) - return m, errors.Trace(err) -} - -func ExistsCheckpointIngestIndexRepairSQLs(ctx context.Context, dom *domain.Domain) bool { - return dom.InfoSchema(). - TableExists(ast.NewCIStr(LogRestoreCheckpointDatabaseName), ast.NewCIStr(checkpointIngestTableName)) -} - -func SaveCheckpointIngestIndexRepairSQLs( - ctx context.Context, - se glue.Session, - meta *CheckpointIngestIndexRepairSQLs, -) error { - return insertCheckpointMeta(ctx, se, LogRestoreCheckpointDatabaseName, checkpointIngestTableName, meta) -} - -func RemoveCheckpointDataForLogRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error { - return dropCheckpointTables(ctx, dom, se, LogRestoreCheckpointDatabaseName, - []string{checkpointDataTableName, checkpointMetaTableName, checkpointProgressTableName, checkpointIngestTableName}) -} diff --git a/br/pkg/checkpoint/manager.go b/br/pkg/checkpoint/manager.go new file mode 100644 index 0000000000000..2946c5fb48667 --- /dev/null +++ b/br/pkg/checkpoint/manager.go @@ -0,0 +1,417 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package checkpoint + +import ( + "context" + "fmt" + "time" + + "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/tidb/br/pkg/glue" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/parser/ast" +) + +type SnapshotMetaManagerT = MetaManager[ + RestoreKeyType, RestoreValueType, RestoreValueType, CheckpointMetadataForSnapshotRestore, +] +type LogMetaManagerT = LogMetaManager[ + LogRestoreKeyType, LogRestoreValueType, LogRestoreValueMarshaled, CheckpointMetadataForLogRestore, +] + +type tickDurationConfig struct { + tickDurationForFlush time.Duration + tickDurationForChecksum time.Duration + retryDuration time.Duration +} + +func DefaultTickDurationConfig() tickDurationConfig { + return tickDurationConfig{ + tickDurationForFlush: defaultTickDurationForFlush, + tickDurationForChecksum: defaultTickDurationForChecksum, + retryDuration: defaultRetryDuration, + } +} + +type MetaManager[K KeyType, SV, LV ValueType, M any] interface { + fmt.Stringer + + LoadCheckpointData(context.Context, func(K, LV)) (time.Duration, error) + LoadCheckpointChecksum(context.Context) (map[int64]*ChecksumItem, time.Duration, error) + LoadCheckpointMetadata(context.Context) (*M, error) + SaveCheckpointMetadata(context.Context, *M) error + ExistsCheckpointMetadata(context.Context) (bool, error) + RemoveCheckpointData(context.Context) error + + // start checkpoint runner + StartCheckpointRunner( + context.Context, tickDurationConfig, func(*RangeGroup[K, SV]) ([]byte, error), + ) (*CheckpointRunner[K, SV], error) + + // close session + Close() +} + +type LogMetaManager[K KeyType, SV, LV ValueType, M any] interface { + MetaManager[K, SV, LV, M] + + 
LoadCheckpointProgress(context.Context) (*CheckpointProgress, error) + SaveCheckpointProgress(context.Context, *CheckpointProgress) error + ExistsCheckpointProgress(context.Context) (bool, error) + + LoadCheckpointIngestIndexRepairSQLs(context.Context) (*CheckpointIngestIndexRepairSQLs, error) + SaveCheckpointIngestIndexRepairSQLs(context.Context, *CheckpointIngestIndexRepairSQLs) error + ExistsCheckpointIngestIndexRepairSQLs(context.Context) (bool, error) +} + +type TableMetaManager[K KeyType, SV, LV ValueType, M any] struct { + se glue.Session + runnerSe glue.Session + dom *domain.Domain + dbName string +} + +func NewLogTableMetaManager( + g glue.Glue, + dom *domain.Domain, + dbName string, +) (LogMetaManagerT, error) { + se, err := g.CreateSession(dom.Store()) + if err != nil { + return nil, errors.Trace(err) + } + runnerSe, err := g.CreateSession(dom.Store()) + if err != nil { + return nil, errors.Trace(err) + } + return &TableMetaManager[ + LogRestoreKeyType, LogRestoreValueType, LogRestoreValueMarshaled, CheckpointMetadataForLogRestore, + ]{ + se: se, + runnerSe: runnerSe, + dom: dom, + dbName: dbName, + }, nil +} + +func NewSnapshotTableMetaManager( + g glue.Glue, + dom *domain.Domain, + dbName string, +) (SnapshotMetaManagerT, error) { + se, err := g.CreateSession(dom.Store()) + if err != nil { + return nil, errors.Trace(err) + } + runnerSe, err := g.CreateSession(dom.Store()) + if err != nil { + return nil, errors.Trace(err) + } + return &TableMetaManager[ + RestoreKeyType, RestoreValueType, RestoreValueType, CheckpointMetadataForSnapshotRestore, + ]{ + se: se, + runnerSe: runnerSe, + dom: dom, + dbName: dbName, + }, nil +} + +func (manager *TableMetaManager[K, SV, LV, M]) String() string { + return fmt.Sprintf("databases[such as %s]", manager.dbName) +} + +func (manager *TableMetaManager[K, SV, LV, M]) Close() { + if manager.se != nil { + manager.se.Close() + } + if manager.runnerSe != nil { + manager.runnerSe.Close() + } +} + +// load the whole checkpoint range data and retrieve the metadata of restored ranges +// and return the total time cost in the past executions +func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointData( + ctx context.Context, + fn func(K, LV), +) (time.Duration, error) { + execCtx := manager.se.GetSessionCtx().GetRestrictedSQLExecutor() + return selectCheckpointData(ctx, execCtx, manager.dbName, fn) +} + +func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointChecksum( + ctx context.Context, +) (map[int64]*ChecksumItem, time.Duration, error) { + execCtx := manager.se.GetSessionCtx().GetRestrictedSQLExecutor() + return selectCheckpointChecksum(ctx, execCtx, manager.dbName) +} + +func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointMetadata( + ctx context.Context, +) (*M, error) { + execCtx := manager.se.GetSessionCtx().GetRestrictedSQLExecutor() + m := new(M) + err := selectCheckpointMeta(ctx, execCtx, manager.dbName, checkpointMetaTableName, m) + return m, err +} + +func (manager *TableMetaManager[K, SV, LV, M]) SaveCheckpointMetadata( + ctx context.Context, + meta *M, +) error { + err := initCheckpointTable(ctx, manager.se, manager.dbName, + []string{checkpointDataTableName, checkpointChecksumTableName}) + if err != nil { + return errors.Trace(err) + } + if meta != nil { + return insertCheckpointMeta(ctx, manager.se, manager.dbName, checkpointMetaTableName, meta) + } + return nil +} + +func (manager *TableMetaManager[K, SV, LV, M]) ExistsCheckpointMetadata( + ctx context.Context, +) (bool, error) { + // we only check the existence of 
the checkpoint data table + // because the checkpoint metadata is not used for restore + return manager.dom.InfoSchema(). + TableExists(ast.NewCIStr(manager.dbName), ast.NewCIStr(checkpointMetaTableName)), nil +} + +func (manager *TableMetaManager[K, SV, LV, M]) RemoveCheckpointData( + ctx context.Context, +) error { + return dropCheckpointTables(ctx, manager.dom, manager.se, manager.dbName, []string{ + checkpointDataTableName, + checkpointChecksumTableName, + checkpointMetaTableName, + checkpointProgressTableName, + checkpointIngestTableName, + }) +} + +func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointProgress( + ctx context.Context, +) (*CheckpointProgress, error) { + execCtx := manager.se.GetSessionCtx().GetRestrictedSQLExecutor() + m := &CheckpointProgress{} + err := selectCheckpointMeta(ctx, execCtx, manager.dbName, checkpointProgressTableName, m) + return m, errors.Trace(err) +} + +func (manager *TableMetaManager[K, SV, LV, M]) SaveCheckpointProgress( + ctx context.Context, + meta *CheckpointProgress, +) error { + return insertCheckpointMeta(ctx, manager.se, manager.dbName, checkpointProgressTableName, meta) +} + +func (manager *TableMetaManager[K, SV, LV, M]) ExistsCheckpointProgress( + ctx context.Context, +) (bool, error) { + return manager.dom.InfoSchema(). + TableExists(ast.NewCIStr(manager.dbName), ast.NewCIStr(checkpointProgressTableName)), nil +} + +func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointIngestIndexRepairSQLs( + ctx context.Context, +) (*CheckpointIngestIndexRepairSQLs, error) { + execCtx := manager.se.GetSessionCtx().GetRestrictedSQLExecutor() + m := &CheckpointIngestIndexRepairSQLs{} + err := selectCheckpointMeta(ctx, execCtx, manager.dbName, checkpointIngestTableName, m) + return m, errors.Trace(err) +} + +func (manager *TableMetaManager[K, SV, LV, M]) SaveCheckpointIngestIndexRepairSQLs( + ctx context.Context, + meta *CheckpointIngestIndexRepairSQLs, +) error { + return insertCheckpointMeta(ctx, manager.se, manager.dbName, checkpointIngestTableName, meta) +} + +func (manager *TableMetaManager[K, SV, LV, M]) ExistsCheckpointIngestIndexRepairSQLs( + ctx context.Context, +) (bool, error) { + return manager.dom.InfoSchema(). 
+ TableExists(ast.NewCIStr(manager.dbName), ast.NewCIStr(checkpointIngestTableName)), nil +} + +func (manager *TableMetaManager[K, SV, LV, M]) StartCheckpointRunner( + ctx context.Context, + cfg tickDurationConfig, + valueMarshaler func(*RangeGroup[K, SV]) ([]byte, error), +) (*CheckpointRunner[K, SV], error) { + runner := newCheckpointRunner( + newTableCheckpointStorage(manager.runnerSe, manager.dbName), + nil, valueMarshaler) + manager.runnerSe = nil + // for restore, no need to set lock + runner.startCheckpointMainLoop( + ctx, + cfg.tickDurationForFlush, cfg.tickDurationForChecksum, 0, cfg.retryDuration) + return runner, nil +} + +type StorageMetaManager[K KeyType, SV, LV ValueType, M any] struct { + storage storage.ExternalStorage + cipher *backuppb.CipherInfo + clusterID string + taskName string +} + +func NewSnapshotStorageMetaManager( + storage storage.ExternalStorage, + cipher *backuppb.CipherInfo, + clusterID uint64, + prefix string, +) SnapshotMetaManagerT { + return &StorageMetaManager[ + RestoreKeyType, RestoreValueType, RestoreValueType, CheckpointMetadataForSnapshotRestore, + ]{ + storage: storage, + cipher: cipher, + clusterID: fmt.Sprintf("%d", clusterID), + taskName: fmt.Sprintf("%d/%s", clusterID, prefix), + } +} + +func NewLogStorageMetaManager( + storage storage.ExternalStorage, + cipher *backuppb.CipherInfo, + clusterID uint64, + prefix string, +) LogMetaManagerT { + return &StorageMetaManager[ + LogRestoreKeyType, LogRestoreValueType, LogRestoreValueMarshaled, CheckpointMetadataForLogRestore, + ]{ + storage: storage, + cipher: cipher, + clusterID: fmt.Sprintf("%d", clusterID), + taskName: fmt.Sprintf("%d/%s", clusterID, prefix), + } +} + +func (manager *StorageMetaManager[K, SV, LV, M]) String() string { + return fmt.Sprintf("path[%s]", fmt.Sprintf(CheckpointRestoreDirFormat, manager.clusterID)) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) Close() {} + +func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointData( + ctx context.Context, + fn func(K, LV), +) (time.Duration, error) { + return walkCheckpointFile(ctx, manager.storage, manager.cipher, getCheckpointDataDirByName(manager.taskName), fn) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointChecksum( + ctx context.Context, +) (map[int64]*ChecksumItem, time.Duration, error) { + return loadCheckpointChecksum(ctx, manager.storage, getCheckpointChecksumDirByName(manager.taskName)) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointMetadata( + ctx context.Context, +) (*M, error) { + m := new(M) + err := loadCheckpointMeta(ctx, manager.storage, getCheckpointMetaPathByName(manager.taskName), m) + return m, errors.Trace(err) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) SaveCheckpointMetadata( + ctx context.Context, + meta *M, +) error { + return saveCheckpointMetadata(ctx, manager.storage, meta, getCheckpointMetaPathByName(manager.taskName)) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) ExistsCheckpointMetadata( + ctx context.Context, +) (bool, error) { + return manager.storage.FileExists(ctx, getCheckpointMetaPathByName(manager.taskName)) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) RemoveCheckpointData( + ctx context.Context, +) error { + prefix := fmt.Sprintf(CheckpointRestoreDirFormat, manager.taskName) + return removeCheckpointData(ctx, manager.storage, prefix) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointProgress( + ctx context.Context, +) (*CheckpointProgress, error) { + m := &CheckpointProgress{} + err := 
loadCheckpointMeta(ctx, manager.storage, getCheckpointProgressPathByName(manager.taskName), m) + return m, errors.Trace(err) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) SaveCheckpointProgress( + ctx context.Context, + meta *CheckpointProgress, +) error { + return saveCheckpointMetadata(ctx, manager.storage, meta, getCheckpointProgressPathByName(manager.taskName)) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) ExistsCheckpointProgress( + ctx context.Context, +) (bool, error) { + return manager.storage.FileExists(ctx, getCheckpointProgressPathByName(manager.taskName)) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointIngestIndexRepairSQLs( + ctx context.Context, +) (*CheckpointIngestIndexRepairSQLs, error) { + m := &CheckpointIngestIndexRepairSQLs{} + err := loadCheckpointMeta(ctx, manager.storage, getCheckpointIngestIndexPathByName(manager.taskName), m) + return m, errors.Trace(err) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) SaveCheckpointIngestIndexRepairSQLs( + ctx context.Context, + meta *CheckpointIngestIndexRepairSQLs, +) error { + return saveCheckpointMetadata(ctx, manager.storage, meta, getCheckpointIngestIndexPathByName(manager.taskName)) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) ExistsCheckpointIngestIndexRepairSQLs( + ctx context.Context, +) (bool, error) { + return manager.storage.FileExists(ctx, getCheckpointIngestIndexPathByName(manager.taskName)) +} + +func (manager *StorageMetaManager[K, SV, LV, M]) StartCheckpointRunner( + ctx context.Context, + cfg tickDurationConfig, + valueMarshaler func(*RangeGroup[K, SV]) ([]byte, error), +) (*CheckpointRunner[K, SV], error) { + checkpointStorage, err := newExternalCheckpointStorage( + ctx, manager.storage, nil, flushPathForRestore(manager.taskName)) + if err != nil { + return nil, errors.Trace(err) + } + runner := newCheckpointRunner(checkpointStorage, manager.cipher, valueMarshaler) + + // for restore, no need to set lock + runner.startCheckpointMainLoop( + ctx, + cfg.tickDurationForFlush, cfg.tickDurationForChecksum, 0, cfg.retryDuration) + return runner, nil +} diff --git a/br/pkg/checkpoint/restore.go b/br/pkg/checkpoint/restore.go index 18a429dc58961..3b3f2f987d87e 100644 --- a/br/pkg/checkpoint/restore.go +++ b/br/pkg/checkpoint/restore.go @@ -21,11 +21,7 @@ import ( "github.com/google/uuid" "github.com/pingcap/errors" - "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/pdutil" - "github.com/pingcap/tidb/pkg/domain" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/util/sqlexec" ) type RestoreKeyType = int64 @@ -65,34 +61,23 @@ func valueMarshalerForRestore(group *RangeGroup[RestoreKeyType, RestoreValueType // only for test func StartCheckpointRestoreRunnerForTest( ctx context.Context, - se glue.Session, - dbName string, tick time.Duration, retryDuration time.Duration, + manager SnapshotMetaManagerT, ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) { - runner := newCheckpointRunner[RestoreKeyType, RestoreValueType]( - newTableCheckpointStorage(se, dbName), - nil, valueMarshalerForRestore) - - runner.startCheckpointMainLoop(ctx, tick, tick, 0, retryDuration) - return runner, nil + cfg := DefaultTickDurationConfig() + cfg.tickDurationForChecksum = tick + cfg.tickDurationForFlush = tick + cfg.retryDuration = retryDuration + return manager.StartCheckpointRunner(ctx, cfg, valueMarshalerForRestore) } // Notice that the session is owned by the checkpoint 
runner, and it will be also closed by it. func StartCheckpointRunnerForRestore( ctx context.Context, - se glue.Session, - dbName string, + manager SnapshotMetaManagerT, ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) { - runner := newCheckpointRunner[RestoreKeyType, RestoreValueType]( - newTableCheckpointStorage(se, dbName), - nil, valueMarshalerForRestore) - - // for restore, no need to set lock - runner.startCheckpointMainLoop( - ctx, - defaultTickDurationForFlush, defaultTickDurationForChecksum, 0, defaultRetryDuration) - return runner, nil + return manager.StartCheckpointRunner(ctx, DefaultTickDurationConfig(), valueMarshalerForRestore) } func AppendRangesForRestore( @@ -116,24 +101,6 @@ func AppendRangesForRestore( }) } -// load the whole checkpoint range data and retrieve the metadata of restored ranges -// and return the total time cost in the past executions -func LoadCheckpointDataForSstRestore[K KeyType, V ValueType]( - ctx context.Context, - execCtx sqlexec.RestrictedSQLExecutor, - dbName string, - fn func(K, V), -) (time.Duration, error) { - return selectCheckpointData(ctx, execCtx, dbName, fn) -} - -func LoadCheckpointChecksumForRestore( - ctx context.Context, - execCtx sqlexec.RestrictedSQLExecutor, -) (map[int64]*ChecksumItem, time.Duration, error) { - return selectCheckpointChecksum(ctx, execCtx, SnapshotRestoreCheckpointDatabaseName) -} - type CheckpointMetadataForSnapshotRestore struct { UpstreamClusterID uint64 `json:"upstream-cluster-id"` RestoredTS uint64 `json:"restored-ts"` @@ -142,45 +109,3 @@ type CheckpointMetadataForSnapshotRestore struct { RestoreUUID uuid.UUID `json:"restore-uuid"` } - -func LoadCheckpointMetadataForSnapshotRestore( - ctx context.Context, - execCtx sqlexec.RestrictedSQLExecutor, -) (*CheckpointMetadataForSnapshotRestore, error) { - m := &CheckpointMetadataForSnapshotRestore{} - err := selectCheckpointMeta(ctx, execCtx, SnapshotRestoreCheckpointDatabaseName, checkpointMetaTableName, m) - return m, err -} - -func SaveCheckpointMetadataForSstRestore( - ctx context.Context, - se glue.Session, - dbName string, - meta *CheckpointMetadataForSnapshotRestore, -) error { - err := initCheckpointTable(ctx, se, dbName, - []string{checkpointDataTableName, checkpointChecksumTableName}) - if err != nil { - return errors.Trace(err) - } - if meta != nil { - return insertCheckpointMeta(ctx, se, dbName, checkpointMetaTableName, meta) - } - return nil -} - -func ExistsSstRestoreCheckpoint( - ctx context.Context, - dom *domain.Domain, - dbName string, -) bool { - // we only check the existence of the checkpoint data table - // because the checkpoint metadata is not used for restore - return dom.InfoSchema(). - TableExists(ast.NewCIStr(dbName), ast.NewCIStr(checkpointDataTableName)) -} - -func RemoveCheckpointDataForSstRestore(ctx context.Context, dom *domain.Domain, se glue.Session, dbName string) error { - return dropCheckpointTables(ctx, dom, se, dbName, - []string{checkpointDataTableName, checkpointChecksumTableName, checkpointMetaTableName}) -} diff --git a/br/pkg/metautil/metafile.go b/br/pkg/metautil/metafile.go index 018954683edf6..acedc82953182 100644 --- a/br/pkg/metautil/metafile.go +++ b/br/pkg/metautil/metafile.go @@ -58,11 +58,6 @@ const ( MetaV2 ) -// PitrIDMapsFilename is filename that used to save id maps in pitr. 
-func PitrIDMapsFilename(clusterID, restoreTS uint64) string { - return fmt.Sprintf("%s/pitr_id_map.cluster_id:%d.restored_ts:%d", "pitr_id_maps", clusterID, restoreTS) -} - // Encrypt encrypts the content according to CipherInfo. func Encrypt(content []byte, cipher *backuppb.CipherInfo) (encryptedContent, iv []byte, err error) { if len(content) == 0 || cipher == nil { diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel index 941b39b5aa492..831829cd92a99 100644 --- a/br/pkg/restore/log_client/BUILD.bazel +++ b/br/pkg/restore/log_client/BUILD.bazel @@ -50,7 +50,6 @@ go_library( "//pkg/util", "//pkg/util/codec", "//pkg/util/redact", - "//pkg/util/sqlexec", "@com_github_docker_go_units//:go-units", "@com_github_fatih_color//:color", "@com_github_gogo_protobuf//proto", diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index a74a2b818f552..ac58204755054 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -67,7 +67,6 @@ import ( "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/meta/model" tidbutil "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/tikv/client-go/v2/config" kvutil "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" @@ -100,7 +99,7 @@ func NewLogRestoreManager( ctx context.Context, fileImporter *LogFileImporter, poolSize uint, - createCheckpointSessionFn func() (glue.Session, error), + logCheckpointMetaManager checkpoint.LogMetaManagerT, ) (*LogRestoreManager, error) { // for compacted reason, user only set --concurrency for log file restore speed. log.Info("log restore worker pool", zap.Uint("size", poolSize)) @@ -108,13 +107,10 @@ func NewLogRestoreManager( fileImporter: fileImporter, workerPool: tidbutil.NewWorkerPool(poolSize, "log manager worker pool"), } - se, err := createCheckpointSessionFn() - if err != nil { - return nil, errors.Trace(err) - } - if se != nil { - l.checkpointRunner, err = checkpoint.StartCheckpointRunnerForLogRestore(ctx, se) + if logCheckpointMetaManager != nil { + var err error + l.checkpointRunner, err = checkpoint.StartCheckpointRunnerForLogRestore(ctx, logCheckpointMetaManager) if err != nil { return nil, errors.Trace(err) } @@ -157,7 +153,7 @@ func NewSstRestoreManager( snapFileImporter *snapclient.SnapFileImporter, concurrencyPerStore uint, storeCount uint, - createCheckpointSessionFn func() (glue.Session, error), + sstCheckpointMetaManager checkpoint.SnapshotMetaManagerT, ) (*SstRestoreManager, error) { var checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType] // This poolSize is similar to full restore, as both workflows are comparable. 
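
Note on the new checkpoint-manager parameters above: both backends added in br/pkg/checkpoint/manager.go satisfy the same interfaces (checkpoint.SnapshotMetaManagerT / checkpoint.LogMetaManagerT), so callers such as NewLogRestoreManager and NewSstRestoreManager now receive an already-constructed manager instead of a session factory. The following is a minimal, hedged sketch of how a caller might wire this up, pieced together from the tests in this diff; the local path, cluster ID 1, and the "snapshot" prefix are illustrative values taken from the test setup, not fixed by the API.

package main

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/checkpoint"
	"github.com/pingcap/tidb/br/pkg/storage"
)

func exampleSnapshotCheckpoint(ctx context.Context) error {
	// External-storage backend: checkpoint files live under
	// checkpoints/restore-<taskName>/... as defined in external_storage.go.
	s, err := storage.NewLocalStorage("/tmp/br-checkpoints") // illustrative path
	if err != nil {
		return err
	}
	// nil cipher, cluster ID 1 and prefix "snapshot" mirror the test setup in this PR.
	mgr := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
	defer mgr.Close()

	// First run: initialize the checkpoint metadata; a resumed run would instead
	// consult mgr.ExistsCheckpointMetadata and mgr.LoadCheckpointData.
	if err := mgr.SaveCheckpointMetadata(ctx, &checkpoint.CheckpointMetadataForSnapshotRestore{}); err != nil {
		return err
	}

	// The manager starts the flush loop itself; no glue.Session changes hands.
	runner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, mgr)
	if err != nil {
		return err
	}
	defer runner.WaitForFinish(ctx, true)

	// Record a restored range exactly as the tests do.
	return checkpoint.AppendRangesForRestore(ctx, runner,
		checkpoint.NewCheckpointRangeKeyItem(1, "123"))
}

The table-backed equivalent is checkpoint.NewSnapshotTableMetaManager(g, dom, checkpoint.SnapshotRestoreCheckpointDatabaseName), which returns the same SnapshotMetaManagerT and can be passed to NewSstRestoreManager (below) unchanged.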
@@ -169,12 +165,9 @@ func NewSstRestoreManager( s := &SstRestoreManager{ workerPool: tidbutil.NewWorkerPool(poolSize, "log manager worker pool"), } - se, err := createCheckpointSessionFn() - if err != nil { - return nil, errors.Trace(err) - } - if se != nil { - checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName) + if sstCheckpointMetaManager != nil { + var err error + checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, sstCheckpointMetaManager) if err != nil { return nil, errors.Trace(err) } @@ -520,7 +513,8 @@ func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) er func (rc *LogClient) InitClients( ctx context.Context, backend *backuppb.StorageBackend, - createSessionFn func() (glue.Session, error), + logCheckpointMetaManager checkpoint.LogMetaManagerT, + sstCheckpointMetaManager checkpoint.SnapshotMetaManagerT, concurrency uint, concurrencyPerStore uint, ) error { @@ -536,7 +530,7 @@ func (rc *LogClient) InitClients( ctx, NewLogFileImporter(metaClient, importCli, backend), concurrency, - createSessionFn, + logCheckpointMetaManager, ) if err != nil { return errors.Trace(err) @@ -561,20 +555,24 @@ func (rc *LogClient) InitClients( snapFileImporter, concurrencyPerStore, uint(len(stores)), - createSessionFn, + sstCheckpointMetaManager, ) return errors.Trace(err) } func (rc *LogClient) InitCheckpointMetadataForCompactedSstRestore( ctx context.Context, + sstCheckpointMetaManager checkpoint.SnapshotMetaManagerT, ) (map[string]struct{}, error) { sstCheckpointSets := make(map[string]struct{}) - if checkpoint.ExistsSstRestoreCheckpoint(ctx, rc.dom, checkpoint.CustomSSTRestoreCheckpointDatabaseName) { + exists, err := sstCheckpointMetaManager.ExistsCheckpointMetadata(ctx) + if err != nil { + return nil, errors.Trace(err) + } + if exists { // we need to load the checkpoint data for the following restore - execCtx := rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor() - _, err := checkpoint.LoadCheckpointDataForSstRestore(ctx, execCtx, checkpoint.CustomSSTRestoreCheckpointDatabaseName, func(tableID int64, v checkpoint.RestoreValueType) { + _, err = sstCheckpointMetaManager.LoadCheckpointData(ctx, func(tableID int64, v checkpoint.RestoreValueType) { sstCheckpointSets[v.Name] = struct{}{} }) if err != nil { @@ -582,7 +580,7 @@ func (rc *LogClient) InitCheckpointMetadataForCompactedSstRestore( } } else { // initialize the checkpoint metadata since it is the first time to restore. - err := checkpoint.SaveCheckpointMetadataForSstRestore(ctx, rc.unsafeSession, checkpoint.CustomSSTRestoreCheckpointDatabaseName, nil) + err = sstCheckpointMetaManager.SaveCheckpointMetadata(ctx, &checkpoint.CheckpointMetadataForSnapshotRestore{}) if err != nil { return nil, errors.Trace(err) } @@ -595,14 +593,19 @@ func (rc *LogClient) LoadOrCreateCheckpointMetadataForLogRestore( startTS, restoredTS uint64, gcRatio string, tiflashRecorder *tiflashrec.TiFlashRecorder, + logCheckpointMetaManager checkpoint.LogMetaManagerT, ) (string, error) { rc.useCheckpoint = true // if the checkpoint metadata exists in the external storage, the restore is not // for the first time. 
- if checkpoint.ExistsLogRestoreCheckpointMetadata(ctx, rc.dom) { + exists, err := logCheckpointMetaManager.ExistsCheckpointMetadata(ctx) + if err != nil { + return "", errors.Trace(err) + } + if exists { // load the checkpoint since this is not the first time to restore - meta, err := checkpoint.LoadCheckpointMetadataForLogRestore(ctx, rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor()) + meta, err := logCheckpointMetaManager.LoadCheckpointMetadata(ctx) if err != nil { return "", errors.Trace(err) } @@ -620,7 +623,7 @@ func (rc *LogClient) LoadOrCreateCheckpointMetadataForLogRestore( log.Info("save gc ratio into checkpoint metadata", zap.Uint64("start-ts", startTS), zap.Uint64("restored-ts", restoredTS), zap.Uint64("rewrite-ts", rc.currentTS), zap.String("gc-ratio", gcRatio), zap.Int("tiflash-item-count", len(items))) - if err := checkpoint.SaveCheckpointMetadataForLogRestore(ctx, rc.unsafeSession, &checkpoint.CheckpointMetadataForLogRestore{ + if err := logCheckpointMetaManager.SaveCheckpointMetadata(ctx, &checkpoint.CheckpointMetadataForLogRestore{ UpstreamClusterID: rc.upstreamClusterID, RestoredTS: restoredTS, StartTS: startTS, @@ -1544,7 +1547,7 @@ func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper( func (rc *LogClient) WrapLogFilesIterWithSplitHelper( ctx context.Context, logIter LogIter, - execCtx sqlexec.RestrictedSQLExecutor, + logCheckpointMetaManager checkpoint.LogMetaManagerT, rules map[int64]*restoreutils.RewriteRules, updateStatsFn func(uint64, uint64), splitSize uint64, @@ -1554,7 +1557,7 @@ func (rc *LogClient) WrapLogFilesIterWithSplitHelper( wrapper := restore.PipelineRestorerWrapper[*LogDataFileInfo]{ PipelineRegionsSplitter: split.NewPipelineRegionsSplitter(client, splitSize, splitKeys), } - strategy, err := NewLogSplitStrategy(ctx, rc.useCheckpoint, execCtx, rules, updateStatsFn) + strategy, err := NewLogSplitStrategy(ctx, rc.useCheckpoint, logCheckpointMetaManager, rules, updateStatsFn) if err != nil { return nil, errors.Trace(err) } @@ -1597,11 +1600,16 @@ const ( func (rc *LogClient) generateRepairIngestIndexSQLs( ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, + logCheckpointMetaManager checkpoint.LogMetaManagerT, ) ([]checkpoint.CheckpointIngestIndexRepairSQL, bool, error) { var sqls []checkpoint.CheckpointIngestIndexRepairSQL if rc.useCheckpoint { - if checkpoint.ExistsCheckpointIngestIndexRepairSQLs(ctx, rc.dom) { - checkpointSQLs, err := checkpoint.LoadCheckpointIngestIndexRepairSQLs(ctx, rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor()) + exists, err := logCheckpointMetaManager.ExistsCheckpointIngestIndexRepairSQLs(ctx) + if err != nil { + return sqls, false, errors.Trace(err) + } + if exists { + checkpointSQLs, err := logCheckpointMetaManager.LoadCheckpointIngestIndexRepairSQLs(ctx) if err != nil { return sqls, false, errors.Trace(err) } @@ -1666,7 +1674,7 @@ func (rc *LogClient) generateRepairIngestIndexSQLs( } if rc.useCheckpoint && len(sqls) > 0 { - if err := checkpoint.SaveCheckpointIngestIndexRepairSQLs(ctx, rc.unsafeSession, &checkpoint.CheckpointIngestIndexRepairSQLs{ + if err := logCheckpointMetaManager.SaveCheckpointIngestIndexRepairSQLs(ctx, &checkpoint.CheckpointIngestIndexRepairSQLs{ SQLs: sqls, }); err != nil { return sqls, false, errors.Trace(err) @@ -1676,8 +1684,13 @@ func (rc *LogClient) generateRepairIngestIndexSQLs( } // RepairIngestIndex drops the indexes from IngestRecorder and re-add them. 
-func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue) error { - sqls, fromCheckpoint, err := rc.generateRepairIngestIndexSQLs(ctx, ingestRecorder) +func (rc *LogClient) RepairIngestIndex( + ctx context.Context, + ingestRecorder *ingestrec.IngestRecorder, + logCheckpointMetaManager checkpoint.LogMetaManagerT, + g glue.Glue, +) error { + sqls, fromCheckpoint, err := rc.generateRepairIngestIndexSQLs(ctx, ingestRecorder, logCheckpointMetaManager) if err != nil { return errors.Trace(err) } @@ -1850,12 +1863,13 @@ const PITRIdMapBlockSize int = 524288 func (rc *LogClient) SaveIdMapWithFailPoints( ctx context.Context, manager *stream.TableMappingManager, + logCheckpointMetaManager checkpoint.LogMetaManagerT, ) error { failpoint.Inject("failed-before-id-maps-saved", func(_ failpoint.Value) { failpoint.Return(errors.New("failpoint: failed before id maps saved")) }) - if err := rc.saveIDMap(ctx, manager); err != nil { + if err := rc.saveIDMap(ctx, manager, logCheckpointMetaManager); err != nil { return errors.Trace(err) } @@ -1869,6 +1883,7 @@ func (rc *LogClient) SaveIdMapWithFailPoints( func (rc *LogClient) saveIDMap( ctx context.Context, manager *stream.TableMappingManager, + logCheckpointMetaManager checkpoint.LogMetaManagerT, ) error { backupmeta := &backuppb.BackupMeta{DbMaps: manager.ToProto()} data, err := proto.Marshal(backupmeta) @@ -1895,7 +1910,7 @@ func (rc *LogClient) saveIDMap( if rc.useCheckpoint { log.Info("save checkpoint task info with InLogRestoreAndIdMapPersist status") - if err := checkpoint.SaveCheckpointProgress(ctx, rc.unsafeSession, &checkpoint.CheckpointProgress{ + if err := logCheckpointMetaManager.SaveCheckpointProgress(ctx, &checkpoint.CheckpointProgress{ Progress: checkpoint.InLogRestoreAndIdMapPersisted, }); err != nil { return errors.Trace(err) diff --git a/br/pkg/restore/log_client/client_test.go b/br/pkg/restore/log_client/client_test.go index 2d5059c74a5c6..cadef168cf53a 100644 --- a/br/pkg/restore/log_client/client_test.go +++ b/br/pkg/restore/log_client/client_test.go @@ -1429,7 +1429,7 @@ func TestPITRIDMap(t *testing.T) { baseTableMappingManager := &stream.TableMappingManager{ DBReplaceMap: getDBMap(), } - err = client.TEST_saveIDMap(ctx, baseTableMappingManager) + err = client.TEST_saveIDMap(ctx, baseTableMappingManager, nil) require.NoError(t, err) newSchemaReplaces, err := client.TEST_initSchemasMap(ctx, 1) require.NoError(t, err) @@ -2073,7 +2073,7 @@ func TestRepairIngestIndex(t *testing.T) { "test", "repair_index_t1", tableInfo.ID, indexIDi2, "i2", "a", json.RawMessage(fmt.Sprintf("[%d, false, [], false]", indexIDi2)), ), false)) - require.NoError(t, client.RepairIngestIndex(ctx, ingestRecorder, g)) + require.NoError(t, client.RepairIngestIndex(ctx, ingestRecorder, nil, g)) infoschema = s.Mock.InfoSchema() table2, err := infoschema.TableByName(ctx, ast.NewCIStr("test"), ast.NewCIStr("repair_index_t1")) require.NoError(t, err) @@ -2128,7 +2128,10 @@ func TestRepairIngestIndexFromCheckpoint(t *testing.T) { _, err = tk.Exec("DROP DATABASE __TiDB_BR_Temporary_Log_Restore_Checkpoint") require.NoError(t, err) }() - require.NoError(t, checkpoint.SaveCheckpointIngestIndexRepairSQLs(ctx, se, &checkpoint.CheckpointIngestIndexRepairSQLs{ + logCheckpointMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName) + require.NoError(t, err) + defer logCheckpointMetaManager.Close() + require.NoError(t, 
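[editor's note — illustrative sketch, not part of the patch] The client.go hunks above replace the createCheckpointSessionFn callback with a checkpoint.LogMetaManagerT value. Below is a minimal sketch of how a caller could build such a manager and hand it to NewLogRestoreManager; the glue, domain, importer, and pool-size values are assumed to exist, and buildLogRestoreManager is a hypothetical helper name.

// Sketch only: wiring a table-backed log checkpoint meta manager into
// NewLogRestoreManager, which now takes checkpoint.LogMetaManagerT
// instead of a session factory.
package example

import (
    "context"

    "github.com/pingcap/errors"
    "github.com/pingcap/tidb/br/pkg/checkpoint"
    "github.com/pingcap/tidb/br/pkg/glue"
    logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
    "github.com/pingcap/tidb/pkg/domain"
)

func buildLogRestoreManager(
    ctx context.Context,
    g glue.Glue,
    dom *domain.Domain,
    importer *logclient.LogFileImporter,
    poolSize uint,
) (*logclient.LogRestoreManager, error) {
    // Table-backed manager: checkpoint data lives in the downstream cluster's
    // __TiDB_BR_Temporary_Log_Restore_Checkpoint database.
    logMetaMgr, err := checkpoint.NewLogTableMetaManager(g, dom, checkpoint.LogRestoreCheckpointDatabaseName)
    if err != nil {
        return nil, errors.Trace(err)
    }
    // Passing nil instead of a manager keeps checkpointing disabled, mirroring
    // the old behaviour when createCheckpointSessionFn returned a nil session.
    return logclient.NewLogRestoreManager(ctx, importer, poolSize, logMetaMgr)
}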
diff --git a/br/pkg/restore/log_client/client_test.go b/br/pkg/restore/log_client/client_test.go
index 2d5059c74a5c6..cadef168cf53a 100644
--- a/br/pkg/restore/log_client/client_test.go
+++ b/br/pkg/restore/log_client/client_test.go
@@ -1429,7 +1429,7 @@ func TestPITRIDMap(t *testing.T) {
     baseTableMappingManager := &stream.TableMappingManager{
         DBReplaceMap: getDBMap(),
     }
-    err = client.TEST_saveIDMap(ctx, baseTableMappingManager)
+    err = client.TEST_saveIDMap(ctx, baseTableMappingManager, nil)
     require.NoError(t, err)
     newSchemaReplaces, err := client.TEST_initSchemasMap(ctx, 1)
     require.NoError(t, err)
@@ -2073,7 +2073,7 @@ func TestRepairIngestIndex(t *testing.T) {
         "test", "repair_index_t1", tableInfo.ID, indexIDi2, "i2", "a",
         json.RawMessage(fmt.Sprintf("[%d, false, [], false]", indexIDi2)),
     ), false))
-    require.NoError(t, client.RepairIngestIndex(ctx, ingestRecorder, g))
+    require.NoError(t, client.RepairIngestIndex(ctx, ingestRecorder, nil, g))
     infoschema = s.Mock.InfoSchema()
     table2, err := infoschema.TableByName(ctx, ast.NewCIStr("test"), ast.NewCIStr("repair_index_t1"))
     require.NoError(t, err)
@@ -2128,7 +2128,10 @@ func TestRepairIngestIndexFromCheckpoint(t *testing.T) {
         _, err = tk.Exec("DROP DATABASE __TiDB_BR_Temporary_Log_Restore_Checkpoint")
         require.NoError(t, err)
     }()
-    require.NoError(t, checkpoint.SaveCheckpointIngestIndexRepairSQLs(ctx, se, &checkpoint.CheckpointIngestIndexRepairSQLs{
+    logCheckpointMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName)
+    require.NoError(t, err)
+    defer logCheckpointMetaManager.Close()
+    require.NoError(t, logCheckpointMetaManager.SaveCheckpointIngestIndexRepairSQLs(ctx, &checkpoint.CheckpointIngestIndexRepairSQLs{
         SQLs: []checkpoint.CheckpointIngestIndexRepairSQL{
             {
                 // from checkpoint and old index (i1) id is found
@@ -2160,7 +2163,7 @@ func TestRepairIngestIndexFromCheckpoint(t *testing.T) {
         },
     }))
     ingestRecorder := ingestrec.New()
-    require.NoError(t, client.RepairIngestIndex(ctx, ingestRecorder, g))
+    require.NoError(t, client.RepairIngestIndex(ctx, ingestRecorder, logCheckpointMetaManager, g))
     infoschema = s.Mock.InfoSchema()
     table2, err := infoschema.TableByName(ctx, ast.NewCIStr("test"), ast.NewCIStr("repair_index_t1"))
     require.NoError(t, err)
diff --git a/br/pkg/restore/log_client/export_test.go b/br/pkg/restore/log_client/export_test.go
index 921b5e73d3629..251d37cc0fab8 100644
--- a/br/pkg/restore/log_client/export_test.go
+++ b/br/pkg/restore/log_client/export_test.go
@@ -20,6 +20,7 @@ import (
     "github.com/pingcap/errors"
     backuppb "github.com/pingcap/kvproto/pkg/brpb"
     "github.com/pingcap/kvproto/pkg/encryptionpb"
+    "github.com/pingcap/tidb/br/pkg/checkpoint"
     "github.com/pingcap/tidb/br/pkg/glue"
     "github.com/pingcap/tidb/br/pkg/storage"
     "github.com/pingcap/tidb/br/pkg/stream"
@@ -64,8 +65,9 @@ func (m *PhysicalWithMigrations) Physical() *backuppb.DataFileGroup {
 func (rc *LogClient) TEST_saveIDMap(
     ctx context.Context,
     m *stream.TableMappingManager,
+    logCheckpointMetaManager checkpoint.LogMetaManagerT,
 ) error {
-    return rc.SaveIdMapWithFailPoints(ctx, m)
+    return rc.SaveIdMapWithFailPoints(ctx, m, logCheckpointMetaManager)
 }

 func (rc *LogClient) TEST_initSchemasMap(
diff --git a/br/pkg/restore/log_client/log_split_strategy.go b/br/pkg/restore/log_client/log_split_strategy.go
index 3327991db02fa..bbd48adb93b21 100644
--- a/br/pkg/restore/log_client/log_split_strategy.go
+++ b/br/pkg/restore/log_client/log_split_strategy.go
@@ -11,7 +11,6 @@ import (
     "github.com/pingcap/tidb/br/pkg/restore/split"
     restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
     "github.com/pingcap/tidb/br/pkg/summary"
-    "github.com/pingcap/tidb/pkg/util/sqlexec"
     "go.uber.org/zap"
 )

@@ -26,7 +25,7 @@ var _ split.SplitStrategy[*LogDataFileInfo] = &LogSplitStrategy{}
 func NewLogSplitStrategy(
     ctx context.Context,
     useCheckpoint bool,
-    execCtx sqlexec.RestrictedSQLExecutor,
+    logCheckpointMetaManager checkpoint.LogMetaManagerT,
     rules map[int64]*restoreutils.RewriteRules,
     updateStatsFn func(uint64, uint64),
 ) (*LogSplitStrategy, error) {
@@ -36,8 +35,8 @@ func NewLogSplitStrategy(
     }
     skipMap := NewLogFilesSkipMap()
     if useCheckpoint {
-        t, err := checkpoint.LoadCheckpointDataForLogRestore(
-            ctx, execCtx, func(groupKey checkpoint.LogRestoreKeyType, off checkpoint.LogRestoreValueMarshaled) {
+        t, err := logCheckpointMetaManager.LoadCheckpointData(
+            ctx, func(groupKey checkpoint.LogRestoreKeyType, off checkpoint.LogRestoreValueMarshaled) {
                 for tableID, foffs := range off.Foffs {
                     // filter out the checkpoint data of dropped table
                     if _, exists := downstreamIdset[tableID]; exists {
diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go
index a20c0336a44f9..ebfacd0c2e8ca 100644
--- a/br/pkg/restore/snap_client/client.go
+++ b/br/pkg/restore/snap_client/client.go
@@ -319,7 +319,7 @@ func (rc *SnapClient) AllocTableIDs(ctx context.Context, tables []*metautil.Tabl
 // storage.
 func (rc *SnapClient) InitCheckpoint(
     ctx context.Context,
-    g glue.Glue, store kv.Storage,
+    snapshotCheckpointMetaManager checkpoint.SnapshotMetaManagerT,
     config *pdutil.ClusterConfig,
     logRestoredTS uint64,
     checkpointFirstRun bool,
@@ -328,9 +328,8 @@ func (rc *SnapClient) InitCheckpoint(
     checkpointSetWithTableID = make(map[int64]map[string]struct{})

     if !checkpointFirstRun {
-        execCtx := rc.db.Session().GetSessionCtx().GetRestrictedSQLExecutor()
         // load the checkpoint since this is not the first time to restore
-        meta, err := checkpoint.LoadCheckpointMetadataForSnapshotRestore(ctx, execCtx)
+        meta, err := snapshotCheckpointMetaManager.LoadCheckpointMetadata(ctx)
         if err != nil {
             return checkpointSetWithTableID, nil, errors.Trace(err)
         }
@@ -340,16 +339,16 @@ func (rc *SnapClient) InitCheckpoint(
             return checkpointSetWithTableID, nil, errors.Errorf(
                 "The upstream cluster id[%d] of the current snapshot restore does not match that[%d] recorded in checkpoint. "+
                     "Perhaps you should specify the last full backup storage instead, "+
-                    "or just clean the checkpoint database[%s] if the cluster has been cleaned up.",
-                rc.backupMeta.ClusterId, meta.UpstreamClusterID, checkpoint.SnapshotRestoreCheckpointDatabaseName)
+                    "or just clean the checkpoint %s if the cluster has been cleaned up.",
+                rc.backupMeta.ClusterId, meta.UpstreamClusterID, snapshotCheckpointMetaManager)
         }
         if meta.RestoredTS != rc.backupMeta.EndVersion {
             return checkpointSetWithTableID, nil, errors.Errorf(
                 "The current snapshot restore want to restore cluster to the BackupTS[%d], which is different from that[%d] recorded in checkpoint. "+
                     "Perhaps you should specify the last full backup storage instead, "+
-                    "or just clean the checkpoint database[%s] if the cluster has been cleaned up.",
-                rc.backupMeta.EndVersion, meta.RestoredTS, checkpoint.SnapshotRestoreCheckpointDatabaseName,
+                    "or just clean the checkpoint %s if the cluster has been cleaned up.",
+                rc.backupMeta.EndVersion, meta.RestoredTS, snapshotCheckpointMetaManager,
             )
         }
@@ -362,7 +361,7 @@ func (rc *SnapClient) InitCheckpoint(
                     "The current PITR want to restore cluster to the log restored ts[%d], which is different from that[%d] recorded in checkpoint. "+
                         "Perhaps you shoud specify the log restored ts instead, "+
                         "or just clean the checkpoint database[%s] if the cluster has been cleaned up.",
-                    logRestoredTS, meta.LogRestoredTS, checkpoint.LogRestoreCheckpointDatabaseName,
+                    logRestoredTS, meta.LogRestoredTS, snapshotCheckpointMetaManager,
                 )
             }
@@ -374,7 +373,7 @@ func (rc *SnapClient) InitCheckpoint(
         }

         // t1 is the latest time the checkpoint ranges persisted to the external storage.
-        t1, err := checkpoint.LoadCheckpointDataForSstRestore(ctx, execCtx, checkpoint.SnapshotRestoreCheckpointDatabaseName, func(tableID int64, v checkpoint.RestoreValueType) {
+        t1, err := snapshotCheckpointMetaManager.LoadCheckpointData(ctx, func(tableID int64, v checkpoint.RestoreValueType) {
             checkpointSet, exists := checkpointSetWithTableID[tableID]
             if !exists {
                 checkpointSet = make(map[string]struct{})
@@ -386,7 +385,7 @@ func (rc *SnapClient) InitCheckpoint(
             return checkpointSetWithTableID, nil, errors.Trace(err)
         }
         // t2 is the latest time the checkpoint checksum persisted to the external storage.
-        checkpointChecksum, t2, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, execCtx)
+        checkpointChecksum, t2, err := snapshotCheckpointMetaManager.LoadCheckpointChecksum(ctx)
         if err != nil {
             return checkpointSetWithTableID, nil, errors.Trace(err)
         }
@@ -411,16 +410,12 @@ func (rc *SnapClient) InitCheckpoint(
         if config != nil {
             meta.SchedulersConfig = &pdutil.ClusterConfig{Schedulers: config.Schedulers, ScheduleCfg: config.ScheduleCfg}
         }
-        if err := checkpoint.SaveCheckpointMetadataForSstRestore(ctx, rc.db.Session(), checkpoint.SnapshotRestoreCheckpointDatabaseName, meta); err != nil {
+        if err := snapshotCheckpointMetaManager.SaveCheckpointMetadata(ctx, meta); err != nil {
             return checkpointSetWithTableID, nil, errors.Trace(err)
         }
     }

-    se, err := g.CreateSession(store)
-    if err != nil {
-        return checkpointSetWithTableID, nil, errors.Trace(err)
-    }
-    rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
+    rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, snapshotCheckpointMetaManager)
     if err != nil {
         return checkpointSetWithTableID, nil, errors.Trace(err)
     }
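[editor's note — illustrative sketch, not part of the patch] SnapClient.InitCheckpoint now receives a checkpoint.SnapshotMetaManagerT instead of a glue/storage pair. The sketch below shows the two ways such a manager can be built (table-backed in the downstream cluster, or external-storage-backed keyed by cluster ID) under assumed inputs; initSnapshotCheckpoint is a hypothetical helper, and the manager is returned so the caller can Close it after the restore, mirroring the deferred cfg.CloseCheckpointMetaManager() later in this patch.

// Sketch only: building a snapshot checkpoint meta manager and passing it to
// SnapClient.InitCheckpoint.
package example

import (
    "context"

    backuppb "github.com/pingcap/kvproto/pkg/brpb"
    "github.com/pingcap/tidb/br/pkg/checkpoint"
    "github.com/pingcap/tidb/br/pkg/glue"
    "github.com/pingcap/tidb/br/pkg/pdutil"
    snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client"
    "github.com/pingcap/tidb/br/pkg/storage"
    "github.com/pingcap/tidb/pkg/domain"
)

func initSnapshotCheckpoint(
    ctx context.Context,
    client *snapclient.SnapClient,
    g glue.Glue,
    dom *domain.Domain,
    checkpointStorage storage.ExternalStorage, // non-nil when --checkpoint-storage is set
    cipher *backuppb.CipherInfo,
    downstreamClusterID uint64,
    schedulers *pdutil.ClusterConfig,
    logRestoredTS uint64,
    firstRun bool,
) (checkpoint.SnapshotMetaManagerT, error) {
    var (
        metaMgr checkpoint.SnapshotMetaManagerT
        err     error
    )
    if checkpointStorage != nil {
        // External-storage-backed checkpoints, keyed by the downstream cluster ID.
        metaMgr = checkpoint.NewSnapshotStorageMetaManager(checkpointStorage, cipher, downstreamClusterID, "snapshot")
    } else {
        // Table-backed checkpoints stored in the downstream cluster.
        metaMgr, err = checkpoint.NewSnapshotTableMetaManager(g, dom, checkpoint.SnapshotRestoreCheckpointDatabaseName)
        if err != nil {
            return nil, err
        }
    }
    if _, _, err := client.InitCheckpoint(ctx, metaMgr, schedulers, logRestoredTS, firstRun); err != nil {
        metaMgr.Close()
        return nil, err
    }
    // The caller keeps ownership and closes the manager once the restore is done.
    return metaMgr, nil
}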
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index 5305ae24a8904..300fd5cbae0e1 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -81,6 +81,9 @@ const (
     // FlagKeyspaceName corresponds to tidb config keyspace-name
     FlagKeyspaceName = "keyspace-name"

+    // flagCheckpointStorage use
+    flagCheckpointStorage = "checkpoint-storage"
+
     // FlagWaitTiFlashReady represents whether wait tiflash replica ready after table restored and checksumed.
     FlagWaitTiFlashReady = "wait-tiflash-ready"

@@ -258,9 +261,14 @@ type RestoreConfig struct {
     PitrBatchSize   uint32 `json:"pitr-batch-size" toml:"pitr-batch-size"`
     PitrConcurrency uint32 `json:"-" toml:"-"`

-    UseCheckpoint     bool   `json:"use-checkpoint" toml:"use-checkpoint"`
-    upstreamClusterID uint64 `json:"-" toml:"-"`
-    WaitTiflashReady  bool   `json:"wait-tiflash-ready" toml:"wait-tiflash-ready"`
+    UseCheckpoint                 bool                            `json:"use-checkpoint" toml:"use-checkpoint"`
+    CheckpointStorage             string                          `json:"checkpoint-storage" toml:"checkpoint-storage"`
+    upstreamClusterID             uint64                          `json:"-" toml:"-"`
+    snapshotCheckpointMetaManager checkpoint.SnapshotMetaManagerT `json:"-" toml:"-"`
+    logCheckpointMetaManager      checkpoint.LogMetaManagerT      `json:"-" toml:"-"`
+    sstCheckpointMetaManager      checkpoint.SnapshotMetaManagerT `json:"-" toml:"-"`
+
+    WaitTiflashReady bool `json:"wait-tiflash-ready" toml:"wait-tiflash-ready"`

     // for ebs-based restore
     FullBackupType FullBackupType `json:"full-backup-type" toml:"full-backup-type"`
@@ -293,6 +301,8 @@ func DefineRestoreFlags(flags *pflag.FlagSet) {
     flags.Bool(flagUseCheckpoint, true, "use checkpoint mode")
     _ = flags.MarkHidden(flagUseCheckpoint)

+    flags.String(flagCheckpointStorage, "", "specify the external storage url where checkpoint data is saved, eg, s3://bucket/path/prefix")
+
     flags.Bool(FlagWaitTiFlashReady, false, "whether wait tiflash replica ready if tiflash exists")
     flags.Bool(flagAllowPITRFromIncremental, true, "whether make incremental restore compatible with later log restore"+
         " default is true, the incremental restore will not perform rewrite on the incremental data"+
@@ -413,7 +423,10 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig
     if err != nil {
         return errors.Annotatef(err, "failed to get flag %s", flagUseCheckpoint)
     }
-
+    cfg.CheckpointStorage, err = flags.GetString(flagCheckpointStorage)
+    if err != nil {
+        return errors.Annotatef(err, "failed to get flag %s", flagCheckpointStorage)
+    }
     cfg.WaitTiflashReady, err = flags.GetBool(FlagWaitTiFlashReady)
     if err != nil {
         return errors.Annotatef(err, "failed to get flag %s", FlagWaitTiFlashReady)
     }
@@ -544,6 +557,86 @@ func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() {
     }
 }

+func (cfg *RestoreConfig) newStorageCheckpointMetaManagerPITR(
+    ctx context.Context,
+    downstreamClusterID uint64,
+) error {
+    _, checkpointStorage, err := GetStorage(ctx, cfg.CheckpointStorage, &cfg.Config)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    if len(cfg.FullBackupStorage) > 0 {
+        cfg.snapshotCheckpointMetaManager = checkpoint.NewSnapshotStorageMetaManager(
+            checkpointStorage, &cfg.CipherInfo, downstreamClusterID, "snapshot")
+    }
+    cfg.logCheckpointMetaManager = checkpoint.NewLogStorageMetaManager(
+        checkpointStorage, &cfg.CipherInfo, downstreamClusterID, "log")
+    cfg.sstCheckpointMetaManager = checkpoint.NewSnapshotStorageMetaManager(
+        checkpointStorage, &cfg.CipherInfo, downstreamClusterID, "sst")
+    return nil
+}
+
+func (cfg *RestoreConfig) newStorageCheckpointMetaManagerSnapshot(
+    ctx context.Context,
+    downstreamClusterID uint64,
+) error {
+    if cfg.snapshotCheckpointMetaManager != nil {
+        return nil
+    }
+    _, checkpointStorage, err := GetStorage(ctx, cfg.CheckpointStorage, &cfg.Config)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    cfg.snapshotCheckpointMetaManager = checkpoint.NewSnapshotStorageMetaManager(
+        checkpointStorage, &cfg.CipherInfo, downstreamClusterID, "snapshot")
+    return nil
+}
+
+func (cfg *RestoreConfig) newTableCheckpointMetaManagerPITR(g glue.Glue, dom *domain.Domain) (err error) {
+    if len(cfg.FullBackupStorage) > 0 {
+        if cfg.snapshotCheckpointMetaManager, err = checkpoint.NewSnapshotTableMetaManager(
+            g, dom, checkpoint.SnapshotRestoreCheckpointDatabaseName,
+        ); err != nil {
+            return errors.Trace(err)
+        }
+    }
+    if cfg.logCheckpointMetaManager, err = checkpoint.NewLogTableMetaManager(
+        g, dom, checkpoint.LogRestoreCheckpointDatabaseName,
+    ); err != nil {
+        return errors.Trace(err)
+    }
+    if cfg.sstCheckpointMetaManager, err = checkpoint.NewSnapshotTableMetaManager(
+        g, dom, checkpoint.CustomSSTRestoreCheckpointDatabaseName,
+    ); err != nil {
+        return errors.Trace(err)
+    }
+    return nil
+}
+
+func (cfg *RestoreConfig) newTableCheckpointMetaManagerSnapshot(g glue.Glue, dom *domain.Domain) (err error) {
+    if cfg.snapshotCheckpointMetaManager != nil {
+        return nil
+    }
+    if cfg.snapshotCheckpointMetaManager, err = checkpoint.NewSnapshotTableMetaManager(
+        g, dom, checkpoint.SnapshotRestoreCheckpointDatabaseName,
+    ); err != nil {
+        return errors.Trace(err)
+    }
+    return nil
+}
+
+func (cfg *RestoreConfig) CloseCheckpointMetaManager() {
+    if cfg.logCheckpointMetaManager != nil {
+        cfg.logCheckpointMetaManager.Close()
+    }
+    if cfg.snapshotCheckpointMetaManager != nil {
+        cfg.snapshotCheckpointMetaManager.Close()
+    }
+    if cfg.sstCheckpointMetaManager != nil {
+        cfg.sstCheckpointMetaManager.Close()
+    }
+}
+
 func configureRestoreClient(ctx context.Context, client *snapclient.SnapClient, cfg *RestoreConfig) error {
     client.SetRateLimit(cfg.RateLimit)
     client.SetCrypter(&cfg.CipherInfo)
@@ -709,6 +802,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
         return errors.Trace(err)
     }
     defer mgr.Close()
+    defer cfg.CloseCheckpointMetaManager()

     defer printRestoreMetrics()

@@ -732,32 +826,27 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
     }

     // Clear the checkpoint data
     if cfg.UseCheckpoint {
-        se, err := g.CreateSession(mgr.GetStorage())
-        if err != nil {
-            log.Warn("failed to remove checkpoint data", zap.Error(err))
+        if IsStreamRestore(cmdName) {
+            log.Info("start to remove checkpoint data for PITR restore")
+            err = cfg.logCheckpointMetaManager.RemoveCheckpointData(c)
+            if err != nil {
+                log.Warn("failed to remove checkpoint data for log restore", zap.Error(err))
+            }
+            err = cfg.sstCheckpointMetaManager.RemoveCheckpointData(c)
+            if err != nil {
+                log.Warn("failed to remove checkpoint data for compacted restore", zap.Error(err))
+            }
+            err = cfg.snapshotCheckpointMetaManager.RemoveCheckpointData(c)
+            if err != nil {
+                log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
+            }
         } else {
-            if IsStreamRestore(cmdName) {
-                log.Info("start to remove checkpoint data for PITR restore")
-                err = checkpoint.RemoveCheckpointDataForLogRestore(c, mgr.GetDomain(), se)
-                if err != nil {
-                    log.Warn("failed to remove checkpoint data for log restore", zap.Error(err))
-                }
-                err = checkpoint.RemoveCheckpointDataForSstRestore(c, mgr.GetDomain(), se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
-                if err != nil {
-                    log.Warn("failed to remove checkpoint data for compacted restore", zap.Error(err))
-                }
-                err = checkpoint.RemoveCheckpointDataForSstRestore(c, mgr.GetDomain(), se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
-                if err != nil {
-                    log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
-                }
-            } else {
-                err = checkpoint.RemoveCheckpointDataForSstRestore(c, mgr.GetDomain(), se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
-                if err != nil {
-                    log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
-                }
+            err = cfg.snapshotCheckpointMetaManager.RemoveCheckpointData(c)
+            if err != nil {
+                log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
             }
-            log.Info("all the checkpoint data is removed.")
         }
+        log.Info("all the checkpoint data is removed.")
     }
     return nil
 }
@@ -850,9 +939,22 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
     }
     var checkpointFirstRun = true
     if cfg.UseCheckpoint {
+        if len(cfg.CheckpointStorage) > 0 {
+            clusterID := mgr.PDClient().GetClusterID(ctx)
+            if err = cfg.newStorageCheckpointMetaManagerSnapshot(ctx, clusterID); err != nil {
+                return errors.Trace(err)
+            }
+        } else {
+            if err = cfg.newTableCheckpointMetaManagerSnapshot(g, mgr.GetDomain()); err != nil {
+                return errors.Trace(err)
+            }
+        }
         // if the checkpoint metadata exists in the checkpoint storage, the restore is not
         // for the first time.
-        existsCheckpointMetadata := checkpoint.ExistsSstRestoreCheckpoint(ctx, mgr.GetDomain(), checkpoint.SnapshotRestoreCheckpointDatabaseName)
+        existsCheckpointMetadata, err := cfg.snapshotCheckpointMetaManager.ExistsCheckpointMetadata(ctx)
+        if err != nil {
+            return errors.Trace(err)
+        }
         checkpointFirstRun = !existsCheckpointMetadata
     }
     if err = VerifyDBAndTableInBackup(client.GetDatabases(), cfg.RestoreConfig); err != nil {
@@ -950,7 +1052,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
             client.InitFullClusterRestore(cfg.ExplicitFilter)
         }
     } else if client.IsFull() && checkpointFirstRun && cfg.CheckRequirements {
-        if err := checkTableExistence(ctx, mgr, tables, g); err != nil {
+        if err := checkTableExistence(ctx, mgr, tables); err != nil {
             canRestoreSchedulers = true
             return errors.Trace(err)
         }
@@ -975,7 +1077,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
         logRestoredTS = cfg.piTRTaskInfo.RestoreTS
     }
     sets, restoreSchedulersConfigFromCheckpoint, err := client.InitCheckpoint(
-        ctx, g, mgr.GetStorage(), schedulersConfig, logRestoredTS, checkpointFirstRun)
+        ctx, cfg.snapshotCheckpointMetaManager, schedulersConfig, logRestoredTS, checkpointFirstRun)
     if err != nil {
         return errors.Trace(err)
     }
@@ -1349,7 +1451,7 @@ func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File,
     return nil
 }

-func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil.Table, g glue.Glue) error {
+func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil.Table) error {
     message := "table already exists: "
     allUnique := true
     for _, table := range tables {
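[editor's note — illustrative sketch, not part of the patch] The new --checkpoint-storage flag decides whether the restore.go helpers above build storage-backed or table-backed meta managers. The sketch below imitates newStorageCheckpointMetaManagerPITR with a local directory standing in for the real external storage URL; the path, helper names, and placeholder values are assumptions, not the patch's own code.

// Sketch only: the three storage-backed managers a PITR run keeps when
// --checkpoint-storage is set, keyed by the downstream cluster ID.
package example

import (
    backuppb "github.com/pingcap/kvproto/pkg/brpb"
    "github.com/pingcap/tidb/br/pkg/checkpoint"
    "github.com/pingcap/tidb/br/pkg/storage"
)

type pitrCheckpointManagers struct {
    snapshot checkpoint.SnapshotMetaManagerT // only when a full backup storage is given
    log      checkpoint.LogMetaManagerT
    sst      checkpoint.SnapshotMetaManagerT
}

func newStoragePITRManagers(
    cipher *backuppb.CipherInfo,
    downstreamClusterID uint64,
    hasFullBackupStorage bool,
) (*pitrCheckpointManagers, error) {
    // A local directory stands in for the external storage the flag points at;
    // the real code resolves cfg.CheckpointStorage (e.g. s3://bucket/path/prefix)
    // through GetStorage.
    s, err := storage.NewLocalStorage("/tmp/br-checkpoints")
    if err != nil {
        return nil, err
    }
    m := &pitrCheckpointManagers{}
    if hasFullBackupStorage {
        m.snapshot = checkpoint.NewSnapshotStorageMetaManager(s, cipher, downstreamClusterID, "snapshot")
    }
    m.log = checkpoint.NewLogStorageMetaManager(s, cipher, downstreamClusterID, "log")
    m.sst = checkpoint.NewSnapshotStorageMetaManager(s, cipher, downstreamClusterID, "sst")
    return m, nil
}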
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 82bbc44431cc0..512a8fe411d2a 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -1402,7 +1402,7 @@ func RunStreamRestore(
 type LogRestoreConfig struct {
     *RestoreConfig
-    checkpointTaskInfo  *checkpoint.CheckpointTaskInfoForLogRestore
+    checkpointTaskInfo  *checkpoint.TaskInfoForLogRestore
     tableMappingManager *stream.TableMappingManager
     logClient           *logclient.LogClient
     ddlFiles            []logclient.Log
@@ -1522,12 +1522,13 @@ func restoreStream(
     var sstCheckpointSets map[string]struct{}

     if cfg.UseCheckpoint {
-        gcRatioFromCheckpoint, err := client.LoadOrCreateCheckpointMetadataForLogRestore(ctx, cfg.StartTS, cfg.RestoreTS, oldGCRatio, cfg.tiflashRecorder)
+        gcRatioFromCheckpoint, err := client.LoadOrCreateCheckpointMetadataForLogRestore(
+            ctx, cfg.StartTS, cfg.RestoreTS, oldGCRatio, cfg.tiflashRecorder, cfg.logCheckpointMetaManager)
         if err != nil {
             return errors.Trace(err)
         }
         oldGCRatio = gcRatioFromCheckpoint
-        sstCheckpointSets, err = client.InitCheckpointMetadataForCompactedSstRestore(ctx)
+        sstCheckpointSets, err = client.InitCheckpointMetadataForCompactedSstRestore(ctx, cfg.sstCheckpointMetaManager)
         if err != nil {
             return errors.Trace(err)
         }
@@ -1614,7 +1615,7 @@ func restoreStream(
         return errors.Trace(err)
     }

-    logFilesIterWithSplit, err := client.WrapLogFilesIterWithSplitHelper(ctx, logFilesIter, execCtx, rewriteRules, updateStatsWithCheckpoint, splitSize, splitKeys)
+    logFilesIterWithSplit, err := client.WrapLogFilesIterWithSplitHelper(ctx, logFilesIter, cfg.logCheckpointMetaManager, rewriteRules, updateStatsWithCheckpoint, splitSize, splitKeys)
     if err != nil {
         return errors.Trace(err)
     }
@@ -1660,7 +1661,7 @@ func restoreStream(
     }

     // index ingestion is not captured by regular log backup, so we need to manually ingest again
-    if err = client.RepairIngestIndex(ctx, ingestRecorder, g); err != nil {
+    if err = client.RepairIngestIndex(ctx, ingestRecorder, cfg.logCheckpointMetaManager, g); err != nil {
         return errors.Annotate(err, "failed to repair ingest index")
     }
@@ -1723,16 +1724,7 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *
     client.SetCrypter(&cfg.CipherInfo)
     client.SetUpstreamClusterID(cfg.upstreamClusterID)

-    createCheckpointSessionFn := func() (glue.Session, error) {
-        // always create a new session for checkpoint runner
-        // because session is not thread safe
-        if cfg.UseCheckpoint {
-            return g.CreateSession(mgr.GetStorage())
-        }
-        return nil, nil
-    }
-
-    err = client.InitClients(ctx, u, createCheckpointSessionFn, uint(cfg.PitrConcurrency), cfg.ConcurrencyPerStore.Value)
+    err = client.InitClients(ctx, u, cfg.logCheckpointMetaManager, cfg.sstCheckpointMetaManager, uint(cfg.PitrConcurrency), cfg.ConcurrencyPerStore.Value)
     if err != nil {
         return nil, errors.Trace(err)
     }
@@ -1989,7 +1981,7 @@ func checkPiTRRequirements(mgr *conn.Mgr, hasExplicitFilter bool) error {
 }

 type PiTRTaskInfo struct {
-    CheckpointInfo      *checkpoint.CheckpointTaskInfoForLogRestore
+    CheckpointInfo      *checkpoint.TaskInfoForLogRestore
     RestoreTS           uint64
     NeedFullRestore     bool
     FullRestoreCheckErr error
@@ -2007,18 +1999,23 @@ func generatePiTRTaskInfo(
 ) (*PiTRTaskInfo, error) {
     var (
         doFullRestore = len(cfg.FullBackupStorage) > 0
-        curTaskInfo   *checkpoint.CheckpointTaskInfoForLogRestore
+        curTaskInfo   *checkpoint.TaskInfoForLogRestore
+        err           error
     )

     checkInfo := &PiTRTaskInfo{}
     if cfg.UseCheckpoint {
-        se, err := g.CreateSession(mgr.GetStorage())
-        if err != nil {
-            return nil, errors.Trace(err)
+        if len(cfg.CheckpointStorage) > 0 {
+            clusterID := mgr.GetPDClient().GetClusterID(ctx)
+            if err = cfg.newStorageCheckpointMetaManagerPITR(ctx, clusterID); err != nil {
+                return nil, errors.Trace(err)
+            }
+        } else {
+            if err = cfg.newTableCheckpointMetaManagerPITR(g, mgr.GetDomain()); err != nil {
+                return nil, errors.Trace(err)
+            }
         }
-
-        execCtx := se.GetSessionCtx().GetRestrictedSQLExecutor()
-        curTaskInfo, err = checkpoint.TryToGetCheckpointTaskInfo(ctx, mgr.GetDomain(), execCtx)
+        curTaskInfo, err = checkpoint.TryToGetCheckpointTaskInfo(ctx, cfg.snapshotCheckpointMetaManager, cfg.logCheckpointMetaManager)
         if err != nil {
             return checkInfo, errors.Trace(err)
         }
@@ -2031,8 +2028,8 @@ func generatePiTRTaskInfo(
             return checkInfo, errors.Errorf(
                 "The upstream cluster id[%d] of the current log restore does not match that[%d] recorded in checkpoint. "+
                     "Perhaps you should specify the last log backup storage instead, "+
-                    "or just clean the checkpoint database[%s] if the cluster has been cleaned up.",
-                cfg.upstreamClusterID, curTaskInfo.Metadata.UpstreamClusterID, checkpoint.LogRestoreCheckpointDatabaseName)
+                    "or just clean the checkpoint %s if the cluster has been cleaned up.",
+                cfg.upstreamClusterID, curTaskInfo.Metadata.UpstreamClusterID, cfg.logCheckpointMetaManager)
         }

         if curTaskInfo.Metadata.StartTS != cfg.StartTS || curTaskInfo.Metadata.RestoredTS != cfg.RestoreTS {
@@ -2041,8 +2038,8 @@ func generatePiTRTaskInfo(
                 "which is different from that from %d to %d recorded in checkpoint. "+
                     "Perhaps you should specify the last full backup storage to match the start-ts and "+
                     "the parameter --restored-ts to match the restored-ts. "+
-                    "or just clean the checkpoint database[%s] if the cluster has been cleaned up.",
-                cfg.StartTS, cfg.RestoreTS, curTaskInfo.Metadata.StartTS, curTaskInfo.Metadata.RestoredTS, checkpoint.LogRestoreCheckpointDatabaseName,
+                    "or just clean the checkpoint %s if the cluster has been cleaned up.",
+                cfg.StartTS, cfg.RestoreTS, curTaskInfo.Metadata.StartTS, curTaskInfo.Metadata.RestoredTS, cfg.logCheckpointMetaManager,
             )
         }
@@ -2095,7 +2092,7 @@ func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) err
     return nil
 }

-func isCurrentIdMapSaved(checkpointTaskInfo *checkpoint.CheckpointTaskInfoForLogRestore) bool {
+func isCurrentIdMapSaved(checkpointTaskInfo *checkpoint.TaskInfoForLogRestore) bool {
     return checkpointTaskInfo != nil && checkpointTaskInfo.Progress == checkpoint.InLogRestoreAndIdMapPersisted
 }

@@ -2155,7 +2152,7 @@ func buildAndSaveIDMapIfNeeded(ctx context.Context, client *logclient.LogClient,
         if err != nil {
             return errors.Trace(err)
         }
-        if err = client.SaveIdMapWithFailPoints(ctx, tableMappingManager); err != nil {
+        if err = client.SaveIdMapWithFailPoints(ctx, tableMappingManager, cfg.logCheckpointMetaManager); err != nil {
             return errors.Trace(err)
         }
     }