Skip to content

Commit 9205605

Browse files
Tristan1900joechenrh
authored andcommitted
br: enable parallel restore (pingcap#58724)
close pingcap#58725
1 parent dd80f87 commit 9205605

File tree

36 files changed

+1616
-120
lines changed

36 files changed

+1616
-120
lines changed

br/pkg/checkpoint/checkpoint_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,20 @@ func TestCheckpointMetaForRestoreOnStorage(t *testing.T) {
6161
base := t.TempDir()
6262
s, err := storage.NewLocalStorage(base)
6363
require.NoError(t, err)
64-
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
64+
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
6565
defer snapshotMetaManager.Close()
66-
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log")
66+
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log", 1)
6767
defer logMetaManager.Close()
6868
testCheckpointMetaForRestore(t, snapshotMetaManager, logMetaManager)
6969
}
7070

7171
func TestCheckpointMetaForRestoreOnTable(t *testing.T) {
7272
s := utiltest.CreateRestoreSchemaSuite(t)
7373
g := gluetidb.New()
74-
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
74+
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
7575
require.NoError(t, err)
7676
defer snapshotMetaManager.Close()
77-
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName)
77+
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName, 1)
7878
require.NoError(t, err)
7979
defer logMetaManager.Close()
8080
testCheckpointMetaForRestore(t, snapshotMetaManager, logMetaManager)
@@ -137,7 +137,7 @@ func testCheckpointMetaForRestore(
137137
require.NoError(t, err)
138138
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)
139139

140-
taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, snapshotMetaManager, logMetaManager)
140+
taskInfo, err := checkpoint.GetCheckpointTaskInfo(ctx, snapshotMetaManager, logMetaManager)
141141
require.NoError(t, err)
142142
require.Equal(t, uint64(123), taskInfo.Metadata.UpstreamClusterID)
143143
require.Equal(t, uint64(222), taskInfo.Metadata.RestoredTS)
@@ -302,15 +302,15 @@ func TestCheckpointRestoreRunnerOnStorage(t *testing.T) {
302302
base := t.TempDir()
303303
s, err := storage.NewLocalStorage(base)
304304
require.NoError(t, err)
305-
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
305+
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
306306
defer snapshotMetaManager.Close()
307307
testCheckpointRestoreRunner(t, snapshotMetaManager)
308308
}
309309

