Skip to content

Commit

Permalink
ruler: cap the number of remote eval retries (grafana#10375)
Browse files Browse the repository at this point in the history
* ruler: cap the number of remote eval retries

The retries happen more aggressively than actual evaluations. With the current setup an error spike results in 3x the query rate - initial query, and two retries fairly quickly 100ms & 200ms after that.

This PR changes that so that the whole process doesn't retry more than a fixed number of queries/sec. I chose 170 because at GL the average evals/sec is 340 per ruler. This would retry about half of the rules on average. _On average_ that should increase query load by 50%.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add CHANGELOG.md entry

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix a totally arbitrary stupid linter rule

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Use a CB instead of a rate limtier

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Revert "Use a CB instead of a rate limtier"

This reverts commit b07366f.

* Don't abort retries if we're over the rate limit

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Cancel reservation when context expires

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov authored Jan 9, 2025
1 parent 3091c3a commit ffee57d
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 23 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

## main / unreleased

* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_strong_consistency_requests_total`, `cortex_ingest_storage_strong_consistency_failures_total`, and `cortex_ingest_storage_strong_consistency_wait_duration_seconds` metrics. #10220

### Grafana Mimir

* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_strong_consistency_requests_total`, `cortex_ingest_storage_strong_consistency_failures_total`, and `cortex_ingest_storage_strong_consistency_wait_duration_seconds` metrics. #10220
* [CHANGE] Ruler: cap the rate of retries for remote query evaluation to 170/sec. This is configurable via `-ruler.query-frontend.max-retries-rate`. #10375
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145
Expand Down
10 changes: 10 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -13227,6 +13227,16 @@
"fieldDefaultValue": "protobuf",
"fieldFlag": "ruler.query-frontend.query-result-response-format",
"fieldType": "string"
},
{
"kind": "field",
"name": "max_retries_rate",
"required": false,
"desc": "Maximum number of retries for failed queries per second.",
"fieldValue": null,
"fieldDefaultValue": 170,
"fieldFlag": "ruler.query-frontend.max-retries-rate",
"fieldType": "float"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2987,6 +2987,8 @@ Usage of ./cmd/mimir/mimir:
Override the default minimum TLS version. Allowed values: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13
-ruler.query-frontend.grpc-client-config.tls-server-name string
Override the expected name on the server certificate.
-ruler.query-frontend.max-retries-rate float
Maximum number of retries for failed queries per second. (default 170)
-ruler.query-frontend.query-result-response-format string
Format to use when retrieving query results from query-frontends. Supported values: json, protobuf (default "protobuf")
-ruler.query-stats-enabled
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of rules per rule group by namespace. Value is a map, where each key is the namespace and value is the number of rules allowed in the namespace (int). On the command line, this map is given in a JSON format. The number of rules specified has the same meaning as -ruler.max-rules-per-rule-group, but only applies for the specific namespace. If specified, it supersedes -ruler.max-rules-per-rule-group. (default {})
-ruler.query-frontend.address string
GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) to enable client side load balancing.
-ruler.query-frontend.max-retries-rate float
Maximum number of retries for failed queries per second. (default 170)
-ruler.query-frontend.query-result-response-format string
Format to use when retrieving query results from query-frontends. Supported values: json, protobuf (default "protobuf")
-ruler.recording-rules-evaluation-enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2109,6 +2109,10 @@ query_frontend:
# CLI flag: -ruler.query-frontend.query-result-response-format
[query_result_response_format: <string> | default = "protobuf"]
# Maximum number of retries for failed queries per second.
# CLI flag: -ruler.query-frontend.max-retries-rate
[max_retries_rate: <float> | default = 170]
tenant_federation:
# Enable rule groups to query against multiple tenants. The tenant IDs
# involved need to be in the rule group's 'source_tenants' field. If this flag
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
if err != nil {
return nil, err
}
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, 1, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)

embeddedQueryable = prom_remote.NewSampleAndChunkQueryableClient(
remoteQuerier,
Expand Down
27 changes: 22 additions & 5 deletions pkg/ruler/remotequerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

Expand Down Expand Up @@ -67,6 +68,8 @@ type QueryFrontendConfig struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the rulers and query-frontends."`

QueryResultResponseFormat string `yaml:"query_result_response_format"`

MaxRetriesRate float64 `yaml:"max_retries_rate"`
}

func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -80,6 +83,7 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f)

f.StringVar(&c.QueryResultResponseFormat, "ruler.query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from query-frontends. Supported values: %s", strings.Join(allFormats, ", ")))
f.Float64Var(&c.MaxRetriesRate, "ruler.query-frontend.max-retries-rate", 170, "Maximum number of retries for failed queries per second.")
}

