Commit

use pako for gzip; ADVANCED_SERIES_REQUEST_LIMIT to limit /series response
akvlad committed Aug 30, 2024
1 parent c4d58bc commit 55a9240
Showing 5 changed files with 85 additions and 30 deletions.
69 changes: 42 additions & 27 deletions lib/db/clickhouse.js
```diff
@@ -1110,43 +1110,58 @@ const scanClickhouse = function (settings, client, params) {
  */
 const getSeries = async (matches) => {
   const query = transpiler.transpileSeries(matches)
-  const stream = await axios.post(`${getClickhouseUrl()}`, query + ' FORMAT JSONEachRow', {
+  const stream = await rawRequest(query + ' FORMAT JSONEachRow', null, DATABASE_NAME(), {
     responseType: 'stream'
   })
-  const dStream = StringStream.from(stream.data).lines().map(l => {
-    if (!l) {
-      return null
-    }
-    try {
-      return JSON.parse(l)
-    } catch (err) {
-      logger.error({ line: l, err }, 'Error parsing line')
-      return null
-    }
-  }, DataStream).filter(e => e)
   const res = new Transform({
     transform (chunk, encoding, callback) {
       callback(null, chunk)
     }
   })
-  setTimeout(async () => {
-    const gen = dStream.toGenerator()
-    res.write('{"status":"success", "data":[', 'utf-8')
-    let i = 0
-    try {
-      for await (const item of gen()) {
-        if (!item || !item.labels) {
-          continue
-        }
-        res.write((i === 0 ? '' : ',') + item.labels)
-        ++i
-      }
-    } catch (e) {
-      logger.error(e)
-    } finally {
-      res.end(']}', 'utf-8')
-    }
-  }, 0)
+  res.write('{"status":"success", "data":[', 'utf-8')
+  let lastString = ''
+  let i = 0
+  let lastData = 0
+  let open = true
+  stream.data.on('data', (chunk) => {
+    lastData = Date.now()
+    const strChunk = Buffer.from(chunk).toString('utf-8')
+    const lines = (lastString + strChunk).split('\n')
+    lastString = lines.pop()
+    lines.forEach(line => {
+      if (!line) {
+        return
+      }
+      try {
+        const obj = JSON.parse(line)
+        if (obj.labels) {
+          res.write((i === 0 ? '' : ',') + obj.labels)
+          ++i
+        }
+      } catch (err) {
+        logger.error({ line: line, err }, 'Error parsing line')
+      }
+    })
+  })
+  const close = () => {
+    if (lastString) {
+      res.write((i === 0 ? '' : ',') + lastString)
+    }
+    res.end(']}')
+    open = false
+  }
+  const maybeClose = () => {
+    if (open && Date.now() - lastData >= 10000) {
+      close()
+    }
+    if (open && Date.now() - lastData < 10000) {
+      setTimeout(maybeClose, 10000)
+    }
+  }
+  setTimeout(maybeClose, 10000)
+  stream.data.on('end', close)
+  stream.data.on('error', close)
+  stream.data.on('finish', close)
   return res
 }
```
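
The rewritten getSeries streams ClickHouse's JSONEachRow output by hand: a network chunk can split a JSON row anywhere, so the trailing partial line is buffered in lastString until the next chunk completes it, and a maybeClose watchdog ends the response once no data has arrived for 10 seconds. Below is a minimal, self-contained sketch of those two patterns; the `source` stream and `handle` consumer are hypothetical stand-ins, and unlike the commit (which writes the final leftover line through verbatim), the sketch parses it:

```js
const { Readable } = require('stream')

// Hypothetical NDJSON source whose second row is split across two chunks.
const source = Readable.from([
  Buffer.from('{"labels":"{\\"job\\":\\"a\\"}"}\n{"lab'),
  Buffer.from('els":"{\\"job\\":\\"b\\"}"}\n')
])
const handle = (row) => console.log(row.labels) // stand-in consumer

let lastString = ''
let lastData = Date.now()
source.on('data', (chunk) => {
  lastData = Date.now()
  const lines = (lastString + chunk.toString('utf-8')).split('\n')
  lastString = lines.pop() // possibly incomplete; kept for the next chunk
  lines.filter(l => l).forEach(l => handle(JSON.parse(l)))
})
source.on('end', () => {
  if (lastString) handle(JSON.parse(lastString)) // flush a final partial line
})

// Idle watchdog in the spirit of maybeClose: if no chunk has arrived for
// 10 s, tear the stream down instead of holding the response open forever.
const maybeClose = () => {
  if (Date.now() - lastData >= 10000) source.destroy()
  else setTimeout(maybeClose, 10000)
}
setTimeout(maybeClose, 10000)
```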

6 changes: 5 additions & 1 deletion lib/handlers/label_values.js
```diff
@@ -27,7 +27,11 @@ async function handler (req, res) {
     `type IN (${types.map(t => `${t}`).join(',')})`
   ].filter(w => w)
   where = `WHERE ${where.join(' AND ')}`
-  const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} FORMAT JSON`
+  let limit = ''
+  if (process.env.ADVANCED_SERIES_REQUEST_LIMIT) {
+    limit = `LIMIT ${process.env.ADVANCED_SERIES_REQUEST_LIMIT}`
+  }
+  const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} ${limit} FORMAT JSON`
   const allValues = await clickhouse.rawRequest(q, null, utils.DATABASE_NAME())
   const resp = { status: 'success', data: allValues.data.data.map(r => r.val) }
   return res.send(resp)
```
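
Since ADVANCED_SERIES_REQUEST_LIMIT is interpolated into the SQL verbatim, it is expected to hold a plain integer. A hypothetical run of the logic above, with the `where` value standing in for the handler's request-derived clause:

```js
process.env.ADVANCED_SERIES_REQUEST_LIMIT = '1000'
const where = "WHERE key = 'job'" // stand-in; really built from the request
const limit = process.env.ADVANCED_SERIES_REQUEST_LIMIT
  ? `LIMIT ${process.env.ADVANCED_SERIES_REQUEST_LIMIT}`
  : ''
console.log(`SELECT DISTINCT val FROM time_series_gin ${where} ${limit} FORMAT JSON`)
// SELECT DISTINCT val FROM time_series_gin WHERE key = 'job' LIMIT 1000 FORMAT JSON
```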
3 changes: 2 additions & 1 deletion package.json
```diff
@@ -79,7 +79,8 @@
     "basic-auth": "^2.0.1",
     "google-protobuf": "^3.21.2",
     "@grpc/grpc-js": "^1.10.6",
-    "@grpc/proto-loader": "^0.7.12"
+    "@grpc/proto-loader": "^0.7.12",
+    "pako": "^2.1.0"
   },
   "devDependencies": {
     "@elastic/elasticsearch": "^8.5.0",
```
3 changes: 3 additions & 0 deletions parser/transpiler.js
```diff
@@ -442,6 +442,9 @@ module.exports.transpileSeries = (request) => {
     const _query = getQuery(req)
     query.withs.idx_sel.query.sqls.push(_query.withs.idx_sel.query)
   }
+  if (process.env.ADVANCED_SERIES_REQUEST_LIMIT) {
+    query.limit(process.env.ADVANCED_SERIES_REQUEST_LIMIT)
+  }
   setQueryParam(query, sharedParamNames.timeSeriesTable, `${DATABASE_NAME()}.time_series${dist}`)
   setQueryParam(query, sharedParamNames.samplesTable, `${DATABASE_NAME()}.${samplesReadTableName()}${dist}`)
   // logger.debug(query.toString())
```
34 changes: 33 additions & 1 deletion qryn_node.js
```diff
@@ -5,6 +5,7 @@
  * (C) 2018-2024 QXIP BV
  */
 const { boolEnv, readerMode, writerMode } = require('./common')
+const { Duplex } = require('stream')
 
 this.readonly = boolEnv('READONLY')
 this.http_user = process.env.QRYN_LOGIN || process.env.CLOKI_LOGIN || undefined
@@ -54,6 +55,7 @@ this.pushOTLP = DATABASE.pushOTLP
 this.queryTempoTags = DATABASE.queryTempoTags
 this.queryTempoValues = DATABASE.queryTempoValues
 let profiler = null
+const pako = require('pako')
 
 const {
   shaper,
@@ -121,7 +123,37 @@ let fastify = require('fastify')({
   })
   done()
 }))
-await fastify.register(require('@fastify/compress'))
+await fastify.register(require('@fastify/compress'), {
+  encodings: ['gzip'],
+  zlib: {
+    createGzip: () => {
+      const deflator = new pako.Deflate({ gzip: true })
+      let lastChunk = null
+      const res = new Duplex({
+        write: (chunk, encoding, next) => {
+          lastChunk && deflator.push(lastChunk)
+          lastChunk = chunk
+          next()
+        },
+        read: function (size) {
+        },
+        final (callback) {
+          deflator.onEnd = async () => {
+            res.push(null)
+            callback(null)
+          }
+          !lastChunk && callback()
+          lastChunk && deflator.push(lastChunk, true)
+        },
+        emitClose: true
+      })
+      deflator.onData = (chunk) => {
+        res.push(chunk)
+      }
+      return res
+    }
+  }
+})
 await fastify.register(require('@fastify/url-data'))
 await fastify.register(require('@fastify/websocket'))
```
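
The custom createGzip factory adapts pako's push/onData callbacks to the Duplex interface that @fastify/compress drives: write() holds back the most recent chunk so that final() can push it with the `true` flag that finalizes the gzip stream. A quick stand-alone check, not part of the commit, that pako's gzip output round-trips through Node's own zlib:

```js
const pako = require('pako')
const zlib = require('zlib')

const deflator = new pako.Deflate({ gzip: true })
const out = []
deflator.onData = (chunk) => out.push(Buffer.from(chunk)) // collect gzip bytes
deflator.push('hello ', false) // intermediate chunk, stream stays open
deflator.push('world', true)   // `true` marks the final chunk (Z_FINISH)

console.log(zlib.gunzipSync(Buffer.concat(out)).toString()) // => hello world
```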
