Skip to content

Commit

Permalink
feat: prom series support
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Oct 16, 2023
1 parent 37fc338 commit 31de334
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 53 deletions.
45 changes: 13 additions & 32 deletions lib/handlers/prom_series.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,8 @@
const { scanSeries } = require('../db/clickhouse')
const { CORS } = require('../../common')
const { Compiler } = require('bnf')
const { isArray } = require('handlebars-helpers/lib/array')
const { QrynError } = require('./errors')

const promqlSeriesBnf = `
<SYNTAX> ::= <metric_name><OWSP> | "{" <OWSP> <label_selectors> <OWSP> "}" | <metric_name><OWSP> "{" <OWSP> [<label_selectors>] <OWSP> "}"
label ::= (<ALPHA> | "_") *(<ALPHA> | "." | "_" | <DIGITS>)
operator ::= "=~" | "!~" | "!=" | "="
quoted_str ::= (<QUOTE><QUOTE>) | (<AQUOTE><AQUOTE>) | <QLITERAL> | <AQLITERAL>
metric_name ::= label
label_selector ::= <label> <OWSP> <operator> <OWSP> <quoted_str>
label_selectors ::= <label_selector><OWSP>*(","<OWSP><label_selector><OWSP>)
`

const compiler = new Compiler()
compiler.AddLanguage(promqlSeriesBnf, 'promql_series')
const {series} = require('../../promql/index')

// Series Handler
async function handler (req, res) {
Expand All @@ -30,27 +17,21 @@ async function handler (req, res) {
if (!isArray(query)) {
query = [query]
}
query = query.map((m) => {
let res = '{'
let parsed = compiler.ParseScript(m)
if (!parsed) {
throw new QrynError(400, `invalid series query ${m}`)
}
parsed = parsed.rootToken
res += parsed.Child('metric_name') ? `name="${parsed.Child('metric_name').val}` : ''
res += parsed.Child('metric_name') && parsed.Child('label_selector') ? ',' : ''
res += parsed.Children('label_selector').map(c => c.value.toString()).join(',')
res += '}'
return res
const startMs = req.query.start ? parseInt(req.query.start) * 1000 : Date.now() - 7 * 24 * 3600 * 1000
const endMs = req.query.end ? parseInt(req.query.end) * 1000 : Date.now() - 7 * 24 * 3600 * 1000
const result = []
query = query.map(async (m) => {
const _result = await series(m, startMs, endMs)
result.push.apply(result, _result)
})
// convert the input query into a label selector
const response = await scanSeries(query)
res.code(200)
res.headers({
await Promise.all(query)
return res.code(200).headers({
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': CORS
})
return response
}).send(JSON.stringify({
status: 'success',
data: result
}))
}

module.exports = handler
61 changes: 50 additions & 11 deletions promql/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,34 @@ module.exports.instantQuery = async (query, timeMs) => {
return JSON.parse(resp)
}

module.exports.getData = async (matchers, fromMs, toMs) => {
const matches = []
/**
 * Resolves the label sets of every stored time series matching a PromQL
 * selector within the requested time range (for /api/v1/series).
 * @param query {string} PromQL series selector, e.g. `up{job="node"}`
 * @param fromMs {number} range start in milliseconds
 * @param toMs {number} range end in milliseconds
 * @returns {Promise<Object[]>} one parsed labels object per matching series
 */
module.exports.series = async (query, fromMs, toMs) => {
  // The `date` column filters operate on whole seconds.
  const fromSec = Math.floor(fromMs / 1000)
  const toSec = Math.floor(toMs / 1000)
  // Extract label matchers from the selector via the wasm PromQL parser.
  const matchers = prometheus.pqlMatchers(query)
  const conds = getMatchersIdxCond(matchers[0])
  const withIdx = new Sql.With(
    'idx', getIdxSubquery(conds, fromMs, toMs), !!clusterName)
  const select = (new Sql.Select())
    .with(withIdx)
    .select([new Sql.Raw('any(labels)'), 'labels'])
    .from(`time_series${_dist}`)
    .where(Sql.And(
      Sql.Gte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${fromSec}))`)),
      Sql.Lte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${toSec}))`)),
      new Sql.In('fingerprint', 'in', new Sql.WithReference(withIdx))))
    .groupBy(new Sql.Raw('fingerprint'))
  // Template interpolation invokes select.toString(), as before.
  const resp = await rawRequest(`${select} FORMAT JSON`, null, DATABASE_NAME())
  return resp.data.data.map((row) => JSON.parse(row.labels))
}

