Merged

Changes from all commits · 34 commits
2bd7ac0
first version
Tristan1900 Mar 27, 2025
2336b97
fix tests
Tristan1900 Mar 27, 2025
0e63777
fix test
Tristan1900 Mar 27, 2025
7f460c6
some more fix
Tristan1900 Mar 27, 2025
57977bd
fix unit test
Tristan1900 Mar 28, 2025
bd28d82
some more fix
Tristan1900 Mar 28, 2025
49feca5
fix unit test
Tristan1900 Mar 28, 2025
ff610a7
fix some test issues, add heartbeat, use one single trasnaction
Tristan1900 May 23, 2025
188a45a
id map compatble, delete db/table after restore
Tristan1900 May 28, 2025
117c262
some fixes and tests
Tristan1900 May 28, 2025
83c90c9
fix a test and some comments
Tristan1900 May 28, 2025
2061a56
fix test and sytle
Tristan1900 May 29, 2025
ff73ec0
fix test, address comments, backward compatible
Tristan1900 May 30, 2025
0a306e7
fix nil pointer...
Tristan1900 May 30, 2025
7fb2590
Merge branch 'master' into concurent-restore
Leavrth May 30, 2025
99abdd3
fix test
Leavrth May 30, 2025
19ead02
fix test
Leavrth May 30, 2025
292e622
bazel prepare
Leavrth May 30, 2025
89e5a05
move table to mysql db
Tristan1900 May 31, 2025
145926b
fix test
Tristan1900 May 31, 2025
14afb10
bazel
Tristan1900 May 31, 2025
caee50f
fix a few more tests
Tristan1900 Jun 1, 2025
b97beac
add some log, revert auto id cache, fix test
Tristan1900 Jun 1, 2025
21e5ed5
revert a fix, see if works
Tristan1900 Jun 2, 2025
72933a9
fix tests
Tristan1900 Jun 2, 2025
6e1d8d1
finally fix the unit test, and try to fix the brietest
Tristan1900 Jun 2, 2025
ab5cc78
one final fix
Tristan1900 Jun 2, 2025
c360107
looks like a lot more unit test are impacted by adding a new table
Tristan1900 Jun 2, 2025
90fa6fa
one more
Tristan1900 Jun 2, 2025
8ddcc12
one more...
Tristan1900 Jun 2, 2025
7117a46
uncomment test, minor tweaks
Tristan1900 Jun 2, 2025
48bf9da
cleanup
Tristan1900 Jun 2, 2025
026aa8c
address naming comments
Tristan1900 Jun 3, 2025
d283e4b
missed change to rename, and make test stable
Tristan1900 Jun 3, 2025
26 changes: 13 additions & 13 deletions br/pkg/checkpoint/checkpoint_test.go
@@ -61,20 +61,20 @@ func TestCheckpointMetaForRestoreOnStorage(t *testing.T) {
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
defer snapshotMetaManager.Close()
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log")
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log", 1)
defer logMetaManager.Close()
testCheckpointMetaForRestore(t, snapshotMetaManager, logMetaManager)
}

func TestCheckpointMetaForRestoreOnTable(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer snapshotMetaManager.Close()
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName)
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer logMetaManager.Close()
testCheckpointMetaForRestore(t, snapshotMetaManager, logMetaManager)
@@ -137,7 +137,7 @@ func testCheckpointMetaForRestore(
require.NoError(t, err)
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)

taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, snapshotMetaManager, logMetaManager)
taskInfo, err := checkpoint.GetCheckpointTaskInfo(ctx, snapshotMetaManager, logMetaManager)
require.NoError(t, err)
require.Equal(t, uint64(123), taskInfo.Metadata.UpstreamClusterID)
require.Equal(t, uint64(222), taskInfo.Metadata.RestoredTS)
@@ -302,15 +302,15 @@ func TestCheckpointRestoreRunnerOnStorage(t *testing.T) {
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
defer snapshotMetaManager.Close()
testCheckpointRestoreRunner(t, snapshotMetaManager)
}

func TestCheckpointRestoreRunnerOnTable(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer snapshotMetaManager.Close()
testCheckpointRestoreRunner(t, snapshotMetaManager)
@@ -411,15 +411,15 @@ func TestCheckpointRunnerRetryOnStorage(t *testing.T) {
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
defer snapshotMetaManager.Close()
testCheckpointRunnerRetry(t, snapshotMetaManager)
}

func TestCheckpointRunnerRetryOnTable(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer snapshotMetaManager.Close()
testCheckpointRunnerRetry(t, snapshotMetaManager)
@@ -478,15 +478,15 @@ func TestCheckpointRunnerNoRetryOnStorage(t *testing.T) {
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
defer snapshotMetaManager.Close()
testCheckpointRunnerNoRetry(t, snapshotMetaManager)
}

func TestCheckpointRunnerNoRetryOnTable(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer snapshotMetaManager.Close()
testCheckpointRunnerNoRetry(t, snapshotMetaManager)
@@ -533,15 +533,15 @@ func TestCheckpointLogRestoreRunnerOnStorage(t *testing.T) {
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log")
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log", 1)
defer logMetaManager.Close()
testCheckpointLogRestoreRunner(t, logMetaManager)
}

func TestCheckpointLogRestoreRunnerOnTable(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName)
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName, 1)
require.NoError(t, err)
defer logMetaManager.Close()
testCheckpointLogRestoreRunner(t, logMetaManager)
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/log_restore.go
@@ -191,7 +191,7 @@ func (t *TaskInfoForLogRestore) IdMapSaved() bool {
return t.Progress == InLogRestoreAndIdMapPersisted
}

func TryToGetCheckpointTaskInfo(
func GetCheckpointTaskInfo(
ctx context.Context,
snapshotManager SnapshotMetaManagerT,
logManager LogMetaManagerT,
14 changes: 9 additions & 5 deletions br/pkg/checkpoint/manager.go
@@ -92,6 +92,7 @@ func NewLogTableMetaManager(
g glue.Glue,
dom *domain.Domain,
dbName string,
restoreID uint64,
) (LogMetaManagerT, error) {
se, err := g.CreateSession(dom.Store())
if err != nil {
@@ -107,14 +108,15 @@ func NewLogTableMetaManager(
se: se,
runnerSe: runnerSe,
dom: dom,
dbName: dbName,
dbName: fmt.Sprintf("%s_%d", dbName, restoreID),
}, nil
}

func NewSnapshotTableMetaManager(
g glue.Glue,
dom *domain.Domain,
dbName string,
restoreID uint64,
) (SnapshotMetaManagerT, error) {
se, err := g.CreateSession(dom.Store())
if err != nil {
@@ -130,7 +132,7 @@ func NewSnapshotTableMetaManager(
se: se,
runnerSe: runnerSe,
dom: dom,
dbName: dbName,
dbName: fmt.Sprintf("%s_%d", dbName, restoreID),
}, nil
}

Expand All @@ -147,7 +149,7 @@ func (manager *TableMetaManager[K, SV, LV, M]) Close() {
}
}

// load the whole checkpoint range data and retrieve the metadata of restored ranges
// LoadCheckpointData loads the whole checkpoint range data and retrieve the metadata of restored ranges
// and return the total time cost in the past executions
func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointData(
ctx context.Context,
@@ -287,14 +289,15 @@ func NewSnapshotStorageMetaManager(
cipher *backuppb.CipherInfo,
clusterID uint64,
prefix string,
restoreID uint64,
) SnapshotMetaManagerT {
return &StorageMetaManager[
RestoreKeyType, RestoreValueType, RestoreValueType, CheckpointMetadataForSnapshotRestore,
]{
storage: storage,
cipher: cipher,
clusterID: fmt.Sprintf("%d", clusterID),
taskName: fmt.Sprintf("%d/%s", clusterID, prefix),
taskName: fmt.Sprintf("%d/%s_%d", clusterID, prefix, restoreID),
}
}

@@ -303,14 +306,15 @@ func NewLogStorageMetaManager(
cipher *backuppb.CipherInfo,
clusterID uint64,
prefix string,
restoreID uint64,
) LogMetaManagerT {
return &StorageMetaManager[
LogRestoreKeyType, LogRestoreValueType, LogRestoreValueMarshaled, CheckpointMetadataForLogRestore,
]{
storage: storage,
cipher: cipher,
clusterID: fmt.Sprintf("%d", clusterID),
taskName: fmt.Sprintf("%d/%s", clusterID, prefix),
taskName: fmt.Sprintf("%d/%s_%d", clusterID, prefix, restoreID),
}
}

10 changes: 6 additions & 4 deletions br/pkg/checkpoint/storage.go
@@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/google/uuid"
@@ -89,10 +90,11 @@
)

// IsCheckpointDB checks whether the dbname is checkpoint database.
func IsCheckpointDB(dbname ast.CIStr) bool {
return dbname.O == LogRestoreCheckpointDatabaseName ||
dbname.O == SnapshotRestoreCheckpointDatabaseName ||
dbname.O == CustomSSTRestoreCheckpointDatabaseName
func IsCheckpointDB(dbname string) bool {
// Check if the database name starts with any of the checkpoint database name prefixes
return strings.HasPrefix(dbname, LogRestoreCheckpointDatabaseName) ||
strings.HasPrefix(dbname, SnapshotRestoreCheckpointDatabaseName) ||
strings.HasPrefix(dbname, CustomSSTRestoreCheckpointDatabaseName)
}

const CheckpointIdMapBlockSize int = 524288
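
The switch from exact name comparison to strings.HasPrefix is needed because the table-based meta managers above now append a per-restore suffix to the checkpoint database name (fmt.Sprintf("%s_%d", dbName, restoreID)). A minimal sketch of the resulting behavior, using a hypothetical prefix rather than the real constants:

package main

import (
	"fmt"
	"strings"
)

// hypothetical prefix standing in for e.g. SnapshotRestoreCheckpointDatabaseName
const snapshotCheckpointDBPrefix = "__BR_Snapshot_Restore_Checkpoint"

// isCheckpointDB mirrors the prefix check in storage.go: any database whose name
// starts with a checkpoint prefix, with or without a _<restoreID> suffix, counts.
func isCheckpointDB(dbName string) bool {
	return strings.HasPrefix(dbName, snapshotCheckpointDBPrefix)
}

func main() {
	fmt.Println(isCheckpointDB(snapshotCheckpointDBPrefix + "_1")) // true: restore ID 1 suffix
	fmt.Println(isCheckpointDB(snapshotCheckpointDBPrefix))        // true: legacy name, no suffix
	fmt.Println(isCheckpointDB("test"))                            // false
}
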
24 changes: 24 additions & 0 deletions br/pkg/registry/BUILD.bazel
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "registry",
srcs = [
"heartbeat.go",
"registration.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/registry",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/metautil",
"//br/pkg/utils",
"//pkg/domain",
"//pkg/kv",
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)
115 changes: 115 additions & 0 deletions br/pkg/registry/heartbeat.go
@@ -0,0 +1,115 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package registry

import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
)

const (
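// The %s.%s below is filled in by fmt.Sprintf with the registry database and table
// names in UpdateHeartbeat; the escaped %%? markers then become %? parameter
// placeholders bound by ExecuteInternal.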
UpdateHeartbeatSQLTemplate = `
UPDATE %s.%s
SET last_heartbeat_time = %%?
WHERE id = %%?`

// defaultHeartbeatIntervalSeconds is the default interval in seconds between heartbeat updates
defaultHeartbeatIntervalSeconds = 60
)

// UpdateHeartbeat updates the last_heartbeat_time timestamp for a task
func (r *Registry) UpdateHeartbeat(ctx context.Context, restoreID uint64) error {
Contributor:

What's the purpose of introducing the heartbeat?

Contributor Author:

This is to give us an idea of whether this restore task is orphaned or not, because if a running BR pod OOMs there is no chance for it to clean up the table and the restore task, so the user might need to clean it up manually. Such cases should be very rare.

currentTime := time.Now()
updateSQL := fmt.Sprintf(UpdateHeartbeatSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)

if err := r.heartbeatSession.ExecuteInternal(ctx, updateSQL, currentTime, restoreID); err != nil {
return errors.Annotatef(err, "failed to update heartbeat for task %d", restoreID)
}

log.Debug("updated task heartbeat",
zap.Uint64("restore_id", restoreID),
zap.Time("timestamp", currentTime))

return nil
}

// HeartbeatManager handles periodic heartbeat updates for a restore task.
// It only refreshes the task's heartbeat and never removes stalled tasks; its purpose is to
// give the user some insight into the task's status.
type HeartbeatManager struct {
registry *Registry
restoreID uint64
interval time.Duration
stopCh chan struct{}
doneCh chan struct{}
}

// NewHeartbeatManager creates a new heartbeat manager for the given restore task
func NewHeartbeatManager(registry *Registry, restoreID uint64) *HeartbeatManager {
return &HeartbeatManager{
registry: registry,
restoreID: restoreID,
interval: time.Duration(defaultHeartbeatIntervalSeconds) * time.Second,
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
}

// Start begins the heartbeat background process
func (m *HeartbeatManager) Start(ctx context.Context) {
go func() {
defer close(m.doneCh)

ticker := time.NewTicker(m.interval)
defer ticker.Stop()

// send an initial heartbeat
if err := m.registry.UpdateHeartbeat(ctx, m.restoreID); err != nil {
log.Warn("failed to send initial heartbeat",
zap.Uint64("restore_id", m.restoreID),
zap.Error(err))
}

for {
select {
case <-ticker.C:
if err := m.registry.UpdateHeartbeat(ctx, m.restoreID); err != nil {
Contributor:

If the heartbeat is expired, will it override it?

Contributor Author (@Tristan1900, May 28, 2025):

Yes, this just periodically publishes a new timestamp; it doesn't do anything else. It is only a hint to the user about the status of this task, and they can decide whether it is live or an orphan.

log.Warn("failed to update heartbeat",
zap.Uint64("restore_id", m.restoreID),
zap.Error(err))
}
case <-m.stopCh:
return
case <-ctx.Done():
log.Warn("heartbeat manager context done",
zap.Uint64("restore_id", m.restoreID),
zap.Error(ctx.Err()))
return
}
}
}()
}

// Stop ends the heartbeat background process
func (m *HeartbeatManager) Stop() {
close(m.stopCh)
<-m.doneCh // Wait for goroutine to exit
log.Info("stopped heartbeat manager")
}
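
To make the review discussion above concrete, a rough usage sketch follows; the restore wiring (reg, restoreID, and the restore callback) is assumed and not part of this diff. The manager is started once the task is registered and stopped when the restore finishes, so last_heartbeat_time keeps advancing while the process is alive and goes stale if, for example, the BR pod is OOM-killed:

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/registry"
)

// runWithHeartbeat is a hypothetical wrapper, not part of this PR, showing the intended
// call pattern around a restore.
func runWithHeartbeat(ctx context.Context, reg *registry.Registry, restoreID uint64, restore func(context.Context) error) error {
	mgr := registry.NewHeartbeatManager(reg, restoreID)
	mgr.Start(ctx) // sends an initial heartbeat, then one every defaultHeartbeatIntervalSeconds
	defer mgr.Stop()
	return restore(ctx)
}

An operator deciding whether a task is orphaned can then compare last_heartbeat_time in the registry table against the expected interval; the table lives at the RestoreRegistryDBName/RestoreRegistryTableName constants referenced above, whose values are outside this section.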