Skip to content

Commit e183b20

Browse files
committed
added basic migration support
make bazel prepare Signed-off-by: hillium <[email protected]> fix linters Signed-off-by: hillium <[email protected]> try fix the test case Signed-off-by: hillium <[email protected]>
1 parent 499e45f commit e183b20

File tree

12 files changed

+1909
-158
lines changed

12 files changed

+1909
-158
lines changed

br/pkg/storage/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_library(
44
name = "storage",
55
srcs = [
66
"azblob.go",
7+
"batch.go",
78
"compress.go",
89
"flags.go",
910
"gcs.go",
@@ -27,6 +28,7 @@ go_library(
2728
deps = [
2829
"//br/pkg/errors",
2930
"//br/pkg/logutil",
31+
"//br/pkg/utils/iter",
3032
"//pkg/lightning/log",
3133
"//pkg/sessionctx/variable",
3234
"//pkg/util",
@@ -76,6 +78,7 @@ go_library(
7678
"@org_golang_x_net//http2",
7779
"@org_golang_x_oauth2//google",
7880
"@org_uber_go_atomic//:atomic",
81+
"@org_uber_go_multierr//:multierr",
7982
"@org_uber_go_zap//:zap",
8083
],
8184
)
@@ -85,6 +88,7 @@ go_test(
8588
timeout = "short",
8689
srcs = [
8790
"azblob_test.go",
91+
"batch_test.go",
8892
"compress_test.go",
8993
"gcs_test.go",
9094
"local_test.go",

br/pkg/storage/batch.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"os"
9+
"path"
10+
"sync"
11+
12+
"github.com/pingcap/errors"
13+
berrors "github.com/pingcap/tidb/br/pkg/errors"
14+
"go.uber.org/multierr"
15+
)
16+
17+
// Effect is an side effect that happens in the batch storage.
18+
type Effect any
19+
20+
// EffPut is the side effect of a call to `WriteFile`.
21+
type EffPut struct {
22+
File string `json:"file"`
23+
Content []byte `json:"content"`
24+
}
25+
26+
// EffDeleteFiles is the side effect of a call to `DeleteFiles`.
27+
type EffDeleteFiles struct {
28+
Files []string `json:"files"`
29+
}
30+
31+
// EffDeleteFile is the side effect of a call to `DeleteFile`.
32+
type EffDeleteFile string
33+
34+
// EffRename is the side effect of a call to `Rename`.
35+
type EffRename struct {
36+
From string `json:"from"`
37+
To string `json:"to"`
38+
}
39+
40+
// JSONEffects converts a slices of effects into json.
41+
// The json will be a tagged union: `{"type": $go_type_name, "effect": $effect}`
42+
func JSONEffects(es []Effect, output io.Writer) error {
43+
type Typed struct {
44+
Type string `json:"type"`
45+
Eff Effect `json:"effect"`
46+
}
47+
48+
out := make([]Typed, 0, len(es))
49+
for _, eff := range es {
50+
out = append(out, Typed{
51+
Type: fmt.Sprintf("%T", eff),
52+
Eff: eff,
53+
})
54+
}
55+
56+
return json.NewEncoder(output).Encode(out)
57+
}
58+
59+
func SaveJSONEffectsToTmp(es []Effect) (string, error) {
60+
// Save the json to a subdir so user can redirect the output path by symlinking...
61+
tmp, err := os.CreateTemp(path.Join(os.TempDir(), "tidb_br"), "br-effects-*.json")
62+
if err != nil {
63+
return "", errors.Trace(err)
64+
}
65+
if err := JSONEffects(es, tmp); err != nil {
66+
return "", err
67+
}
68+
if err := tmp.Sync(); err != nil {
69+
return "", errors.Trace(err)
70+
}
71+
if err := tmp.Close(); err != nil {
72+
return "", errors.Trace(err)
73+
}
74+
return tmp.Name(), nil
75+
}
76+
77+
// Batched is a wrapper of an external storage that suspends all write operations ("effects").
78+
// If `Close()` without calling `Commit()`, nothing will happen in the underlying external storage.
79+
// In that case, we have done a "dry run".
80+
//
81+
// You may use `ReadOnlyEffects()` to get the history of the effects.
82+
// But don't modify the returned slice!
83+
//
84+
// You may use `Commit()` to execute all suspended effects.
85+
type Batched struct {
86+
ExternalStorage
87+
effectsMu sync.Mutex
88+
// It will be one of:
89+
// EffPut, EffDeleteFiles, EffDeleteFile, EffRename
90+
effects []Effect
91+
}
92+
93+
// Batch wraps an external storage instance to a batched version.
94+
func Batch(s ExternalStorage) *Batched {
95+
return &Batched{ExternalStorage: s}
96+
}
97+
98+
// Fetch all effects from the batched storage.
99+
//
100+
// **The returned slice should not be modified.**
101+
func (d *Batched) ReadOnlyEffects() []Effect {
102+
d.effectsMu.Lock()
103+
defer d.effectsMu.Unlock()
104+
return d.effects
105+
}
106+
107+
// CleanEffects cleans all suspended effects.
108+
func (d *Batched) CleanEffects() {
109+
d.effectsMu.Lock()
110+
defer d.effectsMu.Unlock()
111+
d.effects = nil
112+
}
113+
114+
func (d *Batched) DeleteFiles(ctx context.Context, names []string) error {
115+
d.effectsMu.Lock()
116+
defer d.effectsMu.Unlock()
117+
d.effects = append(d.effects, EffDeleteFiles{Files: names})
118+
return nil
119+
}
120+
121+
func (d *Batched) DeleteFile(ctx context.Context, name string) error {
122+
d.effectsMu.Lock()
123+
defer d.effectsMu.Unlock()
124+
d.effects = append(d.effects, EffDeleteFile(name))
125+
return nil
126+
}
127+
128+
func (d *Batched) WriteFile(ctx context.Context, name string, data []byte) error {
129+
d.effectsMu.Lock()
130+
defer d.effectsMu.Unlock()
131+
d.effects = append(d.effects, EffPut{File: name, Content: data})
132+
return nil
133+
}
134+
135+
func (d *Batched) Rename(ctx context.Context, oldName, newName string) error {
136+
d.effectsMu.Lock()
137+
defer d.effectsMu.Unlock()
138+
d.effects = append(d.effects, EffRename{From: oldName, To: newName})
139+
return nil
140+
}
141+
142+
func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error) {
143+
return nil, errors.Annotatef(berrors.ErrStorageUnknown, "ExternalStorage.Create isn't allowed in batch mode for now.")
144+
}
145+
146+
// Commit performs all effects recorded so long in the REAL external storage.
147+
// This will cleanup all of the suspended effects.
148+
func (d *Batched) Commit(ctx context.Context) error {
149+
d.effectsMu.Lock()
150+
defer d.effectsMu.Unlock()
151+
152+
var err error
153+
for _, eff := range d.effects {
154+
switch e := eff.(type) {
155+
case EffPut:
156+
err = multierr.Combine(d.ExternalStorage.WriteFile(ctx, e.File, e.Content), err)
157+
case EffDeleteFiles:
158+
err = multierr.Combine(d.ExternalStorage.DeleteFiles(ctx, e.Files), err)
159+
case EffDeleteFile:
160+
err = multierr.Combine(d.ExternalStorage.DeleteFile(ctx, string(e)), err)
161+
case EffRename:
162+
err = multierr.Combine(d.ExternalStorage.Rename(ctx, e.From, e.To), err)
163+
default:
164+
return errors.Annotatef(berrors.ErrStorageUnknown, "Unknown effect type %T", eff)
165+
}
166+
}
167+
168+
d.effects = nil
169+
170+
return nil
171+
}

br/pkg/storage/batch_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package storage_test
2+
3+
import (
4+
"context"
5+
"io"
6+
"os"
7+
"testing"
8+
9+
. "github.com/pingcap/tidb/br/pkg/storage"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestBatched(t *testing.T) {
14+
ctx := context.Background()
15+
bat := Batch(nil) // Passing nil as we don't need actual storage operations
16+
17+
// Test operations
18+
operations := []struct {
19+
name string
20+
op func() error
21+
expected []Effect
22+
}{
23+
{
24+
name: "DeleteFiles",
25+
op: func() error {
26+
return bat.DeleteFiles(ctx, []string{"file1.txt", "file2.txt"})
27+
},
28+
expected: []Effect{EffDeleteFiles{Files: []string{"file1.txt", "file2.txt"}}},
29+
},
30+
{
31+
name: "DeleteFile",
32+
op: func() error {
33+
return bat.DeleteFile(ctx, "file3.txt")
34+
},
35+
expected: []Effect{EffDeleteFile("file3.txt")},
36+
},
37+
{
38+
name: "WriteFile",
39+
op: func() error {
40+
return bat.WriteFile(ctx, "file4.txt", []byte("content"))
41+
},
42+
expected: []Effect{EffPut{File: "file4.txt", Content: []byte("content")}},
43+
},
44+
{
45+
name: "Rename",
46+
op: func() error {
47+
return bat.Rename(ctx, "oldName.txt", "newName.txt")
48+
},
49+
expected: []Effect{EffRename{From: "oldName.txt", To: "newName.txt"}},
50+
},
51+
{
52+
name: "SequenceOfOperations",
53+
op: func() error {
54+
if err := bat.DeleteFile(ctx, "file5.txt"); err != nil {
55+
return err
56+
}
57+
if err := bat.WriteFile(ctx, "file6.txt", []byte("new content")); err != nil {
58+
return err
59+
}
60+
return bat.Rename(ctx, "file6.txt", "fileRenamed.txt")
61+
},
62+
expected: []Effect{
63+
EffDeleteFile("file5.txt"),
64+
EffPut{File: "file6.txt", Content: []byte("new content")},
65+
EffRename{From: "file6.txt", To: "fileRenamed.txt"},
66+
}},
67+
}
68+
69+
for _, op := range operations {
70+
t.Run(op.name, func(t *testing.T) {
71+
require.NoError(t, op.op())
72+
73+
effects := bat.ReadOnlyEffects()
74+
require.Equal(t, len(op.expected), len(effects))
75+
for i, effect := range effects {
76+
require.Equal(t, op.expected[i], effect)
77+
}
78+
79+
// Reset effects for the next test
80+
bat.CleanEffects()
81+
})
82+
}
83+
}
84+
85+
func TestJSONEffects(t *testing.T) {
86+
effects := []Effect{
87+
EffPut{File: "example.txt", Content: []byte("Hello, world")},
88+
EffDeleteFiles{Files: []string{"old_file.txt", "temp.txt"}},
89+
EffDeleteFile("obsolete.txt"),
90+
EffRename{From: "old_name.txt", To: "new_name.txt"},
91+
}
92+
93+
tmp, err := SaveJSONEffectsToTmp(effects)
94+
require.NoError(t, err)
95+
f, err := os.Open(tmp)
96+
require.NoError(t, err)
97+
buf, err := io.ReadAll(f)
98+
require.NoError(t, err)
99+
100+
expectedJSON := `[
101+
{"type":"storage.EffPut","effect":{"file":"example.txt","content":"SGVsbG8sIHdvcmxk"}},
102+
{"type":"storage.EffDeleteFiles","effect":{"files":["old_file.txt","temp.txt"]}},
103+
{"type":"storage.EffDeleteFile","effect":"obsolete.txt"},
104+
{"type":"storage.EffRename","effect":{"from":"old_name.txt","to":"new_name.txt"}}
105+
]`
106+
107+
require.JSONEq(t, expectedJSON, string(buf), "Output JSON should match expected JSON")
108+
}

br/pkg/storage/helper.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"context"
77
"sync/atomic"
88

9+
"github.com/pingcap/errors"
10+
"github.com/pingcap/tidb/br/pkg/utils/iter"
911
"github.com/pingcap/tidb/pkg/sessionctx/variable"
1012
)
1113

@@ -37,3 +39,50 @@ var activeUploadWorkerCnt atomic.Int64
3739
func GetActiveUploadWorkerCount() int64 {
3840
return activeUploadWorkerCnt.Load()
3941
}
42+
43+
// UnmarshalDir iterates over a prefix, then "unmarshal" the content of each file it met with the unmarshal function.
44+
// Returning an iterator that yields the unmarshaled content.
45+
// The "unmarshal" function should put the result of unmarshalling to the `target` argument.
46+
func UnmarshalDir[T any](ctx context.Context, walkOpt *WalkOption, s ExternalStorage, unmarshal func(target *T, name string, content []byte) error) iter.TryNextor[*T] {
47+
ch := make(chan *T)
48+
errCh := make(chan error, 1)
49+
reader := func() {
50+
defer close(ch)
51+
err := s.WalkDir(ctx, walkOpt, func(path string, size int64) error {
52+
metaBytes, err := s.ReadFile(ctx, path)
53+
if err != nil {
54+
return errors.Annotatef(err, "failed during reading file %s", path)
55+
}
56+
var meta T
57+
if err := unmarshal(&meta, path, metaBytes); err != nil {
58+
return errors.Annotatef(err, "failed to parse subcompaction meta of file %s", path)
59+
}
60+
select {
61+
case ch <- &meta:
62+
case <-ctx.Done():
63+
return ctx.Err()
64+
}
65+
return nil
66+
})
67+
if err != nil {
68+
select {
69+
case errCh <- err:
70+
case <-ctx.Done():
71+
}
72+
}
73+
}
74+
go reader()
75+
return iter.Func(func(ctx context.Context) iter.IterResult[*T] {
76+
select {
77+
case <-ctx.Done():
78+
return iter.Throw[*T](ctx.Err())
79+
case err := <-errCh:
80+
return iter.Throw[*T](err)
81+
case meta, ok := <-ch:
82+
if !ok {
83+
return iter.Done[*T]()
84+
}
85+
return iter.Emit(meta)
86+
}
87+
})
88+
}

0 commit comments

Comments
 (0)