From 0e7ce0cc12d09c88e10f312b5f164d2c10f5f71c Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Thu, 29 Aug 2024 04:59:30 -0300 Subject: [PATCH] [processor/interval] Support Gauges and Summaries (#34805) **Description:** Adds support for Gauges and Summaries **Link to tracking Issue:** #34803 **Testing:** Unit tests were extended to cover the new behavior **Documentation:** --------- Signed-off-by: Arthur Silva Sens --- .../intervalprocessor_gauge_summary.yaml | 27 ++++++++ processor/intervalprocessor/README.md | 8 ++- processor/intervalprocessor/config.go | 8 ++- processor/intervalprocessor/factory.go | 4 +- .../internal/metrics/metrics.go | 3 - processor/intervalprocessor/processor.go | 29 +++++++-- processor/intervalprocessor/processor_test.go | 32 +++++----- .../testdata/gauges_are_aggregated/input.yaml | 40 ++++++++++++ .../testdata/gauges_are_aggregated/next.yaml | 1 + .../gauges_are_aggregated/output.yaml | 27 ++++++++ .../gauges_are_passed_through/input.yaml | 2 +- .../gauges_are_passed_through/next.yaml | 2 +- .../gauges_are_passed_through/output.yaml | 2 +- .../summaries_are_aggregated/input.yaml | 63 +++++++++++++++++++ .../summaries_are_aggregated/next.yaml | 1 + .../summaries_are_aggregated/output.yaml | 34 ++++++++++ .../summaries_are_passed_through/input.yaml | 2 +- .../summaries_are_passed_through/next.yaml | 2 +- .../summaries_are_passed_through/output.yaml | 2 +- 19 files changed, 258 insertions(+), 31 deletions(-) create mode 100644 .chloggen/intervalprocessor_gauge_summary.yaml create mode 100644 processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml create mode 100644 processor/intervalprocessor/testdata/gauges_are_aggregated/next.yaml create mode 100644 processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml create mode 100644 processor/intervalprocessor/testdata/summaries_are_aggregated/input.yaml create mode 100644 processor/intervalprocessor/testdata/summaries_are_aggregated/next.yaml create mode 100644 processor/intervalprocessor/testdata/summaries_are_aggregated/output.yaml diff --git a/.chloggen/intervalprocessor_gauge_summary.yaml b/.chloggen/intervalprocessor_gauge_summary.yaml new file mode 100644 index 0000000000000..96894ec683712 --- /dev/null +++ b/.chloggen/intervalprocessor_gauge_summary.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/interval + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support for gauge and summary metrics. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34803] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Only the last value of a gauge or summary metric is reported in the interval processor, instead of all values. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index 0c4971e735663..857bcf6c7d584 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -19,19 +19,23 @@ The interval processor (`intervalprocessor`) aggregates metrics and periodically * Monotonically increasing, cumulative sums * Monotonically increasing, cumulative histograms * Monotonically increasing, cumulative exponential histograms +* Gauges +* Summaries The following metric types will *not* be aggregated, and will instead be passed, unchanged, to the next component in the pipeline: * All delta metrics * Non-monotonically increasing sums -* Gauges -* Summaries + +> NOTE: Aggregating data over an interval is an inherently "lossy" process. For monotonically increasing, cumulative sums, histograms, and exponential histograms, you "lose" precision, but you don't lose overall data. But for non-monotonically increasing sums, gauges, and summaries, aggregation represents actual data loss. IE you could "lose" that a value increased and then decreased back to the original value. In most cases, this data "loss" is ok. However, if you would rather these values be passed through, and *not* aggregated, you can set that in the configuration ## Configuration The following settings can be optionally configured: * `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s +* `gauge_pass_through`: Whether gauges should pass through as they are to the next component or be aggregated. Default: false +* `summary_pass_through`: Whether summaries should pass through as they are to the next component or be aggregated. Default: false ## Example of metric flows diff --git a/processor/intervalprocessor/config.go b/processor/intervalprocessor/config.go index 1967afc972bb0..96ad36189f800 100644 --- a/processor/intervalprocessor/config.go +++ b/processor/intervalprocessor/config.go @@ -18,8 +18,14 @@ var _ component.Config = (*Config)(nil) // Config defines the configuration for the processor. type Config struct { - // Interval is the time + // Interval is the time interval at which the processor will aggregate metrics. Interval time.Duration `mapstructure:"interval"` + // GaugePassThrough is a flag that determines whether gauge metrics should be passed through + // as they are or aggregated. + GaugePassThrough bool `mapstructure:"gauge_pass_through"` + // SummaryPassThrough is a flag that determines whether summary metrics should be passed through + // as they are or aggregated. + SummaryPassThrough bool `mapstructure:"summary_pass_through"` } // Validate checks whether the input configuration has all of the required fields for the processor. diff --git a/processor/intervalprocessor/factory.go b/processor/intervalprocessor/factory.go index 87a1278cbc0b4..981cc63f29a28 100644 --- a/processor/intervalprocessor/factory.go +++ b/processor/intervalprocessor/factory.go @@ -25,7 +25,9 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ - Interval: 60 * time.Second, + Interval: 60 * time.Second, + GaugePassThrough: false, + SummaryPassThrough: false, } } diff --git a/processor/intervalprocessor/internal/metrics/metrics.go b/processor/intervalprocessor/internal/metrics/metrics.go index c3febf1a173a2..f06a91a8bc061 100644 --- a/processor/intervalprocessor/internal/metrics/metrics.go +++ b/processor/intervalprocessor/internal/metrics/metrics.go @@ -5,7 +5,6 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con import ( "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" ) type DataPointSlice[DP DataPoint[DP]] interface { @@ -15,8 +14,6 @@ type DataPointSlice[DP DataPoint[DP]] interface { } type DataPoint[Self any] interface { - pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint - Timestamp() pcommon.Timestamp Attributes() pcommon.Map CopyTo(dest Self) diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index 6960472e5395f..fa49a04211d89 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -36,8 +36,11 @@ type Processor struct { numberLookup map[identity.Stream]pmetric.NumberDataPoint histogramLookup map[identity.Stream]pmetric.HistogramDataPoint expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint + summaryLookup map[identity.Stream]pmetric.SummaryDataPoint - exportInterval time.Duration + exportInterval time.Duration + gaugePassThrough bool + summaryPassThrough bool nextConsumer consumer.Metrics } @@ -59,8 +62,11 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics numberLookup: map[identity.Stream]pmetric.NumberDataPoint{}, histogramLookup: map[identity.Stream]pmetric.HistogramDataPoint{}, expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{}, + summaryLookup: map[identity.Stream]pmetric.SummaryDataPoint{}, - exportInterval: config.Interval, + exportInterval: config.Interval, + gaugePassThrough: config.GaugePassThrough, + summaryPassThrough: config.SummaryPassThrough, nextConsumer: nextConsumer, } @@ -102,8 +108,22 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { switch m.Type() { - case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary: - return false + case pmetric.MetricTypeSummary: + if p.summaryPassThrough { + return false + } + + mClone, metricID := p.getOrCloneMetric(rm, sm, m) + aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup) + return true + case pmetric.MetricTypeGauge: + if p.gaugePassThrough { + return false + } + + mClone, metricID := p.getOrCloneMetric(rm, sm, m) + aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.numberLookup) + return true case pmetric.MetricTypeSum: // Check if we care about this value sum := m.Sum() @@ -202,6 +222,7 @@ func (p *Processor) exportMetrics() { clear(p.numberLookup) clear(p.histogramLookup) clear(p.expHistogramLookup) + clear(p.summaryLookup) return out }() diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go index 39cb953d23104..cda18e561b5d9 100644 --- a/processor/intervalprocessor/processor_test.go +++ b/processor/intervalprocessor/processor_test.go @@ -21,26 +21,29 @@ import ( func TestAggregation(t *testing.T) { t.Parallel() - testCases := []string{ - "basic_aggregation", - "non_monotonic_sums_are_passed_through", - "summaries_are_passed_through", - "histograms_are_aggregated", - "exp_histograms_are_aggregated", - "all_delta_metrics_are_passed_through", + testCases := []struct { + name string + passThrough bool + }{ + {name: "basic_aggregation"}, + {name: "histograms_are_aggregated"}, + {name: "exp_histograms_are_aggregated"}, + {name: "gauges_are_aggregated"}, + {name: "summaries_are_aggregated"}, + {name: "all_delta_metrics_are_passed_through"}, // Deltas are passed through even when aggregation is enabled + {name: "non_monotonic_sums_are_passed_through"}, // Non-monotonic sums are passed through even when aggregation is enabled + {name: "gauges_are_passed_through", passThrough: true}, + {name: "summaries_are_passed_through", passThrough: true}, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - config := &Config{Interval: time.Second} - + var config *Config for _, tc := range testCases { - testName := tc - - t.Run(testName, func(t *testing.T) { - t.Parallel() + config = &Config{Interval: time.Second, GaugePassThrough: tc.passThrough, SummaryPassThrough: tc.passThrough} + t.Run(tc.name, func(t *testing.T) { // next stores the results of the filter metric processor next := &consumertest.MetricsSink{} @@ -53,7 +56,7 @@ func TestAggregation(t *testing.T) { ) require.NoError(t, err) - dir := filepath.Join("testdata", testName) + dir := filepath.Join("testdata", tc.name) md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml")) require.NoError(t, err) @@ -75,6 +78,7 @@ func TestAggregation(t *testing.T) { require.Empty(t, processor.numberLookup) require.Empty(t, processor.histogramLookup) require.Empty(t, processor.expHistogramLookup) + require.Empty(t, processor.summaryLookup) // Exporting again should return nothing processor.exportMetrics() diff --git a/processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml b/processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml new file mode 100644 index 0000000000000..019dd6dd8511e --- /dev/null +++ b/processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml @@ -0,0 +1,40 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: test.gauge + gauge: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + asDouble: 345 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + asDouble: 258 + attributes: + - key: aaa + value: + stringValue: bbb + # For interval processor point of view, only the last datapoint should be passed through. + - timeUnixNano: 80 + asDouble: 178 + attributes: + - key: aaa + value: + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_are_aggregated/next.yaml b/processor/intervalprocessor/testdata/gauges_are_aggregated/next.yaml new file mode 100644 index 0000000000000..d2e76ef0f16b7 --- /dev/null +++ b/processor/intervalprocessor/testdata/gauges_are_aggregated/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml b/processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml new file mode 100644 index 0000000000000..fe0b264bd1db5 --- /dev/null +++ b/processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml @@ -0,0 +1,27 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: test.gauge + gauge: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 80 + asDouble: 178 + attributes: + - key: aaa + value: + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml b/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml index a3d65c2986e0d..89b1879ee4d82 100644 --- a/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml +++ b/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml @@ -36,4 +36,4 @@ resourceMetrics: attributes: - key: aaa value: - stringValue: bbb + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml b/processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml index a3d65c2986e0d..c1e8b3add92e4 100644 --- a/processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml +++ b/processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml @@ -36,4 +36,4 @@ resourceMetrics: attributes: - key: aaa value: - stringValue: bbb + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml b/processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml index 3949e7c54ded1..d2e76ef0f16b7 100644 --- a/processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml +++ b/processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml @@ -1 +1 @@ -resourceMetrics: [] +resourceMetrics: [] \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/summaries_are_aggregated/input.yaml b/processor/intervalprocessor/testdata/summaries_are_aggregated/input.yaml new file mode 100644 index 0000000000000..c0190dd5c614a --- /dev/null +++ b/processor/intervalprocessor/testdata/summaries_are_aggregated/input.yaml @@ -0,0 +1,63 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: summary.test + summary: + dataPoints: + - timeUnixNano: 50 + quantileValues: + - quantile: 0.25 + value: 50 + - quantile: 0.5 + value: 20 + - quantile: 0.75 + value: 75 + - quantile: 0.95 + value: 10 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + quantileValues: + - quantile: 0.25 + value: 40 + - quantile: 0.5 + value: 10 + - quantile: 0.75 + value: 60 + - quantile: 0.95 + value: 5 + attributes: + - key: aaa + value: + stringValue: bbb + # Only last summary should pass through + - timeUnixNano: 80 + quantileValues: + - quantile: 0.25 + value: 80 + - quantile: 0.5 + value: 35 + - quantile: 0.75 + value: 90 + - quantile: 0.95 + value: 15 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/summaries_are_aggregated/next.yaml b/processor/intervalprocessor/testdata/summaries_are_aggregated/next.yaml new file mode 100644 index 0000000000000..d2e76ef0f16b7 --- /dev/null +++ b/processor/intervalprocessor/testdata/summaries_are_aggregated/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/summaries_are_aggregated/output.yaml b/processor/intervalprocessor/testdata/summaries_are_aggregated/output.yaml new file mode 100644 index 0000000000000..75b8475e9ba7d --- /dev/null +++ b/processor/intervalprocessor/testdata/summaries_are_aggregated/output.yaml @@ -0,0 +1,34 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: summary.test + summary: + dataPoints: + - timeUnixNano: 80 + quantileValues: + - quantile: 0.25 + value: 80 + - quantile: 0.5 + value: 35 + - quantile: 0.75 + value: 90 + - quantile: 0.95 + value: 15 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml b/processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml index 15862ceb73e8b..7d9cdfd5b6fd9 100644 --- a/processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml +++ b/processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml @@ -59,4 +59,4 @@ resourceMetrics: attributes: - key: aaa value: - stringValue: bbb + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml b/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml index 15862ceb73e8b..7d9cdfd5b6fd9 100644 --- a/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml +++ b/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml @@ -59,4 +59,4 @@ resourceMetrics: attributes: - key: aaa value: - stringValue: bbb + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml b/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml index 3949e7c54ded1..d2e76ef0f16b7 100644 --- a/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml +++ b/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml @@ -1 +1 @@ -resourceMetrics: [] +resourceMetrics: [] \ No newline at end of file