promql optimization: rate & sum
akvlad committed Jun 20, 2024
1 parent b592684 commit 8fa69d2
Showing 9 changed files with 90 additions and 96 deletions.
1 change: 1 addition & 0 deletions promql/index.js
@@ -121,6 +121,7 @@ module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
const db = DATABASE_NAME()
const subq = (subqueries || {})[getMetricName(matchers)]
if (subq) {
console.log(subq)
const data = await rawRequest(subq + ' FORMAT RowBinary',
null, db, { responseType: 'arraybuffer' })
return new Uint8Array(data.data)
109 changes: 32 additions & 77 deletions wasm_parts/main.go
@@ -274,6 +274,9 @@ func pql(id uint32, c *ctx, optimizable bool,
}
}

maxDurationMs := getMaxDurationMs(rq.Statement())
fromMs -= maxDurationMs

subsels := strings.Builder{}
subsels.WriteString("{")
if optimizable {
@@ -291,15 +294,18 @@
if i != 0 {
subsels.WriteString(",")
}
subsels.WriteString(fmt.Sprintf(`"%s":"%s"`, strconv.Quote(k), strconv.Quote(v)))
subsels.WriteString(fmt.Sprintf(`%s:%s`, strconv.Quote(k), strconv.Quote(v)))
i++
}
}
subsels.WriteString("}")

matchersJSON := getmatchersJSON(rq)

c.response = []byte(fmt.Sprintf(`{"subqueries": %s, "matchers": %s}`, subsels.String(), matchersJSON))
c.response = []byte(fmt.Sprintf(`{"subqueries": %s, "matchers": %s, "fromMs": %d}`,
subsels.String(),
matchersJSON,
fromMs))
c.onDataLoad = func(c *ctx) {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
@@ -312,6 +318,21 @@ func pql(id uint32, c *ctx, optimizable bool,
return 0
}

func getMaxDurationMs(q parser.Node) int64 {
maxDurationMs := int64(0)
for _, c := range parser.Children(q) {
_m := getMaxDurationMs(c)
if _m > maxDurationMs {
maxDurationMs = _m
}
}
ms, _ := q.(*parser.MatrixSelector)
if ms != nil && maxDurationMs < ms.Range.Milliseconds() {
return ms.Range.Milliseconds()
}
return maxDurationMs
}

func optimizeQuery(q promql.Query, fromMs int64, toMs int64, stepMs int64) (map[string]string, promql.Query, error) {
appliableNodes := findAppliableNodes(q.Statement(), nil)
var err error
@@ -352,7 +373,7 @@ func optimizeQuery(q promql.Query, fromMs int64, toMs int64, stepMs int64) (map[
IsCluster: false,
From: time.Unix(0, fromMs*1000000),
To: time.Unix(0, toMs*1000000),
Step: time.Millisecond * time.Duration(stepMs),
Step: time.Millisecond * 15000, /*time.Duration(stepMs)*/
TimeSeriesTable: "time_series",
TimeSeriesDistTable: "time_series_dist",
TimeSeriesGinTable: "time_series_gin",
@@ -448,76 +469,6 @@ func writeVector(v promql.Vector) string {
}

func main() {
queriable := &TestQueryable{id: 0, stepMs: 15000}
rq, err := getEng().NewRangeQuery(
queriable,
nil,
"histogram_quantile(0.5, sum by (container) (rate(network_usage{container=~\"awesome\"}[5m])))",
time.Now().Add(time.Hour*-24),
time.Now(),
time.Millisecond*time.Duration(15000))
if err != nil {
panic(err)
}
matchers := findAppliableNodes(rq.Statement(), nil)
for _, m := range matchers {
fmt.Println(m)
opt := m.optimizer
opt = &promql2.FinalizerOptimizer{
SubOptimizer: opt,
}
opt, err = promql2.PlanOptimize(m.node, opt)
if err != nil {
panic(err)
}
planner, err := opt.Optimize(m.node)
if err != nil {
panic(err)
}
fakeMetric := fmt.Sprintf("fake_metric_%d", time.Now().UnixNano())
fmt.Println(rq.Statement())
swapChild(m.parent, m.node, &parser.VectorSelector{
Name: fakeMetric,
OriginalOffset: 0,
Offset: 0,
Timestamp: nil,
StartOrEnd: 0,
LabelMatchers: []*labels.Matcher{
{
Type: labels.MatchEqual,
Name: "__name__",
Value: fakeMetric,
},
},
UnexpandedSeriesSet: nil,
Series: nil,
PosRange: parser.PositionRange{},
})
fmt.Println(rq.Statement())
sel, err := planner.Process(&shared2.PlannerContext{
IsCluster: false,
From: time.Now().Add(time.Hour * -24),
To: time.Now(),
Step: time.Millisecond * time.Duration(15000),
TimeSeriesTable: "time_series",
TimeSeriesDistTable: "time_series_dist",
TimeSeriesGinTable: "time_series_gin",
MetricsTable: "metrics_15s",
MetricsDistTable: "metrics_15s_dist",
})
if err != nil {
panic(err)
}
strSel, err := sel.String(&sql.Ctx{
Params: map[string]sql.SQLObject{},
Result: map[string]sql.SQLObject{},
})
if err != nil {
panic(err)
}
println(strSel)
}

}

func getOptimizer(n parser.Node) promql2.IOptimizer {
@@ -696,10 +647,11 @@ type TestSeries struct {
data []byte
stepMs int64

labels labels.Labels
tsMs int64
val float64
i int
labels labels.Labels
tsMs int64
val float64
lastValTs int64
i int

state int
}
@@ -721,11 +673,14 @@ func (t *TestSeries) Next() bool {
t.tsMs += t.stepMs
if t.tsMs >= ts {
t.state = 0
} else if t.lastValTs+300000 < t.tsMs {
t.state = 0
}
}
if t.state == 0 {
t.tsMs = ts
t.val = *(*float64)(unsafe.Pointer(&t.data[t.i*16+8]))
t.lastValTs = t.tsMs
t.i++
t.state = 1
}
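Note on the main.go change above: getMaxDurationMs walks the query AST for the widest matrix-selector range and pql subtracts it from fromMs, so selectors like rate(x[5m]) have enough lookback samples at the left edge of the requested range. A minimal standalone sketch of the same walk, assuming the upstream prometheus promql/parser package (the maxRange helper name is illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/promql/parser"
)

// maxRange mirrors getMaxDurationMs above: walk the PromQL AST and return
// the widest matrix-selector range found anywhere in the expression.
func maxRange(n parser.Node) time.Duration {
	widest := time.Duration(0)
	for _, child := range parser.Children(n) {
		if d := maxRange(child); d > widest {
			widest = d
		}
	}
	if ms, ok := n.(*parser.MatrixSelector); ok && ms.Range > widest {
		return ms.Range
	}
	return widest
}

func main() {
	expr, err := parser.ParseExpr(`sum by (container) (rate(network_usage{container=~"awesome"}[5m]))`)
	if err != nil {
		panic(err)
	}
	// Subtracting this from fromMs gives every rate() window enough raw
	// samples at the start of the range.
	fmt.Println(maxRange(expr)) // 5m0s
}
```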
9 changes: 5 additions & 4 deletions wasm_parts/main.js
@@ -62,7 +62,7 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
const step = stepMs || 15000
return await pql(query,
(ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
(matchers, subq) => getData(matchers, start, end, subq))
(matchers, subq, startMs) => getData(matchers, startMs, end, subq))
}

/**
@@ -75,9 +75,10 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
module.exports.pqlInstantQuery = async (query, timeMs, getData) => {
const time = timeMs || Date.now()
const _wasm = getWasm()
const start = time - 300000
return await pql(query,
(ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
(matchers, subq) => getData(matchers, time - 300000, time, subq))
(matchers, subq, start) => getData(matchers, start, time, subq))
}

module.exports.pqlMatchers = (query) => {
@@ -162,8 +163,8 @@ const pql = async (query, wasmCall, getData) => {
const matchersObj = JSON.parse(ctx.read())

const matchersResults = await Promise.all(
matchersObj.map(async (matchers, i) => {
const data = await getData(matchers.matchers, matchers.subqueries)
matchersObj.matchers.map(async (matchers, i) => {
const data = await getData(matchers, matchersObj.subqueries, matchersObj.fromMs)
return { matchers, data }
}))

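Together with the main.go change, the wasm response now returns the widened fromMs alongside the per-metric subqueries and matchers, and main.js forwards it to getData. A hedged sketch of that JSON contract as a Go struct (field names taken from the diff; the Go types and the sample payload are assumptions):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pqlPlanResponse is an illustrative decoding of the JSON the wasm side now
// returns: per-metric ClickHouse subqueries, the raw matcher sets, and a
// fromMs already widened by getMaxDurationMs.
type pqlPlanResponse struct {
	Subqueries map[string]string `json:"subqueries"`
	Matchers   json.RawMessage   `json:"matchers"`
	FromMs     int64             `json:"fromMs"`
}

func main() {
	payload := []byte(`{"subqueries":{"network_usage":"SELECT ..."},"matchers":[],"fromMs":1718841600000}`)
	var resp pqlPlanResponse
	if err := json.Unmarshal(payload, &resp); err != nil {
		panic(err)
	}
	fmt.Println(resp.FromMs, len(resp.Subqueries)) // 1718841600000 1
}
```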
Binary file modified wasm_parts/main.wasm.gz
4 changes: 2 additions & 2 deletions wasm_parts/promql/planners/finalize.go
@@ -32,16 +32,16 @@ func (f *FinalizePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, erro
withMain := sql.NewWith(main, "pre_final")
res := sql.NewSelect().With(withMain).Select(withMain).
Select(
sql.NewSimpleCol("pre_final.fingerprint", "fingerprint"),
sql.NewSimpleCol(withLabels.GetAlias()+".labels", "labels"),
sql.NewSimpleCol("arraySort(groupArray((pre_final.timestamp_ms, pre_final.value)))", "values"),
).From(sql.NewWithRef(withMain)).
//AndWhere(sql.Neq(sql.NewRawObject("pre_final.value"), sql.NewIntVal(0))).
Join(sql.NewJoin(
"ANY LEFT",
sql.NewWithRef(withLabels),
sql.Eq(
sql.NewRawObject("pre_final.fingerprint"),
sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint")))).
GroupBy(sql.NewRawObject("pre_final.fingerprint"), sql.NewRawObject(withLabels.GetAlias()+".labels"))
GroupBy(sql.NewRawObject(withLabels.GetAlias() + ".labels"))
return res, nil
}
16 changes: 13 additions & 3 deletions wasm_parts/promql/planners/metrics_extend.go
@@ -22,15 +22,25 @@ func (m *MetricsExtendPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect,
withMain := sql.NewWith(main, "pre_extend")
extendedCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
return fmt.Sprintf(
"argMaxIf(value, timestamp_ms, isNaN(value) = 0) OVER ("+
"argMaxIf(value, timestamp_ms, pre_extend.original = 1) OVER ("+
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
")", extendCnt), nil
})
origCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
return fmt.Sprintf(
"max(original) OVER ("+
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
")", extendCnt), nil
})
extend := sql.NewSelect().With(withMain).
Select(
sql.NewSimpleCol("fingerprint", "fingerprint"),
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
sql.NewCol(extendedCol, "value")).
sql.NewCol(extendedCol, "value"),
sql.NewCol(origCol, "original")).
From(sql.NewWithRef(withMain))
return extend, nil
withExtend := sql.NewWith(extend, "extend")
return sql.NewSelect().With(withExtend).Select(sql.NewRawObject("*")).
From(sql.NewWithRef(withExtend)).
AndWhere(sql.Eq(sql.NewRawObject("original"), sql.NewIntVal(1))), nil
}
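The extend planner now tags rows with an original flag (set in metrics_raw_init.go below): the carried-forward value comes from the most recent original row in the window, and only rows within extendCnt steps of a real sample survive the final original = 1 filter. A small in-memory sketch of that carry-forward-with-limit behaviour (the extend helper is illustrative; NaN marks a filled step):

```go
package main

import (
	"fmt"
	"math"
)

// extend carries the last real sample forward for at most maxSteps steps,
// approximating what the argMaxIf/max(original) window pair does in SQL:
// filled rows further than maxSteps from an original sample are discarded
// by the final "original = 1" filter (NaN stands in for them here).
func extend(values []float64, maxSteps int) []float64 {
	out := make([]float64, len(values))
	lastIdx, last := -1, math.NaN() // position and value of the latest real sample
	for i, v := range values {
		switch {
		case !math.IsNaN(v): // an original row
			lastIdx, last = i, v
			out[i] = v
		case lastIdx >= 0 && i-lastIdx <= maxSteps: // close enough: carry forward
			out[i] = last
		default: // too far from any real sample
			out[i] = math.NaN()
		}
	}
	return out
}

func main() {
	series := []float64{1, math.NaN(), math.NaN(), math.NaN(), 5}
	fmt.Println(extend(series, 2)) // [1 1 1 NaN 5]
}
```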
31 changes: 26 additions & 5 deletions wasm_parts/promql/planners/metrics_rate.go
@@ -34,18 +34,39 @@ func (m *RatePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
")", rateCnt), nil
})
valueCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
resetCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
return fmt.Sprintf(
"if(last > first, last - first, last) / %f", m.Duration.Seconds()), nil
"if(value < (any(value) OVER (" +
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING" +
") as lastValue), lastValue, 0)"), nil
})
extend := sql.NewSelect().With(withMain).
reset := sql.NewSelect().With(withMain).
Select(
sql.NewSimpleCol("fingerprint", "fingerprint"),
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
sql.NewCol(resetCol, "reset"),
sql.NewSimpleCol("value", "value")).
From(sql.NewWithRef(withMain))
withReset := sql.NewWith(reset, "pre_reset")
resetColSum := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
_rateCnt := rateCnt - 1
if rateCnt <= 1 {
_rateCnt = 1
}
return fmt.Sprintf(
"sum(reset) OVER ("+
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
")", _rateCnt), nil
})
extend := sql.NewSelect().With(withReset).
Select(
sql.NewSimpleCol("fingerprint", "fingerprint"),
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
sql.NewCol(lastCol, "last"),
sql.NewCol(firstCol, "first"),
sql.NewCol(valueCol, "_value")).
From(sql.NewWithRef(withMain))
sql.NewCol(resetColSum, "reset"),
sql.NewSimpleCol(fmt.Sprintf("(last - first + reset) / %f", m.Duration.Seconds()), "_value")).
From(sql.NewWithRef(withReset))
withExtend := sql.NewWith(extend, "rate")
return sql.NewSelect().
With(withExtend).
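The rewritten rate planner corrects for counter resets: a per-row reset column records the previous value whenever the counter drops, the resets are summed over the rate window, and the increase becomes last - first + reset before dividing by the window length in seconds. A minimal in-memory sketch of the same arithmetic (the counterRate helper is illustrative, not part of the repository):

```go
package main

import "fmt"

// counterRate applies the same reset correction the planner now builds in
// SQL: whenever the counter drops, credit the value seen just before the
// drop, then divide the corrected increase by the window length.
func counterRate(samples []float64, windowSeconds float64) float64 {
	if len(samples) < 2 || windowSeconds <= 0 {
		return 0
	}
	resets := 0.0
	for i := 1; i < len(samples); i++ {
		if samples[i] < samples[i-1] {
			resets += samples[i-1] // counter reset: the lost progress is the previous value
		}
	}
	return (samples[len(samples)-1] - samples[0] + resets) / windowSeconds
}

func main() {
	// A counter grows to 100, resets, then reaches 40 within a 5-minute window.
	samples := []float64{10, 60, 100, 5, 40}
	fmt.Printf("%.4f\n", counterRate(samples, 300)) // (40 - 10 + 100) / 300 ≈ 0.4333
}
```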
3 changes: 2 additions & 1 deletion wasm_parts/promql/planners/metrics_raw_init.go
@@ -26,7 +26,8 @@ func (m *MetricsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, e
return sql.NewSelect().With(withFpReq).Select(
sql.NewSimpleCol("fingerprint", "fingerprint"),
sql.NewCol(tsNsCol, "timestamp_ms"),
sql.NewCol(m.ValueCol, "value")).
sql.NewCol(m.ValueCol, "value"),
sql.NewSimpleCol("1::UInt8", "original")).
From(sql.NewSimpleCol(ctx.MetricsTable, "metrics")).
AndWhere(
sql.Ge(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.From.UnixNano())),
13 changes: 9 additions & 4 deletions wasm_parts/promql/planners/metrics_zerofill.go
@@ -15,7 +15,12 @@ func (m *MetricsZeroFillPlanner) Process(ctx *shared.PlannerContext) (sql.ISelec
if err != nil {
return nil, err
}
withMain := sql.NewWith(main, "prezerofill")
main.OrderBy(sql.NewRawObject("fingerprint"), sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
return fmt.Sprintf("timestamp_ms WITH FILL FROM %d TO %d STEP %d",
ctx.From.UnixMilli(), ctx.To.UnixMilli(), ctx.Step.Milliseconds()), nil
}))
return main, nil
/*withMain := sql.NewWith(main, "prezerofill")
arrLen := (ctx.To.UnixNano()-ctx.From.UnixNano())/ctx.Step.Nanoseconds() + 1
zeroFillCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
return fmt.Sprintf("groupArrayInsertAt(nan, %d)(value, toUInt32(intDiv(timestamp_ms - %d, %d)))",
Expand All @@ -37,9 +42,9 @@ func (m *MetricsZeroFillPlanner) Process(ctx *shared.PlannerContext) (sql.ISelec
postZeroFill := sql.NewSelect().With(withZeroFill).
Select(
sql.NewSimpleCol("fingerprint", "fingerprint"),
sql.NewSimpleCol("val.1", "timestamp_ms"),
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
sql.NewSimpleCol("val.2", "value")).
From(sql.NewWithRef(withZeroFill)).
From(sql.NewWithRef(withMain)).
Join(sql.NewJoin("array", sql.NewCol(joinZeroFillStmt, "val"), nil))
return postZeroFill, nil
return postZeroFill, nil*/
}
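The zero-fill planner now delegates gap filling to ClickHouse via ORDER BY ... WITH FILL ... STEP ..., replacing the groupArrayInsertAt approach that is kept above as a comment. A rough in-memory equivalent of the filled result set (illustrative Go, not the generated SQL; WITH FILL TO is assumed to be end-exclusive):

```go
package main

import "fmt"

type point struct {
	tsMs  int64
	value float64
}

// zeroFill emits one row per step between fromMs (inclusive) and toMs
// (assumed exclusive), giving missing steps the column default, the way
// the generated ORDER BY ... WITH FILL clause does inside ClickHouse.
func zeroFill(samples map[int64]float64, fromMs, toMs, stepMs int64) []point {
	var out []point
	for ts := fromMs; ts < toMs; ts += stepMs {
		out = append(out, point{tsMs: ts, value: samples[ts]}) // 0 when absent
	}
	return out
}

func main() {
	samples := map[int64]float64{0: 1.5, 30000: 2.5} // a sparse 15s-step series
	for _, p := range zeroFill(samples, 0, 60000, 15000) {
		fmt.Println(p.tsMs, p.value)
	}
	// 0 1.5 / 15000 0 / 30000 2.5 / 45000 0
}
```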
