Skip to content

Commit

Permalink
Add logical planer and select caching (#79)
Browse files Browse the repository at this point in the history
* Add logical planer and select caching

This commit adds the first version of a logical planner with the
capability to unify matchers between different selectors.

The MergeSelectsOptimizer traverses the AST and identifies the most
selective matcher for each individual metric. It then replaces
less selective matchers with the most selective one, and adds an additional
filters to ensure correctness.

The physical plan can then cache results for identical selectors which leads
to fewer network calls and faster series retrieval operations.

Co-authored-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
fpetkovski and bwplotka authored Oct 17, 2022
1 parent 8ba4fe1 commit 67a4593
Show file tree
Hide file tree
Showing 17 changed files with 885 additions and 209 deletions.
72 changes: 70 additions & 2 deletions engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package engine_test
import (
"context"
"fmt"
"strconv"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-community/promql-engine/engine"
Expand Down Expand Up @@ -134,7 +136,7 @@ func BenchmarkRangeQuery(b *testing.B) {
},
{
name: "binary operation with one to one",
query: `http_requests_total{container="1"} / ignoring(container) http_responses_total`,
query: `http_requests_total{container="c1"} / ignoring(container) http_responses_total`,
},
{
name: "binary operation with many to one",
Expand Down Expand Up @@ -293,8 +295,48 @@ func BenchmarkOldEngineInstant(b *testing.B) {
}
}

func BenchmarkMergeSelectorsOptimizer(b *testing.B) {
db := createRequestsMetricBlock(b, 10000, 9900)

start := time.Unix(0, 0)
end := start.Add(6 * time.Hour)
step := time.Second * 30

query := `sum(http_requests_total{code="200"}) / sum(http_requests_total)`
b.Run("withoutOptimizers", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
opts := engine.Opts{DisableOptimizers: true}
ng := engine.New(opts)
qry, err := ng.NewRangeQuery(db, nil, query, start, end, step)
testutil.Ok(b, err)

res := qry.Exec(context.Background())
testutil.Ok(b, res.Err)
}
})
b.Run("withOptimizers", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
ng := engine.New(engine.Opts{})
qry, err := ng.NewRangeQuery(db, nil, query, start, end, step)
testutil.Ok(b, err)

res := qry.Exec(context.Background())
testutil.Ok(b, res.Err)
}
})

}

func executeRangeQuery(b *testing.B, q string, test *promql.Test, start time.Time, end time.Time, step time.Duration) *promql.Result {
ng := engine.New(engine.Opts{})
return executeRangeQueryWithOpts(b, q, test, start, end, step, engine.Opts{})
}

