Skip to content

Commit

Permalink
moving to start() for exporters to have handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
Paramadon committed Dec 10, 2024
1 parent 66e9942 commit 0a96c0c
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 56 deletions.
39 changes: 21 additions & 18 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type cwlExporter struct {
collectorID string
svcStructuredLog *cwlogs.Client
pusherFactory cwlogs.MultiStreamPusherFactory
params exp.Settings
}

type awsMetadata struct {
Expand All @@ -52,29 +53,16 @@ func newCwLogsPusher(expConfig *Config, params exp.Settings) (*cwlExporter, erro
return nil, errors.New("awscloudwatchlogs exporter config is nil")
}

// create AWS session
awsConfig, session, err := awsutil.GetAWSConfigSession(params.Logger, &awsutil.Conn{}, &expConfig.AWSSessionSettings)
if err != nil {
return nil, err
}

// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, expConfig.Tags, session)
collectorIdentifier, err := uuid.NewRandom()
if err != nil {
return nil, err
}

logStreamManager := cwlogs.NewLogStreamManager(*svcStructuredLog)
multiStreamPusherFactory := cwlogs.NewMultiStreamPusherFactory(logStreamManager, *svcStructuredLog, params.Logger)

logsExporter := &cwlExporter{
svcStructuredLog: svcStructuredLog,
Config: expConfig,
logger: params.Logger,
retryCount: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusherFactory: multiStreamPusherFactory,
Config: expConfig,
logger: params.Logger,
collectorID: collectorIdentifier.String(),
params: params,
}
return logsExporter, nil
}
Expand Down Expand Up @@ -117,10 +105,25 @@ func (e *cwlExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
return errs
}

func (e *cwlExporter) start(_ context.Context, host component.Host) error {
func (e *cwlExporter) start(ctx context.Context, host component.Host) error {
// Create AWS session
awsConfig, session, err := awsutil.GetAWSConfigSession(e.logger, &awsutil.Conn{}, &e.Config.AWSSessionSettings)
if err != nil {
return fmt.Errorf("failed to create AWS session: %w", err)
}

// Create CWLogs client with aws session config
e.svcStructuredLog = cwlogs.NewClient(e.logger, awsConfig, e.params.BuildInfo, e.Config.LogGroupName, e.Config.LogRetention, e.Config.Tags, session)

e.retryCount = *awsConfig.MaxRetries

logStreamManager := cwlogs.NewLogStreamManager(*e.svcStructuredLog)
e.pusherFactory = cwlogs.NewMultiStreamPusherFactory(logStreamManager, *e.svcStructuredLog, e.logger)

if e.Config.MiddlewareID != nil {
awsmiddleware.TryConfigure(e.logger, host, *e.Config.MiddlewareID, awsmiddleware.SDKv1(e.svcStructuredLog.Handlers()))
}

return nil
}

Expand Down
99 changes: 65 additions & 34 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
"strings"
"sync"

Expand All @@ -21,7 +22,6 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/appsignals"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
)

Expand All @@ -39,6 +39,7 @@ type emfExporter struct {
pusherMap map[cwlogs.StreamKey]cwlogs.Pusher
svcStructuredLog *cwlogs.Client
config *Config
set exporter.Settings

metricTranslator metricTranslator

Expand All @@ -57,42 +58,18 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error)

config.logger = set.Logger

// create AWS session
awsConfig, session, err := awsutil.GetAWSConfigSession(set.Logger, &awsutil.Conn{}, &config.AWSSessionSettings)
if err != nil {
return nil, err
}

// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(set.Logger,
awsConfig,
set.BuildInfo,
config.LogGroupName,
config.LogRetention,
config.Tags,
session,
cwlogs.WithEnabledContainerInsights(config.IsEnhancedContainerInsights()),
cwlogs.WithEnabledAppSignals(config.IsAppSignalsEnabled()),
)

collectorIdentifier, err := uuid.NewRandom()
if err != nil {
return nil, err
}

// Initialize emfExporter without AWS session and structured logs
emfExporter := &emfExporter{
svcStructuredLog: svcStructuredLog,
config: config,
metricTranslator: newMetricTranslator(*config),
retryCnt: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
retryCnt: config.AWSSessionSettings.MaxRetries,
collectorID: uuid.New().String(),
pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{},
processResourceLabels: func(map[string]string) {},
}

if config.IsAppSignalsEnabled() {
userAgent := appsignals.NewUserAgent()
svcStructuredLog.Handlers().Build.PushBackNamed(userAgent.Handler())
emfExporter.processResourceLabels = userAgent.Process
}

Expand Down Expand Up @@ -178,11 +155,36 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
}

