diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js index f934708f..5d9bb99c 100644 --- a/lib/db/clickhouse.js +++ b/lib/db/clickhouse.js @@ -1110,43 +1110,58 @@ const scanClickhouse = function (settings, client, params) { */ const getSeries = async (matches) => { const query = transpiler.transpileSeries(matches) - const stream = await axios.post(`${getClickhouseUrl()}`, query + ' FORMAT JSONEachRow', { + const stream = await rawRequest(query + ' FORMAT JSONEachRow', null, DATABASE_NAME(), { responseType: 'stream' }) - const dStream = StringStream.from(stream.data).lines().map(l => { - if (!l) { - return null - } - try { - return JSON.parse(l) - } catch (err) { - logger.error({ line: l, err }, 'Error parsing line') - return null - } - }, DataStream).filter(e => e) const res = new Transform({ transform (chunk, encoding, callback) { callback(null, chunk) } }) - setTimeout(async () => { - const gen = dStream.toGenerator() - res.write('{"status":"success", "data":[', 'utf-8') - let i = 0 - try { - for await (const item of gen()) { - if (!item || !item.labels) { - continue + res.write('{"status":"success", "data":[', 'utf-8') + let lastString = '' + let i = 0 + let lastData = 0 + let open = true + stream.data.on('data', (chunk) => { + lastData = Date.now() + const strChunk = Buffer.from(chunk).toString('utf-8') + const lines = (lastString + strChunk).split('\n') + lastString = lines.pop() + lines.forEach(line => { + if (!line) { + return + } + try { + const obj = JSON.parse(line) + if (obj.labels) { + res.write((i === 0 ? '' : ',') + obj.labels) + ++i } - res.write((i === 0 ? '' : ',') + item.labels) - ++i + } catch (err) { + logger.error({ line: line, err }, 'Error parsing line') } - } catch (e) { - logger.error(e) - } finally { - res.end(']}', 'utf-8') + }) + }) + const close = () => { + if (lastString) { + res.write((i === 0 ? '' : ',') + lastString) } - }, 0) + res.end(']}') + open = false + } + const maybeClose = () => { + if (open && Date.now() - lastData >= 10000) { + close() + } + if (open && Date.now() - lastData < 10000) { + setTimeout(maybeClose, 10000) + } + } + setTimeout(maybeClose, 10000) + stream.data.on('end', close) + stream.data.on('error', close) + stream.data.on('finish', close) return res } diff --git a/lib/handlers/label_values.js b/lib/handlers/label_values.js index 280c8512..885b66a1 100644 --- a/lib/handlers/label_values.js +++ b/lib/handlers/label_values.js @@ -27,7 +27,11 @@ async function handler (req, res) { `type IN (${types.map(t => `${t}`).join(',')})` ].filter(w => w) where = `WHERE ${where.join(' AND ')}` - const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} FORMAT JSON` + let limit = '' + if (process.env.ADVANCED_SERIES_REQUEST_LIMIT) { + limit = `LIMIT ${process.env.ADVANCED_SERIES_REQUEST_LIMIT}` + } + const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} ${limit} FORMAT JSON` const allValues = await clickhouse.rawRequest(q, null, utils.DATABASE_NAME()) const resp = { status: 'success', data: allValues.data.data.map(r => r.val) } return res.send(resp) diff --git a/package.json b/package.json index b41449fd..1a7a51c8 100644 --- a/package.json +++ b/package.json @@ -79,7 +79,8 @@ "basic-auth": "^2.0.1", "google-protobuf": "^3.21.2", "@grpc/grpc-js": "^1.10.6", - "@grpc/proto-loader": "^0.7.12" + "@grpc/proto-loader": "^0.7.12", + "pako": "^2.1.0" }, "devDependencies": { "@elastic/elasticsearch": "^8.5.0", diff --git a/parser/transpiler.js b/parser/transpiler.js index 1fea3806..afda33ac 100644 --- a/parser/transpiler.js +++ b/parser/transpiler.js @@ -442,6 +442,9 @@ module.exports.transpileSeries = (request) => { const _query = getQuery(req) query.withs.idx_sel.query.sqls.push(_query.withs.idx_sel.query) } + if (process.env.ADVANCED_SERIES_REQUEST_LIMIT) { + query.limit(process.env.ADVANCED_SERIES_REQUEST_LIMIT) + } setQueryParam(query, sharedParamNames.timeSeriesTable, `${DATABASE_NAME()}.time_series${dist}`) setQueryParam(query, sharedParamNames.samplesTable, `${DATABASE_NAME()}.${samplesReadTableName()}${dist}`) // logger.debug(query.toString()) diff --git a/qryn_node.js b/qryn_node.js index a42c1560..38e48349 100755 --- a/qryn_node.js +++ b/qryn_node.js @@ -5,6 +5,7 @@ * (C) 2018-2024 QXIP BV */ const { boolEnv, readerMode, writerMode } = require('./common') +const { Duplex } = require('stream') this.readonly = boolEnv('READONLY') this.http_user = process.env.QRYN_LOGIN || process.env.CLOKI_LOGIN || undefined @@ -54,6 +55,7 @@ this.pushOTLP = DATABASE.pushOTLP this.queryTempoTags = DATABASE.queryTempoTags this.queryTempoValues = DATABASE.queryTempoValues let profiler = null +const pako = require('pako') const { shaper, @@ -121,7 +123,37 @@ let fastify = require('fastify')({ }) done() })) - await fastify.register(require('@fastify/compress')) + await fastify.register(require('@fastify/compress'), { + encodings: ['gzip'], + zlib: { + createGzip: () => { + const deflator = new pako.Deflate({ gzip: true }) + let lastChunk = null + const res = new Duplex({ + write: (chunk, encoding, next) => { + lastChunk && deflator.push(lastChunk) + lastChunk = chunk + next() + }, + read: function (size) { + }, + final (callback) { + deflator.onEnd = async () => { + res.push(null) + callback(null) + } + !lastChunk && callback() + lastChunk && deflator.push(lastChunk, true) + }, + emitClose: true + }) + deflator.onData = (chunk) => { + res.push(chunk) + } + return res + } + } + }) await fastify.register(require('@fastify/url-data')) await fastify.register(require('@fastify/websocket'))