Skip to content

Commit

Permalink
feat: added translation rw2 translation for gauges
Browse files Browse the repository at this point in the history
Signed-off-by: Juraj Michalek <[email protected]>
  • Loading branch information
jmichalek132 committed Oct 16, 2024
1 parent 12a8df8 commit 8165501
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 17 deletions.
108 changes: 108 additions & 0 deletions pkg/translator/prometheusremotewrite/helper_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"

import (
"fmt"
"log"
"slices"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/collector/pdata/pcommon"
conventions "go.opentelemetry.io/collector/semconv/v1.25.0"

prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values.
// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
func createAttributesV2(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string,
ignoreAttrs []string, logOnOverwrite bool, extras ...string) labels.Labels {
resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID)

// Calculate the maximum possible number of labels we could return so we can preallocate l
maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2

if haveServiceName {
maxLabelCount++
}

if haveInstanceID {
maxLabelCount++
}

// map ensures no duplicate label name
l := make(map[string]string, maxLabelCount)

// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
tempSeriesLabels := labels.Labels{}
// XXX: Should we always drop service namespace/service name/service instance ID from the labels
// (as they get mapped to other Prometheus labels)?
attributes.Range(func(key string, value pcommon.Value) bool {
if !slices.Contains(ignoreAttrs, key) {
tempSeriesLabels = append(tempSeriesLabels, labels.Label{Name: key, Value: value.AsString()})
}
return true
})
// TODO New returns a sorted Labels from the given labels. The caller has to guarantee that all label names are unique.
seriesLabels := labels.New(tempSeriesLabels...) // This sorts by name

for _, label := range seriesLabels {
var finalKey = prometheustranslator.NormalizeLabel(label.Name)
if existingValue, alreadyExists := l[finalKey]; alreadyExists {
l[finalKey] = existingValue + ";" + label.Value
} else {
l[finalKey] = label.Value
}
}

// Map service.name + service.namespace to job
if haveServiceName {
val := serviceName.AsString()
if serviceNamespace, ok := resourceAttrs.Get(conventions.AttributeServiceNamespace); ok {
val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val)
}
l[model.JobLabel] = val
}
// Map service.instance.id to instance
if haveInstanceID {
l[model.InstanceLabel] = instance.AsString()
}
for key, value := range externalLabels {
// External labels have already been sanitized
if _, alreadyExists := l[key]; alreadyExists {
// Skip external labels if they are overridden by metric attributes
continue
}
l[key] = value
}

for i := 0; i < len(extras); i += 2 {
if i+1 >= len(extras) {
break
}
_, found := l[extras[i]]
if found && logOnOverwrite {
log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.")
}
// internal labels should be maintained
name := extras[i]
if !(len(name) > 4 && name[:2] == "__" && name[len(name)-2:] == "__") {
name = prometheustranslator.NormalizeLabel(name)
}
l[name] = extras[i+1]
}

seriesLabels = seriesLabels[:0]
for k, v := range l {
seriesLabels = append(seriesLabels, labels.Label{Name: k, Value: v})
}

return seriesLabels
}
139 changes: 139 additions & 0 deletions pkg/translator/prometheusremotewrite/helper_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewrite

import (
"testing"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
)

