
Commit 919a8aa

D3Hunter and zeminzhou authored and committed
objstore: fix read position on retry when reading a file range using s3 (pingcap#59694)
close pingcap#50451
1 parent ae2d373 commit 919a8aa
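The external readers returned by Open track an absolute read position and, when a ranged read hits a transient error, re-open the object from that position before retrying. The one-line change below in both the S3 and KS3 readers initializes that position to r.Start, so a retry on a range that does not start at byte 0 resumes at the correct offset. A minimal sketch of the idea, using hypothetical names (rangeReader, reopen) rather than the actual br/pkg/storage types:

package objstore // hypothetical package name, for the sketch only

import (
	"fmt"
	"io"
)

// rangeReader is a simplified stand-in for the S3/KS3 external readers in
// br/pkg/storage; the field and method names here are illustrative only.
type rangeReader struct {
	reader io.ReadCloser // body of the currently open ranged GET
	pos    int64         // absolute offset of the next byte to read
	end    int64         // exclusive end of the requested range
	// reopen re-issues the ranged GET with the given Range header; it stands
	// in for another GetObjectWithContext call.
	reopen func(rangeHeader string) (io.ReadCloser, error)
}

func (r *rangeReader) Read(p []byte) (int, error) {
	n, err := r.reader.Read(p)
	r.pos += int64(n)
	if err == nil || err == io.EOF {
		return n, err
	}
	// Retry path: reconnect starting at the current absolute position. If pos
	// had been left at its zero value instead of the range's start offset, the
	// retried request would begin at byte 0 and return data from outside the
	// requested range.
	body, rerr := r.reopen(fmt.Sprintf("bytes=%d-%d", r.pos, r.end-1))
	if rerr != nil {
		return n, err
	}
	_ = r.reader.Close()
	r.reader = body
	m, rerr := r.reader.Read(p[n:])
	r.pos += int64(m)
	return n + m, rerr
}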

File tree

br/pkg/storage/ks3.go
br/pkg/storage/s3.go
br/pkg/storage/s3_test.go

3 files changed: +49 -0 lines changed

br/pkg/storage/ks3.go

Lines changed: 1 addition & 0 deletions

@@ -450,6 +450,7 @@ func (rs *KS3Storage) Open(ctx context.Context, path string, o *ReaderOption) (E
 		storage:      rs,
 		name:         path,
 		reader:       reader,
+		pos:          r.Start,
 		rangeInfo:    r,
 		prefetchSize: prefetchSize,
 	}, nil

br/pkg/storage/s3.go

Lines changed: 1 addition & 0 deletions

@@ -828,6 +828,7 @@ func (rs *S3Storage) Open(ctx context.Context, path string, o *ReaderOption) (Ex
 		storage:      rs,
 		name:         path,
 		reader:       reader,
+		pos:          r.Start,
 		ctx:          ctx,
 		rangeInfo:    r,
 		prefetchSize: prefetchSize,

br/pkg/storage/s3_test.go

Lines changed: 47 additions & 0 deletions

@@ -14,6 +14,7 @@ import (
 	"os"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"

 	"github.com/aws/aws-sdk-go/aws"
@@ -856,6 +857,52 @@ func (s *s3Suite) expectedCalls(ctx context.Context, t *testing.T, data []byte,
 	}
 }

+type mockFailReader struct {
+	r         io.Reader
+	failCount *atomic.Int32
+}
+
+func (f *mockFailReader) Read(p []byte) (n int, err error) {
+	if f.failCount.Load() > 0 {
+		f.failCount.Add(-1)
+		return 0, errors.New("mock read error")
+	}
+	return f.r.Read(p)
+}
+
+func TestS3RangeReaderRetryRead(t *testing.T) {
+	s := createS3Suite(t)
+	ctx := aws.BackgroundContext()
+	content := []byte("0123456789")
+	var failCount atomic.Int32
+	s.s3.EXPECT().GetObjectWithContext(gomock.Any(), gomock.Any()).
+		DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
+			var start int
+			_, err := fmt.Sscanf(*input.Range, "bytes=%d-", &start)
+			require.NoError(t, err)
+			requestedBytes := content[start:]
+			return &s3.GetObjectOutput{
+				Body:         io.NopCloser(&mockFailReader{r: bytes.NewReader(requestedBytes), failCount: &failCount}),
+				ContentRange: aws.String(fmt.Sprintf("bytes %d-%d/%d", start, len(content)-1, len(content))),
+			}, nil
+		}).Times(2)
+	reader, err := s.storage.Open(ctx, "random", &ReaderOption{StartOffset: aws.Int64(3)})
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, reader.Close())
+	}()
+	slice := make([]byte, 2)
+	n, err := reader.Read(slice)
+	require.NoError(t, err)
+	require.Equal(t, 2, n)
+	require.Equal(t, []byte("34"), slice)
+	failCount.Store(1)
+	n, err = reader.Read(slice)
+	require.NoError(t, err)
+	require.Equal(t, 2, n)
+	require.Equal(t, []byte("56"), slice)
+}
+
 // TestS3ReaderWithRetryEOF check the Read with retry and end with io.EOF.
 func TestS3ReaderWithRetryEOF(t *testing.T) {
 	s := createS3Suite(t)