/**
*
* @param matchers {[[string]]}
*/
const getMatchersIdxCond = (matchers) => {
const matchesCond = []
for (const matcher of matchers) {
const _matcher = [
Sql.Eq('key', matcher[0])
Expand All @@ -40,27 +66,42 @@ module.exports.getData = async (matchers, fromMs, toMs) => {
case '!~':
_matcher.push(Sql.Ne(Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
}
matches.push(Sql.And(..._matcher))
matchesCond.push(Sql.And(..._matcher))
}
return matchesCond
}

/**
 * Builds the fingerprint-index subquery: selects fingerprints from
 * time_series_gin whose key/value rows satisfy ALL of the given matcher
 * conditions inside the requested date range.
 *
 * The conditions are OR-ed in WHERE, and each condition is assigned one bit
 * in a groupBitOr() mask; HAVING then requires every bit to be set, i.e.
 * every matcher matched at least one row for that fingerprint.
 *
 * NOTE(review): this diff hunk was contaminated with stale pre-commit lines
 * (references to the removed `matches`/`idx` locals); reconstructed here to
 * the post-commit version so the block parses.
 *
 * @param conds {Object[]} per-matcher Sql conditions from getMatchersIdxCond
 * @param fromMs {number} range start in milliseconds
 * @param toMs {number} range end in milliseconds
 * @returns {Object} Sql.Select subquery yielding the matching fingerprints
 */
const getIdxSubquery = (conds, fromMs, toMs) => {
  const fromS = Math.floor(fromMs / 1000)
  const toS = Math.floor(toMs / 1000)
  return (new Sql.Select())
    .select('fingerprint')
    .from('time_series_gin')
    .where(Sql.And(
      Sql.Or(...conds),
      Sql.Gte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${fromS}))`)),
      Sql.Lte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${toS}))`))))
    .having(
      Sql.Eq(
        new Sql.Raw('groupBitOr(' + conds.map(
          (m, i) => new Sql.Raw(`bitShiftLeft((${m})::UInt64, ${i})`)
        ).join('+') + ')'), (1 << conds.length) - 1)
    ).groupBy('fingerprint')
}

