diff --git a/exporter/awscloudwatchlogsexporter/exporter.go b/exporter/awscloudwatchlogsexporter/exporter.go index f58f1ca932da..d2d11f94cf4a 100644 --- a/exporter/awscloudwatchlogsexporter/exporter.go +++ b/exporter/awscloudwatchlogsexporter/exporter.go @@ -34,6 +34,7 @@ type cwlExporter struct { collectorID string svcStructuredLog *cwlogs.Client pusherFactory cwlogs.MultiStreamPusherFactory + params exp.Settings } type awsMetadata struct { @@ -52,29 +53,16 @@ func newCwLogsPusher(expConfig *Config, params exp.Settings) (*cwlExporter, erro return nil, errors.New("awscloudwatchlogs exporter config is nil") } - // create AWS session - awsConfig, session, err := awsutil.GetAWSConfigSession(params.Logger, &awsutil.Conn{}, &expConfig.AWSSessionSettings) - if err != nil { - return nil, err - } - - // create CWLogs client with aws session config - svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, expConfig.Tags, session) collectorIdentifier, err := uuid.NewRandom() if err != nil { return nil, err } - logStreamManager := cwlogs.NewLogStreamManager(*svcStructuredLog) - multiStreamPusherFactory := cwlogs.NewMultiStreamPusherFactory(logStreamManager, *svcStructuredLog, params.Logger) - logsExporter := &cwlExporter{ - svcStructuredLog: svcStructuredLog, - Config: expConfig, - logger: params.Logger, - retryCount: *awsConfig.MaxRetries, - collectorID: collectorIdentifier.String(), - pusherFactory: multiStreamPusherFactory, + Config: expConfig, + logger: params.Logger, + collectorID: collectorIdentifier.String(), + params: params, } return logsExporter, nil } @@ -117,10 +105,25 @@ func (e *cwlExporter) consumeLogs(_ context.Context, ld plog.Logs) error { return errs } -func (e *cwlExporter) start(_ context.Context, host component.Host) error { +func (e *cwlExporter) start(ctx context.Context, host component.Host) error { + // Create AWS session + awsConfig, session, err := awsutil.GetAWSConfigSession(e.logger, &awsutil.Conn{}, &e.Config.AWSSessionSettings) + if err != nil { + return fmt.Errorf("failed to create AWS session: %w", err) + } + + // Create CWLogs client with aws session config + e.svcStructuredLog = cwlogs.NewClient(e.logger, awsConfig, e.params.BuildInfo, e.Config.LogGroupName, e.Config.LogRetention, e.Config.Tags, session) + + e.retryCount = *awsConfig.MaxRetries + + logStreamManager := cwlogs.NewLogStreamManager(*e.svcStructuredLog) + e.pusherFactory = cwlogs.NewMultiStreamPusherFactory(logStreamManager, *e.svcStructuredLog, e.logger) + if e.Config.MiddlewareID != nil { awsmiddleware.TryConfigure(e.logger, host, *e.Config.MiddlewareID, awsmiddleware.SDKv1(e.svcStructuredLog.Handlers())) } + return nil } diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 3aaa89a0d8b7..cdd40bacee79 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" "strings" "sync" @@ -21,7 +22,6 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/appsignals" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" ) @@ -39,6 +39,7 @@ type emfExporter struct { pusherMap map[cwlogs.StreamKey]cwlogs.Pusher svcStructuredLog *cwlogs.Client config *Config + set exporter.Settings metricTranslator metricTranslator @@ -57,42 +58,18 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error) config.logger = set.Logger - // create AWS session - awsConfig, session, err := awsutil.GetAWSConfigSession(set.Logger, &awsutil.Conn{}, &config.AWSSessionSettings) - if err != nil { - return nil, err - } - - // create CWLogs client with aws session config - svcStructuredLog := cwlogs.NewClient(set.Logger, - awsConfig, - set.BuildInfo, - config.LogGroupName, - config.LogRetention, - config.Tags, - session, - cwlogs.WithEnabledContainerInsights(config.IsEnhancedContainerInsights()), - cwlogs.WithEnabledAppSignals(config.IsAppSignalsEnabled()), - ) - - collectorIdentifier, err := uuid.NewRandom() - if err != nil { - return nil, err - } - + // Initialize emfExporter without AWS session and structured logs emfExporter := &emfExporter{ - svcStructuredLog: svcStructuredLog, config: config, metricTranslator: newMetricTranslator(*config), - retryCnt: *awsConfig.MaxRetries, - collectorID: collectorIdentifier.String(), + retryCnt: config.AWSSessionSettings.MaxRetries, + collectorID: uuid.New().String(), pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{}, processResourceLabels: func(map[string]string) {}, } if config.IsAppSignalsEnabled() { userAgent := appsignals.NewUserAgent() - svcStructuredLog.Handlers().Build.PushBackNamed(userAgent.Handler()) emfExporter.processResourceLabels = userAgent.Process } @@ -178,11 +155,36 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e } func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher { - var ok bool - if _, ok = emf.pusherMap[key]; !ok { - emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger) + emf.pusherMapLock.Lock() + defer emf.pusherMapLock.Unlock() + + if emf.svcStructuredLog == nil { + // Initialize svcStructuredLog if it's nil + awsConfig, session, err := awsutil.GetAWSConfigSession(emf.config.logger, &awsutil.Conn{}, &emf.config.AWSSessionSettings) + if err != nil { + emf.set.Logger.Error("Failed to create AWS config and session", zap.Error(err)) + return nil + } + + // Create CWLogs client with aws session config + emf.svcStructuredLog = cwlogs.NewClient(emf.config.logger, + awsConfig, + emf.set.BuildInfo, + emf.config.LogGroupName, + emf.config.LogRetention, + emf.config.Tags, + session, + cwlogs.WithEnabledContainerInsights(emf.config.IsEnhancedContainerInsights()), + cwlogs.WithEnabledAppSignals(emf.config.IsAppSignalsEnabled()), + ) } - return emf.pusherMap[key] + + pusher, exists := emf.pusherMap[key] + if !exists { + pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.set.Logger) + emf.pusherMap[key] = pusher + } + return pusher } func (emf *emfExporter) listPushers() []cwlogs.Pusher { @@ -196,10 +198,39 @@ func (emf *emfExporter) listPushers() []cwlogs.Pusher { return pushers } -func (emf *emfExporter) start(_ context.Context, host component.Host) error { +func (emf *emfExporter) start(ctx context.Context, host component.Host) error { + // Create AWS session here + awsConfig, session, err := awsutil.GetAWSConfigSession(emf.config.logger, &awsutil.Conn{}, &emf.config.AWSSessionSettings) + if err != nil { + return err + } + + // create CWLogs client with aws session config + svcStructuredLog := cwlogs.NewClient(emf.config.logger, + awsConfig, + emf.set.BuildInfo, + emf.config.LogGroupName, + emf.config.LogRetention, + emf.config.Tags, + session, + cwlogs.WithEnabledContainerInsights(emf.config.IsEnhancedContainerInsights()), + cwlogs.WithEnabledAppSignals(emf.config.IsAppSignalsEnabled()), + ) + + // Assign to the struct + emf.svcStructuredLog = svcStructuredLog + + if emf.config.IsAppSignalsEnabled() { + userAgent := appsignals.NewUserAgent() + svcStructuredLog.Handlers().Build.PushBackNamed(userAgent.Handler()) + emf.processResourceLabels = userAgent.Process + } + + // Optionally configure middleware if emf.config.MiddlewareID != nil { - awsmiddleware.TryConfigure(emf.config.logger, host, *emf.config.MiddlewareID, awsmiddleware.SDKv1(emf.svcStructuredLog.Handlers())) + awsmiddleware.TryConfigure(emf.config.logger, host, *emf.config.MiddlewareID, awsmiddleware.SDKv1(svcStructuredLog.Handlers())) } + return nil } diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index ecb2d54036df..82bf42651e2b 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -54,6 +54,14 @@ func (p *mockPusher) ForceFlush() error { return nil } +type mockHost struct { + component.Host +} + +func (m *mockHost) GetExtensions() map[component.ID]component.Component { + return nil +} + func TestConsumeMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -65,6 +73,13 @@ func TestConsumeMetrics(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, exp) + // Create a mock host + mockHost := &mockHost{} + + // Call start + err = exp.start(ctx, mockHost) + assert.NoError(t, err) + md := generateTestMetrics(testMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, @@ -335,9 +350,16 @@ func TestNewExporterWithoutConfig(t *testing.T) { t.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake") exp, err := newEmfExporter(expCfg, settings) - assert.Error(t, err) - assert.Nil(t, exp) + assert.NoError(t, err) + assert.NotNil(t, exp) assert.Equal(t, settings.Logger, expCfg.logger) + + // Create a mock host + mockHost := &mockHost{} + + // Check for error in start + err = exp.start(context.Background(), mockHost) + assert.Error(t, err) } func TestNewExporterWithMetricDeclarations(t *testing.T) { @@ -427,9 +449,20 @@ func TestNewEmfExporterWithoutConfig(t *testing.T) { t.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake") exp, err := newEmfExporter(expCfg, settings) - assert.Error(t, err) - assert.Nil(t, exp) + assert.NoError(t, err) + assert.NotNil(t, exp) assert.Equal(t, settings.Logger, expCfg.logger) + + // Create a mock host + mockHost := &mockHost{} + + // Check for error in start + ctx := context.Background() + err = exp.start(ctx, mockHost) + assert.Error(t, err) // We expect an error here due to the fake AWS_STS_REGIONAL_ENDPOINTS + + // Verify that svcStructuredLog is still nil after failed start + assert.Nil(t, exp.svcStructuredLog) } func TestMiddleware(t *testing.T) {