Skip to content

Commit 9c9ee6e

Browse files
committed
br: enable parallel restore (pingcap#58724)
close pingcap#58725
1 parent f4c8d63 commit 9c9ee6e

File tree

34 files changed

+1622
-139
lines changed

34 files changed

+1622
-139
lines changed

br/pkg/checkpoint/checkpoint_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,20 @@ func TestCheckpointMetaForRestoreOnStorage(t *testing.T) {
6060
base := t.TempDir()
6161
s, err := storage.NewLocalStorage(base)
6262
require.NoError(t, err)
63-
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
63+
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
6464
defer snapshotMetaManager.Close()
65-
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log")
65+
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log", 1)
6666
defer logMetaManager.Close()
6767
testCheckpointMetaForRestore(t, snapshotMetaManager, logMetaManager)
6868
}
6969

7070
func TestCheckpointMetaForRestoreOnTable(t *testing.T) {
7171
s := utiltest.CreateRestoreSchemaSuite(t)
7272
g := gluetidb.New()
73-
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
73+
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
7474
require.NoError(t, err)
7575
defer snapshotMetaManager.Close()
76-
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName)
76+
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName, 1)
7777
require.NoError(t, err)
7878
defer logMetaManager.Close()
7979
testCheckpointMetaForRestore(t, snapshotMetaManager, logMetaManager)
@@ -136,7 +136,7 @@ func testCheckpointMetaForRestore(
136136
require.NoError(t, err)
137137
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)
138138

139-
taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, snapshotMetaManager, logMetaManager)
139+
taskInfo, err := checkpoint.GetCheckpointTaskInfo(ctx, snapshotMetaManager, logMetaManager)
140140
require.NoError(t, err)
141141
require.Equal(t, uint64(123), taskInfo.Metadata.UpstreamClusterID)
142142
require.Equal(t, uint64(222), taskInfo.Metadata.RestoredTS)
@@ -300,15 +300,15 @@ func TestCheckpointRestoreRunnerOnStorage(t *testing.T) {
300300
base := t.TempDir()
301301
s, err := storage.NewLocalStorage(base)
302302
require.NoError(t, err)
303-
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
303+
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
304304
defer snapshotMetaManager.Close()
305305
testCheckpointRestoreRunner(t, snapshotMetaManager)
306306
}
307307

