Skip to content

Commit 8a1ee5d

Browse files
authored
br: PiTR table filter online support (#59281)
close #59280
1 parent 8ecc397 commit 8a1ee5d

40 files changed

+1939
-1042
lines changed

br/pkg/checkpoint/log_restore.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ type CheckpointProgress struct {
178178
Progress RestoreProgress `json:"progress"`
179179
}
180180

181-
// CheckpointTaskInfoForLogRestore is tied to a specific cluster.
181+
// TaskInfoForLogRestore is tied to a specific cluster.
182182
// It represents the last restore task executed in this cluster.
183183
type TaskInfoForLogRestore struct {
184184
Metadata *CheckpointMetadataForLogRestore
@@ -187,6 +187,10 @@ type TaskInfoForLogRestore struct {
187187
Progress RestoreProgress
188188
}
189189

190+
func (t *TaskInfoForLogRestore) IdMapSaved() bool {
191+
return t.Progress == InLogRestoreAndIdMapPersisted
192+
}
193+
190194
func TryToGetCheckpointTaskInfo(
191195
ctx context.Context,
192196
snapshotManager SnapshotMetaManagerT,

br/pkg/glue/glue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ type Session interface {
6161
GetGlobalVariable(name string) (string, error)
6262
GetGlobalSysVar(name string) (string, error)
6363
GetSessionCtx() sessionctx.Context
64+
AlterTableMode(ctx context.Context, schemaID int64, tableID int64, tableMode model.TableMode) error
65+
RefreshMeta(ctx context.Context, args *model.RefreshMetaArgs) error
6466
}
6567

6668
// BatchCreateTableSession is an interface to batch create table parallelly

br/pkg/gluetidb/glue.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package gluetidb
44

55
import (
66
"context"
7+
"fmt"
78
"sync"
89
"time"
910

@@ -260,7 +261,7 @@ func (gs *tidbSession) Close() {
260261
gs.se.Close()
261262
}
262263

263-
// GetGlobalVariables implements glue.Session.
264+
// GetGlobalVariable implements glue.Session.
264265
func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
265266
return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
266267
}
@@ -273,3 +274,33 @@ func (gs *tidbSession) GetGlobalSysVar(name string) (string, error) {
273274
func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string {
274275
return executor.ConstructResultOfShowCreatePlacementPolicy(policy)
275276
}
277+
278+
func (gs *tidbSession) AlterTableMode(
279+
_ context.Context,
280+
schemaID int64,
281+
tableID int64,
282+
tableMode model.TableMode) error {
283+
originQueryString := gs.se.Value(sessionctx.QueryString)
284+
defer gs.se.SetValue(sessionctx.QueryString, originQueryString)
285+
d := domain.GetDomain(gs.se).DDLExecutor()
286+
gs.se.SetValue(sessionctx.QueryString,
287+
fmt.Sprintf("ALTER TABLE MODE SCHEMA_ID=%d TABLE_ID=%d TO %s", schemaID, tableID, tableMode.String()))
288+
args := &model.AlterTableModeArgs{
289+
SchemaID: schemaID,
290+
TableID: tableID,
291+
TableMode: tableMode,
292+
}
293+
return d.AlterTableMode(gs.se, args)
294+
}
295+
296+
// RefreshMeta submits a refresh meta job to update the info schema with the latest metadata.
297+
func (gs *tidbSession) RefreshMeta(
298+
_ context.Context,
299+
args *model.RefreshMetaArgs) error {
300+
originQueryString := gs.se.Value(sessionctx.QueryString)
301+
defer gs.se.SetValue(sessionctx.QueryString, originQueryString)
302+
d := domain.GetDomain(gs.se).DDLExecutor()
303+
gs.se.SetValue(sessionctx.QueryString,
304+
fmt.Sprintf("REFRESH META SCHEMA_ID=%d TABLE_ID=%d", args.SchemaID, args.TableID))
305+
return d.RefreshMeta(gs.se, args)
306+
}

br/pkg/gluetidb/mock/mock.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (s *mockSession) Close() {
9797
s.se.Close()
9898
}
9999

100-
// GetGlobalVariables implements glue.Session.
100+
// GetGlobalVariable implements glue.Session.
101101
func (s *mockSession) GetGlobalVariable(name string) (string, error) {
102102
if ret, ok := s.globalVars[name]; ok {
103103
return ret, nil
@@ -110,6 +110,18 @@ func (s *mockSession) GetGlobalSysVar(string) (string, error) {
110110
return "", nil
111111
}
112112

113+
// AlterTableMode implements glue.Session.
114+
func (*mockSession) AlterTableMode(_ context.Context, _ int64, _ int64, _ model.TableMode) error {
115+
log.Fatal("unimplemented AlterTableMode for mock session")
116+
return nil
117+
}
118+
119+
// RefreshMeta implements glue.Session.
120+
func (*mockSession) RefreshMeta(_ context.Context, _ *model.RefreshMetaArgs) error {
121+
log.Fatal("unimplemented RefreshMeta for mock session")
122+
return nil
123+
}
124+
113125
// MockGlue only used for test
114126
type MockGlue struct {
115127
se sessiontypes.Session

br/pkg/restore/internal/prealloc_db/db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error)
5555
// not support placement policy, just ignore it
5656
log.Warn("target tidb not support tidb_placement_mode, ignore create policies", zap.Error(err))
5757
} else {
58-
log.Info("set tidb_placement_mode success", zap.String("mode", policyMode))
58+
log.Debug("set tidb_placement_mode success", zap.String("mode", policyMode))
5959
supportPolicy = true
6060
}
6161
}

br/pkg/restore/log_client/batch_meta_processor.go

Lines changed: 12 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,11 @@ package logclient
1616

1717
import (
1818
"context"
19-
"encoding/json"
2019

2120
"github.com/pingcap/errors"
2221
backuppb "github.com/pingcap/kvproto/pkg/brpb"
2322
"github.com/pingcap/log"
2423
"github.com/pingcap/tidb/br/pkg/stream"
25-
"github.com/pingcap/tidb/br/pkg/utils"
26-
"github.com/pingcap/tidb/pkg/meta"
27-
"github.com/pingcap/tidb/pkg/meta/model"
2824
"go.uber.org/zap"
2925
)
3026

@@ -62,6 +58,7 @@ func NewRestoreMetaKVProcessor(client *LogClient, schemasReplace *stream.Schemas
6258
// RestoreAndRewriteMetaKVFiles tries to restore files about meta kv-event from stream-backup.
6359
func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
6460
ctx context.Context,
61+
hasExplicitFilter bool,
6562
files []*backuppb.DataFileInfo,
6663
) error {
6764
// starts gc row collector
@@ -84,10 +81,15 @@ func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
8481
return errors.Trace(err)
8582
}
8683

87-
// global schema version to trigger a full reload so every TiDB node in the cluster will get synced with
88-
// the latest schema update.
89-
if err := rp.client.UpdateSchemaVersionFullReload(ctx); err != nil {
90-
return errors.Trace(err)
84+
if !hasExplicitFilter {
85+
// global schema version to trigger a full reload so every TiDB node in the cluster will get synced with
86+
// the latest schema update.
87+
log.Info("updating schema version to do full reload")
88+
if err := rp.client.UpdateSchemaVersionFullReload(ctx); err != nil {
89+
return errors.Trace(err)
90+
}
91+
} else {
92+
log.Info("skip doing full reload for filtered PiTR")
9193
}
9294
return nil
9395
}
@@ -159,62 +161,10 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(
159161

160162
// process entries to collect table IDs
161163
for _, entry := range curSortedEntries {
162-
// parse entry and do the table mapping
163-
if err = mp.tableMappingManager.ParseMetaKvAndUpdateIdMapping(&entry.E, cf); err != nil {
164-
return nil, errors.Trace(err)
165-
}
166-
167-
// collect rename/partition exchange history
168-
// get value from default cf and get the short value if possible from write cf
169-
value, err := stream.ExtractValue(&entry.E, cf)
170-
if err != nil {
164+
// parse entry and do the table mapping, using tableHistoryManager as the collector
165+
if err = mp.tableMappingManager.ParseMetaKvAndUpdateIdMapping(&entry.E, cf, mp.tableHistoryManager); err != nil {
171166
return nil, errors.Trace(err)
172167
}
173-
174-
// write cf doesn't have short value in it
175-
if value == nil {
176-
continue
177-
}
178-
179-
if utils.IsMetaDBKey(entry.E.Key) {
180-
rawKey, err := stream.ParseTxnMetaKeyFrom(entry.E.Key)
181-
if err != nil {
182-
return nil, errors.Trace(err)
183-
}
184-
185-
if meta.IsDBkey(rawKey.Field) {
186-
var dbInfo model.DBInfo
187-
if err := json.Unmarshal(value, &dbInfo); err != nil {
188-
return nil, errors.Trace(err)
189-
}
190-
// collect db id -> name mapping during log backup, it will contain information about newly created db
191-
mp.tableHistoryManager.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)
192-
} else if !meta.IsDBkey(rawKey.Key) {
193-
// also see RewriteMetaKvEntry
194-
continue
195-
} else if meta.IsTableKey(rawKey.Field) {
196-
// collect table history indexed by table id, same id may have different table names in history
197-
var tableInfo model.TableInfo
198-
if err := json.Unmarshal(value, &tableInfo); err != nil {
199-
return nil, errors.Trace(err)
200-
}
201-
// cannot use dbib in the parsed table info cuz it might not set so default to 0
202-
dbID, err := meta.ParseDBKey(rawKey.Key)
203-
if err != nil {
204-
return nil, errors.Trace(err)
205-
}
206-
207-
// add to table rename history
208-
mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)
209-
210-
// track partitions if this is a partitioned table
211-
if tableInfo.Partition != nil {
212-
for _, def := range tableInfo.Partition.Definitions {
213-
mp.tableHistoryManager.AddPartitionHistory(def.ID, tableInfo.Name.String(), dbID, tableInfo.ID)
214-
}
215-
}
216-
}
217-
}
218168
}
219169
return filteredEntries, nil
220170
}

0 commit comments

Comments
 (0)