Skip to content

Commit

Permalink
Rename Application Signals attributes in EMF logs
Browse files Browse the repository at this point in the history
  • Loading branch information
bjrara committed May 17, 2024
1 parent 39fce0b commit 200a5f8
Show file tree
Hide file tree
Showing 18 changed files with 134 additions and 112 deletions.
17 changes: 10 additions & 7 deletions plugins/processors/awsapplicationsignals/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ const (
MetricAttributeRemoteResourceType = "RemoteResourceType"
)
const (
AttributeEKSClusterName = "EKS.Cluster"
AttributeK8SClusterName = "K8s.Cluster"
AttributeK8SNamespace = "K8s.Namespace"
AttributeEC2AutoScalingGroupName = "EC2.AutoScalingGroupName"
AttributeEC2InstanceId = "EC2.InstanceId"
AttributePlatformType = "PlatformType"
AttributeSDK = "Telemetry.SDK"
AttributeEKSClusterName = "EKS.Cluster"
AttributeK8SClusterName = "K8s.Cluster"
AttributeK8SNamespace = "K8s.Namespace"
AttributeEC2AutoScalingGroup = "EC2.AutoScalingGroup"
AttributeEC2InstanceId = "EC2.InstanceId"
AttributeHost = "Host"
AttributePlatformType = "PlatformType"
AttributeTelemetrySDK = "Telemetry.SDK"
AttributeTelemetryAgent = "Telemetry.Agent"
AttributeTelemetrySource = "Telemetry.Source"
)

