diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go
index 5a4bea1c9cc5..283c16601732 100644
--- a/exporter/prometheusremotewriteexporter/exporter.go
+++ b/exporter/prometheusremotewriteexporter/exporter.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
 	"io"
 	"math"
 	"net/http"
@@ -173,38 +174,36 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
 	case <-prwe.closeChan:
 		return errors.New("shutdown has been called")
 	default:
+		var tsMap map[string]*prompb.TimeSeries
+		var tsMapv2 map[string]*writev2.TimeSeries
+		var symbolsTable writev2.SymbolsTable
+		var m []*prompb.MetricMetadata
+		var err error
 
 		if !prwe.exporterSettings.SendRW2 {
 			// RW1 case
-			tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
-			if err != nil {
-				prwe.telemetry.recordTranslationFailure(ctx)
-				prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
-			}
+			tsMap, err = prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
 
 			prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
 
-			var m []*prompb.MetricMetadata
 			if prwe.exporterSettings.SendMetadata {
 				m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
 			}
 
-			// Call export even if a conversion error, since there may be points that were successfully converted.
-			return prwe.handleExport(ctx, tsMap, m)
 		} else {
 			// RW2 case
-			tsMap, symbolsTable, err := prometheusremotewrite.FromMetricsV2(md, prwe.exporterSettings)
-			if err != nil {
-				prwe.telemetry.recordTranslationFailure(ctx)
-				prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
-			}
-
-			prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
+			tsMapv2, symbolsTable, err = prometheusremotewrite.FromMetricsV2(md, prwe.exporterSettings)
 
-			// TODO handle metadata
+			prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMapv2))
+		}
 
-			return prwe.handleExportV2(ctx, tsMap, symbolsTable)
+		if err != nil {
+			prwe.telemetry.recordTranslationFailure(ctx)
+			prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)+len(tsMapv2)))
 		}
+		// Call export even after a conversion error, since some points may still have been converted successfully.
+		return prwe.handleExport(ctx, tsMap, m, symbolsTable, tsMapv2)
+
 	}
 }
 
@@ -220,28 +219,44 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
 	return sanitizedLabels, nil
 }
 
