Skip to content

Commit

Permalink
feat/re-allow multiple workers (#36134)
Browse files Browse the repository at this point in the history
#### Description

This MR does the following:

* Re adds the ability to allow multiple workers in this exporter due to:

1. Out of Order is no longer an issue now that it is fully supported in
Prometheus. Nonetheless, I am setting the default worker as 1 to avoid
OoO in Vanilla Prometheus Settings.

2. With a single worker, and for a collector with a large load, this
becomes "blocking". Example: Imagine a scenario in which a collector is
collecting lots of targets, and with a slow prometheus/unstable network,
a single worker can easily bottleneck the off-shipping if retries are
enabled.

#### Link to tracking issue
N/A

#### Testing  ####

#### Documentation
docs auto-updated. Readme.md is now correct in its explanation of the
`num_consumers since its no longer hard-coded at 1. Additional docs
added.
  • Loading branch information
bmiguel-teixeira authored Jan 17, 2025
1 parent ce1d15a commit 3febf16
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 12 deletions.
27 changes: 27 additions & 0 deletions .chloggen/prometheusremotewrite-reallow-multiple-workers.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Re allows the configuration of multiple workers

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36134]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
29 changes: 28 additions & 1 deletion exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ The following settings can be optionally configured:
- `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes.
- `enabled`: enable the sending queue (default: `true`)
- `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false` (default: `10000`)
- `num_consumers`: minimum number of workers to use to fan out the outgoing requests. (default: `5`)
- `num_consumers`: minimum number of workers to use to fan out the outgoing requests. (default: `5` or default: `1` if `EnableMultipleWorkersFeatureGate` is enabled).
- `resource_to_telemetry_conversion`
- `enabled` (default = false): If `enabled` is `true`, all the resource attributes will be converted to metric labels by default.
- `target_info`: customize `target_info` metric
Expand All @@ -66,6 +66,7 @@ The following settings can be optionally configured:
- `max_batch_size_bytes` (default = `3000000` -> `~2.861 mb`): Maximum size of a batch of
samples to be sent to the remote write endpoint. If the batch size is larger
than this value, it will be split into multiple batches.
- `max_batch_request_parallelism` (default = `5`): Maximum parallelism allowed for a single request bigger than `max_batch_size_bytes`.

Example:

Expand Down Expand Up @@ -101,12 +102,22 @@ Several helper files are leveraged to provide additional capabilities automatica
- [Retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), note that the exporter doesn't support `sending_queue` but provides `remote_write_queue`.

### Feature gates

#### RetryOn429

This exporter has feature gate: `exporter.prometheusremotewritexporter.RetryOn429`.
When this feature gate is enable the prometheus remote write exporter will retry on 429 http status code with the provided retry configuration.
It currently doesn't support respecting the http header `Retry-After` if provided since the retry library used doesn't support this feature.

To enable it run collector with enabled feature gate `exporter.prometheusremotewritexporter.RetryOn429`. This can be done by executing it with one additional parameter - `--feature-gates=telemetry.useOtelForInternalMetrics`.

#### EnableMultipleWorkersFeatureGate

This exporter has feature gate: `+exporter.prometheusremotewritexporter.EnableMultipleWorkers`.

When this feature gate is enabled, `num_consumers` will be used as the worker counter for handling batches from the queue, and `max_batch_request_parallelism` will be used for parallelism on single batch bigger than `max_batch_size_bytes`.
Enabling this feature gate, with `num_consumers` higher than 1 requires the target destination to supports ingestion of OutOfOrder samples. See [Multiple Consumers and OutOfOrder](#multiple-consumers-and-outoforder) for more info

## Metric names and labels normalization

OpenTelemetry metric names and attributes are normalized to be compliant with Prometheus naming rules. [Details on this normalization process are described in the Prometheus translator module](../../pkg/translator/prometheus/).
Expand Down Expand Up @@ -149,3 +160,19 @@ sum by (namespace) (app_ads_ad_requests_total)
[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[core]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol

## Multiple Consumers and OutOfOrder

**DISCLAIMER**: This snippet applies only to Prometheus, other remote write destinations using Prometheus Protocol (ex: Thanos/Grafana Mimir/VictoriaMetrics) may have different settings.

By default, Prometheus expects samples to be ingested sequentially, in temporal order.

When multiple consumers are enabled, the temporal ordering of the samples written to the target destination is not deterministic, and temporal ordering can no longer be guaranteed. For example, one worker may push a sample for `t+30s`, and a second worker may push an additional sample but for `t+15s`.

Vanilla Prometheus configurations will reject these unordered samples and you'll receive "out of order" errors.

Out-of-order support in Prometheus must be enabled for multiple consumers.
This can be done by using the `tsdb.out_of_order_time_window: 10m` settings. Please choose an appropriate time window to support pushing the worst-case scenarios of a "queue" build-up on the sender side.

See for more info:
- https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tsdb
7 changes: 7 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Config struct {
// maximum size in bytes of time series batch sent to remote storage
MaxBatchSizeBytes int `mapstructure:"max_batch_size_bytes"`

// maximum amount of parallel requests to do when handling large batch request
MaxBatchRequestParallelism *int `mapstructure:"max_batch_request_parallelism"`

// ResourceToTelemetrySettings is the option for converting resource attributes to telemetry attributes.
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
Expand Down Expand Up @@ -87,6 +90,10 @@ var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
if cfg.MaxBatchRequestParallelism != nil && *cfg.MaxBatchRequestParallelism < 1 {
return fmt.Errorf("max_batch_request_parallelism can't be set to below 1")
}

if cfg.RemoteWriteQueue.QueueSize < 0 {
return fmt.Errorf("remote write queue size can't be negative")
}
Expand Down
13 changes: 11 additions & 2 deletions exporter/prometheusremotewriteexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "2"),
expected: &Config{
MaxBatchSizeBytes: 3000000,
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
MaxBatchSizeBytes: 3000000,
MaxBatchRequestParallelism: toPtr(10),
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
BackOffConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 10 * time.Second,
Expand Down Expand Up @@ -90,6 +91,10 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "negative_num_consumers"),
errorMessage: "remote write consumer number can't be negative",
},
{
id: component.NewIDWithName(metadata.Type, "less_than_1_max_batch_request_parallelism"),
errorMessage: "max_batch_request_parallelism can't be set to below 1",
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -136,3 +141,7 @@ func TestDisabledTargetInfo(t *testing.T) {

assert.False(t, cfg.(*Config).TargetInfo.Enabled)
}

func toPtr[T any](val T) *T {
return &val
}
10 changes: 9 additions & 1 deletion exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,21 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {

userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)

concurrency := 5
if !enableMultipleWorkersFeatureGate.IsEnabled() {
concurrency = cfg.RemoteWriteQueue.NumConsumers
}
if cfg.MaxBatchRequestParallelism != nil {
concurrency = *cfg.MaxBatchRequestParallelism
}

prwe := &prwExporter{
endpointURL: endpointURL,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
maxBatchSizeBytes: cfg.MaxBatchSizeBytes,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
concurrency: concurrency,
clientSettings: &cfg.ClientConfig,
settings: set.TelemetrySettings,
retrySettings: cfg.BackOffConfig,
Expand Down
31 changes: 23 additions & 8 deletions exporter/prometheusremotewriteexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ var retryOn429FeatureGate = featuregate.GlobalRegistry().MustRegister(
featuregate.WithRegisterDescription("When enabled, the Prometheus remote write exporter will retry 429 http status code. Requires exporter.prometheusremotewritexporter.metrics.RetryOn429 to be enabled."),
)

var enableMultipleWorkersFeatureGate = featuregate.GlobalRegistry().MustRegister(
"exporter.prometheusremotewritexporter.EnableMultipleWorkers",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled and settings configured, the Prometheus remote exporter will"+
" spawn multiple workers/goroutines to handle incoming metrics batches concurrently"),
)

// NewFactory creates a new Prometheus Remote Write exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
Expand All @@ -42,17 +49,19 @@ func createMetricsExporter(ctx context.Context, set exporter.Settings,
return nil, errors.New("invalid configuration")
}

if !enableMultipleWorkersFeatureGate.IsEnabled() && prwCfg.RemoteWriteQueue.NumConsumers != 5 {
set.Logger.Warn("`remote_write_queue.num_consumers` will be used to configure processing parallelism, rather than request parallelism in a future release. This may cause out-of-order issues unless you take action. Please migrate to using `max_batch_request_parallelism` to keep the your existing behavior.")
}

prwe, err := newPRWExporter(prwCfg, set)
if err != nil {
return nil, err
}

// Don't allow users to configure the queue.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/2949.
// Prometheus remote write samples needs to be in chronological
// order for each timeseries. If we shard the incoming metrics
// without considering this limitation, we experience
// "out of order samples" errors.
numConsumers := 1
if enableMultipleWorkersFeatureGate.IsEnabled() {
numConsumers = prwCfg.RemoteWriteQueue.NumConsumers
}
exporter, err := exporterhelper.NewMetrics(
ctx,
set,
Expand All @@ -61,7 +70,7 @@ func createMetricsExporter(ctx context.Context, set exporter.Settings,
exporterhelper.WithTimeout(prwCfg.TimeoutSettings),
exporterhelper.WithQueue(exporterhelper.QueueConfig{
Enabled: prwCfg.RemoteWriteQueue.Enabled,
NumConsumers: 1,
NumConsumers: numConsumers,
QueueSize: prwCfg.RemoteWriteQueue.QueueSize,
}),
exporterhelper.WithStart(prwe.Start),
Expand All @@ -83,10 +92,16 @@ func createDefaultConfig() component.Config {
clientConfig.WriteBufferSize = 512 * 1024
clientConfig.Timeout = exporterhelper.NewDefaultTimeoutConfig().Timeout

numConsumers := 5
if enableMultipleWorkersFeatureGate.IsEnabled() {
numConsumers = 1
}
return &Config{
Namespace: "",
ExternalLabels: map[string]string{},
MaxBatchSizeBytes: 3000000,
// To set this as default once `exporter.prometheusremotewritexporter.EnableMultipleWorkers` is removed
// MaxBatchRequestParallelism: 5,
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
BackOffConfig: retrySettings,
AddMetricSuffixes: true,
Expand All @@ -96,7 +111,7 @@ func createDefaultConfig() component.Config {
RemoteWriteQueue: RemoteWriteQueue{
Enabled: true,
QueueSize: 10000,
NumConsumers: 5,
NumConsumers: numConsumers,
},
TargetInfo: &TargetInfo{
Enabled: true,
Expand Down
5 changes: 5 additions & 0 deletions exporter/prometheusremotewriteexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ prometheusremotewrite:

prometheusremotewrite/2:
namespace: "test-space"
max_batch_request_parallelism: 10
retry_on_failure:
enabled: true
initial_interval: 10s
Expand Down Expand Up @@ -38,6 +39,10 @@ prometheusremotewrite/negative_num_consumers:
queue_size: 5
num_consumers: -1

prometheusremotewrite/less_than_1_max_batch_request_parallelism:
endpoint: "localhost:8888"
max_batch_request_parallelism: 0

prometheusremotewrite/disabled_target_info:
endpoint: "localhost:8888"
target_info:
Expand Down

0 comments on commit 3febf16

Please sign in to comment.