Skip to content

Commit

Permalink
Merge pull request #1329 from aws/entityMerge
Browse files Browse the repository at this point in the history
Implements logic for attaching Entity field to CloudWatch Logs PutLogEvent API
  • Loading branch information
dchappa authored Sep 4, 2024
2 parents 68b82c0 + 3606eca commit f0a39d6
Show file tree
Hide file tree
Showing 123 changed files with 28,687 additions and 2,287 deletions.
4 changes: 4 additions & 0 deletions extension/agenthealth/handler/stats/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Stats struct {
RunningInContainer *int `json:"ric,omitempty"`
RegionType *string `json:"rt,omitempty"`
Mode *string `json:"m,omitempty"`
EntityRejected *int `json:"ent,omitempty"`
}

// Merge the other Stats into the current. If the field is not nil,
Expand Down Expand Up @@ -76,6 +77,9 @@ func (s *Stats) Merge(other Stats) {
if other.Mode != nil {
s.Mode = other.Mode
}
if other.EntityRejected != nil {
s.EntityRejected = other.EntityRejected
}
}

func (s *Stats) Marshal() (string, error) {
Expand Down
24 changes: 24 additions & 0 deletions extension/agenthealth/handler/stats/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package client

import (
"bytes"
"context"
"io"
"net/http"
Expand All @@ -23,6 +24,10 @@ const (
cacheSize = 1000
)

var (
// rejectedEntityInfo is the quoted JSON key that rejectedEntityInfoExists
// searches for in a response body.
rejectedEntityInfo = []byte("\"rejectedEntityInfo\"")
)

type Stats interface {
awsmiddleware.RequestHandler
awsmiddleware.ResponseHandler
Expand Down Expand Up @@ -108,6 +113,9 @@ func (csh *clientStatsHandler) HandleResponse(ctx context.Context, r *http.Respo
}
latency := time.Since(recorder.start)
stats.LatencyMillis = aws.Int64(latency.Milliseconds())
if rejectedEntityInfoExists(r) {
stats.EntityRejected = aws.Int(1)
}
csh.statsByOperation.Store(operation, stats)
}

Expand All @@ -122,3 +130,19 @@ func (csh *clientStatsHandler) Stats(operation string) agent.Stats {
}
return stats
}

// rejectedEntityInfoExists reports whether the response body contains the
// rejectedEntityInfo element. An example body carrying it would be:
//
//	{"rejectedEntityInfo":{"errorType":"InvalidAttributes"}}
//
// A nil response or a nil body yields false. The body is read in full; on a
// successful read it is replaced with an in-memory copy of the same bytes so
// that later consumers of r can still read it. If the read fails, false is
// returned and the (closed) original body is left in place so the failure
// stays visible to whoever reads it next.
func rejectedEntityInfoExists(r *http.Response) bool {
	if r == nil || r.Body == nil {
		return false
	}
	bodyBytes, err := io.ReadAll(r.Body)
	// The stream has been consumed (or is broken); a Close error would not be
	// actionable here, so it is deliberately ignored.
	_ = r.Body.Close()
	if err != nil {
		return false
	}
	// A response body can only be read once. Re-install the bytes we consumed
	// so that inspecting the response here does not hide it from the caller.
	r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
	return bytes.Contains(bodyBytes, rejectedEntityInfo)
}
21 changes: 20 additions & 1 deletion extension/agenthealth/handler/stats/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package client
import (
"bytes"
"context"
"io"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -37,11 +38,16 @@ func TestHandle(t *testing.T) {
assert.Nil(t, got.PayloadBytes)
assert.Nil(t, got.StatusCode)
time.Sleep(time.Millisecond)
handler.HandleResponse(ctx, &http.Response{StatusCode: http.StatusOK})
handler.HandleResponse(ctx, &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewBufferString(`{"rejectedEntityInfo":{"errorType":"InvalidAttributes"}}`)),
})
got = handler.Stats(operation)
assert.NotNil(t, got.LatencyMillis)
assert.NotNil(t, got.PayloadBytes)
assert.NotNil(t, got.StatusCode)
assert.NotNil(t, got.EntityRejected)
assert.Equal(t, 1, *got.EntityRejected)
assert.Equal(t, http.StatusOK, *got.StatusCode)
assert.Equal(t, 20, *got.PayloadBytes)
assert.GreaterOrEqual(t, *got.LatencyMillis, int64(1))
Expand All @@ -65,3 +71,16 @@ func TestHandle(t *testing.T) {
assert.NotNil(t, got.PayloadBytes)
assert.Equal(t, 29, *got.PayloadBytes)
}

// BenchmarkRejectedEntityInfoExists times rejectedEntityInfoExists against a
// small response body that contains the rejectedEntityInfo element.
func BenchmarkRejectedEntityInfoExists(b *testing.B) {
	const payload = `{"rejectedEntityInfo":{"errorType":"InvalidAttributes"}}`
	freshBody := func() io.ReadCloser {
		return io.NopCloser(bytes.NewBufferString(payload))
	}
	resp := &http.Response{Body: freshBody()}

	for i := 0; i < b.N; i++ {
		rejectedEntityInfoExists(resp)
		// Hand the next iteration an unread body.
		resp.Body = freshBody()
	}
}
18 changes: 18 additions & 0 deletions extension/entitystore/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package entitystore

import (
"go.opentelemetry.io/collector/component"
)

// Config holds the settings of the entitystore extension. Each field is
// populated from the configuration key named in its mapstructure tag.
type Config struct {
// Mode is read from the "mode" key.
// NOTE(review): the set of accepted values is not visible here — confirm
// against the code that consumes this config.
Mode string `mapstructure:"mode"`
// Region is read from the "region" key.
Region string `mapstructure:"region"`
// Profile is read from the optional "profile" key.
// NOTE(review): presumably an AWS credentials profile name — confirm.
Profile string `mapstructure:"profile,omitempty"`
// RoleARN is read from the optional "role_arn" key.
RoleARN string `mapstructure:"role_arn,omitempty"`
// Filename is read from the optional "shared_credential_file" key.
// NOTE(review): presumably the path to a shared credentials file — confirm.
Filename string `mapstructure:"shared_credential_file,omitempty"`
}

// Compile-time assertion that *Config satisfies component.Config.
var _ component.Config = (*Config)(nil)
18 changes: 18 additions & 0 deletions extension/entitystore/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package entitystore

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/confmap"
)

