Skip to content

Commit 1f74ac3

Browse files
authored
br/restore/log_client: use input ts as filter (#58734)
close #58733
1 parent 91706ec commit 1f74ac3

File tree

10 files changed

+148
-44
lines changed

10 files changed

+148
-44
lines changed

DEPS.bzl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5867,13 +5867,13 @@ def go_deps():
58675867
name = "com_github_pingcap_kvproto",
58685868
build_file_proto_mode = "disable_global",
58695869
importpath = "github.com/pingcap/kvproto",
5870-
sha256 = "92a67bcc499c06fd3d76cc153362540b22eaf1b09c4bda62a1599ce876b8ed78",
5871-
strip_prefix = "github.com/pingcap/[email protected]20241120071417-b5b7843d9037",
5870+
sha256 = "db34e3f94e5ac8fc5465b5440583c9e037a7f16aea8d0d8a200cdff210b12038",
5871+
strip_prefix = "github.com/pingcap/[email protected]20250102071301-c35d2b410115",
58725872
urls = [
5873-
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
5874-
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
5875-
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
5876-
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
5873+
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
5874+
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
5875+
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
5876+
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
58775877
],
58785878
)
58795879
go_repository(

br/pkg/restore/log_client/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ go_test(
9090
],
9191
embed = [":log_client"],
9292
flaky = True,
93-
shard_count = 45,
93+
shard_count = 46,
9494
deps = [
9595
"//br/pkg/errors",
9696
"//br/pkg/glue",

br/pkg/restore/log_client/export_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,7 @@ func (helper *FakeStreamMetadataHelper) ReadFile(
127127
) ([]byte, error) {
128128
return helper.Data[offset : offset+length], nil
129129
}
130+
131+
func (m WithMigrations) CompactionDirs() []string {
132+
return m.compactionDirs
133+
}

br/pkg/restore/log_client/migration.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,11 @@ func (builder *WithMigrationsBuilder) coarseGrainedFilter(mig *backuppb.Migratio
133133
// log file [ .. .. .. .. ]
134134
//
135135
for _, compaction := range mig.Compactions {
136-
if compaction.CompactionUntilTs < builder.shiftStartTS || compaction.CompactionFromTs > builder.restoredTS {
136+
// Some old compaction may not contain input min / max ts.
137+
// In that case, we should never filter it out.
138+
rangeValid := compaction.InputMinTs != 0 && compaction.InputMaxTs != 0
139+
outOfRange := compaction.InputMaxTs < builder.shiftStartTS || compaction.InputMinTs > builder.restoredTS
140+
if rangeValid && outOfRange {
137141
return true
138142
}
139143
}

br/pkg/restore/log_client/migration_test.go

Lines changed: 118 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,8 @@ func TestMigrations(t *testing.T) {
215215
},
216216
Compactions: []*backuppb.LogFileCompaction{
217217
{
218-
CompactionFromTs: 0,
219-
CompactionUntilTs: 9,
218+
InputMinTs: 1,
219+
InputMaxTs: 9,
220220
},
221221
},
222222
},
@@ -240,8 +240,8 @@ func TestMigrations(t *testing.T) {
240240
},
241241
Compactions: []*backuppb.LogFileCompaction{
242242
{
243-
CompactionFromTs: 50,
244-
CompactionUntilTs: 52,
243+
InputMinTs: 50,
244+
InputMaxTs: 52,
245245
},
246246
},
247247
},
@@ -264,8 +264,8 @@ func TestMigrations(t *testing.T) {
264264
},
265265
Compactions: []*backuppb.LogFileCompaction{
266266
{
267-
CompactionFromTs: 50,
268-
CompactionUntilTs: 52,
267+
InputMinTs: 50,
268+
InputMaxTs: 52,
269269
},
270270
},
271271
},
@@ -275,8 +275,8 @@ func TestMigrations(t *testing.T) {
275275
},
276276
Compactions: []*backuppb.LogFileCompaction{
277277
{
278-
CompactionFromTs: 120,
279-
CompactionUntilTs: 140,
278+
InputMinTs: 120,
279+
InputMaxTs: 140,
280280
},
281281
},
282282
},
@@ -299,8 +299,8 @@ func TestMigrations(t *testing.T) {
299299
},
300300
Compactions: []*backuppb.LogFileCompaction{
301301
{
302-
CompactionFromTs: 50,
303-
CompactionUntilTs: 52,
302+
InputMinTs: 50,
303+
InputMaxTs: 52,
304304
},
305305
},
306306
},
@@ -310,8 +310,8 @@ func TestMigrations(t *testing.T) {
310310
},
311311
Compactions: []*backuppb.LogFileCompaction{
312312
{
313-
CompactionFromTs: 1200,
314-
CompactionUntilTs: 1400,
313+
InputMinTs: 1200,
314+
InputMaxTs: 1400,
315315
},
316316
},
317317
},
@@ -329,24 +329,114 @@ func TestMigrations(t *testing.T) {
329329
}
330330

