diff --git a/extension/entitystore/ec2Info.go b/extension/entitystore/ec2Info.go index 8309c209b5..ae641ddbfd 100644 --- a/extension/entitystore/ec2Info.go +++ b/extension/entitystore/ec2Info.go @@ -41,6 +41,9 @@ type EC2Info struct { } func (ei *EC2Info) initEc2Info() { + if ei.metadataProvider == nil { + return + } ei.logger.Debug("Initializing EC2Info") if err := ei.setInstanceIDAccountID(); err != nil { return diff --git a/extension/entitystore/ec2Info_test.go b/extension/entitystore/ec2Info_test.go index 21fc8d148c..6602752c5a 100644 --- a/extension/entitystore/ec2Info_test.go +++ b/extension/entitystore/ec2Info_test.go @@ -205,3 +205,34 @@ func TestLogMessageDoesNotIncludeResourceInfo(t *testing.T) { }) } } + +func TestNotInitIfMetadataProviderIsEmpty(t *testing.T) { + tests := []struct { + name string + }{ + { + name: "AutoScalingGroupWithInstanceTags", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a buffer to capture the logger output + var buf bytes.Buffer + + logger := CreateTestLogger(&buf) + done := make(chan struct{}) + + ei := &EC2Info{ + logger: logger, + done: done, + } + go ei.initEc2Info() + time.Sleep(3 * time.Second) + + logOutput := buf.String() + log.Println(logOutput) + assert.NotContains(t, logOutput, "Initializing EC2Info") + assert.NotContains(t, logOutput, "Finished initializing EC2Info") + }) + } +} diff --git a/extension/entitystore/extension.go b/extension/entitystore/extension.go index ec3a3e4d1d..737223d7e5 100644 --- a/extension/entitystore/extension.go +++ b/extension/entitystore/extension.go @@ -14,6 +14,7 @@ import ( "github.com/jellydator/ttlcache/v3" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" + "go.uber.org/atomic" "go.uber.org/zap" configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws" @@ -48,6 +49,7 @@ type EntityStore struct { logger *zap.Logger config *Config done chan struct{} + ready atomic.Bool // mode should be EC2, ECS, EKS, and K8S mode string @@ -101,6 +103,7 @@ func (e *EntityStore) Start(ctx context.Context, host component.Host) error { // Starting the ttl cache will automatically evict all expired pods from the map go e.StartPodToServiceEnvironmentMappingTtlCache() } + e.ready.Store(true) return nil } @@ -139,6 +142,9 @@ func (e *EntityStore) NativeCredentialExists() bool { // CreateLogFileEntity creates the entity for log events that are being uploaded from a log file in the environment. func (e *EntityStore) CreateLogFileEntity(logFileGlob LogFileGlob, logGroupName LogGroupName) *cloudwatchlogs.Entity { + if e.serviceprovider == nil { + return nil + } serviceAttr := e.serviceprovider.logFileServiceAttribute(logFileGlob, logGroupName) keyAttributes := e.createServiceKeyAttributes(serviceAttr) @@ -153,6 +159,9 @@ func (e *EntityStore) CreateLogFileEntity(logFileGlob LogFileGlob, logGroupName // GetMetricServiceNameAndSource gets the service name source for service metrics if not customer provided func (e *EntityStore) GetMetricServiceNameAndSource() (string, string) { + if e.serviceprovider == nil { + return "", "" + } return e.serviceprovider.getServiceNameAndSource() } @@ -175,11 +184,13 @@ func (e *EntityStore) AddServiceAttrEntryForLogFile(fileGlob LogFileGlob, servic // AddServiceAttrEntryForLogGroup adds an entry to the entity store for the provided log group nme -> (serviceName, environmentName) key-value pair func (e *EntityStore) AddServiceAttrEntryForLogGroup(logGroupName LogGroupName, serviceName string, environmentName string) { - e.serviceprovider.addEntryForLogGroup(logGroupName, ServiceAttribute{ - ServiceName: serviceName, - ServiceNameSource: ServiceNameSourceInstrumentation, - Environment: environmentName, - }) + if e.serviceprovider != nil { + e.serviceprovider.addEntryForLogGroup(logGroupName, ServiceAttribute{ + ServiceName: serviceName, + ServiceNameSource: ServiceNameSourceInstrumentation, + Environment: environmentName, + }) + } } func (e *EntityStore) AddPodServiceEnvironmentMapping(podName string, serviceName string, environmentName string, serviceNameSource string) { @@ -189,8 +200,8 @@ func (e *EntityStore) AddPodServiceEnvironmentMapping(podName string, serviceNam } func (e *EntityStore) StartPodToServiceEnvironmentMappingTtlCache() { - if e.eksInfo != nil { - e.eksInfo.podToServiceEnvMap.Start() + if e.eksInfo != nil && e.eksInfo.GetPodServiceEnvironmentMapping() != nil { + e.eksInfo.GetPodServiceEnvironmentMapping().Start() } } diff --git a/extension/entitystore/extension_test.go b/extension/entitystore/extension_test.go index c4d44c006d..d28cfdcdfc 100644 --- a/extension/entitystore/extension_test.go +++ b/extension/entitystore/extension_test.go @@ -344,6 +344,21 @@ func TestEntityStore_createLogFileRID(t *testing.T) { assert.Equal(t, dereferenceMap(expectedEntity.Attributes), dereferenceMap(entity.Attributes)) } +func TestEntityStore_createLogFileRID_ServiceProviderIsEmpty(t *testing.T) { + instanceId := "i-abcd1234" + glob := LogFileGlob("glob") + group := LogGroupName("group") + e := EntityStore{ + mode: config.ModeEC2, + ec2Info: EC2Info{InstanceID: instanceId}, + nativeCredential: &session.Session{}, + } + + entity := e.CreateLogFileEntity(glob, group) + + assert.Nil(t, entity) +} + func dereferenceMap(input map[string]*string) map[string]string { result := make(map[string]string) for k, v := range input { @@ -539,6 +554,22 @@ func TestEntityStore_GetMetricServiceNameSource(t *testing.T) { assert.Equal(t, "UserConfiguration", serviceNameSource) } +func TestEntityStore_GetMetricServiceNameSource_ServiceProviderEmpty(t *testing.T) { + instanceId := "i-abcd1234" + accountId := "123456789012" + e := EntityStore{ + mode: config.ModeEC2, + ec2Info: EC2Info{InstanceID: instanceId}, + metadataprovider: mockMetadataProviderWithAccountId(accountId), + nativeCredential: &session.Session{}, + } + + serviceName, serviceNameSource := e.GetMetricServiceNameAndSource() + + assert.Equal(t, "", serviceName) + assert.Equal(t, "", serviceNameSource) +} + func TestEntityStore_LogMessageDoesNotIncludeResourceInfo(t *testing.T) { type args struct { metadataProvider ec2metadataprovider.MetadataProvider @@ -588,7 +619,7 @@ func TestEntityStore_LogMessageDoesNotIncludeResourceInfo(t *testing.T) { assertIfNonEmpty(t, logOutput, es.ec2Info.GetInstanceID()) assertIfNonEmpty(t, logOutput, es.ec2Info.GetAutoScalingGroup()) assertIfNonEmpty(t, logOutput, es.ec2Info.GetAccountID()) - + assert.True(t, es.ready.Load(), "EntityStore should be ready") }) } } diff --git a/extension/entitystore/factory.go b/extension/entitystore/factory.go index 10e2e20913..95de9bdb80 100644 --- a/extension/entitystore/factory.go +++ b/extension/entitystore/factory.go @@ -5,6 +5,7 @@ package entitystore import ( "context" + "sync" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" @@ -13,10 +14,16 @@ import ( var ( TypeStr, _ = component.NewType("entitystore") entityStore *EntityStore + mutex sync.RWMutex ) func GetEntityStore() *EntityStore { - return entityStore + mutex.RLock() + defer mutex.RUnlock() + if entityStore != nil && entityStore.ready.Load() { + return entityStore + } + return nil } func NewFactory() extension.Factory { @@ -33,6 +40,8 @@ func createDefaultConfig() component.Config { } func createExtension(_ context.Context, settings extension.CreateSettings, cfg component.Config) (extension.Extension, error) { + mutex.Lock() + defer mutex.Unlock() entityStore = &EntityStore{ logger: settings.Logger, config: cfg.(*Config), diff --git a/extension/entitystore/serviceprovider.go b/extension/entitystore/serviceprovider.go index 41942028cd..157dbfff2f 100644 --- a/extension/entitystore/serviceprovider.go +++ b/extension/entitystore/serviceprovider.go @@ -78,6 +78,9 @@ type serviceprovider struct { } func (s *serviceprovider) startServiceProvider() { + if s.metadataProvider == nil { + return + } unlimitedRetryer := NewRetryer(false, true, defaultJitterMin, defaultJitterMax, ec2tagger.BackoffSleepArray, infRetry, s.done, s.logger) limitedRetryer := NewRetryer(false, true, describeTagsJitterMin, describeTagsJitterMax, ec2tagger.ThrottleBackOffArray, maxRetry, s.done, s.logger) go unlimitedRetryer.refreshLoop(s.scrapeIAMRole) @@ -99,12 +102,18 @@ func (s *serviceprovider) GetIMDSServiceName() string { // addEntryForLogFile adds an association between a log file glob and a service attribute, as configured in the // CloudWatch Agent config. func (s *serviceprovider) addEntryForLogFile(logFileGlob LogFileGlob, serviceAttr ServiceAttribute) { + if s.logFiles == nil { + s.logFiles = make(map[LogFileGlob]ServiceAttribute) + } s.logFiles[logFileGlob] = serviceAttr } // addEntryForLogGroup adds an association between a log group name and a service attribute, as observed from incoming // telemetry received by CloudWatch Agent. func (s *serviceprovider) addEntryForLogGroup(logGroupName LogGroupName, serviceAttr ServiceAttribute) { + if s.logGroups == nil { + s.logGroups = make(map[LogGroupName]ServiceAttribute) + } s.logGroups[logGroupName] = serviceAttr } @@ -162,7 +171,7 @@ func (s *serviceprovider) getServiceNameAndSource() (string, string) { } func (s *serviceprovider) serviceAttributeForLogGroup(logGroup LogGroupName) ServiceAttribute { - if logGroup == "" { + if logGroup == "" || s.logGroups == nil { return ServiceAttribute{} } @@ -170,7 +179,7 @@ func (s *serviceprovider) serviceAttributeForLogGroup(logGroup LogGroupName) Ser } func (s *serviceprovider) serviceAttributeForLogFile(logFile LogFileGlob) ServiceAttribute { - if logFile == "" { + if logFile == "" || s.logFiles == nil { return ServiceAttribute{} } diff --git a/extension/entitystore/serviceprovider_test.go b/extension/entitystore/serviceprovider_test.go index 011a522c96..c049c03305 100644 --- a/extension/entitystore/serviceprovider_test.go +++ b/extension/entitystore/serviceprovider_test.go @@ -32,6 +32,11 @@ func Test_serviceprovider_startServiceProvider(t *testing.T) { wantIAM: "TestRole", wantTag: "test-service", }, + { + name: "EmptyServiceProvider", + wantIAM: "", + wantTag: "", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -64,6 +69,17 @@ func Test_serviceprovider_addEntryForLogFile(t *testing.T) { assert.Equal(t, serviceAttr, actual) } +func Test_serviceprovider_addEntryForLogFile_logFilesEmpty(t *testing.T) { + s := &serviceprovider{} + glob := LogFileGlob("glob") + serviceAttr := ServiceAttribute{ServiceName: "test-service"} + + s.addEntryForLogFile(glob, serviceAttr) + + actual := s.logFiles[glob] + assert.Equal(t, serviceAttr, actual) +} + func Test_serviceprovider_addEntryForLogGroup(t *testing.T) { s := &serviceprovider{ logGroups: make(map[LogGroupName]ServiceAttribute), @@ -77,6 +93,17 @@ func Test_serviceprovider_addEntryForLogGroup(t *testing.T) { assert.Equal(t, serviceAttr, actual) } +func Test_serviceprovider_addEntryForLogGroup_logGroupsEmpty(t *testing.T) { + s := &serviceprovider{} + group := LogGroupName("group") + serviceAttr := ServiceAttribute{ServiceName: "test-service"} + + s.addEntryForLogGroup(group, serviceAttr) + + actual := s.logGroups[group] + assert.Equal(t, serviceAttr, actual) +} + func Test_serviceprovider_mergeServiceAttributes(t *testing.T) { onlySvc1 := func() ServiceAttribute { return ServiceAttribute{ServiceName: "service1", ServiceNameSource: "source1"} @@ -142,6 +169,8 @@ func Test_serviceprovider_serviceAttributeForLogGroup(t *testing.T) { assert.Equal(t, ServiceAttribute{}, s.serviceAttributeForLogGroup("")) assert.Equal(t, ServiceAttribute{}, s.serviceAttributeForLogGroup("othergroup")) assert.Equal(t, ServiceAttribute{ServiceName: "test-service"}, s.serviceAttributeForLogGroup("group")) + s.logGroups = nil + assert.Equal(t, ServiceAttribute{}, s.serviceAttributeForLogGroup("group")) } func Test_serviceprovider_serviceAttributeForLogFile(t *testing.T) { @@ -149,6 +178,8 @@ func Test_serviceprovider_serviceAttributeForLogFile(t *testing.T) { assert.Equal(t, ServiceAttribute{}, s.serviceAttributeForLogFile("")) assert.Equal(t, ServiceAttribute{}, s.serviceAttributeForLogFile("otherglob")) assert.Equal(t, ServiceAttribute{ServiceName: "test-service"}, s.serviceAttributeForLogFile("glob")) + s.logFiles = nil + assert.Equal(t, ServiceAttribute{}, s.serviceAttributeForLogFile("glob")) } func Test_serviceprovider_serviceAttributeFromEc2Tags(t *testing.T) {