From fe0f14003d23bd0fba968b6637ccc878ac2f798a Mon Sep 17 00:00:00 2001 From: Min Xia Date: Tue, 10 Oct 2023 11:50:20 -0700 Subject: [PATCH] Add Pulse flag in PLE UserAgent (#105) --- .../awscloudwatchlogsexporter/exporter.go | 2 +- exporter/awsemfexporter/emf_exporter.go | 27 ++++++++- exporter/awsemfexporter/emf_exporter_test.go | 55 ++++++++++++++++++ internal/aws/cwlogs/cwlog_client.go | 52 ++++++++++++++--- internal/aws/cwlogs/cwlog_client_test.go | 56 ++++++++++++++----- 5 files changed, 167 insertions(+), 25 deletions(-) diff --git a/exporter/awscloudwatchlogsexporter/exporter.go b/exporter/awscloudwatchlogsexporter/exporter.go index 3e7d5793fa68..c2f7cfabe230 100644 --- a/exporter/awscloudwatchlogsexporter/exporter.go +++ b/exporter/awscloudwatchlogsexporter/exporter.go @@ -61,7 +61,7 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*exporter, e } // create CWLogs client with aws session config - svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, expConfig.Tags, session, false) + svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, expConfig.Tags, session) collectorIdentifier, err := uuid.NewRandom() if err != nil { diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index b565ba8d8cfd..981cf03e8645 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -26,6 +26,10 @@ const ( // OutputDestination Options outputDestinationCloudWatch = "cloudwatch" outputDestinationStdout = "stdout" + + // Pulse EMF config + pulseMetricNamespace = "AWS/APM" + pulseLogGroupNamePrefix = "/aws/apm/" ) type emfExporter struct { @@ -55,7 +59,16 @@ func newEmfExporter(config *Config, set exporter.CreateSettings) (*emfExporter, } // create CWLogs client with aws session config - svcStructuredLog := cwlogs.NewClient(set.Logger, awsConfig, set.BuildInfo, config.LogGroupName, config.LogRetention, config.Tags, session, isEnhancedContainerInsights(config)) + svcStructuredLog := cwlogs.NewClient(set.Logger, + awsConfig, + set.BuildInfo, + config.LogGroupName, + config.LogRetention, + config.Tags, + session, + cwlogs.WithEnabledContainerInsights(isEnhancedContainerInsights(config)), + cwlogs.WithEnabledPulseApm(isPulseApmEnabled(config)), + ) collectorIdentifier, err := uuid.NewRandom() if err != nil { @@ -203,3 +216,15 @@ func isEnhancedContainerInsights(_ *Config) bool { return false // temporarily disable, also need to rename _config to config // return config.EnhancedContainerInsights && !config.DisableMetricExtraction } + +func isPulseApmEnabled(config *Config) bool { + if config.LogGroupName == "" || config.Namespace == "" { + return false + } + + if config.Namespace == pulseMetricNamespace && strings.HasPrefix(config.LogGroupName, pulseLogGroupNamePrefix) { + return true + } + + return false +} diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index 91c9d3fce714..9b061d0c8bc9 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -361,3 +361,58 @@ func TestIsEnhancedContainerInsights(t *testing.T) { cfg.DisableMetricExtraction = true assert.False(t, isEnhancedContainerInsights(cfg)) } + +func TestIsPulseApmEnabled(t *testing.T) { + + tests := []struct { + name string + metricNameSpace string + logGroupName string + expectedResult bool + }{ + { + "validPulseEMF", + "AWS/APM", + "/aws/apm/eks", + true, + }, + { + "invalidPulseLogsGroup", + "AWS/APM", + "/nonaws/apm/eks", + false, + }, + { + "invalidPulseMetricNamespace", + "NonAWS/APM", + "/aws/apm/eks", + false, + }, + { + "invalidPulseEMF", + "NonAWS/APM", + "/nonaws/apm/eks", + false, + }, + { + "defaultConfig", + "", + "", + false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + if len(tc.metricNameSpace) > 0 { + cfg.Namespace = tc.metricNameSpace + } + if len(tc.logGroupName) > 0 { + cfg.LogGroupName = tc.logGroupName + } + + assert.Equal(t, isPulseApmEnabled(cfg), tc.expectedResult) + }) + } +} diff --git a/internal/aws/cwlogs/cwlog_client.go b/internal/aws/cwlogs/cwlog_client.go index 70635b8de9ff..7e36fc5f690e 100644 --- a/internal/aws/cwlogs/cwlog_client.go +++ b/internal/aws/cwlogs/cwlog_client.go @@ -39,6 +39,24 @@ type Client struct { tags map[string]*string logger *zap.Logger } +type UserAgentOption func(*UserAgentFlag) + +type UserAgentFlag struct { + isEnhancedContainerInsights bool + isPulseApm bool +} + +func WithEnabledContainerInsights(flag bool) UserAgentOption { + return func(ua *UserAgentFlag) { + ua.isEnhancedContainerInsights = flag + } +} + +func WithEnabledPulseApm(flag bool) UserAgentOption { + return func(ua *UserAgentFlag) { + ua.isPulseApm = flag + } +} // Create a log client based on the actual cloudwatch logs client. func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logRetention int64, tags map[string]*string, logger *zap.Logger) *Client { @@ -50,12 +68,21 @@ func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logRetent } // NewClient create Client -func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.BuildInfo, logGroupName string, logRetention int64, tags map[string]*string, sess *session.Session, enhancedContainerInsights bool) *Client { +func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.BuildInfo, logGroupName string, logRetention int64, tags map[string]*string, sess *session.Session, opts ...UserAgentOption) *Client { client := cloudwatchlogs.New(sess, awsConfig) client.Handlers.Build.PushBackNamed(handler.NewRequestCompressionHandler([]string{"PutLogEvents"}, logger)) client.Handlers.Build.PushBackNamed(handler.RequestStructuredLogHandler) - // temporarily disable the flag - client.Handlers.Build.PushFrontNamed(newCollectorUserAgentHandler(buildInfo, logGroupName, enhancedContainerInsights)) + + // Loop through each option + option := &UserAgentFlag{ + isEnhancedContainerInsights: false, + isPulseApm: false, + } + for _, opt := range opts { + opt(option) + } + + client.Handlers.Build.PushFrontNamed(newCollectorUserAgentHandler(buildInfo, logGroupName, option)) return newCloudWatchLogClient(client, logRetention, tags, logger) } @@ -190,13 +217,20 @@ func (client *Client) CreateStream(logGroup, streamName *string) (token string, return "", nil } -func newCollectorUserAgentHandler(buildInfo component.BuildInfo, logGroupName string, enhancedContainerInsights bool) request.NamedHandler { - fn := request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version) - if enhancedContainerInsights && enhancedContainerInsightsEKSPattern.MatchString(logGroupName) { - fn = request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version, "EnhancedEKSContainerInsights") - } else if containerInsightsRegexPattern.MatchString(logGroupName) { - fn = request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version, "ContainerInsights") +func newCollectorUserAgentHandler(buildInfo component.BuildInfo, logGroupName string, userAgentFlag *UserAgentFlag) request.NamedHandler { + extraStr := "" + + switch { + case userAgentFlag.isEnhancedContainerInsights && enhancedContainerInsightsEKSPattern.MatchString(logGroupName): + extraStr = "EnhancedEKSContainerInsights" + case containerInsightsRegexPattern.MatchString(logGroupName): + extraStr = "ContainerInsights" + case userAgentFlag.isPulseApm: + extraStr = "Pulse" } + + fn := request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version, extraStr) + return request.NamedHandler{ Name: "otel.collector.UserAgentHandler", Fn: fn, diff --git a/internal/aws/cwlogs/cwlog_client_test.go b/internal/aws/cwlogs/cwlog_client_test.go index b63c832ba06d..5a2f6594d1e8 100644 --- a/internal/aws/cwlogs/cwlog_client_test.go +++ b/internal/aws/cwlogs/cwlog_client_test.go @@ -582,59 +582,73 @@ func TestUserAgent(t *testing.T) { logger := zap.NewNop() tests := []struct { - name string - buildInfo component.BuildInfo - logGroupName string - enhancedContainerInsights bool - expectedUserAgentStr string + name string + buildInfo component.BuildInfo + logGroupName string + userAgentOption UserAgentOption + expectedUserAgentStr string }{ { "emptyLogGroup", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, "", - false, + WithEnabledContainerInsights(false), + "opentelemetry-collector-contrib/1.0", + }, + { + "emptyLogGroupPulse", + component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, + "", + WithEnabledPulseApm(false), "opentelemetry-collector-contrib/1.0", }, { "buildInfoCommandUsed", component.BuildInfo{Command: "test-collector-contrib", Version: "1.0"}, "", - false, + WithEnabledContainerInsights(false), + "test-collector-contrib/1.0", + }, + { + "buildInfoCommandUsedPulse", + component.BuildInfo{Command: "test-collector-contrib", Version: "1.0"}, + "", + WithEnabledPulseApm(false), "test-collector-contrib/1.0", }, { "non container insights", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.1"}, "test-group", - false, + WithEnabledContainerInsights(false), "opentelemetry-collector-contrib/1.1", }, { "container insights EKS", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, "/aws/containerinsights/eks-cluster-name/performance", - false, + WithEnabledContainerInsights(false), "opentelemetry-collector-contrib/1.0 (ContainerInsights)", }, { "container insights ECS", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, "/aws/ecs/containerinsights/ecs-cluster-name/performance", - false, + WithEnabledContainerInsights(false), "opentelemetry-collector-contrib/1.0 (ContainerInsights)", }, { "container insights prometheus", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, "/aws/containerinsights/cluster-name/prometheus", - false, + WithEnabledContainerInsights(false), "opentelemetry-collector-contrib/1.0 (ContainerInsights)", }, { "enhanced container insights EKS", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, "/aws/containerinsights/eks-cluster-name/performance", - true, + WithEnabledContainerInsights(true), "opentelemetry-collector-contrib/1.0 (EnhancedEKSContainerInsights)", }, { @@ -642,15 +656,29 @@ func TestUserAgent(t *testing.T) { component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, // this is an ECS path, enhanced CI is not supported "/aws/ecs/containerinsights/ecs-cluster-name/performance", - true, + WithEnabledContainerInsights(true), "opentelemetry-collector-contrib/1.0 (ContainerInsights)", }, + { + "validPulseEMFEnabled", + component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, + "/aws/apm", + WithEnabledPulseApm(true), + "opentelemetry-collector-contrib/1.0 (Pulse)", + }, + { + "PulseEMFNotEnabled", + component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, + "/aws/apm", + WithEnabledPulseApm(false), + "opentelemetry-collector-contrib/1.0", + }, } session, _ := session.NewSession() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - cwlog := NewClient(logger, &aws.Config{}, tc.buildInfo, tc.logGroupName, 0, map[string]*string{}, session, tc.enhancedContainerInsights) + cwlog := NewClient(logger, &aws.Config{}, tc.buildInfo, tc.logGroupName, 0, map[string]*string{}, session, tc.userAgentOption) logClient := cwlog.svc.(*cloudwatchlogs.CloudWatchLogs) req := request.New(aws.Config{}, metadata.ClientInfo{}, logClient.Handlers, nil, &request.Operation{