Skip to content

Commit 27f4fe7

Browse files
3pointerhawkingrei
authored andcommitted
compacted restore: ignore some keys out of specific range (pingcap#59112)
close pingcap#58238
1 parent 3ae0873 commit 27f4fe7

File tree

10 files changed

+246
-18
lines changed

10 files changed

+246
-18
lines changed

br/pkg/restore/log_client/client.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func (rc *LogClient) Close(ctx context.Context) {
270270
log.Info("Restore client closed")
271271
}
272272

273-
func rewriteRulesFor(sst SSTs, rules *restoreutils.RewriteRules) (*restoreutils.RewriteRules, error) {
273+
func (rc *LogClient) rewriteRulesFor(sst SSTs, rules *restoreutils.RewriteRules) (*restoreutils.RewriteRules, error) {
274274
if r, ok := sst.(RewrittenSSTs); ok {
275275
rewritten := r.RewrittenTo()
276276
if rewritten != sst.TableID() {
@@ -287,6 +287,10 @@ func rewriteRulesFor(sst SSTs, rules *restoreutils.RewriteRules) (*restoreutils.
287287
return rewriteRules, nil
288288
}
289289
}
290+
// Need to set ts range for compacted sst to filter out irrelevant data.
291+
if sst.Type() == CompactedSSTsType && !rules.HasSetTs() {
292+
rules.SetTsRange(rc.shiftStartTS, rc.startTS, rc.restoreTS)
293+
}
290294
return rules, nil
291295
}
292296

@@ -318,7 +322,7 @@ func (rc *LogClient) RestoreSSTFiles(
318322
log.Warn("[Compacted SST Restore] Skipping excluded table during restore.", zap.Int64("table_id", i.TableID()))
319323
continue
320324
}
321-
newRules, err := rewriteRulesFor(i, rewriteRules)
325+
newRules, err := rc.rewriteRulesFor(i, rewriteRules)
322326
if err != nil {
323327
return err
324328
}

br/pkg/restore/log_client/compacted_file_strategy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (cs *CompactedFileSplitStrategy) inspect(ssts SSTs) sstIdentity {
5151
}
5252
}
5353

54-
rule := utils.GetRewriteRuleOfTable(ssts.TableID(), r.RewrittenTo(), 0, map[int64]int64{}, false)
54+
rule := utils.GetRewriteRuleOfTable(ssts.TableID(), r.RewrittenTo(), map[int64]int64{}, false)
5555

5656
return sstIdentity{
5757
EffectiveID: r.RewrittenTo(),

br/pkg/restore/log_client/log_file_manager.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,14 +495,19 @@ func (rc *LogFileManager) ReadAllEntries(
495495
return kvEntries, nextKvEntries, nil
496496
}
497497

498-
func Subcompactions(ctx context.Context, prefix string, s storage.ExternalStorage) SubCompactionIter {
498+
func Subcompactions(ctx context.Context, prefix string, s storage.ExternalStorage, shiftStartTS, restoredTS uint64) SubCompactionIter {
499499
return iter.FlatMap(storage.UnmarshalDir(
500500
ctx,
501501
&storage.WalkOption{SubDir: prefix},
502502
s,
503503
func(t *backuppb.LogFileSubcompactions, name string, b []byte) error { return t.Unmarshal(b) },
504504
), func(subcs *backuppb.LogFileSubcompactions) iter.TryNextor[*backuppb.LogFileSubcompaction] {
505-
return iter.FromSlice(subcs.Subcompactions)
505+
return iter.MapFilter(iter.FromSlice(subcs.Subcompactions), func(subc *backuppb.LogFileSubcompaction) (*backuppb.LogFileSubcompaction, bool) {
506+
if subc.Meta.InputMaxTs < shiftStartTS || subc.Meta.InputMinTs > restoredTS {
507+
return nil, true
508+
}
509+
return subc, false
510+
})
506511
})
507512
}
508513

br/pkg/restore/log_client/migration.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ func (builder *WithMigrationsBuilder) Build(migs []*backuppb.Migration) WithMigr
170170
fullBackups: fullBackups,
171171
restoredTS: builder.restoredTS,
172172
startTS: builder.startTS,
173+
shiftStartTS: builder.shiftStartTS,
173174
}
174175
return withMigrations
175176
}
@@ -222,8 +223,9 @@ type WithMigrations struct {
222223
skipmap metaSkipMap
223224
compactionDirs []string
224225
fullBackups []string
225-
restoredTS uint64
226+
shiftStartTS uint64
226227
startTS uint64
228+
restoredTS uint64
227229
}
228230

229231
func (wm *WithMigrations) Metas(metaNameIter MetaNameIter) MetaMigrationsIter {
@@ -249,7 +251,7 @@ func (wm *WithMigrations) Compactions(ctx context.Context, s storage.ExternalSto
249251
compactionDirIter := iter.FromSlice(wm.compactionDirs)
250252
return iter.FlatMap(compactionDirIter, func(name string) iter.TryNextor[*backuppb.LogFileSubcompaction] {
251253
// name is the absolute path in external storage.
252-
return Subcompactions(ctx, name, s)
254+
return Subcompactions(ctx, name, s, wm.shiftStartTS, wm.restoredTS)
253255
})
254256
}
255257

br/pkg/restore/log_client/ssts.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ var (
1717
_ RewrittenSSTs = &CopiedSST{}
1818
)
1919

20+
const (
21+
CompactedSSTsType = 1
22+
CopiedSSTsType = 2
23+
)
24+
2025
// RewrittenSSTs is an extension to the `SSTs` that needs extra key rewriting.
2126
// This allows a SST being restored "as if" it in another table.
2227
//
@@ -37,6 +42,7 @@ type RewrittenSSTs interface {
3742
type SSTs interface {
3843
fmt.Stringer
3944

45+
Type() int
4046
// TableID returns the ID of the table associated with the SST files.
4147
// This should be the same as the physical content's table ID.
4248
TableID() int64
@@ -51,6 +57,10 @@ type CompactedSSTs struct {
5157
*backuppb.LogFileSubcompaction
5258
}
5359

60+
func (s *CompactedSSTs) Type() int {
61+
return CompactedSSTsType
62+
}
63+
5464
func (s *CompactedSSTs) String() string {
5565
return fmt.Sprintf("CompactedSSTs: %s", s.Meta)
5666
}
@@ -75,7 +85,11 @@ type CopiedSST struct {
7585
}
7686

7787
func (s *CopiedSST) String() string {
78-
return fmt.Sprintf("AddedSSTs: %s", s.File)
88+
return fmt.Sprintf("CopiedSSTs: %s", s.File)
89+
}
90+
91+
func (s *CopiedSST) Type() int {
92+
return CopiedSSTsType
7993
}
8094

8195
func (s *CopiedSST) TableID() int64 {

br/pkg/restore/snap_client/import.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,10 @@ func (importer *SnapFileImporter) buildDownloadRequest(
585585
regionInfo *split.RegionInfo,
586586
cipher *backuppb.CipherInfo,
587587
) (*import_sstpb.DownloadRequest, import_sstpb.SSTMeta, error) {
588+
err := rewriteRules.SetTimeRangeFilter(file.Cf)
589+
if err != nil {
590+
return nil, import_sstpb.SSTMeta{}, err
591+
}
588592
// Get the rewrite rule for the file.
589593
fileRule := restoreutils.FindMatchedRewriteRule(file, rewriteRules)
590594
if fileRule == nil {

br/pkg/restore/utils/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ go_test(
3535
"rewrite_rule_test.go",
3636
],
3737
flaky = True,
38-
shard_count = 14,
38+
shard_count = 15,
3939
deps = [
4040
":utils",
4141
"//br/pkg/conn",

br/pkg/restore/utils/rewrite_rule.go

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,25 @@ type RewriteRules struct {
4848
NewKeyspace []byte
4949
// used to record checkpoint data
5050
NewTableID int64
51+
52+
ShiftStartTs uint64
53+
StartTs uint64
54+
RestoredTs uint64
5155
// used to record backup files to pitr.
5256
// note: should NewTableID merged with this?
5357
TableIDRemapHint []TableIDRemap
5458
}
5559

60+
func (r *RewriteRules) HasSetTs() bool {
61+
return r.StartTs != 0 && r.RestoredTs != 0
62+
}
63+
64+
func (r *RewriteRules) SetTsRange(shiftStartTs, startTs, restoredTs uint64) {
65+
r.ShiftStartTs = shiftStartTs
66+
r.StartTs = startTs
67+
r.RestoredTs = restoredTs
68+
}
69+
5670
func (r *RewriteRules) RewriteSourceTableID(from, to int64) (rewritten bool) {
5771
toPrefix := tablecodec.EncodeTablePrefix(to)
5872
fromPrefix := tablecodec.EncodeTablePrefix(from)
@@ -93,6 +107,33 @@ func (r *RewriteRules) Append(other RewriteRules) {
93107
r.Data = append(r.Data, other.Data...)
94108
}
95109

110+
func (r *RewriteRules) SetTimeRangeFilter(cfName string) error {
111+
// for some sst files like db restore copy ssts, we don't need to set the time range filter
112+
if !r.HasSetTs() {
113+
return nil
114+
}
115+
116+
var ignoreBeforeTs uint64
117+
switch {
118+
case strings.Contains(cfName, DefaultCFName):
119+
// for default cf, we need to check if shift start ts is greater than start ts
120+
if r.ShiftStartTs > r.StartTs {
121+
return errors.Errorf("shift start ts %d is greater than start ts %d", r.ShiftStartTs, r.StartTs)
122+
}
123+
ignoreBeforeTs = r.ShiftStartTs
124+
case strings.Contains(cfName, WriteCFName):
125+
ignoreBeforeTs = r.StartTs
126+
default:
127+
return errors.Errorf("unsupported column family type: %s", cfName)
128+
}
129+
130+
for _, rule := range r.Data {
131+
rule.IgnoreBeforeTimestamp = ignoreBeforeTs
132+
rule.IgnoreAfterTimestamp = r.RestoredTs
133+
}
134+
return nil
135+
}
136+
96137
// EmptyRewriteRule make a map of new, empty rewrite rules.
97138
func EmptyRewriteRulesMap() map[int64]*RewriteRules {
98139
return make(map[int64]*RewriteRules)
@@ -192,7 +233,6 @@ func GetRewriteRulesMap(
192233
// GetRewriteRuleOfTable returns a rewrite rule from t_{oldID} to t_{newID}.
193234
func GetRewriteRuleOfTable(
194235
oldTableID, newTableID int64,
195-
newTimeStamp uint64,
196236
indexIDs map[int64]int64,
197237
getDetailRule bool,
198238
) *RewriteRules {
@@ -202,20 +242,17 @@ func GetRewriteRuleOfTable(
202242
dataRules = append(dataRules, &import_sstpb.RewriteRule{
203243
OldKeyPrefix: tablecodec.GenTableRecordPrefix(oldTableID),
204244
NewKeyPrefix: tablecodec.GenTableRecordPrefix(newTableID),
205-
NewTimestamp: newTimeStamp,
206245
})
207246
for oldIndexID, newIndexID := range indexIDs {
208247
dataRules = append(dataRules, &import_sstpb.RewriteRule{
209248
OldKeyPrefix: tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexID),
210249
NewKeyPrefix: tablecodec.EncodeTableIndexPrefix(newTableID, newIndexID),
211-
NewTimestamp: newTimeStamp,
212250
})
213251
}
214252
} else {
215253
dataRules = append(dataRules, &import_sstpb.RewriteRule{
216254
OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID),
217255
NewKeyPrefix: tablecodec.EncodeTablePrefix(newTableID),
218-
NewTimestamp: newTimeStamp,
219256
})
220257
}
221258

0 commit comments

Comments
 (0)