Skip to content

Commit 852dcff

Browse files
committed
initial
Signed-off-by: Wenqi Mou <[email protected]>
1 parent d0ea9e5 commit 852dcff

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+3131
-1101
lines changed

br/pkg/checkpoint/checkpoint_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,12 @@ func TestCheckpointMetaForRestore(t *testing.T) {
105105
exists := checkpoint.ExistsCheckpointProgress(ctx, dom)
106106
require.False(t, exists)
107107
err = checkpoint.SaveCheckpointProgress(ctx, se, &checkpoint.CheckpointProgress{
108-
Progress: checkpoint.InLogRestoreAndIdMapPersist,
108+
Progress: checkpoint.InLogRestoreAndIdMapPersisted,
109109
})
110110
require.NoError(t, err)
111111
progress, err := checkpoint.LoadCheckpointProgress(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
112112
require.NoError(t, err)
113-
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, progress.Progress)
113+
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)
114114

115115
taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, s.Mock.Domain, se.GetSessionCtx().GetRestrictedSQLExecutor())
116116
require.NoError(t, err)
@@ -120,7 +120,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
120120
require.Equal(t, uint64(333), taskInfo.Metadata.RewriteTS)
121121
require.Equal(t, "1.0", taskInfo.Metadata.GcRatio)
122122
require.Equal(t, true, taskInfo.HasSnapshotMetadata)
123-
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, taskInfo.Progress)
123+
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, taskInfo.Progress)
124124

125125
exists = checkpoint.ExistsCheckpointIngestIndexRepairSQLs(ctx, dom)
126126
require.False(t, exists)

br/pkg/checkpoint/log_restore.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -194,22 +194,22 @@ func ExistsLogRestoreCheckpointMetadata(
194194
TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointMetaTableName))
195195
}
196196

197-
// A progress type for snapshot + log restore.
197+
// RestoreProgress is a progress type for snapshot + log restore.
198198
//
199-
// Before the id-maps is persist into external storage, the snapshot restore and
200-
// id-maps constructure can be repeated. So if the progress is in `InSnapshotRestore`,
199+
// Before the id-maps is persisted into external storage, the snapshot restore and
200+
// id-maps building can be retried. So if the progress is in `InSnapshotRestore`,
201201
// it can retry from snapshot restore.
202202
//
203-
// After the id-maps is persist into external storage, there are some meta-kvs has
204-
// been restored into the cluster, such as `rename ddl`. Where would be a situation:
203+
// After the id-maps is persisted into external storage, there are some meta-kvs has
204+
// been restored into the cluster, such as `rename ddl`. A situation could be:
205205
//
206206
// the first execution:
207207
//
208208
// table A created in snapshot restore is renamed to table B in log restore
209209
// table A (id 80) --------------> table B (id 80)
210210
// ( snapshot restore ) ( log restore )
211211
//
212-
// the second execution if don't skip snasphot restore:
212+
// the second execution if don't skip snapshot restore:
213213
//
214214
// table A is created again in snapshot restore, because there is no table named A
215215
// table A (id 81) --------------> [not in id-maps, so ignored]
@@ -221,8 +221,8 @@ type RestoreProgress int
221221

222222
const (
223223
InSnapshotRestore RestoreProgress = iota
224-
// Only when the id-maps is persist, status turns into it.
225-
InLogRestoreAndIdMapPersist
224+
// Only when the id-maps is persisted, status turns into it.
225+
InLogRestoreAndIdMapPersisted
226226
)
227227

228228
type CheckpointProgress struct {
@@ -254,8 +254,8 @@ func ExistsCheckpointProgress(
254254
TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointProgressTableName))
255255
}
256256

