Skip to content

Commit

Permalink
Add invalid metric pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
bjrara committed May 17, 2024
1 parent 200a5f8 commit 421e99d
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 12 deletions.
12 changes: 12 additions & 0 deletions plugins/processors/awsapplicationsignals/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
MetricAttributeRemoteResourceIdentifier = "RemoteResourceIdentifier"
MetricAttributeRemoteResourceType = "RemoteResourceType"
)

const (
AttributeEKSClusterName = "EKS.Cluster"
AttributeK8SClusterName = "K8s.Cluster"
Expand All @@ -29,3 +30,14 @@ const (
const (
AttributeTmpReserved = "aws.tmp.reserved"
)

var IndexableMetricAttributes = []string{
MetricAttributeLocalService,
MetricAttributeLocalOperation,
MetricAttributeEnvironment,
MetricAttributeRemoteService,
MetricAttributeRemoteEnvironment,
MetricAttributeRemoteOperation,
MetricAttributeRemoteResourceIdentifier,
MetricAttributeRemoteResourceType,
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ func NewAttributesNormalizer(logger *zap.Logger) *attributesNormalizer {

func (n *attributesNormalizer) Process(attributes, resourceAttributes pcommon.Map, isTrace bool) error {
n.copyResourceAttributesToAttributes(attributes, resourceAttributes, isTrace)
// It's assumed that all attributes are initially inserted as trace attribute, and attributesRenamingForMetric
// contains all attributes that will be used for CloudWatch metric dimension. Therefore, we iterate the keys
// for enforcing the limits on length.
truncateAttributesByLength(attributes)
n.renameAttributes(attributes, resourceAttributes, isTrace)
n.normalizeTelemetryAttributes(attributes, resourceAttributes, isTrace)
Expand Down Expand Up @@ -194,6 +191,9 @@ func rename(attrs pcommon.Map, renameMap map[string]string) {
}

func truncateAttributesByLength(attributes pcommon.Map) {
// It's assumed that all attributes are initially inserted as trace attribute, and attributesRenamingForMetric
// contains all attributes that will be used for CloudWatch metric dimension. Therefore, we iterate the keys
// for enforcing the limits on length.
for attrKey := range attributesRenamingForMetric {
switch attrKey {
case attr.AWSLocalEnvironment, attr.AWSRemoteEnvironment:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package prune

import (
"errors"

"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common"
)

type MetricPruner struct {
}

func (p *MetricPruner) ShouldBeDropped(attributes pcommon.Map) (bool, error) {
for _, attributeKey := range common.IndexableMetricAttributes {
if val, ok := attributes.Get(attributeKey); ok {
if !isAsciiPrintable(val.Str()) {
return true, errors.New("Metric attribute " + attributeKey + " must contain only ASCII characters.")
}
}
}
return false, nil
}

func NewPruner() *MetricPruner {
return &MetricPruner{}
}

func isAsciiPrintable(val string) bool {
nonWhitespaceFound := false
for _, c := range val {
if c < 32 || c > 126 {
return false
} else if !nonWhitespaceFound && c != 32 {
nonWhitespaceFound = true
}
}
return nonWhitespaceFound
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package prune

import (
"testing"

"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common"
)

func TestMetricPrunerWithIndexableAttribute(t *testing.T) {
tests := []struct {
name string
val string
want bool
}{
{
"testShouldDropChineseChar",
"漢",
true,
}, {
"testShouldDropSymbolChar",
"€, £, µ",
true,
}, {
"testShouldDropAllBlackSpace",
" ",
true,
},
{
"testShouldDropAllTab",
" ",
true,
}, {
"testShouldKeepEnglishWord",
"abcdefg-",
false,
},
}

p := &MetricPruner{}
for _, tt := range tests {
attributes := pcommon.NewMap()
attributes.PutStr(common.MetricAttributeLocalService, tt.val)
t.Run(tt.name, func(t *testing.T) {
got, _ := p.ShouldBeDropped(attributes)
if got != tt.want {
t.Errorf("ShouldBeDropped() got = %v, want %v", got, tt.want)
}
})
}
}

func TestMetricPrunerWithNonIndexableAttribute(t *testing.T) {
tests := []struct {
name string
val string
want bool
}{
{
"testShouldKeepChineseChar",
"漢",
false,
}, {
"testShouldKeepEnglishWord",
"abcdefg-",
false,
},
}

p := &MetricPruner{}
for _, tt := range tests {
attributes := pcommon.NewMap()
attributes.PutStr(common.AttributeEC2InstanceId, tt.val)
t.Run(tt.name, func(t *testing.T) {
got, _ := p.ShouldBeDropped(attributes)
if got != tt.want {
t.Errorf("ShouldBeDropped() got = %v, want %v", got, tt.want)
}
})
}
}
19 changes: 10 additions & 9 deletions plugins/processors/awsapplicationsignals/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (
appsignalsconfig "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/cardinalitycontrol"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/normalizer"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/prune"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/resolver"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/rules"
)

const (
failedToProcessAttribute = "failed to process attributes"
failedToProcessAttributeWithCustomRule = "failed to process attributes with custom rule, will drop the metric"
failedToProcessAttributeWithLimiter = "failed to process attributes with limiter, keep the data"
failedToProcessAttribute = "failed to process attributes"
failedToProcessAttributeWithLimiter = "failed to process attributes with limiter, keep the data"
)

var metricCaser = cases.Title(language.English)
Expand Down Expand Up @@ -75,9 +75,10 @@ func (ap *awsapplicationsignalsprocessor) StartMetrics(ctx context.Context, _ co

ap.replaceActions = rules.NewReplacer(ap.config.Rules, !limiterConfig.Disabled)

pruner := prune.NewPruner()
keeper := rules.NewKeeper(ap.config.Rules, !limiterConfig.Disabled)
dropper := rules.NewDropper(ap.config.Rules)
ap.allowlistMutators = []allowListMutator{keeper, dropper}
ap.allowlistMutators = []allowListMutator{pruner, keeper, dropper}

return nil
}
Expand Down Expand Up @@ -164,7 +165,7 @@ func (ap *awsapplicationsignalsprocessor) processMetricAttributes(_ context.Cont
for _, mutator := range ap.allowlistMutators {
shouldBeDropped, err := mutator.ShouldBeDropped(d.Attributes())
if err != nil {
ap.logger.Debug(failedToProcessAttributeWithCustomRule, zap.Error(err))
ap.logger.Debug(failedToProcessAttribute, zap.Error(err))
}
if shouldBeDropped {
return true
Expand Down Expand Up @@ -199,7 +200,7 @@ func (ap *awsapplicationsignalsprocessor) processMetricAttributes(_ context.Cont
for _, mutator := range ap.allowlistMutators {
shouldBeDropped, err := mutator.ShouldBeDropped(d.Attributes())
if err != nil {
ap.logger.Debug(failedToProcessAttributeWithCustomRule, zap.Error(err))
ap.logger.Debug(failedToProcessAttribute, zap.Error(err))
}
if shouldBeDropped {
return true
Expand Down Expand Up @@ -234,7 +235,7 @@ func (ap *awsapplicationsignalsprocessor) processMetricAttributes(_ context.Cont
for _, mutator := range ap.allowlistMutators {
shouldBeDropped, err := mutator.ShouldBeDropped(d.Attributes())
if err != nil {
ap.logger.Debug(failedToProcessAttributeWithCustomRule, zap.Error(err))
ap.logger.Debug(failedToProcessAttribute, zap.Error(err))
}
if shouldBeDropped {
return true
Expand Down Expand Up @@ -269,7 +270,7 @@ func (ap *awsapplicationsignalsprocessor) processMetricAttributes(_ context.Cont
for _, mutator := range ap.allowlistMutators {
shouldBeDropped, err := mutator.ShouldBeDropped(d.Attributes())
if err != nil {
ap.logger.Debug(failedToProcessAttributeWithCustomRule, zap.Error(err))
ap.logger.Debug(failedToProcessAttribute, zap.Error(err))
}
if shouldBeDropped {
return true
Expand Down Expand Up @@ -304,7 +305,7 @@ func (ap *awsapplicationsignalsprocessor) processMetricAttributes(_ context.Cont
for _, mutator := range ap.allowlistMutators {
shouldBeDropped, err := mutator.ShouldBeDropped(d.Attributes())
if err != nil {
ap.logger.Debug(failedToProcessAttributeWithCustomRule, zap.Error(err))
ap.logger.Debug(failedToProcessAttribute, zap.Error(err))
}
if shouldBeDropped {
return true
Expand Down

0 comments on commit 421e99d

Please sign in to comment.