Skip to content

Commit 06161f5

Browse files
authored
cloudstorage: fix a bug causing context cancelled errors and stuck sink (#12276)
close #12277
1 parent a1b92b8 commit 06161f5

File tree

2 files changed

+89
-2
lines changed

2 files changed

+89
-2
lines changed

pkg/util/external_storage.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,28 @@ func (s *extStorageWithTimeout) DeleteFile(ctx context.Context, name string) err
197197
func (s *extStorageWithTimeout) Open(
198198
ctx context.Context, path string, _ *storage.ReaderOption,
199199
) (storage.ExternalFileReader, error) {
200+
// Unlike other methods, Open method cannot call cancel() in defer.
201+
// This is because the reader's lifetime is bound to the context provided at Open().
202+
// Subsequent Read() calls on reader will observe context cancellation.
203+
// Instead, we wrap the reader in a struct and cancel it's context in Close().
200204
ctx, cancel := context.WithTimeout(ctx, s.timeout)
201-
defer cancel()
202-
return s.ExternalStorage.Open(ctx, path, nil)
205+
r, err := s.ExternalStorage.Open(ctx, path, nil)
206+
if err != nil {
207+
cancel()
208+
return nil, err
209+
}
210+
return &readerWithCancel{ExternalFileReader: r, cancel: cancel}, nil
211+
}
212+
213+
type readerWithCancel struct {
214+
storage.ExternalFileReader
215+
cancel context.CancelFunc
216+
}
217+
218+
// Close the reader and cancel the context.
219+
func (r *readerWithCancel) Close() error {
220+
defer r.cancel()
221+
return r.ExternalFileReader.Close()
203222
}
204223

205224
// WalkDir traverse all the files in a dir.

pkg/util/external_storage_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ func (m *mockExternalStorage) WriteFile(ctx context.Context, name string, data [
6767
return nil
6868
}
6969

70+
// Implement Open so tests can simulate readers that bind to the Open() context.
71+
func (m *mockExternalStorage) Open(ctx context.Context, path string, option *storage.ReaderOption) (storage.ExternalFileReader, error) {
72+
return &ctxBoundReader{ctx: ctx}, nil
73+
}
74+
75+
func (m *mockExternalStorage) URI() string { return "mock://" }
76+
77+
func (m *mockExternalStorage) Close() {}
78+
7079
func TestExtStorageWithTimeoutWriteFileTimeout(t *testing.T) {
7180
testTimeout := 50 * time.Millisecond
7281

@@ -123,3 +132,62 @@ func TestExtStorageWithTimeoutWriteFileSuccess(t *testing.T) {
123132
// Assert success
124133
require.NoError(t, err, "Expected no error for successful write within timeout")
125134
}
135+
136+
// ctxBoundReader is a reader that checks the context passed to Open().
137+
// It simulates backends (e.g., Azure) that bind reader lifetime to the Open() context.
138+
type ctxBoundReader struct {
139+
ctx context.Context
140+
}
141+
142+
func (r *ctxBoundReader) Read(p []byte) (int, error) {
143+
if err := r.ctx.Err(); err != nil {
144+
return 0, err
145+
}
146+
if len(p) == 0 {
147+
return 0, nil
148+
}
149+
p[0] = 'x'
150+
return 1, nil
151+
}
152+
153+
func (r *ctxBoundReader) Seek(offset int64, whence int) (int64, error) {
154+
return 0, nil
155+
}
156+
157+
func (r *ctxBoundReader) Close() error { return nil }
158+
159+
func (r *ctxBoundReader) GetFileSize() (int64, error) { return 1, nil }
160+
161+
func TestExtStorageOpenDoesNotCancelReaderContext(t *testing.T) {
162+
timedStore := &extStorageWithTimeout{
163+
ExternalStorage: &mockExternalStorage{},
164+
timeout: 100 * time.Millisecond,
165+
}
166+
167+
rd, err := timedStore.Open(context.Background(), "file", nil)
168+
require.NoError(t, err)
169+
defer rd.Close()
170+
171+
// If Open() had used a derived context with immediate cancel, this would fail with context canceled.
172+
_, err = rd.Read(make([]byte, 1))
173+
require.NoError(t, err)
174+
}
175+
176+
func TestExtStorageOpenReaderRespectsCallerCancel(t *testing.T) {
177+
timedStore := &extStorageWithTimeout{
178+
ExternalStorage: &mockExternalStorage{},
179+
timeout: 10 * time.Millisecond,
180+
}
181+
182+
ctx, cancel := context.WithCancel(context.Background())
183+
rd, err := timedStore.Open(ctx, "file", nil)
184+
require.NoError(t, err)
185+
defer rd.Close()
186+
187+
// This should cause the reader to fail with context canceled.
188+
cancel()
189+
_, err = rd.Read(make([]byte, 1))
190+
191+
require.Error(t, err)
192+
require.True(t, errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded))
193+
}

0 commit comments

Comments
 (0)