diff --git a/.chloggen/bc--patchbufferallocations.yaml b/.chloggen/bc--patchbufferallocations.yaml new file mode 100644 index 000000000000..de1482281291 --- /dev/null +++ b/.chloggen/bc--patchbufferallocations.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Reduce memory allocations in the Prometheus remote write exporter's "batchTimeSeries" when large batch sizes are used + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34269] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 1bb03af36435..8cad87a5329e 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -55,20 +55,21 @@ func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint. type prwExporter struct { - endpointURL *url.URL - client *http.Client - wg *sync.WaitGroup - closeChan chan struct{} - concurrency int - userAgentHeader string - maxBatchSizeBytes int - clientSettings *confighttp.ClientConfig - settings component.TelemetrySettings - retrySettings configretry.BackOffConfig - retryOnHTTP429 bool - wal *prweWAL - exporterSettings prometheusremotewrite.Settings - telemetry prwTelemetry + endpointURL *url.URL + client *http.Client + wg *sync.WaitGroup + closeChan chan struct{} + concurrency int + userAgentHeader string + maxBatchSizeBytes int + clientSettings *confighttp.ClientConfig + settings component.TelemetrySettings + retrySettings configretry.BackOffConfig + retryOnHTTP429 bool + wal *prweWAL + exporterSettings prometheusremotewrite.Settings + telemetry prwTelemetry + batchTimeSeriesState batchTimeSeriesState } func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) { @@ -123,7 +124,8 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) { AddMetricSuffixes: cfg.AddMetricSuffixes, SendMetadata: cfg.SendMetadata, }, - telemetry: prwTelemetry, + telemetry: prwTelemetry, + batchTimeSeriesState: newBatchTimeSericesState(), } prwe.wal = newWAL(cfg.WAL, prwe.export) @@ -208,7 +210,7 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro } // Calls the helper function to convert and batch the TsMap to the desired format - requests, err := 
batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m) + requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState) if err != nil { return err } diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index d5eca3086a7e..5819aefc4fe8 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -5,19 +5,39 @@ package prometheusremotewriteexporter // import "github.com/open-telemetry/opent import ( "errors" + "math" "sort" "github.com/prometheus/prometheus/prompb" ) +type batchTimeSeriesState struct { + // Buffer capacities that the next batchTimeSeries call should use, derived from the batch sizes seen in the previous call. + // Sizing from observed batches avoids allocating huge, mostly unused buffers when large batches are sent; math.MaxInt means no batch has been observed yet + nextTimeSeriesBufferSize int + nextMetricMetadataBufferSize int + nextRequestBufferSize int +} + +func newBatchTimeSericesState() batchTimeSeriesState { + return batchTimeSeriesState{ + nextTimeSeriesBufferSize: math.MaxInt, + nextMetricMetadataBufferSize: math.MaxInt, + nextRequestBufferSize: 0, + } +} + // batchTimeSeries splits series into multiple batch write requests. 
-func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) { +func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata, state *batchTimeSeriesState) ([]*prompb.WriteRequest, error) { if len(tsMap) == 0 { return nil, errors.New("invalid tsMap: cannot be empty map") } - requests := make([]*prompb.WriteRequest, 0, len(tsMap)+len(m)) - tsArray := make([]prompb.TimeSeries, 0, len(tsMap)) + // Allocate a buffer size of at least 10, or twice the last # of requests we sent + requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize)) + + // Allocate a time series buffer 2x the last time series batch size or the length of the input if smaller + tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap))) sizeOfCurrentBatch := 0 i := 0 @@ -25,10 +45,11 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, sizeOfSeries := v.Size() if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize { + state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray)) wrapped := convertTimeseriesToRequest(tsArray) requests = append(requests, wrapped) - tsArray = make([]prompb.TimeSeries, 0, len(tsMap)-i) + tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i)) sizeOfCurrentBatch = 0 } @@ -42,17 +63,19 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, requests = append(requests, wrapped) } - mArray := make([]prompb.MetricMetadata, 0, len(m)) + // Allocate a metric metadata buffer 2x the last metric metadata batch size or the length of the input if smaller + mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m))) sizeOfCurrentBatch = 0 i = 0 for _, v := range m { sizeOfM := v.Size() if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize { + state.nextMetricMetadataBufferSize = max(10, 
2*len(mArray)) wrapped := convertMetadataToRequest(mArray) requests = append(requests, wrapped) - mArray = make([]prompb.MetricMetadata, 0, len(m)-i) + mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)-i)) sizeOfCurrentBatch = 0 } @@ -66,6 +89,7 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, requests = append(requests, wrapped) } + state.nextRequestBufferSize = 2 * len(requests) return requests, nil } diff --git a/exporter/prometheusremotewriteexporter/helper_test.go b/exporter/prometheusremotewriteexporter/helper_test.go index 8404bb6c9e9e..afd2a4958ae9 100644 --- a/exporter/prometheusremotewriteexporter/helper_test.go +++ b/exporter/prometheusremotewriteexporter/helper_test.go @@ -4,6 +4,7 @@ package prometheusremotewriteexporter import ( + "math" "testing" "github.com/prometheus/prometheus/prompb" @@ -57,17 +58,86 @@ func Test_batchTimeSeries(t *testing.T) { // run tests for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil) + state := newBatchTimeSericesState() + requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, &state) if tt.returnErr { assert.Error(t, err) return } + assert.NoError(t, err) assert.Equal(t, tt.numExpectedRequests, len(requests)) + if tt.numExpectedRequests <= 1 { + assert.Equal(t, math.MaxInt, state.nextTimeSeriesBufferSize) + assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize) + assert.Equal(t, 2*len(requests), state.nextRequestBufferSize) + } else { + assert.Equal(t, max(10, len(requests[len(requests)-2].Timeseries)*2), state.nextTimeSeriesBufferSize) + assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize) + assert.Equal(t, 2*len(requests), state.nextRequestBufferSize) + } }) } } +func Test_batchTimeSeriesUpdatesStateForLargeBatches(t *testing.T) { + labels := getPromLabels(label11, value11, label12, value12, label21, value21, label22, value22) + 
sample1 := getSample(floatVal1, msTime1) + sample2 := getSample(floatVal2, msTime2) + sample3 := getSample(floatVal3, msTime3) + + // Use a large input so that it is split into multiple batches + // First allocate 100k time series + tsArray := make([]*prompb.TimeSeries, 0, 100000) + for i := 0; i < 100000; i++ { + ts := getTimeSeries(labels, sample1, sample2, sample3) + tsArray = append(tsArray, ts) + } + + tsMap1 := getTimeseriesMap(tsArray) + + state := newBatchTimeSericesState() + requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state) + + assert.NoError(t, err) + assert.Equal(t, 18, len(requests)) + assert.Equal(t, len(requests[len(requests)-2].Timeseries)*2, state.nextTimeSeriesBufferSize) + assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize) + assert.Equal(t, 36, state.nextRequestBufferSize) +} + +// Benchmark_batchTimeSeries benchmarks batchTimeSeries on a 100k time series input and reports allocations. +// To run and gather alloc data: +// go test -bench ^Benchmark_batchTimeSeries$ -benchmem -benchtime=100x -run=^$ -count=10 -memprofile memprofile.out +// go tool pprof -svg memprofile.out +func Benchmark_batchTimeSeries(b *testing.B) { + labels := getPromLabels(label11, value11, label12, value12, label21, value21, label22, value22) + sample1 := getSample(floatVal1, msTime1) + sample2 := getSample(floatVal2, msTime2) + sample3 := getSample(floatVal3, msTime3) + + // Benchmark for large data sizes + // First allocate 100k time series + tsArray := make([]*prompb.TimeSeries, 0, 100000) + for i := 0; i < 100000; i++ { + ts := getTimeSeries(labels, sample1, sample2, sample3) + tsArray = append(tsArray, ts) + } + + tsMap1 := getTimeseriesMap(tsArray) + + b.ReportAllocs() + b.ResetTimer() + + state := newBatchTimeSericesState() + // Run batchTimeSeries b.N times (100 with -benchtime=100x) with a 1 MB max request size, reusing state across iterations + for i := 0; i < b.N; i++ { + requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state) + assert.NoError(b, err) + assert.Equal(b, 18, len(requests)) + } +} + // Ensure that before a prompb.WriteRequest is created, that the points per 
TimeSeries // are sorted by Timestamp value, to prevent Prometheus from barfing when it gets poorly // sorted values. See issues: