From dcde5855770d4ad8ea796e77daaf52ba5e0baf2c Mon Sep 17 00:00:00 2001 From: jjllee Date: Tue, 22 Oct 2024 12:45:01 -0700 Subject: [PATCH] Add logic to get cluster name from attributes populated by resourcedetectionprocessor --- .../config/config.go | 3 -- .../config/config_test.go | 8 ++--- .../config/resolvers.go | 6 ++++ .../internal/attributes/attributes.go | 7 +++-- .../internal/resolver/attributesresolver.go | 3 ++ .../internal/resolver/kubernetes.go | 22 +++++++++++--- .../internal/resolver/kubernetes_test.go | 29 +++++++++++++++++-- 7 files changed, 62 insertions(+), 16 deletions(-) diff --git a/processor/awsapplicationsignalsprocessor/config/config.go b/processor/awsapplicationsignalsprocessor/config/config.go index a853f5ef3128..e88872d49f5c 100644 --- a/processor/awsapplicationsignalsprocessor/config/config.go +++ b/processor/awsapplicationsignalsprocessor/config/config.go @@ -55,9 +55,6 @@ func (cfg *Config) Validate() error { for _, resolver := range cfg.Resolvers { switch resolver.Platform { case PlatformEKS: - if resolver.Name == "" { - return errors.New("name must not be empty for eks resolver") - } case PlatformK8s: if resolver.Name == "" { return errors.New("name must not be empty for k8s resolver") diff --git a/processor/awsapplicationsignalsprocessor/config/config_test.go b/processor/awsapplicationsignalsprocessor/config/config_test.go index 75762e214582..484254f8eaf1 100644 --- a/processor/awsapplicationsignalsprocessor/config/config_test.go +++ b/processor/awsapplicationsignalsprocessor/config/config_test.go @@ -18,6 +18,10 @@ func TestValidatePassed(t *testing.T) { "testEKS", NewEKSResolver("test"), }, + { + "testEKSWithoutName", + NewEKSResolverWithoutName(), + }, { "testK8S", NewK8sResolver("test"), @@ -60,10 +64,6 @@ func TestValidateFailedOnEmptyResolverName(t *testing.T) { name string resolver Resolver }{ - { - "testEKS", - NewEKSResolver(""), - }, { "testK8S", NewK8sResolver(""), diff --git a/processor/awsapplicationsignalsprocessor/config/resolvers.go b/processor/awsapplicationsignalsprocessor/config/resolvers.go index 3c66b5d7adeb..f9382a676529 100644 --- a/processor/awsapplicationsignalsprocessor/config/resolvers.go +++ b/processor/awsapplicationsignalsprocessor/config/resolvers.go @@ -28,6 +28,12 @@ func NewEKSResolver(name string) Resolver { } } +func NewEKSResolverWithoutName() Resolver { + return Resolver{ + Platform: PlatformEKS, + } +} + func NewK8sResolver(name string) Resolver { return Resolver{ Name: name, diff --git a/processor/awsapplicationsignalsprocessor/internal/attributes/attributes.go b/processor/awsapplicationsignalsprocessor/internal/attributes/attributes.go index e3af62ce6281..d739ddcae280 100644 --- a/processor/awsapplicationsignalsprocessor/internal/attributes/attributes.go +++ b/processor/awsapplicationsignalsprocessor/internal/attributes/attributes.go @@ -23,7 +23,8 @@ const ( AWSECSTaskID = "aws.ecs.task.id" // resource detection processor attributes - ResourceDetectionHostID = "host.id" - ResourceDetectionHostName = "host.name" - ResourceDetectionASG = "ec2.tag.aws:autoscaling:groupName" + ResourceDetectionHostID = "host.id" + ResourceDetectionHostName = "host.name" + ResourceDetectionASG = "ec2.tag.aws:autoscaling:groupName" + ResourceDetectionClusterName = "k8s.cluster.name" ) diff --git a/processor/awsapplicationsignalsprocessor/internal/resolver/attributesresolver.go b/processor/awsapplicationsignalsprocessor/internal/resolver/attributesresolver.go index dbdcc79ee8bc..b6cdd22ab296 100644 --- a/processor/awsapplicationsignalsprocessor/internal/resolver/attributesresolver.go +++ b/processor/awsapplicationsignalsprocessor/internal/resolver/attributesresolver.go @@ -54,6 +54,9 @@ func NewAttributesResolver(resolvers []appsignalsconfig.Resolver, logger *zap.Lo for _, resolver := range resolvers { switch resolver.Platform { case appsignalsconfig.PlatformEKS, appsignalsconfig.PlatformK8s: + if resolver.Name == "" { + resolver.Name = "UNKNOWN" + } subResolvers = append(subResolvers, getKubernetesResolver(resolver.Platform, resolver.Name, logger), newKubernetesResourceAttributesResolver(resolver.Platform, resolver.Name)) case appsignalsconfig.PlatformEC2: subResolvers = append(subResolvers, newResourceAttributesResolver(resolver.Platform, AttributePlatformEC2, DefaultInheritedAttributes)) diff --git a/processor/awsapplicationsignalsprocessor/internal/resolver/kubernetes.go b/processor/awsapplicationsignalsprocessor/internal/resolver/kubernetes.go index 917d2bf895c8..5ff1eec7f15b 100644 --- a/processor/awsapplicationsignalsprocessor/internal/resolver/kubernetes.go +++ b/processor/awsapplicationsignalsprocessor/internal/resolver/kubernetes.go @@ -592,6 +592,20 @@ func newKubernetesResourceAttributesResolver(platformCode, clusterName string) * attributeMap: DefaultInheritedAttributes, } } + +// Attempt to get the `k8s.cluster.name“ attribute that should be populated from resourcedetectionprocessor. +// If that attribute doesn't exist (e.g. resourcedetectionprocessor is not used or fails to get the k8s attributes), +// fallback to the processor's configured clusterName (which is "UNKNOWN" if not specified). +func (h *kubernetesResourceAttributesResolver) getResourceDetectorClusterName(resourceAttributes pcommon.Map) string { + clusterName := h.clusterName + + if val, ok := resourceAttributes.Get(attr.ResourceDetectionClusterName); ok { + clusterName = val.Str() + } + + return clusterName +} + func (h *kubernetesResourceAttributesResolver) Process(attributes, resourceAttributes pcommon.Map) error { for attrKey, mappingKey := range h.attributeMap { if val, ok := resourceAttributes.Get(attrKey); ok { @@ -600,10 +614,10 @@ func (h *kubernetesResourceAttributesResolver) Process(attributes, resourceAttri } if h.platformCode == config.PlatformEKS { attributes.PutStr(common.AttributePlatformType, AttributePlatformEKS) - attributes.PutStr(common.AttributeEKSClusterName, h.clusterName) + attributes.PutStr(common.AttributeEKSClusterName, h.getResourceDetectorClusterName(resourceAttributes)) } else { attributes.PutStr(common.AttributePlatformType, AttributePlatformK8S) - attributes.PutStr(common.AttributeK8SClusterName, h.clusterName) + attributes.PutStr(common.AttributeK8SClusterName, h.getResourceDetectorClusterName(resourceAttributes)) } var namespace string if nsAttr, ok := resourceAttributes.Get(semconv.AttributeK8SNamespaceName); ok { @@ -613,7 +627,7 @@ func (h *kubernetesResourceAttributesResolver) Process(attributes, resourceAttri } if val, ok := attributes.Get(attr.AWSLocalEnvironment); !ok { - env := generateLocalEnvironment(h.platformCode, h.clusterName+"/"+namespace) + env := generateLocalEnvironment(h.platformCode, h.getResourceDetectorClusterName(resourceAttributes)+"/"+namespace) attributes.PutStr(attr.AWSLocalEnvironment, env) } else { attributes.PutStr(attr.AWSLocalEnvironment, val.Str()) @@ -623,7 +637,7 @@ func (h *kubernetesResourceAttributesResolver) Process(attributes, resourceAttri // The application log group in Container Insights is a fixed pattern: // "/aws/containerinsights/{Cluster_Name}/application" // See https://github.com/aws/amazon-cloudwatch-agent-operator/blob/fe144bb02d7b1930715aa3ea32e57a5ff13406aa/helm/templates/fluent-bit-configmap.yaml#L82 - logGroupName := "/aws/containerinsights/" + h.clusterName + "/application" + logGroupName := "/aws/containerinsights/" + h.getResourceDetectorClusterName(resourceAttributes) + "/application" resourceAttributes.PutStr(semconv.AttributeAWSLogGroupNames, logGroupName) return nil diff --git a/processor/awsapplicationsignalsprocessor/internal/resolver/kubernetes_test.go b/processor/awsapplicationsignalsprocessor/internal/resolver/kubernetes_test.go index aedb5e909520..4ea02503c409 100644 --- a/processor/awsapplicationsignalsprocessor/internal/resolver/kubernetes_test.go +++ b/processor/awsapplicationsignalsprocessor/internal/resolver/kubernetes_test.go @@ -861,7 +861,6 @@ func TestK8sResourceAttributesResolverOnEKS(t *testing.T) { { "testDefault", map[string]string{}, - map[string]string{ attr.AWSLocalEnvironment: "eks:test-cluster/test-namespace-3", common.AttributeK8SNamespace: "test-namespace-3", @@ -885,6 +884,16 @@ func TestK8sResourceAttributesResolverOnEKS(t *testing.T) { common.AttributeEC2AutoScalingGroup: "asg", }, }, + { + "testClusterNameFromDetector", + map[string]string{ + attr.ResourceDetectionClusterName: "DetectedClusterName", + }, + map[string]string{ + attr.AWSLocalEnvironment: "eks:DetectedClusterName/test-namespace-3", + common.AttributeEKSClusterName: "DetectedClusterName", + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -902,11 +911,27 @@ func TestK8sResourceAttributesResolverOnEKS(t *testing.T) { for key, val := range tt.expectedAttributes { assert.Equal(t, val, getStrAttr(attributes, key, t), fmt.Sprintf("expected %s for key %s", val, key)) } - assert.Equal(t, "/aws/containerinsights/test-cluster/application", getStrAttr(resourceAttributes, semconv.AttributeAWSLogGroupNames, t)) + + if _, ok := resourceAttributes.Get(attr.ResourceDetectionClusterName); ok { + assert.Equal(t, "/aws/containerinsights/DetectedClusterName/application", getStrAttr(resourceAttributes, semconv.AttributeAWSLogGroupNames, t)) + } else { + assert.Equal(t, "/aws/containerinsights/test-cluster/application", getStrAttr(resourceAttributes, semconv.AttributeAWSLogGroupNames, t)) + } }) } } +func TestGetResourceDetectorClusterName(t *testing.T) { + resolver := newKubernetesResourceAttributesResolver(config.PlatformEKS, "test-cluster") + + resourceDetectorAttributes := pcommon.NewMap() + resourceDetectorClusterName := resolver.getResourceDetectorClusterName(resourceDetectorAttributes) + resourceDetectorAttributes.PutStr(attr.ResourceDetectionClusterName, "DetectedClusterName") + assert.Equal(t, resourceDetectorClusterName, "test-cluster") + resourceDetectorClusterName = resolver.getResourceDetectorClusterName(resourceDetectorAttributes) + assert.Equal(t, resourceDetectorClusterName, "DetectedClusterName") +} + func TestK8sResourceAttributesResolverOnK8S(t *testing.T) { // helper function to get string values from the attributes getStrAttr := func(attributes pcommon.Map, key string, t *testing.T) string {