Skip to content

Commit 79bcffb

Browse files
authored
*: support concurrent write for S3 writer (pingcap#45723) (pingcap#49185)
ref pingcap#45719, close pingcap#48607
1 parent cbba9a9 commit 79bcffb

File tree

15 files changed

+75
-22
lines changed

15 files changed

+75
-22
lines changed

br/pkg/mock/storage/storage.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

br/pkg/storage/azblob.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ func (s *AzureBlobStorage) URI() string {
415415
}
416416

417417
// Create implements the StorageWriter interface.
418-
func (s *AzureBlobStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) {
418+
func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
419419
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
420420
uploader := &azblobUploader{
421421
blobClient: client,

br/pkg/storage/compress.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ func WithCompression(inner ExternalStorage, compressionType CompressType) Extern
2424
return &withCompression{ExternalStorage: inner, compressType: compressionType}
2525
}
2626

27-
func (w *withCompression) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
27+
func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
2828
var (
2929
writer ExternalFileWriter
3030
err error
3131
)
3232
if s3Storage, ok := w.ExternalStorage.(*S3Storage); ok {
3333
writer, err = s3Storage.CreateUploader(ctx, name)
3434
} else {
35-
writer, err = w.ExternalStorage.Create(ctx, name)
35+
writer, err = w.ExternalStorage.Create(ctx, name, nil)
3636
}
3737
if err != nil {
3838
return nil, errors.Trace(err)

br/pkg/storage/gcs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func (s *GCSStorage) URI() string {
246246
}
247247

248248
// Create implements ExternalStorage interface.
249-
func (s *GCSStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
249+
func (s *GCSStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
250250
object := s.objectName(name)
251251
wc := s.bucket.Object(object).NewWriter(ctx)
252252
wc.StorageClass = s.gcs.StorageClass

br/pkg/storage/hdfs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (s *HDFSStorage) URI() string {
123123
}
124124

125125
// Create opens a file writer by path. path is relative path to storage base path
126-
func (*HDFSStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) {
126+
func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) {
127127
return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
128128
}
129129

br/pkg/storage/local.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (l *LocalStorage) Open(_ context.Context, path string) (ExternalFileReader,
131131
}
132132

133133
// Create implements ExternalStorage interface.
134-
func (l *LocalStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) {
134+
func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
135135
file, err := os.Create(filepath.Join(l.base, name))
136136
if err != nil {
137137
return nil, errors.Trace(err)

br/pkg/storage/local_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestDeleteFile(t *testing.T) {
2525
require.NoError(t, err)
2626
require.Equal(t, false, ret)
2727

28-
_, err = store.Create(context.Background(), name)
28+
_, err = store.Create(context.Background(), name, nil)
2929
require.NoError(t, err)
3030

3131
ret, err = store.FileExists(context.Background(), name)

br/pkg/storage/memstore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func (*MemStorage) URI() string {
219219
// Create creates a file and returning a writer to write data into.
220220
// When the writer is closed, the data is stored in the file.
221221
// It implements the `ExternalStorage` interface
222-
func (s *MemStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
222+
func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
223223
select {
224224
case <-ctx.Done():
225225
return nil, ctx.Err()

br/pkg/storage/memstore_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestMemStoreBasic(t *testing.T) {
5050
require.NotNil(t, err)
5151

5252
// create a writer to write
53-
w, err := store.Create(ctx, "/hello.txt")
53+
w, err := store.Create(ctx, "/hello.txt", nil)
5454
require.Nil(t, err)
5555
_, err = w.Write(ctx, []byte("hello world 3"))
5656
require.Nil(t, err)

br/pkg/storage/noop.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (*noopStorage) URI() string {
4343
}
4444

4545
// Create implements ExternalStorage interface.
46-
func (*noopStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) {
46+
func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) {
4747
return &noopWriter{}, nil
4848
}
4949

0 commit comments

Comments
 (0)