func (c *QueryFrontendConfig) Validate() error {
Expand Down Expand Up @@ -115,6 +119,7 @@ type Middleware func(ctx context.Context, req *httpgrpc.HTTPRequest) error
// RemoteQuerier executes read operations against a httpgrpc.HTTPClient.
type RemoteQuerier struct {
client httpgrpc.HTTPClient
retryLimiter *rate.Limiter
timeout time.Duration
middlewares []Middleware
promHTTPPrefix string
Expand All @@ -130,6 +135,7 @@ var protobufDecoderInstance = protobufDecoder{}
func NewRemoteQuerier(
client httpgrpc.HTTPClient,
timeout time.Duration,
maxRetryRate float64, // maxRetryRate is the maximum number of retries for failed queries per second.
preferredQueryResultResponseFormat string,
prometheusHTTPPrefix string,
logger log.Logger,
Expand All @@ -138,6 +144,7 @@ func NewRemoteQuerier(
return &RemoteQuerier{
client: client,
timeout: timeout,
retryLimiter: rate.NewLimiter(rate.Limit(maxRetryRate), 1),
middlewares: middlewares,
promHTTPPrefix: prometheusHTTPPrefix,
logger: logger,
Expand Down Expand Up @@ -349,12 +356,22 @@ func (q *RemoteQuerier) sendRequest(ctx context.Context, req *httpgrpc.HTTPReque
if !retry.Ongoing() {
return nil, err
}
level.Warn(logger).Log("msg", "failed to remotely evaluate query expression, will retry", "err", err)
retry.Wait()

// Avoid masking last known error if context was cancelled while waiting.
if ctx.Err() != nil {
return nil, fmt.Errorf("%s while retrying request, last err was: %w", ctx.Err(), err)
retryReservation := q.retryLimiter.Reserve()
if !retryReservation.OK() {
// This should only happen if we've misconfigured the limiter.
return nil, fmt.Errorf("couldn't reserve a retry token")
}
// We want to wait at least the time for the backoff, but also don't want to exceed the rate limit.
// All of this is capped to the max backoff, so that we are less likely to overrun into the next evaluation.
retryDelay := max(retry.NextDelay(), min(retryConfig.MaxBackoff, retryReservation.Delay()))
level.Warn(logger).Log("msg", "failed to remotely evaluate query expression, will retry", "err", err, "retry_delay", retryDelay)
select {
case <-time.After(retryDelay):
case <-ctx.Done():
retryReservation.Cancel()
// Avoid masking last known error if context was cancelled while waiting.
return nil, fmt.Errorf("%s while retrying request, last error was: %w", ctx.Err(), err)
}
}
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/ruler/remotequerier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should issue a remote read request", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

Expand All @@ -76,7 +76,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

Expand All @@ -86,7 +86,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Read(ctx, &prompb.Query{}, false)
Expand All @@ -101,7 +101,7 @@ func TestRemoteQuerier_ReadReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, 1, formatJSON, "/prometheus", log.NewNopLogger())

_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.Error(t, err)
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run(fmt.Sprintf("format = %s", format), func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, format, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, format, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

Expand All @@ -165,7 +165,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

Expand All @@ -175,7 +175,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Query(ctx, "qs", tm)
Expand Down Expand Up @@ -276,20 +276,20 @@ func TestRemoteQuerier_QueryRetryOnFailure(t *testing.T) {
}
return testCase.response, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
require.Equal(t, int64(0), count.Load())
_, err := q.Query(ctx, "qs", time.Now())
if testCase.err == nil {
if testCase.expectedError == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
require.EqualError(t, err, testCase.expectedError.Error())
require.ErrorContains(t, err, testCase.expectedError.Error())
}
require.Equal(t, int64(1), count.Load())
} else {
require.Error(t, err)
require.EqualError(t, err, testCase.expectedError.Error())
require.ErrorContains(t, err, testCase.expectedError.Error())
if testCase.expectedRetries {
require.Greater(t, count.Load(), int64(1))
} else {
Expand Down Expand Up @@ -405,7 +405,7 @@ func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) {
Body: []byte(scenario.body),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
Expand Down Expand Up @@ -678,7 +678,7 @@ func TestRemoteQuerier_QueryProtobufDecoding(t *testing.T) {
Body: b,
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatProtobuf, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatProtobuf, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
Expand All @@ -701,7 +701,7 @@ func TestRemoteQuerier_QueryUnknownResponseContentType(t *testing.T) {
Body: []byte("some body content"),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
Expand All @@ -713,7 +713,7 @@ func TestRemoteQuerier_QueryReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, 1, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
Expand Down Expand Up @@ -771,7 +771,7 @@ func TestRemoteQuerier_StatusErrorResponses(t *testing.T) {
return testCase.resp, testCase.err
}
logger := newLoggerWithCounter()
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", logger)
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", logger)

tm := time.Unix(1649092025, 515834)

Expand Down

0 comments on commit ffee57d

Please sign in to comment.