Skip to content

Commit

Permalink
Add Pulse flag in PLE UserAgent (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxiamxia authored Oct 10, 2023
1 parent 4b74f35 commit fe0f140
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 25 deletions.
2 changes: 1 addition & 1 deletion exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*exporter, e
}

// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, expConfig.Tags, session, false)
svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, expConfig.Tags, session)
collectorIdentifier, err := uuid.NewRandom()

if err != nil {
Expand Down
27 changes: 26 additions & 1 deletion exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ const (
// OutputDestination Options
outputDestinationCloudWatch = "cloudwatch"
outputDestinationStdout = "stdout"

// Pulse EMF config
pulseMetricNamespace = "AWS/APM"
pulseLogGroupNamePrefix = "/aws/apm/"
)

type emfExporter struct {
Expand Down Expand Up @@ -55,7 +59,16 @@ func newEmfExporter(config *Config, set exporter.CreateSettings) (*emfExporter,
}

// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(set.Logger, awsConfig, set.BuildInfo, config.LogGroupName, config.LogRetention, config.Tags, session, isEnhancedContainerInsights(config))
svcStructuredLog := cwlogs.NewClient(set.Logger,
awsConfig,
set.BuildInfo,
config.LogGroupName,
config.LogRetention,
config.Tags,
session,
cwlogs.WithEnabledContainerInsights(isEnhancedContainerInsights(config)),
cwlogs.WithEnabledPulseApm(isPulseApmEnabled(config)),
)
collectorIdentifier, err := uuid.NewRandom()

if err != nil {
Expand Down Expand Up @@ -203,3 +216,15 @@ func isEnhancedContainerInsights(_ *Config) bool {
return false // temporarily disable, also need to rename _config to config
// return config.EnhancedContainerInsights && !config.DisableMetricExtraction
}

func isPulseApmEnabled(config *Config) bool {
if config.LogGroupName == "" || config.Namespace == "" {
return false
}

if config.Namespace == pulseMetricNamespace && strings.HasPrefix(config.LogGroupName, pulseLogGroupNamePrefix) {
return true
}

return false
}
55 changes: 55 additions & 0 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,58 @@ func TestIsEnhancedContainerInsights(t *testing.T) {
cfg.DisableMetricExtraction = true
assert.False(t, isEnhancedContainerInsights(cfg))
}

func TestIsPulseApmEnabled(t *testing.T) {

tests := []struct {
name string
metricNameSpace string
logGroupName string
expectedResult bool
}{
{
"validPulseEMF",
"AWS/APM",
"/aws/apm/eks",
true,
},
{
"invalidPulseLogsGroup",
"AWS/APM",
"/nonaws/apm/eks",
false,
},
{
"invalidPulseMetricNamespace",
"NonAWS/APM",
"/aws/apm/eks",
false,
},
{
"invalidPulseEMF",
"NonAWS/APM",
"/nonaws/apm/eks",
false,
},
{
"defaultConfig",
"",
"",
false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
if len(tc.metricNameSpace) > 0 {
cfg.Namespace = tc.metricNameSpace
}
if len(tc.logGroupName) > 0 {
cfg.LogGroupName = tc.logGroupName
}

assert.Equal(t, isPulseApmEnabled(cfg), tc.expectedResult)
})
}
}
52 changes: 43 additions & 9 deletions internal/aws/cwlogs/cwlog_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@ type Client struct {
tags map[string]*string
logger *zap.Logger
}
type UserAgentOption func(*UserAgentFlag)

type UserAgentFlag struct {
isEnhancedContainerInsights bool
isPulseApm bool
}

func WithEnabledContainerInsights(flag bool) UserAgentOption {
return func(ua *UserAgentFlag) {
ua.isEnhancedContainerInsights = flag
}
}

func WithEnabledPulseApm(flag bool) UserAgentOption {
return func(ua *UserAgentFlag) {
ua.isPulseApm = flag
}
}

// Create a log client based on the actual cloudwatch logs client.
func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logRetention int64, tags map[string]*string, logger *zap.Logger) *Client {
Expand All @@ -50,12 +68,21 @@ func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logRetent
}

// NewClient create Client
func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.BuildInfo, logGroupName string, logRetention int64, tags map[string]*string, sess *session.Session, enhancedContainerInsights bool) *Client {
func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.BuildInfo, logGroupName string, logRetention int64, tags map[string]*string, sess *session.Session, opts ...UserAgentOption) *Client {
client := cloudwatchlogs.New(sess, awsConfig)
client.Handlers.Build.PushBackNamed(handler.NewRequestCompressionHandler([]string{"PutLogEvents"}, logger))
client.Handlers.Build.PushBackNamed(handler.RequestStructuredLogHandler)
// temporarily disable the flag
client.Handlers.Build.PushFrontNamed(newCollectorUserAgentHandler(buildInfo, logGroupName, enhancedContainerInsights))

// Loop through each option
option := &UserAgentFlag{
isEnhancedContainerInsights: false,
isPulseApm: false,
}
for _, opt := range opts {
opt(option)
}

client.Handlers.Build.PushFrontNamed(newCollectorUserAgentHandler(buildInfo, logGroupName, option))
return newCloudWatchLogClient(client, logRetention, tags, logger)
}

