Skip to content

Commit 536a878

Browse files
authored
cloudstorage: fix a bug that may cause storage sink get stuck (#12142) (#12145)
close #9162
1 parent 0fb0b54 commit 536a878

File tree

4 files changed

+135
-4
lines changed

4 files changed

+135
-4
lines changed

cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func newDDLSink(ctx context.Context,
7474
return nil, errors.Trace(err)
7575
}
7676

77-
storage, err := util.GetExternalStorageFromURI(ctx, sinkURI.String())
77+
storage, err := util.GetExternalStorageWithDefaultTimeout(ctx, sinkURI.String())
7878
if err != nil {
7979
return nil, err
8080
}

cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func NewDMLSink(ctx context.Context,
117117
}
118118

119119
// create an external storage.
120-
storage, err := putil.GetExternalStorageFromURI(ctx, sinkURI.String())
120+
storage, err := putil.GetExternalStorageWithDefaultTimeout(ctx, sinkURI.String())
121121
if err != nil {
122122
return nil, err
123123
}

pkg/util/external_storage.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ import (
3535
"golang.org/x/sync/errgroup"
3636
)
3737

38-
const defaultTimeout = 5 * time.Minute
38+
const (
39+
defaultTimeout = 5 * time.Minute
40+
)
3941

4042
// GetExternalStorageFromURI creates a new storage.ExternalStorage from a uri.
4143
func GetExternalStorageFromURI(
@@ -163,7 +165,11 @@ type extStorageWithTimeout struct {
163165
func (s *extStorageWithTimeout) WriteFile(ctx context.Context, name string, data []byte) error {
164166
ctx, cancel := context.WithTimeout(ctx, s.timeout)
165167
defer cancel()
166-
return s.ExternalStorage.WriteFile(ctx, name, data)
168+
err := s.ExternalStorage.WriteFile(ctx, name, data)
169+
if err != nil {
170+
err = errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("WriteFile")
171+
}
172+
return err
167173
}
168174

169175
// ReadFile reads a complete file from storage, similar to os.ReadFile

pkg/util/external_storage_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package util
15+
16+
import (
17+
"context"
18+
"errors"
19+
"net/http"
20+
"testing"
21+
"time"
22+
23+
"github.com/pingcap/tidb/br/pkg/storage"
24+
"github.com/stretchr/testify/require"
25+
)
26+
27+
// mockRoundTripper blocks until the context is done.
28+
type mockRoundTripper struct {
29+
blockUntilContextDone bool
30+
err error
31+
}
32+
33+
func (m *mockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
34+
if m.blockUntilContextDone {
35+
<-req.Context().Done()
36+
return nil, req.Context().Err()
37+
}
38+
// Return immediately for success case
39+
return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, m.err
40+
}
41+
42+
// mockExternalStorage is a mock implementation for testing timeouts via http client.
43+
type mockExternalStorage struct {
44+
storage.ExternalStorage // Embed the interface to satisfy it easily
45+
httpClient *http.Client
46+
}
47+
48+
// WriteFile simulates a write operation by making an HTTP request that respects context cancellation.
49+
func (m *mockExternalStorage) WriteFile(ctx context.Context, name string, data []byte) error {
50+
if m.httpClient == nil {
51+
panic("httpClient not set in mockExternalStorage") // Should be set in tests
52+
}
53+
// Create a dummy request. The URL doesn't matter as the RoundTripper is mocked.
54+
req, err := http.NewRequestWithContext(ctx, http.MethodPut, "http://mock/"+name, http.NoBody)
55+
if err != nil {
56+
return err // Should not happen with valid inputs
57+
}
58+
59+
resp, err := m.httpClient.Do(req)
60+
if err != nil {
61+
return err // This will include context errors like DeadlineExceeded
62+
}
63+
resp.Body.Close() // Important to close the body
64+
if resp.StatusCode != http.StatusOK {
65+
return errors.New("mock http request failed") // Or handle specific statuses
66+
}
67+
return nil
68+
}
69+
70+
func TestExtStorageWithTimeoutWriteFileTimeout(t *testing.T) {
71+
testTimeout := 50 * time.Millisecond
72+
73+
// Create a mock HTTP client that blocks until context is done
74+
mockClient := &http.Client{
75+
Transport: &mockRoundTripper{blockUntilContextDone: true},
76+
}
77+
78+
mockStore := &mockExternalStorage{
79+
httpClient: mockClient,
80+
}
81+
82+
// Wrap the mock store with the timeout logic
83+
timedStore := &extStorageWithTimeout{
84+
ExternalStorage: mockStore,
85+
timeout: testTimeout,
86+
}
87+
88+
startTime := time.Now()
89+
// Use context.Background() as the base context
90+
err := timedStore.WriteFile(context.Background(), "testfile", []byte("data"))
91+
duration := time.Since(startTime)
92+
93+
// 1. Assert that an error occurred
94+
require.Error(t, err, "Expected an error due to timeout")
95+
96+
// 2. Assert that the error is context.DeadlineExceeded
97+
require.True(t, errors.Is(err, context.DeadlineExceeded), "Expected context.DeadlineExceeded error, got: %v", err)
98+
99+
// 3. Assert that the function returned quickly (around the timeout duration)
100+
require.InDelta(t, testTimeout, duration, float64(testTimeout)*0.5, "Duration (%v) should be close to the timeout (%v)", duration, testTimeout)
101+
}
102+
103+
func TestExtStorageWithTimeoutWriteFileSuccess(t *testing.T) {
104+
testTimeout := 100 * time.Millisecond
105+
106+
// Create a mock HTTP client that returns success immediately
107+
mockClient := &http.Client{
108+
Transport: &mockRoundTripper{blockUntilContextDone: false, err: nil},
109+
}
110+
111+
mockStore := &mockExternalStorage{
112+
httpClient: mockClient,
113+
}
114+
115+
timedStore := &extStorageWithTimeout{
116+
ExternalStorage: mockStore,
117+
timeout: testTimeout,
118+
}
119+
120+
// Use context.Background() as the base context
121+
err := timedStore.WriteFile(context.Background(), "testfile", []byte("data"))
122+
123+
// Assert success
124+
require.NoError(t, err, "Expected no error for successful write within timeout")
125+
}

0 commit comments

Comments
 (0)