From 468d4daed7d08c96187a397b752a5f86533588c6 Mon Sep 17 00:00:00 2001 From: Kaushik Surya <108111936+sky333999@users.noreply.github.com> Date: Mon, 7 Oct 2024 16:27:52 -0500 Subject: [PATCH] Add AMP as a metrics destination (#1192) Co-authored-by: Jeffrey Chien --- cmd/config-translator/translator_test.go | 7 + go.mod | 24 +- go.sum | 32 +- internal/metric/metric.go | 61 +++ internal/util/collections/collections.go | 28 ++ internal/util/collections/collections_test.go | 21 + processor/rollupprocessor/README.md | 26 ++ processor/rollupprocessor/cache.go | 74 ++++ processor/rollupprocessor/cache_test.go | 61 +++ processor/rollupprocessor/config.go | 19 + processor/rollupprocessor/config_test.go | 53 +++ processor/rollupprocessor/factory.go | 60 +++ processor/rollupprocessor/factory_test.go | 42 ++ processor/rollupprocessor/processor.go | 196 +++++++++ processor/rollupprocessor/processor_test.go | 380 ++++++++++++++++++ .../rollupprocessor/testdata/config.yaml | 20 + service/defaultcomponents/components.go | 4 + service/defaultcomponents/components_test.go | 2 + tool/testutil/testutil.go | 7 + .../invalidMetricsDestinations.json | 18 + .../sampleSchema/validLinuxMetrics.json | 3 + .../validMetricsDestinations.json | 19 + translator/config/schema.json | 26 ++ .../sampleConfig/amp_config_linux.conf | 33 ++ .../sampleConfig/amp_config_linux.json | 64 +++ .../sampleConfig/amp_config_linux.yaml | 160 ++++++++ .../sampleConfig/complete_linux_config.json | 4 + .../sampleConfig/jmx_config_linux.json | 6 + .../sampleConfig/jmx_config_linux.yaml | 73 +++- translator/tocwconfig/tocwconfig_test.go | 2 + translator/tocwconfig/tocwconfig_unix_test.go | 13 + .../metrics/config/registered_metrics.go | 12 +- translator/translate/otel/common/common.go | 12 +- translator/translate/otel/common/metrics.go | 177 ++++++++ .../translate/otel/common/metrics_test.go | 33 ++ .../testdata/config.json | 0 .../otel/exporter/awscloudwatch/translator.go | 173 +------- 
.../exporter/awscloudwatch/translator_test.go | 15 +- .../testdata/config.json | 13 + .../testdata/config.yaml | 27 ++ .../prometheusremotewrite/translator.go | 55 +++ .../prometheusremotewrite/translator_test.go | 58 +++ .../otel/extension/sigv4auth/translator.go | 43 ++ .../extension/sigv4auth/translator_test.go | 26 ++ .../otel/pipeline/host/translator.go | 37 +- .../otel/pipeline/host/translator_test.go | 90 ++++- .../otel/pipeline/host/translators.go | 29 +- .../otel/pipeline/host/translators_test.go | 88 +++- .../translate/otel/pipeline/jmx/translator.go | 25 +- .../otel/pipeline/jmx/translator_test.go | 29 ++ .../rollupprocessor/testdata/config.json | 31 ++ .../processor/rollupprocessor/translator.go | 50 +++ .../rollupprocessor/translator_test.go | 69 ++++ .../otel/receiver/adapter/translators.go | 8 +- .../translate/otel/translate_otel_test.go | 16 + 55 files changed, 2391 insertions(+), 263 deletions(-) create mode 100644 processor/rollupprocessor/README.md create mode 100644 processor/rollupprocessor/cache.go create mode 100644 processor/rollupprocessor/cache_test.go create mode 100644 processor/rollupprocessor/config.go create mode 100644 processor/rollupprocessor/config_test.go create mode 100644 processor/rollupprocessor/factory.go create mode 100644 processor/rollupprocessor/factory_test.go create mode 100644 processor/rollupprocessor/processor.go create mode 100644 processor/rollupprocessor/processor_test.go create mode 100644 processor/rollupprocessor/testdata/config.yaml create mode 100644 translator/config/sampleSchema/invalidMetricsDestinations.json create mode 100644 translator/config/sampleSchema/validMetricsDestinations.json create mode 100644 translator/tocwconfig/sampleConfig/amp_config_linux.conf create mode 100755 translator/tocwconfig/sampleConfig/amp_config_linux.json create mode 100644 translator/tocwconfig/sampleConfig/amp_config_linux.yaml create mode 100644 translator/translate/otel/common/metrics.go create mode 100644 
translator/translate/otel/common/metrics_test.go rename translator/translate/otel/{exporter/awscloudwatch => common}/testdata/config.json (100%) create mode 100644 translator/translate/otel/exporter/prometheusremotewrite/testdata/config.json create mode 100644 translator/translate/otel/exporter/prometheusremotewrite/testdata/config.yaml create mode 100644 translator/translate/otel/exporter/prometheusremotewrite/translator.go create mode 100644 translator/translate/otel/exporter/prometheusremotewrite/translator_test.go create mode 100644 translator/translate/otel/extension/sigv4auth/translator.go create mode 100644 translator/translate/otel/extension/sigv4auth/translator_test.go create mode 100644 translator/translate/otel/processor/rollupprocessor/testdata/config.json create mode 100644 translator/translate/otel/processor/rollupprocessor/translator.go create mode 100644 translator/translate/otel/processor/rollupprocessor/translator_test.go diff --git a/cmd/config-translator/translator_test.go b/cmd/config-translator/translator_test.go index 24523755c5..147f6fd2b9 100644 --- a/cmd/config-translator/translator_test.go +++ b/cmd/config-translator/translator_test.go @@ -179,6 +179,13 @@ func TestInvalidLogFilterConfig(t *testing.T) { checkIfSchemaValidateAsExpected(t, "../../translator/config/sampleSchema/invalidLogFilesWithFilters.json", false, expectedErrorMap) } +func TestMetricsDestinationsConfig(t *testing.T) { + checkIfSchemaValidateAsExpected(t, "../../translator/config/sampleSchema/validMetricsDestinations.json", true, map[string]int{}) + expectedErrorMap := map[string]int{} + expectedErrorMap["required"] = 1 + checkIfSchemaValidateAsExpected(t, "../../translator/config/sampleSchema/invalidMetricsDestinations.json", false, expectedErrorMap) +} + // Validate all sampleConfig files schema func TestSampleConfigSchema(t *testing.T) { if files, err := os.ReadDir("../../translator/tocwconfig/sampleConfig/"); err == nil { diff --git a/go.mod b/go.mod index 
872f5d7021..1909fddee5 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,8 @@ replace ( ) replace ( + // For clear resource attributes after copy functionality https://github.com/amazon-contributing/opentelemetry-collector-contrib/pull/148 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry => github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.0.0-20240903195955-5944792b593a github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza => github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/stanza v0.0.0-20240903195955-5944792b593a // Replace with contrib to revert upstream change https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/20519 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus => github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/translator/prometheus v0.0.0-20240903195955-5944792b593a @@ -99,7 +101,6 @@ require ( github.com/gobwas/glob v0.2.3 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 - github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect github.com/hashicorp/golang-lru v1.0.2 github.com/influxdata/telegraf v0.0.0-00010101000000-000000000000 github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 @@ -113,6 +114,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter v0.103.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter v0.103.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter v0.103.0 + github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter v0.103.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/awsproxy v0.103.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.103.0 
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver v0.103.0 @@ -149,13 +151,14 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.103.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.1 - github.com/prometheus/common v0.54.0 + github.com/prometheus/common v0.55.0 github.com/prometheus/prometheus v0.51.2-0.20240405174432-b4a973753c6e github.com/shirou/gopsutil v3.21.11+incompatible github.com/shirou/gopsutil/v3 v3.24.5 github.com/stretchr/testify v1.9.0 github.com/xeipuuv/gojsonschema v1.2.0 go.opentelemetry.io/collector/component v0.103.0 + go.opentelemetry.io/collector/config/configauth v0.103.0 go.opentelemetry.io/collector/config/configopaque v1.10.0 go.opentelemetry.io/collector/config/configtelemetry v0.103.0 go.opentelemetry.io/collector/config/configtls v0.103.0 @@ -249,7 +252,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/checkpoint-restore/go-criu/v5 v5.3.0 // indirect github.com/cilium/ebpf v0.11.0 // indirect - github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 // indirect + github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect github.com/containerd/cgroups/v3 v3.0.3 // indirect github.com/containerd/console v1.0.3 // indirect github.com/containerd/errdefs v0.1.0 // indirect @@ -306,6 +309,7 @@ require ( github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/gosnmp/gosnmp v1.34.0 // indirect + github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/hashicorp/consul/api v1.29.1 // indirect github.com/hashicorp/cronexpr v1.1.2 // indirect @@ -392,6 +396,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.103.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.103.0 // indirect 
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.103.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.103.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.103.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect @@ -409,7 +414,7 @@ require ( github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common/sigv4 v0.1.0 // indirect - github.com/prometheus/procfs v0.15.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/relvacode/iso8601 v1.4.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect @@ -428,6 +433,8 @@ require ( github.com/tidwall/gjson v1.10.2 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/tinylru v1.1.0 // indirect + github.com/tidwall/wal v1.1.7 // indirect github.com/tinylib/msgp v1.1.6 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect @@ -446,7 +453,6 @@ require ( go.etcd.io/bbolt v1.3.10 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.103.0 // indirect - go.opentelemetry.io/collector/config/configauth v0.103.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.10.0 // indirect go.opentelemetry.io/collector/config/configgrpc v0.103.0 // indirect go.opentelemetry.io/collector/config/confighttp v0.103.0 // indirect @@ -482,16 +488,16 @@ require ( go.opentelemetry.io/proto/otlp v1.2.0 // indirect golang.org/x/crypto v0.25.0 // indirect golang.org/x/mod v0.17.0 // indirect - golang.org/x/oauth2 v0.20.0 // indirect + golang.org/x/oauth2 v0.21.0 
// indirect golang.org/x/term v0.22.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect gonum.org/v1/gonum v0.15.0 // indirect google.golang.org/api v0.169.0 // indirect google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240520151616-dc85e6b867a5 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 // indirect - google.golang.org/grpc v1.64.1 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index c2253ef76b..0bb405adb8 100644 --- a/go.sum +++ b/go.sum @@ -210,6 +210,8 @@ github.com/amazon-contributing/opentelemetry-collector-contrib/internal/kubelet github.com/amazon-contributing/opentelemetry-collector-contrib/internal/kubelet v0.0.0-20240903195955-5944792b593a/go.mod h1:4qvmHiXPOkOXJdpmmxMqprb2BXxOGPgOG45BwLdipUM= github.com/amazon-contributing/opentelemetry-collector-contrib/override/aws v0.0.0-20240903195955-5944792b593a h1:EQQLrlrDkjs3rm1i+vj0zTrdJkBdGS5n1w0tmcLNEz8= github.com/amazon-contributing/opentelemetry-collector-contrib/override/aws v0.0.0-20240903195955-5944792b593a/go.mod h1:t/hYoRTnlPuRjh8y0BwVGgNvNIXpU2QJME5YVppUUHQ= +github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.0.0-20240903195955-5944792b593a h1:6rB8vnscjDsrl/vU7wZh9e4r6ZQBnqzdVuo8kUOUejQ= +github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.0.0-20240903195955-5944792b593a/go.mod h1:Rr5b3hr6Jy9w/zTjsOl3vcyDDusc90P+iGdOd0UCYOo= github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/stanza 
v0.0.0-20240903195955-5944792b593a h1:ansEEnZyu1rMVJJP+Pb55RSyFZU8U4QwgYDk6tlMTOQ= github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/stanza v0.0.0-20240903195955-5944792b593a/go.mod h1:2NSghK+mafMGxM8c4Gff8qcprdMD3YQebZtD9UAdB3E= github.com/amazon-contributing/opentelemetry-collector-contrib/pkg/translator/prometheus v0.0.0-20240903195955-5944792b593a h1:U9tYL8FEcwCBNVTTcFGbC+MLmIYaqQQSTUfZLi2rbjg= @@ -378,8 +380,8 @@ github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 h1:DBmgJDC9dTfkVyGgipamEh2BpGYxScCH1TOF1LL1cXc= -github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50/go.mod h1:5e1+Vvlzido69INQaVO6d87Qn543Xr6nooe9Kz7oBFM= +github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= +github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0= github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0= github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw= @@ -1175,8 +1177,6 @@ github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.103.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.103.0/go.mod h1:hmeKPJaZjzOjcndDxpWnjt0781EMqvj3or01baNVoRI= github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.103.0 h1:1cZyMMLSpSWFdfITyVc9Bb+8rn175/GGwtWZQ3nClpc= 
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.103.0/go.mod h1:o8BPP4DM2SkdkPJxJOdmgxKz5DftGcuyUXgqf5MoWAw= -github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.103.0 h1:imXTMt9ravkIqcvvow0ya3aQh64OOiQpMOyWeG/SLDQ= -github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.103.0/go.mod h1:DR/fbOxaIyobmsd8nbDzuwqwwSNX9+yONDWx8dF2qS4= github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.103.0 h1:L1DCWusakqlxHC/5yfAfq4c5og1kFdJKV0jcw7FDdoE= github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.103.0/go.mod h1:Ok6bUxGfoGMUBvO0Lwgp3xsHHKqe0TMq4juIl7X3Oxo= github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.103.0 h1:GiP0syg12+MrI5IpL8Qt+rQktWDMsP0/8Nu9qmMtscw= @@ -1313,8 +1313,8 @@ github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB8 github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= -github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= -github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4= github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -1323,8 +1323,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs 
v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek= -github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/prometheus/prometheus v0.51.2-0.20240405174432-b4a973753c6e h1:UmqAuY2OyDoog8+l5FybViJE5B2r+UxVGCUwFTsY5AA= github.com/prometheus/prometheus v0.51.2-0.20240405174432-b4a973753c6e/go.mod h1:+0ld+ozir7zWFcHA2vVpWAKxXakIioEjPPNOqH+J3ZA= github.com/rabbitmq/amqp091-go v1.2.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= @@ -1790,8 +1790,8 @@ golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= -golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= +golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -2059,10 +2059,10 @@ google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxH google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= -google.golang.org/genproto/googleapis/api v0.0.0-20240520151616-dc85e6b867a5 h1:P8OJ/WCl/Xo4E4zoe4/bifHpSmmKwARqyqE4nW6J2GQ= -google.golang.org/genproto/googleapis/api v0.0.0-20240520151616-dc85e6b867a5/go.mod h1:RGnPtTG7r4i8sPlNyDeikXF99hMM+hN6QMm4ooG9g2g= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 h1:Q2RxlXqh1cgzzUgV261vBO2jI5R/3DD1J2pM0nI4NhU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= +google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -2082,8 +2082,8 @@ google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= 
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= -google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= -google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/internal/metric/metric.go b/internal/metric/metric.go index 7613890fb8..379dbc9b37 100644 --- a/internal/metric/metric.go +++ b/internal/metric/metric.go @@ -17,6 +17,67 @@ var serviceInputMeasurements = collections.NewSet[string]( "prometheus", ) +// DataPoint is used to provide a common interface for OTEL metric data points. +type DataPoint[T any] interface { + pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint | pmetric.SummaryDataPoint + MoveTo(dest T) + Attributes() pcommon.Map + StartTimestamp() pcommon.Timestamp + SetStartTimestamp(pcommon.Timestamp) + Timestamp() pcommon.Timestamp + SetTimestamp(pcommon.Timestamp) + CopyTo(dest T) +} + +// DataPoints is used to provide a common interface for OTEL slice types. 
+type DataPoints[T DataPoint[T]] interface {
+	Len() int
+	At(i int) T
+	EnsureCapacity(newCap int)
+	AppendEmpty() T
+	RemoveIf(f func(T) bool)
+}
+
+// RangeMetrics calls fn for every metric in md, walking the resource metrics,
+// then their scope metrics, then the metrics themselves, each in index order.
+func RangeMetrics(md pmetric.Metrics, fn func(m pmetric.Metric)) {
+	resources := md.ResourceMetrics()
+	for ri := 0; ri < resources.Len(); ri++ {
+		scopes := resources.At(ri).ScopeMetrics()
+		for si := 0; si < scopes.Len(); si++ {
+			metrics := scopes.At(si).Metrics()
+			for mi := 0; mi < metrics.Len(); mi++ {
+				fn(metrics.At(mi))
+			}
+		}
+	}
+}
+
+// RangeDataPointAttributes calls fn with the attribute map of every data point
+// in m. Gauge, sum, histogram, exponential histogram and summary metrics are
+// handled; for any other metric type fn is never called.
+func RangeDataPointAttributes(m pmetric.Metric, fn func(attrs pcommon.Map)) {
+	switch m.Type() {
+	case pmetric.MetricTypeGauge:
+		rangeDataPointAttributes[pmetric.NumberDataPoint](m.Gauge().DataPoints(), fn)
+	case pmetric.MetricTypeSum:
+		rangeDataPointAttributes[pmetric.NumberDataPoint](m.Sum().DataPoints(), fn)
+	case pmetric.MetricTypeHistogram:
+		rangeDataPointAttributes[pmetric.HistogramDataPoint](m.Histogram().DataPoints(), fn)
+	case pmetric.MetricTypeExponentialHistogram:
+		rangeDataPointAttributes[pmetric.ExponentialHistogramDataPoint](m.ExponentialHistogram().DataPoints(), fn)
+	case pmetric.MetricTypeSummary:
+		rangeDataPointAttributes[pmetric.SummaryDataPoint](m.Summary().DataPoints(), fn)
+	}
+}
+
+// RangeDataPoints calls fn for each data point in dps, in index order.
+func RangeDataPoints[T DataPoint[T]](dps DataPoints[T], fn func(dp T)) {
+	for idx := 0; idx < dps.Len(); idx++ {
+		fn(dps.At(idx))
+	}
+}
+
+// rangeDataPointAttributes calls fn with the attributes of each data point in dps.
+func rangeDataPointAttributes[T DataPoint[T]](dps DataPoints[T], fn func(attrs pcommon.Map)) {
+	visit := func(dp T) {
+		fn(dp.Attributes())
+	}
+	RangeDataPoints[T](dps, visit)
+}
+
 type Metrics struct {
 	metrics pmetric.MetricSlice
 }
diff --git a/internal/util/collections/collections.go b/internal/util/collections/collections.go
index 2609ed95c4..54e82100cb 100644
--- a/internal/util/collections/collections.go
+++ b/internal/util/collections/collections.go
@@ -81,9 +81,37 @@ func (s Set[K]) Contains(key K) bool {
 	return ok
 }
 
+// ContainsAll reports whether every key in other is also present in s.
+func (s Set[K]) ContainsAll(other Set[K]) bool {
+	for k := range other {
+		if s.Contains(k) {
+			continue
+		}
+		return false
+	}
+	return true
+}
+
+// Equal reports whether s and other hold exactly the same keys.
+func (s Set[K]) Equal(other Set[K]) bool {
+	return len(s) == len(other) && s.ContainsAll(other)
+}
+
 // NewSet creates a new Set with the keys provided.
 func NewSet[K comparable](keys ...K) Set[K] {
 	s := make(Set[K], len(keys))
 	s.Add(keys...)
 	return s
 }
+
+// Range calls fn on each element of values in order and stops at the first
+// element for which fn returns false. It returns true only when fn returned
+// true for every element, which includes the case where values is empty.
+func Range[T any](values []T, fn func(T) bool) bool {
+	for i := range values {
+		if !fn(values[i]) {
+			return false
+		}
+	}
+	return true
+}
diff --git a/internal/util/collections/collections_test.go b/internal/util/collections/collections_test.go
index b42a3493f6..56baaa5c78 100644
--- a/internal/util/collections/collections_test.go
+++ b/internal/util/collections/collections_test.go
@@ -138,6 +138,27 @@ func TestSet(t *testing.T) {
 	set.Remove(1)
 	require.False(t, set.Contains(1))
 	require.Equal(t, []int{2}, maps.Keys(set))
+
+	other := NewSet(1, 2, 3)
+	// different sizes
+	assert.False(t, set.Equal(other))
+	set.Add(1, 4)
+	// same size, different keys
+	assert.False(t, set.Equal(other))
+	set.Remove(4)
+	// set {1,2}, other {1,2,3}
+	assert.True(t, other.ContainsAll(set))
+	assert.False(t, set.ContainsAll(other))
+	set.Add(3)
+	assert.True(t, set.Equal(other))
+}
+
+func TestRange(t *testing.T) {
+	fn := func(key int) bool {
+		return key > 0
+	}
+	assert.True(t, Range([]int{1, 2, 3}, fn))
+	assert.False(t, Range([]int{1, 0, -1}, fn))
 }
 
 func assertMapsEqual(t *testing.T, m1, m2 map[string]interface{}) {
diff --git a/processor/rollupprocessor/README.md b/processor/rollupprocessor/README.md
new file mode 100644
index 0000000000..cebac28476
--- /dev/null
+++ b/processor/rollupprocessor/README.md
@@ -0,0 +1,26 @@
+# Rollup Processor
+
+The Rollup Processor creates new data points with attribute sets that are aggregated (rolled up) from
the original data points. +For example, specifying an attribute set of `["Attr1","Attr2"]` would roll up on those two attributes, creating a new data point +with only `"Attr1"` and `"Attr2"` and dropping all other attributes. + +| Status | | +| ------------------------ |---------------------------| +| Stability | [beta] | +| Supported pipeline types | metrics | +| Distributions | [amazon-cloudwatch-agent] | + +The attribute groups obtain their values from the original data point. If the data point does not have +the configured attribute, then that group will not be created. This data point roll up can provide +an exporter with the capability of aggregating the metrics based on these groups. The processor also +supports dropping the original data point to reduce the amount of data being sent along the pipeline. + +### Processor Configuration: + +The following processor configuration parameters are supported. + +| Name | Description | Supported Value | Default | +|--------------------|----------------------------------------------------------------------------------------|----------------------------------------------------|---------| +| `attribute_groups` | The groups of attribute names that will be used to create the rollup data points with. | [["Attribute1", "Attribute2"], ["Attribute1"], []] | [] | +| `drop_original` | The names of metrics where the original data points should be dropped. | ["MetricName1", "MetricName2"] | [] | +| `cache_size` | The size of the rollup cache used for optimization. Can be disabled by setting to <= 0 | 100 | 1000 | diff --git a/processor/rollupprocessor/cache.go b/processor/rollupprocessor/cache.go new file mode 100644 index 0000000000..8445db88a7 --- /dev/null +++ b/processor/rollupprocessor/cache.go @@ -0,0 +1,74 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT
+
+package rollupprocessor
+
+import (
+	"sort"
+	"strings"
+	"time"
+
+	"github.com/jellydator/ttlcache/v3"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+)
+
+// rollupCache is the cache abstraction used by this package. Get, Set, Start
+// and Stop have the same signatures as the methods that ttlRollupCache obtains
+// from its embedded ttlcache.Cache; Key derives the cache key for a set of
+// attributes.
+type rollupCache interface {
+	Get(key string, opts ...ttlcache.Option[string, []pcommon.Map]) *ttlcache.Item[string, []pcommon.Map]
+	Set(key string, value []pcommon.Map, ttl time.Duration) *ttlcache.Item[string, []pcommon.Map]
+	Key(attrs pcommon.Map) string
+	Start()
+	Stop()
+}
+
+// ttlRollupCache is a wrapper for the ttlcache.Cache that implements the
+// rollupCache interface.
+type ttlRollupCache struct {
+	*ttlcache.Cache[string, []pcommon.Map]
+}
+
+var _ rollupCache = (*ttlRollupCache)(nil)
+
+// rollupKeyEscaper escapes the two separators used by Key (":" and "|") and the
+// escape character itself, so that a separator occurring inside an attribute
+// name or value cannot be read as a pair boundary. Strings that contain none
+// of these characters are left unchanged.
+var rollupKeyEscaper = strings.NewReplacer(`\`, `\\`, ":", `\:`, "|", `\|`)
+
+// Key builds a deterministic cache key from attrs: every attribute is rendered
+// as "name:value" using the value's string form, the pairs are sorted, and the
+// result is joined with "|". An empty map yields the empty string.
+//
+// Names and values are escaped before joining. Without escaping, distinct
+// attribute sets such as {a: "1|b:2"} and {a: "1", b: "2"} would both produce
+// "a:1|b:2" and share a cache entry.
+//
+// NOTE(review): values are keyed by their string form only, so values of
+// different types with the same string form (for example the integer 1 and the
+// string "1") still map to the same key.
+func (c *ttlRollupCache) Key(attrs pcommon.Map) string {
+	pairs := make([]string, 0, attrs.Len())
+	attrs.Range(func(k string, v pcommon.Value) bool {
+		pairs = append(pairs, rollupKeyEscaper.Replace(k)+":"+rollupKeyEscaper.Replace(v.AsString()))
+		return true
+	})
+	// Map iteration order is not relied upon; sorting makes the key stable.
+	sort.Strings(pairs)
+	return strings.Join(pairs, "|")
+}
+
+// nopRollupCache is a no-op rollupCache used when the rollup cache is disabled.
+type nopRollupCache struct{}
+
+var _ rollupCache = (*nopRollupCache)(nil)
+
+// Get ignores its arguments and always returns nil.
+func (*nopRollupCache) Get(string, ...ttlcache.Option[string, []pcommon.Map]) *ttlcache.Item[string, []pcommon.Map] {
+	return nil
+}
+
+// Set stores nothing and always returns nil.
+func (*nopRollupCache) Set(string, []pcommon.Map, time.Duration) *ttlcache.Item[string, []pcommon.Map] {
+	return nil
+}
+
+// Key returns the empty string for every attribute map.
+func (*nopRollupCache) Key(pcommon.Map) string {
+	return ""
+}
+
+// Start does nothing.
+func (*nopRollupCache) Start() {}
+
+// Stop does nothing.
+func (*nopRollupCache) Stop() {}
+
+// buildRollupCache returns a nopRollupCache when cacheSize is <= 0. Otherwise
+// it returns a ttlRollupCache wrapping a ttlcache.Cache created with
+// cacheSize as its capacity option.
+func buildRollupCache(cacheSize int) rollupCache {
+	if cacheSize > 0 {
+		capacity := ttlcache.WithCapacity[string, []pcommon.Map](uint64(cacheSize))
+		return &ttlRollupCache{Cache: ttlcache.New[string, []pcommon.Map](capacity)}
+	}
+	return &nopRollupCache{}
+}
diff --git a/processor/rollupprocessor/cache_test.go b/processor/rollupprocessor/cache_test.go
new file mode 100644
index 0000000000..a41df067e8
--- /dev/null
+++ b/processor/rollupprocessor/cache_test.go
@@ -0,0 +1,61 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package rollupprocessor
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+)
+
+// TestNopCache verifies that the no-op cache returns an empty key and never
+// yields an item from Get or Set.
+func TestNopCache(t *testing.T) {
+	nop := &nopRollupCache{}
+	key := nop.Key(pcommon.NewMap())
+	assert.Equal(t, "", key)
+	assert.Nil(t, nop.Get(key))
+	assert.Nil(t, nop.Set(key, nil, time.Millisecond))
+}
+
+// TestCacheKey verifies the key rendered for an attribute map by a size-1
+// cache, and that the disabled (size 0) cache always renders the empty key.
+func TestCacheKey(t *testing.T) {
+	cases := []struct {
+		attrs map[string]any
+		want  string
+	}{
+		{attrs: map[string]any{}, want: ""},
+		{
+			attrs: map[string]any{
+				"c": "v1",
+				"d": "v2",
+				"a": "v3",
+				"b": "v4",
+			},
+			want: "a:v3|b:v4|c:v1|d:v2",
+		},
+		{
+			attrs: map[string]any{
+				"a": []any{"1", "3", "2"},
+				"b": 1,
+				"c": 2.5,
+				"d": false,
+			},
+			want: `a:["1","3","2"]|b:1|c:2.5|d:false`,
+		},
+	}
+	for _, tc := range cases {
+		attrs := pcommon.NewMap()
+		require.NoError(t, attrs.FromRaw(tc.attrs))
+		// keys are rendered by the cache built with a positive size
+		assert.Equal(t, tc.want, buildRollupCache(1).Key(attrs))
+
+		// validate no-op cache
+		assert.Equal(t, "", buildRollupCache(0).Key(attrs))
+	}
+}
diff --git a/processor/rollupprocessor/config.go b/processor/rollupprocessor/config.go
new file mode 100644
index 0000000000..089bb52a08
--- /dev/null
+++ b/processor/rollupprocessor/config.go
@@ -0,0 +1,19 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package rollupprocessor
+
+// Config holds the settings of the rollup processor.
+type Config struct {
+	// AttributeGroups lists the groups of attribute names used to create rollup
+	// data points. Each distinct group can add one copy of a data point that
+	// carries only that group's attributes.
+	AttributeGroups [][]string `mapstructure:"attribute_groups,omitempty"`
+	// DropOriginal lists the names of metrics whose original data points are
+	// dropped. Combined with AttributeGroups, this reduces the number of data
+	// points sent to the exporter.
+	DropOriginal []string `mapstructure:"drop_original,omitempty"`
+	// CacheSize is the capacity of the cache that stores built rollup attribute
+	// groups keyed by the base attributes. Values <= 0 disable the cache.
+	CacheSize int `mapstructure:"cache_size"`
+}
diff --git a/processor/rollupprocessor/config_test.go b/processor/rollupprocessor/config_test.go
new file mode 100644
index 0000000000..f1965a425c
--- /dev/null
+++ b/processor/rollupprocessor/config_test.go
@@ -0,0 +1,53 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package rollupprocessor
+
+import (
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/confmap/confmaptest"
+)
+
+// TestLoadConfig unmarshals each entry of testdata/config.yaml onto the
+// default config and compares the result with the expected value.
+func TestLoadConfig(t *testing.T) {
+	rollupType := component.MustNewType(typeStr)
+	cases := []struct {
+		id   component.ID
+		want component.Config
+	}{
+		{
+			id:   component.NewID(rollupType),
+			want: NewFactory().CreateDefaultConfig(),
+		},
+		{
+			id:   component.NewIDWithName(rollupType, "1"),
+			want: &Config{DropOriginal: []string{"MetricName"}, CacheSize: defaultCacheSize},
+		},
+		{
+			id:   component.NewIDWithName(rollupType, "2"),
+			want: &Config{AttributeGroups: [][]string{{"Attr1"}, {"Attr1", "Attr2"}, {"Attr3"}, {}}, CacheSize: defaultCacheSize},
+		},
+		{
+			id:   component.NewIDWithName(rollupType, "3"),
+			want: &Config{DropOriginal: []string{"MetricName"}, AttributeGroups: [][]string{{"Attr1", "Attr2"}}, CacheSize: 10},
+		},
+		{id: component.NewIDWithName(rollupType, "4"), want: &Config{CacheSize: -1}},
+	}
+	for _, tc := range cases {
+		conf, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
+		require.NoError(t, err)
+		sub, err := conf.Sub(tc.id.String())
+		require.NoError(t, err)
+		got := NewFactory().CreateDefaultConfig()
+		require.NoError(t, sub.Unmarshal(got))
+
+		assert.NoError(t, component.ValidateConfig(got))
+		assert.Equal(t, tc.want, got)
+	}
+}
diff --git a/processor/rollupprocessor/factory.go b/processor/rollupprocessor/factory.go
new file mode 100644
index 0000000000..a2d8841773
--- /dev/null
+++ b/processor/rollupprocessor/factory.go
@@ -0,0 +1,60 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package rollupprocessor
+
+import (
+	"context"
+	"fmt"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/processor"
+	"go.opentelemetry.io/collector/processor/processorhelper"
+)
+
+const (
+	typeStr   = "rollup"
+	stability = component.StabilityLevelAlpha
+
+	defaultCacheSize = 1000
+)
+
+var processorCapabilities = consumer.Capabilities{MutatesData: true}
+
+// NewFactory creates the factory for the rollup processor; only a metrics
+// processor is registered.
+func NewFactory() processor.Factory {
+	return processor.NewFactory(
+		component.MustNewType(typeStr),
+		createDefaultConfig,
+		processor.WithMetrics(createMetricsProcessor, stability),
+	)
+}
+
+// createDefaultConfig returns a Config with only the default cache size set.
+func createDefaultConfig() component.Config {
+	return &Config{CacheSize: defaultCacheSize}
+}
+
+// createMetricsProcessor wires a rollupProcessor into the processor helper and
+// registers its start/stop hooks. It fails when cfg is not a *Config.
+func createMetricsProcessor(
+	ctx context.Context,
+	set processor.CreateSettings,
+	cfg component.Config,
+	nextConsumer consumer.Metrics,
+) (processor.Metrics, error) {
+	rollupCfg, ok := cfg.(*Config)
+	if !ok {
+		return nil, fmt.Errorf("invalid configuration type: %T", cfg)
+	}
+	p := newProcessor(rollupCfg)
+	return processorhelper.NewMetricsProcessor(
+		ctx, set, cfg, nextConsumer,
+		p.processMetrics,
+		processorhelper.WithStart(p.start),
+		processorhelper.WithShutdown(p.stop),
+		processorhelper.WithCapabilities(processorCapabilities),
+	)
+}
diff --git a/processor/rollupprocessor/factory_test.go b/processor/rollupprocessor/factory_test.go
new file mode 100644
index 0000000000..652efb00a3
--- /dev/null
+++ b/processor/rollupprocessor/factory_test.go
@@ -0,0 +1,42 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package rollupprocessor
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/consumer/consumertest"
+	"go.opentelemetry.io/collector/processor/processortest"
+)
+
+func TestType(t *testing.T) {
+	assert.Equal(t, component.MustNewType(typeStr), NewFactory().Type())
+}
+
+// TestCreateDefaultConfig checks the default config is valid and only sets the cache size.
+func TestCreateDefaultConfig(t *testing.T) {
+	cfg := NewFactory().CreateDefaultConfig()
+	assert.NoError(t, componenttest.CheckConfigStruct(cfg))
+	assert.Equal(t, &Config{CacheSize: defaultCacheSize}, cfg)
+}
+
+func TestCreateProcessor(t *testing.T) {
+	ctx := context.Background()
+	factory := NewFactory()
+	// a nil config is not a *Config, so creation is expected to fail
+	mp, err := factory.CreateMetricsProcessor(ctx, processortest.NewNopCreateSettings(), nil, consumertest.NewNop())
+	assert.Error(t, err)
+	assert.Nil(t, mp)
+	// the default config yields a processor that starts and shuts down
+	cfg := factory.CreateDefaultConfig().(*Config)
+	mp, err = factory.CreateMetricsProcessor(ctx, processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
+	assert.NoError(t, err)
+	assert.NotNil(t, mp)
+	assert.NoError(t, mp.Start(ctx, componenttest.NewNopHost()))
+	assert.NoError(t, mp.Shutdown(ctx))
+}
diff --git a/processor/rollupprocessor/processor.go b/processor/rollupprocessor/processor.go
new file mode 100644
index 0000000000..2b05b4df0d
--- /dev/null
+++ b/processor/rollupprocessor/processor.go
@@ -0,0 +1,196 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package rollupprocessor
+
+import (
+	"context"
+	"sort"
+
+	"github.com/jellydator/ttlcache/v3"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"golang.org/x/exp/maps"
+
+	"github.com/aws/amazon-cloudwatch-agent/internal/metric"
+	"github.com/aws/amazon-cloudwatch-agent/internal/util/collections"
+)
+
+// rollupProcessor adds rolled-up copies of data points and can drop the
+// originals for selected metric names.
+type rollupProcessor struct {
+	// attributeGroups are the de-duplicated rollup groups from the config.
+	attributeGroups [][]string
+	// dropOriginal names the metrics whose original data points are dropped.
+	dropOriginal collections.Set[string]
+	// cache holds previously built rollups keyed by the base attributes.
+	cache rollupCache
+}
+
+// newProcessor builds the processor from cfg. With no attribute groups the
+// cache size is forced to zero, which selects the no-op cache.
+func newProcessor(cfg *Config) *rollupProcessor {
+	size := cfg.CacheSize
+	if len(cfg.AttributeGroups) == 0 {
+		size = 0
+	}
+	return &rollupProcessor{
+		attributeGroups: uniqueGroups(cfg.AttributeGroups),
+		dropOriginal:    collections.NewSet(cfg.DropOriginal...),
+		cache:           buildRollupCache(size),
+	}
+}
+
+// start runs the cache's Start method on a separate goroutine.
+func (p *rollupProcessor) start(context.Context, component.Host) error {
+	go p.cache.Start()
+	return nil
+}
+
+// stop calls the cache's Stop method.
+func (p *rollupProcessor) stop(context.Context) error {
+	p.cache.Stop()
+	return nil
+}
+
+// processMetrics runs processMetric over every metric in md. With neither
+// attribute groups nor drop-original names configured, md passes through.
+func (p *rollupProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
+	if len(p.attributeGroups) == 0 && len(p.dropOriginal) == 0 {
+		return md, nil
+	}
+	metric.RangeMetrics(md, p.processMetric)
+	return md, nil
+}
+
+// processMetric collects the rolled-up data points of m in a new slice and
+// copies that slice over the metric's own data points. Metric types without
+// a case in the switch are left unchanged.
+func (p *rollupProcessor) processMetric(m pmetric.Metric) {
+	name := m.Name()
+	switch m.Type() {
+	case pmetric.MetricTypeGauge:
+		src := m.Gauge().DataPoints()
+		dst := pmetric.NewNumberDataPointSlice()
+		rollupDataPoints[pmetric.NumberDataPoint](
+			p.cache, p.attributeGroups, p.dropOriginal,
+			name, src, dst,
+		)
+		dst.CopyTo(src)
+	case pmetric.MetricTypeSum:
+		src := m.Sum().DataPoints()
+		dst := pmetric.NewNumberDataPointSlice()
+		rollupDataPoints[pmetric.NumberDataPoint](
+			p.cache, p.attributeGroups, p.dropOriginal,
+			name, src, dst,
+		)
+		dst.CopyTo(src)
+	case pmetric.MetricTypeHistogram:
+		src := m.Histogram().DataPoints()
+		dst := pmetric.NewHistogramDataPointSlice()
+		rollupDataPoints[pmetric.HistogramDataPoint](
+			p.cache, p.attributeGroups, p.dropOriginal,
+			name, src, dst,
+		)
+		dst.CopyTo(src)
+	case pmetric.MetricTypeExponentialHistogram:
+		src := m.ExponentialHistogram().DataPoints()
+		dst := pmetric.NewExponentialHistogramDataPointSlice()
+		rollupDataPoints[pmetric.ExponentialHistogramDataPoint](
+			p.cache, p.attributeGroups, p.dropOriginal,
+			name, src, dst,
+		)
+		dst.CopyTo(src)
+	case pmetric.MetricTypeSummary:
+		src := m.Summary().DataPoints()
+		dst := pmetric.NewSummaryDataPointSlice()
+		rollupDataPoints[pmetric.SummaryDataPoint](
+			p.cache, p.attributeGroups, p.dropOriginal,
+			name, src, dst,
+		)
+		dst.CopyTo(src)
+	}
+}
+
+// rollupDataPoints makes copies of the original data points for each rollup
+// attribute group. If the metric name is in the drop original set, the original
+// data points are dropped.
+func rollupDataPoints[T metric.DataPoint[T]](
+	cache rollupCache,
+	attributeGroups [][]string,
+	dropOriginal collections.Set[string],
+	metricName string,
+	orig metric.DataPoints[T],
+	dest metric.DataPoints[T],
+) {
+	metric.RangeDataPoints(orig, func(origDataPoint T) {
+		if !dropOriginal.Contains(metricName) {
+			origDataPoint.CopyTo(dest.AppendEmpty())
+		}
+		if len(attributeGroups) == 0 {
+			return
+		}
+		key := cache.Key(origDataPoint.Attributes())
+		item := cache.Get(key)
+		var rollup []pcommon.Map
+		if item == nil {
+			rollup = buildRollup(attributeGroups, origDataPoint.Attributes())
+			cache.Set(key, rollup, ttlcache.DefaultTTL)
+		} else {
+			rollup = item.Value()
+		}
+		for _, attrs := range rollup {
+			destDataPoint := dest.AppendEmpty()
+			origDataPoint.CopyTo(destDataPoint)
+			attrs.CopyTo(destDataPoint.Attributes())
+		}
+	})
+}
+
+func buildRollup(attributeGroups [][]string, baseAttributes pcommon.Map) []pcommon.Map {
+	var results []pcommon.Map
+	for _, rollupGroup := range attributeGroups {
+		// skip if target dimensions count is same or more than the original metric.
+		// cannot have dimensions that do not exist in the original metric.
+		if len(rollupGroup) >= baseAttributes.Len() {
+			continue
+		}
+		attributes := pcommon.NewMap()
+		attributes.EnsureCapacity(len(rollupGroup))
+		for _, key := range rollupGroup {
+			value, ok := baseAttributes.Get(key)
+			if !ok {
+				break
+			}
+			value.CopyTo(attributes.PutEmpty(key))
+		}
+		if attributes.Len() == len(rollupGroup) {
+			results = append(results, attributes)
+		}
+	}
+	return results
+}
+
+// uniqueGroups filters out duplicate attributes within the sets and filters
+// duplicate sets.
+func uniqueGroups(groups [][]string) [][]string {
+	if len(groups) == 0 {
+		return nil
+	}
+	var results [][]string
+	var uniqueSets []collections.Set[string]
+	for _, rollupGroup := range groups {
+		rollupSet := collections.NewSet(rollupGroup...)
+		isUnique := collections.Range(uniqueSets, func(u collections.Set[string]) bool {
+			return !rollupSet.Equal(u)
+		})
+		if isUnique {
+			keys := maps.Keys(rollupSet)
+			sort.Strings(keys)
+			results = append(results, keys)
+			uniqueSets = append(uniqueSets, rollupSet)
+		}
+	}
+	return results
+}
diff --git a/processor/rollupprocessor/processor_test.go b/processor/rollupprocessor/processor_test.go
new file mode 100644
index 0000000000..ec10cfef59
--- /dev/null
+++ b/processor/rollupprocessor/processor_test.go
@@ -0,0 +1,389 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package rollupprocessor
+
+import (
+	"context"
+	"reflect"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+
+	"github.com/aws/amazon-cloudwatch-agent/internal/metric"
+)
+
+func TestProcessor(t *testing.T) {
+	cfg := &Config{
+		AttributeGroups: [][]string{
+			{"d1", "d2"},
+			{"d1", "d2", "d3"},
+			// filtered out
+			{"d1", "d2", "d2"},
+			{},
+		},
+		DropOriginal: []string{"drop-original"},
+		CacheSize:    5,
+	}
+	testCases := map[string]struct {
+		cfg            *Config
+		metricName     string
+		metricType     pmetric.MetricType
+		rawAttributes  []map[string]any
+		wantAttributes []map[string]any
+	}{
+		"Rollup/WithMoreAttributes": {
+			cfg:        cfg,
+			metricName: "rollup",
+			metricType: pmetric.MetricTypeGauge,
+			rawAttributes: []map[string]any{
+				{
+					"d1":   "v1",
+					"d2":   "v2",
+					"d3":   "v3",
+					"drop": true,
+				},
+			},
+			wantAttributes: []map[string]any{
+				{
+					"d1":   "v1",
+					"d2":   "v2",
+					"d3":   "v3",
+					"drop": true,
+				},
+				{
+					"d1": "v1",
+					"d2": "v2",
+				},
+				{
+					"d1": "v1",
+					"d2": "v2",
+					"d3": "v3",
+				},
+				{},
+			},
+		},
+		"DropOriginal/WithMoreAttributes": {
+			cfg:        cfg,
+			metricName: "drop-original",
+			metricType: pmetric.MetricTypeGauge,
+			rawAttributes: []map[string]any{
+				{
+					"d1":   "v1",
+					"d2":   "v2",
+					"d3":   "v3",
+					"drop": true,
+				},
+			},
+			wantAttributes: []map[string]any{
+				{
+					"d1": "v1",
+					"d2": "v2",
+				},
+				{
+					"d1": "v1",
+					"d2": "v2",
+					"d3": "v3",
+				},
+				{},
+			},
+		},
+		"Rollup/WithSameAttributes": {
+			cfg:        cfg,
+			metricName: "rollup",
+			metricType: pmetric.MetricTypeSum,
+			rawAttributes: []map[string]any{
+				{
+					"d1": "v1",
+					"d2": "v2",
+					"d3": "v3",
+				},
+			},
+			wantAttributes: []map[string]any{
+				// original attributes are always first
+				{
+					"d1": "v1",
+					"d2": "v2",
+					"d3": "v3",
+				},
+				{
+					"d1": "v1",
+					"d2": "v2",
+				},
+				{},
+			},
+		},
+		"DropOriginal/WithSameAttributes": {
+			cfg:        cfg,
+			metricName: "drop-original",
+			metricType: pmetric.MetricTypeSum,
+			rawAttributes: []map[string]any{
+				{
+					"d1": "v1",
+					"d2": "v2",
+					"d3": "v3",
+				},
+			},
+			wantAttributes: []map[string]any{
+				{
+					"d1": "v1",
+					"d2": "v2",
+				},
+				{},
+			},
+		},
+		"Rollup/WithMissingAttributes": {
+			cfg:        cfg,
+			metricName: "rollup",
+			metricType: pmetric.MetricTypeHistogram,
+			rawAttributes: []map[string]any{
+				{
+					"d1": "v1",
+					"d3": "v3",
+					"d4": "v4",
+				},
+			},
+			wantAttributes: []map[string]any{
+				{
+					"d1": "v1",
+					"d3": "v3",
+					"d4": "v4",
+				},
+				{},
+			},
+		},
+		"DropOriginal/WithMissingAttributes": {
+			cfg:        cfg,
+			metricName: "drop-original",
+			metricType: pmetric.MetricTypeHistogram,
+			rawAttributes: []map[string]any{
+				{
+					"d1": "v1",
+					"d3": "v3",
+					"d4": "v4",
+				},
+			},
+			wantAttributes: []map[string]any{
+				{},
+			},
+		},
+		"Rollup/WithLessAttributes": {
+			cfg:        cfg,
+			metricName: "rollup",
+			metricType: pmetric.MetricTypeExponentialHistogram,
+			rawAttributes: []map[string]any{
+				{
+					"d1": "v1",
+					"d2": "v2",
+				},
+			},
+			wantAttributes: []map[string]any{
+				{
+					"d1": "v1",
+					"d2": "v2",
+				},
+				{},
+			},
+		},
+		"DropOriginal/WithLessAttributes": {
+			cfg:        cfg,
+			metricName: "drop-original",
+			metricType: pmetric.MetricTypeExponentialHistogram,
+			rawAttributes: []map[string]any{
+				{
+					"d1": "v1",
+					"d2": "v2",
+				},
+			},
+			wantAttributes: []map[string]any{
+				{},
+			},
+		},
+		"Rollup/WithMultipleDataPoints": {
+			cfg:        cfg,
+			metricName: "rollup",
+			metricType: pmetric.MetricTypeSummary,
+			rawAttributes: []map[string]any{
+				{
+					"d1": "1v1",
+					"d2": "1v2",
+					"d4": "1v4",
+				},
+				{
+					"d1": "3v1",
+					"d2": "3v2",
+					"d3": "3v3",
+					"d4": "3v4",
+				},
+				{
+					"d1": "1v1",
+					"d2": "1v2",
+					"d4": "1v4",
+				},
+			},
+			wantAttributes: []map[string]any{
+				// datapoint 1
+				{
+					"d1": "1v1",
+					"d2": "1v2",
+					"d4": "1v4",
+				},
+				{
+					"d1": "1v1",
+					"d2": "1v2",
+				},
+				{},
+				// datapoint 2
+				{
+					"d1": "3v1",
+					"d2": "3v2",
+					"d3": "3v3",
+					"d4": "3v4",
+				},
+				{
+					"d1": "3v1",
+					"d2": "3v2",
+				},
+				{
+					"d1": "3v1",
+					"d2": "3v2",
+					"d3": "3v3",
+				},
+				{},
+				// datapoint 3
+				{
+					"d1": "1v1",
+					"d2": "1v2",
+					"d4": "1v4",
+				},
+				{
+					"d1": "1v1",
+					"d2": "1v2",
+				},
+				{},
+			},
+		},
+		"DropOriginal/NoRollup": {
+			cfg: &Config{
+				DropOriginal: []string{"drop-original"},
+			},
+			metricName: "drop-original",
+			metricType: pmetric.MetricTypeSummary,
+			rawAttributes: []map[string]any{
+				{
+					"d1": "1v1",
+					"d2": "1v2",
+				},
+				{
+					"d1": "2v1",
+					"d2": "2v2",
+				},
+				{
+					"d1": "3v1",
+					"d2": "3v2",
+					"d3": "3v3",
+				},
+			},
+			wantAttributes: []map[string]any{},
+		},
+	}
+	for name, testCase := range testCases {
+		t.Run(name, func(t *testing.T) {
+			p := newProcessor(testCase.cfg)
+			assert.NoError(t, p.start(context.Background(), componenttest.NewNopHost()))
+			// Register the shutdown with t.Cleanup so it runs when the subtest ends.
+			// `defer assert.NoError(t, p.stop(...))` would evaluate p.stop right away,
+			// stopping the cache before any metric is processed.
+			t.Cleanup(func() { assert.NoError(t, p.stop(context.Background())) })
+			orig := pmetric.NewMetrics()
+			ms := orig.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
+			buildTestMetric(t, ms.AppendEmpty(), testCase.metricName, testCase.metricType, testCase.rawAttributes)
+			assert.Equal(t, len(testCase.rawAttributes), orig.DataPointCount())
+			got, err := p.processMetrics(context.Background(), orig)
+			assert.NoError(t, err)
+			var gotAttributes []pcommon.Map
+			metric.RangeMetrics(got, func(m pmetric.Metric) {
+				validateMetric(t, m)
+				metric.RangeDataPointAttributes(m, func(attrs pcommon.Map) {
+					gotAttributes = append(gotAttributes, attrs)
+				})
+			})
+			require.Equal(t, len(testCase.wantAttributes), len(gotAttributes))
+			for index, gotAttribute := range gotAttributes {
+				wantAttribute := testCase.wantAttributes[index]
+				assert.Equal(t, len(wantAttribute), gotAttribute.Len())
+				assert.Truef(t, reflect.DeepEqual(gotAttribute.AsRaw(), wantAttribute), "want: %v, got: %v", wantAttribute, gotAttribute.AsRaw())
+			}
+		})
+	}
+}
+
+// validateMetric asserts that the monotonic flag and aggregation temporality
+// set by buildTestMetric are still present on m.
+func validateMetric(t *testing.T, m pmetric.Metric) {
+	t.Helper()
+
+	switch m.Type() {
+	case pmetric.MetricTypeSum:
+		assert.True(t, m.Sum().IsMonotonic())
+		assert.Equal(t, pmetric.AggregationTemporalityCumulative, m.Sum().AggregationTemporality())
+	case pmetric.MetricTypeHistogram:
+		assert.Equal(t, pmetric.AggregationTemporalityDelta, m.Histogram().AggregationTemporality())
+	case pmetric.MetricTypeExponentialHistogram:
+		assert.Equal(t, pmetric.AggregationTemporalityCumulative, m.ExponentialHistogram().AggregationTemporality())
+	}
+}
+
+// buildTestMetric names m, gives it the requested type and adds one data point
+// per entry of rawAttributes.
+func buildTestMetric(
+	t *testing.T,
+	m pmetric.Metric,
+	name string,
+	metricType pmetric.MetricType,
+	rawAttributes []map[string]any,
+) {
+	t.Helper()
+
+	m.SetName(name)
+	switch metricType {
+	case pmetric.MetricTypeGauge:
+		m.SetEmptyGauge()
+		buildTestDataPoints[pmetric.NumberDataPoint](t, m.Gauge().DataPoints(), rawAttributes)
+	case pmetric.MetricTypeSum:
+		m.SetEmptySum()
+		m.Sum().SetIsMonotonic(true)
+		m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+		buildTestDataPoints[pmetric.NumberDataPoint](t, m.Sum().DataPoints(), rawAttributes)
+	case pmetric.MetricTypeHistogram:
+		m.SetEmptyHistogram()
+		m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
+		buildTestDataPoints[pmetric.HistogramDataPoint](t, m.Histogram().DataPoints(), rawAttributes)
+	case pmetric.MetricTypeExponentialHistogram:
+		m.SetEmptyExponentialHistogram()
+		m.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+		buildTestDataPoints[pmetric.ExponentialHistogramDataPoint](t, m.ExponentialHistogram().DataPoints(), rawAttributes)
+	case pmetric.MetricTypeSummary:
+		m.SetEmptySummary()
+		buildTestDataPoints[pmetric.SummaryDataPoint](t, m.Summary().DataPoints(), rawAttributes)
+	}
+}
+
+// buildTestDataPoints appends one data point to dps for each raw attribute map
+// and loads that map into the data point's attributes.
+func buildTestDataPoints[T metric.DataPoint[T]](
+	t *testing.T,
+	dps metric.DataPoints[T],
+	rawAttributes []map[string]any,
+) {
+	t.Helper()
+
+	for _, rawAttribute := range rawAttributes {
+		dp := dps.AppendEmpty()
+		assert.NoError(t, dp.Attributes().FromRaw(rawAttribute))
+	}
+}
diff --git a/processor/rollupprocessor/testdata/config.yaml b/processor/rollupprocessor/testdata/config.yaml
new file mode 100644
index 0000000000..a241cd5b1b
--- /dev/null
+++ b/processor/rollupprocessor/testdata/config.yaml
@@ -0,0 +1,20 @@
+rollup:
+rollup/1:
+  drop_original:
+    - MetricName
+rollup/2:
+  attribute_groups:
+    - - Attr1
+    - - Attr1
+      - Attr2
+    - - Attr3
+    - []
+rollup/3:
+  attribute_groups:
+    - - Attr1
+      - Attr2
+  drop_original:
+    - MetricName
+  cache_size: 10
+rollup/4:
+  cache_size: -1
diff --git a/service/defaultcomponents/components.go b/service/defaultcomponents/components.go
index 3020161b22..0645b40bc6 100644
--- a/service/defaultcomponents/components.go
+++ b/service/defaultcomponents/components.go
@@ -7,6 +7,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/awsproxy"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver"
@@ -56,6 +57,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/ec2tagger" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/gpuattributes" + "github.com/aws/amazon-cloudwatch-agent/processor/rollupprocessor" ) func Factories() (otelcol.Factories, error) { @@ -97,6 +99,7 @@ func Factories() (otelcol.Factories, error) { probabilisticsamplerprocessor.NewFactory(), resourceprocessor.NewFactory(), resourcedetectionprocessor.NewFactory(), + rollupprocessor.NewFactory(), spanprocessor.NewFactory(), tailsamplingprocessor.NewFactory(), transformprocessor.NewFactory(), @@ -110,6 +113,7 @@ func Factories() (otelcol.Factories, error) { awsxrayexporter.NewFactory(), cloudwatch.NewFactory(), debugexporter.NewFactory(), + prometheusremotewriteexporter.NewFactory(), ); err != nil { return otelcol.Factories{}, err } diff --git a/service/defaultcomponents/components_test.go b/service/defaultcomponents/components_test.go index f337ddc20d..68fd233249 100644 --- a/service/defaultcomponents/components_test.go +++ b/service/defaultcomponents/components_test.go @@ -53,6 +53,7 @@ func TestComponents(t *testing.T) { "metricstransform", "resourcedetection", "resource", + "rollup", "probabilistic_sampler", "span", "tail_sampling", @@ -70,6 +71,7 @@ func TestComponents(t *testing.T) { "awscloudwatch", "awsxray", "debug", + "prometheusremotewrite", } gotExporters := collections.MapSlice(maps.Keys(factories.Exporters), component.Type.String) assert.Equal(t, len(wantExporters), len(gotExporters)) diff --git a/tool/testutil/testutil.go b/tool/testutil/testutil.go index 0286cd6271..f26e838048 100644 --- a/tool/testutil/testutil.go +++ b/tool/testutil/testutil.go @@ -5,6 +5,7 @@ package testutil import ( "fmt" + "testing" "github.com/aws/amazon-cloudwatch-agent/tool/stdin" ) @@ -20,6 +21,12 @@ func SetUpTestInputStream() chan<- string { return inputChan } +func SetPrometheusRemoteWriteTestingEnv(t *testing.T) { + 
t.Setenv("AWS_ACCESS_KEY_ID", "amazing_access_key") + t.Setenv("AWS_SECRET_ACCESS_KEY", "super_secret_key") + t.Setenv("AWS_REGION", "us-east-1") +} + func Type(inputChan chan<- string, inputString ...string) { go func() { for _, s := range inputString { diff --git a/translator/config/sampleSchema/invalidMetricsDestinations.json b/translator/config/sampleSchema/invalidMetricsDestinations.json new file mode 100644 index 0000000000..3f63d6211f --- /dev/null +++ b/translator/config/sampleSchema/invalidMetricsDestinations.json @@ -0,0 +1,18 @@ +{ + "metrics": { + "metrics_collected": { + "cpu": { + "resources": [ + "*" + ], + "measurement": [ + "cpu_usage_guest" + ] + } + }, + "metrics_destinations": { + "amp": { + } + } + } +} \ No newline at end of file diff --git a/translator/config/sampleSchema/validLinuxMetrics.json b/translator/config/sampleSchema/validLinuxMetrics.json index fcb03d82ea..ec9275889a 100644 --- a/translator/config/sampleSchema/validLinuxMetrics.json +++ b/translator/config/sampleSchema/validLinuxMetrics.json @@ -103,6 +103,9 @@ "allowed_pending_messages": 10000 } }, + "metrics_destinations": { + "cloudwatch": {} + }, "append_dimensions": { "ImageId": "${aws:ImageId}", "InstanceId": "${aws:InstanceId}", diff --git a/translator/config/sampleSchema/validMetricsDestinations.json b/translator/config/sampleSchema/validMetricsDestinations.json new file mode 100644 index 0000000000..ba9e215068 --- /dev/null +++ b/translator/config/sampleSchema/validMetricsDestinations.json @@ -0,0 +1,19 @@ +{ + "metrics": { + "metrics_collected": { + "cpu": { + "resources": [ + "*" + ], + "measurement": [ + "cpu_usage_guest" + ] + } + }, + "metrics_destinations": { + "amp": { + "workspace_id": "ws-12345" + } + } + } +} \ No newline at end of file diff --git a/translator/config/schema.json b/translator/config/schema.json index 46961a5a48..082d5777fc 100644 --- a/translator/config/schema.json +++ b/translator/config/schema.json @@ -93,6 +93,18 @@ "maxLength": 1024 } }, + 
"metrics_destinations": { + "type": "object", + "properties": { + "cloudwatch": { + }, + "amp": { + "$ref": "#/definitions/metricsDefinition/definitions/ampDefinition" + } + }, + "minProperties": 1, + "additionalProperties": false + }, "metrics_collected": { "type": "object", "properties": { @@ -356,6 +368,20 @@ }, "additionalProperties": false }, + "ampDefinition": { + "type": "object", + "properties": { + "workspace_id": { + "type": "string", + "minLength": 1, + "maxLength": 255 + } + }, + "required": [ + "workspace_id" + ], + "additionalProperties": false + }, "swapDefinitions": { "$ref": "#/definitions/metricsDefinition/definitions/basicMetricDefinition" }, diff --git a/translator/tocwconfig/sampleConfig/amp_config_linux.conf b/translator/tocwconfig/sampleConfig/amp_config_linux.conf new file mode 100644 index 0000000000..2791c82f7a --- /dev/null +++ b/translator/tocwconfig/sampleConfig/amp_config_linux.conf @@ -0,0 +1,33 @@ +[agent] + collection_jitter = "0s" + debug = false + flush_interval = "1s" + flush_jitter = "0s" + hostname = "" + interval = "60s" + logfile = "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log" + logtarget = "lumberjack" + metric_batch_size = 1000 + metric_buffer_limit = 10000 + omit_hostname = false + precision = "" + quiet = false + round_interval = false + +[inputs] + + [[inputs.cpu]] + collect_cpu_time = true + fieldpass = ["usage_idle", "usage_nice", "usage_guest", "time_active", "usage_active"] + interval = "10s" + percpu = true + report_active = true + totalcpu = false + [inputs.cpu.tags] + "aws:StorageResolution" = "true" + d1 = "foo" + d2 = "bar" + +[outputs] + + [[outputs.cloudwatch]] \ No newline at end of file diff --git a/translator/tocwconfig/sampleConfig/amp_config_linux.json b/translator/tocwconfig/sampleConfig/amp_config_linux.json new file mode 100755 index 0000000000..9fd3cd6ef6 --- /dev/null +++ b/translator/tocwconfig/sampleConfig/amp_config_linux.json @@ -0,0 +1,64 @@ +{ + "agent": { + "region": 
"us-west-2" + }, + "metrics": { + "metrics_destinations": { + "amp": { + "workspace_id": "ws-12345" + }, + "cloudwatch": { + } + }, + "metrics_collected": { + "cpu": { + "resources": [ + "*" + ], + "drop_original_metrics": [ + "cpu_usage_idle", + "time_active" + ], + "measurement": [ + { + "name": "cpu_usage_idle", + "rename": "CPU_USAGE_IDLE", + "unit": "unit" + }, + { + "name": "cpu_usage_nice", + "unit": "unit" + }, + "cpu_usage_guest", + "time_active", + "usage_active" + ], + "totalcpu": false, + "metrics_collection_interval": 10, + "append_dimensions": { + "d1": "foo", + "d2": "bar" + } + } + }, + "append_dimensions": { + "ImageId": "${aws:ImageId}", + "InstanceId": "${aws:InstanceId}", + "InstanceType": "${aws:InstanceType}", + "AutoScalingGroupName": "${aws:AutoScalingGroupName}" + }, + "aggregation_dimensions": [ + [ + "ImageId" + ], + [ + "InstanceId", + "InstanceType" + ], + [ + "d1" + ], + [] + ] + } +} \ No newline at end of file diff --git a/translator/tocwconfig/sampleConfig/amp_config_linux.yaml b/translator/tocwconfig/sampleConfig/amp_config_linux.yaml new file mode 100644 index 0000000000..8c273046a9 --- /dev/null +++ b/translator/tocwconfig/sampleConfig/amp_config_linux.yaml @@ -0,0 +1,160 @@ +exporters: + awscloudwatch: + drop_original_metrics: + CPU_USAGE_IDLE: true + cpu_time_active: true + force_flush_interval: 1m0s + max_datums_per_call: 1000 + max_values_per_datum: 150 + middleware: agenthealth/metrics + namespace: CWAgent + region: us-west-2 + resource_to_telemetry_conversion: + enabled: true + rollup_dimensions: + - - ImageId + - - InstanceId + - InstanceType + - - d1 + - [ ] + prometheusremotewrite/amp: + add_metric_suffixes: true + auth: + authenticator: sigv4auth + compression: "" + disable_keep_alives: false + endpoint: https://aps-workspaces.us-west-2.amazonaws.com/workspaces/ws-12345/api/v1/remote_write + export_created_metric: + enabled: false + http2_ping_timeout: 0s + http2_read_idle_timeout: 0s + max_batch_size_bytes: 3000000 + 
namespace: "" + proxy_url: "" + read_buffer_size: 0 + remote_write_queue: + enabled: true + num_consumers: 5 + queue_size: 10000 + resource_to_telemetry_conversion: + clear_after_copy: true + enabled: true + retry_on_failure: + enabled: true + initial_interval: 50ms + randomization_factor: 0.5 + multiplier: 1.5 + max_interval: 30s + max_elapsed_time: 5m0s + send_metadata: false + target_info: + enabled: true + timeout: 5s + tls: + ca_file: "" + cert_file: "" + include_system_ca_certs_pool: false + insecure: false + insecure_skip_verify: false + key_file: "" + max_version: "" + min_version: "" + reload_interval: 0s + server_name_override: "" + write_buffer_size: 524288 +extensions: + agenthealth/metrics: + is_usage_data_enabled: true + stats: + operations: + - PutMetricData + usage_flags: + mode: EC2 + region_type: ACJ + sigv4auth: + assume_role: + sts_region: us-west-2 + region: us-west-2 +processors: + batch/host/amp: + metadata_cardinality_limit: 1000 + send_batch_max_size: 0 + send_batch_size: 8192 + timeout: 1m0s + ec2tagger: + ec2_instance_tag_keys: + - AutoScalingGroupName + ec2_metadata_tags: + - InstanceType + - ImageId + - InstanceId + imds_retries: 1 + refresh_interval_seconds: 0s + rollup: + attribute_groups: + - - ImageId + - - InstanceId + - InstanceType + - - d1 + - [ ] + cache_size: 1000 + drop_original: + - CPU_USAGE_IDLE + - cpu_time_active + transform: + error_mode: propagate + flatten_data: false + log_statements: [ ] + metric_statements: + - context: metric + statements: + - set(unit, "unit") where name == "cpu_usage_idle" + - set(name, "CPU_USAGE_IDLE") where name == "cpu_usage_idle" + - set(unit, "unit") where name == "cpu_usage_nice" + trace_statements: [ ] +receivers: + telegraf_cpu: + collection_interval: 10s + initial_delay: 1s + timeout: 0s +service: + extensions: + - agenthealth/metrics + - sigv4auth + pipelines: + metrics/host: + exporters: + - awscloudwatch + processors: + - ec2tagger + - transform + receivers: + - telegraf_cpu + 
metrics/host/amp: + exporters: + - prometheusremotewrite/amp + processors: + - ec2tagger + - transform + - rollup + - batch/host/amp + receivers: + - telegraf_cpu + telemetry: + logs: + development: false + disable_caller: false + disable_stacktrace: false + encoding: console + level: info + output_paths: + - /opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log + sampling: + enabled: true + initial: 2 + thereafter: 500 + tick: 10s + metrics: + address: "" + level: None + traces: { } diff --git a/translator/tocwconfig/sampleConfig/complete_linux_config.json b/translator/tocwconfig/sampleConfig/complete_linux_config.json index 1b7260fbc7..c2fb70e151 100755 --- a/translator/tocwconfig/sampleConfig/complete_linux_config.json +++ b/translator/tocwconfig/sampleConfig/complete_linux_config.json @@ -14,6 +14,10 @@ "omit_hostname": true }, "metrics": { + "metrics_destinations": { + "cloudwatch": { + } + }, "metrics_collected": { "jmx": [ { diff --git a/translator/tocwconfig/sampleConfig/jmx_config_linux.json b/translator/tocwconfig/sampleConfig/jmx_config_linux.json index 4f6138476c..60bf43c632 100644 --- a/translator/tocwconfig/sampleConfig/jmx_config_linux.json +++ b/translator/tocwconfig/sampleConfig/jmx_config_linux.json @@ -3,6 +3,12 @@ "omit_hostname": true }, "metrics": { + "metrics_destinations": { + "amp": { + "workspace_id": "ws-12345" + }, + "cloudwatch": {} + }, "metrics_collected": { "jmx": { "endpoint": "localhost:8080", diff --git a/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml b/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml index 31e88dc697..b91a54d180 100644 --- a/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml @@ -11,6 +11,51 @@ exporters: region: us-west-2 resource_to_telemetry_conversion: enabled: true + prometheusremotewrite/amp: + add_metric_suffixes: true + auth: + authenticator: sigv4auth + compression: "" + disable_keep_alives: false + 
endpoint: https://aps-workspaces.us-west-2.amazonaws.com/workspaces/ws-12345/api/v1/remote_write + export_created_metric: + enabled: false + http2_ping_timeout: 0s + http2_read_idle_timeout: 0s + max_batch_size_bytes: 3000000 + namespace: "" + proxy_url: "" + read_buffer_size: 0 + remote_write_queue: + enabled: true + num_consumers: 5 + queue_size: 10000 + resource_to_telemetry_conversion: + clear_after_copy: true + enabled: true + retry_on_failure: + enabled: true + initial_interval: 50ms + max_elapsed_time: 5m0s + max_interval: 30s + multiplier: 1.5 + randomization_factor: 0.5 + send_metadata: false + target_info: + enabled: true + timeout: 5s + tls: + ca_file: "" + cert_file: "" + include_system_ca_certs_pool: false + insecure: false + insecure_skip_verify: false + key_file: "" + max_version: "" + min_version: "" + reload_interval: 0s + server_name_override: "" + write_buffer_size: 524288 extensions: agenthealth/metrics: is_usage_data_enabled: true @@ -20,7 +65,21 @@ extensions: usage_flags: mode: EC2 region_type: ACJ + sigv4auth: + assume_role: + sts_region: us-west-2 + region: us-west-2 processors: + batch/host/amp: + metadata_cardinality_limit: 1000 + send_batch_max_size: 0 + send_batch_size: 8192 + timeout: 1m0s + batch/jmx: + metadata_cardinality_limit: 1000 + send_batch_max_size: 0 + send_batch_size: 8192 + timeout: 1m0s cumulativetodelta/jmx: exclude: match_type: "" @@ -76,9 +135,9 @@ processors: metric_statements: - context: metric statements: + - set(name, "kafka.fetch-rate") where name == "kafka.consumer.fetch-rate" - set(unit, "unit") where name == "jvm.memory.heap.used" - set(name, "JVM_MEM_HEAP_USED") where name == "jvm.memory.heap.used" - - set(name, "kafka.fetch-rate") where name == "kafka.consumer.fetch-rate" trace_statements: [] receivers: jmx: @@ -100,6 +159,7 @@ receivers: service: extensions: - agenthealth/metrics + - sigv4auth pipelines: metrics/host: exporters: @@ -109,13 +169,24 @@ service: receivers: - telegraf_cpu - telegraf_disk + 
metrics/host/amp: + exporters: + - prometheusremotewrite/amp + processors: + - transform + - batch/host/amp + receivers: + - telegraf_cpu + - telegraf_disk metrics/jmx: exporters: - awscloudwatch + - prometheusremotewrite/amp processors: - filter/jmx - resource/jmx - cumulativetodelta/jmx + - batch/jmx - transform/jmx receivers: - jmx diff --git a/translator/tocwconfig/tocwconfig_test.go b/translator/tocwconfig/tocwconfig_test.go index e5b539f032..95ad5a79d2 100644 --- a/translator/tocwconfig/tocwconfig_test.go +++ b/translator/tocwconfig/tocwconfig_test.go @@ -706,6 +706,8 @@ func verifyToYamlTranslation(t *testing.T, input interface{}, expectedYamlFilePa yamlStr := toyamlconfig.ToYamlConfig(yamlConfig) require.NoError(t, yaml.Unmarshal([]byte(yamlStr), &actual)) + //assert.NoError(t, os.WriteFile(expectedYamlFilePath, []byte(yamlStr), 0644)) // useful for regenerating YAML + opt := cmpopts.SortSlices(func(x, y interface{}) bool { return pretty.Sprint(x) < pretty.Sprint(y) }) diff --git a/translator/tocwconfig/tocwconfig_unix_test.go b/translator/tocwconfig/tocwconfig_unix_test.go index 4aa074534c..a4ab547371 100644 --- a/translator/tocwconfig/tocwconfig_unix_test.go +++ b/translator/tocwconfig/tocwconfig_unix_test.go @@ -9,6 +9,7 @@ package tocwconfig import ( "testing" + "github.com/aws/amazon-cloudwatch-agent/tool/testutil" "github.com/aws/amazon-cloudwatch-agent/translator/config" "github.com/aws/amazon-cloudwatch-agent/translator/context" ) @@ -16,6 +17,7 @@ import ( func TestCompleteConfigUnix(t *testing.T) { resetContext(t) t.Setenv("JMX_JAR_PATH", "../../packaging/opentelemetry-jmx-metrics.jar") + testutil.SetPrometheusRemoteWriteTestingEnv(t) context.CurrentContext().SetMode(config.ModeEC2) expectedEnvVars := map[string]string{ "CWAGENT_USER_AGENT": "CUSTOM USER AGENT VALUE", @@ -29,8 +31,19 @@ func TestCompleteConfigUnix(t *testing.T) { checkTranslation(t, "complete_darwin_config", "darwin", nil, "") } +func TestAMPConfig(t *testing.T) { + 
resetContext(t) + context.CurrentContext().SetMode(config.ModeEC2) + testutil.SetPrometheusRemoteWriteTestingEnv(t) + expectedEnvVars := map[string]string{} + checkTranslation(t, "amp_config_linux", "linux", expectedEnvVars, "") + checkTranslation(t, "amp_config_linux", "darwin", nil, "") +} + func TestJMXConfigLinux(t *testing.T) { resetContext(t) + context.CurrentContext().SetMode(config.ModeEC2) + testutil.SetPrometheusRemoteWriteTestingEnv(t) t.Setenv("JMX_JAR_PATH", "../../packaging/opentelemetry-jmx-metrics.jar") context.CurrentContext().SetMode(config.ModeEC2) expectedEnvVars := map[string]string{} diff --git a/translator/translate/metrics/config/registered_metrics.go b/translator/translate/metrics/config/registered_metrics.go index 171d78b030..e7bbafcf57 100644 --- a/translator/translate/metrics/config/registered_metrics.go +++ b/translator/translate/metrics/config/registered_metrics.go @@ -3,8 +3,6 @@ package config -import "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" - // This served as the allowlisted metric name, which is registered under the plugin name // Note: the registered metric name don't have plugin name as prefix var Registered_Metrics_Linux = map[string][]string{ @@ -54,9 +52,9 @@ var Registered_Metrics_Windows = map[string][]string{ } var DisableWinPerfCounters = map[string]bool{ - "statsd": true, - "procstat": true, - "nvidia_smi": true, - common.JmxKey: true, - "otlp": true, + "statsd": true, + "procstat": true, + "nvidia_smi": true, + "jmx": true, + "otlp": true, } diff --git a/translator/translate/otel/common/common.go b/translator/translate/otel/common/common.go index 14350b2022..51b046c3f1 100644 --- a/translator/translate/otel/common/common.go +++ b/translator/translate/otel/common/common.go @@ -25,9 +25,13 @@ const ( MetricsCollectedKey = "metrics_collected" LogsCollectedKey = "logs_collected" TracesCollectedKey = "traces_collected" + MetricsDestinationsKey = "metrics_destinations" ECSKey = "ecs" 
KubernetesKey = "kubernetes" + CloudWatchKey = "cloudwatch" PrometheusKey = "prometheus" + AMPKey = "amp" + WorkspaceIDKey = "workspace_id" EMFProcessorKey = "emf_processor" DisableMetricExtraction = "disable_metric_extraction" XrayKey = "xray" @@ -42,7 +46,9 @@ const ( LocalModeKey = "local_mode" CredentialsKey = "credentials" RoleARNKey = "role_arn" + SigV4Auth = "sigv4auth" MetricsCollectionIntervalKey = "metrics_collection_interval" + AggregationDimensionsKey = "aggregation_dimensions" MeasurementKey = "measurement" DropOriginalMetricsKey = "drop_original_metrics" ForceFlushIntervalKey = "force_flush_interval" @@ -91,7 +97,8 @@ var ( JmxConfigKey = ConfigKey(MetricsKey, MetricsCollectedKey, JmxKey) JmxTargets = []string{"activemq", "cassandra", "hbase", "hadoop", "jetty", "jvm", "kafka", "kafka-consumer", "kafka-producer", "solr", "tomcat", "wildfly"} - AgentDebugConfigKey = ConfigKey(AgentKey, DebugKey) + AgentDebugConfigKey = ConfigKey(AgentKey, DebugKey) + MetricsAggregationDimensionsKey = ConfigKey(MetricsKey, AggregationDimensionsKey) ) // Translator is used to translate the JSON config into an @@ -136,6 +143,9 @@ func (t translatorMap[C]) Set(translator Translator[C]) { func (t translatorMap[C]) Get(id component.ID) (Translator[C], bool) { element, ok := t.lookup[id] + if !ok { + return nil, ok + } return element.Value.(Translator[C]), ok } diff --git a/translator/translate/otel/common/metrics.go b/translator/translate/otel/common/metrics.go new file mode 100644 index 0000000000..ecad28c77d --- /dev/null +++ b/translator/translate/otel/common/metrics.go @@ -0,0 +1,177 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package common + +import ( + "strings" + + "go.opentelemetry.io/collector/confmap" + + "github.com/aws/amazon-cloudwatch-agent/internal/metric" + "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/metrics/config" +) + +const ( + dropOriginalWildcard = "*" +) + +// Map to support dropping metrics without measurement. +var toDropMap = collections.NewSet("collectd", "statsd", "ethtool") + +func GetRollupDimensions(conf *confmap.Conf) [][]string { + key := ConfigKey(MetricsKey, AggregationDimensionsKey) + value := conf.Get(key) + if value == nil { + return nil + } + aggregates, ok := value.([]interface{}) + if !ok || !isValidRollupList(aggregates) { + return nil + } + rollup := make([][]string, len(aggregates)) + for i, aggregate := range aggregates { + dimensions := aggregate.([]interface{}) + rollup[i] = make([]string, len(dimensions)) + for j, dimension := range dimensions { + rollup[i][j] = dimension.(string) + } + } + return rollup +} + +// isValidRollupList confirms whether the supplied aggregate_dimension is a valid type ([][]string) +func isValidRollupList(aggregates []interface{}) bool { + if len(aggregates) == 0 { + return false + } + for _, aggregate := range aggregates { + if dimensions, ok := aggregate.([]interface{}); ok { + if len(dimensions) != 0 { + for _, dimension := range dimensions { + if _, ok := dimension.(string); !ok { + return false + } + } + } + } else { + return false + } + } + + return true +} + +func GetDropOriginalMetrics(conf *confmap.Conf) map[string]bool { + key := ConfigKey(MetricsKey, MetricsCollectedKey) + value := conf.Get(key) + if value == nil { + return nil + } + categories := value.(map[string]interface{}) + dropOriginalMetrics := make(map[string]bool) + for category := range categories { + realCategoryName := config.GetRealPluginName(category) + measurementCfgKey := ConfigKey(key, category, MeasurementKey) + 
dropOriginalCfgKey := ConfigKey(key, category, DropOriginalMetricsKey) + /* Drop original metrics does not support procstat since procstat can monitor multiple process + "procstat": [ + { + "exe": "W3SVC", + "measurement": [ + "pid_count" + ] + }, + { + "exe": "IISADMIN", + "measurement": [ + "pid_count" + ] + }] + Therefore, dropping the original metrics can conflict between these two processes (e.g customers can drop pid_count with the first + process but not the second process) + */ + if dropMetrics := GetArray[any](conf, dropOriginalCfgKey); dropMetrics != nil { + for _, dropMetric := range dropMetrics { + if _, in := toDropMap[category]; in { + dropMetricStr, ok := dropMetric.(string) + if ok { + dropOriginalMetrics[dropMetricStr] = true + } + continue + } + + measurements := GetArray[any](conf, measurementCfgKey) + if measurements == nil { + continue + } + + dropMetricStr, ok := dropMetric.(string) + if !ok { + continue + } + + if !strings.Contains(dropMetricStr, category) && dropMetricStr != dropOriginalWildcard { + dropMetricStr = metric.DecorateMetricName(realCategoryName, dropMetricStr) + } + isMetricDecoration := false + for _, measurement := range measurements { + switch val := measurement.(type) { + /* + "disk": { + "measurement": [ + { + "name": "free", + "rename": "DISK_FREE", + "unit": "unit" + } + ] + } + */ + case map[string]interface{}: + metricName, ok := val["name"].(string) + if !ok { + continue + } + if !strings.Contains(metricName, category) { + metricName = metric.DecorateMetricName(realCategoryName, metricName) + } + // If customers provides drop_original_metrics with a wildcard (*), adding the renamed metric or add the original metric + // if customers only re-unit the metric + if strings.Contains(dropMetricStr, metricName) || dropMetricStr == dropOriginalWildcard { + isMetricDecoration = true + if newMetricName, ok := val["rename"].(string); ok { + dropOriginalMetrics[newMetricName] = true + } else { + dropOriginalMetrics[metricName] = 
true + } + } + + /* + "measurement": ["free"] + */ + case string: + if dropMetricStr != dropOriginalWildcard { + continue + } + metricName := val + if !strings.Contains(metricName, category) { + metricName = metric.DecorateMetricName(realCategoryName, metricName) + } + + dropOriginalMetrics[metricName] = true + default: + continue + } + } + + if !isMetricDecoration && dropMetricStr != dropOriginalWildcard { + dropOriginalMetrics[dropMetricStr] = true + } + + } + } + } + return dropOriginalMetrics +} diff --git a/translator/translate/otel/common/metrics_test.go b/translator/translate/otel/common/metrics_test.go new file mode 100644 index 0000000000..228d328043 --- /dev/null +++ b/translator/translate/otel/common/metrics_test.go @@ -0,0 +1,33 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package common + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/confmap" + + "github.com/aws/amazon-cloudwatch-agent/internal/metric" + "github.com/aws/amazon-cloudwatch-agent/internal/util/testutil" +) + +func TestGetRollupDimensions(t *testing.T) { + jsonCfg := testutil.GetJson(t, filepath.Join("testdata", "config.json")) + conf := confmap.NewFromStringMap(jsonCfg) + assert.Equal(t, [][]string{{"ImageId"}, {"InstanceId", "InstanceType"}, {"d1"}, {}}, GetRollupDimensions(conf)) +} + +func TestGetDropOriginalMetrics(t *testing.T) { + jsonCfg := testutil.GetJson(t, filepath.Join("testdata", "config.json")) + conf := confmap.NewFromStringMap(jsonCfg) + assert.Equal(t, map[string]bool{ + "CPU_USAGE_IDLE": true, + "collectd_drop": true, + metric.DecorateMetricName("cpu", "time_active"): true, + "statsd_drop": true, + "tx_packets": true, + }, GetDropOriginalMetrics(conf)) +} diff --git a/translator/translate/otel/exporter/awscloudwatch/testdata/config.json b/translator/translate/otel/common/testdata/config.json similarity index 100% rename from 
translator/translate/otel/exporter/awscloudwatch/testdata/config.json rename to translator/translate/otel/common/testdata/config.json diff --git a/translator/translate/otel/exporter/awscloudwatch/translator.go b/translator/translate/otel/exporter/awscloudwatch/translator.go index 736b2aeffd..6ac2cd4e58 100644 --- a/translator/translate/otel/exporter/awscloudwatch/translator.go +++ b/translator/translate/otel/exporter/awscloudwatch/translator.go @@ -4,17 +4,12 @@ package awscloudwatch import ( - "strings" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/exporter" - "github.com/aws/amazon-cloudwatch-agent/internal/metric" "github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatch" "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" - "github.com/aws/amazon-cloudwatch-agent/translator/translate/metrics/config" - "github.com/aws/amazon-cloudwatch-agent/translator/translate/metrics/rollup_dimensions" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth" ) @@ -34,13 +29,6 @@ type translator struct { var _ common.Translator[component.Config] = (*translator)(nil) -// Map to support dropping metrics without measurement. 
-var toDropMap = map[string]struct{}{ - "collectd": {}, - "statsd": {}, - "ethtool": {}, -} - func NewTranslator() common.Translator[component.Config] { return NewTranslatorWithName("") } @@ -77,10 +65,10 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { if agent.Global_Config.Internal { cfg.MaxValuesPerDatum = internalMaxValuesPerDatum } - if rollupDimensions := getRollupDimensions(conf); rollupDimensions != nil { + if rollupDimensions := common.GetRollupDimensions(conf); rollupDimensions != nil { cfg.RollupDimensions = rollupDimensions } - if dropOriginalMetrics := getDropOriginalMetrics(conf); len(dropOriginalMetrics) != 0 { + if dropOriginalMetrics := common.GetDropOriginalMetrics(conf); len(dropOriginalMetrics) != 0 { cfg.DropOriginalConfigs = dropOriginalMetrics } cfg.MiddlewareID = &agenthealth.MetricsID @@ -95,160 +83,3 @@ func getRoleARN(conf *confmap.Conf) string { } return roleARN } - -// TODO: remove dependency on rule. -func getRollupDimensions(conf *confmap.Conf) [][]string { - key := common.ConfigKey(common.MetricsKey, rollup_dimensions.SectionKey) - value := conf.Get(key) - if value == nil { - return nil - } - aggregates, ok := value.([]interface{}) - if !ok || !isValidRollupList(aggregates) { - return nil - } - rollup := make([][]string, len(aggregates)) - for i, aggregate := range aggregates { - dimensions := aggregate.([]interface{}) - rollup[i] = make([]string, len(dimensions)) - for j, dimension := range dimensions { - rollup[i][j] = dimension.(string) - } - } - return rollup -} - -// isValidRollupList confirms whether the supplied aggregate_dimension is a valid type ([][]string) -func isValidRollupList(aggregates []interface{}) bool { - if len(aggregates) == 0 { - return false - } - for _, aggregate := range aggregates { - if dimensions, ok := aggregate.([]interface{}); ok { - if len(dimensions) != 0 { - for _, dimension := range dimensions { - if _, ok := dimension.(string); !ok { - return false - } - } - } - } 
else { - return false - } - } - - return true -} - -func getDropOriginalMetrics(conf *confmap.Conf) map[string]bool { - key := common.ConfigKey(common.MetricsKey, common.MetricsCollectedKey) - value := conf.Get(key) - if value == nil { - return nil - } - categories := value.(map[string]interface{}) - dropOriginalMetrics := make(map[string]bool) - for category := range categories { - realCategoryName := config.GetRealPluginName(category) - measurementCfgKey := common.ConfigKey(common.MetricsKey, common.MetricsCollectedKey, category, common.MeasurementKey) - dropOriginalCfgKey := common.ConfigKey(common.MetricsKey, common.MetricsCollectedKey, category, common.DropOriginalMetricsKey) - /* Drop original metrics does not support procstat since procstat can monitor multiple process - "procstat": [ - { - "exe": "W3SVC", - "measurement": [ - "pid_count" - ] - }, - { - "exe": "IISADMIN", - "measurement": [ - "pid_count" - ] - }] - Therefore, dropping the original metrics can conflict between these two processes (e.g customers can drop pid_count with the first - process but not the second process) - */ - if dropMetrics := common.GetArray[any](conf, dropOriginalCfgKey); dropMetrics != nil { - for _, dropMetric := range dropMetrics { - if _, in := toDropMap[category]; in { - dropMetric, ok := dropMetric.(string) - if ok { - dropOriginalMetrics[dropMetric] = true - } - continue - } - - measurements := common.GetArray[any](conf, measurementCfgKey) - if measurements == nil { - continue - } - - dropMetric, ok := dropMetric.(string) - if !ok { - continue - } - - if !strings.Contains(dropMetric, category) && dropMetric != dropOriginalWildcard { - dropMetric = metric.DecorateMetricName(realCategoryName, dropMetric) - } - isMetricDecoration := false - for _, measurement := range measurements { - switch val := measurement.(type) { - /* - "disk": { - "measurement": [ - { - "name": "free", - "rename": "DISK_FREE", - "unit": "unit" - } - ] - } - */ - case map[string]interface{}: - 
metricName, ok := val["name"].(string) - if !ok { - continue - } - if !strings.Contains(metricName, category) { - metricName = metric.DecorateMetricName(realCategoryName, metricName) - } - // If customers provides drop_original_metrics with a wildcard (*), adding the renamed metric or add the original metric - // if customers only re-unit the metric - if strings.Contains(dropMetric, metricName) || dropMetric == dropOriginalWildcard { - isMetricDecoration = true - if newMetricName, ok := val["rename"].(string); ok { - dropOriginalMetrics[newMetricName] = true - } else { - dropOriginalMetrics[metricName] = true - } - } - - /* - "measurement": ["free"] - */ - case string: - if dropMetric != dropOriginalWildcard { - continue - } - metricName := val - if !strings.Contains(metricName, category) { - metricName = metric.DecorateMetricName(realCategoryName, metricName) - } - - dropOriginalMetrics[metricName] = true - default: - continue - } - } - - if !isMetricDecoration && dropMetric != dropOriginalWildcard { - dropOriginalMetrics[dropMetric] = true - } - - } - } - } - return dropOriginalMetrics -} diff --git a/translator/translate/otel/exporter/awscloudwatch/translator_test.go b/translator/translate/otel/exporter/awscloudwatch/translator_test.go index a4df903f47..aa38ad231a 100644 --- a/translator/translate/otel/exporter/awscloudwatch/translator_test.go +++ b/translator/translate/otel/exporter/awscloudwatch/translator_test.go @@ -4,8 +4,6 @@ package awscloudwatch import ( - "encoding/json" - "os" "path/filepath" "runtime" "testing" @@ -15,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/confmap" + "github.com/aws/amazon-cloudwatch-agent/internal/util/testutil" "github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatch" "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" @@ -106,7 +105,7 @@ func TestTranslator(t *testing.T) { }, }, 
"WithInternal": { - input: getJson(t, filepath.Join("testdata", "config.json")), + input: testutil.GetJson(t, filepath.Join("..", "..", "common", "testdata", "config.json")), internal: true, want: &cloudwatch.Config{ Namespace: "namespace", @@ -176,13 +175,3 @@ func TestTranslator(t *testing.T) { }) } } - -func getJson(t *testing.T, path string) map[string]interface{} { - t.Helper() - - content, err := os.ReadFile(path) - require.NoError(t, err) - var result map[string]interface{} - require.NoError(t, json.Unmarshal(content, &result)) - return result -} diff --git a/translator/translate/otel/exporter/prometheusremotewrite/testdata/config.json b/translator/translate/otel/exporter/prometheusremotewrite/testdata/config.json new file mode 100644 index 0000000000..54e706bbee --- /dev/null +++ b/translator/translate/otel/exporter/prometheusremotewrite/testdata/config.json @@ -0,0 +1,13 @@ +{ + "agent": { + "debug": true, + "region": "us-east-1" + }, + "metrics": { + "metrics_destinations": { + "amp": { + "workspace_id": "ws-12345" + } + } + } +} \ No newline at end of file diff --git a/translator/translate/otel/exporter/prometheusremotewrite/testdata/config.yaml b/translator/translate/otel/exporter/prometheusremotewrite/testdata/config.yaml new file mode 100644 index 0000000000..f914de8958 --- /dev/null +++ b/translator/translate/otel/exporter/prometheusremotewrite/testdata/config.yaml @@ -0,0 +1,27 @@ +auth: + authenticator: sigv4auth +resource_to_telemetry_conversion: + clear_after_copy: true + enabled: true +timeout: 5000000000 +retry_on_failure: + enabled: true + initial_interval: 50000000 + randomization_factor: 0.5 + multiplier: 1.5 + max_interval: 30000000000 + max_elapsed_time: 300000000000 +remote_write_queue: + enabled: true + queue_size: 10000 + num_consumers: 5 +external_labels: [] +write_buffer_size: 524288 +endpoint: "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-12345/api/v1/remote_write" +headers: [] +target_info: + enabled: true 
+export_created_metric: + enabled: false +add_metric_suffixes: true +max_batch_size_bytes: 3000000 \ No newline at end of file diff --git a/translator/translate/otel/exporter/prometheusremotewrite/translator.go b/translator/translate/otel/exporter/prometheusremotewrite/translator.go new file mode 100644 index 0000000000..1cca098cbd --- /dev/null +++ b/translator/translate/otel/exporter/prometheusremotewrite/translator.go @@ -0,0 +1,55 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package prometheusremotewrite + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configauth" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/exporter" + + "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" +) + +var ( + AMPSectionKey = common.ConfigKey(common.MetricsKey, common.MetricsDestinationsKey, common.AMPKey) +) + +type translator struct { + name string + factory exporter.Factory +} + +var _ common.Translator[component.Config] = (*translator)(nil) + +func NewTranslator() common.Translator[component.Config] { + return NewTranslatorWithName("") +} + +func NewTranslatorWithName(name string) common.Translator[component.Config] { + return &translator{name, prometheusremotewriteexporter.NewFactory()} +} + +func (t *translator) ID() component.ID { + return component.NewIDWithName(t.factory.Type(), t.name) +} + +// Translate creates an exporter config based on the fields in the +// amp or prometheus section of the JSON config. 
+func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { + if conf == nil || !conf.IsSet(AMPSectionKey) { + return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: AMPSectionKey} + } + cfg := t.factory.CreateDefaultConfig().(*prometheusremotewriteexporter.Config) + cfg.ClientConfig.Auth = &configauth.Authentication{AuthenticatorID: component.NewID(component.MustNewType(common.SigV4Auth))} + cfg.ResourceToTelemetrySettings = resourcetotelemetry.Settings{Enabled: true, ClearAfterCopy: true} + if value, ok := common.GetString(conf, common.ConfigKey(AMPSectionKey, common.WorkspaceIDKey)); ok { + ampEndpoint := "https://aps-workspaces." + agent.Global_Config.Region + ".amazonaws.com/workspaces/" + value + "/api/v1/remote_write" + cfg.ClientConfig.Endpoint = ampEndpoint + } + return cfg, nil +} diff --git a/translator/translate/otel/exporter/prometheusremotewrite/translator_test.go b/translator/translate/otel/exporter/prometheusremotewrite/translator_test.go new file mode 100644 index 0000000000..a5b9a146d1 --- /dev/null +++ b/translator/translate/otel/exporter/prometheusremotewrite/translator_test.go @@ -0,0 +1,58 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package prometheusremotewrite + +import ( + "path/filepath" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" + + "github.com/aws/amazon-cloudwatch-agent/internal/util/testutil" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" +) + +func TestTranslator(t *testing.T) { + agent.Global_Config.Region = "us-east-1" + tt := NewTranslatorWithName("test") + require.EqualValues(t, "prometheusremotewrite/test", tt.ID().String()) + + testCases := map[string]struct { + input map[string]interface{} + want *confmap.Conf + wantErr error + }{ + "WithMissingDestination": { + input: map[string]interface{}{ + "metrics": map[string]interface{}{ + "metrics_destinations": map[string]interface{}{}, + }, + }, + wantErr: &common.MissingKeyError{ID: tt.ID(), JsonKey: AMPSectionKey}, + }, + "WithAMPDestination": { + input: testutil.GetJson(t, filepath.Join("testdata", "config.json")), + want: testutil.GetConf(t, filepath.Join("testdata", "config.yaml")), + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + conf := confmap.NewFromStringMap(testCase.input) + got, err := tt.Translate(conf) + assert.Equal(t, testCase.wantErr, err) + if err == nil { + require.NotNil(t, got) + gotCfg, ok := got.(*prometheusremotewriteexporter.Config) + require.True(t, ok) + wantCfg := &prometheusremotewriteexporter.Config{} + require.NoError(t, testCase.want.Unmarshal(wantCfg)) + assert.Equal(t, wantCfg, gotCfg) + } + }) + } +} diff --git a/translator/translate/otel/extension/sigv4auth/translator.go b/translator/translate/otel/extension/sigv4auth/translator.go new file mode 100644 index 0000000000..31adc23054 --- /dev/null +++ 
b/translator/translate/otel/extension/sigv4auth/translator.go @@ -0,0 +1,43 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package sigv4auth + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/extension" + + "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" +) + +type translator struct { + name string + factory extension.Factory +} + +var _ common.Translator[component.Config] = (*translator)(nil) + +func NewTranslator() common.Translator[component.Config] { + return NewTranslatorWithName("") +} + +func NewTranslatorWithName(name string) common.Translator[component.Config] { + return &translator{name, sigv4authextension.NewFactory()} +} + +func (t *translator) ID() component.ID { + return component.NewIDWithName(t.factory.Type(), t.name) +} + +func (t *translator) Translate(_ *confmap.Conf) (component.Config, error) { + cfg := t.factory.CreateDefaultConfig().(*sigv4authextension.Config) + cfg.Region = agent.Global_Config.Region + if agent.Global_Config.Role_arn != "" { + cfg.AssumeRole = sigv4authextension.AssumeRole{ARN: agent.Global_Config.Role_arn, STSRegion: agent.Global_Config.Region} + } + + return cfg, nil +} diff --git a/translator/translate/otel/extension/sigv4auth/translator_test.go b/translator/translate/otel/extension/sigv4auth/translator_test.go new file mode 100644 index 0000000000..ff6d9c155d --- /dev/null +++ b/translator/translate/otel/extension/sigv4auth/translator_test.go @@ -0,0 +1,26 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package sigv4auth + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" +) + +func TestTranslate(t *testing.T) { + tt := NewTranslator() + conf := confmap.NewFromStringMap(map[string]interface{}{}) + got, err := tt.Translate(conf) + if err == nil { + require.NotNil(t, got) + gotCfg, ok := got.(*sigv4authextension.Config) + require.True(t, ok) + wantCfg := sigv4authextension.NewFactory().CreateDefaultConfig() + assert.Equal(t, wantCfg, gotCfg) + } +} diff --git a/translator/translate/otel/pipeline/host/translator.go b/translator/translate/otel/pipeline/host/translator.go index 2e3824a72f..24c3c5512d 100644 --- a/translator/translate/otel/pipeline/host/translator.go +++ b/translator/translate/otel/pipeline/host/translator.go @@ -5,21 +5,25 @@ package host import ( "log" + "strings" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" - "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awscloudwatch" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/sigv4auth" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/cumulativetodeltaprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/ec2taggerprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricsdecorator" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/rollupprocessor" ) type translator struct { name string receivers 
common.TranslatorMap[component.Config] + exporters common.TranslatorMap[component.Config] } var _ common.Translator[*common.ComponentTranslators] = (*translator)(nil) @@ -30,8 +34,9 @@ var _ common.Translator[*common.ComponentTranslators] = (*translator)(nil) func NewTranslator( name string, receivers common.TranslatorMap[component.Config], + exporters common.TranslatorMap[component.Config], ) common.Translator[*common.ComponentTranslators] { - return &translator{name, receivers} + return &translator{name, receivers, exporters} } func (t translator) ID() component.ID { @@ -42,20 +47,20 @@ func (t translator) ID() component.ID { func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators, error) { if conf == nil || !conf.IsSet(common.MetricsKey) { return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: common.MetricsKey} - } else if t.receivers.Len() == 0 { - log.Printf("D! pipeline %s has no receivers", t.name) + } else if t.receivers.Len() == 0 || t.exporters.Len() == 0 { + log.Printf("D! pipeline %s has no receivers/exporters", t.name) return nil, nil } translators := common.ComponentTranslators{ Receivers: t.receivers, Processors: common.NewTranslatorMap[component.Config](), - Exporters: common.NewTranslatorMap(awscloudwatch.NewTranslator()), - Extensions: common.NewTranslatorMap(agenthealth.NewTranslator(component.DataTypeMetrics, []string{agenthealth.OperationPutMetricData})), + Exporters: t.exporters, + Extensions: common.NewTranslatorMap[component.Config](), } // we need to add delta processor because (only) diskio and net input plugins report delta metric - if common.PipelineNameHostDeltaMetrics == t.name { + if strings.HasPrefix(t.name, common.PipelineNameHostDeltaMetrics) { log.Printf("D! 
delta processor required because metrics with diskio or net are set") translators.Processors.Set(cumulativetodeltaprocessor.NewTranslator(common.WithName(t.name), cumulativetodeltaprocessor.WithDefaultKeys())) } @@ -70,5 +75,23 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators, log.Printf("D! metric decorator required because measurement fields are set") translators.Processors.Set(mdt) } + + _, ok1 := t.exporters.Get(component.NewID(component.MustNewType("prometheusremotewrite"))) + _, ok2 := t.exporters.Get(component.MustNewIDWithName("prometheusremotewrite", "amp")) + + if ok1 || ok2 { + translators.Extensions.Set(sigv4auth.NewTranslator()) + } + + if (ok1 || ok2) && conf.IsSet(common.MetricsAggregationDimensionsKey) { + translators.Processors.Set(rollupprocessor.NewTranslator()) + } + + if _, ok := t.exporters.Get(component.NewID(component.MustNewType("awscloudwatch"))); !ok { + translators.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey)) + } else { + // only add agenthealth for the cloudwatch exporter + translators.Extensions.Set(agenthealth.NewTranslator(component.DataTypeMetrics, []string{agenthealth.OperationPutMetricData})) + } return &translators, nil } diff --git a/translator/translate/otel/pipeline/host/translator_test.go b/translator/translate/otel/pipeline/host/translator_test.go index f41a3e1d83..3167a404f6 100644 --- a/translator/translate/otel/pipeline/host/translator_test.go +++ b/translator/translate/otel/pipeline/host/translator_test.go @@ -4,6 +4,7 @@ package host import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -13,6 +14,8 @@ import ( "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awscloudwatch" + 
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/prometheusremotewrite" ) type testTranslator struct { @@ -37,13 +40,15 @@ func TestTranslator(t *testing.T) { exporters []string extensions []string } + testExporters := common.NewTranslatorMap(awscloudwatch.NewTranslator()) testCases := map[string]struct { input map[string]interface{} + exporters common.TranslatorMap[component.Config] pipelineName string want *want wantErr error }{ - "WithoutMetricsKey": { + "WithoutMetricsSection": { input: map[string]interface{}{}, pipelineName: common.PipelineNameHost, wantErr: &common.MissingKeyError{ @@ -51,7 +56,13 @@ func TestTranslator(t *testing.T) { JsonKey: common.MetricsKey, }, }, - "WithMetricsKey": { + "WithoutExporters": { + input: map[string]interface{}{ + "metrics": map[string]interface{}{}, + }, + exporters: common.NewTranslatorMap[component.Config](), + }, + "WithMetricsSection": { input: map[string]interface{}{ "metrics": map[string]interface{}{}, }, @@ -64,7 +75,7 @@ func TestTranslator(t *testing.T) { extensions: []string{"agenthealth/metrics"}, }, }, - "WithMetricsKeyNet": { + "WithDeltaMetrics": { input: map[string]interface{}{ "metrics": map[string]interface{}{ "metrics_collected": map[string]interface{}{ @@ -72,11 +83,11 @@ func TestTranslator(t *testing.T) { }, }, }, - pipelineName: common.PipelineNameHostDeltaMetrics, + pipelineName: fmt.Sprintf("%s_test", common.PipelineNameHostDeltaMetrics), want: &want{ - pipelineID: "metrics/hostDeltaMetrics", + pipelineID: "metrics/hostDeltaMetrics_test", receivers: []string{"nop", "other"}, - processors: []string{"cumulativetodelta/hostDeltaMetrics"}, + processors: []string{"cumulativetodelta/hostDeltaMetrics_test"}, exporters: []string{"awscloudwatch"}, extensions: []string{"agenthealth/metrics"}, }, @@ -126,15 +137,70 @@ func TestTranslator(t *testing.T) { extensions: []string{"agenthealth/metrics"}, }, }, + "WithAppendDimensions": { + input: map[string]interface{}{ + "metrics": 
map[string]interface{}{ + "append_dimensions": map[string]interface{}{}, + }, + }, + pipelineName: common.PipelineNameHost, + want: &want{ + pipelineID: "metrics/host", + receivers: []string{"nop", "other"}, + processors: []string{"ec2tagger"}, + exporters: []string{"awscloudwatch"}, + extensions: []string{"agenthealth/metrics"}, + }, + }, + "WithPRWExporter/Aggregation": { + input: map[string]interface{}{ + "metrics": map[string]interface{}{ + "aggregation_dimensions": []interface{}{[]interface{}{"d1", "d2"}}, + }, + }, + exporters: common.NewTranslatorMap[component.Config]( + prometheusremotewrite.NewTranslator(), + ), + pipelineName: common.PipelineNameHost, + want: &want{ + pipelineID: "metrics/host", + receivers: []string{"nop", "other"}, + processors: []string{"rollup", "batch/host"}, + exporters: []string{"prometheusremotewrite"}, + extensions: []string{"sigv4auth"}, + }, + }, + "WithPRWExporter/NoAggregation": { + input: map[string]interface{}{ + "metrics": map[string]interface{}{}, + }, + exporters: common.NewTranslatorMap[component.Config]( + prometheusremotewrite.NewTranslator(), + ), + pipelineName: common.PipelineNameHost, + want: &want{ + pipelineID: "metrics/host", + receivers: []string{"nop", "other"}, + processors: []string{"batch/host"}, + exporters: []string{"prometheusremotewrite"}, + extensions: []string{"sigv4auth"}, + }, + }, } for name, testCase := range testCases { - nopType, _ := component.NewType("nop") - otherType, _ := component.NewType("other") t.Run(name, func(t *testing.T) { - ht := NewTranslator(testCase.pipelineName, common.NewTranslatorMap[component.Config]( - &testTranslator{id: component.NewID(nopType)}, - &testTranslator{id: component.NewID(otherType)}, - )) + exporters := testCase.exporters + if exporters == nil { + exporters = testExporters + } + ht := NewTranslator( + testCase.pipelineName, + common.NewTranslatorMap[component.Config]( + &testTranslator{id: component.NewID(component.MustNewType("nop"))}, + &testTranslator{id: 
component.NewID(component.MustNewType("other"))}, + ), + exporters, + ) conf := confmap.NewFromStringMap(testCase.input) got, err := ht.Translate(conf) require.Equal(t, testCase.wantErr, err) diff --git a/translator/translate/otel/pipeline/host/translators.go b/translator/translate/otel/pipeline/host/translators.go index e4bc204e5a..88effe5e9f 100644 --- a/translator/translate/otel/pipeline/host/translators.go +++ b/translator/translate/otel/pipeline/host/translators.go @@ -11,11 +11,17 @@ import ( "github.com/aws/amazon-cloudwatch-agent/receiver/adapter" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awscloudwatch" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/prometheusremotewrite" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline" adaptertranslator "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/adapter" otlpreceiver "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/otlp" ) +var ( + metricsDestinationsKey = common.ConfigKey(common.MetricsKey, common.MetricsDestinationsKey) +) + func NewTranslators(conf *confmap.Conf, os string) (pipeline.TranslatorMap, error) { adapterReceivers, err := adaptertranslator.FindReceiversInConfig(conf, os) if err != nil { @@ -35,25 +41,36 @@ func NewTranslators(conf *confmap.Conf, os string) (pipeline.TranslatorMap, erro }) switch v := conf.Get(common.ConfigKey(common.MetricsKey, common.MetricsCollectedKey, common.OtlpKey)).(type) { - case []interface{}: + case []any: for index := range v { deltaReceivers.Set(otlpreceiver.NewTranslator( otlpreceiver.WithDataType(component.DataTypeMetrics), otlpreceiver.WithIndex(index), )) } - case map[string]interface{}: + case map[string]any: deltaReceivers.Set(otlpreceiver.NewTranslator(otlpreceiver.WithDataType(component.DataTypeMetrics))) } hasHostPipeline := hostReceivers.Len() != 0 
hasDeltaPipeline := deltaReceivers.Len() != 0 - if hasHostPipeline { - translators.Set(NewTranslator(common.PipelineNameHost, hostReceivers)) + if !conf.IsSet(metricsDestinationsKey) || conf.IsSet(common.ConfigKey(metricsDestinationsKey, common.CloudWatchKey)) { + exporters := common.NewTranslatorMap(awscloudwatch.NewTranslator()) + if hasHostPipeline { + translators.Set(NewTranslator(common.PipelineNameHost, hostReceivers, exporters)) + } + if hasDeltaPipeline { + translators.Set(NewTranslator(common.PipelineNameHostDeltaMetrics, deltaReceivers, exporters)) + } } - if hasDeltaPipeline { - translators.Set(NewTranslator(common.PipelineNameHostDeltaMetrics, deltaReceivers)) + if conf.IsSet(metricsDestinationsKey) && conf.IsSet(common.ConfigKey(metricsDestinationsKey, common.AMPKey)) { + exporters := common.NewTranslatorMap[component.Config](prometheusremotewrite.NewTranslatorWithName(common.AMPKey)) + // PRW exporter does not need the delta conversion. + receivers := common.NewTranslatorMap[component.Config]() + receivers.Merge(hostReceivers) + receivers.Merge(deltaReceivers) + translators.Set(NewTranslator(fmt.Sprintf("%s/amp", common.PipelineNameHost), receivers, exporters)) } return translators, nil diff --git a/translator/translate/otel/pipeline/host/translators_test.go b/translator/translate/otel/pipeline/host/translators_test.go index 20c75c1e6c..afda2a1dc8 100644 --- a/translator/translate/otel/pipeline/host/translators_test.go +++ b/translator/translate/otel/pipeline/host/translators_test.go @@ -20,54 +20,113 @@ import ( func TestTranslators(t *testing.T) { type want struct { receivers []string + exporters []string } testCases := map[string]struct { - input map[string]interface{} + input map[string]any want map[string]want }{ "WithEmpty": { - input: map[string]interface{}{}, + input: map[string]any{}, want: map[string]want{}, }, "WithMinimal": { - input: map[string]interface{}{ - "metrics": map[string]interface{}{ - "metrics_collected": 
map[string]interface{}{ - "cpu": map[string]interface{}{}, + input: map[string]any{ + "metrics": map[string]any{ + "metrics_collected": map[string]any{ + "cpu": map[string]any{}, }, }, }, want: map[string]want{ "metrics/host": { receivers: []string{"telegraf_cpu"}, + exporters: []string{"awscloudwatch"}, + }, + }, + }, + "WithAMPDestination": { + input: map[string]any{ + "metrics": map[string]any{ + "metrics_destinations": map[string]any{ + "amp": map[string]any{ + "workspace_id": "ws-12345", + }, + }, + "metrics_collected": map[string]any{ + "cpu": map[string]any{}, + }, + }, + }, + want: map[string]want{ + "metrics/host/amp": { + receivers: []string{"telegraf_cpu"}, + exporters: []string{"prometheusremotewrite/amp"}, + }, + }, + }, + "WithAMPAndCloudWatchDestinations": { + input: map[string]any{ + "metrics": map[string]any{ + "metrics_destinations": map[string]any{ + "amp": map[string]any{ + "workspace_id": "ws-12345", + }, + "cloudwatch": map[string]any{}, + }, + "metrics_collected": map[string]any{ + "cpu": map[string]any{}, + }, + }, + }, + want: map[string]want{ + "metrics/host": { + receivers: []string{"telegraf_cpu"}, + exporters: []string{"awscloudwatch"}, + }, + "metrics/host/amp": { + receivers: []string{"telegraf_cpu"}, + exporters: []string{"prometheusremotewrite/amp"}, }, }, }, "WithDeltaMetrics": { - input: map[string]interface{}{ - "metrics": map[string]interface{}{ - "metrics_collected": map[string]interface{}{ - "net": map[string]interface{}{}, + input: map[string]any{ + "metrics": map[string]any{ + "metrics_destinations": map[string]any{ + "amp": map[string]any{ + "workspace_id": "ws-12345", + }, + "cloudwatch": map[string]any{}, + }, + "metrics_collected": map[string]any{ + "net": map[string]any{}, }, }, }, want: map[string]want{ "metrics/hostDeltaMetrics": { receivers: []string{"telegraf_net"}, + exporters: []string{"awscloudwatch"}, + }, + "metrics/host/amp": { + receivers: []string{"telegraf_net"}, + exporters: 
[]string{"prometheusremotewrite/amp"}, }, }, }, "WithOtlpMetrics": { - input: map[string]interface{}{ - "metrics": map[string]interface{}{ - "metrics_collected": map[string]interface{}{ - "otlp": map[string]interface{}{}, + input: map[string]any{ + "metrics": map[string]any{ + "metrics_collected": map[string]any{ + "otlp": map[string]any{}, }, }, }, want: map[string]want{ "metrics/hostDeltaMetrics": { receivers: []string{"otlp/metrics"}, + exporters: []string{"awscloudwatch"}, }, }, }, @@ -87,6 +146,7 @@ func TestTranslators(t *testing.T) { w, ok := testCase.want[tr.ID().String()] require.True(t, ok) assert.Equal(t, w.receivers, collections.MapSlice(tr.(*translator).receivers.Keys(), component.ID.String)) + assert.Equal(t, w.exporters, collections.MapSlice(tr.(*translator).exporters.Keys(), component.ID.String)) }) } }) diff --git a/translator/translate/otel/pipeline/jmx/translator.go b/translator/translate/otel/pipeline/jmx/translator.go index bd6cf43e08..4fb455e078 100644 --- a/translator/translate/otel/pipeline/jmx/translator.go +++ b/translator/translate/otel/pipeline/jmx/translator.go @@ -12,12 +12,16 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awscloudwatch" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/prometheusremotewrite" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/sigv4auth" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/cumulativetodeltaprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/ec2taggerprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/filterprocessor" 
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricsdecorator" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/resourceprocessor" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/rollupprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/jmx" ) @@ -25,6 +29,10 @@ const ( placeholderTarget = "" ) +var ( + metricsDestinationsKey = common.ConfigKey(common.MetricsKey, common.MetricsDestinationsKey) +) + type translator struct { name string index int @@ -79,8 +87,21 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators resourceprocessor.NewTranslator(resourceprocessor.WithName(common.PipelineNameJmx)), cumulativetodeltaprocessor.NewTranslator(common.WithName(common.PipelineNameJmx), cumulativetodeltaprocessor.WithConfigKeys(common.JmxConfigKey)), ), - Exporters: common.NewTranslatorMap(awscloudwatch.NewTranslator()), - Extensions: common.NewTranslatorMap(agenthealth.NewTranslator(component.DataTypeMetrics, []string{agenthealth.OperationPutMetricData})), + Exporters: common.NewTranslatorMap[component.Config](), + Extensions: common.NewTranslatorMap[component.Config](), + } + + if !conf.IsSet(metricsDestinationsKey) || conf.IsSet(common.ConfigKey(metricsDestinationsKey, "cloudwatch")) { + translators.Exporters.Set(awscloudwatch.NewTranslator()) + translators.Extensions.Set(agenthealth.NewTranslator(component.DataTypeMetrics, []string{agenthealth.OperationPutMetricData})) + } + if conf.IsSet(metricsDestinationsKey) && conf.IsSet(common.ConfigKey(metricsDestinationsKey, common.AMPKey)) { + translators.Exporters.Set(prometheusremotewrite.NewTranslatorWithName(common.AMPKey)) + translators.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey)) + if conf.IsSet(common.MetricsAggregationDimensionsKey) { + translators.Processors.Set(rollupprocessor.NewTranslator()) + } + 
translators.Extensions.Set(sigv4auth.NewTranslator()) } mdt := metricsdecorator.NewTranslator( diff --git a/translator/translate/otel/pipeline/jmx/translator_test.go b/translator/translate/otel/pipeline/jmx/translator_test.go index 1370111946..c8078a41b9 100644 --- a/translator/translate/otel/pipeline/jmx/translator_test.go +++ b/translator/translate/otel/pipeline/jmx/translator_test.go @@ -128,6 +128,35 @@ func TestTranslator(t *testing.T) { extensions: []string{"agenthealth/metrics"}, }, }, + "WithValidJMX/Object/AMP": { + input: map[string]any{ + "metrics": map[string]any{ + "metrics_destinations": map[string]any{ + "amp": map[string]any{ + "workspace_id": "ws-12345", + }, + }, + "metrics_collected": map[string]any{ + "jmx": map[string]any{ + "endpoint": "localhost:8080", + "jvm": map[string]any{ + "measurement": []any{ + "jvm.memory.heap.init", + }, + }, + }, + }, + }, + }, + index: -1, + want: &want{ + pipelineID: "metrics/jmx", + receivers: []string{"jmx"}, + processors: []string{"filter/jmx", "resource/jmx", "cumulativetodelta/jmx", "batch/jmx"}, + exporters: []string{"prometheusremotewrite/amp"}, + extensions: []string{"sigv4auth"}, + }, + }, "WithValidJMX/Object/Decoration": { input: map[string]any{ "metrics": map[string]any{ diff --git a/translator/translate/otel/processor/rollupprocessor/testdata/config.json b/translator/translate/otel/processor/rollupprocessor/testdata/config.json new file mode 100644 index 0000000000..9bbc61e77c --- /dev/null +++ b/translator/translate/otel/processor/rollupprocessor/testdata/config.json @@ -0,0 +1,31 @@ +{ + "metrics": { + "metrics_collected": { + "cpu": { + "drop_original_metrics": ["cpu_usage_idle", "time_active"], + "measurement": [ + { + "name": "cpu_usage_idle", + "rename": "CPU_USAGE_IDLE", + "unit": "unit" + }, + { + "name": "cpu_usage_nice", + "unit": "unit" + }, + "cpu_usage_guest", + "time_active", + "usage_active" + ] + }, + "disk": {} + }, + "aggregation_dimensions" : [["ImageId"], ["InstanceId", 
"InstanceType"], ["d1"], []], + "namespace": "namespace", + "force_flush_interval": 30, + "credentials": { + "role_arn": "metrics_role_arn_value_test" + }, + "endpoint_override": "https://monitoring-fips.us-west-2.amazonaws.com" + } +} \ No newline at end of file diff --git a/translator/translate/otel/processor/rollupprocessor/translator.go b/translator/translate/otel/processor/rollupprocessor/translator.go new file mode 100644 index 0000000000..855693e3a5 --- /dev/null +++ b/translator/translate/otel/processor/rollupprocessor/translator.go @@ -0,0 +1,50 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package rollupprocessor + +import ( + "sort" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/processor" + "golang.org/x/exp/maps" + + "github.com/aws/amazon-cloudwatch-agent/processor/rollupprocessor" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" +) + +type translator struct { + name string + factory processor.Factory +} + +var _ common.Translator[component.Config] = (*translator)(nil) + +func NewTranslator() common.Translator[component.Config] { + return NewTranslatorWithName("") +} + +func NewTranslatorWithName(name string) common.Translator[component.Config] { + return &translator{name: name, factory: rollupprocessor.NewFactory()} +} + +func (t *translator) ID() component.ID { + return component.NewIDWithName(t.factory.Type(), t.name) +} + +func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { + if conf == nil || !conf.IsSet(common.MetricsAggregationDimensionsKey) { + return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: common.MetricsAggregationDimensionsKey} + } + cfg := t.factory.CreateDefaultConfig().(*rollupprocessor.Config) + if rollupDimensions := common.GetRollupDimensions(conf); len(rollupDimensions) != 0 { + cfg.AttributeGroups = rollupDimensions + } + if 
dropOriginalMetrics := common.GetDropOriginalMetrics(conf); len(dropOriginalMetrics) != 0 { + cfg.DropOriginal = maps.Keys(dropOriginalMetrics) + sort.Strings(cfg.DropOriginal) + } + return cfg, nil +} diff --git a/translator/translate/otel/processor/rollupprocessor/translator_test.go b/translator/translate/otel/processor/rollupprocessor/translator_test.go new file mode 100644 index 0000000000..ba36ecf601 --- /dev/null +++ b/translator/translate/otel/processor/rollupprocessor/translator_test.go @@ -0,0 +1,69 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package rollupprocessor + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" + + "github.com/aws/amazon-cloudwatch-agent/internal/metric" + "github.com/aws/amazon-cloudwatch-agent/internal/util/testutil" + "github.com/aws/amazon-cloudwatch-agent/processor/rollupprocessor" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" +) + +func TestTranslator(t *testing.T) { + rpt := NewTranslator() + require.EqualValues(t, "rollup", rpt.ID().String()) + testCases := map[string]struct { + input map[string]interface{} + want *rollupprocessor.Config + wantErr error + }{ + "WithMissingKey": { + input: map[string]interface{}{"metrics": map[string]interface{}{}}, + wantErr: &common.MissingKeyError{ + ID: rpt.ID(), + JsonKey: common.MetricsAggregationDimensionsKey, + }, + }, + "WithOnlyAggregationDimensions": { + input: map[string]interface{}{ + "metrics": map[string]interface{}{ + "aggregation_dimensions": []interface{}{[]interface{}{"d1", "d2"}}, + }, + }, + want: &rollupprocessor.Config{ + AttributeGroups: [][]string{{"d1", "d2"}}, + CacheSize: 1000, + }, + }, + "WithFull": { + input: testutil.GetJson(t, filepath.Join("testdata", "config.json")), + want: &rollupprocessor.Config{ + AttributeGroups: [][]string{{"ImageId"}, 
{"InstanceId", "InstanceType"}, {"d1"}, {}}, + DropOriginal: []string{"CPU_USAGE_IDLE", metric.DecorateMetricName("cpu", "time_active")}, + CacheSize: 1000, + }, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + conf := confmap.NewFromStringMap(testCase.input) + got, err := rpt.Translate(conf) + require.Equal(t, testCase.wantErr, err) + if testCase.want != nil { + require.NoError(t, err) + gotCfg, ok := got.(*rollupprocessor.Config) + require.True(t, ok) + assert.Equal(t, testCase.want.AttributeGroups, gotCfg.AttributeGroups) + assert.Equal(t, testCase.want.DropOriginal, gotCfg.DropOriginal) + } + }) + } +} diff --git a/translator/translate/otel/receiver/adapter/translators.go b/translator/translate/otel/receiver/adapter/translators.go index 45c174e77f..79a5b62408 100644 --- a/translator/translate/otel/receiver/adapter/translators.go +++ b/translator/translate/otel/receiver/adapter/translators.go @@ -69,9 +69,9 @@ var ( statsd.SectionKey: 10 * time.Second, } - // OtelReceivers is used for receivers that need to be in the same pipeline that + // otelReceivers is used for receivers that need to be in the same pipeline that // exports to Cloudwatch while not having to follow the adapter rules - OtelReceivers = map[string]common.Translator[component.Config]{ + otelReceivers = map[string]common.Translator[component.Config]{ common.OtlpKey: otlp.NewTranslator(otlp.WithDataType(component.DataTypeMetrics)), common.JmxKey: jmx.NewTranslator(), } @@ -124,7 +124,7 @@ func fromWindowsMetrics(conf *confmap.Conf) common.TranslatorMap[component.Confi translators := common.NewTranslatorMap[component.Config]() if inputs, ok := conf.Get(metricKey).(map[string]interface{}); ok { for inputName := range inputs { - if _, ok := OtelReceivers[inputName]; ok { + if _, ok := otelReceivers[inputName]; ok { continue } if windowsInputSet.Contains(inputName) { @@ -159,7 +159,7 @@ func fromInputs(conf *confmap.Conf, validInputs map[string]bool, baseKey 
string) continue } if validInputs != nil { - if _, ok := OtelReceivers[inputName]; ok { + if _, ok := otelReceivers[inputName]; ok { continue } else if _, ok := validInputs[inputName]; !ok { log.Printf("W! Ignoring unrecognized input %s", inputName) diff --git a/translator/translate/otel/translate_otel_test.go b/translator/translate/otel/translate_otel_test.go index 21a54bcfab..da34cf5c60 100644 --- a/translator/translate/otel/translate_otel_test.go +++ b/translator/translate/otel/translate_otel_test.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" + "github.com/aws/amazon-cloudwatch-agent/tool/testutil" "github.com/aws/amazon-cloudwatch-agent/translator" _ "github.com/aws/amazon-cloudwatch-agent/translator/registerrules" "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" @@ -20,6 +21,7 @@ import ( func TestTranslator(t *testing.T) { agent.Global_Config.Region = "us-east-1" + testutil.SetPrometheusRemoteWriteTestingEnv(t) testCases := map[string]struct { input interface{} wantErrContains string @@ -163,6 +165,20 @@ func TestTranslator(t *testing.T) { detector: eksdetector.TestEKSDetector, isEKSDataStore: eksdetector.TestIsEKSCacheEKS, }, + "WithAMPDestinationConfig": { + input: map[string]interface{}{ + "metrics": map[string]interface{}{ + "metrics_destinations": map[string]interface{}{ + "amp": map[string]interface{}{ + "workspace_id": "ws-12345", + }, + }, + "metrics_collected": map[string]interface{}{ + "cpu": map[string]interface{}{}, + }, + }, + }, + }, } for name, testCase := range testCases { t.Run(name, func(t *testing.T) {