const (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package attributes

const (
// aws attributes
AWSSpanKind = "aws.span.kind"
AWSLocalService = "aws.local.service"
AWSLocalEnvironment = "aws.local.environment"
AWSLocalOperation = "aws.local.operation"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func TestAdmitAndRollup(t *testing.T) {
admittedAttributes[uniqKey.AsString()] = attr
} else {
for _, indexedAttrKey := range awsDeclaredMetricAttributes {
if indexedAttrKey == "Environment" || indexedAttrKey == "Service" || indexedAttrKey == "RemoteService" {
if indexedAttrKey == common.MetricAttributeEnvironment ||
indexedAttrKey == common.MetricAttributeLocalService ||
indexedAttrKey == common.MetricAttributeRemoteService {
continue
}
attrValue, _ := attr.Get(indexedAttrKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ import (
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
conventions "go.opentelemetry.io/collector/semconv/v1.18.0"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
"go.uber.org/zap"

"github.com/aws/amazon-cloudwatch-agent/internal/version"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common"
attr "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/attributes"
)

const (
// Length limits from Application Signals SLOs
MaxEnvironmentLength = 259
MaxServiceNameLength = 255
maxEnvironmentLength = 259
maxServiceNameLength = 255

// Length limits from CloudWatch Metrics
DefaultMetricAttributeLength = 1024
defaultMetricAttributeLength = 1024
)

type attributesNormalizer struct {
Expand Down Expand Up @@ -89,7 +89,7 @@ func (n *attributesNormalizer) Process(attributes, resourceAttributes pcommon.Ma
// for enforcing the limits on length.
truncateAttributesByLength(attributes)
n.renameAttributes(attributes, resourceAttributes, isTrace)
n.appendNewAttributes(attributes, resourceAttributes, isTrace)
n.normalizeTelemetryAttributes(attributes, resourceAttributes, isTrace)
return nil
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func (n *attributesNormalizer) copyResourceAttributesToAttributes(attributes, re
}
}

func (n *attributesNormalizer) appendNewAttributes(attributes, resourceAttributes pcommon.Map, isTrace bool) {
func (n *attributesNormalizer) normalizeTelemetryAttributes(attributes, resourceAttributes pcommon.Map, isTrace bool) {
if isTrace {
return
}
Expand All @@ -140,13 +140,13 @@ func (n *attributesNormalizer) appendNewAttributes(attributes, resourceAttribute
// TODO read telemetry.auto.version from telemetry.distro.* from v1.22
resourceAttributes.Range(func(k string, v pcommon.Value) bool {
switch k {
case conventions.AttributeTelemetrySDKName:
case semconv.AttributeTelemetrySDKName:
sdkName = strings.ReplaceAll(v.Str(), " ", "")
case conventions.AttributeTelemetrySDKLanguage:
case semconv.AttributeTelemetrySDKLanguage:
sdkLang = strings.ReplaceAll(v.Str(), " ", "")
case conventions.AttributeTelemetrySDKVersion:
case semconv.AttributeTelemetrySDKVersion:
sdkVersion = strings.ReplaceAll(v.Str(), " ", "")
case conventions.AttributeTelemetryAutoVersion:
case semconv.AttributeTelemetryAutoVersion:
sdkAutoVersion = strings.ReplaceAll(v.Str(), " ", "")
}
return true
Expand All @@ -155,7 +155,26 @@ func (n *attributesNormalizer) appendNewAttributes(attributes, resourceAttribute
sdkVersion = sdkAutoVersion
mode = instrumentationModeAuto
}
attributes.PutStr(common.AttributeSDK, fmt.Sprintf("%s,%s,%s,%s", sdkName, sdkVersion, sdkLang, mode))
attributes.PutStr(common.AttributeTelemetrySDK, fmt.Sprintf("%s,%s,%s,%s", sdkName, sdkVersion, sdkLang, mode))
attributes.PutStr(common.AttributeTelemetryAgent, fmt.Sprintf("CWAgent/%s", version.Number()))

var telemetrySource string
if val, ok := attributes.Get(attr.AWSSpanKind); ok {
switch val.Str() {
case "CLIENT":
telemetrySource = "ClientSpan"
case "SERVER":
telemetrySource = "ServerSpan"
case "PRODUCER":
telemetrySource = "ProducerSpan"
case "CONSUMER":
telemetrySource = "ConsumerSpan"
case "LOCAL_ROOT":
telemetrySource = "LocalRootSpan"
}
attributes.PutStr(common.AttributeTelemetrySource, telemetrySource)
attributes.Remove(attr.AWSSpanKind)
}
}

func rename(attrs pcommon.Map, renameMap map[string]string) {
Expand All @@ -175,19 +194,19 @@ func rename(attrs pcommon.Map, renameMap map[string]string) {
}

func truncateAttributesByLength(attributes pcommon.Map) {
for attrKey, _ := range attributesRenamingForMetric {
for attrKey := range attributesRenamingForMetric {
switch attrKey {
case attr.AWSLocalEnvironment, attr.AWSRemoteEnvironment:
if val, ok := attributes.Get(attrKey); ok {
attributes.PutStr(attrKey, truncateStringByLength(val.Str(), MaxEnvironmentLength))
attributes.PutStr(attrKey, truncateStringByLength(val.Str(), maxEnvironmentLength))
}
case attr.AWSLocalService, attr.AWSRemoteService:
if val, ok := attributes.Get(attrKey); ok {
attributes.PutStr(attrKey, truncateStringByLength(val.Str(), MaxServiceNameLength))
attributes.PutStr(attrKey, truncateStringByLength(val.Str(), maxServiceNameLength))
}
default:
if val, ok := attributes.Get(attrKey); ok {
attributes.PutStr(attrKey, truncateStringByLength(val.Str(), DefaultMetricAttributeLength))
attributes.PutStr(attrKey, truncateStringByLength(val.Str(), defaultMetricAttributeLength))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ func TestTruncateAttributes(t *testing.T) {
truncateAttributesByLength(attributes)

val, _ := attributes.Get(attr.AWSLocalEnvironment)
assert.True(t, len(val.Str()) == MaxEnvironmentLength)
assert.True(t, len(val.Str()) == maxEnvironmentLength)
val, _ = attributes.Get(attr.AWSRemoteEnvironment)
assert.True(t, len(val.Str()) == MaxEnvironmentLength)
assert.True(t, len(val.Str()) == maxEnvironmentLength)
val, _ = attributes.Get(attr.AWSLocalService)
assert.True(t, len(val.Str()) == MaxServiceNameLength)
assert.True(t, len(val.Str()) == maxServiceNameLength)
val, _ = attributes.Get(attr.AWSRemoteService)
assert.True(t, len(val.Str()) == MaxServiceNameLength)
assert.True(t, len(val.Str()) == maxServiceNameLength)
val, _ = attributes.Get(attr.AWSRemoteResourceIdentifier)
assert.True(t, len(val.Str()) == 300)
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func Test_attributesNormalizer_appendNewAttributes(t *testing.T) {
n := &attributesNormalizer{
logger: logger,
}
n.appendNewAttributes(tt.attributes, tt.resourceAttributes, tt.isTrace)
n.normalizeTelemetryAttributes(tt.attributes, tt.resourceAttributes, tt.isTrace)

if value, ok := tt.attributes.Get("Telemetry.SDK"); !ok {
if !tt.isTrace {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
semconv1 "go.opentelemetry.io/collector/semconv/v1.17.0"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
"go.uber.org/zap"

"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common"
Expand All @@ -29,16 +29,16 @@ const (
)

var GenericInheritedAttributes = map[string]string{
semconv1.AttributeDeploymentEnvironment: attr.AWSLocalEnvironment,
attr.ResourceDetectionHostName: attr.ResourceDetectionHostName,
semconv.AttributeDeploymentEnvironment: attr.AWSLocalEnvironment,
attr.ResourceDetectionHostName: common.AttributeHost,
}

// DefaultInheritedAttributes is an allow-list that also renames attributes from the resource detection processor
var DefaultInheritedAttributes = map[string]string{
semconv1.AttributeDeploymentEnvironment: attr.AWSLocalEnvironment,
attr.ResourceDetectionASG: common.AttributeEC2AutoScalingGroupName,
attr.ResourceDetectionHostId: common.AttributeEC2InstanceId,
attr.ResourceDetectionHostName: attr.ResourceDetectionHostName,
semconv.AttributeDeploymentEnvironment: attr.AWSLocalEnvironment,
attr.ResourceDetectionASG: common.AttributeEC2AutoScalingGroup,
attr.ResourceDetectionHostId: common.AttributeEC2InstanceId,
attr.ResourceDetectionHostName: common.AttributeHost,
}

type subResolver interface {
Expand Down Expand Up @@ -85,9 +85,7 @@ func (r *attributesResolver) Process(attributes, resourceAttributes pcommon.Map,
func (r *attributesResolver) Stop(ctx context.Context) error {
var errs error
for _, subResolver := range r.subResolvers {
if err := subResolver.Stop(ctx); err != nil {
errs = errors.Join(errs, err)
}
errs = errors.Join(errs, subResolver.Stop(ctx))
}
return errs
}
Expand Down Expand Up @@ -117,30 +115,29 @@ func (h *resourceAttributesResolver) Process(attributes, resourceAttributes pcom
} else {
if h.defaultEnvPrefix == appsignalsconfig.PlatformECS {
if clusterName, ok := getECSClusterName(resourceAttributes); ok {
attributes.PutStr(attr.AWSLocalEnvironment, GetDefaultEnvironment(h.defaultEnvPrefix, clusterName))
attributes.PutStr(attr.AWSLocalEnvironment, getDefaultEnvironment(h.defaultEnvPrefix, clusterName))
}
}
if h.defaultEnvPrefix == appsignalsconfig.PlatformEC2 {
} else if h.defaultEnvPrefix == appsignalsconfig.PlatformEC2 {
if asgAttr, ok := resourceAttributes.Get(attr.ResourceDetectionASG); ok {
attributes.PutStr(attr.AWSLocalEnvironment, GetDefaultEnvironment(h.defaultEnvPrefix, asgAttr.Str()))
attributes.PutStr(attr.AWSLocalEnvironment, getDefaultEnvironment(h.defaultEnvPrefix, asgAttr.Str()))
}
}
}
}
if _, ok := attributes.Get(attr.AWSLocalEnvironment); !ok {
attributes.PutStr(attr.AWSLocalEnvironment, GetDefaultEnvironment(h.defaultEnvPrefix, AttributeEnvironmentDefault))
attributes.PutStr(attr.AWSLocalEnvironment, getDefaultEnvironment(h.defaultEnvPrefix, AttributeEnvironmentDefault))
}
attributes.PutStr(common.AttributePlatformType, h.platformType)

return nil
}

func getECSClusterName(resourceAttributes pcommon.Map) (string, bool) {
if clusterAttr, ok := resourceAttributes.Get(semconv1.AttributeAWSECSClusterARN); ok {
if clusterAttr, ok := resourceAttributes.Get(semconv.AttributeAWSECSClusterARN); ok {
parts := strings.Split(clusterAttr.Str(), "/")
clusterName := parts[len(parts)-1]
return clusterName, true
} else if taskAttr, ok := resourceAttributes.Get(semconv1.AttributeAWSECSTaskARN); ok {
} else if taskAttr, ok := resourceAttributes.Get(semconv.AttributeAWSECSTaskARN); ok {
parts := strings.SplitAfterN(taskAttr.Str(), ":task/", 2)
if len(parts) == 2 {
taskParts := strings.Split(parts[1], "/")
Expand All @@ -153,7 +150,7 @@ func getECSClusterName(resourceAttributes pcommon.Map) (string, bool) {
return "", false
}

func GetDefaultEnvironment(platformCode, val string) string {
func getDefaultEnvironment(platformCode, val string) string {
return fmt.Sprintf("%s:%s", platformCode, val)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/collector/semconv/v1.17.0"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
"go.uber.org/zap"

"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common"
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestResourceAttributesResolverWithHostname(t *testing.T) {
resourceAttributes.PutStr(attr.ResourceDetectionHostName, "hostname")

resolver.Process(attributes, resourceAttributes)
envAttr, ok := attributes.Get(attr.ResourceDetectionHostName)
envAttr, ok := attributes.Get(common.AttributeHost)
assert.True(t, ok)
assert.Equal(t, "hostname", envAttr.AsString())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func (h *kubernetesResourceAttributesResolver) Process(attributes, resourceAttri
}

if val, ok := attributes.Get(attr.AWSLocalEnvironment); !ok {
env := GetDefaultEnvironment(h.platformCode, h.clusterName+"/"+namespace)
env := getDefaultEnvironment(h.platformCode, h.clusterName+"/"+namespace)
attributes.PutStr(attr.AWSLocalEnvironment, env)
} else {
attributes.PutStr(attr.AWSLocalEnvironment, val.Str())
Expand Down
Loading

0 comments on commit 200a5f8

Please sign in to comment.