Skip to content

Commit

Permalink
Add logical planner and select caching
Browse files Browse the repository at this point in the history
This commit adds the first version of a logical planner with the
capability to unify matchers between different selectors.

The MergeSelectsOptimizer traverses the AST and identifies the most
selective matcher for each individual metric. It then replaces
less selective matchers with the most selective one, and adds additional
filters to ensure correctness.

The physical plan can then cache results for identical selectors which leads
to fewer network calls and faster series retrieval operations.
  • Loading branch information
fpetkovski committed Oct 12, 2022
1 parent 700c5ba commit fac1a24
Show file tree
Hide file tree
Showing 18 changed files with 665 additions and 133 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ benchmarks:
.PHONY: bench-old
bench-old: benchmarks
@echo "Benchmarking old engine"
@go test ./... -bench 'BenchmarkRangeQuery/.*/old_engine' -run none -count 10 | sed -u 's/\/old_engine//' > benchmarks/old.out
@go test ./... -bench 'BenchmarkRangeQuery/.*/old_engine' -run none -count 3 | sed -u 's/\/old_engine//' > benchmarks/old.out

.PHONY: bench-new
bench-new: benchmarks
@echo "Benchmarking new engine"
@go test ./... -bench 'BenchmarkRangeQuery/.*/new_engine' -run none -count 10 | sed -u 's/\/new_engine//' > benchmarks/new.out
@go test ./... -bench 'BenchmarkRangeQuery/.*/new_engine' -run none -count 3 | sed -u 's/\/new_engine//' > benchmarks/new.out

