Skip to content

Commit

Permalink
Add volume ID append dimension for disk metrics (#1156)
Browse files Browse the repository at this point in the history
  • Loading branch information
jefchien authored May 2, 2024
1 parent 8382db8 commit e46fae5
Show file tree
Hide file tree
Showing 21 changed files with 847 additions and 311 deletions.
5 changes: 5 additions & 0 deletions plugins/processors/ec2tagger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ var SupportedAppendDimensions = map[string]string{
"InstanceType": "${aws:InstanceType}",
}

const (
AttributeVolumeId = "VolumeId"
ValueAppendDimensionVolumeId = "${aws:VolumeId}"
)

type Config struct {
RefreshIntervalSeconds time.Duration `mapstructure:"refresh_interval_seconds"`
EC2MetadataTags []string `mapstructure:"ec2_metadata_tags"`
Expand Down
1 change: 0 additions & 1 deletion plugins/processors/ec2tagger/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ const (
mdKeyInstanceId = "InstanceId"
mdKeyImageId = "ImageId"
mdKeyInstanceType = "InstanceType"
ebsVolumeId = "EBSVolumeId"
)

var (
Expand Down
78 changes: 0 additions & 78 deletions plugins/processors/ec2tagger/ebsvolume.go

This file was deleted.

46 changes: 13 additions & 33 deletions plugins/processors/ec2tagger/ec2tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.uber.org/zap"

configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/ec2tagger/internal/volume"
translatorCtx "github.com/aws/amazon-cloudwatch-agent/translator/context"
)

Expand Down Expand Up @@ -52,14 +53,13 @@ type Tagger struct {
ec2MetadataRespond ec2MetadataRespondType
tagFilters []*ec2.Filter
ec2API ec2iface.EC2API
ebsVolume *EbsVolume
volumeSerialCache volume.Cache

sync.RWMutex //to protect ec2TagCache
}

// newTagger returns a new EC2 Tagger processor.
func newTagger(config *Config, logger *zap.Logger) *Tagger {

_, cancel := context.WithCancel(context.Background())
mdCredentialConfig := &configaws.CredentialConfig{}

Expand All @@ -77,7 +77,6 @@ func newTagger(config *Config, logger *zap.Logger) *Tagger {
})
},
}

return p
}

Expand Down Expand Up @@ -146,11 +145,11 @@ func (t *Tagger) updateOtelAttributes(attributes []pcommon.Map) {
if t.ec2MetadataLookup.instanceType {
attr.PutStr(mdKeyInstanceType, t.ec2MetadataRespond.instanceType)
}
if t.ebsVolume != nil {
if t.volumeSerialCache != nil {
if devName, found := attr.Get(t.DiskDeviceTagKey); found {
ebsVolId := t.ebsVolume.getEbsVolumeId(devName.Str())
if ebsVolId != "" {
attr.PutStr(ebsVolumeId, ebsVolId)
serial := t.volumeSerialCache.Serial(devName.Str())
if serial != "" {
attr.PutStr(AttributeVolumeId, serial)
}
}
}
Expand Down Expand Up @@ -270,7 +269,7 @@ func (t *Tagger) ebsVolumesRetrieved() bool {
if key == "*" {
continue
}
if volId := t.ebsVolume.getEbsVolumeId(key); volId == "" {
if volId := t.volumeSerialCache.Serial(key); volId == "" {
allVolumesRetrieved = false
break
}
Expand All @@ -280,7 +279,7 @@ func (t *Tagger) ebsVolumesRetrieved() bool {

// Start acts as input validation and serves the purpose of updating ec2 tags and ebs volumes if necessary.
// It will be called when OTel is enabling each processor
func (t *Tagger) Start(ctx context.Context, host component.Host) error {
func (t *Tagger) Start(ctx context.Context, _ component.Host) error {
t.shutdownC = make(chan bool)
t.ec2TagCache = map[string]string{}

Expand Down Expand Up @@ -373,34 +372,15 @@ func (t *Tagger) refreshLoopToUpdateTagsAndVolumes() {

// updateVolumes calls EC2 describe volume
func (t *Tagger) updateVolumes() error {
if t.ebsVolume == nil {
t.ebsVolume = NewEbsVolume()
if t.volumeSerialCache == nil {
t.volumeSerialCache = volume.NewCache(volume.NewProvider(t.ec2API, t.ec2MetadataRespond.instanceId))
}

input := &ec2.DescribeVolumesInput{
Filters: []*ec2.Filter{
{
Name: aws.String("attachment.instance-id"),
Values: aws.StringSlice([]string{t.ec2MetadataRespond.instanceId}),
},
},
if err := t.volumeSerialCache.Refresh(); err != nil {
return err
}

for {
result, err := t.ec2API.DescribeVolumes(input)
if err != nil {
return err
}
for _, volume := range result.Volumes {
for _, attachment := range volume.Attachments {
t.ebsVolume.addEbsVolumeMapping(volume.AvailabilityZone, attachment)
}
}
if result.NextToken == nil {
break
}
input.SetNextToken(*result.NextToken)
}
t.logger.Debug("Volume Serial Cache", zap.Strings("devices", t.volumeSerialCache.Devices()))
return nil
}

Expand Down
Loading

0 comments on commit e46fae5

Please sign in to comment.