Skip to content

Commit b31b3e5

Browse files
authored
ebs br: make sure fsr credit is full filled (#48627) (#48744)
close #48629
1 parent d45020d commit b31b3e5

File tree

4 files changed

+121
-9
lines changed

4 files changed

+121
-9
lines changed

br/pkg/aws/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ go_library(
1212
"@com_github_aws_aws_sdk_go//aws",
1313
"@com_github_aws_aws_sdk_go//aws/awserr",
1414
"@com_github_aws_aws_sdk_go//aws/session",
15+
"@com_github_aws_aws_sdk_go//service/cloudwatch",
1516
"@com_github_aws_aws_sdk_go//service/ec2",
1617
"@com_github_aws_aws_sdk_go//service/ec2/ec2iface",
1718
"@com_github_pingcap_errors//:errors",

br/pkg/aws/ebs.go

Lines changed: 118 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/aws/aws-sdk-go/aws"
1313
"github.com/aws/aws-sdk-go/aws/awserr"
1414
"github.com/aws/aws-sdk-go/aws/session"
15+
"github.com/aws/aws-sdk-go/service/cloudwatch"
1516
"github.com/aws/aws-sdk-go/service/ec2"
1617
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
1718
"github.com/pingcap/errors"
@@ -31,7 +32,8 @@ const (
3132
)
3233

3334
type EC2Session struct {
34-
ec2 ec2iface.EC2API
35+
ec2 ec2iface.EC2API
36+
cloudwatchClient *cloudwatch.CloudWatch
3537
// aws operation concurrency
3638
concurrency uint
3739
}
@@ -51,7 +53,8 @@ func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
5153
return nil, errors.Trace(err)
5254
}
5355
ec2Session := ec2.New(sess)
54-
return &EC2Session{ec2: ec2Session, concurrency: concurrency}, nil
56+
cloudwatchClient := cloudwatch.New(sess)
57+
return &EC2Session{ec2: ec2Session, cloudwatchClient: cloudwatchClient, concurrency: concurrency}, nil
5558
}
5659

5760
// CreateSnapshots is the mainly steps to control the data volume snapshots.
@@ -324,8 +327,63 @@ func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string)
324327
return snapshotsIDsMap, eg.Wait()
325328
}
326329

327-
// waitDataFSREnabled waits FSR for data volume snapshots are all enabled
330+
// waitDataFSREnabled waits FSR for data volume snapshots are all enabled and also have enough credit balance
328331
func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) error {
332+
// Record current time
333+
start := time.Now()
334+
335+
// get the maximum size of volumes, in GiB
336+
var maxVolumeSize int64 = 0
337+
resp, err := e.ec2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{SnapshotIds: snapShotIDs})
338+
if err != nil {
339+
return errors.Trace(err)
340+
}
341+
if len(resp.Snapshots) <= 0 {
342+
return errors.Errorf("specified snapshot [%s] is not found", *snapShotIDs[0])
343+
}
344+
345+
for _, s := range resp.Snapshots {
346+
if *s.VolumeSize > maxVolumeSize {
347+
maxVolumeSize = *s.VolumeSize
348+
}
349+
}
350+
351+
// Calculate the time in minutes to fill 1.0 credit according to
352+
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-fast-snapshot-restore.html#volume-creation-credits
353+
// 5 minutes more is just for safe
354+
fillElapsedTime := 60.0/(min(10, 1024.0/(float64)(maxVolumeSize))) + 5
355+
356+
// We have to sleep for at least fillElapsedTime minutes in order to make credits are filled to 1.0
357+
// Let's heartbeat every 5 minutes
358+
for time.Since(start) <= time.Duration(fillElapsedTime)*time.Minute {
359+
log.Info("FSR enablement is ongoing, going to sleep for 5 minutes...")
360+
time.Sleep(5 * time.Minute)
361+
}
362+
363+
// Wait that all snapshot has enough fsr credit balance, it's very likely true since we have wait for long enough
364+
log.Info("Start check and wait all snapshots have enough fsr credit balance")
365+
366+
startIdx := 0
367+
retryCount := 0
368+
for startIdx < len(snapShotIDs) {
369+
creditBalance, _ := e.getFSRCreditBalance(snapShotIDs[startIdx], targetAZ)
370+
if creditBalance != nil && *creditBalance >= 1.0 {
371+
startIdx++
372+
retryCount = 0
373+
} else {
374+
if creditBalance == nil {
375+
// For invalid calling, retry 3 times
376+
if retryCount >= 3 {
377+
return errors.Errorf("cloudwatch metrics for %s operation failed after retrying", *snapShotIDs[startIdx])
378+
}
379+
retryCount++
380+
}
381+
// Retry for both invalid calling and not enough fsr credit
382+
// Cloudwatch by default flushes every 5 seconds. So, 20 seconds wait should be enough
383+
time.Sleep(20 * time.Second)
384+
}
385+
}
386+
329387
// Create a map to store the strings as keys
330388
pendingSnapshots := make(map[string]struct{})
331389

@@ -378,6 +436,51 @@ func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string)
378436
}
379437
}
380438

