From fe0f14003d23bd0fba968b6637ccc878ac2f798a Mon Sep 17 00:00:00 2001 From: Min Xia Date: Tue, 10 Oct 2023 11:50:20 -0700 Subject: [PATCH 1/7] Add Pulse flag in PLE UserAgent (#105) --- .../awscloudwatchlogsexporter/exporter.go | 2 +- exporter/awsemfexporter/emf_exporter.go | 27 ++++++++- exporter/awsemfexporter/emf_exporter_test.go | 55 ++++++++++++++++++ internal/aws/cwlogs/cwlog_client.go | 52 ++++++++++++++--- internal/aws/cwlogs/cwlog_client_test.go | 56 ++++++++++++++----- 5 files changed, 167 insertions(+), 25 deletions(-) diff --git a/exporter/awscloudwatchlogsexporter/exporter.go b/exporter/awscloudwatchlogsexporter/exporter.go index 3e7d5793fa68..c2f7cfabe230 100644 --- a/exporter/awscloudwatchlogsexporter/exporter.go +++ b/exporter/awscloudwatchlogsexporter/exporter.go @@ -61,7 +61,7 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*exporter, e } // create CWLogs client with aws session config - svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, expConfig.Tags, session, false) + svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, expConfig.Tags, session) collectorIdentifier, err := uuid.NewRandom() if err != nil { diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index b565ba8d8cfd..981cf03e8645 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -26,6 +26,10 @@ const ( // OutputDestination Options outputDestinationCloudWatch = "cloudwatch" outputDestinationStdout = "stdout" + + // Pulse EMF config + pulseMetricNamespace = "AWS/APM" + pulseLogGroupNamePrefix = "/aws/apm/" ) type emfExporter struct { @@ -55,7 +59,16 @@ func newEmfExporter(config *Config, set exporter.CreateSettings) (*emfExporter, } // create CWLogs client with aws session config - svcStructuredLog := 
cwlogs.NewClient(set.Logger, awsConfig, set.BuildInfo, config.LogGroupName, config.LogRetention, config.Tags, session, isEnhancedContainerInsights(config)) + svcStructuredLog := cwlogs.NewClient(set.Logger, + awsConfig, + set.BuildInfo, + config.LogGroupName, + config.LogRetention, + config.Tags, + session, + cwlogs.WithEnabledContainerInsights(isEnhancedContainerInsights(config)), + cwlogs.WithEnabledPulseApm(isPulseApmEnabled(config)), + ) collectorIdentifier, err := uuid.NewRandom() if err != nil { @@ -203,3 +216,15 @@ func isEnhancedContainerInsights(_ *Config) bool { return false // temporarily disable, also need to rename _config to config // return config.EnhancedContainerInsights && !config.DisableMetricExtraction } + +func isPulseApmEnabled(config *Config) bool { + if config.LogGroupName == "" || config.Namespace == "" { + return false + } + + if config.Namespace == pulseMetricNamespace && strings.HasPrefix(config.LogGroupName, pulseLogGroupNamePrefix) { + return true + } + + return false +} diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index 91c9d3fce714..9b061d0c8bc9 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -361,3 +361,58 @@ func TestIsEnhancedContainerInsights(t *testing.T) { cfg.DisableMetricExtraction = true assert.False(t, isEnhancedContainerInsights(cfg)) } + +func TestIsPulseApmEnabled(t *testing.T) { + + tests := []struct { + name string + metricNameSpace string + logGroupName string + expectedResult bool + }{ + { + "validPulseEMF", + "AWS/APM", + "/aws/apm/eks", + true, + }, + { + "invalidPulseLogsGroup", + "AWS/APM", + "/nonaws/apm/eks", + false, + }, + { + "invalidPulseMetricNamespace", + "NonAWS/APM", + "/aws/apm/eks", + false, + }, + { + "invalidPulseEMF", + "NonAWS/APM", + "/nonaws/apm/eks", + false, + }, + { + "defaultConfig", + "", + "", + false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t 
*testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + if len(tc.metricNameSpace) > 0 { + cfg.Namespace = tc.metricNameSpace + } + if len(tc.logGroupName) > 0 { + cfg.LogGroupName = tc.logGroupName + } + + assert.Equal(t, isPulseApmEnabled(cfg), tc.expectedResult) + }) + } +} diff --git a/internal/aws/cwlogs/cwlog_client.go b/internal/aws/cwlogs/cwlog_client.go index 70635b8de9ff..7e36fc5f690e 100644 --- a/internal/aws/cwlogs/cwlog_client.go +++ b/internal/aws/cwlogs/cwlog_client.go @@ -39,6 +39,24 @@ type Client struct { tags map[string]*string logger *zap.Logger } +type UserAgentOption func(*UserAgentFlag) + +type UserAgentFlag struct { + isEnhancedContainerInsights bool + isPulseApm bool +} + +func WithEnabledContainerInsights(flag bool) UserAgentOption { + return func(ua *UserAgentFlag) { + ua.isEnhancedContainerInsights = flag + } +} + +func WithEnabledPulseApm(flag bool) UserAgentOption { + return func(ua *UserAgentFlag) { + ua.isPulseApm = flag + } +} // Create a log client based on the actual cloudwatch logs client. 
func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logRetention int64, tags map[string]*string, logger *zap.Logger) *Client { @@ -50,12 +68,21 @@ func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logRetent } // NewClient create Client -func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.BuildInfo, logGroupName string, logRetention int64, tags map[string]*string, sess *session.Session, enhancedContainerInsights bool) *Client { +func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.BuildInfo, logGroupName string, logRetention int64, tags map[string]*string, sess *session.Session, opts ...UserAgentOption) *Client { client := cloudwatchlogs.New(sess, awsConfig) client.Handlers.Build.PushBackNamed(handler.NewRequestCompressionHandler([]string{"PutLogEvents"}, logger)) client.Handlers.Build.PushBackNamed(handler.RequestStructuredLogHandler) - // temporarily disable the flag - client.Handlers.Build.PushFrontNamed(newCollectorUserAgentHandler(buildInfo, logGroupName, enhancedContainerInsights)) + + // Loop through each option + option := &UserAgentFlag{ + isEnhancedContainerInsights: false, + isPulseApm: false, + } + for _, opt := range opts { + opt(option) + } + + client.Handlers.Build.PushFrontNamed(newCollectorUserAgentHandler(buildInfo, logGroupName, option)) return newCloudWatchLogClient(client, logRetention, tags, logger) } @@ -190,13 +217,20 @@ func (client *Client) CreateStream(logGroup, streamName *string) (token string, return "", nil } -func newCollectorUserAgentHandler(buildInfo component.BuildInfo, logGroupName string, enhancedContainerInsights bool) request.NamedHandler { - fn := request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version) - if enhancedContainerInsights && enhancedContainerInsightsEKSPattern.MatchString(logGroupName) { - fn = request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version, "EnhancedEKSContainerInsights") - } else if 
containerInsightsRegexPattern.MatchString(logGroupName) { - fn = request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version, "ContainerInsights") +func newCollectorUserAgentHandler(buildInfo component.BuildInfo, logGroupName string, userAgentFlag *UserAgentFlag) request.NamedHandler { + extraStr := "" + + switch { + case userAgentFlag.isEnhancedContainerInsights && enhancedContainerInsightsEKSPattern.MatchString(logGroupName): + extraStr = "EnhancedEKSContainerInsights" + case containerInsightsRegexPattern.MatchString(logGroupName): + extraStr = "ContainerInsights" + case userAgentFlag.isPulseApm: + extraStr = "Pulse" } + + fn := request.MakeAddToUserAgentHandler(buildInfo.Command, buildInfo.Version, extraStr) + return request.NamedHandler{ Name: "otel.collector.UserAgentHandler", Fn: fn, diff --git a/internal/aws/cwlogs/cwlog_client_test.go b/internal/aws/cwlogs/cwlog_client_test.go index b63c832ba06d..5a2f6594d1e8 100644 --- a/internal/aws/cwlogs/cwlog_client_test.go +++ b/internal/aws/cwlogs/cwlog_client_test.go @@ -582,59 +582,73 @@ func TestUserAgent(t *testing.T) { logger := zap.NewNop() tests := []struct { - name string - buildInfo component.BuildInfo - logGroupName string - enhancedContainerInsights bool - expectedUserAgentStr string + name string + buildInfo component.BuildInfo + logGroupName string + userAgentOption UserAgentOption + expectedUserAgentStr string }{ { "emptyLogGroup", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, "", - false, + WithEnabledContainerInsights(false), + "opentelemetry-collector-contrib/1.0", + }, + { + "emptyLogGroupPulse", + component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, + "", + WithEnabledPulseApm(false), "opentelemetry-collector-contrib/1.0", }, { "buildInfoCommandUsed", component.BuildInfo{Command: "test-collector-contrib", Version: "1.0"}, "", - false, + WithEnabledContainerInsights(false), + "test-collector-contrib/1.0", + }, + { + 
"buildInfoCommandUsedPulse", + component.BuildInfo{Command: "test-collector-contrib", Version: "1.0"}, + "", + WithEnabledPulseApm(false), "test-collector-contrib/1.0", }, { "non container insights", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.1"}, "test-group", - false, + WithEnabledContainerInsights(false), "opentelemetry-collector-contrib/1.1", }, { "container insights EKS", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, "/aws/containerinsights/eks-cluster-name/performance", - false, + WithEnabledContainerInsights(false), "opentelemetry-collector-contrib/1.0 (ContainerInsights)", }, { "container insights ECS", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, "/aws/ecs/containerinsights/ecs-cluster-name/performance", - false, + WithEnabledContainerInsights(false), "opentelemetry-collector-contrib/1.0 (ContainerInsights)", }, { "container insights prometheus", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, "/aws/containerinsights/cluster-name/prometheus", - false, + WithEnabledContainerInsights(false), "opentelemetry-collector-contrib/1.0 (ContainerInsights)", }, { "enhanced container insights EKS", component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, "/aws/containerinsights/eks-cluster-name/performance", - true, + WithEnabledContainerInsights(true), "opentelemetry-collector-contrib/1.0 (EnhancedEKSContainerInsights)", }, { @@ -642,15 +656,29 @@ func TestUserAgent(t *testing.T) { component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, // this is an ECS path, enhanced CI is not supported "/aws/ecs/containerinsights/ecs-cluster-name/performance", - true, + WithEnabledContainerInsights(true), "opentelemetry-collector-contrib/1.0 (ContainerInsights)", }, + { + "validPulseEMFEnabled", + component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, + "/aws/apm", + 
WithEnabledPulseApm(true), + "opentelemetry-collector-contrib/1.0 (Pulse)", + }, + { + "PulseEMFNotEnabled", + component.BuildInfo{Command: "opentelemetry-collector-contrib", Version: "1.0"}, + "/aws/apm", + WithEnabledPulseApm(false), + "opentelemetry-collector-contrib/1.0", + }, } session, _ := session.NewSession() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - cwlog := NewClient(logger, &aws.Config{}, tc.buildInfo, tc.logGroupName, 0, map[string]*string{}, session, tc.enhancedContainerInsights) + cwlog := NewClient(logger, &aws.Config{}, tc.buildInfo, tc.logGroupName, 0, map[string]*string{}, session, tc.userAgentOption) logClient := cwlog.svc.(*cloudwatchlogs.CloudWatchLogs) req := request.New(aws.Config{}, metadata.ClientInfo{}, logClient.Handlers, nil, &request.Operation{ From ca00a4d960861623f3d47ada16310d961af1c2d2 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 10 Oct 2023 14:40:48 -0500 Subject: [PATCH 2/7] =?UTF-8?q?move=20container=5Fstatus=5F=20metrics=20to?= =?UTF-8?q?=20pod=5Fcontainer=5F=20so=20we=20can=20pick=20up=20sh=E2=80=A6?= =?UTF-8?q?=20(#109)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * move container_status_ metrics to pod_container_ so we can pick up short lived container states * update README * update README * minor refactor so units work --- internal/aws/containerinsight/const.go | 107 ++++++++--------- .../awscontainerinsightreceiver/README.md | 110 +++++++++--------- .../internal/stores/podstore.go | 74 ++++++++---- .../internal/stores/podstore_test.go | 110 ++++++++++++------ 4 files changed, 233 insertions(+), 168 deletions(-) diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go index 62228df8ecd4..d6ff6ecdbc9c 100644 --- a/internal/aws/containerinsight/const.go +++ b/internal/aws/containerinsight/const.go @@ -76,38 +76,39 @@ const ( FSInodesfree = "filesystem_inodes_free" FSUtilization = "filesystem_utilization" - 
StatusConditionReady = "status_condition_ready" - StatusConditionDiskPressure = "status_condition_disk_pressure" - StatusConditionMemoryPressure = "status_condition_memory_pressure" - StatusConditionPIDPressure = "status_condition_pid_pressure" - StatusConditionNetworkUnavailable = "status_condition_network_unavailable" - StatusConditionUnknown = "status_condition_unknown" - StatusCapacityPods = "status_capacity_pods" - StatusAllocatablePods = "status_allocatable_pods" - StatusNumberAvailable = "status_number_available" - StatusNumberUnavailable = "status_number_unavailable" - StatusDesiredNumberScheduled = "status_desired_number_scheduled" - StatusCurrentNumberScheduled = "status_current_number_scheduled" - StatusReplicasAvailable = "status_replicas_available" - StatusReplicasUnavailable = "status_replicas_unavailable" - SpecReplicas = "spec_replicas" - StatusRunning = "status_running" - StatusTerminated = "status_terminated" - StatusWaiting = "status_waiting" - StatusWaitingReasonCrashLoopBackOff = "status_waiting_reason_crash_loop_back_off" - StatusWaitingReasonImagePullError = "status_waiting_reason_image_pull_error" - StatusWaitingReasonStartError = "status_waiting_reason_start_error" - StatusWaitingReasonCreateContainerError = "status_waiting_reason_create_container_error" - StatusWaitingReasonCreateContainerConfigError = "status_waiting_reason_create_container_config_error" - StatusTerminatedReasonOOMKilled = "status_terminated_reason_oom_killed" - StatusPending = "status_pending" - StatusSucceeded = "status_succeeded" - StatusFailed = "status_failed" - StatusUnknown = "status_unknown" - StatusReady = "status_ready" - StatusScheduled = "status_scheduled" - ReplicasDesired = "replicas_desired" - ReplicasReady = "replicas_ready" + StatusConditionReady = "status_condition_ready" + StatusConditionDiskPressure = "status_condition_disk_pressure" + StatusConditionMemoryPressure = "status_condition_memory_pressure" + StatusConditionPIDPressure = 
"status_condition_pid_pressure" + StatusConditionNetworkUnavailable = "status_condition_network_unavailable" + StatusConditionUnknown = "status_condition_unknown" + StatusCapacityPods = "status_capacity_pods" + StatusAllocatablePods = "status_allocatable_pods" + StatusNumberAvailable = "status_number_available" + StatusNumberUnavailable = "status_number_unavailable" + StatusDesiredNumberScheduled = "status_desired_number_scheduled" + StatusCurrentNumberScheduled = "status_current_number_scheduled" + StatusReplicasAvailable = "status_replicas_available" + StatusReplicasUnavailable = "status_replicas_unavailable" + SpecReplicas = "spec_replicas" + StatusContainerRunning = "container_status_running" + StatusContainerTerminated = "container_status_terminated" + StatusContainerWaiting = "container_status_waiting" + StatusContainerWaitingReasonCrashLoopBackOff = "container_status_waiting_reason_crash_loop_back_off" + StatusContainerWaitingReasonImagePullError = "container_status_waiting_reason_image_pull_error" + StatusContainerWaitingReasonStartError = "container_status_waiting_reason_start_error" + StatusContainerWaitingReasonCreateContainerError = "container_status_waiting_reason_create_container_error" + StatusContainerWaitingReasonCreateContainerConfigError = "container_status_waiting_reason_create_container_config_error" + StatusContainerTerminatedReasonOOMKilled = "container_status_terminated_reason_oom_killed" + StatusRunning = "status_running" + StatusPending = "status_pending" + StatusSucceeded = "status_succeeded" + StatusFailed = "status_failed" + StatusUnknown = "status_unknown" + StatusReady = "status_ready" + StatusScheduled = "status_scheduled" + ReplicasDesired = "replicas_desired" + ReplicasReady = "replicas_ready" RunningPodCount = "number_of_running_pods" RunningContainerCount = "number_of_running_containers" @@ -163,13 +164,13 @@ const ( ) var WaitingReasonLookup = map[string]string{ - "CrashLoopBackOff": StatusWaitingReasonCrashLoopBackOff, - 
"ErrImagePull": StatusWaitingReasonImagePullError, - "ImagePullBackOff": StatusWaitingReasonImagePullError, - "InvalidImageName": StatusWaitingReasonImagePullError, - "CreateContainerError": StatusWaitingReasonCreateContainerError, - "CreateContainerConfigError": StatusWaitingReasonCreateContainerConfigError, - "StartError": StatusWaitingReasonStartError, + "CrashLoopBackOff": StatusContainerWaitingReasonCrashLoopBackOff, + "ErrImagePull": StatusContainerWaitingReasonImagePullError, + "ImagePullBackOff": StatusContainerWaitingReasonImagePullError, + "InvalidImageName": StatusContainerWaitingReasonImagePullError, + "CreateContainerError": StatusContainerWaitingReasonCreateContainerError, + "CreateContainerConfigError": StatusContainerWaitingReasonCreateContainerConfigError, + "StartError": StatusContainerWaitingReasonStartError, } var metricToUnitMap map[string]string @@ -261,20 +262,20 @@ func init() { ReplicasReady: UnitCount, // kube-state-metrics equivalents - StatusRunning: UnitCount, - StatusTerminated: UnitCount, - StatusWaiting: UnitCount, - StatusWaitingReasonCrashLoopBackOff: UnitCount, - StatusWaitingReasonImagePullError: UnitCount, - StatusWaitingReasonStartError: UnitCount, - StatusWaitingReasonCreateContainerConfigError: UnitCount, - StatusWaitingReasonCreateContainerError: UnitCount, - StatusFailed: UnitCount, - StatusPending: UnitCount, - StatusSucceeded: UnitCount, - StatusUnknown: UnitCount, - StatusReady: UnitCount, - StatusScheduled: UnitCount, + StatusContainerRunning: UnitCount, + StatusContainerTerminated: UnitCount, + StatusContainerWaiting: UnitCount, + StatusContainerWaitingReasonCrashLoopBackOff: UnitCount, + StatusContainerWaitingReasonImagePullError: UnitCount, + StatusContainerWaitingReasonStartError: UnitCount, + StatusContainerWaitingReasonCreateContainerConfigError: UnitCount, + StatusContainerWaitingReasonCreateContainerError: UnitCount, + StatusFailed: UnitCount, + StatusPending: UnitCount, + StatusSucceeded: UnitCount, + 
StatusUnknown: UnitCount, + StatusReady: UnitCount, + StatusScheduled: UnitCount, // cluster metrics NodeCount: UnitCount, diff --git a/receiver/awscontainerinsightreceiver/README.md b/receiver/awscontainerinsightreceiver/README.md index 81502bb1954a..c270856d50ae 100644 --- a/receiver/awscontainerinsightreceiver/README.md +++ b/receiver/awscontainerinsightreceiver/README.md @@ -687,52 +687,61 @@ kubectl apply -f config.yaml

### Pod -| Metric | Unit | -|---------------------------------------|--------------| -| pod_cpu_limit | Millicore | -| pod_cpu_request | Millicore | -| pod_cpu_reserved_capacity | Percent | -| pod_cpu_usage_system | Millicore | -| pod_cpu_usage_total | Millicore | -| pod_cpu_usage_user | Millicore | -| pod_cpu_utilization | Percent | -| pod_cpu_utilization_over_pod_limit | Percent | -| pod_memory_cache | Bytes | -| pod_memory_failcnt | Count | -| pod_memory_hierarchical_pgfault | Count/Second | -| pod_memory_hierarchical_pgmajfault | Count/Second | -| pod_memory_limit | Bytes | -| pod_memory_mapped_file | Bytes | -| pod_memory_max_usage | Bytes | -| pod_memory_pgfault | Count/Second | -| pod_memory_pgmajfault | Count/Second | -| pod_memory_request | Bytes | -| pod_memory_reserved_capacity | Percent | -| pod_memory_rss | Bytes | -| pod_memory_swap | Bytes | -| pod_memory_usage | Bytes | -| pod_memory_utilization | Percent | -| pod_memory_utilization_over_pod_limit | Percent | -| pod_memory_working_set | Bytes | -| pod_network_rx_bytes | Bytes/Second | -| pod_network_rx_dropped | Count/Second | -| pod_network_rx_errors | Count/Second | -| pod_network_rx_packets | Count/Second | -| pod_network_total_bytes | Bytes/Second | -| pod_network_tx_bytes | Bytes/Second | -| pod_network_tx_dropped | Count/Second | -| pod_network_tx_errors | Count/Second | -| pod_network_tx_packets | Count/Second | -| pod_number_of_container_restarts | Count | -| pod_number_of_containers | Count | -| pod_number_of_running_containers | Count | -| pod_status_ready | Count | -| pod_status_scheduled | Count | -| pod_status_unknown | Count | -| pod_status_failed | Count | -| pod_status_pending | Count | -| pod_status_running | Count | -| pod_status_succeeded | Count | +| Metric | Unit | +|-------------------------------------------------------------------|--------------| +| pod_cpu_limit | Millicore | +| pod_cpu_request | Millicore | +| pod_cpu_reserved_capacity | Percent | +| pod_cpu_usage_system | 
Millicore | +| pod_cpu_usage_total | Millicore | +| pod_cpu_usage_user | Millicore | +| pod_cpu_utilization | Percent | +| pod_cpu_utilization_over_pod_limit | Percent | +| pod_memory_cache | Bytes | +| pod_memory_failcnt | Count | +| pod_memory_hierarchical_pgfault | Count/Second | +| pod_memory_hierarchical_pgmajfault | Count/Second | +| pod_memory_limit | Bytes | +| pod_memory_mapped_file | Bytes | +| pod_memory_max_usage | Bytes | +| pod_memory_pgfault | Count/Second | +| pod_memory_pgmajfault | Count/Second | +| pod_memory_request | Bytes | +| pod_memory_reserved_capacity | Percent | +| pod_memory_rss | Bytes | +| pod_memory_swap | Bytes | +| pod_memory_usage | Bytes | +| pod_memory_utilization | Percent | +| pod_memory_utilization_over_pod_limit | Percent | +| pod_memory_working_set | Bytes | +| pod_network_rx_bytes | Bytes/Second | +| pod_network_rx_dropped | Count/Second | +| pod_network_rx_errors | Count/Second | +| pod_network_rx_packets | Count/Second | +| pod_network_total_bytes | Bytes/Second | +| pod_network_tx_bytes | Bytes/Second | +| pod_network_tx_dropped | Count/Second | +| pod_network_tx_errors | Count/Second | +| pod_network_tx_packets | Count/Second | +| pod_number_of_container_restarts | Count | +| pod_number_of_containers | Count | +| pod_number_of_running_containers | Count | +| pod_status_ready | Count | +| pod_status_scheduled | Count | +| pod_status_unknown | Count | +| pod_status_failed | Count | +| pod_status_pending | Count | +| pod_status_running | Count | +| pod_status_succeeded | Count | +| pod_container_status_running | Count | +| pod_container_status_terminated | Count | +| pod_container_status_waiting | Count | +| pod_container_status_waiting_reason_crash_loop_back_off | Count | +| pod_container_status_waiting_reason_image_pull_error | Count | +| pod_container_status_waiting_reason_start_error | Count | +| pod_container_status_waiting_reason_create_container_error | Count | +| 
pod_container_status_waiting_reason_create_container_config_error | Count | +| pod_container_status_terminated_reason_oom_killed | Count | | Resource Attribute | |----------------------| @@ -816,15 +825,6 @@ kubectl apply -f config.yaml | container_memory_utilization_over_container_limit | Percent | | container_memory_working_set | Bytes | | number_of_container_restarts | Count | -| container_status_running | Count | -| container_status_terminated | Count | -| container_status_waiting | Count | -| container_status_waiting_reason_crash_loop_back_off | Count | -| container_status_waiting_reason_image_pull_error | Count | -| container_status_waiting_reason_start_error | Count | -| container_status_waiting_reason_create_container_error | Count | -| container_status_waiting_reason_create_container_config_error | Count | -| container_status_terminated_reason_oom_killed | Count |

diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go index 3babc6a37b45..a14817a8c2aa 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go @@ -497,12 +497,14 @@ func (p *PodStore) addStatus(metric CIMetric, pod *corev1.Pod) { if p.includeEnhancedMetrics { p.addPodStatusMetrics(metric, pod) p.addPodConditionMetrics(metric, pod) + p.addPodContainerStatusMetrics(metric, pod) } var curContainerRestarts int for _, containerStatus := range pod.Status.ContainerStatuses { curContainerRestarts += int(containerStatus.RestartCount) } + podKey := createPodKeyFromMetric(metric) if podKey != "" { content, ok := p.getPrevMeasurement(ci.TypePod, podKey) @@ -520,28 +522,16 @@ func (p *PodStore) addStatus(metric CIMetric, pod *corev1.Pod) { if containerName := metric.GetTag(ci.ContainerNamekey); containerName != "" { for _, containerStatus := range pod.Status.ContainerStatuses { if containerStatus.Name == containerName { - possibleStatuses := map[string]int{ - ci.StatusRunning: 0, - ci.StatusWaiting: 0, - ci.StatusTerminated: 0, - } switch { case containerStatus.State.Running != nil: metric.AddTag(ci.ContainerStatus, "Running") - possibleStatuses[ci.StatusRunning] = 1 case containerStatus.State.Waiting != nil: metric.AddTag(ci.ContainerStatus, "Waiting") - possibleStatuses[ci.StatusWaiting] = 1 - reason := containerStatus.State.Waiting.Reason - if reason != "" { - metric.AddTag(ci.ContainerStatusReason, reason) - if val, ok := ci.WaitingReasonLookup[reason]; ok { - possibleStatuses[val] = 1 - } + if containerStatus.State.Waiting.Reason != "" { + metric.AddTag(ci.ContainerStatusReason, containerStatus.State.Waiting.Reason) } case containerStatus.State.Terminated != nil: metric.AddTag(ci.ContainerStatus, "Terminated") - possibleStatuses[ci.StatusTerminated] = 1 if 
containerStatus.State.Terminated.Reason != "" { metric.AddTag(ci.ContainerStatusReason, containerStatus.State.Terminated.Reason) } @@ -549,9 +539,6 @@ func (p *PodStore) addStatus(metric CIMetric, pod *corev1.Pod) { if containerStatus.LastTerminationState.Terminated != nil && containerStatus.LastTerminationState.Terminated.Reason != "" { metric.AddTag(ci.ContainerLastTerminationReason, containerStatus.LastTerminationState.Terminated.Reason) - if strings.Contains(containerStatus.LastTerminationState.Terminated.Reason, "OOMKilled") { - possibleStatuses[ci.StatusTerminatedReasonOOMKilled] = 1 - } } containerKey := createContainerKeyFromMetric(metric) if containerKey != "" { @@ -566,13 +553,6 @@ func (p *PodStore) addStatus(metric CIMetric, pod *corev1.Pod) { } p.setPrevMeasurement(ci.TypeContainer, containerKey, prevContainerMeasurement{restarts: int(containerStatus.RestartCount)}) } - - // add container containerStatus metrics - if p.includeEnhancedMetrics { - for name, val := range possibleStatuses { - metric.AddField(ci.MetricName(ci.TypeContainer, name), val) - } - } } } } @@ -613,6 +593,52 @@ func (p *PodStore) addPodConditionMetrics(metric CIMetric, pod *corev1.Pod) { } } +func (p *PodStore) addPodContainerStatusMetrics(metric CIMetric, pod *corev1.Pod) { + possibleStatuses := map[string]int{ + ci.StatusContainerRunning: 0, + ci.StatusContainerWaiting: 0, + ci.StatusContainerTerminated: 0, + } + for _, containerStatus := range pod.Status.ContainerStatuses { + switch { + case containerStatus.State.Running != nil: + possibleStatuses[ci.StatusContainerRunning]++ + case containerStatus.State.Waiting != nil: + possibleStatuses[ci.StatusContainerWaiting]++ + reason := containerStatus.State.Waiting.Reason + if reason != "" { + if val, ok := ci.WaitingReasonLookup[reason]; ok { + if _, foundStatus := possibleStatuses[val]; foundStatus { + possibleStatuses[val]++ + } else { + possibleStatuses[val] = 1 + } + } + } + case containerStatus.State.Terminated != nil: + 
possibleStatuses[ci.StatusContainerTerminated]++ + if containerStatus.State.Terminated.Reason != "" { + metric.AddTag(ci.ContainerStatusReason, containerStatus.State.Terminated.Reason) + } + } + + if containerStatus.LastTerminationState.Terminated != nil && containerStatus.LastTerminationState.Terminated.Reason != "" { + if strings.Contains(containerStatus.LastTerminationState.Terminated.Reason, "OOMKilled") { + if _, foundStatus := possibleStatuses[ci.StatusContainerTerminatedReasonOOMKilled]; foundStatus { + possibleStatuses[ci.StatusContainerTerminatedReasonOOMKilled]++ + } else { + possibleStatuses[ci.StatusContainerTerminatedReasonOOMKilled] = 1 + } + } + } + } + + for name, val := range possibleStatuses { + // desired prefix: pod_container_ + metric.AddField(ci.MetricName(ci.TypePod, name), val) + } +} + // It could be used to get limit/request(depend on the passed-in fn) per pod // return the sum of ResourceSetting and a bool which indicate whether all container set Resource func getResourceSettingForPod(pod *corev1.Pod, bound uint64, resource corev1.ResourceName, fn func(resource corev1.ResourceName, spec corev1.Container) (uint64, bool)) (uint64, bool) { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go index 6a5c98b32455..367ee59f8dbf 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go @@ -629,9 +629,13 @@ func TestPodStore_addStatus_adds_all_pod_conditions_as_metrics_when_unexpected(t assert.Equal(t, 1, decoratedResultMetric.GetField(PodScheduledMetricName)) assert.Equal(t, 0, decoratedResultMetric.GetField(PodUnknownMetricName)) } - func TestPodStore_addStatus_enhanced_metrics(t *testing.T) { pod := getBaseTestPodInfo() + // add another container + containerCopy := pod.Status.ContainerStatuses[0] + containerCopy.Name = "ubuntu2" + 
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, containerCopy) + tags := map[string]string{ci.MetricType: ci.TypePod, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit"} fields := map[string]interface{}{ci.MetricName(ci.TypePod, ci.CPUTotal): float64(1)} podStore := getPodStore() @@ -644,21 +648,40 @@ func TestPodStore_addStatus_enhanced_metrics(t *testing.T) { val := metric.GetField(ci.MetricName(ci.TypePod, ci.ContainerRestartCount)) assert.Nil(t, val) + // set up container defaults tags = map[string]string{ci.MetricType: ci.TypeContainer, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit", ci.ContainerNamekey: "ubuntu"} metric = generateMetric(fields, tags) - podStore.addStatus(metric, pod) assert.Equal(t, "Running", metric.GetTag(ci.ContainerStatus)) val = metric.GetField(ci.ContainerRestartCount) assert.Nil(t, val) - val = metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusRunning)) + // set up the other container + tags = map[string]string{ci.MetricType: ci.TypeContainer, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit", ci.ContainerNamekey: "ubuntu2"} + metric = generateMetric(fields, tags) + podStore.addStatus(metric, pod) + assert.Equal(t, "Running", metric.GetTag(ci.ContainerStatus)) + val = metric.GetField(ci.ContainerRestartCount) + assert.Nil(t, val) + + tags = map[string]string{ci.MetricType: ci.TypePod, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit"} + metric = generateMetric(fields, tags) + + podStore.addStatus(metric, pod) + assert.Equal(t, "Running", metric.GetTag(ci.PodStatus)) + val = metric.GetField(ci.ContainerRestartCount) + assert.Nil(t, val) + val = metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerRunning)) assert.NotNil(t, val) - assert.Equal(t, 1, val) + assert.Equal(t, 2, val) pod.Status.ContainerStatuses[0].State.Running = nil pod.Status.ContainerStatuses[0].State.Terminated = &corev1.ContainerStateTerminated{} 
pod.Status.ContainerStatuses[0].LastTerminationState.Terminated = &corev1.ContainerStateTerminated{Reason: "OOMKilled"} pod.Status.ContainerStatuses[0].RestartCount = 1 + pod.Status.ContainerStatuses[1].State.Running = nil + pod.Status.ContainerStatuses[1].State.Terminated = &corev1.ContainerStateTerminated{} + pod.Status.ContainerStatuses[1].LastTerminationState.Terminated = &corev1.ContainerStateTerminated{Reason: "OOMKilled"} + pod.Status.ContainerStatuses[1].RestartCount = 1 pod.Status.Phase = "Succeeded" tags = map[string]string{ci.MetricType: ci.TypePod, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit"} @@ -666,69 +689,84 @@ func TestPodStore_addStatus_enhanced_metrics(t *testing.T) { podStore.addStatus(metric, pod) assert.Equal(t, "Succeeded", metric.GetTag(ci.PodStatus)) - assert.Equal(t, int(1), metric.GetField(ci.MetricName(ci.TypePod, ci.ContainerRestartCount)).(int)) + assert.Equal(t, 2, metric.GetField(ci.MetricName(ci.TypePod, ci.ContainerRestartCount))) + // update the container metrics + // set up container defaults tags = map[string]string{ci.MetricType: ci.TypeContainer, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit", ci.ContainerNamekey: "ubuntu"} metric = generateMetric(fields, tags) + podStore.addStatus(metric, pod) + assert.Equal(t, 1, metric.GetField(ci.ContainerRestartCount)) + + // test the other container + tags = map[string]string{ci.MetricType: ci.TypeContainer, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit", ci.ContainerNamekey: "ubuntu2"} + metric = generateMetric(fields, tags) + podStore.addStatus(metric, pod) + assert.Equal(t, 1, metric.GetField(ci.ContainerRestartCount)) + + tags = map[string]string{ci.MetricType: ci.TypePod, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit"} + metric = generateMetric(fields, tags) podStore.addStatus(metric, pod) - assert.Equal(t, "Terminated", metric.GetTag(ci.ContainerStatus)) - assert.Equal(t, "OOMKilled", metric.GetTag(ci.ContainerLastTerminationReason)) - 
assert.Equal(t, int(1), metric.GetField(ci.ContainerRestartCount).(int)) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusTerminated))) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusTerminatedReasonOOMKilled))) + assert.Equal(t, 2, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerTerminated))) + assert.Equal(t, 2, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerTerminatedReasonOOMKilled))) pod.Status.ContainerStatuses[0].LastTerminationState.Terminated = nil pod.Status.ContainerStatuses[0].State.Waiting = &corev1.ContainerStateWaiting{Reason: "CrashLoopBackOff"} + pod.Status.ContainerStatuses[1].LastTerminationState.Terminated = nil + pod.Status.ContainerStatuses[1].State.Waiting = &corev1.ContainerStateWaiting{Reason: "CrashLoopBackOff"} - tags = map[string]string{ci.MetricType: ci.TypeContainer, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit", ci.ContainerNamekey: "ubuntu"} + tags = map[string]string{ci.MetricType: ci.TypePod, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit"} metric = generateMetric(fields, tags) podStore.addStatus(metric, pod) - assert.Equal(t, "Waiting", metric.GetTag(ci.ContainerStatus)) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaiting))) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonCrashLoopBackOff))) + //assert.Equal(t, "Waiting", metric.GetTag(ci.ContainerStatus)) + assert.Equal(t, 2, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaiting))) + assert.Equal(t, 2, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonCrashLoopBackOff))) // sparse metrics - assert.Nil(t, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonImagePullError))) - assert.Nil(t, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusTerminatedReasonOOMKilled))) - assert.Nil(t, metric.GetField(ci.MetricName(ci.TypeContainer, 
ci.StatusWaitingReasonStartError))) - assert.Nil(t, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonCreateContainerError))) - assert.Nil(t, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonCreateContainerConfigError))) + assert.Nil(t, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonImagePullError))) + assert.Nil(t, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerTerminatedReasonOOMKilled))) + assert.Nil(t, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonStartError))) + assert.Nil(t, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonCreateContainerError))) + assert.Nil(t, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonCreateContainerConfigError))) pod.Status.ContainerStatuses[0].State.Waiting = &corev1.ContainerStateWaiting{Reason: "ImagePullBackOff"} + pod.Status.ContainerStatuses[1].State.Waiting = &corev1.ContainerStateWaiting{Reason: "StartError"} - tags = map[string]string{ci.MetricType: ci.TypeContainer, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit", ci.ContainerNamekey: "ubuntu"} + tags = map[string]string{ci.MetricType: ci.TypePod, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit"} metric = generateMetric(fields, tags) podStore.addStatus(metric, pod) - assert.Equal(t, "Waiting", metric.GetTag(ci.ContainerStatus)) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaiting))) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonImagePullError))) + assert.Equal(t, "Succeeded", metric.GetTag(ci.PodStatus)) + assert.Equal(t, 2, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaiting))) + assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonImagePullError))) + assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonStartError))) 
pod.Status.ContainerStatuses[0].State.Waiting = &corev1.ContainerStateWaiting{Reason: "ErrImagePull"} metric = generateMetric(fields, tags) podStore.addStatus(metric, pod) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonImagePullError))) + assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonImagePullError))) pod.Status.ContainerStatuses[0].State.Waiting = &corev1.ContainerStateWaiting{Reason: "InvalidImageName"} metric = generateMetric(fields, tags) podStore.addStatus(metric, pod) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonImagePullError))) + assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonImagePullError))) pod.Status.ContainerStatuses[0].State.Waiting = &corev1.ContainerStateWaiting{Reason: "CreateContainerError"} metric = generateMetric(fields, tags) podStore.addStatus(metric, pod) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonCreateContainerError))) + assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonCreateContainerError))) pod.Status.ContainerStatuses[0].State.Waiting = &corev1.ContainerStateWaiting{Reason: "CreateContainerConfigError"} metric = generateMetric(fields, tags) podStore.addStatus(metric, pod) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonCreateContainerConfigError))) + assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonCreateContainerConfigError))) pod.Status.ContainerStatuses[0].State.Waiting = &corev1.ContainerStateWaiting{Reason: "StartError"} + pod.Status.ContainerStatuses[1].State.Waiting = nil metric = generateMetric(fields, tags) podStore.addStatus(metric, pod) - assert.Equal(t, 1, metric.GetField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonStartError))) + assert.Equal(t, 1, 
metric.GetField(ci.MetricName(ci.TypePod, ci.StatusContainerWaitingReasonStartError))) // test delta of restartCount pod.Status.ContainerStatuses[0].RestartCount = 3 @@ -736,13 +774,13 @@ func TestPodStore_addStatus_enhanced_metrics(t *testing.T) { metric = generateMetric(fields, tags) podStore.addStatus(metric, pod) - assert.Equal(t, int(2), metric.GetField(ci.MetricName(ci.TypePod, ci.ContainerRestartCount)).(int)) + assert.Equal(t, 2, metric.GetField(ci.MetricName(ci.TypePod, ci.ContainerRestartCount))) tags = map[string]string{ci.MetricType: ci.TypeContainer, ci.K8sNamespace: "default", ci.K8sPodNameKey: "cpu-limit", ci.ContainerNamekey: "ubuntu"} metric = generateMetric(fields, tags) podStore.addStatus(metric, pod) - assert.Equal(t, int(2), metric.GetField(ci.ContainerRestartCount).(int)) + assert.Equal(t, 2, metric.GetField(ci.ContainerRestartCount)) } func TestPodStore_addStatus_without_enhanced_metrics(t *testing.T) { @@ -765,7 +803,7 @@ func TestPodStore_addStatus_without_enhanced_metrics(t *testing.T) { assert.Equal(t, "Running", metric.GetTag(ci.ContainerStatus)) val = metric.GetField(ci.ContainerRestartCount) assert.Nil(t, val) - assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusRunning))) + assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusContainerRunning))) pod.Status.ContainerStatuses[0].State.Running = nil pod.Status.ContainerStatuses[0].State.Terminated = &corev1.ContainerStateTerminated{} @@ -787,7 +825,7 @@ func TestPodStore_addStatus_without_enhanced_metrics(t *testing.T) { assert.Equal(t, "Terminated", metric.GetTag(ci.ContainerStatus)) assert.Equal(t, "OOMKilled", metric.GetTag(ci.ContainerLastTerminationReason)) assert.Equal(t, int(1), metric.GetField(ci.ContainerRestartCount).(int)) - assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusTerminated))) + assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusContainerTerminated))) 
pod.Status.ContainerStatuses[0].State.Terminated = nil pod.Status.ContainerStatuses[0].State.Waiting = &corev1.ContainerStateWaiting{Reason: "CrashLoopBackOff"} @@ -797,8 +835,8 @@ func TestPodStore_addStatus_without_enhanced_metrics(t *testing.T) { podStore.addStatus(metric, pod) assert.Equal(t, "Waiting", metric.GetTag(ci.ContainerStatus)) - assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusWaiting))) - assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonCrashLoopBackOff))) + assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusContainerWaiting))) + assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusContainerWaitingReasonCrashLoopBackOff))) pod.Status.ContainerStatuses[0].State.Waiting = &corev1.ContainerStateWaiting{Reason: "SomeOtherReason"} @@ -807,8 +845,8 @@ func TestPodStore_addStatus_without_enhanced_metrics(t *testing.T) { podStore.addStatus(metric, pod) assert.Equal(t, "Waiting", metric.GetTag(ci.ContainerStatus)) - assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusWaiting))) - assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusWaitingReasonCrashLoopBackOff))) + assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusContainerWaiting))) + assert.False(t, metric.HasField(ci.MetricName(ci.TypeContainer, ci.StatusContainerWaitingReasonCrashLoopBackOff))) // test delta of restartCount pod.Status.ContainerStatuses[0].RestartCount = 3 From 846033af79ed4830372aef1bce98e3414c3eb672 Mon Sep 17 00:00:00 2001 From: Mitali Salvi <44349099+mitali-salvi@users.noreply.github.com> Date: Wed, 11 Oct 2023 15:16:31 -0400 Subject: [PATCH 3/7] Filter out Prometheus scrape_ metrics if EnhancedContainer Insights is enabled (#110) --- exporter/awsemfexporter/emf_exporter.go | 5 ++ exporter/awsemfexporter/metric_translator.go | 3 + .../awsemfexporter/metric_translator_test.go | 85 ++++++++++++++++--- 3 files changed, 81 
insertions(+), 12 deletions(-) diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 981cf03e8645..d00f5f074ee8 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -123,6 +123,11 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e if err != nil { return err } + // Drop a nil putLogEvent for EnhancedContainerInsights + if emf.config.EnhancedContainerInsights && putLogEvent == nil { + emf.config.logger.Debug("Dropping empty putLogEvents for EnhancedContainerInsights") + continue + } // Currently we only support two options for "OutputDestination". if strings.EqualFold(outputDestination, outputDestinationStdout) { if putLogEvent != nil && diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index 02fb101c5d34..a3f4e4e12f76 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -431,6 +431,9 @@ func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) (*cwlogs.Event, */ fieldMap["CloudWatchMetrics"] = cWMetric.measurements } + } else if len(cWMetric.measurements) < 1 && config.EnhancedContainerInsights { + // Return nil if requests does not contain metrics when EnhancedContainerInsights is enabled + return nil, nil } pleMsg, err := json.Marshal(fieldMap) diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index a72c725aa6fa..36b39c87bf31 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -481,6 +481,67 @@ func TestTranslateCWMetricToEMF(t *testing.T) { } +func TestTranslateCWMetricToEMFForEnhancedContainerInsights(t *testing.T) { + testCases := map[string]struct { + EnhancedContainerInsights bool + fields map[string]interface{} + measurements []cWMeasurement + expectedEMFLogEvent interface{} + }{ 
+ "EnhancedContainerInsightsEnabled": { + EnhancedContainerInsights: true, + fields: map[string]interface{}{ + oTellibDimensionKey: "cloudwatch-otel", + "scrape_samples_post_metric_relabeling": "12", + "scrape_samples_scraped": "34", + "scrape_series_added": "56", + "service.instance.id": "1.2.3.4:443", + "Sources": "[\"apiserver\"]", + }, + measurements: nil, + expectedEMFLogEvent: nil, + }, + "EnhancedContainerInsightsDisabled": { + EnhancedContainerInsights: false, + fields: map[string]interface{}{ + oTellibDimensionKey: "cloudwatch-otel", + "scrape_samples_post_metric_relabeling": "12", + "scrape_samples_scraped": "34", + "scrape_series_added": "56", + "service.instance.id": "1.2.3.4:443", + "Sources": "[\"apiserver\"]", + }, + measurements: nil, + expectedEMFLogEvent: "{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"apiserver\"],\"scrape_samples_post_metric_relabeling\":\"12\",\"scrape_samples_scraped\":\"34\",\"scrape_series_added\":\"56\",\"service.instance.id\":\"1.2.3.4:443\"}", + }, + } + + for name, tc := range testCases { + t.Run(name, func(_ *testing.T) { + config := &Config{ + // include valid json string, a non-existing key, and keys whose value are not json/string + ParseJSONEncodedAttributeValues: []string{"Sources"}, + EnhancedContainerInsights: tc.EnhancedContainerInsights, + logger: zap.NewNop(), + } + + cloudwatchMetric := &cWMetrics{ + timestampMs: int64(1596151098037), + fields: tc.fields, + measurements: tc.measurements, + } + + emfLogEvent, err := translateCWMetricToEMF(cloudwatchMetric, config) + require.NoError(t, err) + + if tc.expectedEMFLogEvent != nil { + assert.Equal(t, tc.expectedEMFLogEvent, *emfLogEvent.InputLogEvent.Message) + } + }) + } + +} + func TestTranslateGroupedMetricToCWMetric(t *testing.T) { timestamp := int64(1596151098037) namespace := "Namespace" @@ -1395,22 +1456,22 @@ func TestGroupedMetricToCWMeasurementsWithFilters(t *testing.T) { MetricNameSelectors: []string{"metric(1|3)"}, }, }, []cWMeasurement{ - { - 
Namespace: namespace, - Dimensions: [][]string{{}}, - Metrics: []map[string]string{ - { - "Name": "metric1", - "Unit": "Count", - }, - { - "Name": "metric3", - "Unit": "Seconds", - }, + { + Namespace: namespace, + Dimensions: [][]string{{}}, + Metrics: []map[string]string{ + { + "Name": "metric1", + "Unit": "Count", + }, + { + "Name": "metric3", + "Unit": "Seconds", }, }, }, }, + }, { "label matchers", []*MetricDeclaration{ From 5b46fae60cb3f988c5531b44daf9eec7316f7961 Mon Sep 17 00:00:00 2001 From: atshaw43 <108552302+atshaw43@users.noreply.github.com> Date: Wed, 11 Oct 2023 13:14:05 -0700 Subject: [PATCH 4/7] Adding segmnt splits (#111) --- .chloggen/aws_exporter_localrootspans.yaml | 27 + exporter/awsxrayexporter/awsxray.go | 8 +- .../internal/translator/segment.go | 237 +++++++- .../internal/translator/segment_test.go | 515 +++++++++++++++++- 4 files changed, 770 insertions(+), 17 deletions(-) create mode 100755 .chloggen/aws_exporter_localrootspans.yaml diff --git a/.chloggen/aws_exporter_localrootspans.yaml b/.chloggen/aws_exporter_localrootspans.yaml new file mode 100755 index 000000000000..49cc484a9928 --- /dev/null +++ b/.chloggen/aws_exporter_localrootspans.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsxrayexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: AWS X-Ray exporter to make local root spans a segment for internal/service spans and subsegment + segment for client/producer/consumer spans. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
+issues: [102] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/awsxrayexporter/awsxray.go b/exporter/awsxrayexporter/awsxray.go index 6cd3e77be21a..5d68b99fba3b 100644 --- a/exporter/awsxrayexporter/awsxray.go +++ b/exporter/awsxrayexporter/awsxray.go @@ -105,17 +105,21 @@ func extractResourceSpans(config component.Config, logger *zap.Logger, td ptrace for j := 0; j < rspans.ScopeSpans().Len(); j++ { spans := rspans.ScopeSpans().At(j).Spans() for k := 0; k < spans.Len(); k++ { - document, localErr := translator.MakeSegmentDocumentString( + documentsForSpan, localErr := translator.MakeSegmentDocuments( spans.At(k), resource, config.(*Config).IndexedAttributes, config.(*Config).IndexAllAttributes, config.(*Config).LogGroupNames, config.(*Config).skipTimestampValidation) + if localErr != nil { logger.Debug("Error translating span.", zap.Error(localErr)) continue } - documents = append(documents, &document) + + for l := range documentsForSpan { + documents = append(documents, &documentsForSpan[l]) + } } } } diff --git a/exporter/awsxrayexporter/internal/translator/segment.go b/exporter/awsxrayexporter/internal/translator/segment.go index 6d1efab2ff65..21470e0e0dbd 100644 --- a/exporter/awsxrayexporter/internal/translator/segment.go +++ b/exporter/awsxrayexporter/internal/translator/segment.go @@ -36,8 +36,13 @@ const ( // 
x-ray only span attributes - https://github.com/open-telemetry/opentelemetry-java-contrib/pull/802 const ( - awsLocalService = "aws.local.service" - awsRemoteService = "aws.remote.service" + awsLocalService = "aws.local.service" + awsRemoteService = "aws.remote.service" + awsLocalOperation = "aws.local.operation" + awsRemoteOperation = "aws.remote.operation" + remoteTarget = "remoteTarget" + awsSpanKind = "aws.span.kind" + k8sRemoteNamespace = "K8s.RemoteNamespace" ) var ( @@ -60,16 +65,232 @@ const ( identifierOffset = 11 // offset of identifier within traceID ) +const ( + localRoot = "LOCAL_ROOT" +) + +var removeAnnotationsFromServiceSegment = []string{ + awsRemoteService, + awsRemoteOperation, + remoteTarget, + k8sRemoteNamespace, +} + var ( writers = newWriterPool(2048) ) -// MakeSegmentDocumentString converts an OpenTelemetry Span to an X-Ray Segment and then serialzies to JSON +// MakeSegmentDocuments converts spans to json documents +func MakeSegmentDocuments(span ptrace.Span, resource pcommon.Resource, indexedAttrs []string, indexAllAttrs bool, logGroupNames []string, skipTimestampValidation bool) ([]string, error) { + segments, err := MakeSegmentsFromSpan(span, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation) + + if err == nil { + var documents []string + + for _, v := range segments { + document, documentErr := MakeDocumentFromSegment(v) + if documentErr != nil { + return nil, documentErr + } + + documents = append(documents, document) + } + + return documents, nil + } + + return nil, err +} + +func isLocalRootSpanADependencySpan(span ptrace.Span) bool { + return span.Kind() != ptrace.SpanKindServer && + span.Kind() != ptrace.SpanKindInternal +} + +// IsLocalRoot We will move to using isRemote once the collector supports deserializing it. Until then, we will rely on aws.span.kind. 
+func isLocalRoot(span ptrace.Span) bool { + if myAwsSpanKind, ok := span.Attributes().Get(awsSpanKind); ok { + return localRoot == myAwsSpanKind.Str() + } + + return false +} + +func addNamespaceToSubsegmentWithRemoteService(span ptrace.Span, segment *awsxray.Segment) { + if (span.Kind() == ptrace.SpanKindClient || + span.Kind() == ptrace.SpanKindConsumer || + span.Kind() == ptrace.SpanKindProducer) && + segment.Type != nil && + segment.Namespace == nil { + if _, ok := span.Attributes().Get(awsRemoteService); ok { + segment.Namespace = awsxray.String("remote") + } + } +} + +func MakeDependencySubsegmentForLocalRootDependencySpan(span ptrace.Span, resource pcommon.Resource, indexedAttrs []string, indexAllAttrs bool, logGroupNames []string, skipTimestampValidation bool, serviceSegmentID pcommon.SpanID) (*awsxray.Segment, error) { + var dependencySpan = ptrace.NewSpan() + span.CopyTo(dependencySpan) + + dependencySpan.SetParentSpanID(serviceSegmentID) + + dependencySubsegment, err := MakeSegment(dependencySpan, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation) + + if err != nil { + return nil, err + } + + // Make this a subsegment + dependencySubsegment.Type = awsxray.String("subsegment") + + if dependencySubsegment.Namespace == nil { + dependencySubsegment.Namespace = awsxray.String("remote") + } + + // Remove span links from consumer spans + if span.Kind() == ptrace.SpanKindConsumer { + dependencySubsegment.Links = nil + } + + myAwsRemoteService, _ := span.Attributes().Get(awsRemoteService) + + dependencySubsegment.Name = awsxray.String(myAwsRemoteService.Str()) + + return dependencySubsegment, err +} + +func MakeServiceSegmentForLocalRootDependencySpan(span ptrace.Span, resource pcommon.Resource, indexedAttrs []string, indexAllAttrs bool, logGroupNames []string, skipTimestampValidation bool, serviceSegmentID pcommon.SpanID) (*awsxray.Segment, error) { + // We always create a segment for the service + var serviceSpan ptrace.Span = 
ptrace.NewSpan() + span.CopyTo(serviceSpan) + + // Set the span id to the one internally generated + serviceSpan.SetSpanID(serviceSegmentID) + + for _, v := range removeAnnotationsFromServiceSegment { + serviceSpan.Attributes().Remove(v) + } + + serviceSegment, err := MakeSegment(serviceSpan, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation) + + if err != nil { + return nil, err + } + + // Set the name + if myAwsLocalService, ok := span.Attributes().Get(awsLocalService); ok { + serviceSegment.Name = awsxray.String(myAwsLocalService.Str()) + } + + // Remove the HTTP field + serviceSegment.HTTP = nil + + // Remove AWS subsegment fields + serviceSegment.AWS.Operation = nil + serviceSegment.AWS.AccountID = nil + serviceSegment.AWS.RemoteRegion = nil + serviceSegment.AWS.RequestID = nil + serviceSegment.AWS.QueueURL = nil + serviceSegment.AWS.TableName = nil + serviceSegment.AWS.TableNames = nil + + // Delete all metadata that does not start with 'otel.resource.' + for _, metaDataEntry := range serviceSegment.Metadata { + for key := range metaDataEntry { + if !strings.HasPrefix(key, "otel.resource.") { + delete(metaDataEntry, key) + } + } + } + + // Make it a segment + serviceSegment.Type = nil + + // Remote namespace + serviceSegment.Namespace = nil + + // Remove span links from non-consumer spans + if span.Kind() != ptrace.SpanKindConsumer { + serviceSegment.Links = nil + } + + return serviceSegment, nil +} + +func MakeServiceSegmentForLocalRootSpanWithoutDependency(span ptrace.Span, resource pcommon.Resource, indexedAttrs []string, indexAllAttrs bool, logGroupNames []string, skipTimestampValidation bool) ([]*awsxray.Segment, error) { + segment, err := MakeSegment(span, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation) + + if err != nil { + return nil, err + } + + segment.Type = nil + segment.Namespace = nil + + return []*awsxray.Segment{segment}, err +} + +func MakeNonLocalRootSegment(span ptrace.Span, 
resource pcommon.Resource, indexedAttrs []string, indexAllAttrs bool, logGroupNames []string, skipTimestampValidation bool) ([]*awsxray.Segment, error) { + segment, err := MakeSegment(span, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation) + + if err != nil { + return nil, err + } + + addNamespaceToSubsegmentWithRemoteService(span, segment) + + return []*awsxray.Segment{segment}, nil +} + +func MakeServiceSegmentAndDependencySubsegment(span ptrace.Span, resource pcommon.Resource, indexedAttrs []string, indexAllAttrs bool, logGroupNames []string, skipTimestampValidation bool) ([]*awsxray.Segment, error) { + // If it is a local root span and a dependency span, we need to make a segment and subsegment representing the local service and remote service, respectively. + var serviceSegmentID = newSegmentID() + var segments []*awsxray.Segment + + // Make Dependency Subsegment + dependencySubsegment, err := MakeDependencySubsegmentForLocalRootDependencySpan(span, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation, serviceSegmentID) + if err != nil { + return nil, err + } + segments = append(segments, dependencySubsegment) + + // Make Service Segment + serviceSegment, err := MakeServiceSegmentForLocalRootDependencySpan(span, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation, serviceSegmentID) + if err != nil { + return nil, err + } + segments = append(segments, serviceSegment) + + return segments, err +} + +// MakeSegmentsFromSpan creates one or more segments from a span +func MakeSegmentsFromSpan(span ptrace.Span, resource pcommon.Resource, indexedAttrs []string, indexAllAttrs bool, logGroupNames []string, skipTimestampValidation bool) ([]*awsxray.Segment, error) { + if !isLocalRoot(span) { + return MakeNonLocalRootSegment(span, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation) + } + + if !isLocalRootSpanADependencySpan(span) { + return 
MakeServiceSegmentForLocalRootSpanWithoutDependency(span, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation) + } + + return MakeServiceSegmentAndDependencySubsegment(span, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation) +} + +// MakeSegmentDocumentString converts an OpenTelemetry Span to an X-Ray Segment and then serializes to JSON +// MakeSegmentDocumentString will be deprecated in the future func MakeSegmentDocumentString(span ptrace.Span, resource pcommon.Resource, indexedAttrs []string, indexAllAttrs bool, logGroupNames []string, skipTimestampValidation bool) (string, error) { segment, err := MakeSegment(span, resource, indexedAttrs, indexAllAttrs, logGroupNames, skipTimestampValidation) + if err != nil { return "", err } + + return MakeDocumentFromSegment(segment) +} + +// MakeDocumentFromSegment converts a segment into a JSON document +func MakeDocumentFromSegment(segment *awsxray.Segment) (string, error) { w := writers.borrow() if err := w.Encode(*segment); err != nil { return "", err @@ -122,11 +343,19 @@ func MakeSegment(span ptrace.Span, resource pcommon.Resource, indexedAttrs []str // X-Ray segment names are service names, unlike span names which are methods. Try to find a service name. 
// support x-ray specific service name attributes as segment name if it exists - if span.Kind() == ptrace.SpanKindServer || span.Kind() == ptrace.SpanKindConsumer { + if span.Kind() == ptrace.SpanKindServer { if localServiceName, ok := attributes.Get(awsLocalService); ok { name = localServiceName.Str() } } + + myAwsSpanKind, _ := span.Attributes().Get(awsSpanKind) + if span.Kind() == ptrace.SpanKindInternal && myAwsSpanKind.Str() == localRoot { + if localServiceName, ok := attributes.Get(awsLocalService); ok { + name = localServiceName.Str() + } + } + if span.Kind() == ptrace.SpanKindClient || span.Kind() == ptrace.SpanKindProducer { if remoteServiceName, ok := attributes.Get(awsRemoteService); ok { name = remoteServiceName.Str() diff --git a/exporter/awsxrayexporter/internal/translator/segment_test.go b/exporter/awsxrayexporter/internal/translator/segment_test.go index fcdfed6550a4..a9ca076c3659 100644 --- a/exporter/awsxrayexporter/internal/translator/segment_test.go +++ b/exporter/awsxrayexporter/internal/translator/segment_test.go @@ -1024,19 +1024,11 @@ func TestConsumerSpanWithAwsRemoteServiceName(t *testing.T) { spanName := "ABC.payment" parentSpanID := newSegmentID() user := "testingT" - attributes := make(map[string]interface{}) - attributes[conventions.AttributeHTTPMethod] = "POST" - attributes[conventions.AttributeHTTPScheme] = "https" - attributes[conventions.AttributeHTTPHost] = "payment.amazonaws.com" - attributes[conventions.AttributeHTTPTarget] = "/" - attributes[conventions.AttributeRPCService] = "ABC" - attributes[awsLocalService] = "ConsumerService" + attributes := getBasicAttributes() + attributes[awsRemoteService] = "ConsumerService" resource := constructDefaultResource() - span := constructConsumerSpan(parentSpanID, spanName, 0, "OK", attributes) - - segment, _ := MakeSegment(span, resource, nil, false, nil, false) - assert.Equal(t, "ConsumerService", *segment.Name) + span := constructConsumerSpan(parentSpanID, spanName, 0, "Ok", attributes) 
jsonStr, err := MakeSegmentDocumentString(span, resource, nil, false, nil, false) @@ -1075,6 +1067,480 @@ func TestServerSpanWithAwsLocalServiceName(t *testing.T) { assert.False(t, strings.Contains(jsonStr, "user")) } +func validateLocalRootDependencySubsegment(t *testing.T, segment *awsxray.Segment, span ptrace.Span, parentID string) { + tempTraceID := span.TraceID() + expectedTraceID := "1-" + fmt.Sprintf("%x", tempTraceID[0:4]) + "-" + fmt.Sprintf("%x", tempTraceID[4:16]) + + assert.Equal(t, "subsegment", *segment.Type) + assert.Equal(t, "myRemoteService", *segment.Name) + assert.Equal(t, span.SpanID().String(), *segment.ID) + assert.Equal(t, parentID, *segment.ParentID) + assert.Equal(t, expectedTraceID, *segment.TraceID) + assert.NotNil(t, segment.HTTP) + assert.Equal(t, "POST", *segment.HTTP.Request.Method) + assert.Equal(t, 2, len(segment.Annotations)) + assert.Nil(t, segment.Annotations[awsRemoteService]) + assert.Nil(t, segment.Annotations[remoteTarget]) + assert.Equal(t, "myAnnotationValue", segment.Annotations["myAnnotationKey"]) + + assert.Equal(t, 8, len(segment.Metadata["default"])) + assert.Equal(t, "receive", segment.Metadata["default"][conventions.AttributeMessagingOperation]) + assert.Equal(t, "LOCAL_ROOT", segment.Metadata["default"][awsSpanKind]) + assert.Equal(t, "myRemoteOperation", segment.Metadata["default"][awsRemoteOperation]) + assert.Equal(t, "myTarget", segment.Metadata["default"][remoteTarget]) + assert.Equal(t, "k8sRemoteNamespace", segment.Metadata["default"][k8sRemoteNamespace]) + assert.Equal(t, "myLocalService", segment.Metadata["default"][awsLocalService]) + assert.Equal(t, "awsLocalOperation", segment.Metadata["default"][awsLocalOperation]) + assert.Equal(t, "service.name=myTest", segment.Metadata["default"]["otel.resource.attributes"]) + + assert.Equal(t, "MySDK", *segment.AWS.XRay.SDK) + assert.Equal(t, "1.20.0", *segment.AWS.XRay.SDKVersion) + assert.Equal(t, true, *segment.AWS.XRay.AutoInstrumentation) + + assert.Equal(t, 
"UpdateItem", *segment.AWS.Operation) + assert.Equal(t, "AWSAccountAttribute", *segment.AWS.AccountID) + assert.Equal(t, "AWSRegionAttribute", *segment.AWS.RemoteRegion) + assert.Equal(t, "AWSRequestIDAttribute", *segment.AWS.RequestID) + assert.Equal(t, "AWSQueueURLAttribute", *segment.AWS.QueueURL) + assert.Equal(t, "TableName", *segment.AWS.TableName) + + assert.Equal(t, "remote", *segment.Namespace) +} + +func validateLocalRootServiceSegment(t *testing.T, segment *awsxray.Segment, span ptrace.Span) { + tempTraceID := span.TraceID() + expectedTraceID := "1-" + fmt.Sprintf("%x", tempTraceID[0:4]) + "-" + fmt.Sprintf("%x", tempTraceID[4:16]) + + assert.Nil(t, segment.Type) + assert.Equal(t, "myLocalService", *segment.Name) + assert.Equal(t, expectedTraceID, *segment.TraceID) + assert.Nil(t, segment.HTTP) + assert.Equal(t, 1, len(segment.Annotations)) + assert.Equal(t, "myAnnotationValue", segment.Annotations["myAnnotationKey"]) + assert.Equal(t, 1, len(segment.Metadata["default"])) + assert.Equal(t, "service.name=myTest", segment.Metadata["default"]["otel.resource.attributes"]) + assert.Equal(t, "MySDK", *segment.AWS.XRay.SDK) + assert.Equal(t, "1.20.0", *segment.AWS.XRay.SDKVersion) + assert.Equal(t, true, *segment.AWS.XRay.AutoInstrumentation) + assert.Nil(t, segment.AWS.Operation) + assert.Nil(t, segment.AWS.AccountID) + assert.Nil(t, segment.AWS.RemoteRegion) + assert.Nil(t, segment.AWS.RequestID) + assert.Nil(t, segment.AWS.QueueURL) + assert.Nil(t, segment.AWS.TableName) + assert.Nil(t, segment.Namespace) + + assert.Nil(t, segment.Namespace) +} + +func getBasicAttributes() map[string]interface{} { + attributes := make(map[string]interface{}) + + attributes[conventions.AttributeHTTPMethod] = "POST" + attributes[conventions.AttributeMessagingOperation] = "receive" + + attributes["otel.resource.attributes"] = "service.name=myTest" + + attributes[awsSpanKind] = "LOCAL_ROOT" + attributes[awsRemoteService] = "myRemoteService" + attributes[awsRemoteOperation] = 
"myRemoteOperation" + attributes[remoteTarget] = "myTarget" + attributes[k8sRemoteNamespace] = "k8sRemoteNamespace" + attributes[awsLocalService] = "myLocalService" + attributes[awsLocalOperation] = "awsLocalOperation" + + attributes["myAnnotationKey"] = "myAnnotationValue" + + attributes[awsxray.AWSOperationAttribute] = "UpdateItem" + attributes[awsxray.AWSAccountAttribute] = "AWSAccountAttribute" + attributes[awsxray.AWSRegionAttribute] = "AWSRegionAttribute" + attributes[awsxray.AWSRequestIDAttribute] = "AWSRequestIDAttribute" + attributes[awsxray.AWSQueueURLAttribute] = "AWSQueueURLAttribute" + attributes[awsxray.AWSTableNameAttribute] = "TableName" + + return attributes +} + +func getBasicResource() pcommon.Resource { + resource := constructDefaultResource() + + resource.Attributes().PutStr(conventions.AttributeTelemetrySDKName, "MySDK") + resource.Attributes().PutStr(conventions.AttributeTelemetrySDKVersion, "1.20.0") + resource.Attributes().PutStr(conventions.AttributeTelemetryAutoVersion, "1.2.3") + + return resource +} + +func addSpanLink(span ptrace.Span) { + spanLink := span.Links().AppendEmpty() + spanLink.SetTraceID(newTraceID()) + spanLink.SetSpanID(newSegmentID()) +} + +func TestLocalRootConsumer(t *testing.T) { + spanName := "destination operation" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + + span := constructConsumerSpan(parentSpanID, spanName, 200, "OK", attributes) + + addSpanLink(span) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 2, len(segments)) + assert.Nil(t, err) + + validateLocalRootDependencySubsegment(t, segments[0], span, *segments[1].ID) + assert.Nil(t, segments[0].Links) + + validateLocalRootServiceSegment(t, segments[1], span) + assert.Equal(t, 1, len(segments[1].Links)) + + // Checks these values are the same for both + assert.Equal(t, 
segments[0].StartTime, segments[1].StartTime) + assert.Equal(t, segments[0].EndTime, segments[1].EndTime) +} + +func TestNonLocalRootConsumerProcess(t *testing.T) { + spanName := "destination operation" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + delete(attributes, awsRemoteService) + delete(attributes, awsRemoteOperation) + attributes[awsSpanKind] = "Consumer" + + span := constructConsumerSpan(parentSpanID, spanName, 200, "OK", attributes) + + addSpanLink(span) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 1, len(segments)) + assert.Nil(t, err) + + tempTraceID := span.TraceID() + expectedTraceID := "1-" + fmt.Sprintf("%x", tempTraceID[0:4]) + "-" + fmt.Sprintf("%x", tempTraceID[4:16]) + + // Validate segment 1 (dependency subsegment) + assert.Equal(t, "subsegment", *segments[0].Type) + assert.Equal(t, "destination operation", *segments[0].Name) + assert.NotEqual(t, parentSpanID.String(), *segments[0].ID) + assert.Equal(t, span.SpanID().String(), *segments[0].ID) + assert.Equal(t, 1, len(segments[0].Links)) + assert.Equal(t, expectedTraceID, *segments[0].TraceID) + assert.NotNil(t, segments[0].HTTP) + assert.Equal(t, "POST", *segments[0].HTTP.Request.Method) + assert.Equal(t, 1, len(segments[0].Annotations)) + assert.Equal(t, "myAnnotationValue", segments[0].Annotations["myAnnotationKey"]) + assert.Equal(t, 7, len(segments[0].Metadata["default"])) + assert.Equal(t, "Consumer", segments[0].Metadata["default"][awsSpanKind]) + assert.Equal(t, "myLocalService", segments[0].Metadata["default"][awsLocalService]) + assert.Equal(t, "receive", segments[0].Metadata["default"][conventions.AttributeMessagingOperation]) + assert.Equal(t, "service.name=myTest", segments[0].Metadata["default"]["otel.resource.attributes"]) + assert.Equal(t, "MySDK", *segments[0].AWS.XRay.SDK) + assert.Equal(t, 
"1.20.0", *segments[0].AWS.XRay.SDKVersion) + assert.Equal(t, true, *segments[0].AWS.XRay.AutoInstrumentation) + assert.Equal(t, "UpdateItem", *segments[0].AWS.Operation) + assert.Nil(t, segments[0].Namespace) +} + +func TestLocalRootConsumerAWSNamespace(t *testing.T) { + spanName := "destination receive" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + attributes[conventions.AttributeRPCSystem] = "aws-api" + + span := constructConsumerSpan(parentSpanID, spanName, 200, "OK", attributes) + + spanLink := span.Links().AppendEmpty() + spanLink.SetTraceID(newTraceID()) + spanLink.SetSpanID(newSegmentID()) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 2, len(segments)) + assert.Nil(t, err) + + // Ensure that AWS namespace is not overwritten to remote + assert.Equal(t, "aws", *segments[0].Namespace) +} + +func TestLocalRootClient(t *testing.T) { + spanName := "SQS Get" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + + span := constructClientSpan(parentSpanID, spanName, 200, "OK", attributes) + + spanLink := span.Links().AppendEmpty() + spanLink.SetTraceID(newTraceID()) + spanLink.SetSpanID(newSegmentID()) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 2, len(segments)) + assert.Nil(t, err) + + validateLocalRootDependencySubsegment(t, segments[0], span, *segments[1].ID) + assert.Equal(t, 1, len(segments[0].Links)) + + validateLocalRootServiceSegment(t, segments[1], span) + assert.Nil(t, segments[1].Links) + + // Checks these values are the same for both + assert.Equal(t, segments[0].StartTime, segments[1].StartTime) + assert.Equal(t, segments[0].EndTime, segments[1].EndTime) +} + +func TestLocalRootProducer(t 
*testing.T) { + spanName := "destination operation" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + + span := constructProducerSpan(parentSpanID, spanName, 200, "Ok", attributes) + + addSpanLink(span) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 2, len(segments)) + assert.Nil(t, err) + + validateLocalRootDependencySubsegment(t, segments[0], span, *segments[1].ID) + assert.Equal(t, 1, len(segments[0].Links)) + + validateLocalRootServiceSegment(t, segments[1], span) + assert.Nil(t, segments[1].Links) + + // Checks these values are the same for both + assert.Equal(t, segments[0].StartTime, segments[1].StartTime) + assert.Equal(t, segments[0].EndTime, segments[1].EndTime) +} + +func validateLocalRootWithoutDependency(t *testing.T, segment *awsxray.Segment, span ptrace.Span) { + tempTraceID := span.TraceID() + expectedTraceID := "1-" + fmt.Sprintf("%x", tempTraceID[0:4]) + "-" + fmt.Sprintf("%x", tempTraceID[4:16]) + + // Validate segment + assert.Nil(t, segment.Type) + assert.Equal(t, "myLocalService", *segment.Name) + assert.Equal(t, span.ParentSpanID().String(), *segment.ParentID) + assert.Equal(t, 1, len(segment.Links)) + assert.Equal(t, expectedTraceID, *segment.TraceID) + assert.Equal(t, "POST", *segment.HTTP.Request.Method) + assert.Equal(t, 2, len(segment.Annotations)) + assert.Equal(t, "myRemoteService", segment.Annotations["aws_remote_service"]) + assert.Equal(t, "myAnnotationValue", segment.Annotations["myAnnotationKey"]) + + var numberOfMetadataKeys = 8 + + if span.Kind() == ptrace.SpanKindServer { + numberOfMetadataKeys = 30 + } + + assert.Equal(t, numberOfMetadataKeys, len(segment.Metadata["default"])) + assert.Equal(t, "receive", segment.Metadata["default"][conventions.AttributeMessagingOperation]) + assert.Equal(t, "LOCAL_ROOT", segment.Metadata["default"][awsSpanKind]) 
+ assert.Equal(t, "myRemoteOperation", segment.Metadata["default"][awsRemoteOperation]) + assert.Equal(t, "myTarget", segment.Metadata["default"][remoteTarget]) + assert.Equal(t, "k8sRemoteNamespace", segment.Metadata["default"][k8sRemoteNamespace]) + assert.Equal(t, "myLocalService", segment.Metadata["default"][awsLocalService]) + assert.Equal(t, "awsLocalOperation", segment.Metadata["default"][awsLocalOperation]) + assert.Equal(t, "service.name=myTest", segment.Metadata["default"]["otel.resource.attributes"]) + + assert.Equal(t, "service.name=myTest", segment.Metadata["default"]["otel.resource.attributes"]) + assert.Equal(t, "MySDK", *segment.AWS.XRay.SDK) + assert.Equal(t, "1.20.0", *segment.AWS.XRay.SDKVersion) + assert.Equal(t, true, *segment.AWS.XRay.AutoInstrumentation) + + assert.Equal(t, "UpdateItem", *segment.AWS.Operation) + assert.Equal(t, "AWSAccountAttribute", *segment.AWS.AccountID) + assert.Equal(t, "AWSRegionAttribute", *segment.AWS.RemoteRegion) + assert.Equal(t, "AWSRequestIDAttribute", *segment.AWS.RequestID) + assert.Equal(t, "AWSQueueURLAttribute", *segment.AWS.QueueURL) + assert.Equal(t, "TableName", *segment.AWS.TableName) + + assert.Nil(t, segment.Namespace) +} + +func TestLocalRootServer(t *testing.T) { + spanName := "MyService" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + + span := constructServerSpan(parentSpanID, spanName, 200, "OK", attributes) + + addSpanLink(span) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 1, len(segments)) + assert.Nil(t, err) + + validateLocalRootWithoutDependency(t, segments[0], span) +} + +func TestLocalRootInternal(t *testing.T) { + spanName := "MyInternalService" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + + span := constructInternalSpan(parentSpanID, spanName, 
200, "OK", attributes) + + addSpanLink(span) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 1, len(segments)) + assert.Nil(t, err) + + validateLocalRootWithoutDependency(t, segments[0], span) +} + +func TestNotLocalRootInternal(t *testing.T) { + spanName := "MyService" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + attributes[awsSpanKind] = "Internal" + + span := constructInternalSpan(parentSpanID, spanName, 200, "OK", attributes) + + addSpanLink(span) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 1, len(segments)) + assert.Nil(t, err) + + // Validate segment + assert.Equal(t, "subsegment", *segments[0].Type) + assert.Nil(t, segments[0].Namespace) + assert.Equal(t, "MyService", *segments[0].Name) +} + +func TestNotLocalRootConsumer(t *testing.T) { + spanName := "MyService" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + attributes[awsSpanKind] = "Consumer" + + span := constructConsumerSpan(parentSpanID, spanName, 200, "OK", attributes) + + addSpanLink(span) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 1, len(segments)) + assert.Nil(t, err) + + // Validate segment + assert.Equal(t, "subsegment", *segments[0].Type) + assert.Equal(t, "remote", *segments[0].Namespace) + assert.Equal(t, "MyService", *segments[0].Name) +} + +func TestNotLocalRootClient(t *testing.T) { + spanName := "MyService" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + attributes[awsSpanKind] = "Client" + + span := constructClientSpan(parentSpanID, 
spanName, 200, "OK", attributes) + + addSpanLink(span) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 1, len(segments)) + assert.Nil(t, err) + + // Validate segment + assert.Equal(t, "subsegment", *segments[0].Type) + assert.Equal(t, "remote", *segments[0].Namespace) + assert.Equal(t, "myRemoteService", *segments[0].Name) +} + +func TestNotLocalRootProducer(t *testing.T) { + spanName := "MyService" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + attributes[awsSpanKind] = "Producer" + + span := constructProducerSpan(parentSpanID, spanName, 200, "OK", attributes) + + addSpanLink(span) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 1, len(segments)) + assert.Nil(t, err) + + // Validate segment + assert.Equal(t, "subsegment", *segments[0].Type) + assert.Equal(t, "remote", *segments[0].Namespace) + assert.Equal(t, "myRemoteService", *segments[0].Name) +} + +func TestNotLocalRootServer(t *testing.T) { + spanName := "MyInternalService" + resource := getBasicResource() + parentSpanID := newSegmentID() + + attributes := getBasicAttributes() + attributes[awsSpanKind] = "Server" + delete(attributes, awsRemoteService) + delete(attributes, awsRemoteOperation) + + span := constructServerSpan(parentSpanID, spanName, 200, "OK", attributes) + + addSpanLink(span) + + segments, err := MakeSegmentsFromSpan(span, resource, []string{awsRemoteService, "myAnnotationKey"}, false, nil, false) + + assert.NotNil(t, segments) + assert.Equal(t, 1, len(segments)) + assert.Nil(t, err) + + // Validate segment + assert.Nil(t, segments[0].Type) + assert.Nil(t, segments[0].Namespace) + assert.Equal(t, "myLocalService", *segments[0].Name) +} + func constructClientSpan(parentSpanID pcommon.SpanID, 
name string, code ptrace.StatusCode, message string, attributes map[string]interface{}) ptrace.Span { var ( traceID = newTraceID() @@ -1129,6 +1595,33 @@ func constructServerSpan(parentSpanID pcommon.SpanID, name string, code ptrace.S return span } +func constructInternalSpan(parentSpanID pcommon.SpanID, name string, code ptrace.StatusCode, message string, attributes map[string]interface{}) ptrace.Span { + var ( + traceID = newTraceID() + spanID = newSegmentID() + endTime = time.Now() + startTime = endTime.Add(-215 * time.Millisecond) + spanAttributes = constructSpanAttributes(attributes) + ) + + span := ptrace.NewSpan() + span.SetTraceID(traceID) + span.SetSpanID(spanID) + span.SetParentSpanID(parentSpanID) + span.SetName(name) + span.SetKind(ptrace.SpanKindInternal) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + + status := ptrace.NewStatus() + status.SetCode(code) + status.SetMessage(message) + status.CopyTo(span.Status()) + + spanAttributes.CopyTo(span.Attributes()) + return span +} + func constructConsumerSpan(parentSpanID pcommon.SpanID, name string, code ptrace.StatusCode, message string, attributes map[string]interface{}) ptrace.Span { var ( traceID = newTraceID() From ee847deac28998226ec0b542268aee57ebe634cd Mon Sep 17 00:00:00 2001 From: Thomas Pierce Date: Thu, 12 Oct 2023 09:48:47 -0700 Subject: [PATCH 5/7] Update segment conversion logic (#115) In this commit, we are fixing a couple small bugs missed in the previous review. First, we are adding a nil-guard when setting dependencySubsegment.Name, to avoid the (unlikely, but possible) scenario where local root dependency spans are created without awsRemoteService. Second, we are updating the common logic for setting segment.Name, which previously only looked at CLIENT/PRODUCER spans, but now needs to look at CONSUMER spans.
--- exporter/awsxrayexporter/internal/translator/segment.go | 8 ++++---- .../awsxrayexporter/internal/translator/segment_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/exporter/awsxrayexporter/internal/translator/segment.go b/exporter/awsxrayexporter/internal/translator/segment.go index 21470e0e0dbd..682b83444979 100644 --- a/exporter/awsxrayexporter/internal/translator/segment.go +++ b/exporter/awsxrayexporter/internal/translator/segment.go @@ -152,9 +152,9 @@ func MakeDependencySubsegmentForLocalRootDependencySpan(span ptrace.Span, resour dependencySubsegment.Links = nil } - myAwsRemoteService, _ := span.Attributes().Get(awsRemoteService) - - dependencySubsegment.Name = awsxray.String(myAwsRemoteService.Str()) + if myAwsRemoteService, ok := span.Attributes().Get(awsRemoteService); ok { + dependencySubsegment.Name = awsxray.String(myAwsRemoteService.Str()) + } return dependencySubsegment, err } @@ -356,7 +356,7 @@ func MakeSegment(span ptrace.Span, resource pcommon.Resource, indexedAttrs []str } } - if span.Kind() == ptrace.SpanKindClient || span.Kind() == ptrace.SpanKindProducer { + if span.Kind() == ptrace.SpanKindClient || span.Kind() == ptrace.SpanKindProducer || span.Kind() == ptrace.SpanKindConsumer { if remoteServiceName, ok := attributes.Get(awsRemoteService); ok { name = remoteServiceName.Str() } diff --git a/exporter/awsxrayexporter/internal/translator/segment_test.go b/exporter/awsxrayexporter/internal/translator/segment_test.go index a9ca076c3659..4c5e5b470951 100644 --- a/exporter/awsxrayexporter/internal/translator/segment_test.go +++ b/exporter/awsxrayexporter/internal/translator/segment_test.go @@ -1464,7 +1464,7 @@ func TestNotLocalRootConsumer(t *testing.T) { // Validate segment assert.Equal(t, "subsegment", *segments[0].Type) assert.Equal(t, "remote", *segments[0].Namespace) - assert.Equal(t, "MyService", *segments[0].Name) + assert.Equal(t, "myRemoteService", *segments[0].Name) } func TestNotLocalRootClient(t 
*testing.T) { From 20c10bac81f6c39f948dea3662da38ac09f0e77a Mon Sep 17 00:00:00 2001 From: John Knollmeyer Date: Thu, 12 Oct 2023 11:59:19 -0700 Subject: [PATCH 6/7] [exporter/awsxray] Add aws sdk http error events to x-ray subsegment and strip prefix AWS.SDK. from aws remote service name (#113) * add aws sdk http error events to x-ray subsegment * Pull Request review changes for #27232 - Define "aws-api" as a const - Rename some variables in cause.go to make the intended behavior a little more clear - PR: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/27232 * Change the time used for X-Ray exception events to unix epoch time - timestamp.String() generates a string like "2023-10-10 01:58:24.675761 +0000 UTC", which is not a standardized format - Under the hood, it is just calling AsTime().String() https://github.com/open-telemetry/opentelemetry-collector/blob/pdata/v0.66.0/pdata/pcommon/timestamp.go#L36 - Time.String() has the warning "The returned string is meant for debugging; for a stable serialized representation, use t.MarshalText, t.MarshalBinary, or t.Format with an explicit format string." 
https://pkg.go.dev/time#Time.String * Change the time used for X-Ray exception events to microsecond precision - This matches X-Ray guidance for timestamps - "Microsecond resolution is recommended when available" https://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html#api-segmentdocuments-fields --------- Co-authored-by: Ping Xiang --- .chloggen/add-aws-http-error-event.yaml | 27 ++++++++++++++ .../internal/translator/cause.go | 35 +++++++++++++++++-- .../internal/translator/cause_test.go | 33 +++++++++++++++++ .../internal/translator/segment.go | 20 ++++++++--- .../internal/translator/segment_test.go | 28 +++++++++++++++ 5 files changed, 137 insertions(+), 6 deletions(-) create mode 100644 .chloggen/add-aws-http-error-event.yaml diff --git a/.chloggen/add-aws-http-error-event.yaml b/.chloggen/add-aws-http-error-event.yaml new file mode 100644 index 000000000000..a92317bec3e3 --- /dev/null +++ b/.chloggen/add-aws-http-error-event.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsxrayexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Convert individual HTTP error events into exceptions within subsegments for AWS SDK spans and strip AWS.SDK prefix from remote aws service name + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27232] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries.
+subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/awsxrayexporter/internal/translator/cause.go b/exporter/awsxrayexporter/internal/translator/cause.go index b5d477a0e282..3f26c84d7b75 100644 --- a/exporter/awsxrayexporter/internal/translator/cause.go +++ b/exporter/awsxrayexporter/internal/translator/cause.go @@ -22,6 +22,10 @@ import ( // ExceptionEventName the name of the exception event. // TODO: Remove this when collector defines this semantic convention. const ExceptionEventName = "exception" +const AwsIndividualHTTPEventName = "HTTP request failure" +const AwsIndividualHTTPErrorEventType = "aws.http.error.event" +const AwsIndividualHTTPErrorCodeAttr = "http.response.status_code" +const AwsIndividualHTTPErrorMsgAttr = "aws.http.error_message" func makeCause(span ptrace.Span, attributes map[string]pcommon.Value, resource pcommon.Resource) (isError, isFault, isThrottle bool, filtered map[string]pcommon.Value, cause *awsxray.CauseData) { @@ -34,14 +38,21 @@ func makeCause(span ptrace.Span, attributes map[string]pcommon.Value, resource p errorKind string ) - hasExceptions := false + isAwsSdkSpan := isAwsSdkSpan(span) + hasExceptionEvents := false + hasAwsIndividualHTTPError := false for i := 0; i < span.Events().Len(); i++ { event := span.Events().At(i) if event.Name() == ExceptionEventName { - hasExceptions = true + hasExceptionEvents = true + break + } + if isAwsSdkSpan && event.Name() == AwsIndividualHTTPEventName { + hasAwsIndividualHTTPError = true break } } + hasExceptions := hasExceptionEvents || hasAwsIndividualHTTPError switch { case 
hasExceptions: @@ -76,6 +87,26 @@ func makeCause(span ptrace.Span, attributes map[string]pcommon.Value, resource p parsed := parseException(exceptionType, message, stacktrace, isRemote, language) exceptions = append(exceptions, parsed...) + } else if isAwsSdkSpan && event.Name() == AwsIndividualHTTPEventName { + errorCode, ok1 := event.Attributes().Get(AwsIndividualHTTPErrorCodeAttr) + errorMessage, ok2 := event.Attributes().Get(AwsIndividualHTTPErrorMsgAttr) + if ok1 && ok2 { + eventEpochTime := event.Timestamp().AsTime().UnixMicro() + strs := []string{ + errorCode.AsString(), + strconv.FormatFloat(float64(eventEpochTime)/1_000_000, 'f', 6, 64), + errorMessage.Str(), + } + message = strings.Join(strs, "@") + segmentID := newSegmentID() + exception := awsxray.Exception{ + ID: aws.String(hex.EncodeToString(segmentID[:])), + Type: aws.String(AwsIndividualHTTPErrorEventType), + Remote: aws.Bool(true), + Message: aws.String(message), + } + exceptions = append(exceptions, exception) + } } } cause = &awsxray.CauseData{ diff --git a/exporter/awsxrayexporter/internal/translator/cause_test.go b/exporter/awsxrayexporter/internal/translator/cause_test.go index 3a6466f2028f..0d9d4388ef7f 100644 --- a/exporter/awsxrayexporter/internal/translator/cause_test.go +++ b/exporter/awsxrayexporter/internal/translator/cause_test.go @@ -59,6 +59,39 @@ Caused by: java.lang.IllegalArgumentException: bad argument`) assert.Empty(t, cause.Exceptions[2].Message) } +func TestMakeCauseAwsSdkSpan(t *testing.T) { + errorMsg := "this is a test" + attributeMap := make(map[string]interface{}) + attributeMap[conventions.AttributeRPCSystem] = "aws-api" + span := constructExceptionServerSpan(attributeMap, ptrace.StatusCodeError) + span.Status().SetMessage(errorMsg) + + event1 := span.Events().AppendEmpty() + event1.SetName(AwsIndividualHTTPEventName) + event1.Attributes().PutStr(AwsIndividualHTTPErrorCodeAttr, "503") + event1.Attributes().PutStr(AwsIndividualHTTPErrorMsgAttr, "service is temporarily 
unavailable") + timestamp := pcommon.NewTimestampFromTime(time.UnixMicro(1696954761000001)) + event1.SetTimestamp(timestamp) + + res := pcommon.NewResource() + isError, isFault, isThrottle, _, cause := makeCause(span, nil, res) + + assert.False(t, isError) + assert.True(t, isFault) + assert.False(t, isThrottle) + assert.NotNil(t, cause) + + assert.Equal(t, 1, len(cause.CauseObject.Exceptions)) + exception := cause.CauseObject.Exceptions[0] + assert.Equal(t, AwsIndividualHTTPErrorEventType, *exception.Type) + assert.True(t, *exception.Remote) + + messageParts := strings.SplitN(*exception.Message, "@", 3) + assert.Equal(t, "503", messageParts[0]) + assert.Equal(t, "1696954761.000001", messageParts[1]) + assert.Equal(t, "service is temporarily unavailable", messageParts[2]) +} + func TestCauseExceptionWithoutError(t *testing.T) { var nonErrorStatusCodes = []ptrace.StatusCode{ptrace.StatusCodeUnset, ptrace.StatusCodeOk} diff --git a/exporter/awsxrayexporter/internal/translator/segment.go b/exporter/awsxrayexporter/internal/translator/segment.go index 682b83444979..efda3ed22f2e 100644 --- a/exporter/awsxrayexporter/internal/translator/segment.go +++ b/exporter/awsxrayexporter/internal/translator/segment.go @@ -58,6 +58,8 @@ const ( defaultSegmentName = "span" // maxSegmentNameLength the maximum length of a Segment name maxSegmentNameLength = 200 + // rpc.system value for AWS service remotes + awsAPIRPCSystem = "aws-api" ) const ( @@ -300,6 +302,14 @@ func MakeDocumentFromSegment(segment *awsxray.Segment) (string, error) { return jsonStr, nil } +func isAwsSdkSpan(span ptrace.Span) bool { + attributes := span.Attributes() + if rpcSystem, ok := attributes.Get(conventions.AttributeRPCSystem); ok { + return rpcSystem.Str() == awsAPIRPCSystem + } + return false +} + // MakeSegment converts an OpenTelemetry Span to an X-Ray Segment func MakeSegment(span ptrace.Span, resource pcommon.Resource, indexedAttrs []string, indexAllAttrs bool, logGroupNames []string, 
skipTimestampValidation bool) (*awsxray.Segment, error) { var segmentType string @@ -359,6 +369,10 @@ func MakeSegment(span ptrace.Span, resource pcommon.Resource, indexedAttrs []str if span.Kind() == ptrace.SpanKindClient || span.Kind() == ptrace.SpanKindProducer || span.Kind() == ptrace.SpanKindConsumer { if remoteServiceName, ok := attributes.Get(awsRemoteService); ok { name = remoteServiceName.Str() + // only strip the prefix for AWS spans + if isAwsSdkSpan(span) && strings.HasPrefix(name, "AWS.SDK.") { + name = strings.TrimPrefix(name, "AWS.SDK.") + } } } @@ -371,10 +385,8 @@ func MakeSegment(span ptrace.Span, resource pcommon.Resource, indexedAttrs []str } if namespace == "" { - if rpcSystem, ok := attributes.Get(conventions.AttributeRPCSystem); ok { - if rpcSystem.Str() == "aws-api" { - namespace = conventions.AttributeCloudProviderAWS - } + if isAwsSdkSpan(span) { + namespace = conventions.AttributeCloudProviderAWS } } diff --git a/exporter/awsxrayexporter/internal/translator/segment_test.go b/exporter/awsxrayexporter/internal/translator/segment_test.go index 4c5e5b470951..dd5978c61309 100644 --- a/exporter/awsxrayexporter/internal/translator/segment_test.go +++ b/exporter/awsxrayexporter/internal/translator/segment_test.go @@ -992,6 +992,34 @@ func TestClientSpanWithAwsRemoteServiceName(t *testing.T) { assert.False(t, strings.Contains(jsonStr, "user")) } +func TestAwsSdkSpanWithAwsRemoteServiceName(t *testing.T) { + spanName := "DynamoDB.PutItem" + parentSpanID := newSegmentID() + user := "testingT" + attributes := make(map[string]interface{}) + attributes[conventions.AttributeRPCSystem] = "aws-api" + attributes[conventions.AttributeHTTPMethod] = "POST" + attributes[conventions.AttributeHTTPScheme] = "https" + attributes[conventions.AttributeRPCService] = "DynamoDb" + attributes[awsRemoteService] = "AWS.SDK.DynamoDb" + + resource := constructDefaultResource() + span := constructClientSpan(parentSpanID, spanName, 0, "OK", attributes) + + segment, _ := 
MakeSegment(span, resource, nil, false, nil, false) + assert.Equal(t, "DynamoDb", *segment.Name) + assert.Equal(t, "subsegment", *segment.Type) + + jsonStr, err := MakeSegmentDocumentString(span, resource, nil, false, nil, false) + + assert.NotNil(t, jsonStr) + assert.Nil(t, err) + assert.True(t, strings.Contains(jsonStr, "DynamoDb")) + assert.False(t, strings.Contains(jsonStr, "DynamoDb.PutItem")) + assert.False(t, strings.Contains(jsonStr, user)) + assert.False(t, strings.Contains(jsonStr, "user")) +} + func TestProducerSpanWithAwsRemoteServiceName(t *testing.T) { spanName := "ABC.payment" parentSpanID := newSegmentID() From d1a2dc44f2f34893710a29fbdb554840652309ea Mon Sep 17 00:00:00 2001 From: Vastin Date: Thu, 12 Oct 2023 15:52:29 -0500 Subject: [PATCH 7/7] Put metrics fields to bottom of EMF log entry. (#108) **Tracking Issues:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/26697 --- exporter/awsemfexporter/metric_translator.go | 25 +++++++++++++++++++ .../awsemfexporter/metric_translator_test.go | 6 ++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index a3f4e4e12f76..ea795bd2a3ef 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -436,11 +436,36 @@ func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) (*cwlogs.Event, return nil, nil } + // remove metrics from fieldMap + metricsMap := make(map[string]interface{}) + for _, measurement := range cWMetric.measurements { + for _, metric := range measurement.Metrics { + metricName, exist := metric["Name"] + if exist { + v, ok := fieldMap[metricName] + if ok { + metricsMap[metricName] = v + delete(fieldMap, metricName) + } + } + } + } + pleMsg, err := json.Marshal(fieldMap) if err != nil { return nil, err } + // append metrics json to pleMsg + if len(metricsMap) > 0 { + metricsMsg, err := 
json.Marshal(metricsMap) + if err != nil { + return nil, err + } + metricsMsg[0] = ',' + pleMsg = append(pleMsg[:len(pleMsg)-1], metricsMsg...) + } + metricCreationTime := cWMetric.timestampMs logEvent := cwlogs.NewEvent( metricCreationTime, diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index 36b39c87bf31..5e5b7b3997be 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -405,7 +405,7 @@ func TestTranslateCWMetricToEMF(t *testing.T) { }}, }}, disableMetricExtraction: false, - expectedEMFLogEvent: "{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Version\":\"1\",\"_aws\":{\"CloudWatchMetrics\":[{\"Namespace\":\"test-emf\",\"Dimensions\":[[\"OTelLib\"],[\"OTelLib\",\"spanName\"]],\"Metrics\":[{\"Name\":\"spanCounter\",\"Unit\":\"Count\"}]}],\"Timestamp\":1596151098037},\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + expectedEMFLogEvent: 
"{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Version\":\"1\",\"_aws\":{\"CloudWatchMetrics\":[{\"Namespace\":\"test-emf\",\"Dimensions\":[[\"OTelLib\"],[\"OTelLib\",\"spanName\"]],\"Metrics\":[{\"Name\":\"spanCounter\",\"Unit\":\"Count\"}]}],\"Timestamp\":1596151098037},\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanName\":\"test\",\"spanCounter\":0}", }, "WithMeasurementAndEMFV0": { emfVersion: "0", @@ -418,7 +418,7 @@ func TestTranslateCWMetricToEMF(t *testing.T) { }}, }}, disableMetricExtraction: false, - expectedEMFLogEvent: "{\"CloudWatchMetrics\":[{\"Namespace\":\"test-emf\",\"Dimensions\":[[\"OTelLib\"],[\"OTelLib\",\"spanName\"]],\"Metrics\":[{\"Name\":\"spanCounter\",\"Unit\":\"Count\"}]}],\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Timestamp\":\"1596151098037\",\"Version\":\"0\",\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + expectedEMFLogEvent: 
"{\"CloudWatchMetrics\":[{\"Namespace\":\"test-emf\",\"Dimensions\":[[\"OTelLib\"],[\"OTelLib\",\"spanName\"]],\"Metrics\":[{\"Name\":\"spanCounter\",\"Unit\":\"Count\"}]}],\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Timestamp\":\"1596151098037\",\"Version\":\"0\",\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanName\":\"test\",\"spanCounter\":0}", }, "WithNoMeasurement": { emfVersion: "1", @@ -443,7 +443,7 @@ func TestTranslateCWMetricToEMF(t *testing.T) { }}, }}, disableMetricExtraction: true, - expectedEMFLogEvent: "{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + expectedEMFLogEvent: 
"{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanName\":\"test\",\"spanCounter\":0}", }, }