Skip to content

Commit c780f4e

Browse files
authored
log restore: filter backupmeta file by ts to speed up pitr (#61347) (#62115)
ref #61318
1 parent 7294dfb commit c780f4e

File tree

5 files changed

+235
-59
lines changed

5 files changed

+235
-59
lines changed

br/pkg/restore/log_client/log_file_manager.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -124,24 +124,30 @@ func (rc *LogFileManager) loadShiftTS(ctx context.Context) error {
124124
value uint64
125125
exists bool
126126
}{}
127-
err := stream.FastUnmarshalMetaData(ctx, rc.storage, rc.metadataDownloadBatchSize, func(path string, raw []byte) error {
128-
m, err := rc.helper.ParseToMetadata(raw)
129-
if err != nil {
130-
return err
131-
}
132-
log.Info("read meta from storage and parse", zap.String("path", path), zap.Uint64("min-ts", m.MinTs),
133-
zap.Uint64("max-ts", m.MaxTs), zap.Int32("meta-version", int32(m.MetaVersion)))
134-
135-
ts, ok := stream.UpdateShiftTS(m, rc.startTS, rc.restoreTS)
136-
shiftTS.Lock()
137-
if ok && (!shiftTS.exists || shiftTS.value > ts) {
138-
shiftTS.value = ts
139-
shiftTS.exists = true
140-
}
141-
shiftTS.Unlock()
142127

143-
return nil
144-
})
128+
err := stream.FastUnmarshalMetaData(ctx,
129+
rc.storage,
130+
// use start ts to calculate shift start ts
131+
rc.startTS,
132+
rc.restoreTS,
133+
rc.metadataDownloadBatchSize, func(path string, raw []byte) error {
134+
m, err := rc.helper.ParseToMetadata(raw)
135+
if err != nil {
136+
return err
137+
}
138+
log.Info("read meta from storage and parse", zap.String("path", path), zap.Uint64("min-ts", m.MinTs),
139+
zap.Uint64("max-ts", m.MaxTs), zap.Int32("meta-version", int32(m.MetaVersion)))
140+
141+
ts, ok := stream.UpdateShiftTS(m, rc.startTS, rc.restoreTS)
142+
shiftTS.Lock()
143+
if ok && (!shiftTS.exists || shiftTS.value > ts) {
144+
shiftTS.value = ts
145+
shiftTS.exists = true
146+
}
147+
shiftTS.Unlock()
148+
149+
return nil
150+
})
145151
if err != nil {
146152
return err
147153
}
@@ -175,7 +181,10 @@ func (rc *LogFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
175181
if !strings.HasSuffix(path, ".meta") {
176182
return nil
177183
}
178-
names = append(names, path)
184+
newPath := stream.FilterPathByTs(path, rc.shiftStartTS, rc.restoreTS)
185+
if len(newPath) > 0 {
186+
names = append(names, newPath)
187+
}
179188
return nil
180189
})
181190
if err != nil {

br/pkg/stream/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ go_test(
6464
],
6565
embed = [":stream"],
6666
flaky = True,
67-
shard_count = 44,
67+
shard_count = 45,
6868
deps = [
6969
"//br/pkg/storage",
7070
"//br/pkg/streamhelper",

br/pkg/stream/stream_metas.go

Lines changed: 42 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -89,48 +89,51 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(
8989
metadataMap.metas = make(map[string]*MetadataInfo)
9090
// `shiftUntilTS` must be less than `until`
9191
metadataMap.shiftUntilTS = until
92-
err := FastUnmarshalMetaData(ctx, s, ms.MetadataDownloadBatchSize, func(path string, raw []byte) error {
93-
m, err := ms.Helper.ParseToMetadataHard(raw)
94-
if err != nil {
95-
return err
96-
}
97-
// If the meta file contains only files with ts grater than `until`, when the file is from
98-
// `Default`: it should be kept, because its corresponding `write` must has commit ts grater
99-
// than it, which should not be considered.
100-
// `Write`: it should trivially not be considered.
101-
if m.MinTs <= until {
102-
// record these meta-information for statistics and filtering
103-
fileGroupInfos := make([]*FileGroupInfo, 0, len(m.FileGroups))
104-
for _, group := range m.FileGroups {
105-
var kvCount int64 = 0
106-
for _, file := range group.DataFilesInfo {
107-
kvCount += file.NumberOfEntries
108-
}
109-
fileGroupInfos = append(fileGroupInfos, &FileGroupInfo{
110-
MaxTS: group.MaxTs,
111-
Length: group.Length,
112-
KVCount: kvCount,
113-
})
92+
err := FastUnmarshalMetaData(ctx, s,
93+
0,
94+
until,
95+
ms.MetadataDownloadBatchSize, func(path string, raw []byte) error {
96+
m, err := ms.Helper.ParseToMetadataHard(raw)
97+
if err != nil {
98+
return err
11499
}
115-
metadataMap.Lock()
116-
metadataMap.metas[path] = &MetadataInfo{
117-
MinTS: m.MinTs,
118-
FileGroupInfos: fileGroupInfos,
100+
// If the meta file contains only files with ts grater than `until`, when the file is from
101+
// `Default`: it should be kept, because its corresponding `write` must has commit ts grater
102+
// than it, which should not be considered.
103+
// `Write`: it should trivially not be considered.
104+
if m.MinTs <= until {
105+
// record these meta-information for statistics and filtering
106+
fileGroupInfos := make([]*FileGroupInfo, 0, len(m.FileGroups))
107+
for _, group := range m.FileGroups {
108+
var kvCount int64 = 0
109+
for _, file := range group.DataFilesInfo {
110+
kvCount += file.NumberOfEntries
111+
}
112+
fileGroupInfos = append(fileGroupInfos, &FileGroupInfo{
113+
MaxTS: group.MaxTs,
114+
Length: group.Length,
115+
KVCount: kvCount,
116+
})
117+
}
118+
metadataMap.Lock()
119+
metadataMap.metas[path] = &MetadataInfo{
120+
MinTS: m.MinTs,
121+
FileGroupInfos: fileGroupInfos,
122+
}
123+
metadataMap.Unlock()
119124
}
120-
metadataMap.Unlock()
121-
}
122-
// filter out the metadatas whose ts-range is overlap with [until, +inf)
123-
// and calculate their minimum begin-default-ts
124-
ts, ok := UpdateShiftTS(m, until, mathutil.MaxUint)
125-
if ok {
126-
metadataMap.Lock()
127-
if ts < metadataMap.shiftUntilTS {
128-
metadataMap.shiftUntilTS = ts
125+
// filter out the metadatas whose ts-range is overlap with [until, +inf)
126+
// and calculate their minimum begin-default-ts
127+
ts, ok := UpdateShiftTS(m, until, mathutil.MaxUint)
128+
if ok {
129+
metadataMap.Lock()
130+
if ts < metadataMap.shiftUntilTS {
131+
metadataMap.shiftUntilTS = ts
132+
}
133+
metadataMap.Unlock()
129134
}
130-
metadataMap.Unlock()
131-
}
132-
return nil
133-
})
135+
return nil
136+
})
134137
if err != nil {
135138
return 0, errors.Trace(err)
136139
}

br/pkg/stream/stream_mgr.go

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"context"
1919
"crypto/sha256"
2020
"encoding/hex"
21+
"regexp"
22+
"strconv"
2123
"strings"
2224

2325
"github.com/klauspost/compress/zstd"
@@ -43,6 +45,20 @@ const (
4345
streamBackupGlobalCheckpointPrefix = "v1/global_checkpoint"
4446
)
4547

48+
// metaPattern is a regular expression used to match backup metadata filenames.
49+
// The expected filename format is:
50+
//
51+
// {flushTs}-{minDefaultTs}-{minTs}-{maxTs}.meta
52+
//
53+
// where each part is a hexadecimal string (0-9, a-f, A-F).
54+
// Example:
55+
//
56+
// 0000000000000001-0000000000003039-065CCFF1D8AC0000-065CCFF1D8AC0006.meta
57+
//
58+
// The pattern captures all four parts as separate groups.
59+
// Leading zeros are necessary for the pattern to match.
60+
var metaPattern = regexp.MustCompile(`^([0-9a-fA-F]{16})-([0-9a-fA-F]{16})-([0-9a-fA-F]{16})-([0-9a-fA-F]{16})$`)
61+
4662
func GetStreamBackupMetaPrefix() string {
4763
return streamBackupMetaPrefix
4864
}
@@ -356,11 +372,58 @@ func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error) {
356372
return meta.Marshal()
357373
}
358374

375+
func (m *MetadataHelper) Close() {
376+
if m.decoder != nil {
377+
m.decoder.Close()
378+
}
379+
if m.encryptionManager != nil {
380+
m.encryptionManager.Close()
381+
}
382+
}
383+
384+
func FilterPathByTs(path string, left, right uint64) string {
385+
filename := strings.TrimSuffix(path, ".meta")
386+
filename = filename[strings.LastIndex(filename, "/")+1:]
387+
388+
if metaPattern.MatchString(filename) {
389+
matches := metaPattern.FindStringSubmatch(filename)
390+
if len(matches) < 5 {
391+
log.Warn("invalid meta file name format", zap.String("file", path))
392+
// consider compatible with future file path change
393+
return path
394+
}
395+
396+
flushTs, _ := strconv.ParseUint(matches[1], 16, 64)
397+
minDefaultTs, _ := strconv.ParseUint(matches[2], 16, 64)
398+
minTs, _ := strconv.ParseUint(matches[3], 16, 64)
399+
maxTs, _ := strconv.ParseUint(matches[4], 16, 64)
400+
401+
if minDefaultTs == 0 || minDefaultTs > minTs {
402+
log.Warn("minDefaultTs is not correct, fallback to minTs",
403+
zap.String("file", path),
404+
zap.Uint64("flushTs", flushTs),
405+
zap.Uint64("minTs", minTs),
406+
zap.Uint64("minDefaultTs", minDefaultTs),
407+
)
408+
minDefaultTs = minTs
409+
}
410+
411+
if right < minDefaultTs || maxTs < left {
412+
return ""
413+
}
414+
}
415+
416+
// keep consistency with old behaviour
417+
return path
418+
}
419+
359420
// FastUnmarshalMetaData used a 128 worker pool to speed up
360421
// read metadata content from external_storage.
361422
func FastUnmarshalMetaData(
362423
ctx context.Context,
363424
s storage.ExternalStorage,
425+
startTS uint64,
426+
endTS uint64,
364427
metaDataWorkerPoolSize uint,
365428
fn func(path string, rawMetaData []byte) error,
366429
) error {
@@ -372,7 +435,15 @@ func FastUnmarshalMetaData(
372435
if !strings.HasSuffix(path, metaSuffix) {
373436
return nil
374437
}
375-
readPath := path
438+
readPath := FilterPathByTs(path, startTS, endTS)
439+
if len(readPath) == 0 {
440+
log.Info("skip download meta file out of range",
441+
zap.String("file", path),
442+
zap.Uint64("startTs", startTS),
443+
zap.Uint64("endTs", endTS),
444+
)
445+
return nil
446+
}
376447
pool.ApplyOnErrorGroup(eg, func() error {
377448
b, err := s.ReadFile(ectx, readPath)
378449
if err != nil {

br/pkg/stream/stream_misc_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,96 @@ func TestMetadataHelperReadFile(t *testing.T) {
7777
require.NoError(t, err)
7878
require.Equal(t, data1, get_data)
7979
}
80+
81+
func TestFilterPath(t *testing.T) {
82+
type args struct {
83+
path string
84+
shiftStartTS uint64
85+
restoreTS uint64
86+
}
87+
tests := []struct {
88+
name string
89+
args args
90+
expected string
91+
}{
92+
{
93+
name: "normal: minDefaultTs < minTs",
94+
args: args{
95+
path: "v1/backupmeta/000000000000000a-0000000000000005-000000000000000a-000000000000001e.meta", // flush=10, minDefault=5, min=10, max=30
96+
shiftStartTS: 5,
97+
restoreTS: 10,
98+
},
99+
expected: "v1/backupmeta/000000000000000a-0000000000000005-000000000000000a-000000000000001e.meta",
100+
},
101+
{
102+
name: "normal: minDefaultTs == minTs",
103+
args: args{
104+
path: "v1/backupmeta/000000000000000a-000000000000000a-000000000000000a-000000000000001e.meta", // all = 10
105+
shiftStartTS: 5,
106+
restoreTS: 10,
107+
},
108+
expected: "v1/backupmeta/000000000000000a-000000000000000a-000000000000000a-000000000000001e.meta",
109+
},
110+
{
111+
name: "fallback: minDefaultTs == 0",
112+
args: args{
113+
path: "v1/backupmeta/000000000000000a-0000000000000000-000000000000000a-000000000000001e.meta", // minDefault=0, min=10
114+
shiftStartTS: 5,
115+
restoreTS: 10,
116+
},
117+
expected: "v1/backupmeta/000000000000000a-0000000000000000-000000000000000a-000000000000001e.meta",
118+
},
119+
{
120+
name: "fallback: minDefaultTs > minTs, should fallback to minTs",
121+
args: args{
122+
path: "v1/backupmeta/000000000000000a-0000000000000014-000000000000000a-000000000000001e.meta", // minDefault=20, min=10
123+
shiftStartTS: 5,
124+
restoreTS: 11, // fallback to 10, 11>10
125+
},
126+
expected: "v1/backupmeta/000000000000000a-0000000000000014-000000000000000a-000000000000001e.meta",
127+
},
128+
{
129+
name: "restoreTS < fallback minTs, should be filtered",
130+
args: args{
131+
path: "v1/backupmeta/000000000000000a-0000000000000014-000000000000000a-000000000000001e.meta", // fallback to min=10
132+
shiftStartTS: 5,
133+
restoreTS: 9, // 9 < 10, should be filtered
134+
},
135+
expected: "",
136+
},
137+
{
138+
name: "maxTs < shiftStartTS, should be filtered",
139+
args: args{
140+
path: "v1/backupmeta/000000000000000a-0000000000000005-000000000000000a-0000000000000004.meta", // max=4 < 5
141+
shiftStartTS: 5,
142+
restoreTS: 10,
143+
},
144+
expected: "",
145+
},
146+
{
147+
name: "invalid: minDefaultTs > minTs, fallback, but restoreTS < fallback",
148+
args: args{
149+
path: "v1/backupmeta/000000000000000a-0000000000000014-000000000000000a-000000000000001e.meta",
150+
shiftStartTS: 5,
151+
restoreTS: 8, // fallback to 10, 8 < 10
152+
},
153+
expected: "",
154+
},
155+
{
156+
name: "non-matching file name format, preserved for compatibility",
157+
args: args{
158+
path: "v1/backupmeta/unexpected_format.meta",
159+
shiftStartTS: 10,
160+
restoreTS: 10,
161+
},
162+
expected: "v1/backupmeta/unexpected_format.meta",
163+
},
164+
}
165+
166+
for _, tt := range tests {
167+
t.Run(tt.name, func(t *testing.T) {
168+
got := stream.FilterPathByTs(tt.args.path, tt.args.shiftStartTS, tt.args.restoreTS)
169+
require.Equal(t, tt.expected, got)
170+
})
171+
}
172+
}

0 commit comments

Comments
 (0)