257-
// CheckpointTaskInfo is unique information within the same cluster id. It represents the last
258-
// restore task executed for this cluster.
257+
// CheckpointTaskInfoForLogRestore is tied to a specific cluster.
258+
// It represents the last restore task executed in this cluster.
259259
type CheckpointTaskInfoForLogRestore struct {
260260
Metadata *CheckpointMetadataForLogRestore
261261
HasSnapshotMetadata bool

br/pkg/checkpoint/restore.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ func StartCheckpointRestoreRunnerForTest(
7777
return runner, nil
7878
}
7979

80-
// Notice that the session is owned by the checkpoint runner, and it will be also closed by it.
8180
func StartCheckpointRunnerForRestore(
8281
ctx context.Context,
8382
se glue.Session,

br/pkg/restore/import_mode_switcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (switcher *ImportModeSwitcher) GoSwitchToImportMode(
176176
return nil
177177
}
178178

179-
// RestorePreWork executes some prepare work before restore.
179+
// RestorePreWork switches to import mode and removes pd schedulers if needed
180180
// TODO make this function returns a restore post work.
181181
func RestorePreWork(
182182
ctx context.Context,

br/pkg/restore/log_client/BUILD.bazel

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
33
go_library(
44
name = "log_client",
55
srcs = [
6+
"batch_meta_processor.go",
67
"client.go",
78
"compacted_file_strategy.go",
89
"import.go",
@@ -36,6 +37,7 @@ go_library(
3637
"//br/pkg/stream",
3738
"//br/pkg/summary",
3839
"//br/pkg/utils",
40+
"//br/pkg/utils/consts",
3941
"//br/pkg/utils/iter",
4042
"//br/pkg/version",
4143
"//pkg/ddl/util",
@@ -71,7 +73,6 @@ go_library(
7173
"@org_golang_x_sync//errgroup",
7274
"@org_uber_go_multierr//:multierr",
7375
"@org_uber_go_zap//:zap",
74-
"@org_uber_go_zap//zapcore",
7576
],
7677
)
7778

@@ -103,6 +104,7 @@ go_test(
103104
"//br/pkg/storage",
104105
"//br/pkg/stream",
105106
"//br/pkg/utils",
107+
"//br/pkg/utils/consts",
106108
"//br/pkg/utils/iter",
107109
"//br/pkg/utiltest",
108110
"//pkg/domain",
@@ -117,7 +119,6 @@ go_test(
117119
"//pkg/util/chunk",
118120
"//pkg/util/codec",
119121
"//pkg/util/sqlexec",
120-
"//pkg/util/table-filter",
121122
"@com_github_docker_go_units//:go-units",
122123
"@com_github_pingcap_errors//:errors",
123124
"@com_github_pingcap_failpoint//:failpoint",
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package logclient
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
21+
"github.com/pingcap/errors"
22+
backuppb "github.com/pingcap/kvproto/pkg/brpb"
23+
"github.com/pingcap/log"
24+
"github.com/pingcap/tidb/br/pkg/stream"
25+
"github.com/pingcap/tidb/br/pkg/utils"
26+
"github.com/pingcap/tidb/pkg/meta"
27+
"github.com/pingcap/tidb/pkg/meta/model"
28+
"go.uber.org/zap"
29+
)
30+
31+
// BatchMetaKVProcessor defines how to process a batch of files
32+
type BatchMetaKVProcessor interface {
33+
// ProcessBatch processes a batch of files and with a filterTS and return what's not processed for next iteration
34+
ProcessBatch(
35+
ctx context.Context,
36+
files []*backuppb.DataFileInfo,
37+
entries []*KvEntryWithTS,
38+
filterTS uint64,
39+
cf string,
40+
) ([]*KvEntryWithTS, error)
41+
}
42+
43+
// RestoreMetaKVProcessor implements BatchMetaKVProcessor for restoring files in batches
44+
type RestoreMetaKVProcessor struct {
45+
client *LogClient
46+
schemasReplace *stream.SchemasReplace
47+
updateStats func(kvCount uint64, size uint64)
48+
progressInc func()
49+
}
50+
51+
func NewRestoreMetaKVProcessor(client *LogClient, schemasReplace *stream.SchemasReplace,
52+
updateStats func(kvCount uint64, size uint64),
53+
progressInc func()) *RestoreMetaKVProcessor {
54+
return &RestoreMetaKVProcessor{
55+
client: client,
56+
schemasReplace: schemasReplace,
57+
updateStats: updateStats,
58+
progressInc: progressInc,
59+
}
60+
}
61+
62+
// RestoreAndRewriteMetaKVFiles tries to restore files about meta kv-event from stream-backup.
63+
func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
64+
ctx context.Context,
65+
files []*backuppb.DataFileInfo,
66+
) error {
67+
// starts gc row collector
68+
rp.client.RunGCRowsLoader(ctx)
69+
70+
// separate the files by CF and sort each group by TS
71+
filesInDefaultCF, filesInWriteCF := SeparateAndSortFilesByCF(files)
72+
73+
log.Info("start to restore meta files",
74+
zap.Int("total files", len(files)),
75+
zap.Int("default files", len(filesInDefaultCF)),
76+
zap.Int("write files", len(filesInWriteCF)))
77+
78+
if err := LoadAndProcessMetaKVFilesInBatch(
79+
ctx,
80+
filesInDefaultCF,
81+
filesInWriteCF,
82+
rp,
83+
); err != nil {
84+
return errors.Trace(err)
85+
}
86+
87+
// UpdateTable global schema version to trigger a full reload so every TiDB node in the cluster will get synced with
88+
// the latest schema update.
89+
if err := rp.client.UpdateSchemaVersionFullReload(ctx); err != nil {
90+
return errors.Trace(err)
91+
}
92+
return nil
93+
}
94+
95+
func (rp *RestoreMetaKVProcessor) ProcessBatch(
96+
ctx context.Context,
97+
files []*backuppb.DataFileInfo,
98+
entries []*KvEntryWithTS,
99+
filterTS uint64,
100+
cf string,
101+
) ([]*KvEntryWithTS, error) {
102+
return rp.client.RestoreBatchMetaKVFiles(
103+
ctx, files, rp.schemasReplace, entries,
104+
filterTS, rp.updateStats, rp.progressInc, cf,
105+
)
106+
}
107+
108+
// MetaKVInfoProcessor implements BatchMetaKVProcessor to iterate meta kv and collect information.
109+
//
110+
// 1. It collects table renaming information. The table rename operation will not change the table id, and the process
111+
// will drop the original table and create a new one with the same table id, so in DDL history there will be two events
112+
// that corresponds to the same table id.
113+
//
114+
// 2. It builds the id mapping from upstream to downstream. This logic was nested into table rewrite previously and now
115+
// separated out to its own component.
116+
type MetaKVInfoProcessor struct {
117+
client *LogClient
118+
tableHistoryManager *stream.LogBackupTableHistoryManager
119+
tableMappingManager *stream.TableMappingManager
120+
}
121+
122+
func NewMetaKVInfoProcessor(client *LogClient) *MetaKVInfoProcessor {
123+
return &MetaKVInfoProcessor{
124+
client: client,
125+
tableHistoryManager: stream.NewTableHistoryManager(),
126+
tableMappingManager: stream.NewTableMappingManager(),
127+
}
128+
}
129+
130+
func (mp *MetaKVInfoProcessor) ReadMetaKVFilesAndBuildInfo(
131+
ctx context.Context,
132+
files []*backuppb.DataFileInfo,
133+
) error {
134+
// separate the files by CF and sort each group by TS
135+
filesInDefaultCF, filesInWriteCF := SeparateAndSortFilesByCF(files)
136+
137+
if err := LoadAndProcessMetaKVFilesInBatch(
138+
ctx,
139+
filesInDefaultCF,
140+
filesInWriteCF,
141+
mp,
142+
); err != nil {
143+
return errors.Trace(err)
144+
}
145+
return nil
146+
}
147+
148+
func (mp *MetaKVInfoProcessor) ProcessBatch(
149+
ctx context.Context,
150+
files []*backuppb.DataFileInfo,
151+
entries []*KvEntryWithTS,
152+
filterTS uint64,
153+
cf string,
154+
) ([]*KvEntryWithTS, error) {
155+
curSortedEntries, filteredEntries, err := mp.client.filterAndSortKvEntriesFromFiles(ctx, files, entries, filterTS)
156+
if err != nil {
157+
return nil, errors.Trace(err)
158+
}
159+
160+
// process entries to collect table IDs
161+
for _, entry := range curSortedEntries {
162+
// get value from default cf and get the short value if possible from write cf
163+
value, err := stream.ExtractValue(&entry.E, cf)
164+
165+
// write cf doesn't have short value in it
166+
if value == nil {
167+
continue
168+
}
169+
if err != nil {
170+
return nil, errors.Trace(err)
171+
}
172+
173+
if utils.IsMetaDBKey(entry.E.Key) {
174+
rawKey, err := stream.ParseTxnMetaKeyFrom(entry.E.Key)
175+
if err != nil {
176+
return nil, errors.Trace(err)
177+
}
178+
179+
if meta.IsDBkey(rawKey.Field) {
180+
var dbInfo model.DBInfo
181+
if err := json.Unmarshal(value, &dbInfo); err != nil {
182+
return nil, errors.Trace(err)
183+
}
184+
185+
// collect db id -> name mapping during log backup, it will contain information about newly created db
186+
mp.tableHistoryManager.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)
187+
188+
// update the id map
189+
if err = mp.tableMappingManager.ProcessDBValueAndUpdateIdMapping(dbInfo); err != nil {
190+
return nil, errors.Trace(err)
191+
}
192+
} else if !meta.IsDBkey(rawKey.Key) {
193+
// also see RewriteMetaKvEntry
194+
continue
195+
}
196+
197+
// collect table history indexed by table id, same id may have different table names in history
198+
if meta.IsTableKey(rawKey.Field) {
199+
var tableInfo model.TableInfo
200+
if err := json.Unmarshal(value, &tableInfo); err != nil {
201+
return nil, errors.Trace(err)
202+
}
203+
// cannot use dbib in the parsed table info cuz it might not set so default to 0
204+
dbID, err := meta.ParseDBKey(rawKey.Key)
205+
if err != nil {
206+
return nil, errors.Trace(err)
207+
}
208+
209+
// add to table rename history
210+
mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)
211+
212+
// update the id map
213+
if err = mp.tableMappingManager.ProcessTableValueAndUpdateIdMapping(dbID, tableInfo); err != nil {
214+
return nil, errors.Trace(err)
215+
}
216+
}
217+
}
218+
}
219+
return filteredEntries, nil
220+
}
221+
222+
func (mp *MetaKVInfoProcessor) GetTableMappingManager() *stream.TableMappingManager {
223+
return mp.tableMappingManager
224+
}
225+
226+
func (mp *MetaKVInfoProcessor) GetTableHistoryManager() *stream.LogBackupTableHistoryManager {
227+
return mp.tableHistoryManager
228+
}

0 commit comments

Comments
 (0)