From 81655010598196fa6713e9993e220eb23e24724d Mon Sep 17 00:00:00 2001 From: Juraj Michalek Date: Wed, 16 Oct 2024 18:24:40 +0200 Subject: [PATCH] feat: added translation rw2 translation for gauges Signed-off-by: Juraj Michalek --- .../prometheusremotewrite/helper_v2.go | 108 ++++++++++++++ .../prometheusremotewrite/helper_v2_test.go | 139 ++++++++++++++++++ .../metrics_to_prw_v2.go | 3 +- .../metrics_to_prw_v2_test.go | 2 +- .../number_data_points_v2.go | 21 +-- .../number_data_points_v2_test.go | 23 ++- .../prometheusremotewrite/testutils_test.go | 13 ++ receiver/cloudflarereceiver/go.mod | 6 +- receiver/cloudflarereceiver/go.sum | 3 + 9 files changed, 301 insertions(+), 17 deletions(-) create mode 100644 pkg/translator/prometheusremotewrite/helper_v2.go create mode 100644 pkg/translator/prometheusremotewrite/helper_v2_test.go diff --git a/pkg/translator/prometheusremotewrite/helper_v2.go b/pkg/translator/prometheusremotewrite/helper_v2.go new file mode 100644 index 000000000000..ed36f0b9724c --- /dev/null +++ b/pkg/translator/prometheusremotewrite/helper_v2.go @@ -0,0 +1,108 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite" + +import ( + "fmt" + "log" + "slices" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "go.opentelemetry.io/collector/pdata/pcommon" + conventions "go.opentelemetry.io/collector/semconv/v1.25.0" + + prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" +) + +// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. +// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and +// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. +func createAttributesV2(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, + ignoreAttrs []string, logOnOverwrite bool, extras ...string) labels.Labels { + resourceAttrs := resource.Attributes() + serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) + instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID) + + // Calculate the maximum possible number of labels we could return so we can preallocate l + maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 + + if haveServiceName { + maxLabelCount++ + } + + if haveInstanceID { + maxLabelCount++ + } + + // map ensures no duplicate label name + l := make(map[string]string, maxLabelCount) + + // Ensure attributes are sorted by key for consistent merging of keys which + // collide when sanitized. + tempSeriesLabels := labels.Labels{} + // XXX: Should we always drop service namespace/service name/service instance ID from the labels + // (as they get mapped to other Prometheus labels)? + attributes.Range(func(key string, value pcommon.Value) bool { + if !slices.Contains(ignoreAttrs, key) { + tempSeriesLabels = append(tempSeriesLabels, labels.Label{Name: key, Value: value.AsString()}) + } + return true + }) + // TODO New returns a sorted Labels from the given labels. The caller has to guarantee that all label names are unique. + seriesLabels := labels.New(tempSeriesLabels...) // This sorts by name + + for _, label := range seriesLabels { + var finalKey = prometheustranslator.NormalizeLabel(label.Name) + if existingValue, alreadyExists := l[finalKey]; alreadyExists { + l[finalKey] = existingValue + ";" + label.Value + } else { + l[finalKey] = label.Value + } + } + + // Map service.name + service.namespace to job + if haveServiceName { + val := serviceName.AsString() + if serviceNamespace, ok := resourceAttrs.Get(conventions.AttributeServiceNamespace); ok { + val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val) + } + l[model.JobLabel] = val + } + // Map service.instance.id to instance + if haveInstanceID { + l[model.InstanceLabel] = instance.AsString() + } + for key, value := range externalLabels { + // External labels have already been sanitized + if _, alreadyExists := l[key]; alreadyExists { + // Skip external labels if they are overridden by metric attributes + continue + } + l[key] = value + } + + for i := 0; i < len(extras); i += 2 { + if i+1 >= len(extras) { + break + } + _, found := l[extras[i]] + if found && logOnOverwrite { + log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.") + } + // internal labels should be maintained + name := extras[i] + if !(len(name) > 4 && name[:2] == "__" && name[len(name)-2:] == "__") { + name = prometheustranslator.NormalizeLabel(name) + } + l[name] = extras[i+1] + } + + seriesLabels = seriesLabels[:0] + for k, v := range l { + seriesLabels = append(seriesLabels, labels.Label{Name: k, Value: v}) + } + + return seriesLabels +} diff --git a/pkg/translator/prometheusremotewrite/helper_v2_test.go b/pkg/translator/prometheusremotewrite/helper_v2_test.go new file mode 100644 index 000000000000..8f674eca5b6d --- /dev/null +++ b/pkg/translator/prometheusremotewrite/helper_v2_test.go @@ -0,0 +1,139 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewrite + +import ( + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// Test_createLabelSet checks resultant label names are sanitized and label in extra overrides label in labels if +// collision happens. It does not check whether labels are not sorted +func Test_createLabelSetV2(t *testing.T) { + tests := []struct { + name string + resource pcommon.Resource + orig pcommon.Map + externalLabels map[string]string + extras []string + want labels.Labels + }{ + { + "labels_clean", + pcommon.NewResource(), + lbs1, + map[string]string{}, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32), + }, + { + "labels_with_resource", + func() pcommon.Resource { + res := pcommon.NewResource() + res.Attributes().PutStr("service.name", "prometheus") + res.Attributes().PutStr("service.instance.id", "127.0.0.1:8080") + return res + }(), + lbs1, + map[string]string{}, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32, "job", "prometheus", "instance", "127.0.0.1:8080"), + }, + { + "labels_with_nonstring_resource", + func() pcommon.Resource { + res := pcommon.NewResource() + res.Attributes().PutInt("service.name", 12345) + res.Attributes().PutBool("service.instance.id", true) + return res + }(), + lbs1, + map[string]string{}, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32, "job", "12345", "instance", "true"), + }, + { + "labels_duplicate_in_extras", + pcommon.NewResource(), + lbs1, + map[string]string{}, + []string{label11, value31}, + getPromLabelsV2(label11, value31, label12, value12), + }, + { + "labels_dirty", + pcommon.NewResource(), + lbs1Dirty, + map[string]string{}, + []string{label31 + dirty1, value31, label32, value32}, + getPromLabelsV2(label11+"_", value11, "key_"+label12, value12, label31+"_", value31, label32, value32), + }, + { + "no_original_case", + pcommon.NewResource(), + pcommon.NewMap(), + nil, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label31, value31, label32, value32), + }, + { + "empty_extra_case", + pcommon.NewResource(), + lbs1, + map[string]string{}, + []string{"", ""}, + getPromLabelsV2(label11, value11, label12, value12, "", ""), + }, + { + "single_left_over_case", + pcommon.NewResource(), + lbs1, + map[string]string{}, + []string{label31, value31, label32}, + getPromLabelsV2(label11, value11, label12, value12, label31, value31), + }, + { + "valid_external_labels", + pcommon.NewResource(), + lbs1, + exlbs1, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, label41, value41, label31, value31, label32, value32), + }, + { + "overwritten_external_labels", + pcommon.NewResource(), + lbs1, + exlbs2, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32), + }, + { + "colliding attributes", + pcommon.NewResource(), + lbsColliding, + nil, + []string{label31, value31, label32, value32}, + getPromLabelsV2(collidingSanitized, value11+";"+value12, label31, value31, label32, value32), + }, + { + "sanitize_labels_starts_with_underscore", + pcommon.NewResource(), + lbs3, + exlbs1, + []string{label31, value31, label32, value32}, + getPromLabelsV2(label11, value11, label12, value12, "key"+label51, value51, label41, value41, label31, value31, label32, value32), + }, + } + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res := createAttributesV2(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...) + assert.ElementsMatch(t, tt.want, res) + }) + } +} diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go b/pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go index 2beb7385fbe1..ccbc218effcd 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go @@ -49,6 +49,7 @@ func (c *prometheusConverterV2) fromMetrics(md pmetric.Metrics, settings Setting resourceMetricsSlice := md.ResourceMetrics() for i := 0; i < resourceMetricsSlice.Len(); i++ { resourceMetrics := resourceMetricsSlice.At(i) + resource := resourceMetrics.Resource() scopeMetricsSlice := resourceMetrics.ScopeMetrics() // keep track of the most recent timestamp in the ResourceMetrics for // use with the "target" info metric @@ -77,7 +78,7 @@ func (c *prometheusConverterV2) fromMetrics(md pmetric.Metrics, settings Setting errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - c.addGaugeNumberDataPoints(dataPoints, promName) + c.addGaugeNumberDataPoints(dataPoints, resource, settings, promName) case pmetric.MetricTypeSum: // TODO implement case pmetric.MetricTypeHistogram: diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go b/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go index 375738b8bbee..30cb6b826b2b 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go @@ -27,7 +27,7 @@ func TestFromMetricsV2(t *testing.T) { want := func() map[string]*writev2.TimeSeries { return map[string]*writev2.TimeSeries{ "0": { - LabelsRefs: []uint32{1, 2}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8}, Samples: []writev2.Sample{ {Timestamp: convertTimeStamp(pcommon.Timestamp(ts)), Value: 1.23}, }, diff --git a/pkg/translator/prometheusremotewrite/number_data_points_v2.go b/pkg/translator/prometheusremotewrite/number_data_points_v2.go index 3a192efb9589..bb35df438f96 100644 --- a/pkg/translator/prometheusremotewrite/number_data_points_v2.go +++ b/pkg/translator/prometheusremotewrite/number_data_points_v2.go @@ -7,23 +7,26 @@ import ( "math" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/value" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" ) -func (c *prometheusConverterV2) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, name string) { +func (c *prometheusConverterV2) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, + resource pcommon.Resource, settings Settings, name string) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) - // TODO implement support for labels - labels := labels.Labels{ - labels.Label{ - Name: model.MetricNameLabel, - Value: name, - }, - } + labels := createAttributesV2( + resource, + pt.Attributes(), + settings.ExternalLabels, + nil, + true, + model.MetricNameLabel, + name, + ) sample := &writev2.Sample{ // convert ns to ms diff --git a/pkg/translator/prometheusremotewrite/number_data_points_v2_test.go b/pkg/translator/prometheusremotewrite/number_data_points_v2_test.go index ce78b9c1f391..5b7c2cf377d9 100644 --- a/pkg/translator/prometheusremotewrite/number_data_points_v2_test.go +++ b/pkg/translator/prometheusremotewrite/number_data_points_v2_test.go @@ -108,8 +108,16 @@ func TestPrometheusConverterV2_addGaugeNumberDataPoints(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() + settings := Settings{ + Namespace: "", + ExternalLabels: nil, + DisableTargetInfo: false, + ExportCreatedMetric: false, + AddMetricSuffixes: false, + SendMetadata: false, + } converter := newPrometheusConverterV2() - converter.addGaugeNumberDataPoints(metric.Gauge().DataPoints(), metric.Name()) + converter.addGaugeNumberDataPoints(metric.Gauge().DataPoints(), pcommon.NewResource(), settings, metric.Name()) w := tt.want() diff := cmp.Diff(w, converter.unique, cmpopts.EquateNaNs()) @@ -150,9 +158,18 @@ func TestPrometheusConverterV2_addGaugeNumberDataPointsDuplicate(t *testing.T) { } } + settings := Settings{ + Namespace: "", + ExternalLabels: nil, + DisableTargetInfo: false, + ExportCreatedMetric: false, + AddMetricSuffixes: false, + SendMetadata: false, + } + converter := newPrometheusConverterV2() - converter.addGaugeNumberDataPoints(metric1.Gauge().DataPoints(), metric1.Name()) - converter.addGaugeNumberDataPoints(metric2.Gauge().DataPoints(), metric2.Name()) + converter.addGaugeNumberDataPoints(metric1.Gauge().DataPoints(), pcommon.NewResource(), settings, metric1.Name()) + converter.addGaugeNumberDataPoints(metric2.Gauge().DataPoints(), pcommon.NewResource(), settings, metric2.Name()) assert.Equal(t, want(), converter.unique) diff --git a/pkg/translator/prometheusremotewrite/testutils_test.go b/pkg/translator/prometheusremotewrite/testutils_test.go index 49ef7a735081..d7c92d111829 100644 --- a/pkg/translator/prometheusremotewrite/testutils_test.go +++ b/pkg/translator/prometheusremotewrite/testutils_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" @@ -136,6 +137,18 @@ func getAttributes(labels ...string) pcommon.Map { return attributeMap } +// Prometheus TimeSeries +func getPromLabelsV2(lbs ...string) labels.Labels { + pbLbs := labels.Labels{} + for i := 0; i < len(lbs); i += 2 { + pbLbs = append(pbLbs, labels.Label{ + Name: lbs[i], + Value: lbs[i+1], + }) + } + return pbLbs +} + // Prometheus TimeSeries func getPromLabels(lbs ...string) []prompb.Label { pbLbs := prompb.Labels{ diff --git a/receiver/cloudflarereceiver/go.mod b/receiver/cloudflarereceiver/go.mod index d6cfbc66dec3..506debda924d 100644 --- a/receiver/cloudflarereceiver/go.mod +++ b/receiver/cloudflarereceiver/go.mod @@ -49,11 +49,11 @@ require ( go.opentelemetry.io/collector/pdata/pprofile v0.111.1-0.20241008154146-ea48c09c31ae // indirect go.opentelemetry.io/collector/pipeline v0.111.1-0.20241008154146-ea48c09c31ae // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.1-0.20241008154146-ea48c09c31ae // indirect - go.opentelemetry.io/otel v1.30.0 // indirect - go.opentelemetry.io/otel/metric v1.30.0 // indirect + go.opentelemetry.io/otel v1.31.0 // indirect + go.opentelemetry.io/otel/metric v1.31.0 // indirect go.opentelemetry.io/otel/sdk v1.30.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect - go.opentelemetry.io/otel/trace v1.30.0 // indirect + go.opentelemetry.io/otel/trace v1.31.0 // indirect golang.org/x/net v0.28.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.19.0 // indirect diff --git a/receiver/cloudflarereceiver/go.sum b/receiver/cloudflarereceiver/go.sum index 687febfd37e9..df0c0eab5101 100644 --- a/receiver/cloudflarereceiver/go.sum +++ b/receiver/cloudflarereceiver/go.sum @@ -90,14 +90,17 @@ go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.1-0.2024100815414 go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.1-0.20241008154146-ea48c09c31ae/go.mod h1:cwpkRCGssE2AxydEzkFC3l611d8+csaDH/7BjKC7nHI= go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= go.opentelemetry.io/otel/sdk v1.30.0 h1:cHdik6irO49R5IysVhdn8oaiR9m8XluDaJAs4DfOrYE= go.opentelemetry.io/otel/sdk v1.30.0/go.mod h1:p14X4Ok8S+sygzblytT1nqG98QG2KYKv++HE0LY/mhg= go.opentelemetry.io/otel/sdk/metric v1.30.0 h1:QJLT8Pe11jyHBHfSAgYH7kEmT24eX792jZO1bo4BXkM= go.opentelemetry.io/otel/sdk/metric v1.30.0/go.mod h1:waS6P3YqFNzeP01kuo/MBBYqaoBJl7efRQHOaydhy1Y= go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=