Skip to content

Commit 99d5a6b

Browse files
authored
external store: use separate ctx for GCS clients (#60402) (#60502)
close #60155
1 parent 5424a65 commit 99d5a6b

File tree

2 files changed

+82
-17
lines changed

2 files changed

+82
-17
lines changed

br/pkg/storage/gcs.go

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,9 @@ type GCSStorage struct {
109109
clientCnt int64
110110
clientOps []option.ClientOption
111111

112-
handles []*storage.BucketHandle
113-
clients []*storage.Client
112+
handles []*storage.BucketHandle
113+
clients []*storage.Client
114+
clientCancel context.CancelFunc
114115
}
115116

116117
// GetBucketHandle gets the handle to the GCS API on the bucket.
@@ -334,6 +335,7 @@ func (s *GCSStorage) Rename(ctx context.Context, oldFileName, newFileName string
334335

335336
// Close implements ExternalStorage interface.
336337
func (s *GCSStorage) Close() {
338+
s.clientCancel()
337339
for _, client := range s.clients {
338340
if err := client.Close(); err != nil {
339341
log.Warn("failed to close gcs client", zap.Error(err))
@@ -415,39 +417,73 @@ skipHandleCred:
415417
return ret, nil
416418
}
417419

418-
// Reset resets the GCS storage.
420+
// Reset resets the GCS storage. Reset should not be used concurrently with
421+
// Close.
419422
func (s *GCSStorage) Reset(ctx context.Context) error {
420423
logutil.Logger(ctx).Info("resetting gcs storage")
421424

422-
for _, client := range s.clients {
423-
_ = client.Close()
424-
}
425-
426-
s.clients = make([]*storage.Client, gcsClientCnt)
427-
eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
428-
for i := range s.clients {
429-
eg.Go(func() error {
430-
client, err := storage.NewClient(egCtx, s.clientOps...)
425+
s.cancelAndCloseGCSClients()
426+
427+
clientCtx, clientCancel := context.WithCancel(context.Background())
428+
s.clients = make([]*storage.Client, 0, gcsClientCnt)
429+
wg := util.WaitGroupWrapper{}
430+
cliCh := make(chan *storage.Client)
431+
wg.RunWithLog(func() {
432+
for range gcsClientCnt {
433+
select {
434+
case cli := <-cliCh:
435+
s.clients = append(s.clients, cli)
436+
case <-ctx.Done():
437+
clientCancel()
438+
return
439+
case <-clientCtx.Done():
440+
return
441+
}
442+
}
443+
})
444+
firstErr := atomic.NewError(nil)
445+
for range gcsClientCnt {
446+
wg.RunWithLog(func() {
447+
client, err := storage.NewClient(clientCtx, s.clientOps...)
431448
if err != nil {
432-
return errors.Trace(err)
449+
firstErr.CompareAndSwap(nil, err)
450+
clientCancel()
451+
return
433452
}
434453
client.SetRetry(storage.WithErrorFunc(shouldRetry), storage.WithPolicy(storage.RetryAlways))
435-
s.clients[i] = client
436-
return nil
454+
select {
455+
case cliCh <- client:
456+
case <-clientCtx.Done():
457+
}
437458
})
438459
}
439-
err := eg.Wait()
440-
if err != nil {
460+
wg.Wait()
461+
if err := firstErr.Load(); err != nil {
462+
s.cancelAndCloseGCSClients()
441463
return errors.Trace(err)
442464
}
443465

466+
s.clientCancel = clientCancel
444467
s.handles = make([]*storage.BucketHandle, gcsClientCnt)
445468
for i := range s.handles {
446469
s.handles[i] = s.clients[i].Bucket(s.gcs.Bucket)
447470
}
448471
return nil
449472
}
450473

474+
func (s *GCSStorage) cancelAndCloseGCSClients() {
475+
if s.clientCancel != nil {
476+
s.clientCancel()
477+
s.clientCancel = nil
478+
}
479+
480+
for _, client := range s.clients {
481+
if client != nil {
482+
_ = client.Close()
483+
}
484+
}
485+
}
486+
451487
func shouldRetry(err error) bool {
452488
if storage.ShouldRetry(err) {
453489
return true

br/pkg/storage/gcs_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"flag"
1010
"fmt"
1111
"io"
12+
"net/http"
13+
"net/http/httptest"
1214
"os"
1315
"testing"
1416
"time"
@@ -570,3 +572,30 @@ func TestSpeedReadManyFiles(t *testing.T) {
570572
require.NoError(t, eg.Wait())
571573
t.Logf("read %d large files cost %v", len(testFiles), time.Since(now))
572574
}
575+
576+
func TestCtxUsage(t *testing.T) {
577+
httpSvr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
578+
defer httpSvr.Close()
579+
580+
ctx := context.Background()
581+
gcs := &backuppb.GCS{
582+
Endpoint: httpSvr.URL,
583+
Bucket: "test",
584+
Prefix: "prefix",
585+
StorageClass: "NEARLINE",
586+
PredefinedAcl: "private",
587+
CredentialsBlob: fmt.Sprintf(`
588+
{
589+
"type":"external_account",
590+
"audience":"//iam.googleapis.com/projects/1234567890123/locations/global/workloadIdentityPools/my-pool/providers/my-provider",
591+
"subject_token_type":"urn:ietf:params:oauth:token-type:access_token",
592+
"credential_source":{"url":"%s"}
593+
}`, httpSvr.URL),
594+
}
595+
stg, err := NewGCSStorage(ctx, gcs, &ExternalStorageOptions{})
596+
require.NoError(t, err)
597+
598+
_, err = stg.FileExists(ctx, "key")
599+
// Before the fix, this call failed with a "context canceled" error.
600+
require.ErrorContains(t, err, "invalid_request")
601+
}

0 commit comments

Comments
 (0)