// Test_createLabelSet checks resultant label names are sanitized and label in extra overrides label in labels if
// collision happens. It does not check whether labels are not sorted
func Test_createLabelSetV2(t *testing.T) {
tests := []struct {
name string
resource pcommon.Resource
orig pcommon.Map
externalLabels map[string]string
extras []string
want labels.Labels
}{
{
"labels_clean",
pcommon.NewResource(),
lbs1,
map[string]string{},
[]string{label31, value31, label32, value32},
getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32),
},
{
"labels_with_resource",
func() pcommon.Resource {
res := pcommon.NewResource()
res.Attributes().PutStr("service.name", "prometheus")
res.Attributes().PutStr("service.instance.id", "127.0.0.1:8080")
return res
}(),
lbs1,
map[string]string{},
[]string{label31, value31, label32, value32},
getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32, "job", "prometheus", "instance", "127.0.0.1:8080"),
},
{
"labels_with_nonstring_resource",
func() pcommon.Resource {
res := pcommon.NewResource()
res.Attributes().PutInt("service.name", 12345)
res.Attributes().PutBool("service.instance.id", true)
return res
}(),
lbs1,
map[string]string{},
[]string{label31, value31, label32, value32},
getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32, "job", "12345", "instance", "true"),
},
{
"labels_duplicate_in_extras",
pcommon.NewResource(),
lbs1,
map[string]string{},
[]string{label11, value31},
getPromLabelsV2(label11, value31, label12, value12),
},
{
"labels_dirty",
pcommon.NewResource(),
lbs1Dirty,
map[string]string{},
[]string{label31 + dirty1, value31, label32, value32},
getPromLabelsV2(label11+"_", value11, "key_"+label12, value12, label31+"_", value31, label32, value32),
},
{
"no_original_case",
pcommon.NewResource(),
pcommon.NewMap(),
nil,
[]string{label31, value31, label32, value32},
getPromLabelsV2(label31, value31, label32, value32),
},
{
"empty_extra_case",
pcommon.NewResource(),
lbs1,
map[string]string{},
[]string{"", ""},
getPromLabelsV2(label11, value11, label12, value12, "", ""),
},
{
"single_left_over_case",
pcommon.NewResource(),
lbs1,
map[string]string{},
[]string{label31, value31, label32},
getPromLabelsV2(label11, value11, label12, value12, label31, value31),
},
{
"valid_external_labels",
pcommon.NewResource(),
lbs1,
exlbs1,
[]string{label31, value31, label32, value32},
getPromLabelsV2(label11, value11, label12, value12, label41, value41, label31, value31, label32, value32),
},
{
"overwritten_external_labels",
pcommon.NewResource(),
lbs1,
exlbs2,
[]string{label31, value31, label32, value32},
getPromLabelsV2(label11, value11, label12, value12, label31, value31, label32, value32),
},
{
"colliding attributes",
pcommon.NewResource(),
lbsColliding,
nil,
[]string{label31, value31, label32, value32},
getPromLabelsV2(collidingSanitized, value11+";"+value12, label31, value31, label32, value32),
},
{
"sanitize_labels_starts_with_underscore",
pcommon.NewResource(),
lbs3,
exlbs1,
[]string{label31, value31, label32, value32},
getPromLabelsV2(label11, value11, label12, value12, "key"+label51, value51, label41, value41, label31, value31, label32, value32),
},
}
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res := createAttributesV2(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...)
assert.ElementsMatch(t, tt.want, res)
})
}
}
3 changes: 2 additions & 1 deletion pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (c *prometheusConverterV2) fromMetrics(md pmetric.Metrics, settings Setting
resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
resource := resourceMetrics.Resource()
scopeMetricsSlice := resourceMetrics.ScopeMetrics()
// keep track of the most recent timestamp in the ResourceMetrics for
// use with the "target" info metric
Expand Down Expand Up @@ -77,7 +78,7 @@ func (c *prometheusConverterV2) fromMetrics(md pmetric.Metrics, settings Setting
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
c.addGaugeNumberDataPoints(dataPoints, promName)
c.addGaugeNumberDataPoints(dataPoints, resource, settings, promName)
case pmetric.MetricTypeSum:
// TODO implement
case pmetric.MetricTypeHistogram:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestFromMetricsV2(t *testing.T) {
want := func() map[string]*writev2.TimeSeries {
return map[string]*writev2.TimeSeries{
"0": {
LabelsRefs: []uint32{1, 2},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8},
Samples: []writev2.Sample{
{Timestamp: convertTimeStamp(pcommon.Timestamp(ts)), Value: 1.23},
},
Expand Down
21 changes: 12 additions & 9 deletions pkg/translator/prometheusremotewrite/number_data_points_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,26 @@ import (
"math"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func (c *prometheusConverterV2) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, name string) {
func (c *prometheusConverterV2) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice,
resource pcommon.Resource, settings Settings, name string) {
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
// TODO implement support for labels

labels := labels.Labels{
labels.Label{
Name: model.MetricNameLabel,
Value: name,
},
}
labels := createAttributesV2(
resource,
pt.Attributes(),
settings.ExternalLabels,
nil,
true,
model.MetricNameLabel,
name,
)

sample := &writev2.Sample{
// convert ns to ms
Expand Down
23 changes: 20 additions & 3 deletions pkg/translator/prometheusremotewrite/number_data_points_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,16 @@ func TestPrometheusConverterV2_addGaugeNumberDataPoints(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metric := tt.metric()
settings := Settings{
Namespace: "",
ExternalLabels: nil,
DisableTargetInfo: false,
ExportCreatedMetric: false,
AddMetricSuffixes: false,
SendMetadata: false,
}
converter := newPrometheusConverterV2()
converter.addGaugeNumberDataPoints(metric.Gauge().DataPoints(), metric.Name())
converter.addGaugeNumberDataPoints(metric.Gauge().DataPoints(), pcommon.NewResource(), settings, metric.Name())
w := tt.want()

diff := cmp.Diff(w, converter.unique, cmpopts.EquateNaNs())
Expand Down Expand Up @@ -150,9 +158,18 @@ func TestPrometheusConverterV2_addGaugeNumberDataPointsDuplicate(t *testing.T) {
}
}

settings := Settings{
Namespace: "",
ExternalLabels: nil,
DisableTargetInfo: false,
ExportCreatedMetric: false,
AddMetricSuffixes: false,
SendMetadata: false,
}

converter := newPrometheusConverterV2()
converter.addGaugeNumberDataPoints(metric1.Gauge().DataPoints(), metric1.Name())
converter.addGaugeNumberDataPoints(metric2.Gauge().DataPoints(), metric2.Name())
converter.addGaugeNumberDataPoints(metric1.Gauge().DataPoints(), pcommon.NewResource(), settings, metric1.Name())
converter.addGaugeNumberDataPoints(metric2.Gauge().DataPoints(), pcommon.NewResource(), settings, metric2.Name())

assert.Equal(t, want(), converter.unique)

Expand Down
13 changes: 13 additions & 0 deletions pkg/translator/prometheusremotewrite/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -136,6 +137,18 @@ func getAttributes(labels ...string) pcommon.Map {
return attributeMap
}

// Prometheus TimeSeries
func getPromLabelsV2(lbs ...string) labels.Labels {
pbLbs := labels.Labels{}
for i := 0; i < len(lbs); i += 2 {
pbLbs = append(pbLbs, labels.Label{
Name: lbs[i],
Value: lbs[i+1],
})
}
return pbLbs
}

// Prometheus TimeSeries
func getPromLabels(lbs ...string) []prompb.Label {
pbLbs := prompb.Labels{
Expand Down
6 changes: 3 additions & 3 deletions receiver/cloudflarereceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ require (
go.opentelemetry.io/collector/pdata/pprofile v0.111.1-0.20241008154146-ea48c09c31ae // indirect
go.opentelemetry.io/collector/pipeline v0.111.1-0.20241008154146-ea48c09c31ae // indirect
go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.1-0.20241008154146-ea48c09c31ae // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/metric v1.30.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
Expand Down
Loading

0 comments on commit 8165501

Please sign in to comment.