// TestUnmarshalDefaultConfig checks that unmarshalling an empty confmap into
// the factory's default config succeeds and leaves that config equal to a
// freshly created default.
func TestUnmarshalDefaultConfig(t *testing.T) {
	f := NewFactory()
	got := f.CreateDefaultConfig()

	err := confmap.New().Unmarshal(got)
	assert.NoError(t, err)

	want := f.CreateDefaultConfig()
	assert.Equal(t, want, got)
}
195 changes: 195 additions & 0 deletions extension/entitystore/ec2Info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package entitystore

import (
"context"
"errors"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"go.uber.org/zap"

configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/amazon-cloudwatch-agent/internal/ec2metadataprovider"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/ec2tagger"
)

// Length limits applied by ignoreInvalidFields; a value whose len exceeds its
// limit is cleared.
const (
// instanceIdSizeMax is the maximum accepted InstanceID length.
// InstanceId character maximum length is 19.
// See https://docs.aws.amazon.com/autoscaling/ec2/APIReference/API_Instance.html.
instanceIdSizeMax = 19

// autoScalingGroupSizeMax is the maximum accepted AutoScalingGroup length.
// AutoScalingGroup character maximum length is 255.
// See https://docs.aws.amazon.com/autoscaling/ec2/APIReference/API_AutoScalingGroup.html.
autoScalingGroupSizeMax = 255
)

// ec2Info holds the EC2 instance ID and Auto Scaling group name discovered for
// the current host, together with the collaborators used to look them up.
type ec2Info struct {
// InstanceID is filled in by setInstanceId from the metadata document and
// cleared by ignoreInvalidFields when it is longer than instanceIdSizeMax.
InstanceID string
// AutoScalingGroup is filled in by retrieveAsgName (instance metadata tags)
// or retrieveAsgNameWithDescribeTags (EC2 DescribeTags) and cleared by
// ignoreInvalidFields when it is longer than autoScalingGroupSizeMax.
AutoScalingGroup string

// Region is passed to ec2Provider when initEc2Info builds the EC2 client
// used for the DescribeTags call that looks up the AutoScalingGroup.
Region string

// metadataProvider supplies the instance metadata document and instance tags.
metadataProvider ec2metadataprovider.MetadataProvider
// ec2API is the EC2 client; it is created in initEc2Info via ec2Provider.
ec2API ec2iface.EC2API
// ec2Provider builds ec2API from Region and ec2Credential.
ec2Provider ec2ProviderType
// ec2Credential is handed to ec2Provider when the EC2 client is created.
ec2Credential *configaws.CredentialConfig
logger *zap.Logger
// done aborts the retry loops in setInstanceId and setAutoScalingGroup as
// soon as a receive from it succeeds.
done chan struct{}
}

// initEc2Info resolves the instance ID, builds the EC2 client, resolves the
// Auto Scaling group name and finally drops any value that exceeds its length
// limit. It stops early, without validating, when either lookup reports an
// error (which they do only after a shutdown signal).
func (ei *ec2Info) initEc2Info() {
	ei.logger.Debug("Initializing ec2Info")

	if ei.setInstanceId() != nil {
		return
	}

	// The client can only be built here: it is needed for the tag lookup below.
	ei.ec2API = ei.ec2Provider(ei.Region, ei.ec2Credential)

	if ei.setAutoScalingGroup() != nil {
		return
	}

	ei.logger.Debug("Finished initializing ec2Info")
	ei.ignoreInvalidFields()
}

// setInstanceId fetches the instance metadata document and stores its instance
// ID. A failed fetch is retried once per minute until it succeeds. It returns
// a non-nil error only when the done channel fires while waiting to retry.
func (ei *ec2Info) setInstanceId() error {
	for {
		doc, err := ei.metadataProvider.Get(context.Background())
		if err == nil {
			ei.logger.Debug("Successfully retrieved Instance ID")
			ei.InstanceID = doc.InstanceID
			return nil
		}

		ei.logger.Warn("Failed to get Instance Id through metadata provider", zap.Error(err))

		// Pause before the next attempt, but give up immediately on shutdown.
		retryTimer := time.NewTimer(time.Minute)
		select {
		case <-ei.done:
			retryTimer.Stop()
			return errors.New("shutdown signal received")
		case <-retryTimer.C:
		}
	}
}

// setAutoScalingGroup calls retrieveAsgName until it succeeds. Every attempt,
// including the first, is preceded by a wait taken from
// ec2tagger.BackoffSleepArray; once the attempts outnumber the entries, the
// last entry is reused. It returns a non-nil error only when the done channel
// fires during one of those waits.
func (ei *ec2Info) setAutoScalingGroup() error {
	for attempt := 0; ; attempt++ {
		// Pick this attempt's delay, clamping to the final backoff entry.
		idx := len(ec2tagger.BackoffSleepArray) - 1
		if attempt < len(ec2tagger.BackoffSleepArray) {
			idx = attempt
		}

		backoff := time.NewTimer(ec2tagger.BackoffSleepArray[idx])
		select {
		case <-ei.done:
			backoff.Stop()
			return errors.New("shutdown signal received")
		case <-backoff.C:
		}

		if attempt > 0 {
			ei.logger.Debug("Initial retrieval of tags and volumes", zap.Int("retry", attempt))
		}

		err := ei.retrieveAsgName(ei.ec2API)
		if err == nil {
			ei.logger.Debug("Retrieval of auto-scaling group tags succeeded")
			return nil
		}
		ei.logger.Warn("Unable to describe ec2 tags", zap.Int("retry", attempt), zap.Error(err))
	}
}

