Skip to content

Commit f27561b

Browse files
lance6716ti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#60402
Signed-off-by: ti-chi-bot <[email protected]>
1 parent de4f73a commit f27561b

File tree

2 files changed

+95
-17
lines changed

2 files changed

+95
-17
lines changed

br/pkg/storage/gcs.go

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,9 @@ type GCSStorage struct {
109109
clientCnt int64
110110
clientOps []option.ClientOption
111111

112-
handles []*storage.BucketHandle
113-
clients []*storage.Client
112+
handles []*storage.BucketHandle
113+
clients []*storage.Client
114+
clientCancel context.CancelFunc
114115
}
115116

116117
// GetBucketHandle gets the handle to the GCS API on the bucket.
@@ -334,6 +335,7 @@ func (s *GCSStorage) Rename(ctx context.Context, oldFileName, newFileName string
334335

335336
// Close implements ExternalStorage interface.
336337
func (s *GCSStorage) Close() {
338+
s.clientCancel()
337339
for _, client := range s.clients {
338340
if err := client.Close(); err != nil {
339341
log.Warn("failed to close gcs client", zap.Error(err))
@@ -415,39 +417,73 @@ skipHandleCred:
415417
return ret, nil
416418
}
417419

418-
// Reset resets the GCS storage.
420+
// Reset resets the GCS storage. Reset should not be used concurrently with
421+
// Close.
419422
func (s *GCSStorage) Reset(ctx context.Context) error {
420423
logutil.Logger(ctx).Info("resetting gcs storage")
421424

422-
for _, client := range s.clients {
423-
_ = client.Close()
424-
}
425-
426-
s.clients = make([]*storage.Client, gcsClientCnt)
427-
eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
428-
for i := range s.clients {
429-
eg.Go(func() error {
430-
client, err := storage.NewClient(egCtx, s.clientOps...)
425+
s.cancelAndCloseGCSClients()
426+
427+
clientCtx, clientCancel := context.WithCancel(context.Background())
428+
s.clients = make([]*storage.Client, 0, gcsClientCnt)
429+
wg := util.WaitGroupWrapper{}
430+
cliCh := make(chan *storage.Client)
431+
wg.RunWithLog(func() {
432+
for range gcsClientCnt {
433+
select {
434+
case cli := <-cliCh:
435+
s.clients = append(s.clients, cli)
436+
case <-ctx.Done():
437+
clientCancel()
438+
return
439+
case <-clientCtx.Done():
440+
return
441+
}
442+
}
443+
})
444+
firstErr := atomic.NewError(nil)
445+
for range gcsClientCnt {
446+
wg.RunWithLog(func() {
447+
client, err := storage.NewClient(clientCtx, s.clientOps...)
431448
if err != nil {
432-
return errors.Trace(err)
449+
firstErr.CompareAndSwap(nil, err)
450+
clientCancel()
451+
return
433452
}
434453
client.SetRetry(storage.WithErrorFunc(shouldRetry), storage.WithPolicy(storage.RetryAlways))
435-
s.clients[i] = client
436-
return nil
454+
select {
455+
case cliCh <- client:
456+
case <-clientCtx.Done():
457+
}
437458
})
438459
}
439-
err := eg.Wait()
440-
if err != nil {
460+
wg.Wait()
461+
if err := firstErr.Load(); err != nil {
462+
s.cancelAndCloseGCSClients()
441463
return errors.Trace(err)
442464
}
443465

466+
s.clientCancel = clientCancel
444467
s.handles = make([]*storage.BucketHandle, gcsClientCnt)
445468
for i := range s.handles {
446469
s.handles[i] = s.clients[i].Bucket(s.gcs.Bucket)
447470
}
448471
return nil
449472
}
450473

474+
func (s *GCSStorage) cancelAndCloseGCSClients() {
475+
if s.clientCancel != nil {
476+
s.clientCancel()
477+
s.clientCancel = nil
478+
}
479+
480+
for _, client := range s.clients {
481+
if client != nil {
482+
_ = client.Close()
483+
}
484+
}
485+
}
486+
451487
func shouldRetry(err error) bool {
452488
if storage.ShouldRetry(err) {
453489
return true

br/pkg/storage/gcs_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ import (
99
"flag"
1010
"fmt"
1111
"io"
12+
<<<<<<< HEAD
13+
=======
14+
"net/http"
15+
"net/http/httptest"
16+
"net/url"
17+
>>>>>>> 7424255fef7 (external store: use separate ctx for GCS clients (#60402))
1218
"os"
1319
"testing"
1420
"time"
@@ -570,3 +576,39 @@ func TestSpeedReadManyFiles(t *testing.T) {
570576
require.NoError(t, eg.Wait())
571577
t.Logf("read %d large files cost %v", len(testFiles), time.Since(now))
572578
}
579+
<<<<<<< HEAD
580+
=======
581+
582+
func TestGCSShouldRetry(t *testing.T) {
583+
require.True(t, shouldRetry(&url.Error{Err: goerrors.New("http2: server sent GOAWAY and closed the connectiont"), Op: "Get", URL: "https://storage.googleapis.com/storage/v1/"}))
584+
require.True(t, shouldRetry(&url.Error{Err: goerrors.New("http2: client connection lost"), Op: "Get", URL: "https://storage.googleapis.com/storage/v1/"}))
585+
require.True(t, shouldRetry(&url.Error{Err: io.EOF, Op: "Get", URL: "https://storage.googleapis.com/storage/v1/"}))
586+
}
587+
588+
func TestCtxUsage(t *testing.T) {
589+
httpSvr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
590+
defer httpSvr.Close()
591+
592+
ctx := context.Background()
593+
gcs := &backuppb.GCS{
594+
Endpoint: httpSvr.URL,
595+
Bucket: "test",
596+
Prefix: "prefix",
597+
StorageClass: "NEARLINE",
598+
PredefinedAcl: "private",
599+
CredentialsBlob: fmt.Sprintf(`
600+
{
601+
"type":"external_account",
602+
"audience":"//iam.googleapis.com/projects/1234567890123/locations/global/workloadIdentityPools/my-pool/providers/my-provider",
603+
"subject_token_type":"urn:ietf:params:oauth:token-type:access_token",
604+
"credential_source":{"url":"%s"}
605+
}`, httpSvr.URL),
606+
}
607+
stg, err := NewGCSStorage(ctx, gcs, &ExternalStorageOptions{})
608+
require.NoError(t, err)
609+
610+
_, err = stg.FileExists(ctx, "key")
611+
// before the fix, it's context canceled error
612+
require.ErrorContains(t, err, "invalid_request")
613+
}
614+
>>>>>>> 7424255fef7 (external store: use separate ctx for GCS clients (#60402))

0 commit comments

Comments
 (0)