308308
func TestCheckpointRestoreRunnerOnTable(t *testing.T) {
309309
s := utiltest.CreateRestoreSchemaSuite(t)
310310
g := gluetidb.New()
311-
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
311+
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
312312
require.NoError(t, err)
313313
defer snapshotMetaManager.Close()
314314
testCheckpointRestoreRunner(t, snapshotMetaManager)
@@ -408,15 +408,15 @@ func TestCheckpointRunnerRetryOnStorage(t *testing.T) {
408408
base := t.TempDir()
409409
s, err := storage.NewLocalStorage(base)
410410
require.NoError(t, err)
411-
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
411+
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
412412
defer snapshotMetaManager.Close()
413413
testCheckpointRunnerRetry(t, snapshotMetaManager)
414414
}
415415

416416
func TestCheckpointRunnerRetryOnTable(t *testing.T) {
417417
s := utiltest.CreateRestoreSchemaSuite(t)
418418
g := gluetidb.New()
419-
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
419+
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
420420
require.NoError(t, err)
421421
defer snapshotMetaManager.Close()
422422
testCheckpointRunnerRetry(t, snapshotMetaManager)
@@ -474,15 +474,15 @@ func TestCheckpointRunnerNoRetryOnStorage(t *testing.T) {
474474
base := t.TempDir()
475475
s, err := storage.NewLocalStorage(base)
476476
require.NoError(t, err)
477-
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot")
477+
snapshotMetaManager := checkpoint.NewSnapshotStorageMetaManager(s, nil, 1, "snapshot", 1)
478478
defer snapshotMetaManager.Close()
479479
testCheckpointRunnerNoRetry(t, snapshotMetaManager)
480480
}
481481

482482
func TestCheckpointRunnerNoRetryOnTable(t *testing.T) {
483483
s := utiltest.CreateRestoreSchemaSuite(t)
484484
g := gluetidb.New()
485-
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
485+
snapshotMetaManager, err := checkpoint.NewSnapshotTableMetaManager(g, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName, 1)
486486
require.NoError(t, err)
487487
defer snapshotMetaManager.Close()
488488
testCheckpointRunnerNoRetry(t, snapshotMetaManager)
@@ -528,15 +528,15 @@ func TestCheckpointLogRestoreRunnerOnStorage(t *testing.T) {
528528
base := t.TempDir()
529529
s, err := storage.NewLocalStorage(base)
530530
require.NoError(t, err)
531-
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log")
531+
logMetaManager := checkpoint.NewLogStorageMetaManager(s, nil, 1, "log", 1)
532532
defer logMetaManager.Close()
533533
testCheckpointLogRestoreRunner(t, logMetaManager)
534534
}
535535

536536
func TestCheckpointLogRestoreRunnerOnTable(t *testing.T) {
537537
s := utiltest.CreateRestoreSchemaSuite(t)
538538
g := gluetidb.New()
539-
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName)
539+
logMetaManager, err := checkpoint.NewLogTableMetaManager(g, s.Mock.Domain, checkpoint.LogRestoreCheckpointDatabaseName, 1)
540540
require.NoError(t, err)
541541
defer logMetaManager.Close()
542542
testCheckpointLogRestoreRunner(t, logMetaManager)

br/pkg/checkpoint/log_restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (t *TaskInfoForLogRestore) IdMapSaved() bool {
191191
return t.Progress == InLogRestoreAndIdMapPersisted
192192
}
193193

194-
func TryToGetCheckpointTaskInfo(
194+
func GetCheckpointTaskInfo(
195195
ctx context.Context,
196196
snapshotManager SnapshotMetaManagerT,
197197
logManager LogMetaManagerT,

br/pkg/checkpoint/manager.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ func NewLogTableMetaManager(
9292
g glue.Glue,
9393
dom *domain.Domain,
9494
dbName string,
95+
restoreID uint64,
9596
) (LogMetaManagerT, error) {
9697
se, err := g.CreateSession(dom.Store())
9798
if err != nil {
@@ -107,14 +108,15 @@ func NewLogTableMetaManager(
107108
se: se,
108109
runnerSe: runnerSe,
109110
dom: dom,
110-
dbName: dbName,
111+
dbName: fmt.Sprintf("%s_%d", dbName, restoreID),
111112
}, nil
112113
}
113114

114115
func NewSnapshotTableMetaManager(
115116
g glue.Glue,
116117
dom *domain.Domain,
117118
dbName string,
119+
restoreID uint64,
118120
) (SnapshotMetaManagerT, error) {
119121
se, err := g.CreateSession(dom.Store())
120122
if err != nil {
@@ -130,7 +132,7 @@ func NewSnapshotTableMetaManager(
130132
se: se,
131133
runnerSe: runnerSe,
132134
dom: dom,
133-
dbName: dbName,
135+
dbName: fmt.Sprintf("%s_%d", dbName, restoreID),
134136
}, nil
135137
}
136138

@@ -147,7 +149,7 @@ func (manager *TableMetaManager[K, SV, LV, M]) Close() {
147149
}
148150
}
149151

150-
// load the whole checkpoint range data and retrieve the metadata of restored ranges
152+
// LoadCheckpointData loads the whole checkpoint range data, retrieves the metadata of restored ranges,
151153
// and returns the total time cost in the past executions
152154
func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointData(
153155
ctx context.Context,
@@ -287,14 +289,15 @@ func NewSnapshotStorageMetaManager(
287289
cipher *backuppb.CipherInfo,
288290
clusterID uint64,
289291
prefix string,
292+
restoreID uint64,
290293
) SnapshotMetaManagerT {
291294
return &StorageMetaManager[
292295
RestoreKeyType, RestoreValueType, RestoreValueType, CheckpointMetadataForSnapshotRestore,
293296
]{
294297
storage: storage,
295298
cipher: cipher,
296299
clusterID: fmt.Sprintf("%d", clusterID),
297-
taskName: fmt.Sprintf("%d/%s", clusterID, prefix),
300+
taskName: fmt.Sprintf("%d/%s_%d", clusterID, prefix, restoreID),
298301
}
299302
}
300303

@@ -303,14 +306,15 @@ func NewLogStorageMetaManager(
303306
cipher *backuppb.CipherInfo,
304307
clusterID uint64,
305308
prefix string,
309+
restoreID uint64,
306310
) LogMetaManagerT {
307311
return &StorageMetaManager[
308312
LogRestoreKeyType, LogRestoreValueType, LogRestoreValueMarshaled, CheckpointMetadataForLogRestore,
309313
]{
310314
storage: storage,
311315
cipher: cipher,
312316
clusterID: fmt.Sprintf("%d", clusterID),
313-
taskName: fmt.Sprintf("%d/%s", clusterID, prefix),
317+
taskName: fmt.Sprintf("%d/%s_%d", clusterID, prefix, restoreID),
314318
}
315319
}
316320

br/pkg/checkpoint/storage.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"encoding/json"
2121
"fmt"
22+
"strings"
2223
"time"
2324

2425
"github.com/google/uuid"
@@ -89,20 +90,18 @@ const (
8990
)
9091

9192
// IsCheckpointDB reports whether dbname is the name of a checkpoint database.
92-
func IsCheckpointDB(dbname pmodel.CIStr) bool {
93-
return dbname.O == LogRestoreCheckpointDatabaseName ||
94-
dbname.O == SnapshotRestoreCheckpointDatabaseName ||
95-
dbname.O == CustomSSTRestoreCheckpointDatabaseName
93+
func IsCheckpointDB(dbname string) bool {
94+
// Check if the database name starts with any of the checkpoint database name prefixes
95+
return strings.HasPrefix(dbname, LogRestoreCheckpointDatabaseName) ||
96+
strings.HasPrefix(dbname, SnapshotRestoreCheckpointDatabaseName) ||
97+
strings.HasPrefix(dbname, CustomSSTRestoreCheckpointDatabaseName)
9698
}
9799

98100
const CheckpointIdMapBlockSize int = 524288
99101

100102
func chunkInsertCheckpointData(data []byte, fn func(segmentId uint64, chunk []byte) error) error {
101103
for startIdx, segmentId := 0, uint64(0); startIdx < len(data); segmentId += 1 {
102-
endIdx := startIdx + CheckpointIdMapBlockSize
103-
if endIdx > len(data) {
104-
endIdx = len(data)
105-
}
104+
endIdx := min(startIdx+CheckpointIdMapBlockSize, len(data))
106105
if err := fn(segmentId, data[startIdx:endIdx]); err != nil {
107106
return errors.Trace(err)
108107
}
@@ -226,7 +225,7 @@ func selectCheckpointData[K KeyType, V ValueType](
226225
ctx context.Context,
227226
execCtx sqlexec.RestrictedSQLExecutor,
228227
dbName string,
229-
fn func(groupKey K, value V),
228+
fn func(groupKey K, value V) error,
230229
) (time.Duration, error) {
231230
// records the total time cost in the past executions
232231
var pastDureTime time.Duration = 0

br/pkg/registry/BUILD.bazel

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
# go_library target for the Go package github.com/pingcap/tidb/br/pkg/registry,
# compiled from heartbeat.go and registration.go and visible to every package.
go_library(
4+
name = "registry",
5+
srcs = [
6+
"heartbeat.go",
7+
"registration.go",
8+
],
9+
importpath = "github.com/pingcap/tidb/br/pkg/registry",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//br/pkg/errors",
13+
"//br/pkg/glue",
14+
"//br/pkg/metautil",
15+
"//br/pkg/utils",
16+
"//pkg/domain",
17+
"//pkg/kv",
18+
"//pkg/util/sqlexec",
19+
"//pkg/util/table-filter",
20+
"@com_github_pingcap_errors//:errors",
21+
"@com_github_pingcap_log//:log",
22+
"@org_uber_go_zap//:zap",
23+
],
24+
)

br/pkg/registry/heartbeat.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package registry
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"time"
21+
22+
"github.com/pingcap/errors"
23+
"github.com/pingcap/log"
24+
"go.uber.org/zap"
25+
)
26+
27+
const (
28+
// UpdateHeartbeatSQLTemplate is a fmt format string for the heartbeat UPDATE
// statement. The two %s verbs take the database name and the table name; the
// escaped %%? sequences survive formatting as %? placeholders, for which
// UpdateHeartbeat supplies the heartbeat time and the restore ID as arguments.
UpdateHeartbeatSQLTemplate = `
29+
UPDATE %s.%s
30+
SET last_heartbeat_time = %%?
31+
WHERE id = %%?`
32+
33+
// defaultHeartbeatIntervalSeconds is the default interval, in seconds, between
// two heartbeat updates; NewHeartbeatManager converts it to a time.Duration.
34+
defaultHeartbeatIntervalSeconds = 60
35+
)
36+
37+
// UpdateHeartbeat updates the last_heartbeat_time timestamp for a task
38+
func (r *Registry) UpdateHeartbeat(ctx context.Context, restoreID uint64) error {
39+
currentTime := time.Now()
40+
updateSQL := fmt.Sprintf(UpdateHeartbeatSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)
41+
42+
if err := r.heartbeatSession.ExecuteInternal(ctx, updateSQL, currentTime, restoreID); err != nil {
43+
return errors.Annotatef(err, "failed to update heartbeat for task %d", restoreID)
44+
}
45+
46+
log.Debug("updated task heartbeat",
47+
zap.Uint64("restore_id", restoreID),
48+
zap.Time("timestamp", currentTime))
49+
50+
return nil
51+
}
52+
53+
// HeartbeatManager handles periodic heartbeat updates for a restore task
54+
// it only updates the restore task but will not remove any stalled tasks, the purpose of this logic is to provide
55+
// some insights to user of the task status
56+
type HeartbeatManager struct {
57+
registry *Registry
58+
restoreID uint64
59+
interval time.Duration
60+
stopCh chan struct{}
61+
doneCh chan struct{}
62+
}
63+
64+
// NewHeartbeatManager creates a new heartbeat manager for the given restore task
65+
func NewHeartbeatManager(registry *Registry, restoreID uint64) *HeartbeatManager {
66+
return &HeartbeatManager{
67+
registry: registry,
68+
restoreID: restoreID,
69+
interval: time.Duration(defaultHeartbeatIntervalSeconds) * time.Second,
70+
stopCh: make(chan struct{}),
71+
doneCh: make(chan struct{}),
72+
}
73+
}
74+
75+
// Start begins the heartbeat background process
76+
func (m *HeartbeatManager) Start(ctx context.Context) {
77+
go func() {
78+
defer close(m.doneCh)
79+
80+
ticker := time.NewTicker(m.interval)
81+
defer ticker.Stop()
82+
83+
// send an initial heartbeat
84+
if err := m.registry.UpdateHeartbeat(ctx, m.restoreID); err != nil {
85+
log.Warn("failed to send initial heartbeat",
86+
zap.Uint64("restore_id", m.restoreID),
87+
zap.Error(err))
88+
}
89+
90+
for {
91+
select {
92+
case <-ticker.C:
93+
if err := m.registry.UpdateHeartbeat(ctx, m.restoreID); err != nil {
94+
log.Warn("failed to update heartbeat",
95+
zap.Uint64("restore_id", m.restoreID),
96+
zap.Error(err))
97+
}
98+
case <-m.stopCh:
99+
return
100+
case <-ctx.Done():
101+
log.Warn("heartbeat manager context done",
102+
zap.Uint64("restore_id", m.restoreID),
103+
zap.Error(ctx.Err()))
104+
return
105+
}
106+
}
107+
}()
108+
}
109+
110+
// Stop ends the heartbeat background process
111+
func (m *HeartbeatManager) Stop() {
112+
close(m.stopCh)
113+
<-m.doneCh // Wait for goroutine to exit
114+
log.Info("stopped heartbeat manager")
115+
}

0 commit comments

Comments
 (0)