// retrieveAsgName looks up the Auto Scaling group name, preferring instance
// metadata tags and falling back to the EC2 DescribeTags API only when the
// tag list itself cannot be fetched.
//
// This could also be written as a single InstanceTagValue call followed by
// DescribeTags on failure, but listing the tags first lets us tell "tags are
// not fetchable at all" apart from "the ASG tag does not exist".
//
// A missing ASG tag, or a failure to read its value from metadata, is not
// reported as an error: the name is simply left unset and nil is returned.
func (ei *ec2Info) retrieveAsgName(ec2API ec2iface.EC2API) error {
	tags, err := ei.metadataProvider.InstanceTags(context.Background())
	if err != nil {
		ei.logger.Debug("Failed to get tags through metadata provider", zap.Error(err))
		return ei.retrieveAsgNameWithDescribeTags(ec2API)
	}

	if !strings.Contains(tags, ec2tagger.Ec2InstanceTagKeyASG) {
		return nil
	}

	asg, err := ei.metadataProvider.InstanceTagValue(context.Background(), ec2tagger.Ec2InstanceTagKeyASG)
	if err != nil {
		ei.logger.Error("Failed to get AutoScalingGroup through metadata provider", zap.Error(err))
		return nil
	}

	ei.logger.Debug("AutoScalingGroup retrieved through IMDS")
	ei.AutoScalingGroup = asg
	return nil
}

// retrieveAsgNameWithDescribeTags looks up the Auto Scaling group tag of this
// instance through the EC2 DescribeTags API, following NextToken pagination
// until the tag is found or the pages run out.
//
// ec2API is the client used for the call. The request is filtered by resource
// type "instance", by ei.InstanceID and by the ASG tag key.
//
// It returns the DescribeTags error if a call fails. It returns nil both when
// the tag was found (ei.AutoScalingGroup is then set) and when no page
// contained it (ei.AutoScalingGroup is left untouched).
func (ei *ec2Info) retrieveAsgNameWithDescribeTags(ec2API ec2iface.EC2API) error {
	input := &ec2.DescribeTagsInput{
		Filters: []*ec2.Filter{
			{
				Name:   aws.String("resource-type"),
				Values: aws.StringSlice([]string{"instance"}),
			},
			{
				Name:   aws.String("resource-id"),
				Values: aws.StringSlice([]string{ei.InstanceID}),
			},
			{
				Name:   aws.String("key"),
				Values: aws.StringSlice([]string{ec2tagger.Ec2InstanceTagKeyASG}),
			},
		},
	}
	for {
		result, err := ec2API.DescribeTags(input)
		if err != nil {
			ei.logger.Error("Unable to retrieve EC2 AutoScalingGroup. This feature must only be used on an EC2 instance.")
			return err
		}
		for _, tag := range result.Tags {
			// Tag entries and their fields are pointers; read them nil-safely
			// so a sparse response cannot panic the agent.
			if tag == nil {
				continue
			}
			if aws.StringValue(tag.Key) == ec2tagger.Ec2InstanceTagKeyASG {
				ei.AutoScalingGroup = aws.StringValue(tag.Value)
				return nil
			}
		}
		if result.NextToken == nil {
			return nil
		}
		input.NextToken = result.NextToken
	}
}

// newEC2Info builds an ec2Info wired with the given collaborators. InstanceID,
// AutoScalingGroup and the EC2 client are left at their zero values; they are
// populated later by initEc2Info.
func newEC2Info(metadataProvider ec2metadataprovider.MetadataProvider, providerType ec2ProviderType, ec2Credential *configaws.CredentialConfig, done chan struct{}, region string, logger *zap.Logger) *ec2Info {
	info := new(ec2Info)
	info.Region = region
	info.metadataProvider = metadataProvider
	info.ec2Provider = providerType
	info.ec2Credential = ec2Credential
	info.logger = logger
	info.done = done
	return info
}

// ignoreInvalidFields clears InstanceID and AutoScalingGroup when their length
// exceeds instanceIdSizeMax or autoScalingGroupSizeMax respectively, logging a
// warning for each value that is dropped.
func (ei *ec2Info) ignoreInvalidFields() {
	idLength := len(ei.InstanceID)
	if idLength > instanceIdSizeMax {
		ei.logger.Warn(
			"InstanceId length exceeds characters limit and will be ignored",
			zap.Int("length", idLength),
			zap.Int("character limit", instanceIdSizeMax),
		)
		ei.InstanceID = ""
	}

	asgLength := len(ei.AutoScalingGroup)
	if asgLength > autoScalingGroupSizeMax {
		ei.logger.Warn(
			"AutoScalingGroup length exceeds characters limit and will be ignored",
			zap.Int("length", asgLength),
			zap.Int("character limit", autoScalingGroupSizeMax),
		)
		ei.AutoScalingGroup = ""
	}
}
Loading

0 comments on commit f0a39d6

Please sign in to comment.