Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion br/pkg/checkpoint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"checkpoint.go",
"external_storage.go",
"log_restore.go",
"manager.go",
"restore.go",
"storage.go",
"ticker.go",
Expand Down Expand Up @@ -45,7 +46,7 @@ go_test(
srcs = ["checkpoint_test.go"],
flaky = True,
race = "on",
shard_count = 9,
shard_count = 13,
deps = [
":checkpoint",
"//br/pkg/gluetidb",
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/checkpoint/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ const (
CheckpointLockPathForBackup = CheckpointBackupDir + "/checkpoint.lock"
)

func flushPositionForBackup() flushPosition {
return flushPosition{
func flushPathForBackup() flushPath {
return flushPath{
CheckpointDataDir: CheckpointDataDirForBackup,
CheckpointChecksumDir: CheckpointChecksumDirForBackup,
CheckpointLockPath: CheckpointLockPathForBackup,
Expand All @@ -57,7 +57,7 @@ func StartCheckpointBackupRunnerForTest(
tick time.Duration,
timer GlobalTimer,
) (*CheckpointRunner[BackupKeyType, BackupValueType], error) {
checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer)
checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer, flushPathForBackup())
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -74,7 +74,7 @@ func StartCheckpointRunnerForBackup(
cipher *backuppb.CipherInfo,
timer GlobalTimer,
) (*CheckpointRunner[BackupKeyType, BackupValueType], error) {
checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer)
checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer, flushPathForBackup())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (

const CheckpointDir = "checkpoints"

type flushPosition struct {
type flushPath struct {
CheckpointDataDir string
CheckpointChecksumDir string
CheckpointLockPath string
Expand Down
267 changes: 147 additions & 120 deletions br/pkg/checkpoint/checkpoint_test.go

Large diffs are not rendered by default.

45 changes: 41 additions & 4 deletions br/pkg/checkpoint/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,44 @@ import (
"go.uber.org/zap"
)

// Path format strings for restore checkpoints kept in external storage.
// Each format contains exactly one %s verb, which the helper functions in
// this file fill with the restore task name via fmt.Sprintf.
const (
// CheckpointRestoreDirFormat is the per-task root directory under CheckpointDir.
CheckpointRestoreDirFormat = CheckpointDir + "/restore-%s"
// CheckpointDataDirForRestoreFormat is the "data" directory under the per-task root.
CheckpointDataDirForRestoreFormat = CheckpointRestoreDirFormat + "/data"
// CheckpointChecksumDirForRestoreFormat is the "checksum" directory under the per-task root.
CheckpointChecksumDirForRestoreFormat = CheckpointRestoreDirFormat + "/checksum"
// CheckpointMetaPathForRestoreFormat is the "checkpoint.meta" file under the per-task root.
CheckpointMetaPathForRestoreFormat = CheckpointRestoreDirFormat + "/checkpoint.meta"
// CheckpointProgressPathForRestoreFormat is the "progress.meta" file under the per-task root.
CheckpointProgressPathForRestoreFormat = CheckpointRestoreDirFormat + "/progress.meta"
// CheckpointIngestIndexPathForRestoreFormat is the "ingest_index.meta" file under the per-task root.
CheckpointIngestIndexPathForRestoreFormat = CheckpointRestoreDirFormat + "/ingest_index.meta"
)

// flushPathForRestore builds the flushPath used by the restore task named
// taskName. Only the data and checksum directories are populated; every other
// field, including CheckpointLockPath, keeps its zero value.
func flushPathForRestore(taskName string) flushPath {
	var restorePath flushPath
	restorePath.CheckpointDataDir = getCheckpointDataDirByName(taskName)
	restorePath.CheckpointChecksumDir = getCheckpointChecksumDirByName(taskName)
	return restorePath
}

// getCheckpointMetaPathByName returns the checkpoint metadata path for the
// restore task named taskName, obtained by substituting taskName into
// CheckpointMetaPathForRestoreFormat.
func getCheckpointMetaPathByName(taskName string) string {
	metaPath := fmt.Sprintf(CheckpointMetaPathForRestoreFormat, taskName)
	return metaPath
}

// getCheckpointDataDirByName returns the checkpoint data directory for the
// restore task named taskName, obtained by substituting taskName into
// CheckpointDataDirForRestoreFormat.
func getCheckpointDataDirByName(taskName string) string {
	dataDir := fmt.Sprintf(CheckpointDataDirForRestoreFormat, taskName)
	return dataDir
}

// getCheckpointChecksumDirByName returns the checkpoint checksum directory for
// the restore task named taskName, obtained by substituting taskName into
// CheckpointChecksumDirForRestoreFormat.
func getCheckpointChecksumDirByName(taskName string) string {
	checksumDir := fmt.Sprintf(CheckpointChecksumDirForRestoreFormat, taskName)
	return checksumDir
}

// getCheckpointProgressPathByName returns the progress file path for the
// restore task named taskName, obtained by substituting taskName into
// CheckpointProgressPathForRestoreFormat.
func getCheckpointProgressPathByName(taskName string) string {
	progressPath := fmt.Sprintf(CheckpointProgressPathForRestoreFormat, taskName)
	return progressPath
}

// getCheckpointIngestIndexPathByName returns the ingest-index file path for
// the restore task named taskName, obtained by substituting taskName into
// CheckpointIngestIndexPathForRestoreFormat.
func getCheckpointIngestIndexPathByName(taskName string) string {
	ingestIndexPath := fmt.Sprintf(CheckpointIngestIndexPathForRestoreFormat, taskName)
	return ingestIndexPath
}

type externalCheckpointStorage struct {
flushPosition
flushPath
storage storage.ExternalStorage

lockId uint64
Expand All @@ -43,11 +79,12 @@ func newExternalCheckpointStorage(
ctx context.Context,
s storage.ExternalStorage,
timer GlobalTimer,
flushPath flushPath,
) (*externalCheckpointStorage, error) {
checkpointStorage := &externalCheckpointStorage{
flushPosition: flushPositionForBackup(),
storage: s,
timer: timer,
flushPath: flushPath,
storage: s,
timer: timer,
}
if timer != nil {
if err := checkpointStorage.initialLock(ctx); err != nil {
Expand Down
151 changes: 30 additions & 121 deletions br/pkg/checkpoint/log_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/util/sqlexec"
)

type LogRestoreKeyType = string
Expand Down Expand Up @@ -98,31 +96,21 @@ func newTableCheckpointStorage(se glue.Session, checkpointDBName string) *tableC
// only for test
func StartCheckpointLogRestoreRunnerForTest(
ctx context.Context,
se glue.Session,
tick time.Duration,
manager LogMetaManagerT,
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) {
runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType](
newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName),
nil, valueMarshalerForLogRestore)

runner.startCheckpointMainLoop(ctx, tick, tick, 0, defaultRetryDuration)
return runner, nil
cfg := DefaultTickDurationConfig()
cfg.tickDurationForChecksum = tick
cfg.tickDurationForFlush = tick
return manager.StartCheckpointRunner(ctx, cfg, valueMarshalerForLogRestore)
}

// Notice that the session is owned by the checkpoint runner, and it will also be closed by it.
func StartCheckpointRunnerForLogRestore(
ctx context.Context,
se glue.Session,
manager LogMetaManagerT,
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) {
runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType](
newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName),
nil, valueMarshalerForLogRestore)

// for restore, no need to set lock
runner.startCheckpointMainLoop(
ctx,
defaultTickDurationForFlush, defaultTickDurationForChecksum, 0, defaultRetryDuration)
return runner, nil
return manager.StartCheckpointRunner(ctx, DefaultTickDurationConfig(), valueMarshalerForLogRestore)
}

func AppendRangeForLogRestore(
Expand All @@ -145,16 +133,6 @@ func AppendRangeForLogRestore(
})
}

// load the whole checkpoint range data and retrieve the metadata of restored ranges
// and return the total time cost in the past executions
func LoadCheckpointDataForLogRestore[K KeyType, V ValueType](
ctx context.Context,
execCtx sqlexec.RestrictedSQLExecutor,
fn func(K, V),
) (time.Duration, error) {
return selectCheckpointData(ctx, execCtx, LogRestoreCheckpointDatabaseName, fn)
}

type CheckpointMetadataForLogRestore struct {
UpstreamClusterID uint64 `json:"upstream-cluster-id"`
RestoredTS uint64 `json:"restored-ts"`
Expand All @@ -165,35 +143,6 @@ type CheckpointMetadataForLogRestore struct {
TiFlashItems map[int64]model.TiFlashReplicaInfo `json:"tiflash-recorder,omitempty"`
}

func LoadCheckpointMetadataForLogRestore(
ctx context.Context,
execCtx sqlexec.RestrictedSQLExecutor,
) (*CheckpointMetadataForLogRestore, error) {
m := &CheckpointMetadataForLogRestore{}
err := selectCheckpointMeta(ctx, execCtx, LogRestoreCheckpointDatabaseName, checkpointMetaTableName, m)
return m, err
}

func SaveCheckpointMetadataForLogRestore(
ctx context.Context,
se glue.Session,
meta *CheckpointMetadataForLogRestore,
) error {
err := initCheckpointTable(ctx, se, LogRestoreCheckpointDatabaseName, []string{checkpointDataTableName})
if err != nil {
return errors.Trace(err)
}
return insertCheckpointMeta(ctx, se, LogRestoreCheckpointDatabaseName, checkpointMetaTableName, meta)
}

func ExistsLogRestoreCheckpointMetadata(
ctx context.Context,
dom *domain.Domain,
) bool {
return dom.InfoSchema().
TableExists(ast.NewCIStr(LogRestoreCheckpointDatabaseName), ast.NewCIStr(checkpointMetaTableName))
}

// RestoreProgress is a progress type for snapshot + log restore.
//
// Before the id-maps is persisted into external storage, the snapshot restore and
Expand Down Expand Up @@ -229,34 +178,9 @@ type CheckpointProgress struct {
Progress RestoreProgress `json:"progress"`
}

func LoadCheckpointProgress(
ctx context.Context,
execCtx sqlexec.RestrictedSQLExecutor,
) (*CheckpointProgress, error) {
m := &CheckpointProgress{}
err := selectCheckpointMeta(ctx, execCtx, LogRestoreCheckpointDatabaseName, checkpointProgressTableName, m)
return m, errors.Trace(err)
}

func SaveCheckpointProgress(
ctx context.Context,
se glue.Session,
meta *CheckpointProgress,
) error {
return insertCheckpointMeta(ctx, se, LogRestoreCheckpointDatabaseName, checkpointProgressTableName, meta)
}

func ExistsCheckpointProgress(
ctx context.Context,
dom *domain.Domain,
) bool {
return dom.InfoSchema().
TableExists(ast.NewCIStr(LogRestoreCheckpointDatabaseName), ast.NewCIStr(checkpointProgressTableName))
}

// CheckpointTaskInfoForLogRestore is tied to a specific cluster.
// It represents the last restore task executed in this cluster.
type CheckpointTaskInfoForLogRestore struct {
type TaskInfoForLogRestore struct {
Metadata *CheckpointMetadataForLogRestore
HasSnapshotMetadata bool
// the progress for this task
Expand All @@ -265,32 +189,44 @@ type CheckpointTaskInfoForLogRestore struct {

func TryToGetCheckpointTaskInfo(
ctx context.Context,
dom *domain.Domain,
execCtx sqlexec.RestrictedSQLExecutor,
) (*CheckpointTaskInfoForLogRestore, error) {
snapshotManager SnapshotMetaManagerT,
logManager LogMetaManagerT,
) (*TaskInfoForLogRestore, error) {
var (
metadata *CheckpointMetadataForLogRestore
progress RestoreProgress
err error

hasSnapshotMetadata bool = false
)
// get the progress
if ExistsCheckpointProgress(ctx, dom) {
checkpointProgress, err := LoadCheckpointProgress(ctx, execCtx)
if exists, err := logManager.ExistsCheckpointProgress(ctx); err != nil {
return nil, errors.Trace(err)
} else if exists {
checkpointProgress, err := logManager.LoadCheckpointProgress(ctx)
if err != nil {
return nil, errors.Trace(err)
}
progress = checkpointProgress.Progress
}
// get the checkpoint metadata
if ExistsLogRestoreCheckpointMetadata(ctx, dom) {
metadata, err = LoadCheckpointMetadataForLogRestore(ctx, execCtx)
if exists, err := logManager.ExistsCheckpointMetadata(ctx); err != nil {
return nil, errors.Trace(err)
} else if exists {
metadata, err = logManager.LoadCheckpointMetadata(ctx)
if err != nil {
return nil, errors.Trace(err)
}
}
hasSnapshotMetadata := ExistsSstRestoreCheckpoint(ctx, dom, SnapshotRestoreCheckpointDatabaseName)
// exists the snapshot checkpoint metadata
if snapshotManager != nil {
existsSnapshotMetadata, err := snapshotManager.ExistsCheckpointMetadata(ctx)
if err != nil {
return nil, errors.Trace(err)
}
hasSnapshotMetadata = existsSnapshotMetadata
}

return &CheckpointTaskInfoForLogRestore{
return &TaskInfoForLogRestore{
Metadata: metadata,
HasSnapshotMetadata: hasSnapshotMetadata,
Progress: progress,
Expand All @@ -312,30 +248,3 @@ type CheckpointIngestIndexRepairSQL struct {
type CheckpointIngestIndexRepairSQLs struct {
SQLs []CheckpointIngestIndexRepairSQL
}

func LoadCheckpointIngestIndexRepairSQLs(
ctx context.Context,
execCtx sqlexec.RestrictedSQLExecutor,
) (*CheckpointIngestIndexRepairSQLs, error) {
m := &CheckpointIngestIndexRepairSQLs{}
err := selectCheckpointMeta(ctx, execCtx, LogRestoreCheckpointDatabaseName, checkpointIngestTableName, m)
return m, errors.Trace(err)
}

func ExistsCheckpointIngestIndexRepairSQLs(ctx context.Context, dom *domain.Domain) bool {
return dom.InfoSchema().
TableExists(ast.NewCIStr(LogRestoreCheckpointDatabaseName), ast.NewCIStr(checkpointIngestTableName))
}

func SaveCheckpointIngestIndexRepairSQLs(
ctx context.Context,
se glue.Session,
meta *CheckpointIngestIndexRepairSQLs,
) error {
return insertCheckpointMeta(ctx, se, LogRestoreCheckpointDatabaseName, checkpointIngestTableName, meta)
}

func RemoveCheckpointDataForLogRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error {
return dropCheckpointTables(ctx, dom, se, LogRestoreCheckpointDatabaseName,
[]string{checkpointDataTableName, checkpointMetaTableName, checkpointProgressTableName, checkpointIngestTableName})
}
Loading