diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js
index f934708f..5d9bb99c 100644
--- a/lib/db/clickhouse.js
+++ b/lib/db/clickhouse.js
@@ -1110,43 +1110,58 @@ const scanClickhouse = function (settings, client, params) {
  */
 const getSeries = async (matches) => {
   const query = transpiler.transpileSeries(matches)
-  const stream = await axios.post(`${getClickhouseUrl()}`, query + ' FORMAT JSONEachRow', {
+  const stream = await rawRequest(query + ' FORMAT JSONEachRow', null, DATABASE_NAME(), {
     responseType: 'stream'
   })
-  const dStream = StringStream.from(stream.data).lines().map(l => {
-    if (!l) {
-      return null
-    }
-    try {
-      return JSON.parse(l)
-    } catch (err) {
-      logger.error({ line: l, err }, 'Error parsing line')
-      return null
-    }
-  }, DataStream).filter(e => e)
   const res = new Transform({
     transform (chunk, encoding, callback) {
       callback(null, chunk)
     }
   })
-  setTimeout(async () => {
-    const gen = dStream.toGenerator()
-    res.write('{"status":"success", "data":[', 'utf-8')
-    let i = 0
-    try {
-      for await (const item of gen()) {
-        if (!item || !item.labels) {
-          continue
+  res.write('{"status":"success", "data":[', 'utf-8')
+  let lastString = ''
+  let i = 0
+  let lastData = 0
+  let open = true
+  stream.data.on('data', (chunk) => {
+    lastData = Date.now()
+    const strChunk = Buffer.from(chunk).toString('utf-8')
+    const lines = (lastString + strChunk).split('\n')
+    lastString = lines.pop()
+    lines.forEach(line => {
+      if (!line) {
+        return
+      }
+      try {
+        const obj = JSON.parse(line)
+        if (obj.labels) {
+          res.write((i === 0 ? '' : ',') + obj.labels)
+          ++i
         }
-        res.write((i === 0 ? '' : ',') + item.labels)
-        ++i
+      } catch (err) {
+        logger.error({ line: line, err }, 'Error parsing line')
       }
-    } catch (e) {
-      logger.error(e)
-    } finally {
-      res.end(']}', 'utf-8')
+    })
+  })
+  const close = () => {
+    if (lastString) {
+      res.write((i === 0 ? '' : ',') + lastString)
     }
-  }, 0)
+    res.end(']}')
+    open = false
+  }
+  const maybeClose = () => {
+    if (open && Date.now() - lastData >= 10000) {
+      close()
+    }
+    if (open && Date.now() - lastData < 10000) {
+      setTimeout(maybeClose, 10000)
+    }
+  }
+  setTimeout(maybeClose, 10000)
+  stream.data.on('end', close)
+  stream.data.on('error', close)
+  stream.data.on('finish', close)
   return res
 }
diff --git a/lib/handlers/label_values.js b/lib/handlers/label_values.js
index 280c8512..885b66a1 100644
--- a/lib/handlers/label_values.js
+++ b/lib/handlers/label_values.js
@@ -27,7 +27,11 @@ async function handler (req, res) {
     `type IN (${types.map(t => `${t}`).join(',')})`
   ].filter(w => w)
   where = `WHERE ${where.join(' AND ')}`
-  const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} FORMAT JSON`
+  let limit = ''
+  if (process.env.ADVANCED_SERIES_REQUEST_LIMIT) {
+    limit = `LIMIT ${process.env.ADVANCED_SERIES_REQUEST_LIMIT}`
+  }
+  const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} ${limit} FORMAT JSON`
   const allValues = await clickhouse.rawRequest(q, null, utils.DATABASE_NAME())
   const resp = { status: 'success', data: allValues.data.data.map(r => r.val) }
   return res.send(resp)
diff --git a/package-lock.json b/package-lock.json
index faf55487..75bd57f4 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -43,6 +43,7 @@
         "jsonic": "^1.0.1",
         "logfmt": "^1.3.2",
         "node-gzip": "^1.1.2",
+        "pako": "^2.1.0",
         "patch-package": "^6.4.7",
         "pino": "^7.6.5",
         "plugnplay": "npm:@qxip/plugnplay@^3.3.1",
@@ -10049,6 +10050,11 @@
         "node": ">=6"
       }
     },
+    "node_modules/pako": {
+      "version": "2.1.0",
+      "resolved": "https://registry.npmjs.org/pako/-/pako-2.1.0.tgz",
+      "integrity": "sha512-w+eufiZ1WuJYgPXbV/PO3NCMEc3xqylkKHzp8bxp1uW4qaSNQUkwmLLEc3kKsfz8lpV1F8Ht3U1Cm+9Srog2ug=="
+    },
     "node_modules/papaparse": {
       "version": "5.3.1",
       "resolved": "https://registry.npmjs.org/papaparse/-/papaparse-5.3.1.tgz",
diff --git a/package.json b/package.json
index b41449fd..1a7a51c8 100644
--- a/package.json
+++ b/package.json
@@ -79,7 +79,8 @@
     "basic-auth": "^2.0.1",
     "google-protobuf": "^3.21.2",
     "@grpc/grpc-js": "^1.10.6",
-    "@grpc/proto-loader": "^0.7.12"
+    "@grpc/proto-loader": "^0.7.12",
+    "pako": "^2.1.0"
   },
   "devDependencies": {
     "@elastic/elasticsearch": "^8.5.0",
diff --git a/parser/transpiler.js b/parser/transpiler.js
index 1fea3806..afda33ac 100644
--- a/parser/transpiler.js
+++ b/parser/transpiler.js
@@ -442,6 +442,9 @@ module.exports.transpileSeries = (request) => {
     const _query = getQuery(req)
     query.withs.idx_sel.query.sqls.push(_query.withs.idx_sel.query)
   }
+  if (process.env.ADVANCED_SERIES_REQUEST_LIMIT) {
+    query.limit(process.env.ADVANCED_SERIES_REQUEST_LIMIT)
+  }
   setQueryParam(query, sharedParamNames.timeSeriesTable, `${DATABASE_NAME()}.time_series${dist}`)
   setQueryParam(query, sharedParamNames.samplesTable, `${DATABASE_NAME()}.${samplesReadTableName()}${dist}`)
   // logger.debug(query.toString())
diff --git a/pyroscope/json_parsers.js b/pyroscope/json_parsers.js
index 48d8e27a..a1b8ed23 100644
--- a/pyroscope/json_parsers.js
+++ b/pyroscope/json_parsers.js
@@ -37,6 +37,18 @@ const labelNames = async (req, payload) => {
   }
 }
 
+const labelValues = async (req, payload) => {
+  req.type = 'json'
+  let body = await bufferize(payload)
+  body = JSON.parse(body.toString())
+  return {
+    getName: () => body.name,
+    getMatchers: () => body.matchers,
+    getStart: () => body.start,
+    getEnd: () => body.end
+  }
+}
+
 const analyzeQuery = async (req, payload) => {
   req.type = 'json'
   let body = await bufferize(payload)
@@ -52,6 +64,7 @@ module.exports = {
   series,
   getProfileStats,
   labelNames,
+  labelValues,
   settingsGet,
   analyzeQuery
 }
diff --git a/pyroscope/pprof-bin/pkg/pprof_bin.d.ts b/pyroscope/pprof-bin/pkg/pprof_bin.d.ts
index ccbddd41..39e35601 100644
--- a/pyroscope/pprof-bin/pkg/pprof_bin.d.ts
+++ b/pyroscope/pprof-bin/pkg/pprof_bin.d.ts
@@ -26,10 +26,15 @@ export function diff_tree(id1: number, id2: number, sample_type: string): Uint8A
 */
 export function export_tree(id: number, sample_type: string): Uint8Array;
 /**
+* @param {number} id
 * @param {Uint8Array} payload
+*/
+export function merge_trees_pprof(id: number, payload: Uint8Array): void;
+/**
+* @param {number} id
 * @returns {Uint8Array}
 */
-export function export_trees_pprof(payload: Uint8Array): Uint8Array;
+export function export_trees_pprof(id: number): Uint8Array;
 /**
 * @param {number} id
 */
diff --git a/pyroscope/pprof-bin/pkg/pprof_bin.js b/pyroscope/pprof-bin/pkg/pprof_bin.js
index 25da605f..e9a9781b 100644
--- a/pyroscope/pprof-bin/pkg/pprof_bin.js
+++ b/pyroscope/pprof-bin/pkg/pprof_bin.js
@@ -177,20 +177,28 @@ module.exports.export_tree = function(id, sample_type) {
 };
 
 /**
+* @param {number} id
 * @param {Uint8Array} payload
+*/
+module.exports.merge_trees_pprof = function(id, payload) {
+    const ptr0 = passArray8ToWasm0(payload, wasm.__wbindgen_malloc);
+    const len0 = WASM_VECTOR_LEN;
+    wasm.merge_trees_pprof(id, ptr0, len0);
+};
+
+/**
+* @param {number} id
 * @returns {Uint8Array}
 */
-module.exports.export_trees_pprof = function(payload) {
+module.exports.export_trees_pprof = function(id) {
     try {
         const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
-        const ptr0 = passArray8ToWasm0(payload, wasm.__wbindgen_malloc);
-        const len0 = WASM_VECTOR_LEN;
-        wasm.export_trees_pprof(retptr, ptr0, len0);
+        wasm.export_trees_pprof(retptr, id);
         var r0 = getInt32Memory0()[retptr / 4 + 0];
         var r1 = getInt32Memory0()[retptr / 4 + 1];
-        var v2 = getArrayU8FromWasm0(r0, r1).slice();
+        var v1 = getArrayU8FromWasm0(r0, r1).slice();
         wasm.__wbindgen_free(r0, r1 * 1, 1);
-        return v2;
+        return v1;
     } finally {
         wasm.__wbindgen_add_to_stack_pointer(16);
     }
diff --git a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm
index 6380006b..930457c9 100644
Binary files a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm and b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm differ
diff --git a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts
index 6dc10bc2..8947ed29 100644
--- a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts
+++ b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts
@@ -5,7 +5,8 @@ export function merge_prof(a: number, b: number, c: number, d: number, e: number
 export function merge_tree(a: number, b: number, c: number, d: number, e: number): void;
 export function diff_tree(a: number, b: number, c: number, d: number, e: number): void;
 export function export_tree(a: number, b: number, c: number, d: number): void;
-export function export_trees_pprof(a: number, b: number, c: number): void;
+export function merge_trees_pprof(a: number, b: number, c: number): void;
+export function export_trees_pprof(a: number, b: number): void;
 export function drop_tree(a: number): void;
 export function init_panic_hook(): void;
 export function __wbindgen_malloc(a: number, b: number): number;
diff --git a/pyroscope/pprof-bin/src/lib.rs b/pyroscope/pprof-bin/src/lib.rs
index c07523a7..a22a99ab 100644
--- a/pyroscope/pprof-bin/src/lib.rs
+++ b/pyroscope/pprof-bin/src/lib.rs
@@ -77,6 +77,7 @@ struct Tree {
     sample_types: Vec<String>,
     max_self: Vec<i64>,
     nodes_num: i32,
+    pprof: Profile,
 }
 
 impl Tree {
@@ -357,6 +358,7 @@ fn upsert_tree(ctx: &mut HashMap<u32, Mutex<Tree>>, id: u32, sample_types: Vec
     fn read_size(&mut self) -> usize {
         let res = read_uleb128(&self.bytes[self.offs..]);
         self.offs += res.1;
-        res.0
+        res.0.clone()
     }
 
     fn read_string(&mut self) -> String {
@@ -423,6 +425,24 @@ impl TrieReader {
         }
         res
     }
+    fn read_blob(&mut self) -> &[u8] {
+        let size = self.read_size();
+        let string = &self.bytes[self.offs..self.offs + size];
+        self.offs += size;
+        string
+    }
+    fn read_blob_list(&mut self) -> Vec<&[u8]> {
+        let mut res = Vec::new();
+        while self.offs < self.bytes.len() {
+            let uleb = read_uleb128(&self.bytes[self.offs..]);
+            self.offs += uleb.1;
+            let _size = uleb.0;
+            let string = &self.bytes[self.offs..self.offs + _size];
+            self.offs += _size;
+            res.push(string);
+        }
+        res
+    }
     /*fn end(&self) -> bool {
         self.offs >= self.bytes.len()
     }*/
@@ -917,11 +937,15 @@ pub fn export_tree(id: u32, sample_type: String) -> Vec<u8> {
 }
 
 #[wasm_bindgen]
-pub fn export_trees_pprof(payload: &[u8]) -> Vec<u8> {
+pub fn merge_trees_pprof(id: u32, payload: &[u8]) {
     let p = panic::catch_unwind(|| {
+        let mut ctx = CTX.lock().unwrap();
+        upsert_tree(&mut ctx, id, vec![]);
+        let mut tree = ctx.get_mut(&id).unwrap().lock().unwrap();
         let mut reader = TrieReader::new(payload);
-        let bin_profs = reader.read_blob_vec();
+        let bin_profs = reader.read_blob_list();
         let mut merger = merge::ProfileMerge::new();
+        merger.merge(&mut tree.pprof);
         for bin_prof in bin_profs {
             if bin_prof.len() >= 2 && bin_prof[0] == 0x1f && bin_prof[1] == 0x8b {
                 let mut decompressed = Vec::new();
@@ -936,14 +960,22 @@ pub fn export_trees_pprof(payload: &[u8]) -> Vec<u8> {
         }
 
         let res = merger.profile();
-        res.encode_to_vec()
+        tree.pprof = res;
     });
     match p {
-        Ok(res) => return res,
+        Ok(_) => {}
         Err(err) => panic!("{:?}", err),
     }
 }
 
+#[wasm_bindgen]
+pub fn export_trees_pprof(id: u32) -> Vec<u8> {
+    let mut ctx = CTX.lock().unwrap();
+    upsert_tree(&mut ctx, id, vec![]);
+    let tree = ctx.get_mut(&id).unwrap().lock().unwrap();
+    tree.pprof.encode_to_vec()
+}
+
 #[wasm_bindgen]
 pub fn drop_tree(id: u32) {
     let mut ctx = CTX.lock().unwrap();
diff --git a/pyroscope/pyroscope.js b/pyroscope/pyroscope.js
index 57609adc..b47e5dde 100644
--- a/pyroscope/pyroscope.js
+++ b/pyroscope/pyroscope.js
@@ -19,19 +19,19 @@ const {
   HISTORY_TIMESPAN
 } = require('./shared')
 const settings = require('./settings')
-const { mergeStackTraces } = require('./merge_stack_traces')
+const { mergeStackTraces, newCtxIdx } = require('./merge_stack_traces')
 const { selectSeriesImpl } = require('./select_series')
 const render = require('./render')
 
 const profileTypesHandler = async (req, res) => {
   const dist = clusterName ? '_dist' : ''
   const _res = new messages.ProfileTypesResponse()
-  const fromTimeSec = req.body && req.body.getStart
+  const fromTimeSec = Math.floor(req.body && req.body.getStart
     ? parseInt(req.body.getStart()) / 1000
-    : (Date.now() - HISTORY_TIMESPAN) / 1000
-  const toTimeSec = req.body && req.body.getEnd
+    : (Date.now() - HISTORY_TIMESPAN) / 1000)
+  const toTimeSec = Math.floor(req.body && req.body.getEnd
     ? parseInt(req.body.getEnd()) / 1000
-    : Date.now() / 1000
+    : Date.now() / 1000)
   const profileTypes = await clickhouse.rawRequest(`SELECT DISTINCT type_id, sample_type_unit
 FROM profiles_series${dist} ARRAY JOIN sample_types_units as sample_type_unit
 WHERE date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)})) AND date <= toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)})) FORMAT JSON`,
@@ -54,12 +54,12 @@ WHERE date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)})) AND date <= toDa
 const labelNames = async (req, res) => {
   const dist = clusterName ? '_dist' : ''
-  const fromTimeSec = req.body && req.body.getStart
+  const fromTimeSec = Math.floor(req.body && req.body.getStart
     ? parseInt(req.body.getStart()) / 1000
-    : (Date.now() - HISTORY_TIMESPAN) / 1000
-  const toTimeSec = req.body && req.body.getEnd
+    : (Date.now() - HISTORY_TIMESPAN) / 1000)
+  const toTimeSec = Math.floor(req.body && req.body.getEnd
     ? parseInt(req.body.getEnd()) / 1000
-    : Date.now() / 1000
+    : Date.now() / 1000)
   const labelNames = await clickhouse.rawRequest(`SELECT DISTINCT key
 FROM profiles_series_keys${dist}
 WHERE date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)})) AND date <= toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)})) FORMAT JSON`,
@@ -74,12 +74,12 @@ const labelValues = async (req, res) => {
   const name = req.body && req.body.getName
     ? req.body.getName()
     : ''
-  const fromTimeSec = req.body && req.body.getStart && req.body.getStart()
+  const fromTimeSec = Math.floor(req.body && req.body.getStart && req.body.getStart()
     ? parseInt(req.body.getStart()) / 1000
-    : (Date.now() - HISTORY_TIMESPAN) / 1000
-  const toTimeSec = req.body && req.body.getEnd && req.body.getEnd()
+    : (Date.now() - HISTORY_TIMESPAN) / 1000)
+  const toTimeSec = Math.floor(req.body && req.body.getEnd && req.body.getEnd()
     ? parseInt(req.body.getEnd()) / 1000
-    : Date.now() / 1000
+    : Date.now() / 1000)
   if (!name) {
     throw new Error('No name provided')
   }
@@ -113,86 +113,142 @@ const selectMergeStacktracesV2 = async (req, res) => {
 const selectSeries = async (req, res) => {
   const fromTimeSec = Math.floor(req.getStart && req.getStart()
     ? parseInt(req.getStart()) / 1000
-    : Date.now() / 1000 - HISTORY_TIMESPAN)
+    : (Date.now() - HISTORY_TIMESPAN) / 1000)
   const toTimeSec = Math.floor(req.getEnd && req.getEnd()
     ? parseInt(req.getEnd()) / 1000
     : Date.now() / 1000)
   return selectSeriesImpl(fromTimeSec, toTimeSec, req.body)
 }
 
+let mergeRequestsCounter = 0
+const mergeRequestsLimit = 10
+
 const selectMergeProfile = async (req, res) => {
-  const _req = req.body
-  const fromTimeSec = Math.floor(req.getStart && req.getStart()
-    ? parseInt(req.getStart()) / 1000
-    : Date.now() / 1000 - HISTORY_TIMESPAN)
-  const toTimeSec = Math.floor(req.getEnd && req.getEnd()
-    ? parseInt(req.getEnd()) / 1000
-    : Date.now() / 1000)
-  let typeID = _req.getProfileTypeid && _req.getProfileTypeid()
-  if (!typeID) {
-    throw new QrynBadRequest('No type provided')
-  }
-  typeID = parseTypeId(typeID)
-  if (!typeID) {
-    throw new QrynBadRequest('Invalid type provided')
-  }
-  const dist = clusterName ? '_dist' : ''
-  // const sampleTypeId = typeID.sampleType + ':' + typeID.sampleUnit
-  const labelSelector = _req.getLabelSelector && _req.getLabelSelector()
+  const ctx = newCtxIdx()
+  try {
+    const _req = req.body
+    const fromTimeSec = Math.floor(_req && _req.getStart
+      ? parseInt(_req.getStart()) / 1000
+      : (Date.now() - HISTORY_TIMESPAN) / 1000)
+    const toTimeSec = Math.floor(_req && _req.getEnd
+      ? parseInt(_req.getEnd()) / 1000
+      : Date.now() / 1000)
+    let typeID = _req.getProfileTypeid && _req.getProfileTypeid()
+    if (!typeID) {
+      throw new QrynBadRequest('No type provided')
+    }
+    typeID = parseTypeId(typeID)
+    if (!typeID) {
+      throw new QrynBadRequest('Invalid type provided')
+    }
+    const dist = clusterName ? '_dist' : ''
+    // const sampleTypeId = typeID.sampleType + ':' + typeID.sampleUnit
+    const labelSelector = _req.getLabelSelector && _req.getLabelSelector()
 
-  const typeIdSelector = Sql.Eq(
-    'type_id',
-    Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
-  const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
+    const typeIdSelector = Sql.Eq(
+      'type_id',
+      Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
+    const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
 
-  const idxReq = (new Sql.Select())
-    .select(new Sql.Raw('fingerprint'))
-    .from(`${DATABASE_NAME()}.profiles_series_gin`)
-    .where(
-      Sql.And(
-        typeIdSelector,
-        serviceNameSelector,
-        Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
-        Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
-        Sql.Eq(
-          new Sql.Raw(
-            `has(sample_types_units, (${Sql.quoteVal(typeID.sampleType)}, ${Sql.quoteVal(typeID.sampleUnit)}))`
-          ),
-          1
+    const idxReq = (new Sql.Select())
+      .select(new Sql.Raw('fingerprint'))
+      .from(`${DATABASE_NAME()}.profiles_series_gin`)
+      .where(
+        Sql.And(
+          typeIdSelector,
+          serviceNameSelector,
+          Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
+          Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
+          Sql.Eq(
+            new Sql.Raw(
              `has(sample_types_units, (${Sql.quoteVal(typeID.sampleType)}, ${Sql.quoteVal(typeID.sampleUnit)}))`
+            ),
+            1
+          )
        )
      )
+    labelSelectorQuery(idxReq, labelSelector)
+    const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
+    let mainReq = (new Sql.Select())
+      .with(withIdxReq)
+      .select([new Sql.Raw('payload'), 'payload'])
+      .from([`${DATABASE_NAME()}.profiles${dist}`, 'p'])
+      .where(Sql.And(
+        new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
+        Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
+        Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))))
+      .orderBy([new Sql.Raw('timestamp_ns'), 'DESC'], [new Sql.Raw('p.fingerprint'), 'ASC'])
+    if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
+      mainReq = mainReq.limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
+    }
+    const approxReq = (new Sql.Select())
+      .select(
+        [new Sql.Raw('sum(length(payload))'), 'size'],
+        [new Sql.Raw('count()'), 'count']
+      )
+      .from([new Sql.Raw('(' + mainReq.toString() + ')'), 'main'])
+    const approx = await clickhouse.rawRequest(
+      approxReq.toString() + ' FORMAT JSON', null, DATABASE_NAME()
    )
-  labelSelectorQuery(idxReq, labelSelector)
-  const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
-  const mainReq = (new Sql.Select())
-    .with(withIdxReq)
-    .select([new Sql.Raw('groupArray(payload)'), 'payload'])
-    .from([`${DATABASE_NAME()}.profiles${dist}`, 'p'])
-    .where(Sql.And(
-      new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
-      Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
-      Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))))
+    const approxData = approx.data.data[0]
+    logger.debug(`Approximate size: ${approxData.size} bytes, profiles count: ${approxData.count}`)
+    const chunksCount = Math.max(Math.ceil(approxData.size / (50 * 1024 * 1024)), 1)
+    logger.debug(`Request is processed in: ${chunksCount} chunks`)
+    const chunkSize = Math.ceil(approxData.count / chunksCount)
+    const promises = []
+    require('./pprof-bin/pkg/pprof_bin').init_panic_hook()
+    let processNs = BigInt(0)
+    const start = process.hrtime.bigint()
 
-  const profiles = await clickhouse.rawRequest(mainReq.toString() + ' FORMAT RowBinary',
-    null,
-    DATABASE_NAME(),
-    {
-      responseType: 'arraybuffer'
-    })
-  const binData = Uint8Array.from(profiles.data)
+    for (let i = 0; i < chunksCount; i++) {
+      promises.push((async (i) => {
+        // eslint-disable-next-line no-unmodified-loop-condition
+        while (mergeRequestsCounter >= mergeRequestsLimit) {
+          await (new Promise((resolve) => setTimeout(resolve, 50)))
+        }
+        logger.debug(`Processing chunk ${i}`)
+        mergeRequestsCounter++
+        let profiles = null
+        try {
+          let end = i * chunkSize + chunkSize
+          if (process.env.ADVANCED_PROFILES_MERGE_LIMIT && end > process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
+            end = process.env.ADVANCED_PROFILES_MERGE_LIMIT
+          }
+          mainReq.limit(end - i * chunkSize, i * chunkSize)
+          profiles = await clickhouse.rawRequest(mainReq.toString() + ' FORMAT RowBinary',
+            null,
+            DATABASE_NAME(),
+            {
+              responseType: 'arraybuffer'
+            })
+        } finally {
+          mergeRequestsCounter--
+        }
+        const binData = Uint8Array.from(profiles.data)
+        logger.debug(`Chunk ${i} - ${binData.length} bytes`)
+        const start = process.hrtime.bigint()
+        pprofBin.merge_trees_pprof(ctx, binData)
+        const end = process.hrtime.bigint()
+        processNs += end - start
+      })(i))
+    }
+    await Promise.all(promises)
+    const response = pprofBin.export_trees_pprof(ctx)
+    const end = process.hrtime.bigint()
 
-  require('./pprof-bin/pkg/pprof_bin').init_panic_hook()
-  const start = process.hrtime.bigint()
-  const response = pprofBin.export_trees_pprof(binData)
-  logger.debug(`Pprof export took ${process.hrtime.bigint() - start} nanoseconds`)
-  return res.code(200).send(Buffer.from(response))
+    logger.debug(`Pprof merge took ${processNs} nanoseconds`)
+    logger.debug(`Pprof load + merge took ${end - start} nanoseconds`)
+    return res.code(200).send(Buffer.from(response))
+  } finally {
+    pprofBin.drop_tree(ctx)
+  }
 }
 
 const series = async (req, res) => {
   const _req = req.body
   const fromTimeSec = Math.floor(req.getStart && req.getStart()
     ? parseInt(req.getStart()) / 1000
-    : Date.now() / 1000 - HISTORY_TIMESPAN)
+    : (Date.now() - HISTORY_TIMESPAN) / 1000)
   const toTimeSec = Math.floor(req.getEnd && req.getEnd()
     ? parseInt(req.getEnd()) / 1000
     : Date.now() / 1000)
@@ -398,7 +454,7 @@ const analyzeQuery = async (req, res) => {
   const query = req.body.getQuery()
   const fromTimeSec = Math.floor(req.getStart && req.getStart()
     ? parseInt(req.getStart()) / 1000
-    : Date.now() / 1000 - HISTORY_TIMESPAN)
+    : (Date.now() - HISTORY_TIMESPAN) / 1000)
   const toTimeSec = Math.floor(req.getEnd && req.getEnd()
     ? parseInt(req.getEnd()) / 1000
     : Date.now() / 1000)
@@ -432,6 +488,7 @@ module.exports.init = (fastify) => {
     series: jsonParsers.series,
     getProfileStats: jsonParsers.getProfileStats,
     labelNames: jsonParsers.labelNames,
+    labelValues: jsonParsers.labelValues,
     analyzeQuery: jsonParsers.analyzeQuery
   }
   for (const name of Object.keys(fns)) {
diff --git a/pyroscope/render.js b/pyroscope/render.js
index b6fc29ca..ee3a2e94 100644
--- a/pyroscope/render.js
+++ b/pyroscope/render.js
@@ -1,8 +1,7 @@
-const { parseTypeId } = require('./shared')
+const { parseQuery } = require('./shared')
 const { mergeStackTraces } = require('./merge_stack_traces')
 const querierMessages = require('./querier_pb')
 const { selectSeriesImpl } = require('./select_series')
-const types = require('./types/v1/types_pb')
 
 const render = async (req, res) => {
   const query = req.query.query
@@ -52,28 +51,50 @@ const render = async (req, res) => {
   const [bMergeStackTrace, selectSeries] = await Promise.all(promises)
   const mergeStackTrace = querierMessages.SelectMergeStacktracesResponse.deserializeBinary(bMergeStackTrace)
-  let series = new types.Series()
-  if (selectSeries.getSeriesList().length === 1) {
-    series = selectSeries.getSeriesList()[0]
+  let pTimeline = null
+  for (const series of selectSeries.getSeriesList()) {
+    if (!pTimeline) {
+      pTimeline = timeline(series,
+        fromTimeSec * 1000,
+        toTimeSec * 1000,
+        timelineStep)
+      continue
+    }
+    const _timeline = timeline(series,
+      fromTimeSec * 1000,
+      toTimeSec * 1000,
+      timelineStep)
+    pTimeline.samples = pTimeline.samples.map((v, i) => v + _timeline.samples[i])
   }
   const fb = toFlamebearer(mergeStackTrace.getFlamegraph(), parsedQuery.profileType)
-  fb.flamebearerProfileV1.timeline = timeline(series,
-    fromTimeSec * 1000,
-    toTimeSec * 1000,
-    timelineStep)
+  fb.flamebearerProfileV1.timeline = pTimeline
 
   if (groupBy.length > 0) {
+    const pGroupedTimelines = {}
     fb.flamebearerProfileV1.groups = {}
-    let key = '*'
-    series.getSeriesList().forEach((_series) => {
-      _series.getLabelsList().forEach((label) => {
-        key = label.getName() === groupBy[0] ? label.getValue() : key
-      })
-    })
-    fb.flamebearerProfileV1.groups[key] = timeline(series,
-      fromTimeSec * 1000,
-      toTimeSec * 1000,
-      timelineStep)
+    for (const series of selectSeries.getSeriesList()) {
+      const _key = {}
+      for (const label of series.getLabelsList()) {
+        if (groupBy.includes(label.getName())) {
+          _key[label.getName()] = label.getValue()
+        }
+      }
+      const key = '{' + Object.entries(_key).map(e => `${e[0]}=${JSON.stringify(e[1])}`)
+        .sort().join(', ') + '}'
+      if (!pGroupedTimelines[key]) {
+        pGroupedTimelines[key] = timeline(series,
+          fromTimeSec * 1000,
+          toTimeSec * 1000,
+          timelineStep)
+      } else {
+        const _timeline = timeline(series,
+          fromTimeSec * 1000,
+          toTimeSec * 1000,
+          timelineStep)
+        pGroupedTimelines[key].samples = pGroupedTimelines[key].samples.map((v, i) => v + _timeline.samples[i])
+      }
+    }
+    fb.flamebearerProfileV1.groups = pGroupedTimelines
   }
   res.code(200)
   res.headers({ 'Content-Type': 'application/json' })
@@ -208,43 +229,6 @@ function sizeToBackfill (startMs, endMs, stepSec) {
   return Math.floor((endMs - startMs) / (stepSec * 1000))
 }
 
-/**
- *
- * @param query {string}
- */
-const parseQuery = (query) => {
-  query = query.trim()
-  const match = query.match(/^([^{\s]+)\s*(\{(.*)})?$/)
-  if (!match) {
-    return null
-  }
-  const typeId = match[1]
-  const typeDesc = parseTypeId(typeId)
-  let strLabels = (match[3] || '').trim()
-  const labels = []
-  while (strLabels && strLabels !== '' && strLabels !== '}') {
-    const m = strLabels.match(/^(,)?\s*([A-Za-z0-9_]+)\s*(!=|!~|=~|=)\s*("([^"\\]|\\.)*")/)
-    if (!m) {
-      throw new Error('Invalid label selector')
-    }
-    labels.push([m[2], m[3], m[4]])
-    strLabels = strLabels.substring(m[0].length).trim()
-  }
-  const profileType = new types.ProfileType()
-  profileType.setId(typeId)
-  profileType.setName(typeDesc.type)
-  profileType.setSampleType(typeDesc.sampleType)
-  profileType.setSampleUnit(typeDesc.sampleUnit)
-  profileType.setPeriodType(typeDesc.periodType)
-  profileType.setPeriodUnit(typeDesc.periodUnit)
-  return {
-    typeId,
-    typeDesc,
-    labels,
-    labelSelector: strLabels,
-    profileType
-  }
-}
 
 const init = (fastify) => {
   fastify.get('/pyroscope/render', render)
diff --git a/pyroscope/render_diff.js b/pyroscope/render_diff.js
index c3c46c06..e8be19cd 100644
--- a/pyroscope/render_diff.js
+++ b/pyroscope/render_diff.js
@@ -5,29 +5,36 @@ const querierMessages = require('./querier_pb')
 const types = require('./types/v1/types_pb')
 
 const renderDiff = async (req, res) => {
-  const [leftQuery, leftFromTimeSec, leftToTimeSec] =
-    parseParams(req.query.leftQuery, req.query.leftFrom, req.query.leftUntil);
-  const [rightQuery, rightFromTimeSec, rightToTimeSec] =
-    parseParams(req.query.rightQuery, req.query.rightFrom, req.query.rightUntil);
-  if (leftQuery.typeId != rightQuery.typeId) {
-    res.code(400).send('Different type IDs')
-    return
-  }
   const leftCtxIdx = newCtxIdx()
-  await importStackTraces(leftQuery.typeDesc, '{' + leftQuery.labelSelector + '}', leftFromTimeSec, leftToTimeSec, req.log, leftCtxIdx, true)
   const rightCtxIdx = newCtxIdx()
-  await importStackTraces(rightQuery.typeDesc, '{' + rightQuery.labelSelector + '}', rightFromTimeSec, rightToTimeSec, req.log, rightCtxIdx, true)
-  const flamegraphDiffBin = pprofBin.diff_tree(leftCtxIdx, rightCtxIdx,
-    `${leftQuery.typeDesc.sampleType}:${leftQuery.typeDesc.sampleUnit}`)
-  const profileType = new types.ProfileType()
-  profileType.setId(leftQuery.typeId)
-  profileType.setName(leftQuery.typeDesc.type)
-  profileType.setSampleType(leftQuery.typeDesc.sampleType)
-  profileType.setSampleUnit(leftQuery.typeDesc.sampleUnit)
-  profileType.setPeriodType(leftQuery.typeDesc.periodType)
-  profileType.setPeriodUnit(leftQuery.typeDesc.periodUnit)
-  const diff = querierMessages.FlameGraphDiff.deserializeBinary(flamegraphDiffBin)
-  return res.code(200).send(diffToFlamegraph(diff, profileType).flamebearerProfileV1)
+  try {
+    const [leftQuery, leftFromTimeSec, leftToTimeSec] =
+      parseParams(req.query.leftQuery, req.query.leftFrom, req.query.leftUntil);
+    const [rightQuery, rightFromTimeSec, rightToTimeSec] =
+      parseParams(req.query.rightQuery, req.query.rightFrom, req.query.rightUntil);
+    if (leftQuery.typeId !== rightQuery.typeId) {
+      res.code(400).send('Different type IDs')
+      return
+    }
+
+    await importStackTraces(leftQuery.typeDesc, '{' + leftQuery.labelSelector + '}', leftFromTimeSec, leftToTimeSec, req.log, leftCtxIdx, true)
+
+    await importStackTraces(rightQuery.typeDesc, '{' + rightQuery.labelSelector + '}', rightFromTimeSec, rightToTimeSec, req.log, rightCtxIdx, true)
+    const flamegraphDiffBin = pprofBin.diff_tree(leftCtxIdx, rightCtxIdx,
+      `${leftQuery.typeDesc.sampleType}:${leftQuery.typeDesc.sampleUnit}`)
+    const profileType = new types.ProfileType()
+    profileType.setId(leftQuery.typeId)
+    profileType.setName(leftQuery.typeDesc.type)
+    profileType.setSampleType(leftQuery.typeDesc.sampleType)
+    profileType.setSampleUnit(leftQuery.typeDesc.sampleUnit)
+    profileType.setPeriodType(leftQuery.typeDesc.periodType)
+    profileType.setPeriodUnit(leftQuery.typeDesc.periodUnit)
+    const diff = querierMessages.FlameGraphDiff.deserializeBinary(flamegraphDiffBin)
+    return res.code(200).send(diffToFlamegraph(diff, profileType).flamebearerProfileV1)
+  } finally {
+    pprofBin.drop_tree(leftCtxIdx)
+    pprofBin.drop_tree(rightCtxIdx)
+  }
 }
 
 /**
diff --git a/pyroscope/shared.js b/pyroscope/shared.js
index fcb45966..380f8f59 100644
--- a/pyroscope/shared.js
+++ b/pyroscope/shared.js
@@ -1,6 +1,6 @@
 const { QrynBadRequest } = require('../lib/handlers/errors')
 const Sql = require('@cloki/clickhouse-sql')
-const compiler = require('../parser/bnf')
+const types = require('./types/v1/types_pb')
 /**
  *
  * @param payload {ReadableStream}
@@ -77,14 +77,14 @@ const serviceNameSelectorQuery = (labelSelector) => {
   }
   const labelSelectorScript = parseLabelSelector(labelSelector)
   let conds = null
-  for (const rule of labelSelectorScript.Children('log_stream_selector_rule')) {
-    const label = rule.Child('label').value
+  for (const rule of labelSelectorScript) {
+    const label = rule[0]
     if (label !== 'service_name') {
       continue
     }
-    const val = JSON.parse(rule.Child('quoted_str').value)
+    const val = JSON.parse(rule[2])
     let valRul = null
-    switch (rule.Child('operator').value) {
+    switch (rule[1]) {
       case '=':
         valRul = Sql.Eq(new Sql.Raw('service_name'), Sql.val(val))
         break
@@ -102,12 +102,54 @@ const serviceNameSelectorQuery = (labelSelector) => {
   return conds || empty
 }
 
+/**
+ *
+ * @param query {string}
+ */
+const parseQuery = (query) => {
+  query = query.trim()
+  const match = query.match(/^([^{\s]+)\s*(\{(.*)})?$/)
+  if (!match) {
+    return null
+  }
+  const typeId = match[1]
+  const typeDesc = parseTypeId(typeId)
+  const strLabels = (match[3] || '').trim()
+  const labels = parseLabelSelector(strLabels)
+  const profileType = new types.ProfileType()
+  profileType.setId(typeId)
+  profileType.setName(typeDesc.type)
+  profileType.setSampleType(typeDesc.sampleType)
+  profileType.setSampleUnit(typeDesc.sampleUnit)
+  profileType.setPeriodType(typeDesc.periodType)
+  profileType.setPeriodUnit(typeDesc.periodUnit)
+  return {
+    typeId,
+    typeDesc,
+    labels,
+    labelSelector: strLabels,
+    profileType
+  }
+}
 
-const parseLabelSelector = (labelSelector) => {
-  if (labelSelector.endsWith(',}')) {
-    labelSelector = labelSelector.slice(0, -2) + '}'
+const parseLabelSelector = (strLabels) => {
+  strLabels = strLabels.trim()
+  if (strLabels.startsWith('{')) {
+    strLabels = strLabels.slice(1)
+  }
+  if (strLabels.endsWith('}')) {
+    strLabels = strLabels.slice(0, -1)
   }
-  return compiler.ParseScript(labelSelector).rootToken
+  const labels = []
+  while (strLabels && strLabels !== '' && strLabels !== '}' && strLabels !== ',') {
+    const m = strLabels.match(/^(,)?\s*([A-Za-z0-9_]+)\s*(!=|!~|=~|=)\s*("([^"\\]|\\.)*")/)
+    if (!m) {
+      throw new Error('Invalid label selector')
+    }
+    labels.push([m[2], m[3], m[4]])
+    strLabels = strLabels.substring(m[0].length).trim()
+  }
+  return labels
 }
 
 /**
@@ -128,7 +170,6 @@ const parseTypeId = (typeId) => {
   }
 }
 
-
 /**
  *
  * @param {Sql.Select} query
@@ -140,10 +181,10 @@ const labelSelectorQuery = (query, labelSelector) => {
   }
   const labelSelectorScript = parseLabelSelector(labelSelector)
   const labelsConds = []
-  for (const rule of labelSelectorScript.Children('log_stream_selector_rule')) {
-    const val = JSON.parse(rule.Child('quoted_str').value)
+  for (const rule of labelSelectorScript) {
+    const val = JSON.parse(rule[2])
     let valRul = null
-    switch (rule.Child('operator').value) {
+    switch (rule[1]) {
      case '=':
        valRul = Sql.Eq(new Sql.Raw('val'), Sql.val(val))
        break
@@ -157,7 +198,7 @@ const labelSelectorQuery = (query, labelSelector) => {
        valRul = Sql.Ne(new Sql.Raw(`match(val, ${Sql.quoteVal(val)})`), 1)
    }
    const labelSubCond = Sql.And(
-      Sql.Eq('key', Sql.val(rule.Child('label').value)),
+      Sql.Eq('key', Sql.val(rule[0])),
      valRul
    )
    labelsConds.push(labelSubCond)
@@ -183,5 +224,6 @@ module.exports = {
   serviceNameSelectorQuery,
   parseLabelSelector,
   labelSelectorQuery,
-  HISTORY_TIMESPAN
+  HISTORY_TIMESPAN,
+  parseQuery
 }
diff --git a/qryn_node.js b/qryn_node.js
index a42c1560..38e48349 100755
--- a/qryn_node.js
+++ b/qryn_node.js
@@ -5,6 +5,7 @@
  * (C) 2018-2024 QXIP BV
  */
 const { boolEnv, readerMode, writerMode } = require('./common')
+const { Duplex } = require('stream')
 
 this.readonly = boolEnv('READONLY')
 this.http_user = process.env.QRYN_LOGIN || process.env.CLOKI_LOGIN || undefined
@@ -54,6 +55,7 @@ this.pushOTLP = DATABASE.pushOTLP
 this.queryTempoTags = DATABASE.queryTempoTags
 this.queryTempoValues = DATABASE.queryTempoValues
 let profiler = null
+const pako = require('pako')
 
 const {
   shaper,
@@ -121,7 +123,37 @@ let fastify = require('fastify')({
       })
       done()
     }))
-  await fastify.register(require('@fastify/compress'))
+  await fastify.register(require('@fastify/compress'), {
+    encodings: ['gzip'],
+    zlib: {
+      createGzip: () => {
+        const deflator = new pako.Deflate({ gzip: true })
+        let lastChunk = null
+        const res = new Duplex({
+          write: (chunk, encoding, next) => {
+            lastChunk && deflator.push(lastChunk)
+            lastChunk = chunk
+            next()
+          },
+          read: function (size) {
+          },
+          final (callback) {
+            deflator.onEnd = async () => {
+              res.push(null)
+              callback(null)
+            }
+            !lastChunk && callback()
+            lastChunk && deflator.push(lastChunk, true)
+          },
+          emitClose: true
+        })
+        deflator.onData = (chunk) => {
+          res.push(chunk)
+        }
+        return res
+      }
+    }
+  })
   await fastify.register(require('@fastify/url-data'))
   await fastify.register(require('@fastify/websocket'))
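
A minimal usage sketch (not part of the patch above), assuming the repository layout shown in the pyroscope/pyroscope.js hunk: one wasm tree id per request, merge_trees_pprof() once per downloaded chunk, a single export_trees_pprof() at the end, and drop_tree() in a finally block. The require paths and the newCtxIdx() helper are taken from that hunk and may differ in other layouts.

    const pprofBin = require('./pprof-bin/pkg/pprof_bin')
    const { newCtxIdx } = require('./merge_stack_traces')

    // chunks: Array<Uint8Array> of 'FORMAT RowBinary' payloads fetched from ClickHouse
    async function mergeChunksToPprof (chunks) {
      const ctx = newCtxIdx()                 // reserve a tree id in the wasm CTX map
      try {
        pprofBin.init_panic_hook()
        for (const chunk of chunks) {
          pprofBin.merge_trees_pprof(ctx, chunk)      // accumulate into the tree's pprof profile
        }
        return Buffer.from(pprofBin.export_trees_pprof(ctx))  // encoded pprof Profile bytes
      } finally {
        pprofBin.drop_tree(ctx)               // always free the per-request tree
      }
    }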