Skip to content

Commit e954f27

Browse files
authored
br: ebs tags refactoring (pingcap#44381) (pingcap#44410)
close pingcap#43934
1 parent ca9ffcb commit e954f27

File tree

1 file changed

+11
-99
lines changed

1 file changed

+11
-99
lines changed

br/pkg/aws/ebs.go

Lines changed: 11 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,8 @@ import (
2525
)
2626

2727
const (
28-
AnnPodNameKey string = "tidb.pingcap.com/pod-name"
29-
AnnTemporaryVolumeID string = "temporary/volume-id"
30-
EC2K8SClusterNameKey string = "aws:eks:cluster-name"
31-
3228
pollingPendingSnapshotInterval = 30 * time.Second
3329
errCodeTooManyPendingSnapshots = "PendingSnapshotLimitExceeded"
34-
35-
SourcePvcNameKey string = "source/pvcName"
36-
SourceVolumeIdKey string = "source/VolumeId"
37-
SourceTikvNameKey string = "source/TikvName"
38-
SourceNamespaceKey string = "source/Namespace"
39-
SourceContextKey string = "source/context"
4030
)
4131

4232
type EC2Session struct {
@@ -47,14 +37,6 @@ type EC2Session struct {
4737

4838
type VolumeAZs map[string]string
4939

50-
type SnapshotTags struct {
51-
sourcePVCName string
52-
sourceTiKVName string
53-
sourceNameSpace string
54-
}
55-
56-
type VolumeSnapshotTags map[string]SnapshotTags
57-
5840
func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
5941
// aws-sdk has builtin exponential backoff retry mechanism, see:
6042
// https://github.com/aws/aws-sdk-go/blob/db4388e8b9b19d34dcde76c492b17607cd5651e2/aws/client/default_retryer.go#L12-L16
@@ -71,66 +53,20 @@ func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
7153
return &EC2Session{ec2: ec2Session, concurrency: concurrency}, nil
7254
}
7355

74-
func GenerateVolumeSnapshotTags(backupInfo *config.EBSBasedBRMeta, pvVolumeMap map[string]string) (VolumeSnapshotTags, error) {
75-
vst := make(VolumeSnapshotTags)
76-
for j := range backupInfo.KubernetesMeta.PVCs {
77-
pvc := backupInfo.KubernetesMeta.PVCs[j]
78-
volID := pvVolumeMap[pvc.Spec.VolumeName]
79-
if volID == "" {
80-
return vst, errors.Errorf("No matching pv is found with name of [%s]", pvc.Spec.VolumeName)
81-
}
82-
vst[volID] = SnapshotTags{
83-
pvc.GetName(),
84-
pvc.GetLabels()[AnnPodNameKey],
85-
pvc.GetNamespace(),
86-
}
87-
}
88-
return vst, nil
89-
}
90-
9156
// CreateSnapshots is the mainly steps to control the data volume snapshots.
9257
func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[string]string, VolumeAZs, error) {
9358
snapIDMap := make(map[string]string)
9459
var volumeIDs []*string
9560

9661
var mutex sync.Mutex
9762
eg, _ := errgroup.WithContext(context.Background())
98-
99-
pvVolumeMap := make(map[string]string)
100-
for j := range backupInfo.KubernetesMeta.PVs {
101-
pv := backupInfo.KubernetesMeta.PVs[j]
102-
pvVolumeMap[pv.GetName()] = pv.GetAnnotations()[AnnTemporaryVolumeID]
103-
}
104-
105-
vst, err := GenerateVolumeSnapshotTags(backupInfo, pvVolumeMap)
106-
if err != nil {
107-
return snapIDMap, nil, errors.Trace(err)
108-
}
109-
taggingAndFillResult := func(createOutput *ec2.CreateSnapshotsOutput, vst VolumeSnapshotTags, k8sClusterName *string) error {
63+
fillResult := func(createOutput *ec2.CreateSnapshotsOutput) {
11064
mutex.Lock()
11165
defer mutex.Unlock()
11266
for j := range createOutput.Snapshots {
11367
snapshot := createOutput.Snapshots[j]
11468
snapIDMap[aws.StringValue(snapshot.VolumeId)] = aws.StringValue(snapshot.SnapshotId)
115-
116-
createTagInput := &ec2.CreateTagsInput{
117-
Resources: []*string{
118-
snapshot.SnapshotId,
119-
},
120-
Tags: []*ec2.Tag{
121-
ec2Tag(SourcePvcNameKey, vst[aws.StringValue(snapshot.VolumeId)].sourcePVCName),
122-
ec2Tag(SourceVolumeIdKey, aws.StringValue(snapshot.VolumeId)),
123-
ec2Tag(SourceTikvNameKey, vst[aws.StringValue(snapshot.VolumeId)].sourceTiKVName),
124-
ec2Tag(SourceNamespaceKey, vst[aws.StringValue(snapshot.VolumeId)].sourceNameSpace),
125-
ec2Tag(SourceContextKey, aws.StringValue(k8sClusterName)),
126-
},
127-
}
128-
_, err := e.ec2.CreateTags(createTagInput)
129-
if err != nil {
130-
return errors.Trace(err)
131-
}
13269
}
133-
return nil
13470
}
13571

13672
workerPool := utils.NewWorkerPool(e.concurrency, "create snapshots")
@@ -164,17 +100,6 @@ func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[str
164100
return snapIDMap, nil, errors.Trace(err)
165101
}
166102

167-
// retrieve the k8s cluster name from EC2 instance tags
168-
var k8sClusterName *string
169-
170-
for j := range resp1.Reservations[0].Instances[0].Tags {
171-
tag := resp1.Reservations[0].Instances[0].Tags[j]
172-
if aws.StringValue(tag.Key) == EC2K8SClusterNameKey {
173-
k8sClusterName = tag.Value
174-
break
175-
}
176-
}
177-
178103
for j := range resp1.Reservations[0].Instances[0].BlockDeviceMappings {
179104
device := resp1.Reservations[0].Instances[0].BlockDeviceMappings[j]
180105
// skip root volume
@@ -205,16 +130,14 @@ func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[str
205130
instanceSpecification.SetInstanceId(aws.StringValue(ec2InstanceId)).SetExcludeBootVolume(true).SetExcludeDataVolumeIds(excludedVolumeIDs)
206131

207132
createSnapshotInput.SetInstanceSpecification(&instanceSpecification)
208-
133+
// Copy tags from source volume
134+
createSnapshotInput.SetCopyTagsFromSource("volume")
209135
resp, err := e.createSnapshotsWithRetry(context.TODO(), &createSnapshotInput)
136+
210137
if err != nil {
211138
return errors.Trace(err)
212139
}
213-
err = taggingAndFillResult(resp, vst, k8sClusterName)
214-
if err != nil {
215-
return errors.Trace(err)
216-
}
217-
140+
fillResult(resp)
218141
return nil
219142
})
220143
}
@@ -381,16 +304,6 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
381304
newVolumeIDMap[oldVol.ID] = *newVol.VolumeId
382305
}
383306

384-
fetchTagValue := func(tags []*ec2.Tag, key string) string {
385-
for i := range tags {
386-
tag := tags[i]
387-
if aws.StringValue(tag.Key) == key {
388-
return aws.StringValue(tag.Value)
389-
}
390-
}
391-
return ""
392-
}
393-
394307
workerPool := utils.NewWorkerPool(e.concurrency, "create volume")
395308
for i := range meta.TiKVComponent.Stores {
396309
store := meta.TiKVComponent.Stores[i]
@@ -413,6 +326,7 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
413326
tags := []*ec2.Tag{
414327
ec2Tag("TiDBCluster-BR", "new"),
415328
ec2Tag("ebs.csi.aws.com/cluster", "true"),
329+
ec2Tag("snapshot/createdFromSnapshotId", oldVol.SnapshotID),
416330
}
417331
snapshotIds := make([]*string, 0)
418332

@@ -425,13 +339,11 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
425339
return errors.Errorf("specified snapshot [%s] is not found", oldVol.SnapshotID)
426340
}
427341

428-
snapshotTags := resp.Snapshots[0].Tags
429-
tags = append(tags, ec2Tag("snapshot/createdFromSnapshotId", oldVol.SnapshotID),
430-
ec2Tag("snapshot/"+SourcePvcNameKey, fetchTagValue(snapshotTags, SourcePvcNameKey)),
431-
ec2Tag("snapshot/"+SourceVolumeIdKey, fetchTagValue(snapshotTags, SourceVolumeIdKey)),
432-
ec2Tag("snapshot/"+SourceTikvNameKey, fetchTagValue(snapshotTags, SourceTikvNameKey)),
433-
ec2Tag("snapshot/"+SourceNamespaceKey, fetchTagValue(snapshotTags, SourceNamespaceKey)),
434-
ec2Tag("snapshot/"+SourceContextKey, fetchTagValue(snapshotTags, SourceContextKey)))
342+
// Copy tags from source snapshots
343+
for j := range resp.Snapshots[0].Tags {
344+
tags = append(tags,
345+
ec2Tag("snapshot/"+aws.StringValue(resp.Snapshots[0].Tags[j].Key), aws.StringValue(resp.Snapshots[0].Tags[j].Value)))
346+
}
435347

436348
req.SetTagSpecifications([]*ec2.TagSpecification{
437349
{

0 commit comments

Comments
 (0)