func executeRangeQueryWithOpts(b *testing.B, q string, test *promql.Test, start time.Time, end time.Time, step time.Duration, opts engine.Opts) *promql.Result {
ng := engine.New(opts)
qry, err := ng.NewRangeQuery(test.Queryable(), nil, q, start, end, step)
testutil.Ok(b, err)

Expand All @@ -318,6 +360,32 @@ func setupStorage(b *testing.B, numLabelsA int, numLabelsB int) *promql.Test {
return test
}

func createRequestsMetricBlock(b *testing.B, numRequests int, numSuccess int) *tsdb.DB {
dir := b.TempDir()

db, err := tsdb.Open(dir, nil, nil, tsdb.DefaultOptions(), nil)
testutil.Ok(b, err)
appender := db.Appender(context.Background())

sixHours := int64(6 * 60 * 2)

for i := 0; i < numRequests; i++ {
for t := int64(0); t < sixHours; t += 30 {
code := "200"
if numSuccess < i {
code = "500"
}
lbls := labels.FromStrings(labels.MetricName, "http_requests_total", "code", code, "pod", strconv.Itoa(i))
_, err = appender.Append(0, lbls, t, 1)
testutil.Ok(b, err)
}
}

testutil.Ok(b, appender.Commit())

return db
}

func synthesizeLoad(numPods, numContainers int) string {
load := `
load 30s`
Expand Down
31 changes: 23 additions & 8 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,31 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
v1 "github.com/prometheus/prometheus/web/api/v1"

"github.com/thanos-community/promql-engine/logicalplan"
"github.com/thanos-community/promql-engine/physicalplan"
"github.com/thanos-community/promql-engine/physicalplan/model"
"github.com/thanos-community/promql-engine/physicalplan/parse"
)

type engine struct {
logger log.Logger
debugWriter io.Writer
lookbackDelta time.Duration
logger log.Logger
debugWriter io.Writer
lookbackDelta time.Duration
enableOptimizers bool
}

type Opts struct {
promql.EngineOpts

// DisableOptimizers disables query optimizations using logicalPlan.DefaultOptimizers.
DisableOptimizers bool

// DisableFallback enables mode where engine returns error if some expression of feature is not yet implemented
// in the new engine, instead of falling back to prometheus engine.
DisableFallback bool
Expand All @@ -53,9 +59,10 @@ func New(opts Opts) v1.QueryEngine {
}

core := &engine{
debugWriter: opts.DebugWriter,
logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
debugWriter: opts.DebugWriter,
logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
enableOptimizers: !opts.DisableOptimizers,
}
if opts.DisableFallback {
return core
Expand Down Expand Up @@ -122,7 +129,11 @@ func (e *engine) NewInstantQuery(q storage.Queryable, _ *promql.QueryOpts, qs st
return nil, err
}

plan, err := physicalplan.New(expr, q, ts, ts, 0, e.lookbackDelta)
logicalPlan := logicalplan.New(expr, ts, ts)
if e.enableOptimizers {
logicalPlan = logicalPlan.Optimize(logicalplan.DefaultOptimizers)
}
plan, err := physicalplan.New(logicalPlan.Expr(), q, ts, ts, 0, e.lookbackDelta)
if err != nil {
return nil, err
}
Expand All @@ -145,7 +156,11 @@ func (e *engine) NewRangeQuery(q storage.Queryable, _ *promql.QueryOpts, qs stri
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
}

plan, err := physicalplan.New(expr, q, start, end, interval, e.lookbackDelta)
logicalPlan := logicalplan.New(expr, start, end)
if e.enableOptimizers {
logicalPlan = logicalPlan.Optimize(logicalplan.DefaultOptimizers)
}
plan, err := physicalplan.New(logicalPlan.Expr(), q, start, end, interval, e.lookbackDelta)
if err != nil {
return nil, err
}
Expand Down
111 changes: 66 additions & 45 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
http_requests_total{pod="nginx-1"} 1+1x30
http_requests_total{pod="nginx-2"} 1+2x600`,
query: `count_over_time(http_requests_total[10m])`,
start: time.Unix(600, 0),
end: time.Unix(6000, 0),
start: time.Unix(60, 0),
end: time.Unix(600, 0),
},
{
name: "rate",
Expand Down Expand Up @@ -765,8 +765,28 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
http_requests_total{pod="nginx-2"} 1+2x18`,
query: "sum_over_time(http_requests_total[5m] @ 180 offset 2m)",
},
{
name: "selector merge",
load: `load 30s
http_requests_total{pod="nginx-1", ns="nginx"} 1+1x15
http_requests_total{pod="nginx-2", ns="nginx"} 1+2x18
http_requests_total{pod="nginx-3", ns="nginx"} 1+2x21`,
query: `http_requests_total{pod=~"nginx-1", ns="nginx"} / on() group_left() sum(http_requests_total{ns="nginx"})`,
},
{
name: "selector merge with different ranges",
load: `load 30s
http_requests_total{pod="nginx-1", ns="nginx"} 2+2x16
http_requests_total{pod="nginx-2", ns="nginx"} 2+4x18
http_requests_total{pod="nginx-3", ns="nginx"} 2+6x20`,
query: `
rate(http_requests_total{pod=~"nginx-1", ns="nginx"}[2m])
+ on() group_left()
sum(http_requests_total{ns="nginx"})`,
},
}

disableOptimizerOpts := []bool{true, false}
lookbackDeltas := []time.Duration{30 * time.Second, time.Minute, 5 * time.Minute, 10 * time.Minute}
for _, lookbackDelta := range lookbackDeltas {
opts.LookbackDelta = lookbackDelta
Expand All @@ -787,25 +807,26 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
if tc.step == 0 {
tc.step = step
}
for _, disableOptimizers := range disableOptimizerOpts {
for _, disableFallback := range []bool{false, true} {
t.Run(fmt.Sprintf("disableFallback=%v", disableFallback), func(t *testing.T) {
newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: disableFallback, DisableOptimizers: disableOptimizers})
q1, err := newEngine.NewRangeQuery(test.Storage(), nil, tc.query, tc.start, tc.end, step)
testutil.Ok(t, err)

for _, disableFallback := range []bool{false, true} {
t.Run(fmt.Sprintf("disableFallback=%v", disableFallback), func(t *testing.T) {
newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: disableFallback})
q1, err := newEngine.NewRangeQuery(test.Storage(), nil, tc.query, tc.start, tc.end, step)
testutil.Ok(t, err)

newResult := q1.Exec(context.Background())
testutil.Ok(t, newResult.Err)
newResult := q1.Exec(context.Background())
testutil.Ok(t, newResult.Err)

oldEngine := promql.NewEngine(opts)
q2, err := oldEngine.NewRangeQuery(test.Storage(), nil, tc.query, tc.start, tc.end, step)
testutil.Ok(t, err)
oldEngine := promql.NewEngine(opts)
q2, err := oldEngine.NewRangeQuery(test.Storage(), nil, tc.query, tc.start, tc.end, step)
testutil.Ok(t, err)

oldResult := q2.Exec(context.Background())
testutil.Ok(t, oldResult.Err)
oldResult := q2.Exec(context.Background())
testutil.Ok(t, oldResult.Err)

testutil.Equals(t, oldResult, newResult)
})
testutil.Equals(t, oldResult, newResult)
})
}
}
})
}
Expand Down Expand Up @@ -990,11 +1011,6 @@ func TestInstantQuery(t *testing.T) {
http_requests_total{pod="nginx-2"} 1+2x20`,
query: "sum(irate(http_requests_total[1m]))",
},
{
name: "string literal",
load: "",
query: `"hello"`,
},
{
name: "number literal",
load: "",
Expand Down Expand Up @@ -1317,37 +1333,42 @@ func TestInstantQuery(t *testing.T) {
},
}

disableOptimizers := []bool{true, false}
lookbackDeltas := []time.Duration{30 * time.Second, time.Minute, 5 * time.Minute, 10 * time.Minute}
for _, lookbackDelta := range lookbackDeltas {
opts.LookbackDelta = lookbackDelta
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
test, err := promql.NewTest(t, tc.load)
testutil.Ok(t, err)
defer test.Close()
for _, withoutOptimizers := range disableOptimizers {
t.Run(fmt.Sprintf("disableOptimizers=%t", withoutOptimizers), func(t *testing.T) {
for _, lookbackDelta := range lookbackDeltas {
opts.LookbackDelta = lookbackDelta
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
test, err := promql.NewTest(t, tc.load)
testutil.Ok(t, err)
defer test.Close()

testutil.Ok(t, test.Run())
testutil.Ok(t, test.Run())

for _, disableFallback := range []bool{false, true} {
t.Run(fmt.Sprintf("disableFallback=%v", disableFallback), func(t *testing.T) {
newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: disableFallback})
q1, err := newEngine.NewInstantQuery(test.Storage(), nil, tc.query, queryTime)
testutil.Ok(t, err)
newResult := q1.Exec(context.Background())
testutil.Ok(t, newResult.Err)
for _, disableFallback := range []bool{false, true} {
t.Run(fmt.Sprintf("disableFallback=%v", disableFallback), func(t *testing.T) {
newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: disableFallback})
q1, err := newEngine.NewInstantQuery(test.Storage(), nil, tc.query, queryTime)
testutil.Ok(t, err)
newResult := q1.Exec(context.Background())
testutil.Ok(t, newResult.Err)

oldEngine := promql.NewEngine(opts)
q2, err := oldEngine.NewInstantQuery(test.Storage(), nil, tc.query, queryTime)
testutil.Ok(t, err)
oldEngine := promql.NewEngine(opts)
q2, err := oldEngine.NewInstantQuery(test.Storage(), nil, tc.query, queryTime)
testutil.Ok(t, err)

oldResult := q2.Exec(context.Background())
testutil.Ok(t, oldResult.Err)
oldResult := q2.Exec(context.Background())
testutil.Ok(t, oldResult.Err)

testutil.Equals(t, oldResult, newResult)
testutil.Equals(t, oldResult, newResult)
})
}
})
}
})
}
}
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/thanos-community/promql-engine
go 1.18

require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/efficientgo/core v1.0.0-rc.0
github.com/go-kit/log v0.2.1
github.com/prometheus/client_golang v1.13.0
Expand All @@ -15,7 +16,6 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/aws/aws-sdk-go v1.44.109 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
Expand Down
28 changes: 28 additions & 0 deletions logicalplan/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package logicalplan

import (
"fmt"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)

type FilteredSelector struct {
*parser.VectorSelector
Filters []*labels.Matcher
}

func (f FilteredSelector) String() string {
return fmt.Sprintf("filter(%s, %s)", f.Filters, f.VectorSelector.String())
}

func (f FilteredSelector) Pretty(level int) string { return f.String() }

func (f FilteredSelector) PositionRange() parser.PositionRange { return parser.PositionRange{} }

func (f FilteredSelector) Type() parser.ValueType { return parser.ValueTypeVector }

func (f FilteredSelector) PromQLExpr() {}
Loading

0 comments on commit 67a4593

Please sign in to comment.