Skip to content

Commit 6878cce

Browse files
djshow832zeminzhou
authored andcommitted
executor: fix that traffic replay from S3 always report an error (pingcap#59813)
close pingcap#59811
1 parent 716f267 commit 6878cce

File tree

3 files changed

+71
-39
lines changed

3 files changed

+71
-39
lines changed

pkg/executor/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ go_test(
389389
flaky = True,
390390
shard_count = 50,
391391
deps = [
392+
"//br/pkg/storage",
392393
"//pkg/config",
393394
"//pkg/ddl",
394395
"//pkg/ddl/util",

pkg/executor/traffic.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ import (
4444

4545
// The keys for the mocked data that stored in context. They are only used for test.
4646
type tiproxyAddrKeyType struct{}
47-
type trafficPathKeyType struct{}
47+
type trafficStoreKeyType struct{}
4848

4949
var tiproxyAddrKey tiproxyAddrKeyType
50-
var trafficPathKey trafficPathKeyType
50+
var trafficStoreKey trafficStoreKeyType
5151

5252
type trafficJob struct {
5353
Instance string `json:"-"` // not passed from TiProxy
@@ -344,11 +344,11 @@ func formReader4Replay(ctx context.Context, args map[string]string, tiproxyNum i
344344
if !ok || len(input) == 0 {
345345
return nil, errors.New("the input path for replay must be specified")
346346
}
347-
u, err := storage.ParseRawURL(input)
347+
backend, err := storage.ParseBackend(input, nil)
348348
if err != nil {
349349
return nil, errors.Wrapf(err, "parse input path failed")
350350
}
351-
if storage.IsLocal(u) {
351+
if backend.GetLocal() != nil {
352352
readers := make([]io.Reader, tiproxyNum)
353353
form := getForm(args)
354354
for i := 0; i < tiproxyNum; i++ {
@@ -357,34 +357,38 @@ func formReader4Replay(ctx context.Context, args map[string]string, tiproxyNum i
357357
return readers, nil
358358
}
359359

360-
names := make([]string, 0, tiproxyNum)
361-
if mockNames := ctx.Value(trafficPathKey); mockNames != nil {
362-
names = mockNames.([]string)
360+
var store storage.ExternalStorage
361+
if mockStore := ctx.Value(trafficStoreKey); mockStore != nil {
362+
store = mockStore.(storage.ExternalStorage)
363363
} else {
364-
backend, err := storage.ParseBackendFromURL(u, nil)
365-
if err != nil {
366-
return nil, errors.Wrapf(err, "parse backend from the input path failed")
367-
}
368-
store, err := storage.NewWithDefaultOpt(ctx, backend)
364+
store, err = storage.NewWithDefaultOpt(ctx, backend)
369365
if err != nil {
370366
return nil, errors.Wrapf(err, "create storage for input failed")
371367
}
372368
defer store.Close()
373-
err = store.WalkDir(ctx, &storage.WalkOption{
374-
ObjPrefix: filePrefix,
375-
}, func(name string, _ int64) error {
376-
names = append(names, name)
377-
return nil
378-
})
379-
if err != nil {
380-
return nil, errors.Wrapf(err, "walk input path failed")
369+
}
370+
names := make(map[string]struct{}, tiproxyNum)
371+
err = store.WalkDir(ctx, &storage.WalkOption{
372+
ObjPrefix: filePrefix,
373+
}, func(name string, _ int64) error {
374+
if idx := strings.Index(name, "/"); idx >= 0 {
375+
names[name[:idx]] = struct{}{}
381376
}
377+
return nil
378+
})
379+
if err != nil {
380+
return nil, errors.Wrapf(err, "walk input path failed")
382381
}
383382
if len(names) == 0 {
384383
return nil, errors.New("no replay files found in the input path")
385384
}
386385
readers := make([]io.Reader, 0, len(names))
387-
for _, name := range names {
386+
// ParseBackendFromURL clears URL.RawQuery, so no need to reuse the *url.URL.
387+
u, err := storage.ParseRawURL(input)
388+
if err != nil {
389+
return nil, errors.Wrapf(err, "parse input path failed")
390+
}
391+
for name := range names {
388392
m := maps.Clone(args)
389393
m[inputKey] = u.JoinPath(name).String()
390394
form := getForm(m)

pkg/executor/traffic_test.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"sync"
2727
"testing"
2828

29+
"github.com/pingcap/tidb/br/pkg/storage"
2930
"github.com/pingcap/tidb/pkg/domain/infosync"
3031
"github.com/pingcap/tidb/pkg/executor/internal/exec"
3132
"github.com/pingcap/tidb/pkg/infoschema"
@@ -190,16 +191,18 @@ func TestCapturePath(t *testing.T) {
190191
ctx := context.TODO()
191192
tempCtx := fillCtxWithTiProxyAddr(ctx, ports)
192193
suite := newTrafficTestSuite(t, 10)
193-
exec := suite.build(ctx, "traffic capture to 's3://bucket/tmp' duration='1s'")
194+
prefix, suffix := "s3://bucket/tmp", "access-key=minioadmin&secret-access-key=minioadmin&endpoint=http://minio:8000&force-path-style=true"
195+
exec := suite.build(ctx, fmt.Sprintf("traffic capture to '%s?%s' duration='1s'", prefix, suffix))
194196
require.NoError(t, exec.Next(tempCtx, nil))
195197

196198
paths := make([]string, 0, tiproxyNum)
197199
expectedPaths := make([]string, 0, tiproxyNum)
198200
for i := 0; i < tiproxyNum; i++ {
199201
httpHandler := handlers[i]
200202
output := httpHandler.getForm().Get("output")
201-
require.True(t, strings.HasPrefix(output, "s3://bucket/tmp/"), output)
202-
paths = append(paths, output[len("s3://bucket/tmp/"):])
203+
require.True(t, strings.HasPrefix(output, prefix), output)
204+
require.True(t, strings.HasSuffix(output, suffix), output)
205+
paths = append(paths, output[len(prefix)+1:len(output)-len(suffix)-1])
203206
expectedPaths = append(expectedPaths, fmt.Sprintf("tiproxy-%d", i))
204207
}
205208
sort.Strings(paths)
@@ -236,40 +239,47 @@ func TestReplayPath(t *testing.T) {
236239
formPaths: []string{},
237240
},
238241
{
239-
paths: []string{"tiproxy-0"},
242+
paths: []string{"tiproxy-0/meta", "tiproxy-0/traffic-1.log", "tiproxy-0/traffic-2.log"},
240243
formPaths: []string{"tiproxy-0"},
241244
warn: "tiproxy instances number (2) is greater than input paths number (1)",
242245
},
243246
{
244-
paths: []string{"tiproxy-0", "tiproxy-1"},
247+
paths: []string{"tiproxy-0/meta", "tiproxy-1/meta", "tiproxy-2"},
245248
formPaths: []string{"tiproxy-0", "tiproxy-1"},
246249
},
247250
{
248-
paths: []string{"tiproxy-0", "tiproxy-1", "tiproxy-2"},
251+
paths: []string{"tiproxy-0/meta", "tiproxy-0/traffic-1.log", "tiproxy-1/meta", "tiproxy-1/traffic-1.log"},
252+
formPaths: []string{"tiproxy-0", "tiproxy-1"},
253+
},
254+
{
255+
paths: []string{"tiproxy-0/meta", "tiproxy-1/meta", "tiproxy-2/meta"},
249256
formPaths: []string{},
250257
err: "tiproxy instances number (2) is less than input paths number (3)",
251258
},
252259
}
253260
ctx := context.TODO()
261+
store := &mockExternalStorage{}
254262
ctx = fillCtxWithTiProxyAddr(ctx, ports)
263+
ctx = context.WithValue(ctx, trafficStoreKey, store)
264+
prefix, suffix := "s3://bucket/tmp", "access-key=minioadmin&secret-access-key=minioadmin&endpoint=http://minio:8000&force-path-style=true"
255265
for i, test := range tests {
256-
tempCtx := context.WithValue(ctx, trafficPathKey, test.paths)
266+
store.paths = test.paths
257267
suite := newTrafficTestSuite(t, 10)
258-
exec := suite.build(ctx, "traffic replay from 's3://bucket/tmp' user='root'")
268+
exec := suite.build(ctx, fmt.Sprintf("traffic replay from '%s?%s' user='root'", prefix, suffix))
259269
for j := 0; j < tiproxyNum; j++ {
260270
handlers[j].reset()
261271
}
262-
err := exec.Next(tempCtx, nil)
272+
err := exec.Next(ctx, nil)
263273
if test.err != "" {
264-
require.ErrorContains(t, err, test.err)
274+
require.ErrorContains(t, err, test.err, "case %d", i)
265275
} else {
266-
require.NoError(t, err)
276+
require.NoError(t, err, "case %d", i)
267277
warnings := suite.stmtCtx().GetWarnings()
268278
if test.warn != "" {
269-
require.Len(t, warnings, 1)
270-
require.ErrorContains(t, warnings[0].Err, test.warn)
279+
require.Len(t, warnings, 1, "case %d", i)
280+
require.ErrorContains(t, warnings[0].Err, test.warn, "case %d", i)
271281
} else {
272-
require.Len(t, warnings, 0)
282+
require.Len(t, warnings, 0, "case %d", i)
273283
}
274284
}
275285

@@ -278,14 +288,15 @@ func TestReplayPath(t *testing.T) {
278288
httpHandler := handlers[j]
279289
if httpHandler.getMethod() != "" {
280290
form := httpHandler.getForm()
281-
require.NotEmpty(t, form)
291+
require.NotEmpty(t, form, "case %d", i)
282292
input := form.Get("input")
283-
require.True(t, strings.HasPrefix(input, "s3://bucket/tmp/"), input)
284-
formPaths = append(formPaths, input[len("s3://bucket/tmp/"):])
293+
require.True(t, strings.HasPrefix(input, prefix), input)
294+
require.True(t, strings.HasSuffix(input, suffix), input)
295+
formPaths = append(formPaths, input[len(prefix)+1:len(input)-len(suffix)-1])
285296
}
286297
}
287298
sort.Strings(formPaths)
288-
require.Equal(t, test.formPaths, formPaths, "case %d", i)
299+
require.Equal(t, test.formPaths, formPaths, "case %d", i, "case %d", i)
289300
}
290301
}
291302

@@ -579,3 +590,19 @@ type mockPrivManager struct {
579590
func (m *mockPrivManager) RequestDynamicVerification(activeRoles []*auth.RoleIdentity, privName string, grantable bool) bool {
580591
return m.Called(activeRoles, privName, grantable).Bool(0)
581592
}
593+
594+
var _ storage.ExternalStorage = (*mockExternalStorage)(nil)
595+
596+
type mockExternalStorage struct {
597+
storage.ExternalStorage
598+
paths []string
599+
}
600+
601+
func (s *mockExternalStorage) WalkDir(ctx context.Context, _ *storage.WalkOption, fn func(string, int64) error) error {
602+
for _, path := range s.paths {
603+
if err := fn(path, 0); err != nil {
604+
return err
605+
}
606+
}
607+
return nil
608+
}

0 commit comments

Comments
 (0)