Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion br/pkg/checkpoint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"checkpoint.go",
"external_storage.go",
"log_restore.go",
"manager.go",
"restore.go",
"storage.go",
"ticker.go",
Expand Down Expand Up @@ -45,7 +46,7 @@ go_test(
srcs = ["checkpoint_test.go"],
flaky = True,
race = "on",
shard_count = 9,
shard_count = 13,
deps = [
":checkpoint",
"//br/pkg/gluetidb",
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/checkpoint/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func StartCheckpointBackupRunnerForTest(
tick time.Duration,
timer GlobalTimer,
) (*CheckpointRunner[BackupKeyType, BackupValueType], error) {
checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer)
checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer, flushPositionForBackup())
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -74,7 +74,7 @@ func StartCheckpointRunnerForBackup(
cipher *backuppb.CipherInfo,
timer GlobalTimer,
) (*CheckpointRunner[BackupKeyType, BackupValueType], error) {
checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer)
checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer, flushPositionForBackup())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
267 changes: 147 additions & 120 deletions br/pkg/checkpoint/checkpoint_test.go

Large diffs are not rendered by default.

39 changes: 38 additions & 1 deletion br/pkg/checkpoint/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,42 @@ import (
"go.uber.org/zap"
)

const (
CheckpointRestoreDirFormat = CheckpointDir + "/restore-%s"
CheckpointDataDirForRestoreFormat = CheckpointRestoreDirFormat + "/data"
CheckpointChecksumDirForRestoreFormat = CheckpointRestoreDirFormat + "/checksum"
CheckpointMetaPathForRestoreFormat = CheckpointRestoreDirFormat + "/checkpoint.meta"
CheckpointProgressPathForRestoreFormat = CheckpointRestoreDirFormat + "/progress.meta"
CheckpointIngestIndexPathForRestoreFormat = CheckpointRestoreDirFormat + "/ingest_index.meta"
)

func flushPositionForRestore(taskName string) flushPosition {
return flushPosition{
CheckpointDataDir: getCheckpointDataDirByName(taskName),
CheckpointChecksumDir: getCheckpointChecksumDirByName(taskName),
}
}

func getCheckpointMetaPathByName(taskName string) string {
return fmt.Sprintf(CheckpointMetaPathForRestoreFormat, taskName)
}

func getCheckpointDataDirByName(taskName string) string {
return fmt.Sprintf(CheckpointDataDirForRestoreFormat, taskName)
}

func getCheckpointChecksumDirByName(taskName string) string {
return fmt.Sprintf(CheckpointChecksumDirForRestoreFormat, taskName)
}

func getCheckpointProgressPathByName(taskName string) string {
return fmt.Sprintf(CheckpointProgressPathForRestoreFormat, taskName)
}

func getCheckpointIngestIndexPathByName(taskName string) string {
return fmt.Sprintf(CheckpointIngestIndexPathForRestoreFormat, taskName)
}

type externalCheckpointStorage struct {
flushPosition
storage storage.ExternalStorage
Expand All @@ -43,9 +79,10 @@ func newExternalCheckpointStorage(
ctx context.Context,
s storage.ExternalStorage,
timer GlobalTimer,
flushPosition flushPosition,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe flushLocation or flushPath. position usually refers to somewhere internal a file if I recall right (consider the seek and open syscall.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, maybe flushTarget, flushDestination etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can also put s storage.ExternalStorage inside the flushPosition? feels like they are related

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. Actually flushPath is embedded in externalCheckpointStorage. Maybe we can regard externalCheckpointStorage as their same root.

type externalCheckpointStorage struct {
	flushPath
	storage storage.ExternalStorage

        // ...
}

) (*externalCheckpointStorage, error) {
checkpointStorage := &externalCheckpointStorage{
flushPosition: flushPositionForBackup(),
flushPosition: flushPosition,
storage: s,
timer: timer,
}
Expand Down
145 changes: 24 additions & 121 deletions br/pkg/checkpoint/log_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/util/sqlexec"
)

type LogRestoreKeyType = string
Expand Down Expand Up @@ -98,31 +96,21 @@ func newTableCheckpointStorage(se glue.Session, checkpointDBName string) *tableC
// only for test
func StartCheckpointLogRestoreRunnerForTest(
ctx context.Context,
se glue.Session,
tick time.Duration,
manager LogMetaManagerT,
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) {
runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType](
newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName),
nil, valueMarshalerForLogRestore)

runner.startCheckpointMainLoop(ctx, tick, tick, 0, defaultRetryDuration)
return runner, nil
cfg := DefaultTickDurationConfig()
cfg.tickDurationForChecksum = tick
cfg.tickDurationForFlush = tick
return manager.StartCheckpointRunner(ctx, cfg, valueMarshalerForLogRestore)
}

// Notice that the session is owned by the checkpoint runner, and it will be also closed by it.
func StartCheckpointRunnerForLogRestore(
ctx context.Context,
se glue.Session,
manager LogMetaManagerT,
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) {
runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType](
newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName),
nil, valueMarshalerForLogRestore)

// for restore, no need to set lock
runner.startCheckpointMainLoop(
ctx,
defaultTickDurationForFlush, defaultTickDurationForChecksum, 0, defaultRetryDuration)
return runner, nil
return manager.StartCheckpointRunner(ctx, DefaultTickDurationConfig(), valueMarshalerForLogRestore)
}

func AppendRangeForLogRestore(
Expand All @@ -145,16 +133,6 @@ func AppendRangeForLogRestore(
})
}

// load the whole checkpoint range data and retrieve the metadata of restored ranges
// and return the total time cost in the past executions
func LoadCheckpointDataForLogRestore[K KeyType, V ValueType](
ctx context.Context,
execCtx sqlexec.RestrictedSQLExecutor,
fn func(K, V),
) (time.Duration, error) {
return selectCheckpointData(ctx, execCtx, LogRestoreCheckpointDatabaseName, fn)
}

type CheckpointMetadataForLogRestore struct {
UpstreamClusterID uint64 `json:"upstream-cluster-id"`
RestoredTS uint64 `json:"restored-ts"`
Expand All @@ -165,35 +143,6 @@ type CheckpointMetadataForLogRestore struct {
TiFlashItems map[int64]model.TiFlashReplicaInfo `json:"tiflash-recorder,omitempty"`
}

func LoadCheckpointMetadataForLogRestore(
ctx context.Context,
execCtx sqlexec.RestrictedSQLExecutor,
) (*CheckpointMetadataForLogRestore, error) {
m := &CheckpointMetadataForLogRestore{}
err := selectCheckpointMeta(ctx, execCtx, LogRestoreCheckpointDatabaseName, checkpointMetaTableName, m)
return m, err
}

func SaveCheckpointMetadataForLogRestore(
ctx context.Context,
se glue.Session,
meta *CheckpointMetadataForLogRestore,
) error {
err := initCheckpointTable(ctx, se, LogRestoreCheckpointDatabaseName, []string{checkpointDataTableName})
if err != nil {
return errors.Trace(err)
}
return insertCheckpointMeta(ctx, se, LogRestoreCheckpointDatabaseName, checkpointMetaTableName, meta)
}

func ExistsLogRestoreCheckpointMetadata(
ctx context.Context,
dom *domain.Domain,
) bool {
return dom.InfoSchema().
TableExists(ast.NewCIStr(LogRestoreCheckpointDatabaseName), ast.NewCIStr(checkpointMetaTableName))
}

// RestoreProgress is a progress type for snapshot + log restore.
//
// Before the id-maps is persisted into external storage, the snapshot restore and
Expand Down Expand Up @@ -229,34 +178,9 @@ type CheckpointProgress struct {
Progress RestoreProgress `json:"progress"`
}

func LoadCheckpointProgress(
ctx context.Context,
execCtx sqlexec.RestrictedSQLExecutor,
) (*CheckpointProgress, error) {
m := &CheckpointProgress{}
err := selectCheckpointMeta(ctx, execCtx, LogRestoreCheckpointDatabaseName, checkpointProgressTableName, m)
return m, errors.Trace(err)
}

func SaveCheckpointProgress(
ctx context.Context,
se glue.Session,
meta *CheckpointProgress,
) error {
return insertCheckpointMeta(ctx, se, LogRestoreCheckpointDatabaseName, checkpointProgressTableName, meta)
}

func ExistsCheckpointProgress(
ctx context.Context,
dom *domain.Domain,
) bool {
return dom.InfoSchema().
TableExists(ast.NewCIStr(LogRestoreCheckpointDatabaseName), ast.NewCIStr(checkpointProgressTableName))
}

// CheckpointTaskInfoForLogRestore is tied to a specific cluster.
// It represents the last restore task executed in this cluster.
type CheckpointTaskInfoForLogRestore struct {
type TaskInfoForLogRestore struct {
Metadata *CheckpointMetadataForLogRestore
HasSnapshotMetadata bool
// the progress for this task
Expand All @@ -265,32 +189,38 @@ type CheckpointTaskInfoForLogRestore struct {

func TryToGetCheckpointTaskInfo(
ctx context.Context,
dom *domain.Domain,
execCtx sqlexec.RestrictedSQLExecutor,
) (*CheckpointTaskInfoForLogRestore, error) {
snapshotManager SnapshotMetaManagerT,
logManager LogMetaManagerT,
) (*TaskInfoForLogRestore, error) {
var (
metadata *CheckpointMetadataForLogRestore
progress RestoreProgress
err error
)
// get the progress
if ExistsCheckpointProgress(ctx, dom) {
checkpointProgress, err := LoadCheckpointProgress(ctx, execCtx)
if exists, err := logManager.ExistsCheckpointProgress(ctx); err != nil {
return nil, errors.Trace(err)
} else if exists {
checkpointProgress, err := logManager.LoadCheckpointProgress(ctx)
if err != nil {
return nil, errors.Trace(err)
}
progress = checkpointProgress.Progress
}
// get the checkpoint metadata
if ExistsLogRestoreCheckpointMetadata(ctx, dom) {
metadata, err = LoadCheckpointMetadataForLogRestore(ctx, execCtx)
if exists, err := logManager.ExistsCheckpointMetadata(ctx); err != nil {
return nil, errors.Trace(err)
} else if exists {
metadata, err = logManager.LoadCheckpointMetadata(ctx)
if err != nil {
return nil, errors.Trace(err)
}
}
hasSnapshotMetadata := ExistsSstRestoreCheckpoint(ctx, dom, SnapshotRestoreCheckpointDatabaseName)
hasSnapshotMetadata, err := snapshotManager.ExistsCheckpointMetadata(ctx)
if err != nil {
return nil, errors.Trace(err)
}

return &CheckpointTaskInfoForLogRestore{
return &TaskInfoForLogRestore{
Metadata: metadata,
HasSnapshotMetadata: hasSnapshotMetadata,
Progress: progress,
Expand All @@ -312,30 +242,3 @@ type CheckpointIngestIndexRepairSQL struct {
type CheckpointIngestIndexRepairSQLs struct {
SQLs []CheckpointIngestIndexRepairSQL
}

func LoadCheckpointIngestIndexRepairSQLs(
ctx context.Context,
execCtx sqlexec.RestrictedSQLExecutor,
) (*CheckpointIngestIndexRepairSQLs, error) {
m := &CheckpointIngestIndexRepairSQLs{}
err := selectCheckpointMeta(ctx, execCtx, LogRestoreCheckpointDatabaseName, checkpointIngestTableName, m)
return m, errors.Trace(err)
}

func ExistsCheckpointIngestIndexRepairSQLs(ctx context.Context, dom *domain.Domain) bool {
return dom.InfoSchema().
TableExists(ast.NewCIStr(LogRestoreCheckpointDatabaseName), ast.NewCIStr(checkpointIngestTableName))
}

func SaveCheckpointIngestIndexRepairSQLs(
ctx context.Context,
se glue.Session,
meta *CheckpointIngestIndexRepairSQLs,
) error {
return insertCheckpointMeta(ctx, se, LogRestoreCheckpointDatabaseName, checkpointIngestTableName, meta)
}

func RemoveCheckpointDataForLogRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error {
return dropCheckpointTables(ctx, dom, se, LogRestoreCheckpointDatabaseName,
[]string{checkpointDataTableName, checkpointMetaTableName, checkpointProgressTableName, checkpointIngestTableName})
}
Loading