Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logical planner and select caching #79

Merged
merged 7 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 70 additions & 2 deletions engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package engine_test
import (
"context"
"fmt"
"strconv"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-community/promql-engine/engine"
Expand Down Expand Up @@ -134,7 +136,7 @@ func BenchmarkRangeQuery(b *testing.B) {
},
{
name: "binary operation with one to one",
query: `http_requests_total{container="1"} / ignoring(container) http_responses_total`,
query: `http_requests_total{container="c1"} / ignoring(container) http_responses_total`,
},
{
name: "binary operation with many to one",
Expand Down Expand Up @@ -293,8 +295,48 @@ func BenchmarkOldEngineInstant(b *testing.B) {
}
}

// BenchmarkMergeSelectorsOptimizer runs the same range query, which selects
// http_requests_total twice, with the logical-plan optimizers disabled and
// enabled so that the two sub-benchmarks can be compared.
func BenchmarkMergeSelectorsOptimizer(b *testing.B) {
	db := createRequestsMetricBlock(b, 10000, 9900)
	// createRequestsMetricBlock hands back an open DB; close it here so it is
	// not leaked once the benchmark is done.
	defer func() { testutil.Ok(b, db.Close()) }()

	start := time.Unix(0, 0)
	end := start.Add(6 * time.Hour)
	step := 30 * time.Second

	const query = `sum(http_requests_total{code="200"}) / sum(http_requests_total)`

	cases := []struct {
		name string
		opts engine.Opts
	}{
		{name: "withoutOptimizers", opts: engine.Opts{DisableOptimizers: true}},
		{name: "withOptimizers", opts: engine.Opts{}},
	}

	for _, tc := range cases {
		opts := tc.opts
		b.Run(tc.name, func(b *testing.B) {
			b.ResetTimer()
			b.ReportAllocs()
			for i := 0; i < b.N; i++ {
				// The engine is built inside the loop, so its construction
				// is part of the measured work for both variants.
				ng := engine.New(opts)
				qry, err := ng.NewRangeQuery(db, nil, query, start, end, step)
				testutil.Ok(b, err)

				res := qry.Exec(context.Background())
				testutil.Ok(b, res.Err)
			}
		})
	}
}

// executeRangeQuery runs q as a range query over [start, end] at the given
// step using default engine options. It delegates to
// executeRangeQueryWithOpts with a zero-value engine.Opts.
func executeRangeQuery(b *testing.B, q string, test *promql.Test, start time.Time, end time.Time, step time.Duration) *promql.Result {
	return executeRangeQueryWithOpts(b, q, test, start, end, step, engine.Opts{})
}

func executeRangeQueryWithOpts(b *testing.B, q string, test *promql.Test, start time.Time, end time.Time, step time.Duration, opts engine.Opts) *promql.Result {
ng := engine.New(opts)
qry, err := ng.NewRangeQuery(test.Queryable(), nil, q, start, end, step)
testutil.Ok(b, err)

Expand All @@ -318,6 +360,32 @@ func setupStorage(b *testing.B, numLabelsA int, numLabelsB int) *promql.Test {
return test
}

// createRequestsMetricBlock opens a TSDB in a benchmark-scoped temporary
// directory and fills it with numRequests http_requests_total series, one per
// "pod" label value. The first numSuccess series carry code="200" and the
// remaining ones code="500". Each series receives a sample with value 1 every
// 30 seconds over a six hour window starting at timestamp 0.
//
// The DB is returned open; closing it is left to the caller.
func createRequestsMetricBlock(b *testing.B, numRequests int, numSuccess int) *tsdb.DB {
	dir := b.TempDir()

	db, err := tsdb.Open(dir, nil, nil, tsdb.DefaultOptions(), nil)
	testutil.Ok(b, err)
	appender := db.Appender(context.Background())

	// Appended timestamps are expressed in milliseconds, so derive both the
	// window and the scrape interval from time.Duration rather than from bare
	// integers.
	var (
		window   = (6 * time.Hour).Milliseconds()
		interval = (30 * time.Second).Milliseconds()
	)

	for i := 0; i < numRequests; i++ {
		// Series with index 0..numSuccess-1 are successful, giving exactly
		// numSuccess series with code="200".
		code := "200"
		if i >= numSuccess {
			code = "500"
		}
		// The label set is constant for a series; build it once per series.
		lbls := labels.FromStrings(labels.MetricName, "http_requests_total", "code", code, "pod", strconv.Itoa(i))

		for t := int64(0); t < window; t += interval {
			_, err = appender.Append(0, lbls, t, 1)
			testutil.Ok(b, err)
		}
	}

	testutil.Ok(b, appender.Commit())

	return db
}

func synthesizeLoad(numPods, numContainers int) string {
load := `
load 30s`
Expand Down
31 changes: 23 additions & 8 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,31 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
v1 "github.com/prometheus/prometheus/web/api/v1"

"github.com/thanos-community/promql-engine/logicalplan"
"github.com/thanos-community/promql-engine/physicalplan"
"github.com/thanos-community/promql-engine/physicalplan/model"
"github.com/thanos-community/promql-engine/physicalplan/parse"
)

// engine carries the settings that New copies out of Opts and that are used
// when instant and range queries are created.
type engine struct {
	logger        log.Logger
	debugWriter   io.Writer
	lookbackDelta time.Duration
	// enableOptimizers is the negation of Opts.DisableOptimizers. When true,
	// logicalplan.DefaultOptimizers are applied to the logical plan before the
	// physical plan is built.
	enableOptimizers bool
}

type Opts struct {
promql.EngineOpts

// DisableOptimizers disables query optimizations using logicalplan.DefaultOptimizers.
DisableOptimizers bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My worry is that this will have to be extended to support different optimization modes, possibly with a bitmask as described in https://yourbasic.org/golang/bitmask-flag-set-clear/

I guess we can break compatibility still and change it in future if needed.


// DisableFallback enables mode where engine returns error if some expression or feature is not yet implemented
// in the new engine, instead of falling back to prometheus engine.
DisableFallback bool
Expand All @@ -53,9 +59,10 @@ func New(opts Opts) v1.QueryEngine {
}

core := &engine{
debugWriter: opts.DebugWriter,
logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
debugWriter: opts.DebugWriter,
logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
enableOptimizers: !opts.DisableOptimizers,
}
if opts.DisableFallback {
return core
Expand Down Expand Up @@ -122,7 +129,11 @@ func (e *engine) NewInstantQuery(q storage.Queryable, _ *promql.QueryOpts, qs st
return nil, err
}

plan, err := physicalplan.New(expr, q, ts, ts, 0, e.lookbackDelta)
logicalPlan := logicalplan.New(expr, ts, ts)
if e.enableOptimizers {
logicalPlan = logicalPlan.Optimize(logicalplan.DefaultOptimizers)
}
plan, err := physicalplan.New(logicalPlan.Expr(), q, ts, ts, 0, e.lookbackDelta)
if err != nil {
return nil, err
}
Expand All @@ -145,7 +156,11 @@ func (e *engine) NewRangeQuery(q storage.Queryable, _ *promql.QueryOpts, qs stri
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
}

plan, err := physicalplan.New(expr, q, start, end, interval, e.lookbackDelta)
logicalPlan := logicalplan.New(expr, start, end)
if e.enableOptimizers {
logicalPlan = logicalPlan.Optimize(logicalplan.DefaultOptimizers)
}
plan, err := physicalplan.New(logicalPlan.Expr(), q, start, end, interval, e.lookbackDelta)
if err != nil {
return nil, err
}
Expand Down
111 changes: 66 additions & 45 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
http_requests_total{pod="nginx-1"} 1+1x30
http_requests_total{pod="nginx-2"} 1+2x600`,
query: `count_over_time(http_requests_total[10m])`,
start: time.Unix(600, 0),
end: time.Unix(6000, 0),
start: time.Unix(60, 0),
end: time.Unix(600, 0),
},
{
name: "rate",
Expand Down Expand Up @@ -765,8 +765,28 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
http_requests_total{pod="nginx-2"} 1+2x18`,
query: "sum_over_time(http_requests_total[5m] @ 180 offset 2m)",
},
{
name: "selector merge",
load: `load 30s
http_requests_total{pod="nginx-1", ns="nginx"} 1+1x15
http_requests_total{pod="nginx-2", ns="nginx"} 1+2x18
http_requests_total{pod="nginx-3", ns="nginx"} 1+2x21`,
query: `http_requests_total{pod=~"nginx-1", ns="nginx"} / on() group_left() sum(http_requests_total{ns="nginx"})`,
},
{
name: "selector merge with different ranges",
load: `load 30s
http_requests_total{pod="nginx-1", ns="nginx"} 2+2x16
http_requests_total{pod="nginx-2", ns="nginx"} 2+4x18
http_requests_total{pod="nginx-3", ns="nginx"} 2+6x20`,
query: `
rate(http_requests_total{pod=~"nginx-1", ns="nginx"}[2m])
+ on() group_left()
sum(http_requests_total{ns="nginx"})`,
},
}

disableOptimizerOpts := []bool{true, false}
lookbackDeltas := []time.Duration{30 * time.Second, time.Minute, 5 * time.Minute, 10 * time.Minute}
for _, lookbackDelta := range lookbackDeltas {
opts.LookbackDelta = lookbackDelta
Expand All @@ -787,25 +807,26 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
if tc.step == 0 {
tc.step = step
}
for _, disableOptimizers := range disableOptimizerOpts {
for _, disableFallback := range []bool{false, true} {
t.Run(fmt.Sprintf("disableFallback=%v", disableFallback), func(t *testing.T) {
newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: disableFallback, DisableOptimizers: disableOptimizers})
q1, err := newEngine.NewRangeQuery(test.Storage(), nil, tc.query, tc.start, tc.end, step)
testutil.Ok(t, err)

for _, disableFallback := range []bool{false, true} {
t.Run(fmt.Sprintf("disableFallback=%v", disableFallback), func(t *testing.T) {
newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: disableFallback})
q1, err := newEngine.NewRangeQuery(test.Storage(), nil, tc.query, tc.start, tc.end, step)
testutil.Ok(t, err)

newResult := q1.Exec(context.Background())
testutil.Ok(t, newResult.Err)
newResult := q1.Exec(context.Background())
testutil.Ok(t, newResult.Err)

oldEngine := promql.NewEngine(opts)
q2, err := oldEngine.NewRangeQuery(test.Storage(), nil, tc.query, tc.start, tc.end, step)
testutil.Ok(t, err)
oldEngine := promql.NewEngine(opts)
q2, err := oldEngine.NewRangeQuery(test.Storage(), nil, tc.query, tc.start, tc.end, step)
testutil.Ok(t, err)

oldResult := q2.Exec(context.Background())
testutil.Ok(t, oldResult.Err)
oldResult := q2.Exec(context.Background())
testutil.Ok(t, oldResult.Err)

testutil.Equals(t, oldResult, newResult)
})
testutil.Equals(t, oldResult, newResult)
})
}
}
})
}
Expand Down Expand Up @@ -990,11 +1011,6 @@ func TestInstantQuery(t *testing.T) {
http_requests_total{pod="nginx-2"} 1+2x20`,
query: "sum(irate(http_requests_total[1m]))",
},
{
name: "string literal",
load: "",
query: `"hello"`,
},
{
name: "number literal",
load: "",
Expand Down Expand Up @@ -1317,37 +1333,42 @@ func TestInstantQuery(t *testing.T) {
},
}

disableOptimizers := []bool{true, false}
lookbackDeltas := []time.Duration{30 * time.Second, time.Minute, 5 * time.Minute, 10 * time.Minute}
for _, lookbackDelta := range lookbackDeltas {
opts.LookbackDelta = lookbackDelta
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
test, err := promql.NewTest(t, tc.load)
testutil.Ok(t, err)
defer test.Close()
for _, withoutOptimizers := range disableOptimizers {
t.Run(fmt.Sprintf("disableOptimizers=%t", withoutOptimizers), func(t *testing.T) {
for _, lookbackDelta := range lookbackDeltas {
opts.LookbackDelta = lookbackDelta
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
test, err := promql.NewTest(t, tc.load)
testutil.Ok(t, err)
defer test.Close()

testutil.Ok(t, test.Run())
testutil.Ok(t, test.Run())

for _, disableFallback := range []bool{false, true} {
t.Run(fmt.Sprintf("disableFallback=%v", disableFallback), func(t *testing.T) {
newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: disableFallback})
q1, err := newEngine.NewInstantQuery(test.Storage(), nil, tc.query, queryTime)
testutil.Ok(t, err)
newResult := q1.Exec(context.Background())
testutil.Ok(t, newResult.Err)
for _, disableFallback := range []bool{false, true} {
t.Run(fmt.Sprintf("disableFallback=%v", disableFallback), func(t *testing.T) {
newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: disableFallback})
q1, err := newEngine.NewInstantQuery(test.Storage(), nil, tc.query, queryTime)
testutil.Ok(t, err)
newResult := q1.Exec(context.Background())
testutil.Ok(t, newResult.Err)

oldEngine := promql.NewEngine(opts)
q2, err := oldEngine.NewInstantQuery(test.Storage(), nil, tc.query, queryTime)
testutil.Ok(t, err)
oldEngine := promql.NewEngine(opts)
q2, err := oldEngine.NewInstantQuery(test.Storage(), nil, tc.query, queryTime)
testutil.Ok(t, err)

oldResult := q2.Exec(context.Background())
testutil.Ok(t, oldResult.Err)
oldResult := q2.Exec(context.Background())
testutil.Ok(t, oldResult.Err)

testutil.Equals(t, oldResult, newResult)
testutil.Equals(t, oldResult, newResult)
})
}
})
}
})
}
}
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/thanos-community/promql-engine
go 1.18

require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/efficientgo/core v1.0.0-rc.0
github.com/go-kit/log v0.2.1
github.com/prometheus/client_golang v1.13.0
Expand All @@ -15,7 +16,6 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/aws/aws-sdk-go v1.44.109 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
Expand Down
28 changes: 28 additions & 0 deletions logicalplan/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package logicalplan

import (
"fmt"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)

type FilteredSelector struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this seems to change the PromQL spec and add a new filter() function to it. I'm wondering how safe this is if, for example, filter is implemented upstream, in which case it may do something completely different and we end up with diverging specs.

Copy link
Collaborator Author

@fpetkovski fpetkovski Oct 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, we are adding a new AST node type, similar to how StepInvariant was added upstream: https://github.com/prometheus/prometheus/blob/96d5a32659f0e3928c10a771e50123fead9828bd/promql/parser/ast.go#L178-L183

However, this is not changing the actual spec of PromQL. It only changes the parsed AST. In the best-case scenario we would have our own structs copied from the AST and work with them. I thought that might be overkill for now, so I decided to start with something simpler.

If something like this were added to upstream PromQL, I would expect some tests to fail somewhere :) I am not sure if there is a better way to prevent such incompatibilities, and I also don't see how exactly they would manifest.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack! I see! This makes sense, thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we have to extend AST if we want to reuse it and avoid transforming types too many times.

*parser.VectorSelector
Filters []*labels.Matcher
}

// String renders the node as "filter(<filters>, <selector>)", where
// <selector> is the text of the embedded vector selector.
func (f FilteredSelector) String() string {
	selector := f.VectorSelector.String()
	return fmt.Sprintf("filter(%s, %s)", f.Filters, selector)
}

// Pretty returns the same single-line text as String; level is ignored.
func (f FilteredSelector) Pretty(level int) string {
	return f.String()
}

// PositionRange always reports the zero-value range; the position of the
// embedded selector is not forwarded.
func (f FilteredSelector) PositionRange() parser.PositionRange {
	var none parser.PositionRange
	return none
}

// Type reports that a filtered selector evaluates to a vector.
func (f FilteredSelector) Type() parser.ValueType {
	return parser.ValueTypeVector
}

// PromQLExpr is an intentionally empty method.
// NOTE(review): presumably a marker so that FilteredSelector can be used as a
// parser.Expr AST node — confirm against the parser package.
func (f FilteredSelector) PromQLExpr() {
}
Loading