diff --git a/internal/util/type_conversion.go b/internal/util/type_conversion.go index 96b75fa85e..b8c779d4c9 100644 --- a/internal/util/type_conversion.go +++ b/internal/util/type_conversion.go @@ -3,43 +3,51 @@ package util -import "github.com/aws/amazon-cloudwatch-agent/metric/distribution" +import ( + "fmt" + "math" -func ToOtelValue(value interface{}) interface{} { + "github.com/aws/amazon-cloudwatch-agent/metric/distribution" +) + +func ToOtelValue(value interface{}) (interface{}, error) { switch v := value.(type) { case int: - return int64(v) + return int64(v), nil case int8: - return int64(v) + return int64(v), nil case int16: - return int64(v) + return int64(v), nil case int32: - return int64(v) + return int64(v), nil case int64: - return v + return v, nil case uint: - return int64(v) + return int64(v), nil case uint8: - return int64(v) + return int64(v), nil case uint16: - return int64(v) + return int64(v), nil case uint32: - return int64(v) + return int64(v), nil case uint64: - return int64(v) + return int64(v), nil case float32: - return float64(v) + return float64(v), nil case float64: - return v + if math.IsNaN(v) || math.IsInf(v, 0) { + return nil, fmt.Errorf("unsupported value: %v", v) + } + return v, nil case bool: if v { - return int64(1) + return int64(1), nil } else { - return int64(0) + return int64(0), nil } case distribution.Distribution: - return v + return v, nil default: - return nil + return nil, fmt.Errorf("unsupported type: %T", v) } } diff --git a/internal/util/type_conversion_test.go b/internal/util/type_conversion_test.go new file mode 100644 index 0000000000..8f61116781 --- /dev/null +++ b/internal/util/type_conversion_test.go @@ -0,0 +1,55 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package util + +import ( + "errors" + "math" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/aws/amazon-cloudwatch-agent/metric/distribution/regular" +) + +func TestToOtelValue(t *testing.T) { + distribution := regular.NewRegularDistribution() + testCases := []struct { + input interface{} + want interface{} + wantErr error + }{ + // ints + {input: 5, want: int64(5)}, + {input: int8(5), want: int64(5)}, + {input: int16(5), want: int64(5)}, + {input: int32(5), want: int64(5)}, + {input: int64(5), want: int64(5)}, + // uints + {input: uint(5), want: int64(5)}, + {input: uint8(5), want: int64(5)}, + {input: uint16(5), want: int64(5)}, + {input: uint32(5), want: int64(5)}, + {input: uint64(5), want: int64(5)}, + // floats + {input: float32(5.5), want: 5.5}, + {input: 5.5, want: 5.5}, + // bool + {input: false, want: int64(0)}, + {input: true, want: int64(1)}, + // distribution + {input: distribution, want: distribution}, + // unsupported floats + {input: math.NaN(), want: nil, wantErr: errors.New("unsupported value: NaN")}, + {input: math.Inf(1), want: nil, wantErr: errors.New("unsupported value: +Inf")}, + {input: math.Inf(-1), want: nil, wantErr: errors.New("unsupported value: -Inf")}, + // unsupported types + {input: "test", want: nil, wantErr: errors.New("unsupported type: string")}, + } + for _, testCase := range testCases { + got, err := ToOtelValue(testCase.input) + assert.Equal(t, testCase.wantErr, err) + assert.Equal(t, testCase.want, got) + } +} diff --git a/metric/distribution/distribution.go b/metric/distribution/distribution.go index 74f592eab5..1e4a6e6899 100644 --- a/metric/distribution/distribution.go +++ b/metric/distribution/distribution.go @@ -3,7 +3,19 @@ package distribution -import "go.opentelemetry.io/collector/pdata/pmetric" +import ( + "errors" + "math" + + "go.opentelemetry.io/collector/pdata/pmetric" +) + +var ( + ErrUnsupportedWeight = errors.New("weight must be larger than 0") + ErrUnsupportedValue = errors.New("value cannot be negative, NaN, Inf, or greater than 2^360") + MinValue = -math.Pow(2, 360) + MaxValue = math.Pow(2, 360) +) type Distribution interface { Maximum() float64 @@ -36,3 +48,10 @@ type Distribution interface { } var NewDistribution func() Distribution + +// IsSupportedValue checks to see if the metric is between the min value and 2^360 and not a NaN. +// This matches the accepted range described in the MetricDatum documentation +// https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html +func IsSupportedValue(value, min, max float64) bool { + return !math.IsNaN(value) && value >= min && value <= max +} diff --git a/metric/distribution/distribution_test.go b/metric/distribution/distribution_test.go new file mode 100644 index 0000000000..91c1e5377d --- /dev/null +++ b/metric/distribution/distribution_test.go @@ -0,0 +1,30 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package distribution + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsAcceptedValue(t *testing.T) { + testCases := []struct { + input float64 + want bool + }{ + {input: MinValue * 1.0001, want: false}, + {input: MinValue, want: true}, + {input: MaxValue, want: true}, + {input: MaxValue * 1.0001, want: false}, + {input: math.Pow(2, 300), want: true}, + {input: math.NaN(), want: false}, + {input: math.Inf(1), want: false}, + {input: math.Inf(-1), want: false}, + } + for _, testCase := range testCases { + assert.Equal(t, testCase.want, IsSupportedValue(testCase.input, MinValue, MaxValue)) + } +} diff --git a/metric/distribution/regular/regular_distribution.go b/metric/distribution/regular/regular_distribution.go index 02e80cc3c9..4de8779996 100644 --- a/metric/distribution/regular/regular_distribution.go +++ b/metric/distribution/regular/regular_distribution.go @@ -4,7 +4,7 @@ package regular import ( - "errors" + "fmt" "log" "math" @@ -69,34 +69,33 @@ func (regularDist *RegularDistribution) Size() int { // weight is 1/samplingRate func (regularDist *RegularDistribution) AddEntryWithUnit(value float64, weight float64, unit string) error { - if weight > 0 { - if value < 0 { - return errors.New("negative value") - } - //sample count - regularDist.sampleCount += weight - //sum - regularDist.sum += value * weight - //min - if value < regularDist.minimum { - regularDist.minimum = value - } - //max - if value > regularDist.maximum { - regularDist.maximum = value - } + if weight <= 0 { + return fmt.Errorf("unsupported weight %v: %w", weight, distribution.ErrUnsupportedWeight) + } + if !distribution.IsSupportedValue(value, 0, distribution.MaxValue) { + return fmt.Errorf("unsupported value %v: %w", value, distribution.ErrUnsupportedValue) + } + //sample count + regularDist.sampleCount += weight + //sum + regularDist.sum += value * weight + //min + if value < regularDist.minimum { + regularDist.minimum = value + } + //max + if value > regularDist.maximum { + regularDist.maximum = value + } - //values and counts - regularDist.buckets[value] += weight + //values and counts + regularDist.buckets[value] += weight - //unit - if regularDist.unit == "" { - regularDist.unit = unit - } else if regularDist.unit != unit && unit != "" { - log.Printf("D! Multiple units are detected: %s, %s", regularDist.unit, unit) - } - } else { - log.Printf("D! Weight should be larger than 0: %v", weight) + //unit + if regularDist.unit == "" { + regularDist.unit = unit + } else if regularDist.unit != unit && unit != "" { + log.Printf("D! Multiple units are detected: %s, %s", regularDist.unit, unit) } return nil } diff --git a/metric/distribution/regular/regular_distribution_test.go b/metric/distribution/regular/regular_distribution_test.go index 72b33084b4..373b31fe46 100644 --- a/metric/distribution/regular/regular_distribution_test.go +++ b/metric/distribution/regular/regular_distribution_test.go @@ -4,12 +4,15 @@ package regular import ( + "math" "testing" "github.com/stretchr/testify/assert" + + "github.com/aws/amazon-cloudwatch-agent/metric/distribution" ) -func TestSEH1Distribution(t *testing.T) { +func TestRegularDistribution(t *testing.T) { //dist new and add entry dist := NewRegularDistribution() @@ -34,9 +37,9 @@ func TestSEH1Distribution(t *testing.T) { //another dist new and add entry anotherDist := NewRegularDistribution() - anotherDist.AddEntry(21, 1) - anotherDist.AddEntry(22, 1) - anotherDist.AddEntry(23, 2) + assert.NoError(t, anotherDist.AddEntry(21, 1)) + assert.NoError(t, anotherDist.AddEntry(22, 1)) + assert.NoError(t, anotherDist.AddEntry(23, 2)) assert.Equal(t, 89.0, anotherDist.Sum()) assert.Equal(t, 4.0, anotherDist.SampleCount()) @@ -75,6 +78,14 @@ func TestSEH1Distribution(t *testing.T) { //add distClone into another dist anotherDist.AddDistribution(distClone) assert.Equal(t, dist, anotherDist) //the direction of AddDistribution should not matter. + + assert.ErrorIs(t, anotherDist.AddEntry(1, 0), distribution.ErrUnsupportedWeight) + assert.ErrorIs(t, anotherDist.AddEntry(-1, 1), distribution.ErrUnsupportedValue) + assert.ErrorIs(t, anotherDist.AddEntry(math.NaN(), 1), distribution.ErrUnsupportedValue) + assert.ErrorIs(t, anotherDist.AddEntry(math.Inf(1), 1), distribution.ErrUnsupportedValue) + assert.ErrorIs(t, anotherDist.AddEntry(math.Inf(-1), 1), distribution.ErrUnsupportedValue) + assert.ErrorIs(t, anotherDist.AddEntry(distribution.MaxValue*1.001, 1), distribution.ErrUnsupportedValue) + assert.ErrorIs(t, anotherDist.AddEntry(distribution.MinValue*1.001, 1), distribution.ErrUnsupportedValue) } func cloneRegularDistribution(dist *RegularDistribution) *RegularDistribution { diff --git a/metric/distribution/seh1/seh1_distribution.go b/metric/distribution/seh1/seh1_distribution.go index 57ed45a583..74612b7105 100644 --- a/metric/distribution/seh1/seh1_distribution.go +++ b/metric/distribution/seh1/seh1_distribution.go @@ -4,7 +4,7 @@ package seh1 import ( - "errors" + "fmt" "log" "math" @@ -79,35 +79,34 @@ func (seh1Distribution *SEH1Distribution) Size() int { // weight is 1/samplingRate func (seh1Distribution *SEH1Distribution) AddEntryWithUnit(value float64, weight float64, unit string) error { - if weight > 0 { - if value < 0 { - return errors.New("negative value") - } - //sample count - seh1Distribution.sampleCount += weight - //sum - seh1Distribution.sum += value * weight - //min - if value < seh1Distribution.minimum { - seh1Distribution.minimum = value - } - //max - if value > seh1Distribution.maximum { - seh1Distribution.maximum = value - } + if weight <= 0 { + return fmt.Errorf("unsupported weight %v: %w", weight, distribution.ErrUnsupportedWeight) + } + if !distribution.IsSupportedValue(value, 0, distribution.MaxValue) { + return fmt.Errorf("unsupported value %v: %w", value, distribution.ErrUnsupportedValue) + } + //sample count + seh1Distribution.sampleCount += weight + //sum + seh1Distribution.sum += value * weight + //min + if value < seh1Distribution.minimum { + seh1Distribution.minimum = value + } + //max + if value > seh1Distribution.maximum { + seh1Distribution.maximum = value + } - //seh - bucketNumber := bucketNumber(value) - seh1Distribution.buckets[bucketNumber] += weight + //seh + bucketNumber := bucketNumber(value) + seh1Distribution.buckets[bucketNumber] += weight - //unit - if seh1Distribution.unit == "" { - seh1Distribution.unit = unit - } else if seh1Distribution.unit != unit && unit != "" { - log.Printf("D! Multiple units are detected: %s, %s", seh1Distribution.unit, unit) - } - } else { - log.Printf("D! Weight should be larger than 0: %v", weight) + //unit + if seh1Distribution.unit == "" { + seh1Distribution.unit = unit + } else if seh1Distribution.unit != unit && unit != "" { + log.Printf("D! Multiple units are detected: %s, %s", seh1Distribution.unit, unit) } return nil } @@ -151,7 +150,7 @@ func (seh1Distribution *SEH1Distribution) AddDistributionWithWeight(distribution if seh1Distribution.unit == "" { seh1Distribution.unit = distribution.Unit() } else if seh1Distribution.unit != distribution.Unit() && distribution.Unit() != "" { - log.Printf("D! Multiple units are dected: %s, %s", seh1Distribution.unit, distribution.Unit()) + log.Printf("D! Multiple units are detected: %s, %s", seh1Distribution.unit, distribution.Unit()) } } else { log.Printf("D! SampleCount * Weight should be larger than 0: %v, %v", distribution.SampleCount(), weight) diff --git a/metric/distribution/seh1/seh1_distribution_test.go b/metric/distribution/seh1/seh1_distribution_test.go index 644968d47a..1a02d5bb76 100644 --- a/metric/distribution/seh1/seh1_distribution_test.go +++ b/metric/distribution/seh1/seh1_distribution_test.go @@ -4,10 +4,13 @@ package seh1 import ( + "math" "math/big" "testing" "github.com/stretchr/testify/assert" + + "github.com/aws/amazon-cloudwatch-agent/metric/distribution" ) func TestSEH1Distribution(t *testing.T) { @@ -77,6 +80,14 @@ func TestSEH1Distribution(t *testing.T) { //add distClone into another dist anotherDist.AddDistribution(distClone) assert.Equal(t, dist, anotherDist) //the direction of AddDistribution should not matter. + + assert.ErrorIs(t, anotherDist.AddEntry(1, 0), distribution.ErrUnsupportedWeight) + assert.ErrorIs(t, anotherDist.AddEntry(-1, 1), distribution.ErrUnsupportedValue) + assert.ErrorIs(t, anotherDist.AddEntry(math.NaN(), 1), distribution.ErrUnsupportedValue) + assert.ErrorIs(t, anotherDist.AddEntry(math.Inf(1), 1), distribution.ErrUnsupportedValue) + assert.ErrorIs(t, anotherDist.AddEntry(math.Inf(-1), 1), distribution.ErrUnsupportedValue) + assert.ErrorIs(t, anotherDist.AddEntry(distribution.MaxValue*1.001, 1), distribution.ErrUnsupportedValue) + assert.ErrorIs(t, anotherDist.AddEntry(distribution.MinValue*1.001, 1), distribution.ErrUnsupportedValue) } func cloneSEH1Distribution(dist *SEH1Distribution) *SEH1Distribution { diff --git a/plugins/outputs/cloudwatch/aggregator.go b/plugins/outputs/cloudwatch/aggregator.go index fc9ceb5eb4..43c0a253c7 100644 --- a/plugins/outputs/cloudwatch/aggregator.go +++ b/plugins/outputs/cloudwatch/aggregator.go @@ -4,6 +4,7 @@ package cloudwatch import ( + "errors" "fmt" "log" "strings" @@ -144,7 +145,11 @@ func (durationAgg *durationAggregator) aggregating() { m.distribution = distribution.NewDistribution() err := m.distribution.AddEntryWithUnit(*m.Value, 1, *m.Unit) if err != nil { - log.Printf("W! err %s, metric %s", err, *m.MetricName) + if errors.Is(err, distribution.ErrUnsupportedValue) { + log.Printf("W! err %s, metric %s", err, *m.MetricName) + } else { + log.Printf("D! err %s, metric %s", err, *m.MetricName) + } } } // Else the first entry has a distribution, so do nothing. diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index 06cfb09fe1..c180abe81f 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -398,6 +398,10 @@ func (c *CloudWatch) BuildMetricDatum(metric *aggregationDatum) []*cloudwatch.Me continue } if len(distList) == 0 { + if !distribution.IsSupportedValue(*metric.Value, distribution.MinValue, distribution.MaxValue) { + log.Printf("E! metric (%s) has an unsupported value: %v, dropping it", *metric.MetricName, *metric.Value) + continue + } // Not a distribution. datum := &cloudwatch.MetricDatum{ MetricName: metric.MetricName, diff --git a/plugins/outputs/cloudwatch/cloudwatch_test.go b/plugins/outputs/cloudwatch/cloudwatch_test.go index 75a965144a..3bf121e496 100644 --- a/plugins/outputs/cloudwatch/cloudwatch_test.go +++ b/plugins/outputs/cloudwatch/cloudwatch_test.go @@ -6,6 +6,7 @@ package cloudwatch import ( "context" "log" + "math" "strconv" "strings" "testing" @@ -23,6 +24,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/handlers/agentinfo" "github.com/aws/amazon-cloudwatch-agent/internal/publisher" + "github.com/aws/amazon-cloudwatch-agent/metric/distribution" ) // Return true if found. @@ -234,6 +236,27 @@ func TestProcessRollup(t *testing.T) { cw.Shutdown(context.Background()) } +func TestBuildMetricDatumDropUnsupported(t *testing.T) { + svc := new(mockCloudWatchClient) + cw := newCloudWatchClient(svc, time.Second) + testCases := []float64{ + math.NaN(), + math.Inf(1), + math.Inf(-1), + distribution.MaxValue * 1.001, + distribution.MinValue * 1.001, + } + for _, testCase := range testCases { + got := cw.BuildMetricDatum(&aggregationDatum{ + MetricDatum: cloudwatch.MetricDatum{ + MetricName: aws.String("test"), + Value: aws.Float64(testCase), + }, + }) + assert.Empty(t, got) + } +} + func TestGetUniqueRollupList(t *testing.T) { inputLists := [][]string{{"d1"}, {"d1"}, {"d2"}, {"d1"}} actualLists := GetUniqueRollupList(inputLists) diff --git a/receiver/adapter/accumulator/accumulator.go b/receiver/adapter/accumulator/accumulator.go index 3a22bfc955..26f0c28743 100644 --- a/receiver/adapter/accumulator/accumulator.go +++ b/receiver/adapter/accumulator/accumulator.go @@ -5,7 +5,7 @@ package accumulator import ( "context" - "errors" + "fmt" "sync" "time" @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf/models" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" "go.uber.org/zap" "github.com/aws/amazon-cloudwatch-agent/internal/util" @@ -48,10 +49,6 @@ type otelAccumulator struct { mutex sync.Mutex } -var ( - errEmptyAfterConvert = errors.New("empty metrics after converting fields") -) - func NewAccumulator(input *models.RunningInput, ctx context.Context, consumer consumer.Metrics, logger *zap.Logger) OtelAccumulator { _, isServiceInput := input.Input.(telegraf.ServiceInput) return &otelAccumulator{ @@ -188,9 +185,13 @@ func (o *otelAccumulator) modifyMetricAndConvertToOtelValue(m telegraf.Metric) ( // Otel only supports numeric data. Therefore, filter unsupported data type and convert metrics value to corresponding value before // converting the data model // https://github.com/open-telemetry/opentelemetry-collector/blob/bdc3e22d28006b6c9496568bd8d8bcf0aa1e4950/pdata/pmetric/metrics.go#L106-L113 + var errs error for field, value := range mMetric.Fields() { // Convert all int,uint to int64 and float to float64 and bool to int. - otelValue := util.ToOtelValue(value) + otelValue, err := util.ToOtelValue(value) + if err != nil { + errs = multierr.Append(errs, fmt.Errorf("field (%q): %w", field, err)) + } if otelValue == nil { mMetric.RemoveField(field) @@ -200,7 +201,7 @@ func (o *otelAccumulator) modifyMetricAndConvertToOtelValue(m telegraf.Metric) ( } if len(mMetric.Fields()) == 0 { - return nil, errEmptyAfterConvert + return nil, fmt.Errorf("empty metrics after converting fields: %w", errs) } return mMetric, nil diff --git a/receiver/adapter/accumulator/accumulator_test.go b/receiver/adapter/accumulator/accumulator_test.go index 04a626149f..fe73c3cddd 100644 --- a/receiver/adapter/accumulator/accumulator_test.go +++ b/receiver/adapter/accumulator/accumulator_test.go @@ -5,6 +5,7 @@ package accumulator import ( "fmt" + "math" "math/rand" "runtime" "testing" @@ -204,7 +205,7 @@ func Test_ModifyMetricAndConvertMetricValue(t *testing.T) { testCases := map[string]struct { metric telegraf.Metric - wantErr error + wantErrStr string wantFields map[string]interface{} wantDroppedFields []string }{ @@ -234,11 +235,12 @@ func Test_ModifyMetricAndConvertMetricValue(t *testing.T) { map[string]string{}, map[string]interface{}{ "client": "redis", + "nan": math.NaN(), }, time.Now(), telegraf.Gauge, ), - wantErr: errEmptyAfterConvert, + wantErrStr: "empty metrics after converting fields", }, "WithValid": { metric: testutil.MustMetric( @@ -267,10 +269,12 @@ func Test_ModifyMetricAndConvertMetricValue(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { got, err := acc.modifyMetricAndConvertToOtelValue(testCase.metric) - as.Equal(testCase.wantErr, err) - if len(testCase.wantFields) == 0 { + if testCase.wantErrStr != "" { + as.Error(err) + as.ErrorContains(err, testCase.wantErrStr) as.Nil(got) } else { + as.NoError(err) for field, wantValue := range testCase.wantFields { value, ok := got.GetField(field) as.True(ok) diff --git a/receiver/adapter/accumulator/metrics_test.go b/receiver/adapter/accumulator/metrics_test.go index 92e99f57b9..97d4f8689f 100644 --- a/receiver/adapter/accumulator/metrics_test.go +++ b/receiver/adapter/accumulator/metrics_test.go @@ -244,7 +244,8 @@ func Test_PopulateNumberDataPoint_WithDifferentValueType(t *testing.T) { for _, tc := range test_cases { t.Run(tc.name, func(t *testing.T) { - otelValue := util.ToOtelValue(tc.telegrafDataPointValue) + otelValue, err := util.ToOtelValue(tc.telegrafDataPointValue) + as.NoError(err) as.NotNil(otelValue) switch v := tc.expectedOtelDataPointValue.(type) {