Skip to content

Commit

Permalink
Adding AgentHealth Middleware to Exporters (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
Paramadon authored Dec 16, 2024
1 parent 2dd9562 commit 8e059f1
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 78 deletions.
39 changes: 21 additions & 18 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type cwlExporter struct {
collectorID string
svcStructuredLog *cwlogs.Client
pusherFactory cwlogs.MultiStreamPusherFactory
params exp.Settings
}

type awsMetadata struct {
Expand All @@ -52,29 +53,16 @@ func newCwLogsPusher(expConfig *Config, params exp.Settings) (*cwlExporter, erro
return nil, errors.New("awscloudwatchlogs exporter config is nil")
}

// create AWS session
awsConfig, session, err := awsutil.GetAWSConfigSession(params.Logger, &awsutil.Conn{}, &expConfig.AWSSessionSettings)
if err != nil {
return nil, err
}

// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, expConfig.Tags, session)
collectorIdentifier, err := uuid.NewRandom()
if err != nil {
return nil, err
}

logStreamManager := cwlogs.NewLogStreamManager(*svcStructuredLog)
multiStreamPusherFactory := cwlogs.NewMultiStreamPusherFactory(logStreamManager, *svcStructuredLog, params.Logger)

logsExporter := &cwlExporter{
svcStructuredLog: svcStructuredLog,
Config: expConfig,
logger: params.Logger,
retryCount: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusherFactory: multiStreamPusherFactory,
Config: expConfig,
logger: params.Logger,
collectorID: collectorIdentifier.String(),
params: params,
}
return logsExporter, nil
}
Expand Down Expand Up @@ -117,10 +105,25 @@ func (e *cwlExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
return errs
}

func (e *cwlExporter) start(_ context.Context, host component.Host) error {
func (e *cwlExporter) start(ctx context.Context, host component.Host) error {
// Create AWS session
awsConfig, session, err := awsutil.GetAWSConfigSession(e.logger, &awsutil.Conn{}, &e.Config.AWSSessionSettings)
if err != nil {
return fmt.Errorf("failed to create AWS session: %w", err)
}

// Create CWLogs client with aws session config
e.svcStructuredLog = cwlogs.NewClient(e.logger, awsConfig, e.params.BuildInfo, e.Config.LogGroupName, e.Config.LogRetention, e.Config.Tags, session)

e.retryCount = *awsConfig.MaxRetries

logStreamManager := cwlogs.NewLogStreamManager(*e.svcStructuredLog)
e.pusherFactory = cwlogs.NewMultiStreamPusherFactory(logStreamManager, *e.svcStructuredLog, e.logger)

if e.Config.MiddlewareID != nil {
awsmiddleware.TryConfigure(e.logger, host, *e.Config.MiddlewareID, awsmiddleware.SDKv1(e.svcStructuredLog.Handlers()))
}

return nil
}

Expand Down
18 changes: 16 additions & 2 deletions exporter/awscloudwatchlogsexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ func init() {
type mockPusher struct {
mock.Mock
}
type mockHost struct {
component.Host
}

func (m *mockHost) GetExtensions() map[component.ID]component.Component {
return nil
}

func (p *mockPusher) AddLogEntry(_ *cwlogs.Event) error {
args := p.Called(nil)
Expand Down Expand Up @@ -509,7 +516,14 @@ func TestNewExporterWithoutRegionErr(t *testing.T) {
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.MaxRetries = 0
expCfg.Region = "" // Ensure the region is not set

exp, err := newCwLogsExporter(expCfg, exportertest.NewNopSettings())
assert.Nil(t, exp)
assert.Error(t, err)
assert.NoError(t, err) // The exporter creation should not fail
assert.NotNil(t, exp) // The exporter should be created

// Now try to start the exporter
err = exp.Start(context.Background(), &mockHost{})
assert.Error(t, err) // The start should fail due to missing region
assert.Contains(t, err.Error(), "NoAwsRegion")
}
76 changes: 48 additions & 28 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type emfExporter struct {
pusherMap map[cwlogs.StreamKey]cwlogs.Pusher
svcStructuredLog *cwlogs.Client
config *Config
set exporter.Settings

metricTranslator metricTranslator

Expand All @@ -57,42 +58,23 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error)

config.logger = set.Logger

// create AWS session
awsConfig, session, err := awsutil.GetAWSConfigSession(set.Logger, &awsutil.Conn{}, &config.AWSSessionSettings)
if err != nil {
return nil, err
}

// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(set.Logger,
awsConfig,
set.BuildInfo,
config.LogGroupName,
config.LogRetention,
config.Tags,
session,
cwlogs.WithEnabledContainerInsights(config.IsEnhancedContainerInsights()),
cwlogs.WithEnabledAppSignals(config.IsAppSignalsEnabled()),
)

collectorIdentifier, err := uuid.NewRandom()
if err != nil {
return nil, err
}

// Initialize emfExporter without AWS session and structured logs
emfExporter := &emfExporter{
svcStructuredLog: svcStructuredLog,
config: config,
metricTranslator: newMetricTranslator(*config),
retryCnt: *awsConfig.MaxRetries,
retryCnt: config.AWSSessionSettings.MaxRetries,
collectorID: collectorIdentifier.String(),
pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{},
processResourceLabels: func(map[string]string) {},
}

if config.IsAppSignalsEnabled() {
userAgent := appsignals.NewUserAgent()
svcStructuredLog.Handlers().Build.PushBackNamed(userAgent.Handler())
emfExporter.processResourceLabels = userAgent.Process
}

Expand Down Expand Up @@ -148,7 +130,10 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
fmt.Println(*putLogEvent.InputLogEvent.Message)
}
} else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
emfPusher := emf.getPusher(putLogEvent.StreamKey)
emfPusher, err := emf.getPusher(putLogEvent.StreamKey)
if err != nil {
return fmt.Errorf("failed to get pusher: %w", err)
}
if emfPusher != nil {
returnError := emfPusher.AddLogEntry(putLogEvent)
if returnError != nil {
Expand Down Expand Up @@ -177,12 +162,24 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
return nil
}

func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
var ok bool
if _, ok = emf.pusherMap[key]; !ok {
emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
func (emf *emfExporter) getPusher(key cwlogs.StreamKey) (cwlogs.Pusher, error) {
emf.pusherMapLock.Lock()
defer emf.pusherMapLock.Unlock()

if emf.svcStructuredLog == nil {
return nil, errors.New("CloudWatch Logs client not initialized")
}
return emf.pusherMap[key]

pusher, exists := emf.pusherMap[key]
if !exists {
if emf.set.Logger != nil {
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.set.Logger)
} else {
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
}
emf.pusherMap[key] = pusher
}
return pusher, nil
}

func (emf *emfExporter) listPushers() []cwlogs.Pusher {
Expand All @@ -197,9 +194,32 @@ func (emf *emfExporter) listPushers() []cwlogs.Pusher {
}

func (emf *emfExporter) start(_ context.Context, host component.Host) error {
// Create AWS session here
awsConfig, session, err := awsutil.GetAWSConfigSession(emf.config.logger, &awsutil.Conn{}, &emf.config.AWSSessionSettings)
if err != nil {
return err
}

// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(emf.config.logger,
awsConfig,
emf.set.BuildInfo,
emf.config.LogGroupName,
emf.config.LogRetention,
emf.config.Tags,
session,
cwlogs.WithEnabledContainerInsights(emf.config.IsEnhancedContainerInsights()),
cwlogs.WithEnabledAppSignals(emf.config.IsAppSignalsEnabled()),
)

// Assign to the struct
emf.svcStructuredLog = svcStructuredLog

// Optionally configure middleware
if emf.config.MiddlewareID != nil {
awsmiddleware.TryConfigure(emf.config.logger, host, *emf.config.MiddlewareID, awsmiddleware.SDKv1(emf.svcStructuredLog.Handlers()))
awsmiddleware.TryConfigure(emf.config.logger, host, *emf.config.MiddlewareID, awsmiddleware.SDKv1(svcStructuredLog.Handlers()))
}

return nil
}

Expand Down
Loading

0 comments on commit 8e059f1

Please sign in to comment.