chore: try to reduce code duplication
jmichalek132 committed Oct 25, 2024
1 parent 4c4ef36 commit d5521bf
Showing 2 changed files with 60 additions and 129 deletions.
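
The bulk of the deduplication happens in PushMetrics: the per-protocol results (tsMap for Remote-Write 1.0, tsMapv2 plus a symbols table for 2.0) are hoisted out of the two branches so that the error check and the final handleExport call are written once. Below is a minimal sketch of that control-flow shape, using placeholder translate functions instead of the real FromMetrics/FromMetricsV2 converters:

package main

import (
	"errors"
	"fmt"
)

// Placeholders standing in for prometheusremotewrite.FromMetrics and
// FromMetricsV2; only the control flow is the point here.
func translateV1() (map[string]int, error) { return map[string]int{"series": 1}, nil }
func translateV2() (map[string]int, error) {
	return map[string]int{"series": 2}, errors.New("partial translation")
}

func push(sendRW2 bool) error {
	// Hoist both branches' outputs so the error handling and the
	// single export call below are shared, as in the commit.
	var tsMap, tsMapv2 map[string]int
	var err error
	if !sendRW2 {
		tsMap, err = translateV1() // RW1 case
	} else {
		tsMapv2, err = translateV2() // RW2 case
	}
	if err != nil {
		// Export anyway: some points may have translated successfully.
		fmt.Println("translation error:", err)
	}
	return handleExport(tsMap, tsMapv2)
}

func handleExport(tsMap, tsMapv2 map[string]int) error {
	fmt.Printf("exporting %d v1 and %d v2 series\n", len(tsMap), len(tsMapv2))
	return nil
}

func main() {
	_ = push(false)
	_ = push(true)
}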
94 changes: 59 additions & 35 deletions exporter/prometheusremotewriteexporter/exporter.go
@@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"io"
"math"
"net/http"
@@ -173,38 +174,36 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) error {
case <-prwe.closeChan:
return errors.New("shutdown has been called")
default:
var tsMap map[string]*prompb.TimeSeries
var tsMapv2 map[string]*writev2.TimeSeries
var symbolsTable writev2.SymbolsTable
var m []*prompb.MetricMetadata
var err error
if !prwe.exporterSettings.SendRW2 {
// RW1 case
tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
}
tsMap, err = prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)

prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))

var m []*prompb.MetricMetadata
if prwe.exporterSettings.SendMetadata {
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
}

// Call export even if a conversion error, since there may be points that were successfully converted.
return prwe.handleExport(ctx, tsMap, m)
} else {
// RW2 case
tsMap, symbolsTable, err := prometheusremotewrite.FromMetricsV2(md, prwe.exporterSettings)
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
}

prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
tsMapv2, symbolsTable, err = prometheusremotewrite.FromMetricsV2(md, prwe.exporterSettings)

// TODO handle metadata
prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMapv2))
}

return prwe.handleExportV2(ctx, tsMap, symbolsTable)
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
}

// Call export even if a conversion error, since there may be points that were successfully converted.
return prwe.handleExport(ctx, tsMap, m, symbolsTable, tsMapv2)

}
}

@@ -220,28 +219,41 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
return sanitizedLabels, nil
}

func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata) error {
func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata, symbolsTable writev2.SymbolsTable, tsMapv2 map[string]*writev2.TimeSeries) error {
// There are no metrics to export, so return.
if len(tsMap) == 0 {
if len(tsMap) == 0 || len(tsMapv2) == 0 {
return nil
}
if len(tsMapv2) == 0 {
// v1 case
// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
if err != nil {
return err
}
if !prwe.walEnabled() {
// Perform a direct export otherwise.
return prwe.export(ctx, requests)
}

// Otherwise the WAL is enabled, and just persist the requests to the WAL
// and they'll be exported in another goroutine to the RemoteWrite endpoint.
if err = prwe.wal.persistToWAL(requests); err != nil {
return consumererror.NewPermanent(err)
}
return nil
}
// v2 case
// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
// TODO remove batching for now
requests, err := batchTimeSeriesV2(tsMapv2, symbolsTable, prwe.maxBatchSizeBytes, &prwe.batchTimeSeriesState)
if err != nil {
return err
}
if !prwe.walEnabled() {
// Perform a direct export otherwise.
return prwe.export(ctx, requests)
}

// Otherwise the WAL is enabled, and just persist the requests to the WAL
// and they'll be exported in another goroutine to the RemoteWrite endpoint.
if err = prwe.wal.persistToWAL(requests); err != nil {
return consumererror.NewPermanent(err)
}
return nil
// TODO implement WAL support, can be done after #15277 is fixed

return prwe.exportV2(ctx, requests)
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
@@ -273,7 +285,7 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteRequest) error {
if !ok {
return
}
if errExecute := prwe.execute(ctx, request); errExecute != nil {
if errExecute := prwe.execute(ctx, request, nil, false); errExecute != nil {
mu.Lock()
errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
mu.Unlock()
@@ -287,9 +299,15 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteRequest) error {
return errs
}

func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest, writeReqv2 *writev2.Request, v2 bool) error {
// Uses proto.Marshal to convert the WriteRequest into a byte array
data, errMarshal := proto.Marshal(writeReq)
var data []byte
var errMarshal error
if !v2 {
data, errMarshal = proto.Marshal(writeReq)
} else {
data, errMarshal = proto.Marshal(writeReqv2)
}
if errMarshal != nil {
return consumererror.NewPermanent(errMarshal)
}
@@ -317,10 +335,16 @@ func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
// Add necessary headers specified by:
// https://cortexmetrics.io/docs/apis/#remote-api
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set("User-Agent", prwe.userAgentHeader)

if !v2 {
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
} else {
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
}

resp, err := prwe.client.Do(req)
if err != nil {
return err
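With executeV2 folded in, a single execute helper now marshals whichever request type is active, snappy-compresses it, and picks the wire headers off the v2 flag. A standalone sketch of that dispatch, built around a hypothetical buildBody helper that is not in the commit:

package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
	writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
)

// buildBody is a hypothetical helper mirroring execute(): marshal the
// active request, snappy-compress it, and choose the matching headers.
func buildBody(reqV1 *prompb.WriteRequest, reqV2 *writev2.Request, isV2 bool) (body []byte, contentType, version string, err error) {
	var data []byte
	if isV2 {
		data, err = proto.Marshal(reqV2)
	} else {
		data, err = proto.Marshal(reqV1)
	}
	if err != nil {
		return nil, "", "", err
	}
	// Let snappy allocate the output buffer, as the exporter does.
	body = snappy.Encode(nil, data)
	contentType = "application/x-protobuf"
	version = "0.1.0"
	if isV2 {
		contentType = "application/x-protobuf;proto=io.prometheus.write.v2.Request"
		version = "2.0.0"
	}
	return body, contentType, version, nil
}

func main() {
	body, ct, ver, err := buildBody(&prompb.WriteRequest{}, nil, false)
	fmt.Println(len(body), ct, ver, err)
}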
95 changes: 1 addition & 94 deletions exporter/prometheusremotewriteexporter/exporter_v2.go
@@ -4,18 +4,11 @@
package prometheusremotewriteexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"

import (
"bytes"
"context"
"fmt"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"io"
"math"
"net/http"
"sync"

"github.com/cenkalti/backoff/v4"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.uber.org/multierr"
)
@@ -67,7 +60,7 @@ func (prwe *prwExporter) exportV2(ctx context.Context, requests []*writev2.Request) error {
if !ok {
return
}
if errExecute := prwe.executeV2(ctx, request); errExecute != nil {
if errExecute := prwe.execute(ctx, nil, request, true); errExecute != nil {
mu.Lock()
errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
mu.Unlock()
@@ -80,89 +73,3 @@ func (prwe *prwExporter) exportV2(ctx context.Context, requests []*writev2.Request) error {

return errs
}

func (prwe *prwExporter) executeV2(ctx context.Context, writeReq *writev2.Request) error {
// Uses proto.Marshal to convert the WriteRequest into bytes array
data, errMarshal := proto.Marshal(writeReq)
if errMarshal != nil {
return consumererror.NewPermanent(errMarshal)
}
// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
// Therefore we always let Snappy decide the size of the buffer.
compressedData := snappy.Encode(nil, data)

// executeFunc can be used for backoff and non backoff scenarios.
executeFunc := func() error {
// check there was no timeout in the component level to avoid retries
// to continue to run after a timeout
select {
case <-ctx.Done():
return backoff.Permanent(ctx.Err())
default:
// continue
}

// Create the HTTP POST request to send to the endpoint
req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
if err != nil {
return backoff.Permanent(consumererror.NewPermanent(err))
}

// Add necessary headers specified by:
// https://cortexmetrics.io/docs/apis/#remote-api
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
req.Header.Set("User-Agent", prwe.userAgentHeader)

resp, err := prwe.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

// 2xx status code is considered a success
// 5xx errors are recoverable and the exporter should retry
// Reference for different behavior according to status code:
// https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}

body, err := io.ReadAll(io.LimitReader(resp.Body, 256))
rerr := fmt.Errorf("remote write returned HTTP status %v; err = %w: %s", resp.Status, err, body)
if resp.StatusCode >= 500 && resp.StatusCode < 600 {
return rerr
}

// 429 errors are recoverable and the exporter should retry if RetryOnHTTP429 enabled
// Reference: https://github.com/prometheus/prometheus/pull/12677
if prwe.retryOnHTTP429 && resp.StatusCode == 429 {
return rerr
}

return backoff.Permanent(consumererror.NewPermanent(rerr))
}

var err error
if prwe.retrySettings.Enabled {
// Use the BackOff instance to retry the func with exponential backoff.
err = backoff.Retry(executeFunc, &backoff.ExponentialBackOff{
InitialInterval: prwe.retrySettings.InitialInterval,
RandomizationFactor: prwe.retrySettings.RandomizationFactor,
Multiplier: prwe.retrySettings.Multiplier,
MaxInterval: prwe.retrySettings.MaxInterval,
MaxElapsedTime: prwe.retrySettings.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
})
} else {
err = executeFunc()
}

if err != nil {
return consumererror.NewPermanent(err)
}

return err
}
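
Since the removed executeV2 differed from the v1 path mainly in its Content-Type and X-Prometheus-Remote-Write-Version headers, a receiver can tell the two protocols apart from those headers alone. A throwaway sketch (not part of the commit) of an httptest endpoint that branches on them:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

func main() {
	// Toy receiver: classifies a write request by the headers that the
	// unified execute() sets for RW1 vs. RW2.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ct := r.Header.Get("Content-Type")
		ver := r.Header.Get("X-Prometheus-Remote-Write-Version")
		if strings.Contains(ct, "proto=io.prometheus.write.v2.Request") {
			fmt.Printf("got RW2 request (version %s)\n", ver)
		} else {
			fmt.Printf("got RW1 request (version %s)\n", ver)
		}
		w.WriteHeader(http.StatusNoContent)
	}))
	defer srv.Close()

	for _, hdr := range []struct{ ct, ver string }{
		{"application/x-protobuf", "0.1.0"},
		{"application/x-protobuf;proto=io.prometheus.write.v2.Request", "2.0.0"},
	} {
		req, _ := http.NewRequest(http.MethodPost, srv.URL, nil)
		req.Header.Set("Content-Encoding", "snappy")
		req.Header.Set("Content-Type", hdr.ct)
		req.Header.Set("X-Prometheus-Remote-Write-Version", hdr.ver)
		if resp, err := http.DefaultClient.Do(req); err == nil {
			resp.Body.Close()
		}
	}
}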
