Skip to content

Commit

Permalink
Merge branch 'main' into ta-collector
Browse files Browse the repository at this point in the history
  • Loading branch information
okankoAMZ authored Nov 11, 2024
2 parents 205603b + 896a6b6 commit afca216
Show file tree
Hide file tree
Showing 19 changed files with 494 additions and 58 deletions.
44 changes: 43 additions & 1 deletion .github/workflows/application-signals-e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,46 @@ jobs:
secrets: inherit
with:
aws-region: us-east-1
caller-workflow-name: 'main-build'
caller-workflow-name: 'main-build'

node-eks-e2e-test:
# Because we share the same eks cluster for different languages, we want to run the tests sequentially to avoid interference
needs: [ CheckBuildTestArtifacts, python-eks-e2e-test ]
uses: aws-observability/aws-application-signals-test-framework/.github/workflows/node-eks-test.yml@main
secrets: inherit
with:
aws-region: us-east-1
test-cluster-name: 'e2e-cw-agent-test'
caller-workflow-name: 'main-build'

node-ec2-default-e2e-test:
needs: [ CheckBuildTestArtifacts ]
uses: aws-observability/aws-application-signals-test-framework/.github/workflows/node-ec2-default-test.yml@main
secrets: inherit
with:
aws-region: us-east-1
caller-workflow-name: 'main-build'

node-ec2-asg-e2e-test:
needs: [ CheckBuildTestArtifacts ]
uses: aws-observability/aws-application-signals-test-framework/.github/workflows/node-ec2-asg-test.yml@main
secrets: inherit
with:
aws-region: us-east-1
caller-workflow-name: 'main-build'

node-k8s-e2e-test:
needs: [ CheckBuildTestArtifacts ]
uses: aws-observability/aws-application-signals-test-framework/.github/workflows/node-k8s-test.yml@main
secrets: inherit
with:
aws-region: us-east-1
caller-workflow-name: 'main-build'

node-ecs-e2e-test:
needs: [ CheckBuildTestArtifacts ]
uses: aws-observability/aws-application-signals-test-framework/.github/workflows/node-ecs-test.yml@main
secrets: inherit
with:
aws-region: us-east-1
caller-workflow-name: 'main-build'
11 changes: 11 additions & 0 deletions RELEASE_NOTES
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
========================================================================
Amazon CloudWatch Agent 1.300049.1 (2024-11-06)
========================================================================
Enhancements:
* [Logs] Attach Account ID to entity for cross-account checks

Bug Fixes:
* [Logs] Fix log stream name placeholder resolution for EMF logs
* [Logs] Fix race condition in service provider component of entitystore extension
* [Metrics/Logs] Use IAM credential endpoint from IMDS for accurate entity IAM role name

========================================================================
Amazon CloudWatch Agent 1.300049.0 (2024-10-30)
========================================================================
Expand Down
7 changes: 5 additions & 2 deletions extension/entitystore/ec2Info.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type EC2Info struct {
}

func (ei *EC2Info) initEc2Info() {
if ei.metadataProvider == nil {
return
}
ei.logger.Debug("Initializing EC2Info")
if err := ei.setInstanceIDAccountID(); err != nil {
return
Expand Down Expand Up @@ -73,7 +76,7 @@ func (ei *EC2Info) setInstanceIDAccountID() error {
for {
metadataDoc, err := ei.metadataProvider.Get(context.Background())
if err != nil {
ei.logger.Warn("Failed to get Instance ID / Account ID through metadata provider", zap.Error(err))
ei.logger.Debug("Failed to get Instance ID / Account ID through metadata provider", zap.Error(err))
wait := time.NewTimer(1 * time.Minute)
select {
case <-ei.done:
Expand Down Expand Up @@ -119,7 +122,7 @@ func (ei *EC2Info) setAutoScalingGroup() error {
}

if err := ei.retrieveAsgName(); err != nil {
ei.logger.Warn("Unable to fetch instance tags with imds", zap.Int("retry", retry), zap.Error(err))
ei.logger.Debug("Unable to fetch instance tags with imds", zap.Int("retry", retry), zap.Error(err))
} else {
ei.logger.Debug("Retrieval of auto-scaling group tags succeeded")
return nil
Expand Down
31 changes: 31 additions & 0 deletions extension/entitystore/ec2Info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,34 @@ func TestLogMessageDoesNotIncludeResourceInfo(t *testing.T) {
})
}
}

func TestNotInitIfMetadataProviderIsEmpty(t *testing.T) {
tests := []struct {
name string
}{
{
name: "AutoScalingGroupWithInstanceTags",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a buffer to capture the logger output
var buf bytes.Buffer

logger := CreateTestLogger(&buf)
done := make(chan struct{})

ei := &EC2Info{
logger: logger,
done: done,
}
go ei.initEc2Info()
time.Sleep(3 * time.Second)

logOutput := buf.String()
log.Println(logOutput)
assert.NotContains(t, logOutput, "Initializing EC2Info")
assert.NotContains(t, logOutput, "Finished initializing EC2Info")
})
}
}
30 changes: 22 additions & 8 deletions extension/entitystore/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/jellydator/ttlcache/v3"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.uber.org/atomic"
"go.uber.org/zap"

configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
Expand Down Expand Up @@ -48,6 +49,7 @@ type EntityStore struct {
logger *zap.Logger
config *Config
done chan struct{}
ready atomic.Bool

// mode should be EC2, ECS, EKS, and K8S
mode string
Expand Down Expand Up @@ -101,6 +103,7 @@ func (e *EntityStore) Start(ctx context.Context, host component.Host) error {
// Starting the ttl cache will automatically evict all expired pods from the map
go e.StartPodToServiceEnvironmentMappingTtlCache()
}
e.ready.Store(true)
return nil
}

Expand Down Expand Up @@ -139,12 +142,17 @@ func (e *EntityStore) NativeCredentialExists() bool {

// CreateLogFileEntity creates the entity for log events that are being uploaded from a log file in the environment.
func (e *EntityStore) CreateLogFileEntity(logFileGlob LogFileGlob, logGroupName LogGroupName) *cloudwatchlogs.Entity {
if e.serviceprovider == nil {
return nil
}
serviceAttr := e.serviceprovider.logFileServiceAttribute(logFileGlob, logGroupName)

keyAttributes := e.createServiceKeyAttributes(serviceAttr)
attributeMap := e.createAttributeMap()
addNonEmptyToMap(attributeMap, ServiceNameSourceKey, serviceAttr.ServiceNameSource)

if _, ok := keyAttributes[entityattributes.AwsAccountId]; !ok {
return nil
}
return &cloudwatchlogs.Entity{
KeyAttributes: keyAttributes,
Attributes: attributeMap,
Expand All @@ -153,6 +161,9 @@ func (e *EntityStore) CreateLogFileEntity(logFileGlob LogFileGlob, logGroupName

// GetMetricServiceNameAndSource gets the service name source for service metrics if not customer provided
func (e *EntityStore) GetMetricServiceNameAndSource() (string, string) {
if e.serviceprovider == nil {
return "", ""
}
return e.serviceprovider.getServiceNameAndSource()
}

Expand All @@ -175,11 +186,13 @@ func (e *EntityStore) AddServiceAttrEntryForLogFile(fileGlob LogFileGlob, servic

// AddServiceAttrEntryForLogGroup adds an entry to the entity store for the provided log group nme -> (serviceName, environmentName) key-value pair
func (e *EntityStore) AddServiceAttrEntryForLogGroup(logGroupName LogGroupName, serviceName string, environmentName string) {
e.serviceprovider.addEntryForLogGroup(logGroupName, ServiceAttribute{
ServiceName: serviceName,
ServiceNameSource: ServiceNameSourceInstrumentation,
Environment: environmentName,
})
if e.serviceprovider != nil {
e.serviceprovider.addEntryForLogGroup(logGroupName, ServiceAttribute{
ServiceName: serviceName,
ServiceNameSource: ServiceNameSourceInstrumentation,
Environment: environmentName,
})
}
}

func (e *EntityStore) AddPodServiceEnvironmentMapping(podName string, serviceName string, environmentName string, serviceNameSource string) {
Expand All @@ -189,8 +202,8 @@ func (e *EntityStore) AddPodServiceEnvironmentMapping(podName string, serviceNam
}

func (e *EntityStore) StartPodToServiceEnvironmentMappingTtlCache() {
if e.eksInfo != nil {
e.eksInfo.podToServiceEnvMap.Start()
if e.eksInfo != nil && e.eksInfo.GetPodServiceEnvironmentMapping() != nil {
e.eksInfo.GetPodServiceEnvironmentMapping().Start()
}
}

Expand Down Expand Up @@ -224,6 +237,7 @@ func (e *EntityStore) createServiceKeyAttributes(serviceAttr ServiceAttribute) m
}
addNonEmptyToMap(serviceKeyAttr, entityattributes.ServiceName, serviceAttr.ServiceName)
addNonEmptyToMap(serviceKeyAttr, entityattributes.DeploymentEnvironment, serviceAttr.Environment)
addNonEmptyToMap(serviceKeyAttr, entityattributes.AwsAccountId, e.ec2Info.GetAccountID())
return serviceKeyAttr
}

Expand Down
45 changes: 39 additions & 6 deletions extension/entitystore/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ func (m *mockMetadataProvider) InstanceID(ctx context.Context) (string, error) {
return "MockInstanceID", nil
}

func (m *mockMetadataProvider) InstanceProfileIAMRole() (string, error) {
return "arn:aws:iam::123456789:instance-profile/TestRole", nil
}

func (m *mockMetadataProvider) InstanceTags(ctx context.Context) (string, error) {
if m.InstanceTagError {
return "", errors.New("an error occurred for instance tag retrieval")
Expand All @@ -118,6 +114,10 @@ func (m *mockMetadataProvider) InstanceTags(ctx context.Context) (string, error)
return tagsString, nil
}

func (m *mockMetadataProvider) ClientIAMRole(ctx context.Context) (string, error) {
return "TestRole", nil
}

func (m *mockMetadataProvider) InstanceTagValue(ctx context.Context, tagKey string) (string, error) {
tag, ok := m.Tags[tagKey]
if !ok {
Expand Down Expand Up @@ -310,6 +310,7 @@ func TestEntityStore_createServiceKeyAttributes(t *testing.T) {

func TestEntityStore_createLogFileRID(t *testing.T) {
instanceId := "i-abcd1234"
accountId := "123456789012"
glob := LogFileGlob("glob")
group := LogGroupName("group")
serviceAttr := ServiceAttribute{
Expand All @@ -321,7 +322,7 @@ func TestEntityStore_createLogFileRID(t *testing.T) {
sp.On("logFileServiceAttribute", glob, group).Return(serviceAttr)
e := EntityStore{
mode: config.ModeEC2,
ec2Info: EC2Info{InstanceID: instanceId},
ec2Info: EC2Info{InstanceID: instanceId, AccountID: accountId},
serviceprovider: sp,
nativeCredential: &session.Session{},
}
Expand All @@ -333,6 +334,7 @@ func TestEntityStore_createLogFileRID(t *testing.T) {
entityattributes.DeploymentEnvironment: aws.String("test-environment"),
entityattributes.ServiceName: aws.String("test-service"),
entityattributes.EntityType: aws.String(Service),
entityattributes.AwsAccountId: aws.String(accountId),
},
Attributes: map[string]*string{
InstanceIDKey: aws.String(instanceId),
Expand All @@ -344,6 +346,21 @@ func TestEntityStore_createLogFileRID(t *testing.T) {
assert.Equal(t, dereferenceMap(expectedEntity.Attributes), dereferenceMap(entity.Attributes))
}

func TestEntityStore_createLogFileRID_ServiceProviderIsEmpty(t *testing.T) {
instanceId := "i-abcd1234"
glob := LogFileGlob("glob")
group := LogGroupName("group")
e := EntityStore{
mode: config.ModeEC2,
ec2Info: EC2Info{InstanceID: instanceId},
nativeCredential: &session.Session{},
}

entity := e.CreateLogFileEntity(glob, group)

assert.Nil(t, entity)
}

func dereferenceMap(input map[string]*string) map[string]string {
result := make(map[string]string)
for k, v := range input {
Expand Down Expand Up @@ -539,6 +556,22 @@ func TestEntityStore_GetMetricServiceNameSource(t *testing.T) {
assert.Equal(t, "UserConfiguration", serviceNameSource)
}

func TestEntityStore_GetMetricServiceNameSource_ServiceProviderEmpty(t *testing.T) {
instanceId := "i-abcd1234"
accountId := "123456789012"
e := EntityStore{
mode: config.ModeEC2,
ec2Info: EC2Info{InstanceID: instanceId},
metadataprovider: mockMetadataProviderWithAccountId(accountId),
nativeCredential: &session.Session{},
}

serviceName, serviceNameSource := e.GetMetricServiceNameAndSource()

assert.Equal(t, "", serviceName)
assert.Equal(t, "", serviceNameSource)
}

func TestEntityStore_LogMessageDoesNotIncludeResourceInfo(t *testing.T) {
type args struct {
metadataProvider ec2metadataprovider.MetadataProvider
Expand Down Expand Up @@ -588,7 +621,7 @@ func TestEntityStore_LogMessageDoesNotIncludeResourceInfo(t *testing.T) {
assertIfNonEmpty(t, logOutput, es.ec2Info.GetInstanceID())
assertIfNonEmpty(t, logOutput, es.ec2Info.GetAutoScalingGroup())
assertIfNonEmpty(t, logOutput, es.ec2Info.GetAccountID())

assert.True(t, es.ready.Load(), "EntityStore should be ready")
})
}
}
Expand Down
11 changes: 10 additions & 1 deletion extension/entitystore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package entitystore

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
Expand All @@ -13,10 +14,16 @@ import (
var (
TypeStr, _ = component.NewType("entitystore")
entityStore *EntityStore
mutex sync.RWMutex
)

func GetEntityStore() *EntityStore {
return entityStore
mutex.RLock()
defer mutex.RUnlock()
if entityStore != nil && entityStore.ready.Load() {
return entityStore
}
return nil
}

func NewFactory() extension.Factory {
Expand All @@ -33,6 +40,8 @@ func createDefaultConfig() component.Config {
}

func createExtension(_ context.Context, settings extension.CreateSettings, cfg component.Config) (extension.Extension, error) {
mutex.Lock()
defer mutex.Unlock()
entityStore = &EntityStore{
logger: settings.Logger,
config: cfg.(*Config),
Expand Down
Loading

0 comments on commit afca216

Please sign in to comment.