allow using any base interval size for dynamicIntervalFn
Signed-off-by: Ahmed Hassan <[email protected]>
afhassan committed Jan 22, 2025
1 parent b15dde6 commit a1e7a3a
Showing 9 changed files with 205 additions and 134 deletions.
21 changes: 15 additions & 6 deletions docs/configuration/config-file-reference.md
@@ -4193,12 +4193,21 @@ The `query_range_config` configures the query splitting and caching in the Corte
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]
# Maximum number of splits for a range query, 0 disables it. Uses a multiple of
# `split-queries-by-interval` to maintain the number of splits below the limit.
# If vertical sharding is enabled for a query, the combined total number of
# vertical and interval shards is kept below this limit
# CLI flag: -querier.split-queries-by-interval-max-splits
[split_queries_by_interval_max_splits: <int> | default = 0]
dynamic_query_splits:
# [EXPERIMENTAL] Maximum number of shards for a query, 0 disables it.
# Dynamically uses a multiple of `split-queries-by-interval` to maintain the
# number of splits below the limit. If vertical sharding is enabled for a
# query, the combined total number of vertical and interval shards is kept
# below this limit.
# CLI flag: -querier.max-shards-per-query
[max_shards_per_query: <int> | default = 0]

# [EXPERIMENTAL] Max total duration of data fetched by all query shards from
# storage, 0 disables it. Dynamically uses a multiple of
# `split-queries-by-interval` to ensure the total fetched duration of data is
# lower than the value set.
# CLI flag: -querier.max-duration-of-data-fetched-from-storage-per-query
[max_duration_of_data_fetched_from_storage_per_query: <duration> | default = 0s]

# Mutate incoming queries to align their start and end with their step.
# CLI flag: -querier.align-querier-with-step
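A minimal sketch of how the two experimental limits map onto the query-range Config struct changed later in this commit; the values here are illustrative, not defaults:

package main

import (
    "fmt"
    "time"

    "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

func main() {
    cfg := queryrange.Config{
        // Base split interval; dynamic query splits require it to be set.
        SplitQueriesByInterval: 24 * time.Hour,
        DynamicQuerySplitsConfig: queryrange.DynamicQuerySplitsConfig{
            MaxShardsPerQuery:                           100,                 // -querier.max-shards-per-query
            MaxDurationOfDataFetchedFromStoragePerQuery: 90 * 24 * time.Hour, // -querier.max-duration-of-data-fetched-from-storage-per-query
        },
    }
    fmt.Printf("%+v\n", cfg)
}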
1 change: 0 additions & 1 deletion pkg/cortex/modules.go
@@ -488,7 +488,6 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
shardedPrometheusCodec,
t.Cfg.Querier.LookbackDelta,
t.Cfg.Querier.QueryStoreAfter,
t.Cfg.Querier.MaxDaysOfDataFetched,
)
if err != nil {
return nil, err
4 changes: 0 additions & 4 deletions pkg/querier/querier.go
@@ -62,9 +62,6 @@ type Config struct {
// Limit of number of steps allowed for every subquery expression in a query.
MaxSubQuerySteps int64 `yaml:"max_subquery_steps"`

// Max number of days of data fetched for a query, used to calculate appropriate interval and vertical shard size.
MaxDaysOfDataFetched int `yaml:"max_days_of_data_fetched"`

// Directory for ActiveQueryTracker. If empty, ActiveQueryTracker will be disabled and MaxConcurrent will not be applied (!).
// ActiveQueryTracker logs queries that were active during the last crash, but logs them on the next startup.
// However, we need to use active query tracker, otherwise we cannot limit Max Concurrent queries in the PromQL
@@ -134,7 +131,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.")
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
f.IntVar(&cfg.MaxDaysOfDataFetched, "querier.max-days-of-data-fetched", 0, "Max number of days of data fetched for a query. This can be used to calculate appropriate interval and vertical shard size dynamically.")
}

// Validate the config
17 changes: 17 additions & 0 deletions pkg/querier/tripperware/queryrange/dynamic_query_splits.go
@@ -0,0 +1,17 @@
package queryrange

import (
"flag"
"time"
)

type DynamicQuerySplitsConfig struct {
MaxShardsPerQuery int `yaml:"max_shards_per_query"`
MaxDurationOfDataFetchedFromStoragePerQuery time.Duration `yaml:"max_duration_of_data_fetched_from_storage_per_query"`
}

// RegisterFlags registers flags for dynamic query splits
func (cfg *DynamicQuerySplitsConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxShardsPerQuery, "querier.max-shards-per-query", 0, "[EXPERIMENTAL] Maximum number of shards for a query, 0 disables it. Dynamically uses a multiple of `split-queries-by-interval` to maintain the number of splits below the limit. If vertical sharding is enabled for a query, the combined total number of vertical and interval shards is kept below this limit.")
f.DurationVar(&cfg.MaxDurationOfDataFetchedFromStoragePerQuery, "querier.max-duration-of-data-fetched-from-storage-per-query", 0, "[EXPERIMENTAL] Max total duration of data fetched by all query shards from storage, 0 disables it. Dynamically uses a multiple of `split-queries-by-interval` to ensure the total fetched duration of data is lower than the value set.")
}
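A short usage sketch for the new config block; the flag names come from RegisterFlags above, while the flag-set name and values are arbitrary examples:

package main

import (
    "flag"
    "fmt"

    "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

func main() {
    fs := flag.NewFlagSet("example", flag.ContinueOnError)
    var cfg queryrange.DynamicQuerySplitsConfig
    cfg.RegisterFlags(fs)

    // Parse the two experimental flags registered above.
    _ = fs.Parse([]string{
        "-querier.max-shards-per-query=64",
        "-querier.max-duration-of-data-fetched-from-storage-per-query=2160h", // 90 days
    })
    fmt.Printf("%+v\n", cfg)
}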
27 changes: 15 additions & 12 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
@@ -34,12 +34,14 @@ const day = 24 * time.Hour

// Config for query_range middleware chain.
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
SplitQueriesByIntervalMaxSplits int `yaml:"split_queries_by_interval_max_splits"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// Query splits config
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
DynamicQuerySplitsConfig DynamicQuerySplitsConfig `yaml:"dynamic_query_splits"`

AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`

@@ -51,11 +53,11 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
f.IntVar(&cfg.SplitQueriesByIntervalMaxSplits, "querier.split-queries-by-interval-max-splits", 0, "Maximum number of splits for a range query, 0 disables it. Uses a multiple of `split-queries-by-interval` to maintain the number of splits below the limit. If vertical sharding is enabled for a query, the combined total number of vertical and interval shards is kept below this limit")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
cfg.ResultsCacheConfig.RegisterFlags(f)
cfg.DynamicQuerySplitsConfig.RegisterFlags(f)
}

// Validate validates the config.
@@ -68,8 +70,10 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
if cfg.SplitQueriesByIntervalMaxSplits > 0 && cfg.SplitQueriesByInterval <= 0 {
return errors.New("split-queries-by-interval-max-splits requires that a value for split-queries-by-interval is set.")
if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery > 0 {
if cfg.SplitQueriesByInterval <= 0 {
return errors.New("configs under dynamic-query-splits requires that a value for split-queries-by-interval is set.")
}
}
return nil
}
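A hedged illustration of the new validation rule: setting either dynamic limit while leaving split-queries-by-interval at 0 makes Validate return the error introduced above:

package main

import (
    "fmt"

    "github.com/cortexproject/cortex/pkg/querier"
    "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

func main() {
    // MaxShardsPerQuery is set, but SplitQueriesByInterval is left at its
    // zero value, so Validate rejects the config.
    cfg := queryrange.Config{
        DynamicQuerySplitsConfig: queryrange.DynamicQuerySplitsConfig{MaxShardsPerQuery: 10},
    }
    fmt.Println(cfg.Validate(querier.Config{}))
}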
@@ -86,7 +90,6 @@ func Middlewares(
shardedPrometheusCodec tripperware.Codec,
lookbackDelta time.Duration,
queryStoreAfter time.Duration,
maxDaysOfDataFetched int,
) ([]tripperware.Middleware, cache.Cache, error) {
// Metric used to keep track of each middleware execution duration.
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
Expand All @@ -97,8 +100,8 @@ func Middlewares(
}
if cfg.SplitQueriesByInterval != 0 {
intervalFn := staticIntervalFn(cfg)
if cfg.SplitQueriesByIntervalMaxSplits > 0 || maxDaysOfDataFetched > 0 {
intervalFn = dynamicIntervalFn(cfg, limits, queryAnalyzer, queryStoreAfter, lookbackDelta, maxDaysOfDataFetched)
if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery > 0 {
intervalFn = dynamicIntervalFn(cfg, limits, queryAnalyzer, queryStoreAfter, lookbackDelta)
}
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer, queryStoreAfter, lookbackDelta))
}
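For intuition, a rough sketch of what interval splitting does to a range query; this mirrors the idea only, not the middleware's exact boundary alignment (see nextIntervalBoundary in split_by_interval.go below):

package main

import (
    "fmt"
    "time"
)

func main() {
    // Assumed query: a 72h range split by a 24h interval -> three subrequests.
    start := time.Date(2025, 1, 20, 0, 0, 0, 0, time.UTC)
    end := start.Add(72 * time.Hour)
    interval := 24 * time.Hour

    for t := start; t.Before(end); t = t.Add(interval) {
        sub := t.Add(interval)
        if sub.After(end) {
            sub = end
        }
        fmt.Println(t, "->", sub)
    }
}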
@@ -65,7 +65,6 @@ func TestRoundTrip(t *testing.T) {
ShardedPrometheusCodec,
5*time.Minute,
24*time.Hour,
0,
)
require.NoError(t, err)

148 changes: 72 additions & 76 deletions pkg/querier/tripperware/queryrange/split_by_interval.go
@@ -3,7 +3,6 @@ package queryrange
import (
"context"
"net/http"
"sort"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -19,16 +18,8 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

// dayMillis is the L4 block range in milliseconds.
var dayMillis = util.DurationMilliseconds(24 * time.Hour)

type IntervalFn func(ctx context.Context, r tripperware.Request) (time.Duration, error)

type dayRange struct {
startDay int64
endDay int64
}

// SplitByIntervalMiddleware creates a new Middleware that splits requests by a given interval.
func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer, queryStoreAfter time.Duration, lookbackDelta time.Duration) tripperware.Middleware {
return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler {
@@ -169,14 +160,13 @@ func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Reques
}
}

func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, queryStoreAfter time.Duration, lookbackDelta time.Duration, maxDaysOfDataFetched int) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, queryStoreAfter time.Duration, lookbackDelta time.Duration) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
return func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return cfg.SplitQueriesByInterval, err
}

queryDayRange := int((r.GetEnd() / dayMillis) - (r.GetStart() / dayMillis) + 1)
analysis, err := queryAnalyzer.Analyze(r.GetQuery())
if err != nil {
return cfg.SplitQueriesByInterval, err
@@ -191,23 +181,35 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer
if err != nil {
return cfg.SplitQueriesByInterval, err
}
daysFetchedWithoutSharding := getDaysFetchedByQuery(queryExpr, []tripperware.Request{r}, queryStoreAfter, lookbackDelta, time.Now())
extraDaysFetchedPerShard := daysFetchedWithoutSharding - queryDayRange

// if lookbackDelta is configured and the query start time is not 00:00 UTC, we need to account for 1 fetched day of data per split except for the first split
lookbackDeltaCompensation := 0
if lookbackDelta > 0 && (r.GetStart()-util.DurationMilliseconds(lookbackDelta))/dayMillis == r.GetStart()/dayMillis {
lookbackDeltaCompensation = 1
}
// Calculates: duration of data fetched if the query was not sharded, the original range covered by the query start and end times,
// and the duration of data fetched by lookbackDelta for the first split
durationFetchedWithoutSharding, originalRangeCount, firstSplitLookbackDeltaCompensation := durationFetchedByQuery(queryExpr, r, queryStoreAfter, lookbackDelta, cfg.SplitQueriesByInterval, time.Now())
extraDaysFetchedPerShard := durationFetchedWithoutSharding - originalRangeCount

// Calculate the extra duration of data fetched by lookbackDelta per each split except the first split
nextIntervalStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), cfg.SplitQueriesByInterval) + r.GetStep()
nextIntervalReq := r.WithStartEnd(nextIntervalStart, r.GetEnd())
_, _, lookbackDeltaCompensation := durationFetchedByQuery(queryExpr, nextIntervalReq, queryStoreAfter, lookbackDelta, cfg.SplitQueriesByInterval, time.Now())

var maxSplitsByFetchedDaysOfData int
if maxDaysOfDataFetched > 0 {
maxSplitsByFetchedDaysOfData = ((maxDaysOfDataFetched / queryVerticalShardSize) - queryDayRange - lookbackDeltaCompensation) / (extraDaysFetchedPerShard + lookbackDeltaCompensation)
if cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery > 0 {
if extraDaysFetchedPerShard == 0 {
extraDaysFetchedPerShard = 1 // prevent division by 0
}
maxIntervalsFetchedByQuery := int(cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery / cfg.SplitQueriesByInterval)
maxSplitsByFetchedDaysOfData = ((maxIntervalsFetchedByQuery / queryVerticalShardSize) - originalRangeCount + firstSplitLookbackDeltaCompensation) / (extraDaysFetchedPerShard + lookbackDeltaCompensation)
if maxSplitsByFetchedDaysOfData <= 0 {
maxSplitsByFetchedDaysOfData = 1
}
}

var maxSplitsByConfig int
if cfg.SplitQueriesByIntervalMaxSplits > 0 {
maxSplitsByConfig = cfg.SplitQueriesByIntervalMaxSplits / queryVerticalShardSize
if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 {
maxSplitsByConfig = cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery / queryVerticalShardSize
if maxSplitsByConfig <= 0 {
maxSplitsByConfig = 1
}
}

var maxSplits time.Duration
@@ -225,70 +227,64 @@

queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond))
baseInterval := cfg.SplitQueriesByInterval
n := (queryRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits)

// Calculate the multiple of interval needed to shard query to <= maxSplits
n1 := (queryRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits)
if n1 <= 0 {
n1 = 1
}

// The first split can be truncated and not cover the full length of n*interval.
// So we remove it and calculate the multiple of interval needed to shard <= maxSplits-1
nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), n1*baseInterval) + r.GetStep()
queryRangeWithoutFirstSplit := time.Duration((r.GetEnd() - nextSplitStart) * int64(time.Millisecond))
var n2 time.Duration
if maxSplits > 1 {
n2 = (queryRangeWithoutFirstSplit + baseInterval*(maxSplits-1) - 1) / (baseInterval * (maxSplits - 1))
} else {
// If maxSplits is <= 1 then we should not shard at all
n1 += (queryRangeWithoutFirstSplit + baseInterval - 1) / baseInterval
}
n := max(n1, n2)
return n * cfg.SplitQueriesByInterval, nil
}
}

Removed:

// calculates the total number of days the query will have to fetch during execution, considering the query itself,
// queryStoreAfter and lookbackDelta.
func getDaysFetchedByQuery(expr parser.Expr, reqs []tripperware.Request, queryStoreAfter, lookbackDelta time.Duration, now time.Time) int {
    count := 0
    queryStoreMaxT := util.TimeToMillis(now.Add(-queryStoreAfter))
    var evalRange time.Duration

    for _, req := range reqs {
        var ranges []dayRange
        parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
            switch n := node.(type) {
            case *parser.VectorSelector:
                start, end := util.GetTimeRangesForSelector(req.GetStart(), req.GetEnd(), lookbackDelta, n, path, evalRange)
                // Query shouldn't touch Store Gateway.
                if start > queryStoreMaxT {
                    return nil
                } else {
                    // If the query split needs to query store, cap the max time to now - queryStoreAfter.
                    end = min(end, queryStoreMaxT)
                }

                startDay := start / dayMillis
                endDay := end / dayMillis
                ranges = append(ranges, dayRange{startDay: startDay, endDay: endDay})
                evalRange = 0
            case *parser.MatrixSelector:
                evalRange = n.Range
            }
            return nil
        })
        nonOverlappingRanges := mergeDayRanges(ranges)
        for _, dayRange := range nonOverlappingRanges {
            count += int(dayRange.endDay-dayRange.startDay) + 1
        }
    }
    return count
}

func mergeDayRanges(ranges []dayRange) []dayRange {
    if len(ranges) == 0 {
        return ranges
    }

    // Sort ranges by their startDay
    sort.Slice(ranges, func(i, j int) bool {
        return ranges[i].startDay < ranges[j].startDay
    })

    // Merge overlapping ranges
    merged := []dayRange{ranges[0]}
    for _, current := range ranges {
        last := &merged[len(merged)-1]
        if current.startDay <= last.endDay {
            if current.endDay > last.endDay {
                last.endDay = current.endDay
            }
        } else {
            merged = append(merged, current)
        }
    }
    return merged
}

Added:

// calculates the total duration of data the query will have to fetch from storage as a multiple of baseInterval.
// also returns the total time range fetched by the original query start and end times
func durationFetchedByQuery(expr parser.Expr, req tripperware.Request, queryStoreAfter, lookbackDelta time.Duration, baseInterval time.Duration, now time.Time) (durationFetchedCount int, originalRangeCount int, lookbackDeltaCount int) {
    durationFetchedCount = 0
    originalRangeCount = 0
    lookbackDeltaCount = 0
    baseIntervalMillis := util.DurationMilliseconds(baseInterval)
    queryStoreMaxT := util.TimeToMillis(now.Add(-queryStoreAfter))
    var evalRange time.Duration

    parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
        switch n := node.(type) {
        case *parser.VectorSelector:
            originalRangeCount += int((req.GetEnd()/baseIntervalMillis)-(req.GetStart()/baseIntervalMillis)) + 1
            start, end := util.GetTimeRangesForSelector(req.GetStart(), req.GetEnd(), 0, n, path, evalRange)
            // Query shouldn't touch Store Gateway.
            if start > queryStoreMaxT {
                return nil
            } else {
                // If the query split needs to query store, cap the max time to now - queryStoreAfter.
                end = min(end, queryStoreMaxT)
            }

            startIntervalIndex := start / baseIntervalMillis
            endIntervalIndex := end / baseIntervalMillis
            durationFetchedCount += int(endIntervalIndex-startIntervalIndex) + 1

            if evalRange == 0 && (start-util.DurationMilliseconds(lookbackDelta))/baseIntervalMillis == start/baseIntervalMillis {
                lookbackDeltaCount += 1
            }
            evalRange = 0
        case *parser.MatrixSelector:
            evalRange = n.Range
        }
        return nil
    })
    return durationFetchedCount, originalRangeCount, lookbackDeltaCount
}
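A self-contained sketch of the interval-index arithmetic durationFetchedByQuery uses to count how many base intervals a selector touches; the timestamps are assumed, and stdlib helpers stand in for util.DurationMilliseconds:

package main

import (
    "fmt"
    "time"
)

func main() {
    baseIntervalMillis := (24 * time.Hour).Milliseconds()

    // Assumed selector range: 36h of data starting at an interval boundary.
    start := time.Date(2025, 1, 22, 0, 0, 0, 0, time.UTC).UnixMilli()
    end := start + (36 * time.Hour).Milliseconds()

    startIntervalIndex := start / baseIntervalMillis
    endIntervalIndex := end / baseIntervalMillis
    fmt.Println(int(endIntervalIndex-startIntervalIndex) + 1) // 2 base intervals touched
}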
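The duration limit becomes a split budget as computed in dynamicIntervalFn above; with assumed inputs (a 90-day limit, 24h base interval, a query spanning 30 base intervals, one extra interval fetched per shard, no vertical sharding, and no lookback compensation) the same expression yields a budget of 60 splits:

package main

import (
    "fmt"
    "time"
)

func main() {
    // All values assumed for illustration.
    maxDuration := 90 * 24 * time.Hour // -querier.max-duration-of-data-fetched-from-storage-per-query
    baseInterval := 24 * time.Hour
    queryVerticalShardSize := 1
    originalRangeCount := 30            // query spans 30 base intervals
    extraDaysFetchedPerShard := 1       // e.g. a 1-day matrix selector per shard
    firstSplitLookbackDeltaCompensation := 0
    lookbackDeltaCompensation := 0

    maxIntervalsFetchedByQuery := int(maxDuration / baseInterval)
    maxSplits := ((maxIntervalsFetchedByQuery / queryVerticalShardSize) - originalRangeCount + firstSplitLookbackDeltaCompensation) /
        (extraDaysFetchedPerShard + lookbackDeltaCompensation)
    fmt.Println(maxIntervalsFetchedByQuery, maxSplits) // 90 60
}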
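Finally, a worked example of the ceiling arithmetic that converts a split budget into an interval multiple; the first-split correction (the n2 term) is omitted for brevity, so this shows only n1, using the exact expression from the diff:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Assumed: a 30-day query range, 24h base interval, maxSplits of 10.
    queryRange := 30 * 24 * time.Hour
    baseInterval := 24 * time.Hour
    maxSplits := time.Duration(10)

    // n1 = ceil(queryRange / (baseInterval * maxSplits))
    n1 := (queryRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits)
    fmt.Println(int64(n1))         // 3 -> split by 3x the base interval
    fmt.Println(n1 * baseInterval) // 72h0m0s; ceil(720h / 72h) = 10 splits
}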