Skip to content

Commit

Permalink
Drop entity if account ID is not present
Browse files Browse the repository at this point in the history
  • Loading branch information
zhihonl committed Oct 29, 2024
1 parent b3c03e7 commit 5c6298d
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 51 deletions.
18 changes: 8 additions & 10 deletions extension/entitystore/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/sts/stsiface"
"github.com/jellydator/ttlcache/v3"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
Expand All @@ -20,6 +19,7 @@ import (
configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/amazon-cloudwatch-agent/internal/ec2metadataprovider"
"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsentity/entityattributes"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/translator/config"
)
Expand All @@ -31,9 +31,6 @@ const (
ServiceNameSourceKey = "AWS.ServiceNameSource"
PlatformType = "PlatformType"
EC2PlatForm = "AWS::EC2"
Type = "Type"
Name = "Name"
Environment = "Environment"
podTerminationCheckInterval = 5 * time.Minute
)

Expand Down Expand Up @@ -74,8 +71,6 @@ type EntityStore struct {

metadataprovider ec2metadataprovider.MetadataProvider

stsClient stsiface.STSAPI

podTerminationCheckInterval time.Duration
}

Expand Down Expand Up @@ -149,7 +144,9 @@ func (e *EntityStore) CreateLogFileEntity(logFileGlob LogFileGlob, logGroupName
keyAttributes := e.createServiceKeyAttributes(serviceAttr)
attributeMap := e.createAttributeMap()
addNonEmptyToMap(attributeMap, ServiceNameSourceKey, serviceAttr.ServiceNameSource)

if _, ok := keyAttributes[entityattributes.AwsAccountId]; !ok {
return nil
}
return &cloudwatchlogs.Entity{
KeyAttributes: keyAttributes,
Attributes: attributeMap,
Expand Down Expand Up @@ -225,10 +222,11 @@ func (e *EntityStore) createAttributeMap() map[string]*string {
// createServiceKeyAttribute creates KeyAttributes for Service entities
func (e *EntityStore) createServiceKeyAttributes(serviceAttr ServiceAttribute) map[string]*string {
serviceKeyAttr := map[string]*string{
Type: aws.String(Service),
entityattributes.EntityType: aws.String(Service),
}
addNonEmptyToMap(serviceKeyAttr, Name, serviceAttr.ServiceName)
addNonEmptyToMap(serviceKeyAttr, Environment, serviceAttr.Environment)
addNonEmptyToMap(serviceKeyAttr, entityattributes.ServiceName, serviceAttr.ServiceName)
addNonEmptyToMap(serviceKeyAttr, entityattributes.DeploymentEnvironment, serviceAttr.Environment)
addNonEmptyToMap(serviceKeyAttr, entityattributes.AwsAccountId, e.ec2Info.GetAccountID())
return serviceKeyAttr
}

Expand Down
38 changes: 13 additions & 25 deletions extension/entitystore/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/aws/aws-sdk-go/service/sts/stsiface"
"github.com/jellydator/ttlcache/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/aws/amazon-cloudwatch-agent/internal/ec2metadataprovider"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsentity/entityattributes"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/translator/config"
)
Expand Down Expand Up @@ -64,15 +63,6 @@ func (s *mockServiceProvider) getServiceNameAndSource() (string, string) {
return "test-service-name", "UserConfiguration"
}

type mockSTSClient struct {
stsiface.STSAPI
accountId string
}

func (ms *mockSTSClient) GetCallerIdentity(*sts.GetCallerIdentityInput) (*sts.GetCallerIdentityOutput, error) {
return &sts.GetCallerIdentityOutput{Account: aws.String(ms.accountId)}, nil
}

type mockMetadataProvider struct {
InstanceIdentityDocument *ec2metadata.EC2InstanceIdentityDocument
Tags map[string]string
Expand Down Expand Up @@ -288,25 +278,25 @@ func TestEntityStore_createServiceKeyAttributes(t *testing.T) {
name: "NameAndEnvironmentSet",
serviceAttr: ServiceAttribute{ServiceName: "test-service", Environment: "test-environment"},
want: map[string]*string{
Environment: aws.String("test-environment"),
Name: aws.String("test-service"),
Type: aws.String(Service),
entityattributes.DeploymentEnvironment: aws.String("test-environment"),
entityattributes.ServiceName: aws.String("test-service"),
entityattributes.EntityType: aws.String(Service),
},
},
{
name: "OnlyNameSet",
serviceAttr: ServiceAttribute{ServiceName: "test-service"},
want: map[string]*string{
Name: aws.String("test-service"),
Type: aws.String(Service),
entityattributes.ServiceName: aws.String("test-service"),
entityattributes.EntityType: aws.String(Service),
},
},
{
name: "OnlyEnvironmentSet",
serviceAttr: ServiceAttribute{Environment: "test-environment"},
want: map[string]*string{
Environment: aws.String("test-environment"),
Type: aws.String(Service),
entityattributes.DeploymentEnvironment: aws.String("test-environment"),
entityattributes.EntityType: aws.String(Service),
},
},
}
Expand All @@ -332,20 +322,19 @@ func TestEntityStore_createLogFileRID(t *testing.T) {
sp.On("logFileServiceAttribute", glob, group).Return(serviceAttr)
e := EntityStore{
mode: config.ModeEC2,
ec2Info: EC2Info{InstanceID: instanceId},
ec2Info: EC2Info{InstanceID: instanceId, AccountID: accountId},
serviceprovider: sp,
metadataprovider: mockMetadataProviderWithAccountId(accountId),
stsClient: &mockSTSClient{accountId: accountId},
nativeCredential: &session.Session{},
}

entity := e.CreateLogFileEntity(glob, group)

expectedEntity := cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
Environment: aws.String("test-environment"),
Name: aws.String("test-service"),
Type: aws.String(Service),
entityattributes.DeploymentEnvironment: aws.String("test-environment"),
entityattributes.ServiceName: aws.String("test-service"),
entityattributes.EntityType: aws.String(Service),
entityattributes.AwsAccountId: aws.String(accountId),
},
Attributes: map[string]*string{
InstanceIDKey: aws.String(instanceId),
Expand Down Expand Up @@ -543,7 +532,6 @@ func TestEntityStore_GetMetricServiceNameSource(t *testing.T) {
ec2Info: EC2Info{InstanceID: instanceId},
serviceprovider: sp,
metadataprovider: mockMetadataProviderWithAccountId(accountId),
stsClient: &mockSTSClient{accountId: accountId},
nativeCredential: &session.Session{},
}

Expand Down
4 changes: 3 additions & 1 deletion plugins/outputs/cloudwatch/convert_otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ func fetchEntityFields(resourceAttributes pcommon.Map) cloudwatch.Entity {
processEntityAttributes(entityattributes.GetKeyAttributeEntityShortNameMap(), keyAttributesMap, resourceAttributes)
processEntityAttributes(entityattributes.GetAttributeEntityShortNameMap(platformType), attributeMap, resourceAttributes)
removeEntityFields(resourceAttributes)

if _, ok := keyAttributesMap[entityattributes.AwsAccountId]; !ok {
return cloudwatch.Entity{}
}
return cloudwatch.Entity{
KeyAttributes: keyAttributesMap,
Attributes: attributeMap,
Expand Down
33 changes: 30 additions & 3 deletions plugins/outputs/cloudwatch/convert_otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func TestProcessAndRemoveEntityAttributes(t *testing.T) {
}
}

func TestFetchEntityFields(t *testing.T) {
func TestFetchEntityFields_WithoutAccountID(t *testing.T) {
resourceMetrics := pmetric.NewResourceMetrics()
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityType, "Service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityDeploymentEnvironment, "my-environment")
Expand All @@ -390,11 +390,34 @@ func TestFetchEntityFields(t *testing.T) {
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityPlatformType, "AWS::EKS")
assert.Equal(t, 8, resourceMetrics.Resource().Attributes().Len())

expectedEntity := cloudwatch.Entity{
KeyAttributes: nil,
Attributes: nil,
}
entity := fetchEntityFields(resourceMetrics.Resource().Attributes())
assert.Equal(t, 0, resourceMetrics.Resource().Attributes().Len())
assert.Equal(t, expectedEntity, entity)
}

func TestFetchEntityFields_WithAccountID(t *testing.T) {
resourceMetrics := pmetric.NewResourceMetrics()
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityType, "Service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityDeploymentEnvironment, "my-environment")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityServiceName, "my-service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityNode, "my-node")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityCluster, "my-cluster")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityNamespace, "my-namespace")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityWorkload, "my-workload")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityPlatformType, "AWS::EKS")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityAwsAccountId, "123456789")
assert.Equal(t, 9, resourceMetrics.Resource().Attributes().Len())