module.exports.getData = async (matchers, fromMs, toMs) => {
const db = DATABASE_NAME()
const matches = getMatchersIdxCond(matchers)
const idx = getIdxSubquery(matches, fromMs, toMs)

const withIdx = new Sql.With('idx', idx, !!clusterName)
const raw = (new Sql.Select())
.with(withIdx)
.select(
[new Sql.Raw('argMaxMerge(last)'), 'value'],
'fingerprint',
[new Sql.Raw('intDiv(timestamp_ns, 15000000000) * 15000'), 'timestamp_ms'])
.from('metrics_15s')
.from(`metrics_15s${_dist}`)
.where(
new Sql.And(
new Sql.In('fingerprint', 'in', new Sql.WithReference(withIdx)),
Expand Down Expand Up @@ -90,8 +131,6 @@ module.exports.getData = async (matchers, fromMs, toMs) => {
).groupBy('raw.fingerprint')
.orderBy('raw.fingerprint')

const db = DATABASE_NAME()
console.log('!!!!!!!!!!! ' + res.toString())
const data = await rawRequest(res.toString() + ' FORMAT RowBinary',
null, db, { responseType: 'arraybuffer' })
return new Uint8Array(data.data)
Expand Down
58 changes: 48 additions & 10 deletions wasm_parts/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,25 @@ func pqlInstantQuery(id uint32, timeMS float64) uint32 {
})
}

func pql(c *ctx, query func() (promql.Query, error)) uint32 {
rq, err := query()

//export pqlSeries
func pqlSeries(id uint32) uint32 {
queriable := &TestQueryable{id: id}
query, err := eng.NewRangeQuery(
queriable,
nil,
string(data[id].request),
time.Unix(0, 1),
time.Unix(0, 2),
time.Second)
if err != nil {
c.response = []byte(fmt.Sprintf(`{"status":"error", "error":%s}`, strconv.Quote(err.Error())))
data[id].response = wrapError(err)
return 1
}
data[id].response = []byte(getmatchersJSON(query))
return 0
}

func getmatchersJSON(q promql.Query) string {
var matchersJson = strings.Builder{}
var walk func(node parser.Node, i func(node parser.Node))
walk = func(node parser.Node, i func(node parser.Node)) {
Expand All @@ -119,7 +131,7 @@ func pql(c *ctx, query func() (promql.Query, error)) uint32 {
}
i := 0
matchersJson.WriteString("[")
walk(rq.Statement(), func(node parser.Node) {
walk(q.Statement(), func(node parser.Node) {
switch n := node.(type) {
case *parser.VectorSelector:
if i != 0 {
Expand All @@ -130,8 +142,34 @@ func pql(c *ctx, query func() (promql.Query, error)) uint32 {
}
})
matchersJson.WriteString("]")
return matchersJson.String()
}

// wrapError serializes err into the standard JSON error envelope and
// returns it as a byte slice suitable for a response buffer.
func wrapError(err error) []byte {
	s := wrapErrorStr(err)
	return []byte(s)
}

func wrapErrorStr(err error) string {
return fmt.Sprintf(`{"status":"error", "error":%s}`, strconv.Quote(err.Error()))
}

func pql(c *ctx, query func() (promql.Query, error)) uint32 {
rq, err := query()

if err != nil {
c.response = wrapError(err)
return 1
}
var walk func(node parser.Node, i func(node parser.Node))
walk = func(node parser.Node, i func(node parser.Node)) {
i(node)
for _, n := range parser.Children(node) {
walk(n, i)
}
}
matchersJSON := getmatchersJSON(rq)

c.response = []byte(matchersJson.String())
c.response = []byte(matchersJSON)
c.onDataLoad = func(c *ctx) {
res := rq.Exec(context.Background())
c.response = []byte(writeResponse(res))
Expand All @@ -147,23 +185,23 @@ func onDataLoad(idx uint32) {

// writeResponse serializes a PromQL evaluation result into the HTTP API
// JSON body, delegating matrix/vector payloads to writeMatrix/writeVector
// and wrapping every failure in the standard JSON error envelope.
//
// NOTE(review): this diff hunk was contaminated with stale pre-commit lines
// (the old inline fmt.Sprintf error formatting duplicated next to the new
// wrapErrorStr calls); reconstructed here to the post-commit version.
func writeResponse(res *promql.Result) string {
	if res.Err != nil {
		return wrapErrorStr(res.Err)
	}
	switch res.Value.Type() {
	case parser.ValueTypeMatrix:
		m, err := res.Matrix()
		if err != nil {
			return wrapErrorStr(err)
		}
		return writeMatrix(m)
	case parser.ValueTypeVector:
		v, err := res.Vector()
		if err != nil {
			return wrapErrorStr(err)
		}
		return writeVector(v)
	}
	return wrapErrorStr(fmt.Errorf("result type not supported"))
}

func writeMatrix(m promql.Matrix) string {
Expand Down
20 changes: 20 additions & 0 deletions wasm_parts/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,26 @@ module.exports.pqlInstantQuery = async (query, timeMs, getData) => {
(matchers) => getData(matchers, time - 300000, time))
}

/**
 * Extracts the label matchers of a PromQL selector by running the wasm-side
 * pqlSeries parser in a dedicated wasm context.
 * @param query {string} PromQL series selector
 * @returns {[[[string]]]} per-selector arrays of [name, operator, value]
 * @throws {Error} when the wasm parser rejects the query
 */
module.exports.pqlMatchers = (query) => {
  const _wasm = wasm
  const id = counter
  counter = (counter + 1) & 0xFFFFFFFF
  const ctx = new Ctx(id, _wasm)
  ctx.create()
  try {
    ctx.write(query)
    const res1 = _wasm.exports.pqlSeries(id)
    if (res1 !== 0) {
      // Bug fix: Error() ignores a non-options second argument, so the
      // wasm-side error text was silently dropped — interpolate it instead.
      throw new Error(`pql failed: ${ctx.read()}`)
    }
    /** @type {[[[string]]]} */
    const matchersObj = JSON.parse(ctx.read())
    return matchersObj
  } finally {
    // Always release the wasm context, even on parse failure.
    ctx.destroy()
  }
}

/**
*
* @param query {string}
Expand Down
Binary file modified wasm_parts/main.wasm.gz
Binary file not shown.

0 comments on commit 31de334

Please sign in to comment.