.PHONY: benchmark
benchmark: bench-old bench-new
Expand Down
2 changes: 1 addition & 1 deletion engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func BenchmarkRangeQuery(b *testing.B) {
},
{
name: "binary operation with one to one",
query: `http_requests_total{container="1"} / ignoring(container) http_responses_total`,
query: `http_requests_total{container="c1"} / ignoring(container) http_responses_total`,
},
{
name: "binary operation with many to one",
Expand Down
10 changes: 8 additions & 2 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/efficientgo/core/errors"
"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/thanos-community/promql-engine/logicalplan"
"github.com/thanos-community/promql-engine/physicalplan"
"github.com/thanos-community/promql-engine/physicalplan/model"
"github.com/thanos-community/promql-engine/physicalplan/parse"
Expand Down Expand Up @@ -109,7 +111,9 @@ func (e *engine) NewInstantQuery(q storage.Queryable, _ *promql.QueryOpts, qs st
return nil, err
}

plan, err := physicalplan.New(expr, q, ts, ts, 0, e.lookbackDelta)
logicalPlan := logicalplan.New(expr)
optimizedPlan := logicalPlan.RunOptimizers(logicalplan.DefaultOptimizers)
plan, err := physicalplan.New(optimizedPlan, q, ts, ts, 0, e.lookbackDelta)
if err != nil {
return nil, err
}
Expand All @@ -132,7 +136,9 @@ func (e *engine) NewRangeQuery(q storage.Queryable, _ *promql.QueryOpts, qs stri
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
}

plan, err := physicalplan.New(expr, q, start, end, interval, e.lookbackDelta)
logicalPlan := logicalplan.New(expr)
optimizedPlan := logicalPlan.RunOptimizers(logicalplan.DefaultOptimizers)
plan, err := physicalplan.New(optimizedPlan, q, start, end, interval, e.lookbackDelta)
if err != nil {
return nil, err
}
Expand Down
25 changes: 22 additions & 3 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
http_requests_total{pod="nginx-1"} 1+1x30
http_requests_total{pod="nginx-2"} 1+2x600`,
query: `count_over_time(http_requests_total[10m])`,
start: time.Unix(600, 0),
end: time.Unix(6000, 0),
start: time.Unix(60, 0),
end: time.Unix(240, 0),
},
{
name: "rate",
Expand Down Expand Up @@ -688,13 +688,32 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
start: time.Unix(600, 0),
end: time.Unix(6000, 0),
},
{
name: "selector merge",
load: `load 30s
http_requests_total{pod="nginx-1", ns="nginx"} 1+1x15
http_requests_total{pod="nginx-2", ns="nginx"} 1+2x18
http_requests_total{pod="nginx-3", ns="nginx"} 1+2x21`,
query: `http_requests_total{pod=~"nginx-1", ns="nginx"} / on() group_left() sum(http_requests_total{ns="nginx"})`,
},
{
name: "selector merge with different ranges",
load: `load 30s
http_requests_total{pod="nginx-1", ns="nginx"} 2+2x16
http_requests_total{pod="nginx-2", ns="nginx"} 2+4x18
http_requests_total{pod="nginx-3", ns="nginx"} 2+6x20`,
query: `
rate(http_requests_total{pod=~"nginx-1", ns="nginx"}[2m])
+ on() group_left()
sum(http_requests_total{ns="nginx"})`,
},
}

lookbackDeltas := []time.Duration{30 * time.Second, time.Minute, 5 * time.Minute, 10 * time.Minute}
for _, lookbackDelta := range lookbackDeltas {
opts.LookbackDelta = lookbackDelta
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
t.Run(fmt.Sprintf("%s/lookback-%d", tc.name, lookbackDelta), func(t *testing.T) {
test, err := promql.NewTest(t, tc.load)
testutil.Ok(t, err)
defer test.Close()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/thanos-community/promql-engine
go 1.18

require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/efficientgo/core v1.0.0-rc.0
github.com/go-kit/log v0.2.1
github.com/prometheus/prometheus v0.38.1-0.20221003141934-f7a7b18cdcca
Expand All @@ -14,7 +15,6 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/aws/aws-sdk-go v1.44.109 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
Expand Down
25 changes: 25 additions & 0 deletions logicalplan/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package logicalplan

import (
"fmt"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)

// FilteredSelector is an expression node that pairs a vector selector with
// a list of extra label matchers. replaceMatchers builds it when it swaps a
// selector's matchers for a shared set: the embedded selector then carries
// the shared matchers, and Filters carries the selector's original matchers
// minus the metric name.
type FilteredSelector struct {
	*parser.VectorSelector
	Filters []*labels.Matcher
}

// String renders the node as "filter(<filters>, <selector>)".
func (f FilteredSelector) String() string {
	selector := f.VectorSelector.String()
	return fmt.Sprintf("filter(%s, %s)", f.Filters, selector)
}

// Pretty returns the same text as String; the indentation level is ignored.
func (f FilteredSelector) Pretty(level int) string {
	return f.String()
}

// PositionRange always reports the zero PositionRange.
func (f FilteredSelector) PositionRange() parser.PositionRange {
	var none parser.PositionRange
	return none
}

// Type reports that the node evaluates to a vector.
func (f FilteredSelector) Type() parser.ValueType {
	return parser.ValueTypeVector
}

// PromQLExpr has an empty body.
// NOTE(review): presumably present so the type satisfies parser.Expr — confirm.
func (f FilteredSelector) PromQLExpr() {}
127 changes: 127 additions & 0 deletions logicalplan/merge_selects.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package logicalplan

import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)

// MergeSelectsOptimizer rewrites selectors whose matchers are a superset of
// another selector's matchers.
// For example, the expression
//
//	sum(metric{a="b", c="d"}) / sum(metric{a="b"})
//
// is rewritten to
//
//	sum(filter([a="b" c="d"], metric{a="b"})) / sum(metric{a="b"})
//
// so that both sides select `metric{a="b"}` and the left side applies
// its remaining matchers as a filter on top.
type MergeSelectsOptimizer struct{}

// Optimize first collects the candidate matcher list for each selector in
// expr, then rewrites the selectors that can reuse a candidate. The returned
// expression is expr itself unless its root node was replaced.
func (m MergeSelectsOptimizer) Optimize(expr parser.Expr) parser.Expr {
	candidates := matcherHeap{}

	extractSelectors(candidates, expr)
	replaceMatchers(candidates, &expr)

	return expr
}

// extractSelectors walks expr and, for every vector selector that carries a
// metric-name matcher, offers the selector's matcher list to selectors under
// the metric name.
func extractSelectors(selectors matcherHeap, expr parser.Expr) {
	parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
		vs, ok := node.(*parser.VectorSelector)
		if !ok {
			return nil
		}
		for _, l := range vs.LabelMatchers {
			if l.Name != labels.MetricName {
				continue
			}
			// Key by the metric name itself (l.Value). l.Name is always
			// "__name__" at this point, which would make every metric in
			// the query compete for a single heap entry.
			selectors.add(l.Value, vs.LabelMatchers)
		}
		return nil
	})
}

// replaceMatchers visits every vector selector reachable through traverse
// and, when selectors holds a shorter matcher list for the same metric that
// the selector's own matchers fully contain, swaps the selector's matchers
// for that list and wraps the selector in a FilteredSelector carrying the
// original matchers (minus the metric name) as filters.
func replaceMatchers(selectors matcherHeap, expr *parser.Expr) {
	traverse(expr, func(node *parser.Expr) {
		vs, ok := (*node).(*parser.VectorSelector)
		if !ok {
			return
		}

		for _, l := range vs.LabelMatchers {
			if l.Name != labels.MetricName {
				continue
			}
			// Look up by metric name (l.Value) so the key matches the one
			// used by extractSelectors.
			replacement, found := selectors.findReplacement(l.Value, vs.LabelMatchers)
			if !found {
				continue
			}

			// All replacements are done on metrics only,
			// so we can drop the explicit metric name selector.
			filters := dropMetricName(vs.LabelMatchers)
			vs.LabelMatchers = replacement
			*node = &FilteredSelector{
				Filters:        filters,
				VectorSelector: vs,
			}
			return
		}
	})
}

// dropMetricName returns the matchers from originalMatchers whose label name
// is not the metric name, in their original order.
//
// The result is a freshly allocated slice: the input is left untouched.
// Deleting elements from the slice while ranging over it, as an in-place
// removal would, indexes past the shrunken length and panics when more than
// one metric-name matcher is present (e.g. {__name__=~"a.*", __name__!="ab"}).
func dropMetricName(originalMatchers []*labels.Matcher) []*labels.Matcher {
	kept := make([]*labels.Matcher, 0, len(originalMatchers))
	for _, l := range originalMatchers {
		if l.Name != labels.MetricName {
			kept = append(kept, l)
		}
	}
	return kept
}

// matcherToMap indexes matchers by label name. When several matchers share
// a label name, the one appearing last in the slice wins.
func matcherToMap(matchers []*labels.Matcher) map[string]*labels.Matcher {
	byName := make(map[string]*labels.Matcher, len(matchers))
	for _, matcher := range matchers {
		byName[matcher.Name] = matcher
	}
	return byName
}

// matcherHeap maps a key to the label-matcher list that add has kept for
// that key: the shortest list offered so far, with ties going to the list
// that was offered first. findReplacement proposes the kept list as the
// replacement for selectors whose matchers contain all of its entries.
type matcherHeap map[string][]*labels.Matcher

// add offers a matcher list for metricName. The list is stored when no list
// is recorded for that key yet, or when it has strictly fewer matchers than
// the recorded one; otherwise the recorded list is kept.
func (m matcherHeap) add(metricName string, lessSelective []*labels.Matcher) {
	current, seen := m[metricName]
	if seen && len(lessSelective) >= len(current) {
		// The recorded list is at least as short; keep it.
		return
	}
	m[metricName] = lessSelective
}

// findReplacement reports whether the matcher list recorded for metricName
// can stand in for matcher. That is the case when every recorded matcher
// also appears in matcher with the same name, type and value, and matcher
// has additional entries on top. On success it returns the recorded list and
// true; otherwise it returns nil and false.
func (m matcherHeap) findReplacement(metricName string, matcher []*labels.Matcher) ([]*labels.Matcher, bool) {
	top, ok := m[metricName]
	if !ok {
		return nil, false
	}

	matcherSet := matcherToMap(matcher)
	topSet := matcherToMap(top)

	// The maps keep a single matcher per label name. If either list repeats
	// a label name (e.g. {a=~"x.*", a!="xy"}) the comparison below would
	// silently ignore some matchers and could approve a replacement that
	// changes the query result, so such selectors are left alone.
	if len(topSet) != len(top) || len(matcherSet) != len(matcher) {
		return nil, false
	}

	for name, want := range topSet {
		got, ok := matcherSet[name]
		if !ok {
			return nil, false
		}
		// Names are equal by construction (both maps are keyed by name).
		if want.Type != got.Type || want.Value != got.Value {
			return nil, false
		}
	}

	// The top matcher and input matcher are equal. No replacement needed.
	if len(top) == len(matcher) {
		return nil, false
	}

	return top, true
}
57 changes: 57 additions & 0 deletions logicalplan/plan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package logicalplan

import "github.com/prometheus/prometheus/promql/parser"

// DefaultOptimizers is the optimizer list the engine passes to
// RunOptimizers. RunOptimizers applies the entries in slice order, so
// SortMatchers runs before MergeSelectsOptimizer.
var DefaultOptimizers = []Optimizer{
	SortMatchers{},
	MergeSelectsOptimizer{},
}

// Plan is a logical query plan built from a parsed PromQL expression.
type Plan interface {
	// RunOptimizers applies the given optimizers and returns the
	// resulting expression.
	RunOptimizers([]Optimizer) parser.Expr
}

// Optimizer is a single rewrite step over a PromQL expression.
type Optimizer interface {
	// Optimize takes an expression and returns the expression to use in
	// its place. The implementations in this package modify the input
	// tree in place and return it.
	Optimize(parser.Expr) parser.Expr
}

// plan is the Plan implementation returned by New. It holds the current
// form of the expression.
type plan struct {
	expr parser.Expr
}

// New wraps expr in a Plan. The expression is stored as given, not copied.
func New(expr parser.Expr) Plan {
	p := new(plan)
	p.expr = expr
	return p
}

// RunOptimizers feeds the plan's expression through each optimizer in slice
// order, storing every intermediate result back on the plan, and returns the
// final expression.
func (p *plan) RunOptimizers(optimizers []Optimizer) parser.Expr {
	for i := 0; i < len(optimizers); i++ {
		p.expr = optimizers[i].Optimize(p.expr)
	}
	return p.expr
}

// traverse walks the expression tree rooted at *expr and calls transform
// with a pointer to every slot that holds a vector selector, including the
// selector inside a matrix selector. Because transform receives the slot
// itself rather than a copy, it can replace the node by assigning through
// the pointer. Node types not listed in the switch are not descended into.
func traverse(expr *parser.Expr, transform func(*parser.Expr)) {
	switch node := (*expr).(type) {
	case *parser.VectorSelector:
		transform(expr)
	case *parser.MatrixSelector:
		transform(&node.VectorSelector)
	case *parser.AggregateExpr:
		traverse(&node.Expr, transform)
	case *parser.Call:
		// Index into Args rather than ranging by value: the address of a
		// range variable points at a copy, so a replacement made by
		// transform would never reach the argument list.
		for i := range node.Args {
			traverse(&node.Args[i], transform)
		}
	case *parser.BinaryExpr:
		traverse(&node.LHS, transform)
		traverse(&node.RHS, transform)
	case *parser.UnaryExpr:
		traverse(&node.Expr, transform)
	case *parser.ParenExpr:
		traverse(&node.Expr, transform)
	case *parser.SubqueryExpr:
		traverse(&node.Expr, transform)
	}
}
38 changes: 38 additions & 0 deletions logicalplan/plan_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package logicalplan

import (
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/promql/parser"
)

// TestDefaultOptimizers parses each input expression, runs it through
// DefaultOptimizers and compares the string form of the result with the
// expected plan.
func TestDefaultOptimizers(t *testing.T) {
	type testCase struct {
		name     string
		expr     string
		expected string
	}

	cases := []testCase{
		{
			name:     "common selectors",
			expr:     `sum(metric{a="b", c="d"}) / sum(metric{a="b"})`,
			expected: `sum(filter([a="b" c="d"], metric{a="b"})) / sum(metric{a="b"})`,
		},
		{
			name:     "different selectors",
			expr:     `sum(metric{a="b"}) / sum(metric{c="d"})`,
			expected: `sum(metric{a="b"}) / sum(metric{c="d"})`,
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			parsed, err := parser.ParseExpr(tc.expr)
			testutil.Ok(t, err)

			optimized := New(parsed).RunOptimizers(DefaultOptimizers)
			testutil.Equals(t, tc.expected, optimized.String())
		})
	}
}
28 changes: 28 additions & 0 deletions logicalplan/sort_matchers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package logicalplan

import (
"sort"

"github.com/prometheus/prometheus/promql/parser"
)

// SortMatchers sorts the label matchers of every vector selector by label
// name so that all subsequent optimizers, both in the logical and physical
// plan, can rely on this property.
type SortMatchers struct{}

// Optimize sorts, in place, the LabelMatchers of each vector selector that
// traverse reaches in expr, and returns expr. Matchers are ordered by
// ascending label name; matchers that share a label name keep their
// relative order.
func (m SortMatchers) Optimize(expr parser.Expr) parser.Expr {
	traverse(&expr, func(node *parser.Expr) {
		e, ok := (*node).(*parser.VectorSelector)
		if !ok {
			return
		}

		// The comparison must be a strict ordering ("<"). Comparing with
		// "==" is not a valid less function and leaves the slice unsorted.
		sort.SliceStable(e.LabelMatchers, func(i, j int) bool {
			return e.LabelMatchers[i].Name < e.LabelMatchers[j].Name
		})
	})
	return expr
}
Loading

0 comments on commit fac1a24

Please sign in to comment.