expectedEntity := cloudwatch.Entity{
KeyAttributes: map[string]*string{
entityattributes.EntityType: aws.String(entityattributes.Service),
entityattributes.ServiceName: aws.String("my-service"),
entityattributes.DeploymentEnvironment: aws.String("my-environment"),
entityattributes.AwsAccountId: aws.String("123456789"),
},
Attributes: map[string]*string{
entityattributes.Node: aws.String("my-node"),
Expand All @@ -421,13 +444,15 @@ func TestFetchEntityFieldsOnK8s(t *testing.T) {
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityNamespace, "my-namespace")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityWorkload, "my-workload")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityPlatformType, "K8s")
assert.Equal(t, 8, resourceMetrics.Resource().Attributes().Len())
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityAwsAccountId, "123456789")
assert.Equal(t, 9, resourceMetrics.Resource().Attributes().Len())

expectedEntity := cloudwatch.Entity{
KeyAttributes: map[string]*string{
entityattributes.EntityType: aws.String(entityattributes.Service),
entityattributes.ServiceName: aws.String("my-service"),
entityattributes.DeploymentEnvironment: aws.String("my-environment"),
entityattributes.AwsAccountId: aws.String("123456789"),
},
Attributes: map[string]*string{
entityattributes.Node: aws.String("my-node"),
Expand All @@ -448,13 +473,15 @@ func TestFetchEntityFieldsOnEc2(t *testing.T) {
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityDeploymentEnvironment, "my-environment")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityServiceName, "my-service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityPlatformType, "AWS::EC2")
assert.Equal(t, 4, resourceMetrics.Resource().Attributes().Len())
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityAwsAccountId, "123456789")
assert.Equal(t, 5, resourceMetrics.Resource().Attributes().Len())

expectedEntity := cloudwatch.Entity{
KeyAttributes: map[string]*string{
entityattributes.EntityType: aws.String(entityattributes.Service),
entityattributes.ServiceName: aws.String("my-service"),
entityattributes.DeploymentEnvironment: aws.String("my-environment"),
entityattributes.AwsAccountId: aws.String("123456789"),
},
Attributes: map[string]*string{
entityattributes.Platform: aws.String("AWS::EC2"),
Expand Down
9 changes: 3 additions & 6 deletions plugins/outputs/cloudwatchlogs/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,18 +222,15 @@ func (p *pusher) send() {
if p.needSort {
sort.Stable(ByTimestamp(p.events))
}
var entity *cloudwatchlogs.Entity
if p.logSrc != nil {
entity = p.logSrc.Entity()
}

input := &cloudwatchlogs.PutLogEventsInput{
LogEvents: p.events,
LogGroupName: &p.Group,
LogStreamName: &p.Stream,
SequenceToken: p.sequenceToken,
}
input.Entity = entity
if p.logSrc != nil {
input.Entity = p.logSrc.Entity()
}

startTime := time.Now()

Expand Down
61 changes: 55 additions & 6 deletions plugins/outputs/cloudwatchlogs/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,26 @@ import (

type mockLogSrc struct {
logs.LogSrc
returnEmpty bool
}

func (m *mockLogSrc) Entity() *cloudwatchlogs.Entity {
return &cloudwatchlogs.Entity{
entity := &cloudwatchlogs.Entity{
Attributes: map[string]*string{
"PlatformType": aws.String("AWS::EC2"),
"EC2.InstanceId": aws.String("i-123456789"),
"EC2.AutoScalingGroup": aws.String("test-group"),
},
KeyAttributes: map[string]*string{
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"AwsAccountId": aws.String("123456789"),
},
}
if m.returnEmpty {
return nil
}
return entity
}

var wg sync.WaitGroup
Expand Down Expand Up @@ -103,7 +109,7 @@ func (e evtMock) Done() {
}
}

func TestAddSingleEvent(t *testing.T) {
func TestAddSingleEvent_WithAccountId(t *testing.T) {
var s svcMock
called := false
nst := "NEXT_SEQ_TOKEN"
Expand All @@ -114,8 +120,9 @@ func TestAddSingleEvent(t *testing.T) {
"EC2.AutoScalingGroup": aws.String("test-group"),
},
KeyAttributes: map[string]*string{
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"AwsAccountId": aws.String("123456789"),
},
}

Expand Down Expand Up @@ -155,6 +162,48 @@ func TestAddSingleEvent(t *testing.T) {
wg.Wait()
}

func TestAddSingleEvent_WithoutAccountId(t *testing.T) {
var s svcMock
called := false
nst := "NEXT_SEQ_TOKEN"

s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
called = true

if in.SequenceToken != nil {
t.Errorf("PutLogEvents called with wrong sequenceToken, first call should not provide any token")
}

if *in.LogGroupName != "G" || *in.LogStreamName != "S" {
t.Errorf("PutLogEvents called with wrong group and stream: %v/%v", *in.LogGroupName, *in.LogStreamName)
}

if len(in.LogEvents) != 1 || *in.LogEvents[0].Message != "MSG" {
t.Errorf("PutLogEvents called with incorrect message, got: '%v'", *in.LogEvents[0].Message)
}
require.Nil(t, in.Entity)
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: &nst,
}, nil
}

stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout)
p.logSrc = &mockLogSrc{returnEmpty: true}

p.AddEvent(evtMock{"MSG", time.Now(), nil})
require.False(t, called, "PutLogEvents has been called too fast, it should wait until FlushTimeout.")

p.FlushTimeout = 10 * time.Millisecond
p.resetFlushTimer()

time.Sleep(3 * time.Second)
require.True(t, called, "PutLogEvents has not been called after FlushTimeout has been reached.")
require.NotNil(t, nst, *p.sequenceToken, "Pusher did not capture the NextSequenceToken")

close(stop)
wg.Wait()
}

func TestStopPusherWouldDoFinalSend(t *testing.T) {
var s svcMock
called := false
Expand Down

0 comments on commit 5c6298d

Please sign in to comment.