-func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata) error {
+// handleExport batches the translated time series and exports them. tsMap and m
+// carry a remote write v1 payload; tsMapv2 and symbolsTable carry a v2 payload.
+// When tsMapv2 is empty the v1 path is taken, otherwise the v2 path.
+func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata, symbolsTable writev2.SymbolsTable, tsMapv2 map[string]*writev2.TimeSeries) error {
 	// There are no metrics to export, so return.
-	if len(tsMap) == 0 {
+	if len(tsMap) == 0 && len(tsMapv2) == 0 {
 		return nil
 	}
 
+	if len(tsMapv2) == 0 {
+		// v1 case
+		// Calls the helper function to convert and batch the TsMap to the desired format
+		requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
+		if err != nil {
+			return err
+		}
+		if !prwe.walEnabled() {
+			// Perform a direct export otherwise.
+			return prwe.export(ctx, requests)
+		}
+		// Otherwise the WAL is enabled, and just persist the requests to the WAL
+		// and they'll be exported in another goroutine to the RemoteWrite endpoint.
+		if err = prwe.wal.persistToWAL(requests); err != nil {
+			return consumererror.NewPermanent(err)
+		}
+		return nil
+	}
+	// v2 case
 	// Calls the helper function to convert and batch the TsMap to the desired format
-	requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
+	// TODO remove batching for now
+	requests, err := batchTimeSeriesV2(tsMapv2, symbolsTable, prwe.maxBatchSizeBytes, &prwe.batchTimeSeriesState)
 	if err != nil {
 		return err
 	}
-	if !prwe.walEnabled() {
-		// Perform a direct export otherwise.
-		return prwe.export(ctx, requests)
-	}
 
-	// Otherwise the WAL is enabled, and just persist the requests to the WAL
-	// and they'll be exported in another goroutine to the RemoteWrite endpoint.
-	if err = prwe.wal.persistToWAL(requests); err != nil {
-		return consumererror.NewPermanent(err)
-	}
-	return nil
+	// TODO implement WAL support, can be done after #15277 is fixed
+
+	return prwe.exportV2(ctx, requests)
 }
 
 // export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
@@ -273,7 +288,7 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
 					if !ok {
 						return
 					}
-					if errExecute := prwe.execute(ctx, request); errExecute != nil {
+					if errExecute := prwe.execute(ctx, request, nil, false); errExecute != nil {
 						mu.Lock()
 						errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
 						mu.Unlock()
@@ -287,9 +302,18 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
 	return errs
 }
 
-func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
+// execute marshals one remote write request and sends it through prwe.client.
+// When v2 is false writeReq is marshaled and sent with the 0.1.0 version header;
+// when v2 is true writeReqv2 is marshaled and sent with the 2.0.0 version header.
+func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest, writeReqv2 *writev2.Request, v2 bool) error {
 	// Uses proto.Marshal to convert the WriteRequest into bytes array
-	data, errMarshal := proto.Marshal(writeReq)
+	var data []byte
+	var errMarshal error
+	if !v2 {
+		data, errMarshal = proto.Marshal(writeReq)
+	} else {
+		data, errMarshal = proto.Marshal(writeReqv2)
+	}
 	if errMarshal != nil {
 		return consumererror.NewPermanent(errMarshal)
 	}
@@ -317,10 +341,16 @@ func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
 		// Add necessary headers specified by:
 		// https://cortexmetrics.io/docs/apis/#remote-api
 		req.Header.Add("Content-Encoding", "snappy")
-		req.Header.Set("Content-Type", "application/x-protobuf")
-		req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
 		req.Header.Set("User-Agent", prwe.userAgentHeader)
 
+		if !v2 {
+			req.Header.Set("Content-Type", "application/x-protobuf")
+			req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
+		} else {
+			req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
+			req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
+		}
+
 		resp, err := prwe.client.Do(req)
 		if err != nil {
 			return err
diff --git a/exporter/prometheusremotewriteexporter/exporter_v2.go b/exporter/prometheusremotewriteexporter/exporter_v2.go
index b0368bf73869..e0db050d1500 100644
--- a/exporter/prometheusremotewriteexporter/exporter_v2.go
+++ b/exporter/prometheusremotewriteexporter/exporter_v2.go
@@ -4,18 +4,11 @@
 package prometheusremotewriteexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"
 
 import (
-	"bytes"
 	"context"
-	"fmt"
 	writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
-	"io"
 	"math"
-	"net/http"
 	"sync"
 
-	"github.com/cenkalti/backoff/v4"
-	"github.com/gogo/protobuf/proto"
-	"github.com/golang/snappy"
 	"go.opentelemetry.io/collector/consumer/consumererror"
 	"go.uber.org/multierr"
 )
@@ -67,7 +60,7 @@ func (prwe *prwExporter) exportV2(ctx context.Context, requests []*writev2.Reque
 					if !ok {
 						return
 					}
-					if errExecute := prwe.executeV2(ctx, request); errExecute != nil {
+					if errExecute := prwe.execute(ctx, nil, request, true); errExecute != nil {
 						mu.Lock()
 						errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
 						mu.Unlock()
@@ -80,89 +73,3 @@ func (prwe *prwExporter) exportV2(ctx context.Context, requests []*writev2.Reque
 
 	return errs
 }
-
-func (prwe *prwExporter) executeV2(ctx context.Context, writeReq *writev2.Request) error {
-	// Uses proto.Marshal to convert the WriteRequest into bytes array
-	data, errMarshal := proto.Marshal(writeReq)
-	if errMarshal != nil {
-		return consumererror.NewPermanent(errMarshal)
-	}
-	// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
-	// Therefore we always let Snappy decide the size of the buffer.
-	compressedData := snappy.Encode(nil, data)
-
-	// executeFunc can be used for backoff and non backoff scenarios.
-	executeFunc := func() error {
-		// check there was no timeout in the component level to avoid retries
-		// to continue to run after a timeout
-		select {
-		case <-ctx.Done():
-			return backoff.Permanent(ctx.Err())
-		default:
-			// continue
-		}
-
-		// Create the HTTP POST request to send to the endpoint
-		req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
-		if err != nil {
-			return backoff.Permanent(consumererror.NewPermanent(err))
-		}
-
-		// Add necessary headers specified by:
-		// https://cortexmetrics.io/docs/apis/#remote-api
-		req.Header.Add("Content-Encoding", "snappy")
-		req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
-		req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
-		req.Header.Set("User-Agent", prwe.userAgentHeader)
-
-		resp, err := prwe.client.Do(req)
-		if err != nil {
-			return err
-		}
-		defer resp.Body.Close()
-
-		// 2xx status code is considered a success
-		// 5xx errors are recoverable and the exporter should retry
-		// Reference for different behavior according to status code:
-		// https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186
-		if resp.StatusCode >= 200 && resp.StatusCode < 300 {
-			return nil
-		}
-
-		body, err := io.ReadAll(io.LimitReader(resp.Body, 256))
-		rerr := fmt.Errorf("remote write returned HTTP status %v; err = %w: %s", resp.Status, err, body)
-		if resp.StatusCode >= 500 && resp.StatusCode < 600 {
-			return rerr
-		}
-
-		// 429 errors are recoverable and the exporter should retry if RetryOnHTTP429 enabled
-		// Reference: https://github.com/prometheus/prometheus/pull/12677
-		if prwe.retryOnHTTP429 && resp.StatusCode == 429 {
-			return rerr
-		}
-
-		return backoff.Permanent(consumererror.NewPermanent(rerr))
-	}
-
-	var err error
-	if prwe.retrySettings.Enabled {
-		// Use the BackOff instance to retry the func with exponential backoff.
-		err = backoff.Retry(executeFunc, &backoff.ExponentialBackOff{
-			InitialInterval:     prwe.retrySettings.InitialInterval,
-			RandomizationFactor: prwe.retrySettings.RandomizationFactor,
-			Multiplier:          prwe.retrySettings.Multiplier,
-			MaxInterval:         prwe.retrySettings.MaxInterval,
-			MaxElapsedTime:      prwe.retrySettings.MaxElapsedTime,
-			Stop:                backoff.Stop,
-			Clock:               backoff.SystemClock,
-		})
-	} else {
-		err = executeFunc()
-	}
-
-	if err != nil {
-		return consumererror.NewPermanent(err)
-	}
-
-	return err
-}