diff --git a/test/traceql_parser.test.js b/test/traceql_parser.test.js
index 7042cc67..0a6c09a4 100644
--- a/test/traceql_parser.test.js
+++ b/test/traceql_parser.test.js
@@ -44,3 +44,8 @@ it('traceql: max duration', () => {
   const res = parser.ParseScript('{.testId="12345" &&.spanN>=8.9} | max(duration) > 8ms')
   expect(res.rootToken.value).toEqual('{.testId="12345" &&.spanN>=8.9} | max(duration) > 8ms')
 })
+
+it('traceql: select', () => {
+  const res = parser.ParseScript('{.testId="12345" &&.spanN>=8.9} | select(a, b)')
+  expect(res.rootToken.value).toEqual('{.testId="12345" &&.spanN>=8.9} | select(a, b)')
+})
diff --git a/traceql/clickhouse_transpiler/index.js b/traceql/clickhouse_transpiler/index.js
index 44b07ea8..b8338a00 100644
--- a/traceql/clickhouse_transpiler/index.js
+++ b/traceql/clickhouse_transpiler/index.js
@@ -292,6 +292,9 @@ module.exports.Planner = class Planner {
     if (!agg) {
       return
     }
+    if (['count', 'sum', 'min', 'max', 'avg'].indexOf(agg.Child('fn').value) < 0) {
+      return
+    }
     this.aggFn = agg.Child('fn').value
     const labelName = agg.Child('attr').Child('label_name')
     this.aggregatedAttr = labelName ? labelName.value : ''
diff --git a/traceql/clickhouse_transpiler/traces_data.js b/traceql/clickhouse_transpiler/traces_data.js
index fc3ab193..2a263ea2 100644
--- a/traceql/clickhouse_transpiler/traces_data.js
+++ b/traceql/clickhouse_transpiler/traces_data.js
@@ -24,7 +24,9 @@ const processFn = (sel, ctx) => {
     [new Sql.Raw(
       'toFloat64(max(traces.timestamp_ns + traces.duration_ns) - min(traces.timestamp_ns)) / 1000000'
     ), 'duration_ms'],
-    [new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name']
+    [new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name'],
+    [new Sql.Raw(`groupArrayIf(base64Encode(traces.payload), (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'payload'],
+    [new Sql.Raw(`groupArrayIf(traces.payload_type, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'payload_type']
   ).from([table, 'traces']).where(Sql.And(
     new Sql.In(new Sql.Raw('traces.trace_id'), 'in', new Sql.WithReference(withTraceIds))
   )).groupBy('traces.trace_id')
diff --git a/traceql/index.js b/traceql/index.js
index bec8b969..71690db0 100644
--- a/traceql/index.js
+++ b/traceql/index.js
@@ -4,6 +4,7 @@ const logger = require('../lib/logger')
 const { DATABASE_NAME } = require('../lib/utils')
 const { clusterName } = require('../common')
 const { rawRequest } = require('../lib/db/clickhouse')
+const { postProcess } = require('./post_processor')
 
 /**
  *
@@ -39,12 +40,14 @@ const search = async (query, limit, from, to) => {
   } else {
     res = await processSmallResult(ctx, scrpit.rootToken)
   }
+  res = postProcess(res, scrpit.rootToken)
   res.forEach(t => t.spanSets.forEach(
     ss => ss.spans.sort(
       (a, b) => b.startTimeUnixNano.localeCompare(a.startTimeUnixNano))
   )
   )
+  console.log(JSON.stringify(res, null, 2))
   return res
 }
@@ -70,11 +73,12 @@ const evaluateComplexity = async (ctx, script) => {
 async function processComplexResult (ctx, script, complexity) {
   const planner = ctx.planner.plan()
   const maxFilter = Math.floor(complexity / 10000000)
-  let traces = []
+  //let traces = []
+  let response = null
   for (let i = 0; i < maxFilter; i++) {
     ctx.randomFilter = [maxFilter, i]
     const sql = planner(ctx)
-    const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
+    response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
     if (response.data.data.length === parseInt(ctx.limit)) {
       const minStart = response.data.data.reduce((acc, row) =>
         acc === 0 ? row.start_time_unix_nano : Math.min(acc, row.start_time_unix_nano), 0
@@ -88,7 +92,7 @@ async function processComplexResult (ctx, script, complexity) {
       ctx.randomFilter = [maxFilter, i]
     }
     ctx.cachedTraceIds = response.data.data.map(row => row.trace_id)
-    traces = response.data.data.map(row => ({
+    /*traces = response.data.data.map(row => ({
       traceID: row.trace_id,
       rootServiceName: row.root_service_name,
       rootTraceName: row.root_trace_name,
@@ -105,9 +109,9 @@ async function processComplexResult (ctx, script, complexity) {
           matched: row.span_id.length
         }
       ]
-    }))
+    }))*/
   }
-  return traces
+  return response.data.data
 }
 /**
  *
@@ -119,7 +123,7 @@ async function processSmallResult (ctx, script) {
   const planner = ctx.planner.plan()
   const sql = planner(ctx)
   const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
-  const traces = response.data.data.map(row => ({
+  /*const traces = response.data.data.map(row => ({
     traceID: row.trace_id,
     rootServiceName: row.root_service_name,
     rootTraceName: row.root_trace_name,
@@ -146,8 +150,8 @@ async function processSmallResult (ctx, script) {
       matched: row.span_id.length
     }
   ]
-  }))
-  return traces
+  }))*/
+  return response.data.data
 }
 
 module.exports = {
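After this change, `processComplexResult` and `processSmallResult` return the raw `FORMAT JSON` rows and leave all response shaping to `postProcess`. For orientation, a sketch of the row shape being passed along, with invented values (the field names follow the `Row` interface added in `traceql/post_processor/types.d.ts` below):

    // Hypothetical row from rawRequest(sql + ' FORMAT JSON', ...).data.data[0]
    const row = {
      trace_id: '4bf92f3577b34da6a3ce929d0e0e4736',
      span_id: ['00f067aa0ba902b7'],               // one entry per matched span
      duration: ['1000000'],                       // per-span durations in ns, as strings
      timestamp_ns: ['1700000000000000000'],
      start_time_unix_nano: '1700000000000000000',
      duration_ms: 1,
      root_service_name: 'frontend',
      payload: ['eyJpZCI6IjEifQ=='],               // base64-encoded span bodies
      payload_type: [1]                            // 1 = Zipkin JSON, 2 = OTLP protobuf
    }

The `payload` and `payload_type` arrays are populated by the two `groupArrayIf` columns added to `traces_data.js` above.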
diff --git a/traceql/post_processor/index.js b/traceql/post_processor/index.js
new file mode 100644
index 00000000..b3515af5
--- /dev/null
+++ b/traceql/post_processor/index.js
@@ -0,0 +1,74 @@
+const Otlp = require('../../lib/db/otlp')
+const Zipkin = require('../../lib/db/zipkin')
+const protobufjs = require('protobufjs')
+const path = require('path')
+const OTLPSpan = protobufjs.loadSync(path.join(__dirname, '..', '..',
+  'lib', 'otlp.proto')).lookupType('Span')
+/**
+ *
+ * @param rows {Row[]}
+ * @param script {Token}
+ */
+function postProcess (rows, script) {
+  const selectAttrs = script.Children('aggregator')
+    .filter(x => x.Child('fn').value === 'select')
+    .map(x => x.Children('label_name'))
+    .reduce((acc, x) => {
+      let attrs = x.map(y => ({
+        name: y.value,
+        path: y.value.split('.').filter(y => y)
+      }))
+      attrs = attrs.map(attr => (attr.path[0] === 'span' || attr.path[0] === 'resource')
+        ? { ...attr, path: attr.path.slice(1) }
+        : attr)
+      return [...acc, ...attrs]
+    }, [])
+  rows = rows.map(row => ({
+    ...row,
+    objs: row.payload.map((payload, i) => {
+      let span = null
+      switch (row.payload_type[i]) {
+        case 1:
+          return new Zipkin(JSON.parse(Buffer.from(payload, 'base64').toString()))
+        case 2:
+          span = OTLPSpan.toObject(
+            OTLPSpan.decode(Buffer.from(payload, 'base64')), {
+              longs: String,
+              bytes: String
+            })
+          return new Otlp(span)
+      }
+      return null
+    })
+  }))
+  const spans = (row) => row.span_id.map((spanId, i) => ({
+    spanID: spanId,
+    startTimeUnixNano: row.timestamp_ns[i],
+    durationNanos: row.duration[i],
+    attributes: selectAttrs.map(attr => ({
+      key: attr.name,
+      value: {
+        stringValue: (row.objs[i].tags.find(t => t[0] === attr.path.join('.')) || [null, null])[1]
+      }
+    })).filter(x => x.value.stringValue)
+  }))
+  const traces = rows.map(row => ({
+    traceID: row.trace_id,
+    rootServiceName: row.root_service_name,
+    rootTraceName: row.root_trace_name,
+    startTimeUnixNano: row.start_time_unix_nano,
+    durationMs: row.duration_ms,
+    spanSet: { spans: spans(row) },
+    spanSets: [
+      {
+        spans: spans(row),
+        matched: row.span_id.length
+      }
+    ]
+  }))
+  return traces
+}
+
+module.exports = {
+  postProcess
+}
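The attribute extraction above reduces to the following standalone sketch once the parser `Token` plumbing is removed (the sample query and its `label_name` values are hypothetical):

    // label_name values as produced by '{...} | select(.http.method, resource.host)'
    const labelNames = ['.http.method', 'resource.host']
    const selectAttrs = labelNames.map(name => {
      let path = name.split('.').filter(s => s)    // '.http.method' -> ['http', 'method']
      // span/resource scope prefixes are not part of the stored tag name
      if (path[0] === 'span' || path[0] === 'resource') path = path.slice(1)
      return { name, path }
    })
    console.log(selectAttrs)
    // [ { name: '.http.method', path: [ 'http', 'method' ] },
    //   { name: 'resource.host', path: [ 'host' ] } ]

Each selected attribute is then matched against the decoded span's `tags` by its joined path, and the survivors are attached to the span as `stringValue` attributes.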
diff --git a/traceql/post_processor/types.d.ts b/traceql/post_processor/types.d.ts
new file mode 100644
index 00000000..5a92e85b
--- /dev/null
+++ b/traceql/post_processor/types.d.ts
@@ -0,0 +1,11 @@
+export interface Row {
+  trace_id: string;
+  span_id: string[];
+  duration: string[];
+  timestamp_ns: string[];
+  start_time_unix_nano: string;
+  duration_ms: number;
+  root_service_name: string;
+  payload: string[];
+  payload_type: number[];
+}
\ No newline at end of file
diff --git a/traceql/traceql.bnf b/traceql/traceql.bnf
index 366698ce..28e447b1 100644
--- a/traceql/traceql.bnf
+++ b/traceql/traceql.bnf
@@ -6,14 +6,15 @@ complex_head ::= "(" <OWSP> <attr_selector_exp> <OWSP> ")"
 tail ::= <aggregator>
 and_or ::= "&&" | "||"
-aggregator ::= "|" <OWSP> <fn> <attr> <OWSP> <cmp> <OWSP> <cmp_val>
-fn ::= "count"|"sum"|"min"|"max"|"avg"
-attr ::= "(" <OWSP> [<label_name>] <OWSP> ")"
+aggregator ::= "|" <OWSP> <fn> <attr> [<OWSP> <cmp> <OWSP> <cmp_val>]
+fn ::= "count"|"sum"|"min"|"max"|"avg"|"select"
+attr ::= "(" <OWSP> [<label_names>] <OWSP> ")"
 cmp ::= "="|"!="|"<="|">="|"<"|">"
 cmp_val ::= <number> [<measurement>]
 measurement ::= "ns"|"us"|"ms"|"s"|"m"|"h"|"d"
 label_name ::= ("." | <ALPHA> | "-" | "_") *("." | <ALPHA> | "_" | "-" | <DIGITS>)
+label_names ::= <label_name> *(<OWSP> "," <OWSP> <label_name>)
 number ::= ["-"] <DIGITS> ["." <DIGITS>]
 attr_selector ::= <label_name> <OWSP> <op> <OWSP> <value>
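With the new `label_names` rule, a multi-attribute `select` round-trips through the parser the same way the added unit test asserts. A quick manual check, assuming the `parser` module the tests import resolves to `traceql/parser` from the repo root:

    // Inspect the parsed aggregator token of a select query.
    const parser = require('./traceql/parser')
    const res = parser.ParseScript('{.testId="12345"} | select(a, b)')
    const agg = res.rootToken.Children('aggregator')[0]
    console.log(agg.Child('fn').value)                         // 'select'
    console.log(agg.Children('label_name').map(t => t.value))  // [ 'a', 'b' ]

This is also why the Planner change above whitelists `count`/`sum`/`min`/`max`/`avg`: `select` shares the aggregator grammar but is not an aggregation, so the transpiler skips it and `postProcess` handles it after the query runs.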