331331
ctx := context.Background()
332-
for _, cs := range cases {
333-
builder := logclient.NewMigrationBuilder(10, 100, 200)
334-
withMigrations := builder.Build(cs.migrations)
335-
it := withMigrations.Metas(generateMetaNameIter())
336-
checkMetaNameIter(t, cs.expectStoreIds, it)
337-
it = withMigrations.Metas(generateMetaNameIter())
338-
collect := iter.CollectAll(ctx, it)
339-
require.NoError(t, collect.Err)
340-
for j, meta := range collect.Item {
341-
physicalIter := generatePhysicalIter(meta)
342-
checkPhysicalIter(t, cs.expectPhyLengths[j], physicalIter)
343-
physicalIter = generatePhysicalIter(meta)
344-
collect := iter.CollectAll(ctx, physicalIter)
332+
for i, cs := range cases {
333+
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
334+
builder := logclient.NewMigrationBuilder(10, 100, 200)
335+
withMigrations := builder.Build(cs.migrations)
336+
it := withMigrations.Metas(generateMetaNameIter())
337+
checkMetaNameIter(t, cs.expectStoreIds, it)
338+
it = withMigrations.Metas(generateMetaNameIter())
339+
collect := iter.CollectAll(ctx, it)
345340
require.NoError(t, collect.Err)
346-
for k, phy := range collect.Item {
347-
logicalIter := generateLogicalIter(phy)
348-
checkLogicalIter(t, cs.expectLogLengths[j][k], logicalIter)
341+
for j, meta := range collect.Item {
342+
physicalIter := generatePhysicalIter(meta)
343+
checkPhysicalIter(t, cs.expectPhyLengths[j], physicalIter)
344+
physicalIter = generatePhysicalIter(meta)
345+
collect := iter.CollectAll(ctx, physicalIter)
346+
require.NoError(t, collect.Err)
347+
for k, phy := range collect.Item {
348+
logicalIter := generateLogicalIter(phy)
349+
checkLogicalIter(t, cs.expectLogLengths[j][k], logicalIter)
350+
}
349351
}
352+
})
353+
}
354+
}
355+
356+
func pack[T any](ts ...T) []T {
357+
return ts
358+
}
359+
360+
func TestFilterOut(t *testing.T) {
361+
type Case struct {
362+
ShiftedStartTs uint64
363+
RestoredTs uint64
364+
Migs []*backuppb.Migration
365+
366+
ExceptedCompactionsArtificateDir []string
367+
}
368+
withCompactTsCompaction := func(iMin, iMax, cFrom, cUntil uint64, name string) *backuppb.LogFileCompaction {
369+
return &backuppb.LogFileCompaction{
370+
InputMinTs: iMin,
371+
InputMaxTs: iMax,
372+
CompactionFromTs: cFrom,
373+
CompactionUntilTs: cUntil,
374+
Artifacts: name,
375+
}
376+
}
377+
simpleCompaction := func(iMin, iMax uint64, name string) *backuppb.LogFileCompaction {
378+
return &backuppb.LogFileCompaction{
379+
InputMinTs: iMin,
380+
InputMaxTs: iMax,
381+
Artifacts: name,
350382
}
351383
}
384+
makeMig := func(cs ...*backuppb.LogFileCompaction) *backuppb.Migration {
385+
return &backuppb.Migration{Compactions: cs}
386+
}
387+
388+
cases := []Case{
389+
{
390+
ShiftedStartTs: 50,
391+
RestoredTs: 60,
392+
Migs: pack(
393+
makeMig(simpleCompaction(49, 61, "a")),
394+
makeMig(simpleCompaction(61, 80, "b")),
395+
),
396+
397+
ExceptedCompactionsArtificateDir: pack("a"),
398+
},
399+
{
400+
ShiftedStartTs: 30,
401+
RestoredTs: 50,
402+
Migs: pack(
403+
makeMig(simpleCompaction(40, 60, "1a")),
404+
makeMig(simpleCompaction(10, 20, "1b")),
405+
makeMig(simpleCompaction(31, 50, "2a")),
406+
makeMig(simpleCompaction(50, 80, "2b")),
407+
),
408+
409+
ExceptedCompactionsArtificateDir: pack("1a", "2a", "2b"),
410+
},
411+
{
412+
ShiftedStartTs: 30,
413+
RestoredTs: 50,
414+
Migs: pack(
415+
makeMig(withCompactTsCompaction(49, 100, 50, 99, "a")),
416+
makeMig(withCompactTsCompaction(10, 30, 15, 29, "b")),
417+
makeMig(withCompactTsCompaction(8, 29, 10, 20, "c")),
418+
),
419+
420+
ExceptedCompactionsArtificateDir: pack("a", "b"),
421+
},
422+
{
423+
ShiftedStartTs: 100,
424+
RestoredTs: 120,
425+
Migs: pack(
426+
makeMig(withCompactTsCompaction(49, 100, 50, 99, "a")),
427+
makeMig(withCompactTsCompaction(0, 0, 15, 29, "b")),
428+
makeMig(withCompactTsCompaction(0, 0, 10, 20, "c")),
429+
),
430+
431+
ExceptedCompactionsArtificateDir: pack("a", "b", "c"),
432+
},
433+
}
434+
435+
for i, c := range cases {
436+
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
437+
b := logclient.NewMigrationBuilder(c.ShiftedStartTs, c.ShiftedStartTs, c.RestoredTs)
438+
i := b.Build(c.Migs)
439+
require.ElementsMatch(t, i.CompactionDirs(), c.ExceptedCompactionsArtificateDir)
440+
})
441+
}
352442
}

