diff --git a/promql/index.js b/promql/index.js
index d1fc80f6..8bda90ab 100644
--- a/promql/index.js
+++ b/promql/index.js
@@ -121,6 +121,7 @@ module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
   const db = DATABASE_NAME()
   const subq = (subqueries || {})[getMetricName(matchers)]
   if (subq) {
+    console.log(subq)
     const data = await rawRequest(subq + ' FORMAT RowBinary', null, db,
       { responseType: 'arraybuffer' })
     return new Uint8Array(data.data)
diff --git a/wasm_parts/main.go b/wasm_parts/main.go
index e8c326a5..7f3ad847 100644
--- a/wasm_parts/main.go
+++ b/wasm_parts/main.go
@@ -274,6 +274,9 @@ func pql(id uint32, c *ctx, optimizable bool,
 		}
 	}
 
+	maxDurationMs := getMaxDurationMs(rq.Statement())
+	fromMs -= maxDurationMs
+
 	subsels := strings.Builder{}
 	subsels.WriteString("{")
 	if optimizable {
@@ -291,7 +294,7 @@
 			if i != 0 {
 				subsels.WriteString(",")
 			}
-			subsels.WriteString(fmt.Sprintf(`"%s":"%s"`, strconv.Quote(k), strconv.Quote(v)))
+			subsels.WriteString(fmt.Sprintf(`%s:%s`, strconv.Quote(k), strconv.Quote(v)))
 			i++
 		}
 	}
@@ -299,7 +302,10 @@
 
 	matchersJSON := getmatchersJSON(rq)
 
-	c.response = []byte(fmt.Sprintf(`{"subqueries": %s, "matchers": %s}`, subsels.String(), matchersJSON))
+	c.response = []byte(fmt.Sprintf(`{"subqueries": %s, "matchers": %s, "fromMs": %d}`,
+		subsels.String(),
+		matchersJSON,
+		fromMs))
 	c.onDataLoad = func(c *ctx) {
 		ctxId := gcContext.GetContextID()
 		gcContext.SetContext(id)
@@ -312,6 +318,21 @@
 	return 0
 }
 
+func getMaxDurationMs(q parser.Node) int64 {
+	maxDurationMs := int64(0)
+	for _, c := range parser.Children(q) {
+		_m := getMaxDurationMs(c)
+		if _m > maxDurationMs {
+			maxDurationMs = _m
+		}
+	}
+	ms, _ := q.(*parser.MatrixSelector)
+	if ms != nil && maxDurationMs < ms.Range.Milliseconds() {
+		return ms.Range.Milliseconds()
+	}
+	return maxDurationMs
+}
+
 func optimizeQuery(q promql.Query, fromMs int64, toMs int64, stepMs int64) (map[string]string, promql.Query, error) {
 	appliableNodes := findAppliableNodes(q.Statement(), nil)
 	var err error
@@ -352,7 +373,7 @@
 		IsCluster: false,
 		From:      time.Unix(0, fromMs*1000000),
 		To:        time.Unix(0, toMs*1000000),
-		Step:      time.Millisecond * time.Duration(stepMs),
+		Step:      time.Millisecond * 15000, /*time.Duration(stepMs)*/
 		TimeSeriesTable:     "time_series",
 		TimeSeriesDistTable: "time_series_dist",
 		TimeSeriesGinTable:  "time_series_gin",
@@ -448,76 +469,6 @@ func writeVector(v promql.Vector) string {
 }
 
 func main() {
-	queriable := &TestQueryable{id: 0, stepMs: 15000}
-	rq, err := getEng().NewRangeQuery(
-		queriable,
-		nil,
-		"histogram_quantile(0.5, sum by (container) (rate(network_usage{container=~\"awesome\"}[5m])))",
-		time.Now().Add(time.Hour*-24),
-		time.Now(),
-		time.Millisecond*time.Duration(15000))
-	if err != nil {
-		panic(err)
-	}
-	matchers := findAppliableNodes(rq.Statement(), nil)
-	for _, m := range matchers {
-		fmt.Println(m)
-		opt := m.optimizer
-		opt = &promql2.FinalizerOptimizer{
-			SubOptimizer: opt,
-		}
-		opt, err = promql2.PlanOptimize(m.node, opt)
-		if err != nil {
-			panic(err)
-		}
-		planner, err := opt.Optimize(m.node)
-		if err != nil {
-			panic(err)
-		}
-		fakeMetric := fmt.Sprintf("fake_metric_%d", time.Now().UnixNano())
-		fmt.Println(rq.Statement())
-		swapChild(m.parent, m.node, &parser.VectorSelector{
-			Name:           fakeMetric,
-			OriginalOffset: 0,
-			Offset:         0,
-			Timestamp:      nil,
-			StartOrEnd:     0,
-			LabelMatchers: []*labels.Matcher{
-				{
-					Type:  labels.MatchEqual,
-					Name:  "__name__",
-					Value: fakeMetric,
-				},
-			},
-			UnexpandedSeriesSet: nil,
-			Series:              nil,
-			PosRange:            parser.PositionRange{},
-		})
-		fmt.Println(rq.Statement())
-		sel, err := planner.Process(&shared2.PlannerContext{
-			IsCluster: false,
-			From:      time.Now().Add(time.Hour * -24),
-			To:        time.Now(),
-			Step:      time.Millisecond * time.Duration(15000),
-			TimeSeriesTable:     "time_series",
-			TimeSeriesDistTable: "time_series_dist",
-			TimeSeriesGinTable:  "time_series_gin",
-			MetricsTable:        "metrics_15s",
-			MetricsDistTable:    "metrics_15s_dist",
-		})
-		if err != nil {
-			panic(err)
-		}
-		strSel, err := sel.String(&sql.Ctx{
-			Params: map[string]sql.SQLObject{},
-			Result: map[string]sql.SQLObject{},
-		})
-		if err != nil {
-			panic(err)
-		}
-		println(strSel)
-	}
-
 }
 
 func getOptimizer(n parser.Node) promql2.IOptimizer {
@@ -696,10 +647,11 @@
 type TestSeries struct {
 	data   []byte
 	stepMs int64
-	labels labels.Labels
-	tsMs   int64
-	val    float64
-	i      int
+	labels    labels.Labels
+	tsMs      int64
+	val       float64
+	lastValTs int64
+	i         int
 	state int
 }
 
@@ -721,11 +673,14 @@ func (t *TestSeries) Next() bool {
 		t.tsMs += t.stepMs
 		if t.tsMs >= ts {
 			t.state = 0
+		} else if t.lastValTs+300000 < t.tsMs {
+			t.state = 0
 		}
 	}
 	if t.state == 0 {
 		t.tsMs = ts
 		t.val = *(*float64)(unsafe.Pointer(&t.data[t.i*16+8]))
+		t.lastValTs = t.tsMs
 		t.i++
 		t.state = 1
 	}
diff --git a/wasm_parts/main.js b/wasm_parts/main.js
index 03e84b4b..0ef187a6 100644
--- a/wasm_parts/main.js
+++ b/wasm_parts/main.js
@@ -62,7 +62,7 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
   const step = stepMs || 15000
   return await pql(query,
     (ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
-    (matchers, subq) => getData(matchers, start, end, subq))
+    (matchers, subq, startMs) => getData(matchers, startMs, end, subq))
 }
 
 /**
@@ -75,9 +75,10 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
 module.exports.pqlInstantQuery = async (query, timeMs, getData) => {
   const time = timeMs || Date.now()
   const _wasm = getWasm()
+  const start = time - 300000
   return await pql(query,
     (ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
-    (matchers, subq) => getData(matchers, time - 300000, time, subq))
+    (matchers, subq, start) => getData(matchers, start, time, subq))
 }
 
 module.exports.pqlMatchers = (query) => {
@@ -162,8 +163,8 @@ const pql = async (query, wasmCall, getData) => {
   const matchersObj = JSON.parse(ctx.read())
 
   const matchersResults = await Promise.all(
-    matchersObj.map(async (matchers, i) => {
-      const data = await getData(matchers.matchers, matchers.subqueries)
+    matchersObj.matchers.map(async (matchers, i) => {
+      const data = await getData(matchers, matchersObj.subqueries, matchersObj.fromMs)
       return { matchers, data }
     }))
 
diff --git a/wasm_parts/main.wasm.gz b/wasm_parts/main.wasm.gz
index 469487ea..eba7be72 100644
Binary files a/wasm_parts/main.wasm.gz and b/wasm_parts/main.wasm.gz differ
diff --git a/wasm_parts/promql/planners/finalize.go b/wasm_parts/promql/planners/finalize.go
index 8c26aec8..f3064f28 100644
--- a/wasm_parts/promql/planners/finalize.go
+++ b/wasm_parts/promql/planners/finalize.go
@@ -32,16 +32,16 @@ func (f *FinalizePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, erro
 	withMain := sql.NewWith(main, "pre_final")
 	res := sql.NewSelect().With(withMain).Select(withMain).
 		Select(
-			sql.NewSimpleCol("pre_final.fingerprint", "fingerprint"),
 			sql.NewSimpleCol(withLabels.GetAlias()+".labels", "labels"),
 			sql.NewSimpleCol("arraySort(groupArray((pre_final.timestamp_ms, pre_final.value)))", "values"),
 		).From(sql.NewWithRef(withMain)).
+		//AndWhere(sql.Neq(sql.NewRawObject("pre_final.value"), sql.NewIntVal(0))).
 		Join(sql.NewJoin(
 			"ANY LEFT",
 			sql.NewWithRef(withLabels),
 			sql.Eq(
 				sql.NewRawObject("pre_final.fingerprint"),
 				sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint")))).
-		GroupBy(sql.NewRawObject("pre_final.fingerprint"), sql.NewRawObject(withLabels.GetAlias()+".labels"))
+		GroupBy(sql.NewRawObject(withLabels.GetAlias() + ".labels"))
 	return res, nil
 }
diff --git a/wasm_parts/promql/planners/metrics_extend.go b/wasm_parts/promql/planners/metrics_extend.go
index f5986ea8..52e1916c 100644
--- a/wasm_parts/promql/planners/metrics_extend.go
+++ b/wasm_parts/promql/planners/metrics_extend.go
@@ -22,7 +22,13 @@ func (m *MetricsExtendPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect,
 	withMain := sql.NewWith(main, "pre_extend")
 	extendedCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
 		return fmt.Sprintf(
-			"argMaxIf(value, timestamp_ms, isNaN(value) = 0) OVER ("+
+			"argMaxIf(value, timestamp_ms, pre_extend.original = 1) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", extendCnt), nil
+	})
+	origCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf(
+			"max(original) OVER ("+
 				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
 				")", extendCnt), nil
 	})
@@ -30,7 +36,11 @@ func (m *MetricsExtendPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect,
 		Select(
 			sql.NewSimpleCol("fingerprint", "fingerprint"),
 			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
-			sql.NewCol(extendedCol, "value")).
+			sql.NewCol(extendedCol, "value"),
+			sql.NewCol(origCol, "original")).
 		From(sql.NewWithRef(withMain))
-	return extend, nil
+	withExtend := sql.NewWith(extend, "extend")
+	return sql.NewSelect().With(withExtend).Select(sql.NewRawObject("*")).
+		From(sql.NewWithRef(withExtend)).
+		AndWhere(sql.Eq(sql.NewRawObject("original"), sql.NewIntVal(1))), nil
 }
diff --git a/wasm_parts/promql/planners/metrics_rate.go b/wasm_parts/promql/planners/metrics_rate.go
index 7a4f6253..4a472c42 100644
--- a/wasm_parts/promql/planners/metrics_rate.go
+++ b/wasm_parts/promql/planners/metrics_rate.go
@@ -34,18 +34,39 @@ func (m *RatePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
 			"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
 			")", rateCnt), nil
 	})
-	valueCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+	resetCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
 		return fmt.Sprintf(
-			"if(last > first, last - first, last) / %f", m.Duration.Seconds()), nil
+			"if(value < (any(value) OVER (" +
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING" +
+				") as lastValue), lastValue, 0)"), nil
 	})
-	extend := sql.NewSelect().With(withMain).
+	reset := sql.NewSelect().With(withMain).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewCol(resetCol, "reset"),
+			sql.NewSimpleCol("value", "value")).
+		From(sql.NewWithRef(withMain))
+	withReset := sql.NewWith(reset, "pre_reset")
+	resetColSum := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		_rateCnt := rateCnt - 1
+		if rateCnt <= 1 {
+			_rateCnt = 1
+		}
+		return fmt.Sprintf(
+			"sum(reset) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", _rateCnt), nil
+	})
+	extend := sql.NewSelect().With(withReset).
 		Select(
 			sql.NewSimpleCol("fingerprint", "fingerprint"),
 			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
 			sql.NewCol(lastCol, "last"),
 			sql.NewCol(firstCol, "first"),
-			sql.NewCol(valueCol, "_value")).
-		From(sql.NewWithRef(withMain))
+			sql.NewCol(resetColSum, "reset"),
+			sql.NewSimpleCol(fmt.Sprintf("(last - first + reset) / %f", m.Duration.Seconds()), "_value")).
+		From(sql.NewWithRef(withReset))
 	withExtend := sql.NewWith(extend, "rate")
 	return sql.NewSelect().
 		With(withExtend).
diff --git a/wasm_parts/promql/planners/metrics_raw_init.go b/wasm_parts/promql/planners/metrics_raw_init.go
index a0b69cb4..4cc233c3 100644
--- a/wasm_parts/promql/planners/metrics_raw_init.go
+++ b/wasm_parts/promql/planners/metrics_raw_init.go
@@ -26,7 +26,8 @@ func (m *MetricsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, e
 	return sql.NewSelect().With(withFpReq).Select(
 		sql.NewSimpleCol("fingerprint", "fingerprint"),
 		sql.NewCol(tsNsCol, "timestamp_ms"),
-		sql.NewCol(m.ValueCol, "value")).
+		sql.NewCol(m.ValueCol, "value"),
+		sql.NewSimpleCol("1::UInt8", "original")).
 		From(sql.NewSimpleCol(ctx.MetricsTable, "metrics")).
 		AndWhere(
 			sql.Ge(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.From.UnixNano())),
diff --git a/wasm_parts/promql/planners/metrics_zerofill.go b/wasm_parts/promql/planners/metrics_zerofill.go
index bf3d9ad0..4f8fc703 100644
--- a/wasm_parts/promql/planners/metrics_zerofill.go
+++ b/wasm_parts/promql/planners/metrics_zerofill.go
@@ -15,7 +15,12 @@ func (m *MetricsZeroFillPlanner) Process(ctx *shared.PlannerContext) (sql.ISelec
 	if err != nil {
 		return nil, err
 	}
-	withMain := sql.NewWith(main, "prezerofill")
+	main.OrderBy(sql.NewRawObject("fingerprint"), sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf("timestamp_ms WITH FILL FROM %d TO %d STEP %d",
+			ctx.From.UnixMilli(), ctx.To.UnixMilli(), ctx.Step.Milliseconds()), nil
+	}))
+	return main, nil
+	/*withMain := sql.NewWith(main, "prezerofill")
 	arrLen := (ctx.To.UnixNano()-ctx.From.UnixNano())/ctx.Step.Nanoseconds() + 1
 	zeroFillCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
 		return fmt.Sprintf("groupArrayInsertAt(nan, %d)(value, toUInt32(intDiv(timestamp_ms - %d, %d)))",
@@ -37,9 +42,9 @@ func (m *MetricsZeroFillPlanner) Process(ctx *shared.PlannerContext) (sql.ISelec
 	postZeroFill := sql.NewSelect().With(withZeroFill).
 		Select(
 			sql.NewSimpleCol("fingerprint", "fingerprint"),
-			sql.NewSimpleCol("val.1", "timestamp_ms"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
 			sql.NewSimpleCol("val.2", "value")).
-		From(sql.NewWithRef(withZeroFill)).
+		From(sql.NewWithRef(withMain)).
 		Join(sql.NewJoin("array", sql.NewCol(joinZeroFillStmt, "val"), nil))
-	return postZeroFill, nil
+	return postZeroFill, nil*/
 }
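Note (illustration, not part of the patch): the counter-reset handling that the updated RatePlanner expresses in SQL above, i.e. a per-sample reset column, a windowed sum of resets, and (last - first + reset) / duration, follows the same arithmetic as the standalone Go sketch below; the function name and sample values are hypothetical.

package main

import "fmt"

// rateWithResets mirrors the arithmetic of the generated SQL: whenever a counter
// sample drops below its predecessor, the previous value is counted as a reset
// and added back before dividing by the window duration in seconds.
func rateWithResets(values []float64, durationSec float64) float64 {
	if len(values) < 2 || durationSec <= 0 {
		return 0
	}
	reset := 0.0
	for i := 1; i < len(values); i++ {
		if values[i] < values[i-1] { // counter reset detected
			reset += values[i-1]
		}
	}
	return (values[len(values)-1] - values[0] + reset) / durationSec
}

func main() {
	// hypothetical counter samples over a 300 s window with one reset after 90
	fmt.Println(rateWithResets([]float64{10, 40, 90, 5, 35}, 300)) // (35 - 10 + 90) / 300
}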