439+
// getFSRCreditBalance is used to get maximum fsr credit balance of snapshot for last 5 minutes
440+
func (e *EC2Session) getFSRCreditBalance(snapshotID *string, targetAZ string) (*float64, error) {
441+
// Set the time range to query for metrics
442+
startTime := time.Now().Add(-5 * time.Minute)
443+
endTime := time.Now()
444+
445+
// Prepare the input for the GetMetricStatisticsWithContext API call
446+
input := &cloudwatch.GetMetricStatisticsInput{
447+
StartTime: aws.Time(startTime),
448+
EndTime: aws.Time(endTime),
449+
Namespace: aws.String("AWS/EBS"),
450+
MetricName: aws.String("FastSnapshotRestoreCreditsBalance"),
451+
Dimensions: []*cloudwatch.Dimension{
452+
{
453+
Name: aws.String("SnapshotId"),
454+
Value: snapshotID,
455+
},
456+
{
457+
Name: aws.String("AvailabilityZone"),
458+
Value: aws.String(targetAZ),
459+
},
460+
},
461+
Period: aws.Int64(300),
462+
Statistics: []*string{aws.String("Maximum")},
463+
}
464+
465+
log.Info("metrics input", zap.Any("input", input))
466+
467+
// Call cloudwatchClient API to retrieve the FastSnapshotRestoreCreditsBalance metric data
468+
resp, err := e.cloudwatchClient.GetMetricStatisticsWithContext(context.Background(), input)
469+
if err != nil {
470+
log.Error("GetMetricStatisticsWithContext failed", zap.Error(err))
471+
return nil, errors.Trace(err)
472+
}
473+
474+
// parse the response
475+
if len(resp.Datapoints) == 0 {
476+
log.Warn("No result for metric FastSnapshotRestoreCreditsBalance returned", zap.Stringp("snapshot", snapshotID))
477+
return nil, nil
478+
}
479+
result := resp.Datapoints[0]
480+
log.Info("credit balance", zap.Stringp("snapshot", snapshotID), zap.Float64p("credit", result.Maximum))
481+
return result.Maximum, nil
482+
}
483+
381484
// DisableDataFSR disables FSR for data volume snapshots
382485
func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error {
383486
if len(snapshotsIDsMap) == 0 {
@@ -529,7 +632,7 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
529632
return newVolumeIDMap, eg.Wait()
530633
}
531634

532-
func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress glue.Progress) (int64, error) {
635+
func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress glue.Progress, fsrEnabledRequired bool) (int64, error) {
533636
pendingVolumes := make([]*string, 0, len(volumeIDMap))
534637
for oldVolID := range volumeIDMap {
535638
newVolumeID := volumeIDMap[oldVolID]
@@ -549,7 +652,11 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress
549652
return 0, errors.Trace(err)
550653
}
551654

552-
createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(resp)
655+
createdVolumeSize, unfinishedVolumes, err := e.HandleDescribeVolumesResponse(resp, fsrEnabledRequired)
656+
if err != nil {
657+
return 0, errors.Trace(err)
658+
}
659+
553660
progress.IncBy(int64(len(pendingVolumes) - len(unfinishedVolumes)))
554661
totalVolumeSize += createdVolumeSize
555662
pendingVolumes = unfinishedVolumes
@@ -592,12 +699,16 @@ func ec2Tag(key, val string) *ec2.Tag {
592699
return &ec2.Tag{Key: &key, Value: &val}
593700
}
594701

595-
func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput) (int64, []*string) {
702+
func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput, fsrEnabledRequired bool) (int64, []*string, error) {
596703
totalVolumeSize := int64(0)
597704

598705
var unfinishedVolumes []*string
599706
for _, volume := range resp.Volumes {
600707
if *volume.State == ec2.VolumeStateAvailable {
708+
if fsrEnabledRequired && volume.FastRestored != nil && !*volume.FastRestored {
709+
log.Error("snapshot fsr is not enabled for the volume", zap.String("volume", *volume.SnapshotId))
710+
return 0, nil, errors.Errorf("Snapshot [%s] of volume [%s] is not fsr enabled", *volume.SnapshotId, *volume.VolumeId)
711+
}
601712
log.Info("volume is available", zap.String("id", *volume.VolumeId))
602713
totalVolumeSize += *volume.Size
603714
} else {
@@ -606,5 +717,5 @@ func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutp
606717
}
607718
}
608719

609-
return totalVolumeSize, unfinishedVolumes
720+
return totalVolumeSize, unfinishedVolumes, nil
610721
}

br/pkg/aws/ebs_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func TestHandleDescribeVolumesResponse(t *testing.T) {
7272
}
7373

7474
e := &EC2Session{}
75-
createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(curentVolumesStates)
75+
createdVolumeSize, unfinishedVolumes, _ := e.HandleDescribeVolumesResponse(curentVolumesStates, false)
7676
require.Equal(t, int64(4), createdVolumeSize)
7777
require.Equal(t, 1, len(unfinishedVolumes))
7878
}

br/pkg/task/restore_ebs_meta.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
263263
if err != nil {
264264
return nil, 0, errors.Trace(err)
265265
}
266-
totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress)
266+
totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress, h.cfg.UseFSR)
267267
if err != nil {
268268
return nil, 0, errors.Trace(err)
269269
}

0 commit comments

Comments
 (0)