br/pkg/stream/stream_metas.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,8 +1073,9 @@ func (m MigrationExt) doTruncating(ctx context.Context, mig *pb.Migration, resul
10731073
// NOTE: Execution of truncation wasn't implemented here.
10741074
// If we are going to truncate some files, for now we still need to use `br log truncate`.
10751075
for _, compaction := range mig.Compactions {
1076-
// Can we also remove the compaction when `until-ts` is equal to `truncated-to`...?
1077-
if compaction.CompactionUntilTs > mig.TruncatedTo {
1076+
// We can only clean up a compaction when we are sure all its inputs
1077+
// are no more used.
1078+
if compaction.InputMaxTs > mig.TruncatedTo {
10781079
result.NewBase.Compactions = append(result.NewBase.Compactions, compaction)
10791080
} else {
10801081
m.tryRemovePrefix(ctx, compaction.Artifacts, result)

br/pkg/stream/stream_metas_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -452,13 +452,13 @@ func mDstrPfx(path ...string) migOP {
452452
}
453453
}
454454

455-
func mCompaction(cPath, aPath string, fromTs, untilTs uint64) migOP {
455+
func mCompaction(cPath, aPath string, minTs, maxTs uint64) migOP {
456456
return func(m *backuppb.Migration) {
457457
c := &backuppb.LogFileCompaction{}
458458
c.GeneratedFiles = cPath
459459
c.Artifacts = aPath
460-
c.CompactionFromTs = fromTs
461-
c.CompactionUntilTs = untilTs
460+
c.InputMinTs = minTs
461+
c.InputMaxTs = maxTs
462462
m.Compactions = append(m.Compactions, c)
463463
}
464464
}

br/pkg/streamhelper/basic_lib_for_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ func (t trivialFlushStream) RecvMsg(m any) error {
179179
return nil
180180
}
181181

182+
func (f *fakeStore) FlushNow(ctx context.Context, in *logbackup.FlushNowRequest, opts ...grpc.CallOption) (*logbackup.FlushNowResponse, error) {
183+
f.flush()
184+
return &logbackup.FlushNowResponse{Results: []*logbackup.FlushResult{{TaskName: "Universe", Success: true}}}, nil
185+
}
186+
182187
func (f *fakeStore) GetID() uint64 {
183188
return f.id
184189
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ require (
8787
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
8888
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
8989
github.com/pingcap/fn v1.0.0
90-
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037
90+
github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115
9191
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a
9292
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
9393
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -676,8 +676,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
676676
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
677677
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
678678
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
679-
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg=
680-
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
679+
github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115 h1:tFaBKtuVsTaYgWVa4fJVBHEi3vqdqRtmjMypEK+CN88=
680+
github.com/pingcap/kvproto v0.0.0-20250102071301-c35d2b410115/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
681681
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
682682
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
683683
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8=

0 commit comments

Comments
 (0)