From 3ea8820fd6d83f109867db8ebfc0f3f32238b8ee Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 7 Apr 2025 11:08:05 +0800 Subject: [PATCH 1/2] external store: use separate ctx for GCS clients Signed-off-by: lance6716 --- br/pkg/storage/gcs.go | 55 +++++++++++++++++++++++++++++--------- br/pkg/storage/gcs_test.go | 29 ++++++++++++++++++++ 2 files changed, 71 insertions(+), 13 deletions(-) diff --git a/br/pkg/storage/gcs.go b/br/pkg/storage/gcs.go index fd6557d27713e..69be172c07dfe 100644 --- a/br/pkg/storage/gcs.go +++ b/br/pkg/storage/gcs.go @@ -110,8 +110,9 @@ type GCSStorage struct { clientCnt int64 clientOps []option.ClientOption - handles []*storage.BucketHandle - clients []*storage.Client + handles []*storage.BucketHandle + clients []*storage.Client + clientCancel context.CancelFunc } // CopyFrom implements Copier. @@ -366,6 +367,7 @@ func (s *GCSStorage) Rename(ctx context.Context, oldFileName, newFileName string // Close implements ExternalStorage interface. func (s *GCSStorage) Close() { + s.clientCancel() for _, client := range s.clients { if err := client.Close(); err != nil { log.Warn("failed to close gcs client", zap.Error(err)) @@ -447,32 +449,46 @@ skipHandleCred: return ret, nil } -// Reset resets the GCS storage. +// Reset resets the GCS storage. Reset should not be used concurrently with +// Close. func (s *GCSStorage) Reset(ctx context.Context) error { logutil.Logger(ctx).Info("resetting gcs storage") - for _, client := range s.clients { - _ = client.Close() - } + s.cancelAndCloseGCSClients() + + clientCtx, clientCancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-done: + case <-ctx.Done(): + clientCancel() + } + }() s.clients = make([]*storage.Client, gcsClientCnt) - eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx) + firstErr := atomic.NewError(nil) + wg := util.WaitGroupWrapper{} for i := range s.clients { - eg.Go(func() error { - client, err := storage.NewClient(egCtx, s.clientOps...) + wg.RunWithLog(func() { + client, err := storage.NewClient(clientCtx, s.clientOps...) if err != nil { - return errors.Trace(err) + firstErr.CompareAndSwap(nil, err) + clientCancel() + return } client.SetRetry(storage.WithErrorFunc(shouldRetry), storage.WithPolicy(storage.RetryAlways)) s.clients[i] = client - return nil }) } - err := eg.Wait() - if err != nil { + wg.Wait() + if err := firstErr.Load(); err != nil { + s.cancelAndCloseGCSClients() return errors.Trace(err) } + s.clientCancel = clientCancel s.handles = make([]*storage.BucketHandle, gcsClientCnt) for i := range s.handles { s.handles[i] = s.clients[i].Bucket(s.gcs.Bucket) @@ -480,6 +496,19 @@ func (s *GCSStorage) Reset(ctx context.Context) error { return nil } +func (s *GCSStorage) cancelAndCloseGCSClients() { + if s.clientCancel != nil { + s.clientCancel() + s.clientCancel = nil + } + + for _, client := range s.clients { + if client != nil { + _ = client.Close() + } + } +} + func shouldRetry(err error) bool { if storage.ShouldRetry(err) { return true diff --git a/br/pkg/storage/gcs_test.go b/br/pkg/storage/gcs_test.go index 067a360ef7486..0b3c1f8034f38 100644 --- a/br/pkg/storage/gcs_test.go +++ b/br/pkg/storage/gcs_test.go @@ -10,6 +10,8 @@ import ( "flag" "fmt" "io" + "net/http" + "net/http/httptest" "net/url" "os" "testing" @@ -578,3 +580,30 @@ func TestGCSShouldRetry(t *testing.T) { require.True(t, shouldRetry(&url.Error{Err: goerrors.New("http2: client connection lost"), Op: "Get", URL: "https://storage.googleapis.com/storage/v1/"})) require.True(t, shouldRetry(&url.Error{Err: io.EOF, Op: "Get", URL: "https://storage.googleapis.com/storage/v1/"})) } + +func TestCtxUsage(t *testing.T) { + httpSvr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + defer httpSvr.Close() + + ctx := context.Background() + gcs := &backuppb.GCS{ + Endpoint: httpSvr.URL, + Bucket: "test", + Prefix: "prefix", + StorageClass: "NEARLINE", + PredefinedAcl: "private", + CredentialsBlob: fmt.Sprintf(` +{ + "type":"external_account", + "audience":"//iam.googleapis.com/projects/1234567890123/locations/global/workloadIdentityPools/my-pool/providers/my-provider", + "subject_token_type":"urn:ietf:params:oauth:token-type:access_token", + "credential_source":{"url":"%s"} +}`, httpSvr.URL), + } + stg, err := NewGCSStorage(ctx, gcs, &ExternalStorageOptions{}) + require.NoError(t, err) + + _, err = stg.FileExists(ctx, "key") + // before the fix, it's context canceled error + require.ErrorContains(t, err, "invalid_request") +} From 2e9a85b3da0a829b770b0d5457848014b61c5c45 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 10 Apr 2025 21:00:07 +0800 Subject: [PATCH 2/2] address comment Signed-off-by: lance6716 --- br/pkg/storage/gcs.go | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/br/pkg/storage/gcs.go b/br/pkg/storage/gcs.go index 69be172c07dfe..330d11c7b2916 100644 --- a/br/pkg/storage/gcs.go +++ b/br/pkg/storage/gcs.go @@ -457,20 +457,24 @@ func (s *GCSStorage) Reset(ctx context.Context) error { s.cancelAndCloseGCSClients() clientCtx, clientCancel := context.WithCancel(context.Background()) - done := make(chan struct{}) - defer close(done) - go func() { - select { - case <-done: - case <-ctx.Done(): - clientCancel() + s.clients = make([]*storage.Client, 0, gcsClientCnt) + wg := util.WaitGroupWrapper{} + cliCh := make(chan *storage.Client) + wg.RunWithLog(func() { + for range gcsClientCnt { + select { + case cli := <-cliCh: + s.clients = append(s.clients, cli) + case <-ctx.Done(): + clientCancel() + return + case <-clientCtx.Done(): + return + } } - }() - - s.clients = make([]*storage.Client, gcsClientCnt) + }) firstErr := atomic.NewError(nil) - wg := util.WaitGroupWrapper{} - for i := range s.clients { + for range gcsClientCnt { wg.RunWithLog(func() { client, err := storage.NewClient(clientCtx, s.clientOps...) if err != nil { @@ -479,7 +483,10 @@ func (s *GCSStorage) Reset(ctx context.Context) error { return } client.SetRetry(storage.WithErrorFunc(shouldRetry), storage.WithPolicy(storage.RetryAlways)) - s.clients[i] = client + select { + case cliCh <- client: + case <-clientCtx.Done(): + } }) } wg.Wait()