Skip to content

Commit

Permalink
Setting ruler.evaluation-delay-duration to be deprecated. (#6149)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielblando authored Aug 13, 2024
1 parent 22245aa commit 24edba0
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 360 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [CHANGE] Querier: Deprecate and enable by default `querier.ingester-metadata-streaming` flag. #6147
* [CHANGE] QueryFrontend/QueryScheduler: Deprecate `-querier.max-outstanding-requests-per-tenant` and `-query-scheduler.max-outstanding-requests-per-tenant` flags. Use `-frontend.max-outstanding-requests-per-tenant` instead. #6146
* [CHANGE] Ingesters: Enable 'snappy-block' compression on ingester clients by default. #6148
* [CHANGE] Ruler: Schedule `ruler.evaluation-delay-duration` for deprecation. Use the highest value between `ruler.evaluation-delay-duration` and `ruler.query-offset`. #6149
* [FEATURE] Ingester/Distributor: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986 #6010 #6020
* [FEATURE] Querier: Enable querying native histogram chunks. #5944 #6031
* [FEATURE] Query Frontend: Support native histogram in query frontend response. #5996 #6043
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3333,6 +3333,7 @@ query_rejection:
# them.
[query_attributes: <list of QueryAttribute> | default = []]

# Deprecated (use ruler.query-offset instead) and will be removed in v1.19.0:
# Duration to delay the evaluation of rules to ensure the underlying metrics
# have been pushed to Cortex.
# CLI flag: -ruler.evaluation-delay-duration
Expand Down
8 changes: 0 additions & 8 deletions integration/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,6 @@ receivers:
labels: {}
annotations: {}
`

cortexRulerEvalStaleNanConfigYaml = `groups:
- name: rule
interval: 1s
rules:
- record: stale_nan_eval
expr: a_sometimes_stale_nan_series * 2
`
)

var (
Expand Down
115 changes: 0 additions & 115 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"crypto/x509/pkix"
"encoding/json"
"fmt"
"math"
"math/rand"
"net/http"
"os"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -200,119 +198,6 @@ func TestRulerAPISingleBinary(t *testing.T) {
require.NoError(t, cortexRestarted.WaitSumMetrics(e2e.Equals(1), "cortex_ruler_managers_total"))
}

// TestRulerEvaluationDelay starts a single-binary Cortex with
// -ruler.evaluation-delay-duration set to five minutes, pushes a series whose
// samples are timestamped that far in the past (one of them a staleness
// marker), and then checks that:
//   - the recording rule result carries a timestamp that lags the wall clock
//     by roughly the configured delay, and
//   - every non-stale input sample eventually shows up, doubled, in the
//     recorded series.
func TestRulerEvaluationDelay(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	namespace := "ns"
	user := "fake"

	evaluationDelay := time.Minute * 5

	configOverrides := map[string]string{
		"-ruler-storage.local.directory":   filepath.Join(e2e.ContainerSharedDir, "ruler_configs"),
		"-ruler.poll-interval":             "2s",
		"-ruler.rule-path":                 filepath.Join(e2e.ContainerSharedDir, "rule_tmp/"),
		"-ruler.evaluation-delay-duration": evaluationDelay.String(),
	}

	// Start Cortex components.
	require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-local.yaml", cortexConfigFile))
	require.NoError(t, writeFileToSharedDir(s, filepath.Join("ruler_configs", user, namespace), []byte(cortexRulerEvalStaleNanConfigYaml)))
	cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex", cortexConfigFile, configOverrides, "", 9009, 9095)
	require.NoError(t, s.StartAndWaitReady(cortex))

	// Create a client with the ruler address configured
	c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", cortex.HTTPEndpoint(), "")
	require.NoError(t, err)

	now := time.Now()

	// Generate series that includes stale nans
	samplesToSend := 10
	series := prompb.TimeSeries{
		Labels: []prompb.Label{
			{Name: "__name__", Value: "a_sometimes_stale_nan_series"},
			{Name: "instance", Value: "sometimes-stale"},
		},
	}
	series.Samples = make([]prompb.Sample, samplesToSend)
	posStale := 2

	// Create samples, that are delayed by the evaluation delay with increasing values.
	for pos := range series.Samples {
		series.Samples[pos].Timestamp = e2e.TimeToMilliseconds(now.Add(-evaluationDelay).Add(time.Duration(pos) * time.Second))
		series.Samples[pos].Value = float64(pos + 1)

		// insert staleness marker at the positions marked by posStale
		if pos == posStale {
			series.Samples[pos].Value = math.Float64frombits(value.StaleNaN)
		}
	}

	// Insert metrics
	res, err := c.Push([]prompb.TimeSeries{series})
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	// Get number of rule evaluations just after push
	ruleEvaluationsAfterPush, err := cortex.SumMetrics([]string{"cortex_prometheus_rule_evaluations_total"})
	require.NoError(t, err)

	// Wait until the rule is evaluated for the first time
	require.NoError(t, cortex.WaitSumMetrics(e2e.Greater(ruleEvaluationsAfterPush[0]), "cortex_prometheus_rule_evaluations_total"))

	// Query the timestamp of the latest result to ensure the evaluation is delayed
	result, err := c.Query("timestamp(stale_nan_eval)", now)
	require.NoError(t, err)
	require.Equal(t, model.ValVector, result.Type())

	vector := result.(model.Vector)
	require.Equal(t, 1, vector.Len(), "expect one sample returned")

	// 290 seconds gives 10 seconds of slack between the rule evaluation and the query
	// to account for CI latency, but ensures the latest evaluation was in the past.
	var maxDiff int64 = 290_000
	require.GreaterOrEqual(t, e2e.TimeToMilliseconds(time.Now())-int64(vector[0].Value)*1000, maxDiff)

	// Wait until all the pushed samples have been evaluated by the rule. This
	// ensures that rule results are successfully written even after a
	// staleness period.
	require.NoError(t, cortex.WaitSumMetrics(e2e.GreaterOrEqual(ruleEvaluationsAfterPush[0]+float64(samplesToSend)), "cortex_prometheus_rule_evaluations_total"))

	// query all results to verify rules have been evaluated correctly
	result, err = c.QueryRange("stale_nan_eval", now.Add(-evaluationDelay), now, time.Second)
	require.NoError(t, err)
	require.Equal(t, model.ValMatrix, result.Type())

	matrix := result.(model.Matrix)
	// GreaterOrEqual asserts first >= second, so the observed length must be
	// the first operand for this to mean "at least one series".
	require.GreaterOrEqual(t, matrix.Len(), 1, "expect at least a series returned")

	// Iterate through the values recorded and ensure they exist as expected.
	inputPos := 0
	for _, m := range matrix {
		for _, v := range m.Values {
			// Skip values for stale positions
			if inputPos == posStale {
				inputPos++
			}

			expectedValue := model.SampleValue(2 * (inputPos + 1))
			require.Equal(t, expectedValue, v.Value)

			// Look for next value
			inputPos++

			// We have found all input values
			if inputPos >= len(series.Samples) {
				break
			}
		}
	}
	require.Equal(t, len(series.Samples), inputPos, "expect to have returned all evaluations")
}

func TestRulerSharding(t *testing.T) {
const numRulesGroups = 100

Expand Down
39 changes: 5 additions & 34 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
Expand Down Expand Up @@ -46,27 +45,15 @@ type PusherAppender struct {
histogramLabels []labels.Labels
histograms []cortexpb.Histogram
userID string
evaluationDelay time.Duration
}

func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h == nil && fh == nil {
return 0, errors.New("no histogram")
}

if h != nil {
// A histogram sample is considered stale if its sum is set to NaN.
// https://github.com/prometheus/prometheus/blob/b6ef745016fa9472fdd0ae20f75a9682e01d1e5c/tsdb/head_append.go#L339-L346
if a.evaluationDelay > 0 && (value.IsStaleNaN(h.Sum)) {
t -= a.evaluationDelay.Milliseconds()
}
a.histograms = append(a.histograms, cortexpb.HistogramToHistogramProto(t, h))
} else {
// A histogram sample is considered stale if its sum is set to NaN.
// https://github.com/prometheus/prometheus/blob/b6ef745016fa9472fdd0ae20f75a9682e01d1e5c/tsdb/head_append.go#L339-L346
if a.evaluationDelay > 0 && (value.IsStaleNaN(fh.Sum)) {
t -= a.evaluationDelay.Milliseconds()
}
a.histograms = append(a.histograms, cortexpb.FloatHistogramToHistogramProto(t, fh))
}
a.histogramLabels = append(a.histogramLabels, l)
Expand All @@ -75,19 +62,6 @@ func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t

func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
a.labels = append(a.labels, l)

// Adapt staleness markers for ruler evaluation delay. As the upstream code
// is using the actual time, when there is a no longer available series.
// This then causes 'out of order' append failures once the series is
// becoming available again.
// see https://github.com/prometheus/prometheus/blob/6c56a1faaaad07317ff585bda75b99bdba0517ad/rules/manager.go#L647-L660
// Similar to staleness markers, the rule manager also appends actual time to the ALERTS and ALERTS_FOR_STATE series.
// See: https://github.com/prometheus/prometheus/blob/ae086c73cb4d6db9e8b67d5038d3704fea6aec4a/rules/alerting.go#L414-L417
metricName := l.Get(labels.MetricName)
if a.evaluationDelay > 0 && (value.IsStaleNaN(v) || metricName == "ALERTS" || metricName == "ALERTS_FOR_STATE") {
t -= a.evaluationDelay.Milliseconds()
}

a.samples = append(a.samples, cortexpb.Sample{
TimestampMs: t,
Value: v,
Expand Down Expand Up @@ -164,16 +138,14 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender {
failedWrites: t.failedWrites,
totalWrites: t.totalWrites,

ctx: ctx,
pusher: t.pusher,
userID: t.userID,
evaluationDelay: t.rulesLimits.EvaluationDelay(t.userID),
ctx: ctx,
pusher: t.pusher,
userID: t.userID,
}
}

// RulesLimits defines limits used by Ruler.
type RulesLimits interface {
EvaluationDelay(userID string) time.Duration
MaxQueryLength(userID string) time.Duration
RulerTenantShardSize(userID string) int
RulerMaxRuleGroupsPerTenant(userID string) int
Expand All @@ -182,7 +154,7 @@ type RulesLimits interface {
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
}

// EngineQueryFunc returns a new engine query function by passing an altered timestamp.
// EngineQueryFunc returns a new engine query function that validates the maximum query length.
// Modified from Prometheus rules.EngineQueryFunc
// https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189.
func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
Expand All @@ -202,8 +174,7 @@ func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides R
}
}

evaluationDelay := overrides.EvaluationDelay(userID)
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t.Add(-evaluationDelay))
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 24edba0

Please sign in to comment.