diff --git a/.chloggen/adjuster-resource-fix.yaml b/.chloggen/adjuster-resource-fix.yaml new file mode 100644 index 000000000000..8fb0802b035b --- /dev/null +++ b/.chloggen/adjuster-resource-fix.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/prometheusreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Metric adjuster no longer assumes that all metrics from a scrape come from the same resource + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36477] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster.go b/receiver/prometheusreceiver/internal/metrics_adjuster.go index 060450cc0f53..ebb33bb970b5 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster.go @@ -255,25 +255,28 @@ func NewInitialPointAdjuster(logger *zap.Logger, gcInterval time.Duration, useCr // AdjustMetrics takes a sequence of metrics and adjust their start times based on the initial and // previous points in the timeseriesMap. func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error { - // By contract metrics will have at least 1 data point, so for sure will have at least one ResourceMetrics. - - job, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceName) - if !found { - return errors.New("adjusting metrics without job") - } + for i := 0; i < metrics.ResourceMetrics().Len(); i++ { + rm := metrics.ResourceMetrics().At(i) + _, found := rm.Resource().Attributes().Get(semconv.AttributeServiceName) + if !found { + return errors.New("adjusting metrics without job") + } - instance, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceInstanceID) - if !found { - return errors.New("adjusting metrics without instance") + _, found = rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID) + if !found { + return errors.New("adjusting metrics without instance") + } } - tsm := a.jobsMap.get(job.Str(), instance.Str()) - // The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that - // nothing else can modify the data used for adjustment. - tsm.Lock() - defer tsm.Unlock() for i := 0; i < metrics.ResourceMetrics().Len(); i++ { rm := metrics.ResourceMetrics().At(i) + job, _ := rm.Resource().Attributes().Get(semconv.AttributeServiceName) + instance, _ := rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID) + tsm := a.jobsMap.get(job.Str(), instance.Str()) + + // The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that + // nothing else can modify the data used for adjustment. + tsm.Lock() for j := 0; j < rm.ScopeMetrics().Len(); j++ { ilm := rm.ScopeMetrics().At(j) for k := 0; k < ilm.Metrics().Len(); k++ { @@ -303,6 +306,7 @@ func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error { } } } + tsm.Unlock() } return nil } diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster_test.go b/receiver/prometheusreceiver/internal/metrics_adjuster_test.go index fd1f4f8818da..1f7779269556 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster_test.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster_test.go @@ -26,6 +26,7 @@ var ( percent0 = []float64{10, 50, 90} sum1 = "sum1" + sum2 = "sum2" gauge1 = "gauge1" histogram1 = "histogram1" summary1 = "summary1" @@ -103,6 +104,37 @@ func TestSum(t *testing.T) { runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) } +func TestSumWithDifferentResources(t *testing.T) { + script := []*metricsAdjusterTest{ + { + description: "Sum: round 1 - initial instance, start time is established", + metrics: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t1, t1, 44))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t1, t1, 44)))), + adjusted: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t1, t1, 44))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t1, t1, 44)))), + }, + { + description: "Sum: round 2 - instance adjusted based on round 1 (metrics in different order)", + metrics: metricsFromResourceMetrics(resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t2, t2, 66))), resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t2, t2, 66)))), + adjusted: metricsFromResourceMetrics(resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t1, t2, 66))), resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t1, t2, 66)))), + }, + { + description: "Sum: round 3 - instance reset (value less than previous value), start time is reset", + metrics: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t3, t3, 55))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t3, t3, 55)))), + adjusted: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t3, t3, 55))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t3, t3, 55)))), + }, + { + description: "Sum: round 4 - instance adjusted based on round 3", + metrics: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t4, t4, 72))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t4, t4, 72)))), + adjusted: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t3, t4, 72))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t3, t4, 72)))), + }, + { + description: "Sum: round 5 - instance adjusted based on round 4, sum2 metric resets but sum1 doesn't", + metrics: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t5, t5, 72))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t5, t5, 10)))), + adjusted: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t3, t5, 72))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t5, t5, 10)))), + }, + } + runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) +} + func TestSummaryNoCount(t *testing.T) { script := []*metricsAdjusterTest{ { @@ -710,14 +742,32 @@ func runScript(t *testing.T, ma MetricsAdjuster, job, instance string, tests []* t.Run(test.description, func(t *testing.T) { adjusted := pmetric.NewMetrics() test.metrics.CopyTo(adjusted) - // Add the instance/job to the input metrics. - adjusted.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, instance) - adjusted.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceName, job) + // Add the instance/job to the input metrics if they aren't already present. + for i := 0; i < adjusted.ResourceMetrics().Len(); i++ { + rm := adjusted.ResourceMetrics().At(i) + _, found := rm.Resource().Attributes().Get(semconv.AttributeServiceName) + if !found { + rm.Resource().Attributes().PutStr(semconv.AttributeServiceName, job) + } + _, found = rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID) + if !found { + rm.Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, instance) + } + } assert.NoError(t, ma.AdjustMetrics(adjusted)) - // Add the instance/job to the expected metrics as well. - test.adjusted.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, instance) - test.adjusted.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceName, job) + // Add the instance/job to the expected metrics as well if they aren't already present. + for i := 0; i < test.adjusted.ResourceMetrics().Len(); i++ { + rm := test.adjusted.ResourceMetrics().At(i) + _, found := rm.Resource().Attributes().Get(semconv.AttributeServiceName) + if !found { + rm.Resource().Attributes().PutStr(semconv.AttributeServiceName, job) + } + _, found = rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID) + if !found { + rm.Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, instance) + } + } assert.EqualValues(t, test.adjusted, adjusted) }) } diff --git a/receiver/prometheusreceiver/internal/metricsutil_test.go b/receiver/prometheusreceiver/internal/metricsutil_test.go index d9b79ebdbdd9..bc1dabbd696d 100644 --- a/receiver/prometheusreceiver/internal/metricsutil_test.go +++ b/receiver/prometheusreceiver/internal/metricsutil_test.go @@ -6,6 +6,7 @@ package internal import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + semconv "go.opentelemetry.io/collector/semconv/v1.27.0" ) type kv struct { @@ -23,6 +24,28 @@ func metrics(metrics ...pmetric.Metric) pmetric.Metrics { return md } +func metricsFromResourceMetrics(metrics ...pmetric.ResourceMetrics) pmetric.Metrics { + md := pmetric.NewMetrics() + for _, metric := range metrics { + mr := md.ResourceMetrics().AppendEmpty() + metric.CopyTo(mr) + } + return md +} + +func resourceMetrics(job, instance string, metrics ...pmetric.Metric) pmetric.ResourceMetrics { + mr := pmetric.NewResourceMetrics() + mr.Resource().Attributes().PutStr(semconv.AttributeServiceName, job) + mr.Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, instance) + ms := mr.ScopeMetrics().AppendEmpty().Metrics() + + for _, metric := range metrics { + destMetric := ms.AppendEmpty() + metric.CopyTo(destMetric) + } + return mr +} + func histogramPointRaw(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.HistogramDataPoint { hdp := pmetric.NewHistogramDataPoint() hdp.SetStartTimestamp(startTimestamp)