Skip to content

Commit

Permalink
#feat: prom support
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Oct 13, 2023
1 parent 7f337ed commit 37fc338
Show file tree
Hide file tree
Showing 15 changed files with 3,367 additions and 67 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ node_modules
/test/e2e/
/lib/influx/.idea/
/lib/influx/influx.iml
/wasm_parts/_vendor.zip
/wasm_parts/.idea/
/wasm_parts/vendor/
/wasm_parts/main.wasm
/wasm_parts/wasm_parts.iml
5 changes: 3 additions & 2 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -1344,15 +1344,16 @@ const samplesReadTable = {
* @param query {string}
* @param data {string | Buffer | Uint8Array}
* @param database {string}
* @param config {Object?}
* @returns {Promise<AxiosResponse<any>>}
*/
const rawRequest = (query, data, database) => {
const rawRequest = (query, data, database, config) => {
const getParams = [
(database ? `database=${encodeURIComponent(database)}` : null),
(data ? `query=${encodeURIComponent(query)}` : null)
].filter(p => p)
const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}`
return axios.post(url, data || query)
return axios.post(url, data || query, config)
}

/**
Expand Down
2 changes: 0 additions & 2 deletions lib/handlers/prom_push.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ async function handler (req, res) {
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
finger = fingerPrint(strJson)
req.log.debug({ labels: stream.labels, finger }, 'LABELS FINGERPRINT')
labels.add(finger.toString(), stream.labels)

const dates = {}
if (stream.samples) {
stream.samples.forEach(function (entry) {
req.log.debug({ entry, finger }, 'BULK ROW')
if (
!entry &&
!entry.timestamp &&
Expand Down
31 changes: 9 additions & 22 deletions lib/handlers/prom_query.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
/* Emulated PromQL Query Handler */

const { p2l } = require('@qxip/promql2logql');
const { asyncLogError, CORS } = require('../../common')
const { instantQueryScan } = require('../db/clickhouse')
const { instantQuery } = require('../../promql')
const empty = '{"status" : "success", "data" : {"resultType" : "scalar", "result" : []}}'; // to be removed
const test = () => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, "2"]}}`; // to be removed
const exec = (val) => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, val]}}`; // to be removed


async function handler (req, res) {
req.log.debug('GET /loki/api/v1/query')
const resp = {
Expand All @@ -25,31 +23,20 @@ async function handler (req, res) {
}
if (req.query.query === '1+1') {
return res.status(200).send(test())
}
else if (!isNaN(parseInt(req.query.query))) {
} else if (!isNaN(parseInt(req.query.query))) {
return res.status(200).send(exec(parseInt(req.query.query)))
}
/* remove newlines */
req.query.query = req.query.query.replace(/\n/g, ' ')
req.query.time = req.query.time ? parseInt(req.query.time) * 1000 : Date.now()
/* transpile to logql */
try {
req.query.query = p2l(req.query.query)
} catch(e) {
asyncLogError({ e }, req.log)
return res.send(empty)
}
/* scan fingerprints */
/* TODO: handle time tag + direction + limit to govern the query output */
try {
const response = await instantQueryScan(
req.query
)
res.code(200)
res.headers({
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': CORS
})
return response
const response = await instantQuery(req.query.query, req.query.time)
return res.code(200)
.headers({
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': CORS
}).send(response)
} catch (err) {
asyncLogError(err, req.log)
return res.send(empty)
Expand Down
49 changes: 8 additions & 41 deletions lib/handlers/prom_query_range.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,50 +9,17 @@
regexp: a regex to filter the returned results, will eventually be rolled into the query language
*/

const { p2l } = require('@qxip/promql2logql')
const { asyncLogError, CORS } = require('../../common')
const { scanFingerprints } = require('../db/clickhouse')
const { rangeQuery } = require('../../promql/index')

async function handler (req, res) {
req.log.debug('GET /api/v1/query_range')
const resp = {
status: "success",
data: {
resultType: "vector",
result: []
}
}
if (req.method === 'POST') {
req.query = req.body
}
if (!req.query.query) {
return res.send(resp)
}
/* remove newlines */
req.query.query = req.query.query.replace(/\n/g, ' ')
if (!req.query.query) {
return res.code(400).send('invalid query')
}
// Convert PromQL to LogQL and execute
try {
req.query.query = p2l(req.query.query)
const response = await scanFingerprints(
{
...req.query,
start: parseInt(req.query.start) * 1e9,
end: parseInt(req.query.end) * 1e9
}
)
res.code(200)
res.headers({
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': CORS
})
return response
} catch (err) {
asyncLogError(err, req.log)
return res.send(resp)
}
const startMs = parseInt(req.query.start) * 1000 || Date.now() - 60000
const endMs = parseInt(req.query.end) * 1000 || Date.now()
const stepMs = parseInt(req.query.step) * 1000 || 15000
const query = req.query.query
const result = await rangeQuery(query, startMs, endMs, stepMs)
console.log(JSON.stringify(result))
return res.code(200).send(result)
}

module.exports = handler
100 changes: 100 additions & 0 deletions promql/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
const Sql = require('@cloki/clickhouse-sql')
const prometheus = require('../wasm_parts/main')
const { rawRequest } = require('../lib/db/clickhouse')
const { DATABASE_NAME } = require('../lib/utils')
const { clusterName } = require('../common')
const _dist = clusterName ? '_dist' : ''
/**
*
* @param query {string}
* @param startMs {number}
* @param endMs {number}
* @param stepMs {number}
*/
module.exports.rangeQuery = async (query, startMs, endMs, stepMs) => {
const resp = await prometheus.pqlRangeQuery(query, startMs, endMs, stepMs, module.exports.getData)
return JSON.parse(resp)
}

module.exports.instantQuery = async (query, timeMs) => {
const resp = await prometheus.pqlInstantQuery(query, timeMs, module.exports.getData)
return JSON.parse(resp)
}

module.exports.getData = async (matchers, fromMs, toMs) => {
const matches = []
for (const matcher of matchers) {
const _matcher = [
Sql.Eq('key', matcher[0])
]
switch (matcher[1]) {
case '=':
_matcher.push(Sql.Eq('val', matcher[2]))
break
case '!=':
_matcher.push(Sql.Ne('val', matcher[2]))
break
case '=~':
_matcher.push(Sql.Eq(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
break
case '!~':
_matcher.push(Sql.Ne(Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
}
matches.push(Sql.And(..._matcher))
}

const idx = (new Sql.Select())
.select('fingerprint')
.from('time_series_gin')
.where(Sql.Or(...matches))
.having(
Sql.Eq(
new Sql.Raw('groupBitOr(' + matches.map(
(m, i) => new Sql.Raw(`bitShiftLeft((${m})::UInt64, ${i})`)
).join('+') + ')'), (1 << matches.length) - 1)
).groupBy('fingerprint')
const withIdx = new Sql.With('idx', idx, false)
const raw = (new Sql.Select())
.with(withIdx)
.select(
[new Sql.Raw('argMaxMerge(last)'), 'value'],
'fingerprint',
[new Sql.Raw('intDiv(timestamp_ns, 15000000000) * 15000'), 'timestamp_ms'])
.from('metrics_15s')
.where(
new Sql.And(
new Sql.In('fingerprint', 'in', new Sql.WithReference(withIdx)),
Sql.Gte('timestamp_ns', new Sql.Raw(`${fromMs}000000`)),
Sql.Lte('timestamp_ns', new Sql.Raw(`${toMs}000000`))
)
).groupBy('fingerprint', 'timestamp_ms')
.orderBy('fingerprint', 'timestamp_ms')
const timeSeries = (new Sql.Select())
.select(
'fingerprint',
[new Sql.Raw('arraySort(JSONExtractKeysAndValues(labels, \'String\'))'), 'labels']
).from('time_series')
.where(new Sql.In('fingerprint', 'in', new Sql.WithReference(withIdx)))
const withRaw = new Sql.With('raw', raw, false)
const withTimeSeries = new Sql.With('timeSeries', timeSeries, false)
const res = (new Sql.Select())
.with(withRaw, withTimeSeries)
.select(
[new Sql.Raw('any(labels)'), 'stream'],
[new Sql.Raw('arraySort(groupArray((raw.timestamp_ms, raw.value)))'), 'values']
).from([new Sql.WithReference(withRaw), 'raw'])
.join(
[new Sql.WithReference(withTimeSeries), 'time_series'],
'any left',
Sql.Eq('time_series.fingerprint', new Sql.Raw('raw.fingerprint'))
).groupBy('raw.fingerprint')
.orderBy('raw.fingerprint')

const db = DATABASE_NAME()
console.log('!!!!!!!!!!! ' + res.toString())
const data = await rawRequest(res.toString() + ' FORMAT RowBinary',
null, db, { responseType: 'arraybuffer' })
return new Uint8Array(data.data)
}

prometheus.getData = module.exports.getData
7 changes: 7 additions & 0 deletions promql/index.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const { request } = require('./index');

(async () => {
await new Promise((resolve => setTimeout(resolve, 1000)));
const res = await request('test{}', Date.now() - 300000, Date.now(), 15000)
console.log(res)
})()
65 changes: 65 additions & 0 deletions wasm_parts/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
module wasm_parts

replace (
cloud.google.com/go v0.65.0 => cloud.google.com/go v0.102.1
github.com/InfluxCommunity/influxdb3-go v0.2.0 => github.com/akvlad/influxdb3-go v0.0.1
github.com/docker/distribution v2.7.1+incompatible => github.com/docker/distribution v2.8.0+incompatible
k8s.io/client-go v12.0.0+incompatible => k8s.io/client-go v0.22.1
github.com/json-iterator/go v1.1.12 => ./json.iterator
)

require github.com/prometheus/prometheus v1.8.2-0.20220714142409-b41e0750abf5

require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/aws/aws-sdk-go v1.44.45 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/common/sigv4 v0.1.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/stretchr/testify v1.8.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.32.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/oauth2 v0.0.0-20220628200809-02e64fa58f26 // indirect
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go 1.20
Loading

0 comments on commit 37fc338

Please sign in to comment.