Skip to content

Commit

Permalink
feat: added prometheus rw2 translation for gauges (#35734)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Follow up from #35703.

Draft starting the work on adding support for remote write 2.0 in the
translation package.
Adding support for translating gauges.

This is first iteration and to keep the PR small
* we don't handle duplicate metrics
* only support gauges
* don't handle other labels than metric name
* don't handle exemplars
*  don't handle metadata

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue #33661
Fixes

<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->

---------

Signed-off-by: Juraj Michalek <[email protected]>
Co-authored-by: Arthur Silva Sens <[email protected]>
Co-authored-by: David Ashpole <[email protected]>
  • Loading branch information
3 people authored Oct 16, 2024
1 parent 686721c commit 230ed5c
Show file tree
Hide file tree
Showing 7 changed files with 405 additions and 10 deletions.
27 changes: 27 additions & 0 deletions .chloggen/jm-prom-translation-rw2-gauges-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/translator/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add FromMetricsV2

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33661]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: The public function is partially implemented and not ready for use

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
1 change: 1 addition & 0 deletions pkg/translator/prometheusremotewrite/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/google/go-cmp v0.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.111.0
github.com/prometheus/common v0.60.0
Expand Down
19 changes: 9 additions & 10 deletions pkg/translator/prometheusremotewrite/metrics_to_prw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func BenchmarkFromMetrics(b *testing.B) {
b.Run(fmt.Sprintf("labels per metric: %v", labelsPerMetric), func(b *testing.B) {
for _, exemplarsPerSeries := range []int{0, 5, 10} {
b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) {
payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries)
payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries, pcommon.Timestamp(uint64(time.Now().UnixNano())))

for i := 0; i < b.N; i++ {
tsMap, err := FromMetrics(payload.Metrics(), Settings{})
Expand Down Expand Up @@ -71,7 +71,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
b.Run(fmt.Sprintf("labels per metric: %v", labelsPerMetric), func(b *testing.B) {
for _, exemplarsPerSeries := range []int{0, 5, 10} {
b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) {
payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries)
payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries, pcommon.Timestamp(uint64(time.Now().UnixNano())))

for i := 0; i < b.N; i++ {
converter := newPrometheusConverter()
Expand All @@ -90,22 +90,21 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
}
}

func createExportRequest(resourceAttributeCount int, histogramCount int, nonHistogramCount int, labelsPerMetric int, exemplarsPerSeries int) pmetricotlp.ExportRequest {
func createExportRequest(resourceAttributeCount int, histogramCount int, nonHistogramCount int, labelsPerMetric int, exemplarsPerSeries int, timestamp pcommon.Timestamp) pmetricotlp.ExportRequest {
request := pmetricotlp.NewExportRequest()

rm := request.Metrics().ResourceMetrics().AppendEmpty()
generateAttributes(rm.Resource().Attributes(), "resource", resourceAttributeCount)

metrics := rm.ScopeMetrics().AppendEmpty().Metrics()
ts := pcommon.NewTimestampFromTime(time.Now())

for i := 1; i <= histogramCount; i++ {
m := metrics.AppendEmpty()
m.SetEmptyHistogram()
m.SetName(fmt.Sprintf("histogram-%v", i))
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
h := m.Histogram().DataPoints().AppendEmpty()
h.SetTimestamp(ts)
h.SetTimestamp(timestamp)

// Set 50 samples, 10 each with values 0.5, 1, 2, 4, and 8
h.SetCount(50)
Expand All @@ -114,7 +113,7 @@ func createExportRequest(resourceAttributeCount int, histogramCount int, nonHist
h.ExplicitBounds().FromRaw([]float64{.5, 1, 2, 4, 8, 16}) // Bucket boundaries include the upper limit (ie. each sample is on the upper limit of its bucket)

generateAttributes(h.Attributes(), "series", labelsPerMetric)
generateExemplars(h.Exemplars(), exemplarsPerSeries, ts)
generateExemplars(h.Exemplars(), exemplarsPerSeries, timestamp)
}

for i := 1; i <= nonHistogramCount; i++ {
Expand All @@ -123,21 +122,21 @@ func createExportRequest(resourceAttributeCount int, histogramCount int, nonHist
m.SetName(fmt.Sprintf("sum-%v", i))
m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
point := m.Sum().DataPoints().AppendEmpty()
point.SetTimestamp(ts)
point.SetTimestamp(timestamp)
point.SetDoubleValue(1.23)
generateAttributes(point.Attributes(), "series", labelsPerMetric)
generateExemplars(point.Exemplars(), exemplarsPerSeries, ts)
generateExemplars(point.Exemplars(), exemplarsPerSeries, timestamp)
}

for i := 1; i <= nonHistogramCount; i++ {
m := metrics.AppendEmpty()
m.SetEmptyGauge()
m.SetName(fmt.Sprintf("gauge-%v", i))
point := m.Gauge().DataPoints().AppendEmpty()
point.SetTimestamp(ts)
point.SetTimestamp(timestamp)
point.SetDoubleValue(1.23)
generateAttributes(point.Attributes(), "series", labelsPerMetric)
generateExemplars(point.Exemplars(), exemplarsPerSeries, ts)
generateExemplars(point.Exemplars(), exemplarsPerSeries, timestamp)
}

return request
Expand Down
122 changes: 122 additions & 0 deletions pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"

import (
"errors"
"fmt"
"strconv"

"github.com/prometheus/prometheus/model/labels"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"

prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

// FromMetricsV2 converts pmetric.Metrics to Prometheus remote write format 2.0.
func FromMetricsV2(md pmetric.Metrics, settings Settings) (map[string]*writev2.TimeSeries, writev2.SymbolsTable, error) {
c := newPrometheusConverterV2()
errs := c.fromMetrics(md, settings)
tss := c.timeSeries()
out := make(map[string]*writev2.TimeSeries, len(tss))
for i := range tss {
out[strconv.Itoa(i)] = &tss[i]
}

return out, c.symbolTable, errs
}

// prometheusConverterV2 converts from OTLP to Prometheus write 2.0 format.
type prometheusConverterV2 struct {
// TODO handle conflicts
unique map[uint64]*writev2.TimeSeries
symbolTable writev2.SymbolsTable
}

func newPrometheusConverterV2() *prometheusConverterV2 {
return &prometheusConverterV2{
unique: map[uint64]*writev2.TimeSeries{},
symbolTable: writev2.NewSymbolTable(),
}
}

// fromMetrics converts pmetric.Metrics to Prometheus remote write format.
func (c *prometheusConverterV2) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) {
resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
scopeMetricsSlice := resourceMetrics.ScopeMetrics()
// keep track of the most recent timestamp in the ResourceMetrics for
// use with the "target" info metric
var mostRecentTimestamp pcommon.Timestamp
for j := 0; j < scopeMetricsSlice.Len(); j++ {
metricSlice := scopeMetricsSlice.At(j).Metrics()

// TODO: decide if instrumentation library information should be exported as labels
for k := 0; k < metricSlice.Len(); k++ {
metric := metricSlice.At(k)
mostRecentTimestamp = maxTimestamp(mostRecentTimestamp, mostRecentTimestampInMetric(metric))

if !isValidAggregationTemporality(metric) {
errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name()))
continue
}

promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes)

// handle individual metrics based on type
//exhaustive:enforce
switch metric.Type() {
case pmetric.MetricTypeGauge:
dataPoints := metric.Gauge().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
c.addGaugeNumberDataPoints(dataPoints, promName)
case pmetric.MetricTypeSum:
// TODO implement
case pmetric.MetricTypeHistogram:
// TODO implement
case pmetric.MetricTypeExponentialHistogram:
// TODO implement
case pmetric.MetricTypeSummary:
// TODO implement
default:
errs = multierr.Append(errs, errors.New("unsupported metric type"))
}
}
}
// TODO implement
// addResourceTargetInfov2(resource, settings, mostRecentTimestamp, c)
}

return
}

// timeSeries returns a slice of the writev2.TimeSeries that were converted from OTel format.
func (c *prometheusConverterV2) timeSeries() []writev2.TimeSeries {
allTS := make([]writev2.TimeSeries, 0, len(c.unique))
for _, ts := range c.unique {
allTS = append(allTS, *ts)
}
return allTS
}

func (c *prometheusConverterV2) addSample(sample *writev2.Sample, lbls labels.Labels) *writev2.TimeSeries {
if sample == nil || len(lbls) == 0 {
// This shouldn't happen
return nil
}
ts := &writev2.TimeSeries{}
ts.LabelsRefs = c.symbolTable.SymbolizeLabels(lbls, ts.LabelsRefs)
ts.Samples = append(ts.Samples, *sample)

c.unique[lbls.Hash()] = ts

return ts
}
44 changes: 44 additions & 0 deletions pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewrite

import (
"testing"
"time"

writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
)

func TestFromMetricsV2(t *testing.T) {
settings := Settings{
Namespace: "",
ExternalLabels: nil,
DisableTargetInfo: false,
ExportCreatedMetric: false,
AddMetricSuffixes: false,
SendMetadata: false,
}

ts := uint64(time.Now().UnixNano())
payload := createExportRequest(5, 0, 1, 3, 0, pcommon.Timestamp(ts))
want := func() map[string]*writev2.TimeSeries {
return map[string]*writev2.TimeSeries{
"0": {
LabelsRefs: []uint32{1, 2},
Samples: []writev2.Sample{
{Timestamp: convertTimeStamp(pcommon.Timestamp(ts)), Value: 1.23},
},
},
}
}
tsMap, symbolsTable, err := FromMetricsV2(payload.Metrics(), settings)
wanted := want()
require.NoError(t, err)
require.NotNil(t, tsMap)
require.Equal(t, wanted, tsMap)
require.NotNil(t, symbolsTable)

}
43 changes: 43 additions & 0 deletions pkg/translator/prometheusremotewrite/number_data_points_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"

import (
"math"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func (c *prometheusConverterV2) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, name string) {
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
// TODO implement support for labels

labels := labels.Labels{
labels.Label{
Name: model.MetricNameLabel,
Value: name,
},
}

sample := &writev2.Sample{
// convert ns to ms
Timestamp: convertTimeStamp(pt.Timestamp()),
}
switch pt.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
sample.Value = float64(pt.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
sample.Value = pt.DoubleValue()
}
if pt.Flags().NoRecordedValue() {
sample.Value = math.Float64frombits(value.StaleNaN)
}
c.addSample(sample, labels)
}
}
Loading

0 comments on commit 230ed5c

Please sign in to comment.