310310
func TestCheckpointRestoreRunnerOnTable(t *testing.T) {
311311
s := utiltest.CreateRestoreSchemaSuite(t)
312312
g := gluetidb.New()
313-
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
313+
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
314314
require.NoError(t, err)
315315
defer snapshotMetaManager.Close()
316316
testCheckpointRestoreRunner(t, snapshotMetaManager)
@@ -411,15 +411,15 @@ func TestCheckpointRunnerRetryOnStorage(t *testing.T) {
411411
base := t.TempDir()
412412
s, err := storage.NewLocalStorage(base)
413413
require.NoError(t, err)
414-
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
414+
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
415415
defer snapshotMetaManager.Close()
416416
testCheckpointRunnerRetry(t, snapshotMetaManager)
417417
}
418418

419419
func TestCheckpointRunnerRetryOnTable(t *testing.T) {
420420
s := utiltest.CreateRestoreSchemaSuite(t)
421421
g := gluetidb.New()
422-
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
422+
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
423423
require.NoError(t, err)
424424
defer snapshotMetaManager.Close()
425425
testCheckpointRunnerRetry(t, snapshotMetaManager)
@@ -478,15 +478,15 @@ func TestCheckpointRunnerNoRetryOnStorage(t *testing.T) {
478478
base := t.TempDir()
479479
s, err := storage.NewLocalStorage(base)
480480
require.NoError(t, err)
481-
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
481+
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
482482
defer snapshotMetaManager.Close()
483483
testCheckpointRunnerNoRetry(t, snapshotMetaManager)
484484
}
485485

486486
func TestCheckpointRunnerNoRetryOnTable(t *testing.T) {
487487
s := utiltest.CreateRestoreSchemaSuite(t)
488488
g := gluetidb.New()
489-
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
489+
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
490490
require.NoError(t, err)
491491
defer snapshotMetaManager.Close()
492492
testCheckpointRunnerNoRetry(t, snapshotMetaManager)
@@ -533,15 +533,15 @@ func TestCheckpointLogRestoreRunnerOnStorage(t *testing.T) {
533533
base := t.TempDir()
534534
s, err := storage.NewLocalStorage(base)
535535
require.NoError(t, err)
536-
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log")
536+
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log", 1)
537537
defer logMetaManager.Close()
538538
testCheckpointLogRestoreRunner(t, logMetaManager)
539539
}
540540

541541
func TestCheckpointLogRestoreRunnerOnTable(t *testing.T) {
542542
s := utiltest.CreateRestoreSchemaSuite(t)
543543
g := gluetidb.New()
544-
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName)
544+
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName, 1)
545545
require.NoError(t, err)
546546
defer logMetaManager.Close()
547547
testCheckpointLogRestoreRunner(t, logMetaManager)

br/pkg/checkpoint/log_restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (t *TaskInfoForLogRestore) IdMapSaved() bool {
191191
return t.Progress == InLogRestoreAndIdMapPersisted
192192
}
193193

194-
func TryToGetCheckpointTaskInfo(
194+
func GetCheckpointTaskInfo(
195195
ctx context.Context,
196196
snapshotManager SnapshotMetaManagerT,
197197
logManager LogMetaManagerT,

br/pkg/checkpoint/manager.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ func NewLogTableMetaManager(
9292
g glue.Glue,
9393
dom *domain.Domain,
9494
dbName string,
95+
restoreID uint64,
9596
) (LogMetaManagerT, error) {
9697
se, err := g.CreateSession(dom.Store())
9798
if err != nil {
@@ -107,14 +108,15 @@ func NewLogTableMetaManager(
107108
se: se,
108109
runnerSe: runnerSe,
109110
dom: dom,
110-
dbName: dbName,
111+
dbName: fmt.Sprintf("%s_%d", dbName, restoreID),
111112
}, nil
112113
}
113114

114115
func NewSnapshotTableMetaManager(
115116
g glue.Glue,
116117
dom *domain.Domain,
117118
dbName string,
119+
restoreID uint64,
118120
) (SnapshotMetaManagerT, error) {
119121
se, err := g.CreateSession(dom.Store())
120122
if err != nil {
@@ -130,7 +132,7 @@ func NewSnapshotTableMetaManager(
130132
se: se,
131133
runnerSe: runnerSe,
132134
dom: dom,
133-
dbName: dbName,
135+
dbName: fmt.Sprintf("%s_%d", dbName, restoreID),
134136
}, nil
135137
}
136138

@@ -147,7 +149,7 @@ func (manager *TableMetaManager[K, SV, LV, M]) Close() {
147149
}
148150
}
149151

150-
// load the whole checkpoint range data and retrieve the metadata of restored ranges
152+
// LoadCheckpointData loads the whole checkpoint range data and retrieve the metadata of restored ranges
151153
// and return the total time cost in the past executions
152154
func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointData(
153155
ctx context.Context,
@@ -287,14 +289,15 @@ func NewSnapshotStorageMetaManager(
287289
cipher *backuppb.CipherInfo,
288290
clusterID uint64,
289291
prefix string,
292+
restoreID uint64,
290293
) SnapshotMetaManagerT {
291294
return &StorageMetaManager[
292295
RestoreKeyType, RestoreValueType, RestoreValueType, CheckpointMetadataForSnapshotRestore,
293296
]{
294297
storage: storage,
295298
cipher: cipher,
296299
clusterID: fmt.Sprintf("%d", clusterID),
297-
taskName: fmt.Sprintf("%d/%s", clusterID, prefix),
300+
taskName: fmt.Sprintf("%d/%s_%d", clusterID, prefix, restoreID),
298301
}
299302
}
300303

@@ -303,14 +306,15 @@ func NewLogStorageMetaManager(
303306
cipher *backuppb.CipherInfo,
304307
clusterID uint64,
305308
prefix string,
309+
restoreID uint64,
306310
) LogMetaManagerT {
307311
return &StorageMetaManager[
308312
LogRestoreKeyType, LogRestoreValueType, LogRestoreValueMarshaled, CheckpointMetadataForLogRestore,
309313
]{
310314
storage: storage,
311315
cipher: cipher,
312316
clusterID: fmt.Sprintf("%d", clusterID),
313-
taskName: fmt.Sprintf("%d/%s", clusterID, prefix),
317+
taskName: fmt.Sprintf("%d/%s_%d", clusterID, prefix, restoreID),
314318
}
315319
}
316320

br/pkg/checkpoint/storage.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"encoding/json"
2121
"fmt"
22+
"strings"
2223
"time"
2324

2425
"github.com/google/uuid"
@@ -89,10 +90,11 @@ const (
8990
)
9091

9192
// IsCheckpointDB checks whether the dbname is checkpoint database.
92-
func IsCheckpointDB(dbname ast.CIStr) bool {
93-
return dbname.O == LogRestoreCheckpointDatabaseName ||
94-
dbname.O == SnapshotRestoreCheckpointDatabaseName ||
95-
dbname.O == CustomSSTRestoreCheckpointDatabaseName
93+
func IsCheckpointDB(dbname string) bool {
94+
// Check if the database name starts with any of the checkpoint database name prefixes
95+
return strings.HasPrefix(dbname, LogRestoreCheckpointDatabaseName) ||
96+
strings.HasPrefix(dbname, SnapshotRestoreCheckpointDatabaseName) ||
97+
strings.HasPrefix(dbname, CustomSSTRestoreCheckpointDatabaseName)
9698
}
9799

98100
const CheckpointIdMapBlockSize int = 524288

br/pkg/registry/BUILD.bazel

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "registry",
5+
srcs = [
6+
"heartbeat.go",
7+
"registration.go",
8+
],
9+
importpath = "github.com/pingcap/tidb/br/pkg/registry",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//br/pkg/errors",
13+
"//br/pkg/glue",
14+
"//br/pkg/metautil",
15+
"//br/pkg/utils",
16+
"//pkg/domain",
17+
"//pkg/kv",
18+
"//pkg/util/sqlexec",
19+
"//pkg/util/table-filter",
20+
"@com_github_pingcap_errors//:errors",
21+
"@com_github_pingcap_log//:log",
22+
"@org_uber_go_zap//:zap",
23+
],
24+
)

br/pkg/registry/heartbeat.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package registry
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"time"
21+
22+
"github.com/pingcap/errors"
23+
"github.com/pingcap/log"
24+
"go.uber.org/zap"
25+
)
26+
27+
const (
28+
UpdateHeartbeatSQLTemplate = `
29+
UPDATE %s.%s
30+
SET last_heartbeat_time = %%?
31+
WHERE id = %%?`
32+
33+
// defaultHeartbeatIntervalSeconds is the default interval in seconds between heartbeat updates
34+
defaultHeartbeatIntervalSeconds = 60
35+
)
36+
37+
// UpdateHeartbeat updates the last_heartbeat_time timestamp for a task
38+
func (r *Registry) UpdateHeartbeat(ctx context.Context, restoreID uint64) error {
39+
currentTime := time.Now()
40+
updateSQL := fmt.Sprintf(UpdateHeartbeatSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)
41+
42+
if err := r.heartbeatSession.ExecuteInternal(ctx, updateSQL, currentTime, restoreID); err != nil {
43+
return errors.Annotatef(err, "failed to update heartbeat for task %d", restoreID)
44+
}
45+
46+
log.Debug("updated task heartbeat",
47+
zap.Uint64("restore_id", restoreID),
48+
zap.Time("timestamp", currentTime))
49+
50+
return nil
51+
}
52+
53+
// HeartbeatManager handles periodic heartbeat updates for a restore task
54+
// it only updates the restore task but will not remove any stalled tasks, the purpose of this logic is to provide
55+
// some insights to user of the task status
56+
type HeartbeatManager struct {
57+
registry *Registry
58+
restoreID uint64
59+
interval time.Duration
60+
stopCh chan struct{}
61+
doneCh chan struct{}
62+
}
63+
64+
// NewHeartbeatManager creates a new heartbeat manager for the given restore task
65+
func NewHeartbeatManager(registry *Registry, restoreID uint64) *HeartbeatManager {
66+
return &HeartbeatManager{
67+
registry: registry,
68+
restoreID: restoreID,
69+
interval: time.Duration(defaultHeartbeatIntervalSeconds) * time.Second,
70+
stopCh: make(chan struct{}),
71+
doneCh: make(chan struct{}),
72+
}
73+
}
74+
75+
// Start begins the heartbeat background process
76+
func (m *HeartbeatManager) Start(ctx context.Context) {
77+
go func() {
78+
defer close(m.doneCh)
79+
80+
ticker := time.NewTicker(m.interval)
81+
defer ticker.Stop()
82+
83+
// send an initial heartbeat
84+
if err := m.registry.UpdateHeartbeat(ctx, m.restoreID); err != nil {
85+
log.Warn("failed to send initial heartbeat",
86+
zap.Uint64("restore_id", m.restoreID),
87+
zap.Error(err))
88+
}
89+
90+
for {
91+
select {
92+
case <-ticker.C:
93+
if err := m.registry.UpdateHeartbeat(ctx, m.restoreID); err != nil {
94+
log.Warn("failed to update heartbeat",
95+
zap.Uint64("restore_id", m.restoreID),
96+
zap.Error(err))
97+
}
98+
case <-m.stopCh:
99+
return
100+
case <-ctx.Done():
101+
log.Warn("heartbeat manager context done",
102+
zap.Uint64("restore_id", m.restoreID),
103+
zap.Error(ctx.Err()))
104+
return
105+
}
106+
}
107+
}()
108+
}
109+
110+
// Stop ends the heartbeat background process
111+
func (m *HeartbeatManager) Stop() {
112+
close(m.stopCh)
113+
<-m.doneCh // Wait for goroutine to exit
114+
log.Info("stopped heartbeat manager")
115+
}

0 commit comments

Comments
 (0)