[exporter/prometheusremotewrite] allocate smaller buffers for prometheus remote write batch time series (#34271)

**Description:** 
This change adjusts buffer allocations in
`prometheusremotewriteexporter.batchTimeSeries`.

Rather than allocating the maximum possible buffer size each time, we
memoize the last buffer size we actually used and allocate a buffer twice
that size for the next request. This keeps buffers sized appropriately for
the requests while still avoiding slice resizing in most cases (unless the
data point count changes by more than 2x from one request to the next).

In our testing this significantly reduces memory allocations and memory
usage when large batches are sent.

If request sizes never reach the max request size configured for the
exporter, the existing behavior is preserved: we still allocate a single
buffer sized to the number of time series / metric metadata objects passed
in. Even in that case there is a benefit, because instead of allocating a
requests buffer sized to the total number of objects passed in, we default
to a buffer of size 10 or 2x the last number of requests, whichever is
larger.
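
As a rough illustration of the strategy described above (a minimal sketch with hypothetical names such as `bufferState` and `batch`, not the exporter's actual code, which appears in the `helper.go` diff below), the idea is to remember roughly twice the size of the last batch and use that as the next allocation hint:

```go
package main

import (
	"fmt"
	"math"
)

// bufferState remembers a capacity hint from the previous call so the next
// call can pre-allocate something close to what it will actually need.
type bufferState struct {
	nextBufferSize int
}

func newBufferState() bufferState {
	// Start with MaxInt so the very first call falls back to the input length.
	return bufferState{nextBufferSize: math.MaxInt}
}

// batch splits items into chunks of at most maxPerBatch elements, sizing each
// chunk's backing array from the memoized hint instead of len(items).
func batch(items []int, maxPerBatch int, state *bufferState) [][]int {
	var batches [][]int
	current := make([]int, 0, min(state.nextBufferSize, len(items)))
	for _, it := range items {
		if len(current) >= maxPerBatch {
			// Remember roughly twice what we actually used, with a floor of 10.
			state.nextBufferSize = max(10, 2*len(current))
			batches = append(batches, current)
			current = make([]int, 0, min(state.nextBufferSize, len(items)))
		}
		current = append(current, it)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

func main() {
	state := newBufferState()
	items := make([]int, 100)
	fmt.Println(len(batch(items, 25, &state)), state.nextBufferSize) // 4 50
}
```

The real code applies the same pattern separately to the time series, metric metadata, and requests buffers, and splits batches on a byte-size limit rather than an element count.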

**Link to tracking Issue:** Fixes #34269
**Testing:**
We have validated this fix in our test environment, and existing unit
tests cover this code. The observable behavior is unchanged, as this only
tweaks the pre-allocated size of the buffers. I can add unit tests to
validate the memoized buffer sizes once we agree on the proper place to
store that memoization.

**Documentation:**
No documentation added, as this is a performance optimization with no
user-facing impact beyond reduced memory usage.
ben-childs-docusign authored Aug 1, 2024
1 parent 42f9faa commit 67a5891
Showing 4 changed files with 146 additions and 23 deletions.
27 changes: 27 additions & 0 deletions .chloggen/bc--patchbufferallocations.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Reduce memory allocations of prometheus remote write exporter "batchtimeseries" when large batch sizes are used

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34269]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
34 changes: 18 additions & 16 deletions exporter/prometheusremotewriteexporter/exporter.go
@@ -55,20 +55,21 @@ func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type prwExporter struct {
endpointURL *url.URL
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
userAgentHeader string
maxBatchSizeBytes int
clientSettings *confighttp.ClientConfig
settings component.TelemetrySettings
retrySettings configretry.BackOffConfig
retryOnHTTP429 bool
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
endpointURL *url.URL
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
userAgentHeader string
maxBatchSizeBytes int
clientSettings *confighttp.ClientConfig
settings component.TelemetrySettings
retrySettings configretry.BackOffConfig
retryOnHTTP429 bool
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
batchTimeSeriesState batchTimeSeriesState
}

func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) {
@@ -123,7 +124,8 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
AddMetricSuffixes: cfg.AddMetricSuffixes,
SendMetadata: cfg.SendMetadata,
},
telemetry: prwTelemetry,
telemetry: prwTelemetry,
batchTimeSeriesState: newBatchTimeSericesState(),
}

prwe.wal = newWAL(cfg.WAL, prwe.export)
@@ -208,7 +210,7 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro
}

// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m)
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
if err != nil {
return err
}
36 changes: 30 additions & 6 deletions exporter/prometheusremotewriteexporter/helper.go
@@ -5,30 +5,51 @@ package prometheusremotewriteexporter // import "github.com/open-telemetry/opent

import (
"errors"
"math"
"sort"

"github.com/prometheus/prometheus/prompb"
)

type batchTimeSeriesState struct {
// Track batch sizes sent to avoid over allocating huge buffers.
// This helps in the case where large batches are sent to avoid allocating too much unused memory
nextTimeSeriesBufferSize int
nextMetricMetadataBufferSize int
nextRequestBufferSize int
}

func newBatchTimeSericesState() batchTimeSeriesState {
return batchTimeSeriesState{
nextTimeSeriesBufferSize: math.MaxInt,
nextMetricMetadataBufferSize: math.MaxInt,
nextRequestBufferSize: 0,
}
}

// batchTimeSeries splits series into multiple batch write requests.
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) {
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata, state *batchTimeSeriesState) ([]*prompb.WriteRequest, error) {
if len(tsMap) == 0 {
return nil, errors.New("invalid tsMap: cannot be empty map")
}

requests := make([]*prompb.WriteRequest, 0, len(tsMap)+len(m))
tsArray := make([]prompb.TimeSeries, 0, len(tsMap))
// Allocate a buffer size of at least 10, or twice the last # of requests we sent
requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize))

// Allocate a time series buffer 2x the last time series batch size or the length of the input if smaller
tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)))
sizeOfCurrentBatch := 0

i := 0
for _, v := range tsMap {
sizeOfSeries := v.Size()

if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray))
wrapped := convertTimeseriesToRequest(tsArray)
requests = append(requests, wrapped)

tsArray = make([]prompb.TimeSeries, 0, len(tsMap)-i)
tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i))
sizeOfCurrentBatch = 0
}

@@ -42,17 +63,19 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
requests = append(requests, wrapped)
}

mArray := make([]prompb.MetricMetadata, 0, len(m))
// Allocate a metric metadata buffer 2x the last metric metadata batch size or the length of the input if smaller
mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)))
sizeOfCurrentBatch = 0
i = 0
for _, v := range m {
sizeOfM := v.Size()

if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize {
state.nextMetricMetadataBufferSize = max(10, 2*len(mArray))
wrapped := convertMetadataToRequest(mArray)
requests = append(requests, wrapped)

mArray = make([]prompb.MetricMetadata, 0, len(m)-i)
mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)-i))
sizeOfCurrentBatch = 0
}

@@ -66,6 +89,7 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
requests = append(requests, wrapped)
}

state.nextRequestBufferSize = 2 * len(requests)
return requests, nil
}

72 changes: 71 additions & 1 deletion exporter/prometheusremotewriteexporter/helper_test.go
@@ -4,6 +4,7 @@
package prometheusremotewriteexporter

import (
"math"
"testing"

"github.com/prometheus/prometheus/prompb"
@@ -57,17 +58,86 @@ func Test_batchTimeSeries(t *testing.T) {
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil)
state := newBatchTimeSericesState()
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, &state)
if tt.returnErr {
assert.Error(t, err)
return
}

assert.NoError(t, err)
assert.Equal(t, tt.numExpectedRequests, len(requests))
if tt.numExpectedRequests <= 1 {
assert.Equal(t, math.MaxInt, state.nextTimeSeriesBufferSize)
assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
assert.Equal(t, 2*len(requests), state.nextRequestBufferSize)
} else {
assert.Equal(t, max(10, len(requests[len(requests)-2].Timeseries)*2), state.nextTimeSeriesBufferSize)
assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
assert.Equal(t, 2*len(requests), state.nextRequestBufferSize)
}
})
}
}

func Test_batchTimeSeriesUpdatesStateForLargeBatches(t *testing.T) {
labels := getPromLabels(label11, value11, label12, value12, label21, value21, label22, value22)
sample1 := getSample(floatVal1, msTime1)
sample2 := getSample(floatVal2, msTime2)
sample3 := getSample(floatVal3, msTime3)

// Benchmark for large data sizes
// First allocate 100k time series
tsArray := make([]*prompb.TimeSeries, 0, 100000)
for i := 0; i < 100000; i++ {
ts := getTimeSeries(labels, sample1, sample2, sample3)
tsArray = append(tsArray, ts)
}

tsMap1 := getTimeseriesMap(tsArray)

state := newBatchTimeSericesState()
requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state)

assert.NoError(t, err)
assert.Equal(t, 18, len(requests))
assert.Equal(t, len(requests[len(requests)-2].Timeseries)*2, state.nextTimeSeriesBufferSize)
assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
assert.Equal(t, 36, state.nextRequestBufferSize)
}

// Benchmark_batchTimeSeries checks batchTimeSeries
// To run and gather alloc data:
// go test -bench ^Benchmark_batchTimeSeries$ -benchmem -benchtime=100x -run=^$ -count=10 -memprofile memprofile.out
// go tool pprof -svg memprofile.out
func Benchmark_batchTimeSeries(b *testing.B) {
labels := getPromLabels(label11, value11, label12, value12, label21, value21, label22, value22)
sample1 := getSample(floatVal1, msTime1)
sample2 := getSample(floatVal2, msTime2)
sample3 := getSample(floatVal3, msTime3)

// Benchmark for large data sizes
// First allocate 100k time series
tsArray := make([]*prompb.TimeSeries, 0, 100000)
for i := 0; i < 100000; i++ {
ts := getTimeSeries(labels, sample1, sample2, sample3)
tsArray = append(tsArray, ts)
}

tsMap1 := getTimeseriesMap(tsArray)

b.ReportAllocs()
b.ResetTimer()

state := newBatchTimeSericesState()
// Run batchTimeSeries 100 times with a 1mb max request size
for i := 0; i < b.N; i++ {
requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state)
assert.NoError(b, err)
assert.Equal(b, 18, len(requests))
}
}

// Ensure that before a prompb.WriteRequest is created, that the points per TimeSeries
// are sorted by Timestamp value, to prevent Prometheus from barfing when it gets poorly
// sorted values. See issues:
