Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ go_test(
flaky = True,
shard_count = 50,
deps = [
"//br/pkg/storage",
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/util",
Expand Down
46 changes: 25 additions & 21 deletions pkg/executor/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ import (

// The keys for the mocked data that stored in context. They are only used for test.
type tiproxyAddrKeyType struct{}
type trafficPathKeyType struct{}
type trafficStoreKeyType struct{}

var tiproxyAddrKey tiproxyAddrKeyType
var trafficPathKey trafficPathKeyType
var trafficStoreKey trafficStoreKeyType

type trafficJob struct {
Instance string `json:"-"` // not passed from TiProxy
Expand Down Expand Up @@ -344,11 +344,11 @@ func formReader4Replay(ctx context.Context, args map[string]string, tiproxyNum i
if !ok || len(input) == 0 {
return nil, errors.New("the input path for replay must be specified")
}
u, err := storage.ParseRawURL(input)
backend, err := storage.ParseBackend(input, nil)
if err != nil {
return nil, errors.Wrapf(err, "parse input path failed")
}
if storage.IsLocal(u) {
if backend.GetLocal() != nil {
readers := make([]io.Reader, tiproxyNum)
form := getForm(args)
for i := 0; i < tiproxyNum; i++ {
Expand All @@ -357,34 +357,38 @@ func formReader4Replay(ctx context.Context, args map[string]string, tiproxyNum i
return readers, nil
}

names := make([]string, 0, tiproxyNum)
if mockNames := ctx.Value(trafficPathKey); mockNames != nil {
names = mockNames.([]string)
var store storage.ExternalStorage
if mockStore := ctx.Value(trafficStoreKey); mockStore != nil {
store = mockStore.(storage.ExternalStorage)
} else {
backend, err := storage.ParseBackendFromURL(u, nil)
if err != nil {
return nil, errors.Wrapf(err, "parse backend from the input path failed")
}
store, err := storage.NewWithDefaultOpt(ctx, backend)
store, err = storage.NewWithDefaultOpt(ctx, backend)
if err != nil {
return nil, errors.Wrapf(err, "create storage for input failed")
}
defer store.Close()
err = store.WalkDir(ctx, &storage.WalkOption{
ObjPrefix: filePrefix,
}, func(name string, _ int64) error {
names = append(names, name)
return nil
})
if err != nil {
return nil, errors.Wrapf(err, "walk input path failed")
}
names := make(map[string]struct{}, tiproxyNum)
err = store.WalkDir(ctx, &storage.WalkOption{
ObjPrefix: filePrefix,
}, func(name string, _ int64) error {
if idx := strings.Index(name, "/"); idx >= 0 {
names[name[:idx]] = struct{}{}
}
return nil
})
if err != nil {
return nil, errors.Wrapf(err, "walk input path failed")
}
if len(names) == 0 {
return nil, errors.New("no replay files found in the input path")
}
readers := make([]io.Reader, 0, len(names))
for _, name := range names {
// ParseBackendFromURL clears URL.RawQuery, so no need to reuse the *url.URL.
u, err := storage.ParseRawURL(input)
if err != nil {
return nil, errors.Wrapf(err, "parse input path failed")
}
for name := range names {
m := maps.Clone(args)
m[inputKey] = u.JoinPath(name).String()
form := getForm(m)
Expand Down
63 changes: 45 additions & 18 deletions pkg/executor/traffic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"
"testing"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/infoschema"
Expand Down Expand Up @@ -190,16 +191,18 @@ func TestCapturePath(t *testing.T) {
ctx := context.TODO()
tempCtx := fillCtxWithTiProxyAddr(ctx, ports)
suite := newTrafficTestSuite(t, 10)
exec := suite.build(ctx, "traffic capture to 's3://bucket/tmp' duration='1s'")
prefix, suffix := "s3://bucket/tmp", "access-key=minioadmin&secret-access-key=minioadmin&endpoint=http://minio:8000&force-path-style=true"
exec := suite.build(ctx, fmt.Sprintf("traffic capture to '%s?%s' duration='1s'", prefix, suffix))
require.NoError(t, exec.Next(tempCtx, nil))

paths := make([]string, 0, tiproxyNum)
expectedPaths := make([]string, 0, tiproxyNum)
for i := 0; i < tiproxyNum; i++ {
httpHandler := handlers[i]
output := httpHandler.getForm().Get("output")
require.True(t, strings.HasPrefix(output, "s3://bucket/tmp/"), output)
paths = append(paths, output[len("s3://bucket/tmp/"):])
require.True(t, strings.HasPrefix(output, prefix), output)
require.True(t, strings.HasSuffix(output, suffix), output)
paths = append(paths, output[len(prefix)+1:len(output)-len(suffix)-1])
expectedPaths = append(expectedPaths, fmt.Sprintf("tiproxy-%d", i))
}
sort.Strings(paths)
Expand Down Expand Up @@ -236,40 +239,47 @@ func TestReplayPath(t *testing.T) {
formPaths: []string{},
},
{
paths: []string{"tiproxy-0"},
paths: []string{"tiproxy-0/meta", "tiproxy-0/traffic-1.log", "tiproxy-0/traffic-2.log"},
formPaths: []string{"tiproxy-0"},
warn: "tiproxy instances number (2) is greater than input paths number (1)",
},
{
paths: []string{"tiproxy-0", "tiproxy-1"},
paths: []string{"tiproxy-0/meta", "tiproxy-1/meta", "tiproxy-2"},
formPaths: []string{"tiproxy-0", "tiproxy-1"},
},
{
paths: []string{"tiproxy-0", "tiproxy-1", "tiproxy-2"},
paths: []string{"tiproxy-0/meta", "tiproxy-0/traffic-1.log", "tiproxy-1/meta", "tiproxy-1/traffic-1.log"},
formPaths: []string{"tiproxy-0", "tiproxy-1"},
},
{
paths: []string{"tiproxy-0/meta", "tiproxy-1/meta", "tiproxy-2/meta"},
formPaths: []string{},
err: "tiproxy instances number (2) is less than input paths number (3)",
},
}
ctx := context.TODO()
store := &mockExternalStorage{}
ctx = fillCtxWithTiProxyAddr(ctx, ports)
ctx = context.WithValue(ctx, trafficStoreKey, store)
prefix, suffix := "s3://bucket/tmp", "access-key=minioadmin&secret-access-key=minioadmin&endpoint=http://minio:8000&force-path-style=true"
for i, test := range tests {
tempCtx := context.WithValue(ctx, trafficPathKey, test.paths)
store.paths = test.paths
suite := newTrafficTestSuite(t, 10)
exec := suite.build(ctx, "traffic replay from 's3://bucket/tmp' user='root'")
exec := suite.build(ctx, fmt.Sprintf("traffic replay from '%s?%s' user='root'", prefix, suffix))
for j := 0; j < tiproxyNum; j++ {
handlers[j].reset()
}
err := exec.Next(tempCtx, nil)
err := exec.Next(ctx, nil)
if test.err != "" {
require.ErrorContains(t, err, test.err)
require.ErrorContains(t, err, test.err, "case %d", i)
} else {
require.NoError(t, err)
require.NoError(t, err, "case %d", i)
warnings := suite.stmtCtx().GetWarnings()
if test.warn != "" {
require.Len(t, warnings, 1)
require.ErrorContains(t, warnings[0].Err, test.warn)
require.Len(t, warnings, 1, "case %d", i)
require.ErrorContains(t, warnings[0].Err, test.warn, "case %d", i)
} else {
require.Len(t, warnings, 0)
require.Len(t, warnings, 0, "case %d", i)
}
}

Expand All @@ -278,14 +288,15 @@ func TestReplayPath(t *testing.T) {
httpHandler := handlers[j]
if httpHandler.getMethod() != "" {
form := httpHandler.getForm()
require.NotEmpty(t, form)
require.NotEmpty(t, form, "case %d", i)
input := form.Get("input")
require.True(t, strings.HasPrefix(input, "s3://bucket/tmp/"), input)
formPaths = append(formPaths, input[len("s3://bucket/tmp/"):])
require.True(t, strings.HasPrefix(input, prefix), input)
require.True(t, strings.HasSuffix(input, suffix), input)
formPaths = append(formPaths, input[len(prefix)+1:len(input)-len(suffix)-1])
}
}
sort.Strings(formPaths)
require.Equal(t, test.formPaths, formPaths, "case %d", i)
require.Equal(t, test.formPaths, formPaths, "case %d", i, "case %d", i)
}
}

Expand Down Expand Up @@ -579,3 +590,19 @@ type mockPrivManager struct {
func (m *mockPrivManager) RequestDynamicVerification(activeRoles []*auth.RoleIdentity, privName string, grantable bool) bool {
return m.Called(activeRoles, privName, grantable).Bool(0)
}

var _ storage.ExternalStorage = (*mockExternalStorage)(nil)

type mockExternalStorage struct {
storage.ExternalStorage
paths []string
}

func (s *mockExternalStorage) WalkDir(ctx context.Context, _ *storage.WalkOption, fn func(string, int64) error) error {
for _, path := range s.paths {
if err := fn(path, 0); err != nil {
return err
}
}
return nil
}