Expand Down Expand Up @@ -190,13 +217,20 @@ func (client *Client) CreateStream(logGroup, streamName *string) (token string,
return "", nil
}

func newCollectorUserAgentHandler(buildInfo component.BuildInfo, logGroupName string, enhancedContainerInsights bool) request.NamedHandler {
fn := request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version)
if enhancedContainerInsights && enhancedContainerInsightsEKSPattern.MatchString(logGroupName) {
fn = request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version, "EnhancedEKSContainerInsights")
} else if containerInsightsRegexPattern.MatchString(logGroupName) {
fn = request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version, "ContainerInsights")
func newCollectorUserAgentHandler(buildInfo component.BuildInfo, logGroupName string, userAgentFlag *UserAgentFlag) request.NamedHandler {
extraStr := ""

switch {
case userAgentFlag.isEnhancedContainerInsights && enhancedContainerInsightsEKSPattern.MatchString(logGroupName):
extraStr = "EnhancedEKSContainerInsights"
case containerInsightsRegexPattern.MatchString(logGroupName):
extraStr = "ContainerInsights"
case userAgentFlag.isPulseApm:
extraStr = "Pulse"
}

fn := request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version, extraStr)

return request.NamedHandler{
Name: "otel.collector.UserAgentHandler",
Fn: fn,
Expand Down
56 changes: 42 additions & 14 deletions internal/aws/cwlogs/cwlog_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,75 +582,103 @@ func TestUserAgent(t *testing.T) {
logger := zap.NewNop()

tests := []struct {
name string
buildInfo component.BuildInfo
logGroupName string
enhancedContainerInsights bool
expectedUserAgentStr string
name string
buildInfo component.BuildInfo
logGroupName string
userAgentOption UserAgentOption
expectedUserAgentStr string
}{
{
"emptyLogGroup",
component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"},
"",
false,
WithEnabledContainerInsights(false),
"opentelemetry-collector-contrib/1.0",
},
{
"emptyLogGroupPulse",
component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"},
"",
WithEnabledPulseApm(false),
"opentelemetry-collector-contrib/1.0",
},
{
"buildInfoCommandUsed",
component.BuildInfo{Command: "test-collector-contrib", Version: "1.0"},
"",
false,
WithEnabledContainerInsights(false),
"test-collector-contrib/1.0",
},
{
"buildInfoCommandUsedPulse",
component.BuildInfo{Command: "test-collector-contrib", Version: "1.0"},
"",
WithEnabledPulseApm(false),
"test-collector-contrib/1.0",
},
{
"non container insights",
component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.1"},
"test-group",
false,
WithEnabledContainerInsights(false),
"opentelemetry-collector-contrib/1.1",
},
{
"container insights EKS",
component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"},
"/aws/containerinsights/eks-cluster-name/performance",
false,
WithEnabledContainerInsights(false),
"opentelemetry-collector-contrib/1.0 (ContainerInsights)",
},
{
"container insights ECS",
component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"},
"/aws/ecs/containerinsights/ecs-cluster-name/performance",
false,
WithEnabledContainerInsights(false),
"opentelemetry-collector-contrib/1.0 (ContainerInsights)",
},
{
"container insights prometheus",
component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"},
"/aws/containerinsights/cluster-name/prometheus",
false,
WithEnabledContainerInsights(false),
"opentelemetry-collector-contrib/1.0 (ContainerInsights)",
},
{
"enhanced container insights EKS",
component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"},
"/aws/containerinsights/eks-cluster-name/performance",
true,
WithEnabledContainerInsights(true),
"opentelemetry-collector-contrib/1.0 (EnhancedEKSContainerInsights)",
},
{
"negative - enhanced container insights ECS",
component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"},
// this is an ECS path, enhanced CI is not supported
"/aws/ecs/containerinsights/ecs-cluster-name/performance",
true,
WithEnabledContainerInsights(true),
"opentelemetry-collector-contrib/1.0 (ContainerInsights)",
},
{
"validPulseEMFEnabled",
component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"},
"/aws/apm",
WithEnabledPulseApm(true),
"opentelemetry-collector-contrib/1.0 (Pulse)",
},
{
"PulseEMFNotEnabled",
component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"},
"/aws/apm",
WithEnabledPulseApm(false),
"opentelemetry-collector-contrib/1.0",
},
}

session, _ := session.NewSession()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
cwlog := NewClient(logger, &aws.Config{}, tc.buildInfo, tc.logGroupName, 0, map[string]*string{}, session, tc.enhancedContainerInsights)
cwlog := NewClient(logger, &aws.Config{}, tc.buildInfo, tc.logGroupName, 0, map[string]*string{}, session, tc.userAgentOption)
logClient := cwlog.svc.(*cloudwatchlogs.CloudWatchLogs)

req := request.New(aws.Config{}, metadata.ClientInfo{}, logClient.Handlers, nil, &request.Operation{
Expand Down

0 comments on commit fe0f140

Please sign in to comment.