/**
 * Returns the metric name selected by an exact `__name__` matcher.
 *
 * Each matcher is an indexable triple `[label, operator, value]`. The first
 * matcher whose label is `__name__` and whose operator is `=` wins.
 *
 * @param {Array<Array<string>>} matchers label matchers of one selector
 * @returns {string|undefined} the metric name, or undefined when no exact
 *   `__name__` matcher is present
 */
function getMetricName (matchers) {
  const exactName = matchers.find((m) => m[0] === '__name__' && m[1] === '=')
  return exactName ? exactName[2] : undefined
}
defer gcContext.SetContext(ctxId) - return pql(id, data[id], func() (promql.Query, error) { + return pql(id, data[id], optimizable != 0, int64(fromMS), int64(toMS), int64(stepMS), func() (promql.Query, error) { queriable := &TestQueryable{id: id, stepMs: int64(stepMS)} return getEng().NewRangeQuery( queriable, @@ -184,19 +184,20 @@ func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint } //export pqlInstantQuery -func pqlInstantQuery(id uint32, timeMS float64) uint32 { +func pqlInstantQuery(id uint32, timeMS float64, optimizable uint32) uint32 { ctxId := gcContext.GetContextID() gcContext.SetContext(id) defer gcContext.SetContext(ctxId) - return pql(id, data[id], func() (promql.Query, error) { - queriable := &TestQueryable{id: id, stepMs: 15000} - return getEng().NewInstantQuery( - queriable, - nil, - string(data[id].request), - time.Unix(0, int64(timeMS)*1000000)) - }) + return pql(id, data[id], optimizable != 0, int64(timeMS-300000), int64(timeMS), 15000, + func() (promql.Query, error) { + queriable := &TestQueryable{id: id, stepMs: 15000} + return getEng().NewInstantQuery( + queriable, + nil, + string(data[id].request), + time.Unix(0, int64(timeMS)*1000000)) + }) } //export pqlSeries @@ -255,13 +256,16 @@ func wrapErrorStr(err error) string { return err.Error() } -func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 { +func pql(id uint32, c *ctx, optimizable bool, + fromMs int64, toMs int64, stepMs int64, + query func() (promql.Query, error)) uint32 { rq, err := query() if err != nil { c.response = wrapError(err) return 1 } + var walk func(node parser.Node, i func(node parser.Node)) walk = func(node parser.Node, i func(node parser.Node)) { i(node) @@ -269,9 +273,33 @@ func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 { walk(n, i) } } + + subsels := strings.Builder{} + subsels.WriteString("{") + if optimizable { + var ( + subselsMap map[string]string + err error + ) + subselsMap, rq, err = 
optimizeQuery(rq, fromMs, toMs, stepMs) + if err != nil { + c.response = wrapError(err) + return 1 + } + i := 0 + for k, v := range subselsMap { + if i != 0 { + subsels.WriteString(",") + } + subsels.WriteString(fmt.Sprintf(`"%s":"%s"`, strconv.Quote(k), strconv.Quote(v))) + i++ + } + } + subsels.WriteString("}") + matchersJSON := getmatchersJSON(rq) - c.response = []byte(matchersJSON) + c.response = []byte(fmt.Sprintf(`{"subqueries": %s, "matchers": %s}`, subsels.String(), matchersJSON)) c.onDataLoad = func(c *ctx) { ctxId := gcContext.GetContextID() gcContext.SetContext(id) @@ -284,6 +312,68 @@ func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 { return 0 } +func optimizeQuery(q promql.Query, fromMs int64, toMs int64, stepMs int64) (map[string]string, promql.Query, error) { + appliableNodes := findAppliableNodes(q.Statement(), nil) + var err error + subsels := make(map[string]string) + for _, m := range appliableNodes { + fmt.Println(m) + opt := m.optimizer + opt = &promql2.FinalizerOptimizer{ + SubOptimizer: opt, + } + opt, err = promql2.PlanOptimize(m.node, opt) + if err != nil { + return nil, nil, err + } + planner, err := opt.Optimize(m.node) + if err != nil { + return nil, nil, err + } + fakeMetric := fmt.Sprintf("fake_metric_%d", time.Now().UnixNano()) + swapChild(m.parent, m.node, &parser.VectorSelector{ + Name: fakeMetric, + OriginalOffset: 0, + Offset: 0, + Timestamp: nil, + StartOrEnd: 0, + LabelMatchers: []*labels.Matcher{ + { + Type: labels.MatchEqual, + Name: "__name__", + Value: fakeMetric, + }, + }, + UnexpandedSeriesSet: nil, + Series: nil, + PosRange: parser.PositionRange{}, + }) + sel, err := planner.Process(&shared2.PlannerContext{ + IsCluster: false, + From: time.Unix(0, fromMs*1000000), + To: time.Unix(0, toMs*1000000), + Step: time.Millisecond * time.Duration(stepMs), + TimeSeriesTable: "time_series", + TimeSeriesDistTable: "time_series_dist", + TimeSeriesGinTable: "time_series_gin", + MetricsTable: "metrics_15s", + 
MetricsDistTable: "metrics_15s_dist", + }) + if err != nil { + return nil, nil, err + } + strSel, err := sel.String(&sql.Ctx{ + Params: map[string]sql.SQLObject{}, + Result: map[string]sql.SQLObject{}, + }) + if err != nil { + return nil, nil, err + } + subsels[fakeMetric] = strSel + } + return subsels, q, nil +} + //export onDataLoad func onDataLoad(idx uint32) { data[idx].onDataLoad(data[idx]) @@ -358,9 +448,171 @@ func writeVector(v promql.Vector) string { } func main() { - p := sync.Pool{} - a := p.Get() - _ = a + queriable := &TestQueryable{id: 0, stepMs: 15000} + rq, err := getEng().NewRangeQuery( + queriable, + nil, + "histogram_quantile(0.5, sum by (container) (rate(network_usage{container=~\"awesome\"}[5m])))", + time.Now().Add(time.Hour*-24), + time.Now(), + time.Millisecond*time.Duration(15000)) + if err != nil { + panic(err) + } + matchers := findAppliableNodes(rq.Statement(), nil) + for _, m := range matchers { + fmt.Println(m) + opt := m.optimizer + opt = &promql2.FinalizerOptimizer{ + SubOptimizer: opt, + } + opt, err = promql2.PlanOptimize(m.node, opt) + if err != nil { + panic(err) + } + planner, err := opt.Optimize(m.node) + if err != nil { + panic(err) + } + fakeMetric := fmt.Sprintf("fake_metric_%d", time.Now().UnixNano()) + fmt.Println(rq.Statement()) + swapChild(m.parent, m.node, &parser.VectorSelector{ + Name: fakeMetric, + OriginalOffset: 0, + Offset: 0, + Timestamp: nil, + StartOrEnd: 0, + LabelMatchers: []*labels.Matcher{ + { + Type: labels.MatchEqual, + Name: "__name__", + Value: fakeMetric, + }, + }, + UnexpandedSeriesSet: nil, + Series: nil, + PosRange: parser.PositionRange{}, + }) + fmt.Println(rq.Statement()) + sel, err := planner.Process(&shared2.PlannerContext{ + IsCluster: false, + From: time.Now().Add(time.Hour * -24), + To: time.Now(), + Step: time.Millisecond * time.Duration(15000), + TimeSeriesTable: "time_series", + TimeSeriesDistTable: "time_series_dist", + TimeSeriesGinTable: "time_series_gin", + MetricsTable: "metrics_15s", 
+ MetricsDistTable: "metrics_15s_dist", + }) + if err != nil { + panic(err) + } + strSel, err := sel.String(&sql.Ctx{ + Params: map[string]sql.SQLObject{}, + Result: map[string]sql.SQLObject{}, + }) + if err != nil { + panic(err) + } + println(strSel) + } + +} + +func getOptimizer(n parser.Node) promql2.IOptimizer { + for _, f := range promql2.Optimizers { + opt := f() + if opt.IsAppliable(n) { + return opt + } + } + return nil +} + +func isRate(node parser.Node) (bool, bool) { + opt := getOptimizer(node) + if opt == nil { + return false, true + } + return true, false +} + +type MatchNode struct { + node parser.Node + parent parser.Node + optimizer promql2.IOptimizer +} + +func findAppliableNodes(root parser.Node, parent parser.Node) []MatchNode { + var res []MatchNode + optimizer := getOptimizer(root) + if optimizer != nil { + res = append(res, MatchNode{ + node: root, + parent: parent, + optimizer: optimizer, + }) + return res + } + for _, n := range parser.Children(root) { + res = append(res, findAppliableNodes(n, root)...) 
+ } + return res +} + +func swapChild(node parser.Node, child parser.Node, newChild parser.Expr) { + // For some reasons these switches have significantly better performance than interfaces + switch n := node.(type) { + case *parser.EvalStmt: + n.Expr = newChild + case parser.Expressions: + for i, e := range n { + if e.String() == child.String() { + n[i] = newChild + } + } + case *parser.AggregateExpr: + if n.Expr == nil && n.Param == nil { + return + } else if n.Expr == nil { + n.Param = newChild + } else if n.Param == nil { + n.Expr = newChild + } else { + if n.Expr.String() == child.String() { + n.Expr = newChild + } else { + n.Param = newChild + } + } + case *parser.BinaryExpr: + if n.LHS.String() == child.String() { + n.LHS = newChild + } else if n.RHS.String() == child.String() { + n.RHS = newChild + } + case *parser.Call: + for i, e := range n.Args { + if e.String() == child.String() { + n.Args[i] = newChild + } + } + case *parser.SubqueryExpr: + n.Expr = newChild + case *parser.ParenExpr: + n.Expr = newChild + case *parser.UnaryExpr: + n.Expr = newChild + case *parser.MatrixSelector: + n.VectorSelector = newChild + case *parser.StepInvariantExpr: + n.Expr = newChild + } +} + +func getChildren(e parser.Node) []parser.Node { + return parser.Children(e) } type TestLogger struct{} @@ -585,3 +837,17 @@ func matchers2Str(labelMatchers []*labels.Matcher) string { matchersJson.WriteString("]") return matchersJson.String() } + +type pqlRequest struct { + optimizable bool + body string +} + +func (p *pqlRequest) Read(body []byte) { + r := BinaryReader{buffer: body} + p.optimizable = r.ReadULeb32() != 0 + p.body = r.ReadString() + if !p.optimizable { + return + } +} diff --git a/wasm_parts/main.js b/wasm_parts/main.js index bd15ac79..03e84b4b 100644 --- a/wasm_parts/main.js +++ b/wasm_parts/main.js @@ -61,8 +61,8 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) => const end = endMs || Date.now() const step = stepMs || 15000 return await 
pql(query, - (ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step), - (matchers) => getData(matchers, start, end)) + (ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0), + (matchers, subq) => getData(matchers, start, end, subq)) } /** @@ -76,8 +76,8 @@ module.exports.pqlInstantQuery = async (query, timeMs, getData) => { const time = timeMs || Date.now() const _wasm = getWasm() return await pql(query, - (ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time), - (matchers) => getData(matchers, time - 300000, time)) + (ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0), + (matchers, subq) => getData(matchers, time - 300000, time, subq)) } module.exports.pqlMatchers = (query) => { @@ -163,7 +163,7 @@ const pql = async (query, wasmCall, getData) => { const matchersResults = await Promise.all( matchersObj.map(async (matchers, i) => { - const data = await getData(matchers) + const data = await getData(matchers.matchers, matchers.subqueries) return { matchers, data } })) diff --git a/wasm_parts/promql/aggregate.go b/wasm_parts/promql/aggregate.go new file mode 100644 index 00000000..0fd2ed15 --- /dev/null +++ b/wasm_parts/promql/aggregate.go @@ -0,0 +1,71 @@ +package promql + +import ( + "fmt" + "github.com/prometheus/prometheus/promql/parser" + "wasm_parts/promql/planners" + "wasm_parts/promql/shared" +) + +type AggregateOptimizer struct { + WithLabelsIn string + WithLabelsOut string + + subOptimizer IOptimizer +} + +func (a *AggregateOptimizer) IsAppliable(node parser.Node) bool { + aggExpr, ok := node.(*parser.AggregateExpr) + if !ok { + return false + } + if aggExpr.Op != parser.SUM { + return false + } + return GetAppliableOptimizer(aggExpr.Expr, append(Optimizers, VectorSelectorOptimizerFactory)) != nil +} + +func (a *AggregateOptimizer) PlanOptimize(node parser.Node) error { + aggExpr := node.(*parser.AggregateExpr) + a.subOptimizer = 
GetAppliableOptimizer(aggExpr.Expr, append(Optimizers, VectorSelectorOptimizerFactory)) + return a.subOptimizer.PlanOptimize(node) +} + +func (a *AggregateOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) { + aggExpr := node.(*parser.AggregateExpr) + planner, err := a.subOptimizer.Optimize(aggExpr.Expr) + if err != nil { + return nil, err + } + withLabelsIn := a.WithLabelsIn + if withLabelsIn == "" { + planner = &planners.LabelsInitPlanner{ + Main: planner, + FingerprintsAlias: "fp_sel", + } + withLabelsIn = "labels" + } + if a.WithLabelsOut == "" { + return nil, fmt.Errorf("AggregateOptimizer.WithLabelsOut is empty") + } + byWithout := "by" + if aggExpr.Without { + byWithout = "without" + } + planner = &planners.ByWithoutPlanner{ + Main: planner, + FingerprintWithName: withLabelsIn, + FingerprintsOutName: a.WithLabelsOut, + ByWithout: byWithout, + Labels: aggExpr.Grouping, + } + planner = &planners.SumPlanner{ + Main: planner, + LabelsAlias: a.WithLabelsOut, + } + return planner, nil +} + +func (a *AggregateOptimizer) Children() []IOptimizer { + return []IOptimizer{a.subOptimizer} +} diff --git a/wasm_parts/promql/finalize.go b/wasm_parts/promql/finalize.go new file mode 100644 index 00000000..d15bcf29 --- /dev/null +++ b/wasm_parts/promql/finalize.go @@ -0,0 +1,45 @@ +package promql + +import ( + "github.com/prometheus/prometheus/promql/parser" + "wasm_parts/promql/planners" + "wasm_parts/promql/shared" +) + +type FinalizerOptimizer struct { + LabelsIn string + SubOptimizer IOptimizer +} + +func (f *FinalizerOptimizer) IsAppliable(node parser.Node) bool { + return false +} + +func (f *FinalizerOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) { + planner, err := f.SubOptimizer.Optimize(node) + if err != nil { + return nil, err + } + labelsIn := f.LabelsIn + if labelsIn == "" { + planner = &planners.LabelsInitPlanner{ + Main: planner, + FingerprintsAlias: "fp_sel", + } + labelsIn = "labels" + } + + planner = 
&planners.FinalizePlanner{ + LabelsAlias: labelsIn, + Main: planner, + } + return planner, nil +} + +func (f *FinalizerOptimizer) PlanOptimize(node parser.Node) error { + return f.SubOptimizer.PlanOptimize(node) +} + +func (f *FinalizerOptimizer) Children() []IOptimizer { + return []IOptimizer{f.SubOptimizer} +} diff --git a/wasm_parts/promql/optimize.go b/wasm_parts/promql/optimize.go new file mode 100644 index 00000000..2ea382ea --- /dev/null +++ b/wasm_parts/promql/optimize.go @@ -0,0 +1,37 @@ +package promql + +import ( + "fmt" + "github.com/prometheus/prometheus/promql/parser" +) + +func PlanOptimize(node parser.Node, optimizer IOptimizer) (IOptimizer, error) { + err := optimizer.PlanOptimize(node) + if err != nil { + return nil, err + } + + var checkLabelAliases func(opt IOptimizer, i int) int + checkLabelAliases = func(opt IOptimizer, i int) int { + var _i int + for _, c := range opt.Children() { + _i = checkLabelAliases(c, i) + } + switch opt.(type) { + case *AggregateOptimizer: + if _i != 0 { + opt.(*AggregateOptimizer).WithLabelsIn = fmt.Sprintf("labels_", _i) + } + opt.(*AggregateOptimizer).WithLabelsOut = fmt.Sprintf("labels_%d", _i+1) + _i++ + case *FinalizerOptimizer: + if _i != 0 { + opt.(*FinalizerOptimizer).LabelsIn = fmt.Sprintf("labels_%d", _i) + } + _i++ + } + return _i + } + checkLabelAliases(optimizer, 0) + return optimizer, nil +} diff --git a/wasm_parts/promql/planners/aggregate.go b/wasm_parts/promql/planners/aggregate.go new file mode 100644 index 00000000..a1f6cf0d --- /dev/null +++ b/wasm_parts/promql/planners/aggregate.go @@ -0,0 +1,48 @@ +package planners + +import ( + "fmt" + "wasm_parts/promql/shared" + sql "wasm_parts/sql_select" +) + +type SumPlanner struct { + Main shared.RequestPlanner + LabelsAlias string +} + +func (s *SumPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := s.Main.Process(ctx) + if err != nil { + return nil, err + } + + var withLabels *sql.With + for _, w := range main.GetWith() { 
+ if w.GetAlias() == s.LabelsAlias { + withLabels = w + break + } + } + if withLabels == nil { + return nil, fmt.Errorf("labels subrequest not found") + } + withMain := sql.NewWith(main, "pre_sum") + + res := sql.NewSelect().With(withMain). + Select( + sql.NewSimpleCol(withLabels.GetAlias()+".new_fingerprint", "fingerprint"), + sql.NewSimpleCol("pre_sum.timestamp_ms", "timestamp_ms"), + sql.NewSimpleCol("sum(pre_sum.value)", "value")). + From(sql.NewWithRef(withMain)). + Join(sql.NewJoin( + "ANY LEFT", + sql.NewWithRef(withLabels), + sql.Eq( + sql.NewRawObject("pre_sum.fingerprint"), + sql.NewRawObject(withLabels.GetAlias()+".fingerprint")))). + GroupBy( + sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint"), + sql.NewRawObject("pre_sum.timestamp_ms")) + return res, nil +} diff --git a/wasm_parts/promql/planners/by_without.go b/wasm_parts/promql/planners/by_without.go new file mode 100644 index 00000000..de38b83e --- /dev/null +++ b/wasm_parts/promql/planners/by_without.go @@ -0,0 +1,59 @@ +package planners + +import ( + "fmt" + "strings" + "wasm_parts/promql/shared" + sql "wasm_parts/sql_select" +) + +type ByWithoutPlanner struct { + Main shared.RequestPlanner + FingerprintWithName string + FingerprintsOutName string + ByWithout string + Labels []string +} + +func (b *ByWithoutPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := b.Main.Process(ctx) + if err != nil { + return nil, err + } + var fp *sql.With + withs := main.GetWith() + for _, w := range withs { + if w.GetAlias() == b.FingerprintWithName { + fp = w + break + } + } + if fp == nil { + return nil, fmt.Errorf("fingerprints subrequest not found") + } + labelsCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) { + cond := "IN" + if b.ByWithout == "without" { + cond = "NOT IN" + } + values := make([]string, len(b.Labels)) + var err error + for i, l := range b.Labels { + values[i], err = sql.NewStringVal(l).String(ctx, options...) 
+ if err != nil { + return "", err + } + } + return fmt.Sprintf("mapFilter((k,v) -> k %s (%s), labels)", cond, strings.Join(values, ",")), nil + }) + newFpCol := "cityHash64(arraySort(arrayZip(mapKeys(labels), mapValues(labels))))" + newFp := sql.NewSelect(). + Select( + sql.NewSimpleCol(fp.GetAlias()+".new_fingerprint", "fingerprint"), + sql.NewCol(labelsCol, "labels"), + sql.NewSimpleCol(newFpCol, "new_fingerprint"), + ). + From(sql.NewWithRef(fp)) + withNewFp := sql.NewWith(newFp, b.FingerprintsOutName) + return main.AddWith(withNewFp), nil +} diff --git a/wasm_parts/promql/planners/finalize.go b/wasm_parts/promql/planners/finalize.go new file mode 100644 index 00000000..8c26aec8 --- /dev/null +++ b/wasm_parts/promql/planners/finalize.go @@ -0,0 +1,47 @@ +package planners + +import ( + "fmt" + "wasm_parts/promql/shared" + sql "wasm_parts/sql_select" +) + +type FinalizePlanner struct { + LabelsAlias string + Main shared.RequestPlanner +} + +func (f *FinalizePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := f.Main.Process(ctx) + if err != nil { + return nil, err + } + + var withLabels *sql.With + for _, w := range main.GetWith() { + if w.GetAlias() == f.LabelsAlias { + withLabels = w + break + } + } + + if withLabels == nil { + return nil, fmt.Errorf("FinalizePlanner.Process: %s CTE not found", f.LabelsAlias) + } + + withMain := sql.NewWith(main, "pre_final") + res := sql.NewSelect().With(withMain).Select(withMain). + Select( + sql.NewSimpleCol("pre_final.fingerprint", "fingerprint"), + sql.NewSimpleCol(withLabels.GetAlias()+".labels", "labels"), + sql.NewSimpleCol("arraySort(groupArray((pre_final.timestamp_ms, pre_final.value)))", "values"), + ).From(sql.NewWithRef(withMain)). + Join(sql.NewJoin( + "ANY LEFT", + sql.NewWithRef(withLabels), + sql.Eq( + sql.NewRawObject("pre_final.fingerprint"), + sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint")))). 
+ GroupBy(sql.NewRawObject("pre_final.fingerprint"), sql.NewRawObject(withLabels.GetAlias()+".labels")) + return res, nil +} diff --git a/wasm_parts/promql/planners/labels_init.go b/wasm_parts/promql/planners/labels_init.go new file mode 100644 index 00000000..35c060fc --- /dev/null +++ b/wasm_parts/promql/planners/labels_init.go @@ -0,0 +1,48 @@ +package planners + +import ( + "fmt" + "wasm_parts/promql/shared" + sql "wasm_parts/sql_select" +) + +type LabelsInitPlanner struct { + Main shared.RequestPlanner + FingerprintsAlias string +} + +func (l *LabelsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := l.Main.Process(ctx) + if err != nil { + return nil, err + } + + var withFp *sql.With + for _, w := range main.GetWith() { + if w.GetAlias() == l.FingerprintsAlias { + withFp = w + break + } + } + + if withFp == nil { + return nil, fmt.Errorf("fingerprints subrequest not found") + } + + labelsCol := "mapFromArrays(" + + "arrayMap(x -> x.1, JSONExtractKeysAndValues(time_series.labels, 'String') as ts_kv), " + + "arrayMap(x -> x.2, ts_kv))" + + labelsSubSel := sql.NewSelect().Select( + sql.NewSimpleCol("fingerprint", "fingerprint"), + sql.NewSimpleCol(labelsCol, "labels"), + sql.NewSimpleCol("fingerprint", "new_fingerprint")). + From(sql.NewSimpleCol(ctx.TimeSeriesTable, "time_series")). 
+ AndWhere( + sql.Ge(sql.NewRawObject("date"), sql.NewStringVal(ctx.From.Format("2006-01-02"))), + sql.Le(sql.NewRawObject("date"), sql.NewStringVal(ctx.To.Format("2006-01-02"))), + sql.NewIn(sql.NewRawObject("fingerprint"), sql.NewWithRef(withFp))) + withLabelsSubSel := sql.NewWith(labelsSubSel, "labels") + + return main.AddWith(withLabelsSubSel), nil +} diff --git a/wasm_parts/promql/planners/metrics_extend.go b/wasm_parts/promql/planners/metrics_extend.go new file mode 100644 index 00000000..f5986ea8 --- /dev/null +++ b/wasm_parts/promql/planners/metrics_extend.go @@ -0,0 +1,36 @@ +package planners + +import ( + "fmt" + "wasm_parts/promql/shared" + sql "wasm_parts/sql_select" +) + +type MetricsExtendPlanner struct { + Main shared.RequestPlanner +} + +func (m *MetricsExtendPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := m.Main.Process(ctx) + if err != nil { + return nil, err + } + extendCnt := 300000 / ctx.Step.Milliseconds() + if extendCnt < 1 { + return main, nil + } + withMain := sql.NewWith(main, "pre_extend") + extendedCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) { + return fmt.Sprintf( + "argMaxIf(value, timestamp_ms, isNaN(value) = 0) OVER ("+ + "PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+ + ")", extendCnt), nil + }) + extend := sql.NewSelect().With(withMain). + Select( + sql.NewSimpleCol("fingerprint", "fingerprint"), + sql.NewSimpleCol("timestamp_ms", "timestamp_ms"), + sql.NewCol(extendedCol, "value")). 
+ From(sql.NewWithRef(withMain)) + return extend, nil +} diff --git a/wasm_parts/promql/planners/metrics_rate.go b/wasm_parts/promql/planners/metrics_rate.go new file mode 100644 index 00000000..7a4f6253 --- /dev/null +++ b/wasm_parts/promql/planners/metrics_rate.go @@ -0,0 +1,56 @@ +package planners + +import ( + "fmt" + "time" + "wasm_parts/promql/shared" + sql "wasm_parts/sql_select" +) + +type RatePlanner struct { + Main shared.RequestPlanner + Duration time.Duration +} + +func (m *RatePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := m.Main.Process(ctx) + if err != nil { + return nil, err + } + rateCnt := m.Duration.Milliseconds() / ctx.Step.Milliseconds() + if rateCnt < 1 { + rateCnt = 1 + } + withMain := sql.NewWith(main, "pre_rate") + lastCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) { + return fmt.Sprintf( + "argMax(value, timestamp_ms) OVER ("+ + "PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+ + ")", rateCnt), nil + }) + firstCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) { + return fmt.Sprintf( + "argMin(value, timestamp_ms) OVER ("+ + "PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+ + ")", rateCnt), nil + }) + valueCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) { + return fmt.Sprintf( + "if(last > first, last - first, last) / %f", m.Duration.Seconds()), nil + }) + extend := sql.NewSelect().With(withMain). + Select( + sql.NewSimpleCol("fingerprint", "fingerprint"), + sql.NewSimpleCol("timestamp_ms", "timestamp_ms"), + sql.NewCol(lastCol, "last"), + sql.NewCol(firstCol, "first"), + sql.NewCol(valueCol, "_value")). + From(sql.NewWithRef(withMain)) + withExtend := sql.NewWith(extend, "rate") + return sql.NewSelect(). + With(withExtend). 
+ Select(sql.NewSimpleCol("fingerprint", "fingerprint"), + sql.NewSimpleCol("timestamp_ms", "timestamp_ms"), + sql.NewSimpleCol("_value", "value")). + From(sql.NewWithRef(withExtend)), nil +} diff --git a/wasm_parts/promql/planners/metrics_raw_init.go b/wasm_parts/promql/planners/metrics_raw_init.go new file mode 100644 index 00000000..a0b69cb4 --- /dev/null +++ b/wasm_parts/promql/planners/metrics_raw_init.go @@ -0,0 +1,36 @@ +package planners + +import ( + "fmt" + "wasm_parts/promql/shared" + sql "wasm_parts/sql_select" +) + +type MetricsInitPlanner struct { + ValueCol sql.SQLObject + Fingerprint shared.RequestPlanner +} + +func (m *MetricsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + fpReq, err := m.Fingerprint.Process(ctx) + if err != nil { + return nil, err + } + withFpReq := sql.NewWith(fpReq, "fp_sel") + if m.ValueCol == nil { + m.ValueCol = sql.NewRawObject("argMaxMerge(last)") + } + tsNsCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) { + return fmt.Sprintf("intDiv(timestamp_ns, %d) * %d", ctx.Step.Nanoseconds(), ctx.Step.Milliseconds()), nil + }) + return sql.NewSelect().With(withFpReq).Select( + sql.NewSimpleCol("fingerprint", "fingerprint"), + sql.NewCol(tsNsCol, "timestamp_ms"), + sql.NewCol(m.ValueCol, "value")). + From(sql.NewSimpleCol(ctx.MetricsTable, "metrics")). + AndWhere( + sql.Ge(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.From.UnixNano())), + sql.Le(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.To.UnixNano())), + sql.NewIn(sql.NewRawObject("fingerprint"), sql.NewWithRef(withFpReq))). 
+ GroupBy(sql.NewRawObject("fingerprint"), sql.NewRawObject("timestamp_ms")), nil +} diff --git a/wasm_parts/promql/planners/metrics_zerofill.go b/wasm_parts/promql/planners/metrics_zerofill.go new file mode 100644 index 00000000..bf3d9ad0 --- /dev/null +++ b/wasm_parts/promql/planners/metrics_zerofill.go @@ -0,0 +1,45 @@ +package planners + +import ( + "fmt" + "wasm_parts/promql/shared" + sql "wasm_parts/sql_select" +) + +type MetricsZeroFillPlanner struct { + Main shared.RequestPlanner +} + +func (m *MetricsZeroFillPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := m.Main.Process(ctx) + if err != nil { + return nil, err + } + withMain := sql.NewWith(main, "prezerofill") + arrLen := (ctx.To.UnixNano()-ctx.From.UnixNano())/ctx.Step.Nanoseconds() + 1 + zeroFillCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) { + return fmt.Sprintf("groupArrayInsertAt(nan, %d)(value, toUInt32(intDiv(timestamp_ms - %d, %d)))", + arrLen, ctx.From.UnixMilli(), ctx.Step.Milliseconds()), nil + }) + zeroFill := sql.NewSelect().With(withMain). + Select( + sql.NewSimpleCol("fingerprint", "fingerprint"), + sql.NewCol(zeroFillCol, "values")). + From(sql.NewWithRef(withMain)). + GroupBy(sql.NewRawObject("fingerprint")) + withZeroFill := sql.NewWith(zeroFill, "zerofill") + + joinZeroFillStmt := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) { + return fmt.Sprintf("arrayMap((x,y) -> (y * %d + %d, x), values, range(%d))", + ctx.Step.Milliseconds(), ctx.From.UnixMilli(), arrLen), nil + }) + + postZeroFill := sql.NewSelect().With(withZeroFill). + Select( + sql.NewSimpleCol("fingerprint", "fingerprint"), + sql.NewSimpleCol("val.1", "timestamp_ms"), + sql.NewSimpleCol("val.2", "value")). + From(sql.NewWithRef(withZeroFill)). 
+ Join(sql.NewJoin("array", sql.NewCol(joinZeroFillStmt, "val"), nil)) + return postZeroFill, nil +} diff --git a/wasm_parts/promql/planners/stream_select_planner.go b/wasm_parts/promql/planners/stream_select_planner.go new file mode 100644 index 00000000..af095b16 --- /dev/null +++ b/wasm_parts/promql/planners/stream_select_planner.go @@ -0,0 +1,102 @@ +package planners + +import ( + "fmt" + "github.com/prometheus/prometheus/model/labels" + "strings" + "wasm_parts/promql/shared" + sql "wasm_parts/sql_select" +) + +type StreamSelectPlanner struct { + Main shared.RequestPlanner + Matchers []*labels.Matcher +} + +func (s *StreamSelectPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := s.Main.Process(ctx) + if err != nil { + return nil, err + } + conds := make([]sql.SQLCondition, len(s.Matchers)) + for i, m := range s.Matchers { + conds[i], err = s.getCond(m) + if err != nil { + return nil, err + } + } + main.AndWhere(sql.Or(conds...)) + + bitSetEntries := make([]*bitSetEntry, len(conds)) + for i, c := range conds { + bitSetEntries[i] = &bitSetEntry{c, i} + } + main.AndHaving(sql.Eq(&bitSet{entries: bitSetEntries}, sql.NewIntVal((int64(1)<