diff --git a/.chloggen/add-json-cw-metric-stream.yaml b/.chloggen/add-json-cw-metric-stream.yaml new file mode 100644 index 000000000000..1a22ad585132 --- /dev/null +++ b/.chloggen/add-json-cw-metric-stream.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awscloudwatchmetricstreamsencodingextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add unmarshaler for JSON cloudwatch metric stream + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [38407] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/extension.go b/extension/encoding/awscloudwatchmetricstreamsencodingextension/extension.go index a2cbc0967918..1041f3e32937 100644 --- a/extension/encoding/awscloudwatchmetricstreamsencodingextension/extension.go +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/extension.go @@ -8,6 +8,7 @@ import ( "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" @@ -19,12 +20,15 @@ type encodingExtension struct { pmetric.Unmarshaler } -func newExtension(cfg *Config) (*encodingExtension, error) { +func newExtension(cfg *Config, settings extension.Settings) (*encodingExtension, error) { switch cfg.Format { case formatJSON: - return &encodingExtension{Unmarshaler: formatJSONUnmarshaler{}}, nil + return &encodingExtension{&formatJSONUnmarshaler{ + buildInfo: settings.BuildInfo, + logger: settings.Logger, + }}, nil case formatOpenTelemetry10: - return &encodingExtension{Unmarshaler: formatOpenTelemetry10Unmarshaler{}}, nil + return &encodingExtension{Unmarshaler: &formatOpenTelemetry10Unmarshaler{}}, nil default: // Format will have been validated by Config.Validate, // so we'll only get here if we haven't handled a valid @@ -40,17 +44,3 @@ func (*encodingExtension) Start(_ context.Context, _ component.Host) error { func (*encodingExtension) Shutdown(_ context.Context) error { return nil } - -type formatJSONUnmarshaler struct{} - -func (formatJSONUnmarshaler) UnmarshalMetrics([]byte) (pmetric.Metrics, error) { - // TODO implement - return pmetric.Metrics{}, fmt.Errorf("UnmarshalMetrics unimplemented for format %q", formatJSON) -} - -type formatOpenTelemetry10Unmarshaler struct{} - -func (formatOpenTelemetry10Unmarshaler) UnmarshalMetrics([]byte) (pmetric.Metrics, error) { - // TODO implement - return pmetric.Metrics{}, fmt.Errorf("UnmarshalMetrics unimplemented for format %q", formatOpenTelemetry10) -} diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/extension_test.go b/extension/encoding/awscloudwatchmetricstreamsencodingextension/extension_test.go index b411d8b2b281..a237d56781f7 100644 --- a/extension/encoding/awscloudwatchmetricstreamsencodingextension/extension_test.go +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/extension_test.go @@ -8,19 +8,20 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/extension/extensiontest" ) func TestNew_JSON(t *testing.T) { - e, err := newExtension(&Config{Format: formatJSON}) + e, err := newExtension(&Config{Format: formatJSON}, extensiontest.NewNopSettings(extensiontest.NopType)) require.NoError(t, err) require.NotNil(t, e) _, err = e.UnmarshalMetrics([]byte{}) - assert.EqualError(t, err, `UnmarshalMetrics unimplemented for format "json"`) + assert.EqualError(t, err, errEmptyRecord.Error()) } func TestNew_OpenTelemetry10(t *testing.T) { - e, err := newExtension(&Config{Format: formatOpenTelemetry10}) + e, err := newExtension(&Config{Format: formatOpenTelemetry10}, extensiontest.NewNopSettings(extensiontest.NopType)) require.NoError(t, err) require.NotNil(t, e) @@ -29,7 +30,7 @@ func TestNew_OpenTelemetry10(t *testing.T) { } func TestNew_Unimplemented(t *testing.T) { - e, err := newExtension(&Config{Format: "invalid"}) + e, err := newExtension(&Config{Format: "invalid"}, extensiontest.NewNopSettings(extensiontest.NopType)) require.Error(t, err) require.Nil(t, e) assert.EqualError(t, err, `unimplemented format "invalid"`) diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/factory.go b/extension/encoding/awscloudwatchmetricstreamsencodingextension/factory.go index b0d723a83a4d..5164e6f5ec13 100644 --- a/extension/encoding/awscloudwatchmetricstreamsencodingextension/factory.go +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/factory.go @@ -21,8 +21,8 @@ func NewFactory() extension.Factory { ) } -func createExtension(_ context.Context, _ extension.Settings, cfg component.Config) (extension.Extension, error) { - return newExtension(cfg.(*Config)) +func createExtension(_ context.Context, settings extension.Settings, cfg component.Config) (extension.Extension, error) { + return newExtension(cfg.(*Config), settings) } func createDefaultConfig() component.Config { diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/go.mod b/extension/encoding/awscloudwatchmetricstreamsencodingextension/go.mod index 1223a8152b0a..0e457098a238 100644 --- a/extension/encoding/awscloudwatchmetricstreamsencodingextension/go.mod +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/go.mod @@ -3,7 +3,10 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/encod go 1.23.0 require ( + github.com/json-iterator/go v1.1.12 github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.121.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.121.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.121.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v1.27.0 go.opentelemetry.io/collector/component/componenttest v0.121.0 @@ -12,17 +15,19 @@ require ( go.opentelemetry.io/collector/extension v1.27.0 go.opentelemetry.io/collector/extension/extensiontest v0.121.0 go.opentelemetry.io/collector/pdata v1.27.0 + go.opentelemetry.io/collector/semconv v0.121.0 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 ) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect @@ -30,6 +35,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.121.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.121.0 // indirect @@ -39,10 +45,9 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect - golang.org/x/net v0.33.0 // indirect - golang.org/x/sys v0.29.0 // indirect - golang.org/x/text v0.21.0 // indirect + golang.org/x/net v0.35.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect google.golang.org/grpc v1.70.0 // indirect google.golang.org/protobuf v1.36.5 // indirect diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/go.sum b/extension/encoding/awscloudwatchmetricstreamsencodingextension/go.sum index 42e05dafa173..60b556f9b234 100644 --- a/extension/encoding/awscloudwatchmetricstreamsencodingextension/go.sum +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/go.sum @@ -1,3 +1,5 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -41,6 +43,12 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.121.0 h1:VvMR0isNCB2lVX3R8VMwuePa+UDUj/4jBzCj8ik7r3M= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.121.0/go.mod h1:MoCMz/TtwE0yYmOL3uJ+VoOxZpt7+obfdLrKNG40deI= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.121.0 h1:efEcUMbyFWBx56TQDz2IMsuI0kQ5g8Im0DjQc9w9HBU= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.121.0/go.mod h1:9ghLP9djsDo5xzmzkADqeJjZb3l92XIRhpAz/ToX2QM= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.121.0 h1:D7mQQKd4rncv3PSsbDGayNENqmVwN1dFvPo3wHFzhI4= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.121.0/go.mod h1:swPiDfFHEiy9x2TwNO3uexCkwppLWfPRVoJdpJvKIQE= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -70,6 +78,8 @@ go.opentelemetry.io/collector/pdata v1.27.0 h1:66yI7FYkUDia74h48Fd2/KG2Vk8DxZnGw go.opentelemetry.io/collector/pdata v1.27.0/go.mod h1:18e8/xDZsqyj00h/5HM5GLdJgBzzG9Ei8g9SpNoiMtI= go.opentelemetry.io/collector/pdata/pprofile v0.121.0 h1:DFBelDRsZYxEaSoxSRtseAazsHJfqfC/Yl64uPicl2g= go.opentelemetry.io/collector/pdata/pprofile v0.121.0/go.mod h1:j/fjrd7ybJp/PXkba92QLzx7hykUVmU8x/WJvI2JWSg= +go.opentelemetry.io/collector/semconv v0.121.0 h1:dtdgh5TsKWGZXIBMsyCMVrY1VgmyWlXHgWx/VH9tL1U= +go.opentelemetry.io/collector/semconv v0.121.0/go.mod h1:te6VQ4zZJO5Lp8dM2XIhDxDiL45mwX0YAQQWRQ0Qr9U= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= @@ -95,20 +105,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/json_unmarshaler.go b/extension/encoding/awscloudwatchmetricstreamsencodingextension/json_unmarshaler.go new file mode 100644 index 000000000000..132ba12a3ec4 --- /dev/null +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/json_unmarshaler.go @@ -0,0 +1,274 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awscloudwatchmetricstreamsencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awscloudwatchmetricstreamsencodingextension" + +import ( + "bufio" + "bytes" + "errors" + "strings" + "time" + + jsoniter "github.com/json-iterator/go" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + conventions "go.opentelemetry.io/collector/semconv/v1.27.0" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awscloudwatchmetricstreamsencodingextension/internal/metadata" +) + +const ( + attributeAWSCloudWatchMetricStreamName = "aws.cloudwatch.metric_stream_name" + dimensionInstanceID = "InstanceId" + namespaceDelimiter = "/" +) + +var ( + errNoMetricName = errors.New("cloudwatch metric is missing metric name field") + errNoMetricNamespace = errors.New("cloudwatch metric is missing namespace field") + errNoMetricUnit = errors.New("cloudwatch metric is missing unit field") + errNoMetricValue = errors.New("cloudwatch metric is missing value") + errEmptyRecord = errors.New("0 metrics were extracted from the record") +) + +type formatJSONUnmarshaler struct { + buildInfo component.BuildInfo + logger *zap.Logger +} + +var _ pmetric.Unmarshaler = (*formatJSONUnmarshaler)(nil) + +// The cloudwatchMetric is the format for the CloudWatch metric stream records. +// +// More details can be found at: +// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-json.html +type cloudwatchMetric struct { + // MetricStreamName is the name of the CloudWatch metric stream. + MetricStreamName string `json:"metric_stream_name"` + // AccountID is the AWS account ID associated with the metric. + AccountID string `json:"account_id"` + // Region is the AWS region for the metric. + Region string `json:"region"` + // Namespace is the CloudWatch namespace the metric is in. + Namespace string `json:"namespace"` + // MetricName is the name of the metric. + MetricName string `json:"metric_name"` + // Dimensions is a map of name/value pairs that help to + // differentiate a metric. + Dimensions map[string]string `json:"dimensions"` + // Timestamp is the milliseconds since epoch for + // the metric. + Timestamp int64 `json:"timestamp"` + // Value is the cloudwatchMetricValue, which has the min, max, + // sum, and count. + Value cloudwatchMetricValue `json:"value"` + // Unit is the unit for the metric. + // + // More details can be found at: + // https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html + Unit string `json:"unit"` +} + +// The cloudwatchMetricValue is the actual values of the CloudWatch metric. +type cloudwatchMetricValue struct { + isSet bool + + // Max is the highest value observed. + Max float64 `json:"max"` + // Min is the lowest value observed. + Min float64 `json:"min"` + // Sum is the sum of data points collected. + Sum float64 `json:"sum"` + // Count is the number of data points. + Count float64 `json:"count"` +} + +// validateMetric validates that the cloudwatch metric has been unmarshalled correctly +func validateMetric(metric cloudwatchMetric) error { + if metric.MetricName == "" { + return errNoMetricName + } + if metric.Namespace == "" { + return errNoMetricNamespace + } + if metric.Unit == "" { + return errNoMetricUnit + } + if !metric.Value.isSet { + return errNoMetricValue + } + return nil +} + +// UnmarshalJSON unmarshalls the data to a cloudwatchMetricValue, +// and sets isSet to true upon a successful execution +func (v *cloudwatchMetricValue) UnmarshalJSON(data []byte) error { + type valueType cloudwatchMetricValue + if err := jsoniter.ConfigFastest.Unmarshal(data, (*valueType)(v)); err != nil { + return err + } + v.isSet = true + return nil +} + +// resourceKey stores the metric attributes +// that make a cloudwatchMetric unique to +// a resource +type resourceKey struct { + metricStreamName string + namespace string + accountID string + region string +} + +// metricKey stores the metric attributes +// that make a metric unique within +// a resource +type metricKey struct { + name string + unit string +} + +func (c *formatJSONUnmarshaler) UnmarshalMetrics(record []byte) (pmetric.Metrics, error) { + byResource := make(map[resourceKey]map[metricKey]pmetric.Metric) + + // Multiple metrics in each record separated by newline character + scanner := bufio.NewScanner(bytes.NewReader(record)) + for datumIndex := 0; scanner.Scan(); datumIndex++ { + var cwMetric cloudwatchMetric + if err := jsoniter.ConfigFastest.Unmarshal(scanner.Bytes(), &cwMetric); err != nil { + c.logger.Error( + "Unable to unmarshal input", + zap.Error(err), + zap.Int("datum_index", datumIndex), + ) + continue + } + if err := validateMetric(cwMetric); err != nil { + c.logger.Error( + "Invalid metric", + zap.Int("datum_index", datumIndex), + zap.Error(err), + ) + continue + } + + c.addMetricToResource(byResource, cwMetric) + } + + if err := scanner.Err(); err != nil { + // Treat this as a non-fatal error, and handle the data below. + c.logger.Error("Error scanning for newline-delimited JSON", zap.Error(err)) + } + + if len(byResource) == 0 { + return pmetric.Metrics{}, errEmptyRecord + } + + return c.createMetrics(byResource), nil +} + +// addMetricToResource adds a new cloudwatchMetric to the +// resource it belongs to according to resourceKey. It then +// sets the data point for the cloudwatchMetric. +func (c *formatJSONUnmarshaler) addMetricToResource( + byResource map[resourceKey]map[metricKey]pmetric.Metric, + cwMetric cloudwatchMetric, +) { + rKey := resourceKey{ + metricStreamName: cwMetric.MetricStreamName, + namespace: cwMetric.Namespace, + accountID: cwMetric.AccountID, + region: cwMetric.Region, + } + metrics, ok := byResource[rKey] + if !ok { + metrics = make(map[metricKey]pmetric.Metric) + byResource[rKey] = metrics + } + + mKey := metricKey{ + name: cwMetric.MetricName, + unit: cwMetric.Unit, + } + metric, ok := metrics[mKey] + if !ok { + metric = pmetric.NewMetric() + metric.SetName(mKey.name) + metric.SetUnit(mKey.unit) + metric.SetEmptySummary() + metrics[mKey] = metric + } + + dp := metric.Summary().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.UnixMilli(cwMetric.Timestamp))) + setDataPointAttributes(cwMetric, dp) + dp.SetCount(uint64(cwMetric.Value.Count)) + dp.SetSum(cwMetric.Value.Sum) + minQ := dp.QuantileValues().AppendEmpty() + minQ.SetQuantile(0) + minQ.SetValue(cwMetric.Value.Min) + maxQ := dp.QuantileValues().AppendEmpty() + maxQ.SetQuantile(1) + maxQ.SetValue(cwMetric.Value.Max) +} + +// createMetrics creates pmetric.Metrics based on +// on the extracted metrics of each resource +func (c *formatJSONUnmarshaler) createMetrics( + byResource map[resourceKey]map[metricKey]pmetric.Metric, +) pmetric.Metrics { + metrics := pmetric.NewMetrics() + for rKey, metricsMap := range byResource { + rm := metrics.ResourceMetrics().AppendEmpty() + setResourceAttributes(rKey, rm.Resource()) + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + scopeMetrics.Scope().SetName(metadata.ScopeName) + scopeMetrics.Scope().SetVersion(c.buildInfo.Version) + for _, metric := range metricsMap { + metric.MoveTo(scopeMetrics.Metrics().AppendEmpty()) + } + } + return metrics +} + +// setResourceAttributes sets attributes on a pcommon.Resource from a cloudwatchMetric. +func setResourceAttributes(rKey resourceKey, resource pcommon.Resource) { + attributes := resource.Attributes() + attributes.PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS) + attributes.PutStr(conventions.AttributeCloudAccountID, rKey.accountID) + attributes.PutStr(conventions.AttributeCloudRegion, rKey.region) + serviceNamespace, serviceName := toServiceAttributes(rKey.namespace) + if serviceNamespace != "" { + attributes.PutStr(conventions.AttributeServiceNamespace, serviceNamespace) + } + attributes.PutStr(conventions.AttributeServiceName, serviceName) + attributes.PutStr(attributeAWSCloudWatchMetricStreamName, rKey.metricStreamName) +} + +// toServiceAttributes splits the CloudWatch namespace into service namespace/name +// if prepended by AWS/. Otherwise, it returns the CloudWatch namespace as the +// service name with an empty service namespace +func toServiceAttributes(namespace string) (serviceNamespace, serviceName string) { + index := strings.Index(namespace, namespaceDelimiter) + if index != -1 && strings.EqualFold(namespace[:index], conventions.AttributeCloudProviderAWS) { + return namespace[:index], namespace[index+1:] + } + return "", namespace +} + +// setDataPointAttributes sets attributes on a metric data point from a cloudwatchMetric. +func setDataPointAttributes(metric cloudwatchMetric, dp pmetric.SummaryDataPoint) { + attrs := dp.Attributes() + for k, v := range metric.Dimensions { + switch k { + case dimensionInstanceID: + attrs.PutStr(conventions.AttributeServiceInstanceID, v) + default: + attrs.PutStr(k, v) + } + } +} diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/json_unmarshaler_test.go b/extension/encoding/awscloudwatchmetricstreamsencodingextension/json_unmarshaler_test.go new file mode 100644 index 000000000000..b9c99253def0 --- /dev/null +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/json_unmarshaler_test.go @@ -0,0 +1,167 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awscloudwatchmetricstreamsencodingextension + +import ( + "bytes" + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" +) + +func TestValidateMetric(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + metric cloudwatchMetric + expectedErr error + }{ + "valid_metric": { + metric: cloudwatchMetric{ + Namespace: "test/namespace", + Unit: "Seconds", + Value: cloudwatchMetricValue{ + isSet: true, + }, + MetricName: "test", + }, + }, + "no_metric_name": { + metric: cloudwatchMetric{ + Namespace: "test/namespace", + Unit: "Seconds", + Value: cloudwatchMetricValue{ + isSet: true, + }, + }, + expectedErr: errNoMetricName, + }, + "no_metric_namespace": { + metric: cloudwatchMetric{ + Unit: "Seconds", + Value: cloudwatchMetricValue{ + isSet: true, + }, + MetricName: "test", + }, + expectedErr: errNoMetricNamespace, + }, + "no_metric_unit": { + metric: cloudwatchMetric{ + Namespace: "test/namespace", + Value: cloudwatchMetricValue{ + isSet: true, + }, + MetricName: "test", + }, + expectedErr: errNoMetricUnit, + }, + "no_metric_value": { + metric: cloudwatchMetric{ + Namespace: "test/namespace", + Unit: "Seconds", + Value: cloudwatchMetricValue{ + isSet: false, + }, + MetricName: "test", + }, + expectedErr: errNoMetricValue, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + err := validateMetric(test.metric) + require.Equal(t, test.expectedErr, err) + }) + } +} + +// joinMetricsFromFile reads the metrics inside the files, +// and joins them in the format a record expects it to be: +// each metric is expected to be in 1 line, and every new +// line marks a new metric +func joinMetricsFromFile(t *testing.T, dir string, files []string) []byte { + if len(files) == 0 { + t.Fatalf("joinMetricsFromFile requires at least one file") + } + var buffer bytes.Buffer + for _, file := range files { + // get the metric from the files + data, err := os.ReadFile(filepath.Join(dir, file)) + require.NoError(t, err) + + // remove all insignificant spaces, + // including new lines + var compacted bytes.Buffer + err = json.Compact(&compacted, data) + require.NoError(t, err) + + // append the metric and add new line + // to mark the end of this metric + buffer.Write(compacted.Bytes()) + buffer.WriteByte('\n') + } + return buffer.Bytes() +} + +func TestUnmarshalJSONMetrics(t *testing.T) { + t.Parallel() + + filesDirectory := "testdata/json" + tests := map[string]struct { + files []string + metricExpectedFilename string + expectedErr error + }{ + "valid_record_single_metric": { + // test a record with a single metric + files: []string{"valid_metric.json"}, + metricExpectedFilename: "valid_record_single_metric_expected.yaml", + }, + "invalid_record": { + // test a record with one invalid metric + files: []string{"invalid_metric.json"}, + expectedErr: errEmptyRecord, + }, + "valid_record_multiple_metrics": { + // test a record with multiple + // metrics: some invalid, some + // valid + files: []string{ + "valid_metric.json", + "invalid_metric.json", + "valid_metric.json", + "invalid_metric.json", + }, + metricExpectedFilename: "valid_record_multiple_metrics_expected.yaml", + }, + } + + unmarshalerCW := &formatJSONUnmarshaler{component.BuildInfo{}, zap.NewNop()} + for name, test := range tests { + t.Run(name, func(t *testing.T) { + record := joinMetricsFromFile(t, filesDirectory, test.files) + + metrics, err := unmarshalerCW.UnmarshalMetrics(record) + if test.expectedErr != nil { + require.Equal(t, test.expectedErr, err) + return + } + + expectedMetrics, err := golden.ReadMetrics(filepath.Join(filesDirectory, test.metricExpectedFilename)) + require.NoError(t, err) + + require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, metrics)) + }) + } +} diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/opentelemetry_unmarshaler.go b/extension/encoding/awscloudwatchmetricstreamsencodingextension/opentelemetry_unmarshaler.go new file mode 100644 index 000000000000..dc8f37943ba8 --- /dev/null +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/opentelemetry_unmarshaler.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awscloudwatchmetricstreamsencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awscloudwatchmetricstreamsencodingextension" + +import ( + "fmt" + + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type formatOpenTelemetry10Unmarshaler struct{} + +var _ pmetric.Unmarshaler = (*formatOpenTelemetry10Unmarshaler)(nil) + +func (f formatOpenTelemetry10Unmarshaler) UnmarshalMetrics(_ []byte) (pmetric.Metrics, error) { + return pmetric.Metrics{}, fmt.Errorf("UnmarshalMetrics unimplemented for format %q", formatOpenTelemetry10) +} diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/opentelemetry_unmarshaler_test.go b/extension/encoding/awscloudwatchmetricstreamsencodingextension/opentelemetry_unmarshaler_test.go new file mode 100644 index 000000000000..fcff7eea10e1 --- /dev/null +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/opentelemetry_unmarshaler_test.go @@ -0,0 +1,10 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awscloudwatchmetricstreamsencodingextension + +import "testing" + +func TestUnmarshalOpenTelemetryMetrics(t *testing.T) { + t.Skip("still to be implemented") +} diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/invalid_metric.json b/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/invalid_metric.json new file mode 100644 index 000000000000..4ea870b5d273 --- /dev/null +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/invalid_metric.json @@ -0,0 +1,12 @@ +{ + "metric_stream_name":"MyMetricStream", + "account_id":"1234567890", + "region":"us-east-1", + "namespace":"AWS/EC2", + "metric_name":"MetricWithNoValue", + "dimensions":{ + "InstanceId":"i-123456789012" + }, + "timestamp":1611929698000, + "unit":"Seconds" +} \ No newline at end of file diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/valid_metric.json b/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/valid_metric.json new file mode 100644 index 000000000000..c6df0d38cc34 --- /dev/null +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/valid_metric.json @@ -0,0 +1,18 @@ +{ + "metric_stream_name":"MyMetricStream", + "account_id":"1234567890", + "region":"us-east-1", + "namespace":"AWS/EC2", + "metric_name":"DiskWriteOps", + "dimensions":{ + "InstanceId":"i-123456789012" + }, + "timestamp":1611929698000, + "value":{ + "count":3.0, + "sum":20.0, + "max":18.0, + "min":0.0 + }, + "unit":"Seconds" +} \ No newline at end of file diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/valid_record_multiple_metrics_expected.yaml b/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/valid_record_multiple_metrics_expected.yaml new file mode 100644 index 000000000000..ec9aa5914abe --- /dev/null +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/valid_record_multiple_metrics_expected.yaml @@ -0,0 +1,51 @@ +resourceMetrics: + - resource: + attributes: + - key: aws.cloudwatch.metric_stream_name + value: + stringValue: MyMetricStream + - key: cloud.account.id + value: + stringValue: "1234567890" + - key: cloud.provider + value: + stringValue: aws + - key: cloud.region + value: + stringValue: us-east-1 + - key: service.name + value: + stringValue: EC2 + - key: service.namespace + value: + stringValue: AWS + scopeMetrics: + - metrics: + - name: DiskWriteOps + summary: + dataPoints: + - attributes: + - key: service.instance.id + value: + stringValue: i-123456789012 + count: "3" + quantileValues: + - {} + - quantile: 1 + value: 18 + sum: 20 + timeUnixNano: 1611929698000000000 + - attributes: + - key: service.instance.id + value: + stringValue: i-123456789012 + count: "3" + quantileValues: + - {} + - quantile: 1 + value: 18 + sum: 20 + timeUnixNano: 1611929698000000000 + unit: Seconds + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awscloudwatchmetricstreamsencodingextension diff --git a/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/valid_record_single_metric_expected.yaml b/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/valid_record_single_metric_expected.yaml new file mode 100644 index 000000000000..1863d1580f1d --- /dev/null +++ b/extension/encoding/awscloudwatchmetricstreamsencodingextension/testdata/json/valid_record_single_metric_expected.yaml @@ -0,0 +1,40 @@ +resourceMetrics: + - resource: + attributes: + - key: aws.cloudwatch.metric_stream_name + value: + stringValue: MyMetricStream + - key: cloud.account.id + value: + stringValue: "1234567890" + - key: cloud.provider + value: + stringValue: aws + - key: cloud.region + value: + stringValue: us-east-1 + - key: service.name + value: + stringValue: EC2 + - key: service.namespace + value: + stringValue: AWS + scopeMetrics: + - metrics: + - name: DiskWriteOps + summary: + dataPoints: + - attributes: + - key: service.instance.id + value: + stringValue: i-123456789012 + count: "3" + quantileValues: + - {} + - quantile: 1 + value: 18 + sum: 20 + timeUnixNano: 1611929698000000000 + unit: Seconds + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awscloudwatchmetricstreamsencodingextension