func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
var ok bool
if _, ok = emf.pusherMap[key]; !ok {
emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
emf.pusherMapLock.Lock()
defer emf.pusherMapLock.Unlock()

if emf.svcStructuredLog == nil {
// Initialize svcStructuredLog if it's nil
awsConfig, session, err := awsutil.GetAWSConfigSession(emf.config.logger, &awsutil.Conn{}, &emf.config.AWSSessionSettings)
if err != nil {
emf.set.Logger.Error("Failed to create AWS config and session", zap.Error(err))
return nil
}

// Create CWLogs client with aws session config
emf.svcStructuredLog = cwlogs.NewClient(emf.config.logger,
awsConfig,
emf.set.BuildInfo,
emf.config.LogGroupName,
emf.config.LogRetention,
emf.config.Tags,
session,
cwlogs.WithEnabledContainerInsights(emf.config.IsEnhancedContainerInsights()),
cwlogs.WithEnabledAppSignals(emf.config.IsAppSignalsEnabled()),
)
}
return emf.pusherMap[key]

pusher, exists := emf.pusherMap[key]
if !exists {
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.set.Logger)
emf.pusherMap[key] = pusher
}
return pusher
}

func (emf *emfExporter) listPushers() []cwlogs.Pusher {
Expand All @@ -196,10 +198,39 @@ func (emf *emfExporter) listPushers() []cwlogs.Pusher {
return pushers
}

func (emf *emfExporter) start(_ context.Context, host component.Host) error {
func (emf *emfExporter) start(ctx context.Context, host component.Host) error {
// Create AWS session here
awsConfig, session, err := awsutil.GetAWSConfigSession(emf.config.logger, &awsutil.Conn{}, &emf.config.AWSSessionSettings)
if err != nil {
return err
}

// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(emf.config.logger,
awsConfig,
emf.set.BuildInfo,
emf.config.LogGroupName,
emf.config.LogRetention,
emf.config.Tags,
session,
cwlogs.WithEnabledContainerInsights(emf.config.IsEnhancedContainerInsights()),
cwlogs.WithEnabledAppSignals(emf.config.IsAppSignalsEnabled()),
)

// Assign to the struct
emf.svcStructuredLog = svcStructuredLog

if emf.config.IsAppSignalsEnabled() {
userAgent := appsignals.NewUserAgent()
svcStructuredLog.Handlers().Build.PushBackNamed(userAgent.Handler())
emf.processResourceLabels = userAgent.Process
}

// Optionally configure middleware
if emf.config.MiddlewareID != nil {
awsmiddleware.TryConfigure(emf.config.logger, host, *emf.config.MiddlewareID, awsmiddleware.SDKv1(emf.svcStructuredLog.Handlers()))
awsmiddleware.TryConfigure(emf.config.logger, host, *emf.config.MiddlewareID, awsmiddleware.SDKv1(svcStructuredLog.Handlers()))
}

return nil
}

Expand Down
41 changes: 37 additions & 4 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func (p *mockPusher) ForceFlush() error {
return nil
}

type mockHost struct {
component.Host
}

func (m *mockHost) GetExtensions() map[component.ID]component.Component {
return nil
}

func TestConsumeMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -65,6 +73,13 @@ func TestConsumeMetrics(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, exp)

// Create a mock host
mockHost := &mockHost{}

// Call start
err = exp.start(ctx, mockHost)
assert.NoError(t, err)

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
Expand Down Expand Up @@ -335,9 +350,16 @@ func TestNewExporterWithoutConfig(t *testing.T) {
t.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake")

exp, err := newEmfExporter(expCfg, settings)
assert.Error(t, err)
assert.Nil(t, exp)
assert.NoError(t, err)
assert.NotNil(t, exp)
assert.Equal(t, settings.Logger, expCfg.logger)

// Create a mock host
mockHost := &mockHost{}

// Check for error in start
err = exp.start(context.Background(), mockHost)
assert.Error(t, err)
}

func TestNewExporterWithMetricDeclarations(t *testing.T) {
Expand Down Expand Up @@ -427,9 +449,20 @@ func TestNewEmfExporterWithoutConfig(t *testing.T) {
t.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake")

exp, err := newEmfExporter(expCfg, settings)
assert.Error(t, err)
assert.Nil(t, exp)
assert.NoError(t, err)
assert.NotNil(t, exp)
assert.Equal(t, settings.Logger, expCfg.logger)

// Create a mock host
mockHost := &mockHost{}

// Check for error in start
ctx := context.Background()
err = exp.start(ctx, mockHost)
assert.Error(t, err) // We expect an error here due to the fake AWS_STS_REGIONAL_ENDPOINTS

// Verify that svcStructuredLog is still nil after failed start
assert.Nil(t, exp.svcStructuredLog)
}

func TestMiddleware(t *testing.T) {
Expand Down

0 comments on commit 0a96c0c

Please sign in to comment.