diff --git a/patterns/patterns_bin/src/pattern_reg.rs b/patterns/patterns_bin/src/pattern_reg.rs new file mode 100644 index 00000000..f8657961 --- /dev/null +++ b/patterns/patterns_bin/src/pattern_reg.rs @@ -0,0 +1,45 @@ +use crate::pattern::Pattern; +use uuid::Uuid; + +pub struct PatternRegistry { + patterns: Vec, +} + +impl PatternRegistry { + pub const fn new() -> PatternRegistry { + PatternRegistry { patterns: Vec::new() } + } + + pub fn find_pattern(&mut self, str_text: &Vec, i_text: &Vec, sample: String) -> &Pattern { + let mut idx: i32 = -1; + let mut mtc = 0; + for i in 0..self.patterns.len() { + mtc = self.patterns[i].match_text(&i_text); + if mtc == -1 || mtc > self.patterns[i].fluct { + continue; + } + idx = i as i32; + break; + } + + if idx == -1 { + let pattern = Pattern::new(Uuid::new_v4().to_string(), &i_text, &str_text, sample); + self.patterns.push(pattern); + idx = (self.patterns.len() - 1) as i32; + } else if mtc != 0 { + self.patterns[idx as usize].adjust_pattern(&i_text); + } + return &self.patterns[idx as usize]; + } + + pub fn to_string(&self) -> String { + let mut s = String::new(); + for i in 0..self.patterns.len() { + s += self.patterns[i].to_string().as_str(); + s += "\n"; + } + return s + } +} + +pub static mut REGISTRY: PatternRegistry = PatternRegistry::new(); \ No newline at end of file diff --git a/patterns/patterns_bin/src/tokens.rs b/patterns/patterns_bin/src/tokens.rs new file mode 100644 index 00000000..5d3f4449 --- /dev/null +++ b/patterns/patterns_bin/src/tokens.rs @@ -0,0 +1,45 @@ +use regex::{Regex, CaptureMatches, Match}; + +/*pub fn tokenize(re: &Regex, text: &str) -> CaptureMatches { + return re.captures_iter(text); +}*/ + +pub struct Tokenizer<'a> { + text: String, + pos: usize, + re: Regex, + iter: Option> +} + +impl Tokenizer<'_> { + pub fn new<'a>(text: &'a str) -> Tokenizer<'a> { + let mut res = Tokenizer { + text: text.to_string(), + pos: 0, + re: Regex::new(r"([\p{L}_]+|[\d.]+|[^\p{L}_\d.]+)\s*").unwrap(), + iter: None + }; + res + } +} + +impl Iterator for Tokenizer<'_> { + type Item = String; + + fn next(&mut self) -> Option { + None + /*let cap: Option = None; + if let Some(c) = cap { + self.pos += c.get(0).unwrap().end(); + Some(c.get(0).unwrap().as_str().to_string()) + } else { + None + }*/ + } +} + +#[test] +fn test_tokenizer() { + let text = "Hello, world! 123"; + let mut tokenizer = Tokenizer::new(text); +} \ No newline at end of file diff --git a/pyroscope/flamebearer.d.ts b/pyroscope/flamebearer.d.ts index 27a701dd..7ec2f880 100644 --- a/pyroscope/flamebearer.d.ts +++ b/pyroscope/flamebearer.d.ts @@ -52,3 +52,16 @@ export interface heatmap { minDepth: uint64, maxDepth: uint64 } + +export interface level { + values: number[] +} + +export interface flamegraphDiff { + name: string[], + levels: level[], + total: int64, + maxSelf: int64, + leftTicks: int64, + rightTicks: int64 +} diff --git a/pyroscope/merge_stack_traces.js b/pyroscope/merge_stack_traces.js index c9033f4e..f8664712 100644 --- a/pyroscope/merge_stack_traces.js +++ b/pyroscope/merge_stack_traces.js @@ -22,7 +22,9 @@ const sqlWithReference = (ref) => { let ctxIdx = 0 -const mergeStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log) => { +const newCtxIdx = () => ++ctxIdx + +const importStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log, _ctxIdx, save) => { const dist = clusterName ? '_dist' : '' const v2 = checkVersion('profiles_v2', (fromTimeSec - 3600) * 1000) const serviceNameSelector = serviceNameSelectorQuery(sel) @@ -127,30 +129,36 @@ const mergeStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log) => const binData = Uint8Array.from(profiles.data) log.debug(`selectMergeStacktraces: profiles downloaded: ${binData.length / 1025}kB in ${Date.now() - start}ms`) require('./pprof-bin/pkg/pprof_bin').init_panic_hook() - const _ctxIdx = ++ctxIdx const [legacyLen, shift] = readULeb32(binData, 0) let ofs = shift - try { - let mergePprofLat = BigInt(0) - for (let i = 0; i < legacyLen; i++) { - const [profLen, shift] = readULeb32(binData, ofs) - ofs += shift - start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0) - pprofBin.merge_prof(_ctxIdx, - Uint8Array.from(profiles.data.slice(ofs, ofs + profLen)), - `${typeRegex.sampleType}:${typeRegex.sampleUnit}`) - mergePprofLat += (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start - ofs += profLen - } - start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0) - pprofBin.merge_tree(_ctxIdx, Uint8Array.from(profiles.data.slice(ofs)), - typeRegex.sampleType + ':' + typeRegex.sampleUnit) - const mergeTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start + let mergePprofLat = BigInt(0) + for (let i = 0; i < legacyLen; i++) { + const [profLen, shift] = readULeb32(binData, ofs) + ofs += shift start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0) + pprofBin.merge_prof(_ctxIdx, + Uint8Array.from(profiles.data.slice(ofs, ofs + profLen)), + `${typeRegex.sampleType}:${typeRegex.sampleUnit}`) + mergePprofLat += (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start + ofs += profLen + } + start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0) + save && require('fs').writeFileSync(`/home/hromozeka/QXIP/qryn/data.${Date.now()}.bin`, + Buffer.from(Uint8Array.from(profiles.data.slice(ofs)))) + pprofBin.merge_tree(_ctxIdx, Uint8Array.from(profiles.data.slice(ofs)), + typeRegex.sampleType + ':' + typeRegex.sampleUnit) + const mergeTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start + log.debug(`merge_pprof: ${mergePprofLat / BigInt(1000000)}ms`) + log.debug(`merge_tree: ${mergeTreeLat / BigInt(1000000)}ms`) +} + +const mergeStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log) => { + const _ctxIdx = newCtxIdx() + try { + await importStackTraces(typeRegex, sel, fromTimeSec, toTimeSec, log, _ctxIdx) + const start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0) const resp = pprofBin.export_tree(_ctxIdx, typeRegex.sampleType + ':' + typeRegex.sampleUnit) const exportTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start - log.debug(`merge_pprof: ${mergePprofLat / BigInt(1000000)}ms`) - log.debug(`merge_tree: ${mergeTreeLat / BigInt(1000000)}ms`) log.debug(`export_tree: ${exportTreeLat / BigInt(1000000)}ms`) return Buffer.from(resp) } finally { @@ -159,5 +167,7 @@ const mergeStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log) => } module.exports = { - mergeStackTraces -} \ No newline at end of file + mergeStackTraces, + importStackTraces, + newCtxIdx +} diff --git a/pyroscope/pprof-bin/pkg/pprof_bin.d.ts b/pyroscope/pprof-bin/pkg/pprof_bin.d.ts index f4204d23..ccbddd41 100644 --- a/pyroscope/pprof-bin/pkg/pprof_bin.d.ts +++ b/pyroscope/pprof-bin/pkg/pprof_bin.d.ts @@ -13,6 +13,13 @@ export function merge_prof(id: number, bytes: Uint8Array, sample_type: string): */ export function merge_tree(id: number, bytes: Uint8Array, sample_type: string): void; /** +* @param {number} id1 +* @param {number} id2 +* @param {string} sample_type +* @returns {Uint8Array} +*/ +export function diff_tree(id1: number, id2: number, sample_type: string): Uint8Array; +/** * @param {number} id * @param {string} sample_type * @returns {Uint8Array} diff --git a/pyroscope/pprof-bin/pkg/pprof_bin.js b/pyroscope/pprof-bin/pkg/pprof_bin.js index 913b1d40..25da605f 100644 --- a/pyroscope/pprof-bin/pkg/pprof_bin.js +++ b/pyroscope/pprof-bin/pkg/pprof_bin.js @@ -133,6 +133,28 @@ function getArrayU8FromWasm0(ptr, len) { ptr = ptr >>> 0; return getUint8Memory0().subarray(ptr / 1, ptr / 1 + len); } +/** +* @param {number} id1 +* @param {number} id2 +* @param {string} sample_type +* @returns {Uint8Array} +*/ +module.exports.diff_tree = function(id1, id2, sample_type) { + try { + const retptr = wasm.__wbindgen_add_to_stack_pointer(-16); + const ptr0 = passStringToWasm0(sample_type, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc); + const len0 = WASM_VECTOR_LEN; + wasm.diff_tree(retptr, id1, id2, ptr0, len0); + var r0 = getInt32Memory0()[retptr / 4 + 0]; + var r1 = getInt32Memory0()[retptr / 4 + 1]; + var v2 = getArrayU8FromWasm0(r0, r1).slice(); + wasm.__wbindgen_free(r0, r1 * 1, 1); + return v2; + } finally { + wasm.__wbindgen_add_to_stack_pointer(16); + } +}; + /** * @param {number} id * @param {string} sample_type diff --git a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm index fcb06ee5..a110fa20 100644 Binary files a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm and b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm differ diff --git a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts index c1259bcb..6dc10bc2 100644 --- a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts +++ b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts @@ -3,6 +3,7 @@ export const memory: WebAssembly.Memory; export function merge_prof(a: number, b: number, c: number, d: number, e: number): void; export function merge_tree(a: number, b: number, c: number, d: number, e: number): void; +export function diff_tree(a: number, b: number, c: number, d: number, e: number): void; export function export_tree(a: number, b: number, c: number, d: number): void; export function export_trees_pprof(a: number, b: number, c: number): void; export function drop_tree(a: number): void; diff --git a/pyroscope/pprof-bin/src/lib.rs b/pyroscope/pprof-bin/src/lib.rs index 493d3992..e3985831 100644 --- a/pyroscope/pprof-bin/src/lib.rs +++ b/pyroscope/pprof-bin/src/lib.rs @@ -2,6 +2,7 @@ mod ch64; mod merge; +use std::cmp::Ordering; use ch64::city_hash_64; use ch64::read_uint64_le; use lazy_static::lazy_static; @@ -12,6 +13,7 @@ use pprof_pb::google::v1::Sample; use pprof_pb::querier::v1::FlameGraph; use pprof_pb::querier::v1::Level; use pprof_pb::querier::v1::SelectMergeStacktracesResponse; +use pprof_pb::querier::v1::FlameGraphDiff; use prost::Message; use std::collections::{HashMap, HashSet}; use std::io::Read; @@ -19,6 +21,8 @@ use std::panic; use std::sync::Mutex; use std::vec::Vec; use wasm_bindgen::prelude::*; +use std::sync::Arc; + pub mod pprof_pb { @@ -47,16 +51,51 @@ struct TreeNodeV2 { total: Vec, } +impl TreeNodeV2 { + pub fn clone(&self) -> TreeNodeV2 { + TreeNodeV2 { + fn_id: self.fn_id, + node_id: self.node_id, + slf: self.slf.clone(), + total: self.total.clone(), + } + } + pub fn set_total_and_self(&self, slf: Vec, total: Vec) -> TreeNodeV2 { + let mut res = self.clone(); + res.slf = slf; + res.total = total; + return res; + } +} + struct Tree { names: Vec, names_map: HashMap, - nodes: HashMap>, + nodes: HashMap>>, sample_types: Vec, max_self: Vec, nodes_num: i32, } -fn find_node(id: u64, nodes: &Vec) -> i32 { +impl Tree { + pub fn total(&self) -> i64 { + let mut total: i64 = 0; + for c in 0..self.nodes.get(&0).unwrap().len() { + let _c = &self.nodes.get(&0).unwrap()[c]; + total += _c.total[0]; + } + total + } + pub fn add_name(&mut self, name: String, name_hash: u64) { + if self.names_map.contains_key(&name_hash) { + return; + } + self.names.push(name); + self.names_map.insert(name_hash, self.names.len() - 1); + } +} + +fn find_node(id: u64, nodes: &Vec>) -> i32 { let mut n: i32 = -1; for c in 0..nodes.len() { let _c = &nodes[c]; @@ -109,19 +148,25 @@ impl MergeTotalsProcessor { fn merge_totals( &self, - node: &mut TreeNodeV2, + node: Arc, _max_self: &Vec, sample: &Sample, merge_self: bool, - ) -> Vec { + ) -> (TreeNodeV2, Vec) { let mut max_self = _max_self.clone(); + let mut res: TreeNodeV2 = TreeNodeV2 { + fn_id: node.fn_id, + node_id: node.node_id, + slf: vec![0; node.slf.len()], + total: vec![0; node.slf.len()], + }; for i in 0..self.from_idx.len() { if self.from_idx[i] == -1 { continue; } - node.total[i] += sample.value[self.from_idx[i] as usize]; + res.total[i] += sample.value[self.from_idx[i] as usize]; if merge_self { - node.slf[i] += sample.value[self.from_idx[i] as usize]; + res.slf[i] += sample.value[self.from_idx[i] as usize]; for i in 0..max_self.len() { if max_self[i] < node.slf[i] { max_self[i] = node.slf[i]; @@ -129,7 +174,7 @@ impl MergeTotalsProcessor { } } } - max_self + (res, max_self) } } @@ -164,29 +209,30 @@ fn merge(tree: &mut Tree, p: &Profile) { if !tree.nodes.contains_key(&parent_id) && tree.nodes_num < 2000000 { tree.nodes.insert(parent_id, Vec::new()); } - let mut fake_children: Vec = Vec::new(); + let mut fake_children: Vec> = Vec::new(); let children = tree.nodes.get_mut(&parent_id).unwrap_or(&mut fake_children); let mut n = find_node(node_id, children); if n == -1 { - children.push(TreeNodeV2 { + children.push(Arc::new(TreeNodeV2 { //parent_id, fn_id: name_hash, node_id, slf: vec![0; tree.sample_types.len()], total: vec![0; tree.sample_types.len()], - }); + })); let idx = children.len().clone() - 1; - let max_self = m.merge_totals( - children.get_mut(idx).unwrap(), + let new_node_and_max_self = m.merge_totals( + children.get(idx).unwrap().clone(), tree.max_self.as_ref(), s, i == 0, ); - tree.max_self = max_self; + children[idx] = Arc::new(new_node_and_max_self.0); + tree.max_self = new_node_and_max_self.1; n = idx as i32; } else if tree.nodes_num < 2000000 { m.merge_totals( - children.get_mut(n as usize).unwrap(), + children.get_mut(n as usize).unwrap().clone(), &tree.max_self, s, i == 0, @@ -214,7 +260,7 @@ fn read_uleb128(bytes: &[u8]) -> (usize, usize) { fn bfs(t: &Tree, res: &mut Vec, sample_type: String) { let mut total: i64 = 0; - let mut root_children: &Vec = &Vec::new(); + let mut root_children: &Vec> = &Vec::new(); if t.nodes.contains_key(&(0u64)) { root_children = t.nodes.get(&(0u64)).unwrap(); } @@ -291,22 +337,22 @@ fn bfs(t: &Tree, res: &mut Vec, sample_type: String) { } lazy_static! { - static ref CTX: Mutex> = Mutex::new(HashMap::new()); + static ref CTX: Mutex>> = Mutex::new(HashMap::new()); } -fn upsert_tree(ctx: &mut HashMap, id: u32, sample_types: Vec) { +fn upsert_tree(ctx: &mut HashMap>, id: u32, sample_types: Vec) { if !ctx.contains_key(&id) { let _len = sample_types.len().clone(); ctx.insert( id, - Tree { + Mutex::new(Tree { names: vec!["total".to_string(), "n/a".to_string()], names_map: HashMap::new(), nodes: HashMap::new(), sample_types, max_self: vec![0; _len], nodes_num: 1, - }, + }), ); } } @@ -413,18 +459,11 @@ fn merge_trie(tree: &mut Tree, bytes: &[u8], samples_type: &String) { n = find_node(node_id, tree.nodes.get(&parent_id).unwrap()); } if n != -1 { - tree.nodes - .get_mut(&parent_id) - .unwrap() - .get_mut(n as usize) - .unwrap() - .total[sample_type_index] += total[sample_type_index]; - tree.nodes - .get_mut(&parent_id) - .unwrap() - .get_mut(n as usize) - .unwrap() - .slf[sample_type_index] += slf[sample_type_index]; + let mut __node = tree.nodes.get_mut(&parent_id).unwrap().get_mut(n as usize).unwrap().clone(); + let mut _node = __node.as_ref().clone(); + _node.total[sample_type_index] += total[sample_type_index]; + _node.slf[sample_type_index] += slf[sample_type_index]; + tree.nodes.get_mut(&parent_id).unwrap()[n as usize] = Arc::new(_node); } if tree.nodes_num >= 2000000 { return; @@ -432,13 +471,13 @@ fn merge_trie(tree: &mut Tree, bytes: &[u8], samples_type: &String) { if !tree.nodes.contains_key(&parent_id) { tree.nodes.insert(parent_id, Vec::new()); } - tree.nodes.get_mut(&parent_id).unwrap().push(TreeNodeV2 { + tree.nodes.get_mut(&parent_id).unwrap().push(Arc::new(TreeNodeV2 { fn_id, //parent_id, node_id, slf, total, - }); + })); tree.nodes_num += 1; } } @@ -551,12 +590,25 @@ fn merge_trie(tree: &mut Tree, bytes: &[u8], samples_type: &String) { inject_functions(prof, tree, 0, vec![], type_idx); }*/ +fn assert_positive(t: &Tree) -> bool{ + for n in t.nodes.keys() { + for _n in 0..t.nodes.get(&n).unwrap().len() { + for __n in 0..t.nodes.get(&n).unwrap()[_n].slf.len() { + if t.nodes.get(&n).unwrap()[_n].slf[__n] < 0 { + return false; + } + } + } + } + true +} + #[wasm_bindgen] pub fn merge_prof(id: u32, bytes: &[u8], sample_type: String) { let p = panic::catch_unwind(|| { let mut ctx = CTX.lock().unwrap(); upsert_tree(&mut ctx, id, vec![sample_type]); - let mut tree = ctx.get_mut(&id).unwrap(); + let mut tree = ctx.get_mut(&id).unwrap().lock().unwrap(); let prof = Profile::decode(bytes).unwrap(); merge(&mut tree, &prof); }); @@ -571,7 +623,7 @@ pub fn merge_tree(id: u32, bytes: &[u8], sample_type: String) { let result = panic::catch_unwind(|| { let mut ctx = CTX.lock().unwrap(); upsert_tree(&mut ctx, id, vec![sample_type.clone()]); - let mut tree = ctx.get_mut(&id).unwrap(); + let mut tree = ctx.get_mut(&id).unwrap().lock().unwrap(); merge_trie(&mut tree, bytes, &sample_type); 0 }); @@ -581,25 +633,275 @@ pub fn merge_tree(id: u32, bytes: &[u8], sample_type: String) { } } +#[wasm_bindgen] +pub fn diff_tree(id1: u32, id2: u32, sample_type: String) -> Vec { + let mut ctx = CTX.lock().unwrap(); + let _ctx = &mut ctx; + upsert_tree(_ctx, id1, vec![sample_type.clone()]); + upsert_tree(_ctx, id2, vec![sample_type.clone()]); + let mut t1 = _ctx.get(&id1).unwrap().lock().unwrap(); + let mut t2 = _ctx.get(&id2).unwrap().lock().unwrap(); + let mut is_positive = assert_positive(&t1); + if !is_positive { + panic!("Tree 1 is not positive"); + } + is_positive = assert_positive(&t2); + if!is_positive { + panic!("Tree 2 is not positive"); + } + + + for n in t1.names_map.keys() { + if !t2.names_map.contains_key(&n) { + t2.names.push(t1.names[*t1.names_map.get(&n).unwrap()].clone()); + let idx = t2.names.len() - 1; + t2.names_map.insert(*n, idx); + } + } + for n in t2.names_map.keys() { + if !t1.names_map.contains_key(&n) { + let idx = t2.names_map.get(&n).unwrap().clone(); + t1.names.push(t2.names[idx].clone()); + let idx2 = t1.names.len() - 1; + t1.names_map.insert(*n, idx2); + } + } + + let keys = t1.nodes.keys().map(|x| (*x).clone()).collect::>(); + for n in keys { + if !t2.nodes.contains_key(&n) { + t2.nodes.insert(n, vec![]); + } + let lnodes = t1.nodes.get_mut(&n).unwrap(); + let rnodes = t2.nodes.get_mut(&n).unwrap(); + lnodes.sort_by(|x, y| + if x.node_id < y.node_id { Ordering::Less } else { Ordering::Greater }); + rnodes.sort_by(|x, y| + if x.node_id < y.node_id { Ordering::Less } else { Ordering::Greater }); + let mut i = 0; + let mut j = 0; + let mut new_t1_nodes: Vec> = vec![]; + let mut new_t2_nodes: Vec> = vec![]; + let t1_nodes = t1.nodes.get(&n).unwrap(); + let t2_nodes = t2.nodes.get(&n).unwrap(); + while i < t1_nodes.len() && j < t2_nodes.len() { + if n == 0 { + println!("{:?}:{:?} - {:?}:{:?}", + t1_nodes[i].node_id, + t1.names[*t1.names_map.get(&t1_nodes[i].fn_id).unwrap() as usize], + t2_nodes[j].node_id, + t2.names[*t2.names_map.get(&t2_nodes[j].fn_id).unwrap() as usize] + ) + } + + if t1_nodes[i].node_id == t2_nodes[j].node_id { + new_t1_nodes.push(t1_nodes[i].clone()); + new_t2_nodes.push(t2_nodes[j].clone()); + i += 1; + j += 1; + continue; + } + if t1_nodes[i].node_id < t2_nodes[j].node_id { + new_t1_nodes.push(t1_nodes[i].clone()); + new_t2_nodes.push(Arc::new(TreeNodeV2{ + node_id: t1_nodes[i].node_id, + fn_id: t1_nodes[i].fn_id, + slf: vec![0], + total: vec![0], + })); + i += 1; + } else { + new_t2_nodes.push(t2_nodes[j].clone()); + new_t1_nodes.push(Arc::new(TreeNodeV2{ + node_id: t2_nodes[j].node_id, + fn_id: t2_nodes[j].fn_id, + slf: vec![0], + total: vec![0], + })); + j += 1; + } + } + while i < t1_nodes.len() { + new_t1_nodes.push(t1_nodes[i].clone()); + new_t2_nodes.push(Arc::new(TreeNodeV2{ + node_id: t1_nodes[i].node_id, + fn_id: t1_nodes[i].fn_id, + slf: vec![0], + total: vec![0], + })); + i += 1; + } + while j < t2_nodes.len() { + new_t2_nodes.push(t2_nodes[j].clone()); + new_t1_nodes.push(Arc::new(TreeNodeV2{ + node_id: t2_nodes[j].node_id, + fn_id: t2_nodes[j].fn_id, + slf: vec![0], + total: vec![0], + })); + j+=1; + } + t1.nodes.insert(n, new_t1_nodes); + t2.nodes.insert(n, new_t2_nodes); + } + + for n in t2.nodes.keys().clone() { + if!t1.nodes.contains_key(&n) { + let mut new_t1_nodes: Vec> = vec![]; + for _n in t2.nodes.get(&n).unwrap() { + new_t1_nodes.push(Arc::new(TreeNodeV2{ + node_id: _n.node_id, + fn_id: _n.fn_id, + slf: vec![0], + total: vec![0], + })) + } + t1.nodes.insert(*n, new_t1_nodes); + } + } + + let total_left = t1.total(); + let total_right = t2.total(); + let mut min_val = 0 as i64; + let tn = Arc::new(TreeNodeV2{ + fn_id: 0, + node_id: 0, + slf: vec![0], + total: vec![total_left], + }); + let mut left_nodes = vec![tn]; + let tn2 = Arc::new(TreeNodeV2{ + fn_id: 0, + node_id: 0, + slf: vec![0], + total: vec![total_right], + }); + let mut right_nodes = vec![tn2]; + + let mut x_left_offsets = vec![0 as i64]; + let mut x_right_offsets = vec![0 as i64]; + let mut levels = vec![0 as i64]; + let mut name_location_cache: HashMap = HashMap::new(); + let mut res = FlameGraphDiff::default(); + res.left_ticks = total_left; + res.right_ticks = total_right; + res.total = total_left + total_right; + while left_nodes.len() > 0 { + let left = left_nodes.pop().unwrap(); + let right = right_nodes.pop().unwrap(); + let mut x_left_offset = x_left_offsets.pop().unwrap(); + let mut x_right_offset = x_right_offsets.pop().unwrap(); + let level = levels.pop().unwrap(); + let mut name: String = "total".to_string(); + if left.fn_id != 0 { + name = t1.names[t1.names_map.get(&left.fn_id).unwrap().clone() as usize].clone(); + } + if left.total[0] >= min_val || right.total[0] >= min_val || name == "other" { + let mut i = 0 as i64; + if !name_location_cache.contains_key(&name) { + res.names.push(name.clone().to_string()); + name_location_cache.insert(name, (res.names.len() - 1) as i64); + i = res.names.len() as i64 - 1; + } else { + i = *name_location_cache.get(name.as_str()).unwrap(); + } + if level == res.levels.len() as i64 { + res.levels.push(Level::default()) + } + if res.max_self < left.slf[0] { + res.max_self = left.slf[0]; + } + if res.max_self < right.slf[0] { + res.max_self = right.slf[0]; + } + let mut values = vec![x_left_offset, left.total[0], left.slf[0], + x_right_offset, right.total[0], right.slf[0], i]; + res.levels[level as usize].values.extend(values); + let mut other_left_total = 0 as i64; + let mut other_right_total = 0 as i64; + let mut nodes_len = 0; + if t1.nodes.contains_key(&left.node_id) { + nodes_len = t1.nodes.get(&left.node_id).unwrap().len().clone(); + } + for j in 0..nodes_len { + let _left = t1.nodes.get(&left.node_id).unwrap()[j].clone(); + let _right = t2.nodes.get(&left.node_id).unwrap()[j].clone(); + if _left.total[0] >= min_val || _right.total[0] >= min_val { + levels.insert(0, level + 1); + x_left_offsets.insert(0, x_left_offset); + x_right_offsets.insert(0, x_right_offset); + x_left_offset += _left.total[0].clone() as i64; + x_right_offset += _right.total[0].clone() as i64; + left_nodes.insert(0, _left.clone()); + right_nodes.insert(0, _right.clone()); + } else { + other_left_total += _left.total[0] as i64; + other_right_total += _right.total[0] as i64; + } + if other_left_total > 0 || other_right_total > 0 { + levels.insert(0, level + 1); + t1.add_name("other".to_string(), 1); + x_left_offsets.insert(0, x_left_offset); + left_nodes.insert(0, Arc::new(TreeNodeV2{ + fn_id: 1, + node_id: 1, + slf: vec![other_left_total as i64], + total: vec![other_left_total as i64], + })); + t2.add_name("other".to_string(), 1); + x_right_offsets.insert(0, x_right_offset); + right_nodes.insert(0, Arc::new(TreeNodeV2{ + fn_id: 1, + node_id: 1, + slf: vec![other_right_total as i64], + total: vec![other_right_total as i64], + })); + } + } + } + + } + for i in 0..res.levels.len() { + let mut j = 0; + let mut prev = 0 as i64; + while j < res.levels[i].values.len() { + res.levels[i].values[j] -= prev; + prev += res.levels[i].values[j] + res.levels[i].values[j+1]; + j += 7; + } + prev = 0; + j = 3; + while j < res.levels[i].values.len() { + res.levels[i].values[j] -= prev; + prev += res.levels[i].values[j] + res.levels[i].values[j+1]; + j += 7; + } + } + + res.encode_to_vec() +} + + + #[wasm_bindgen] pub fn export_tree(id: u32, sample_type: String) -> Vec { let p = panic::catch_unwind(|| { let mut ctx = CTX.lock().unwrap(); let mut res = SelectMergeStacktracesResponse::default(); upsert_tree(&mut ctx, id, vec![sample_type.clone()]); - let tree = ctx.get_mut(&id).unwrap(); + let tree = ctx.get_mut(&id).unwrap().lock().unwrap(); let mut fg = FlameGraph::default(); fg.names = tree.names.clone(); fg.max_self = tree.max_self[0 /* TODO */]; fg.total = 0; - let mut root_children: &Vec = &vec![]; + let mut root_children: &Vec> = &vec![]; if tree.nodes.contains_key(&(0u64)) { root_children = tree.nodes.get(&(0u64)).unwrap(); } for n in root_children.iter() { fg.total += n.total[0 /*TODO*/] as i64; } - bfs(tree, &mut fg.levels, sample_type.clone()); + bfs(&tree, &mut fg.levels, sample_type.clone()); res.flamegraph = Some(fg); return res.encode_to_vec(); }); diff --git a/pyroscope/pyroscope.js b/pyroscope/pyroscope.js index 61ea2a48..f979e10a 100644 --- a/pyroscope/pyroscope.js +++ b/pyroscope/pyroscope.js @@ -9,6 +9,7 @@ const { QrynBadRequest } = require('../lib/handlers/errors') const { clusterName } = require('../common') const logger = require('../lib/logger') const jsonParsers = require('./json_parsers') +const renderDiff = require('./render_diff') const { parser, wrapResponse, @@ -444,4 +445,5 @@ module.exports.init = (fastify) => { } settings.init(fastify) render.init(fastify) + renderDiff.init(fastify) } diff --git a/pyroscope/render.js b/pyroscope/render.js index ce5362c7..c37de551 100644 --- a/pyroscope/render.js +++ b/pyroscope/render.js @@ -248,5 +248,7 @@ const init = (fastify) => { } module.exports = { - init + init, + parseQuery, + toFlamebearer } diff --git a/pyroscope/render_diff.js b/pyroscope/render_diff.js new file mode 100644 index 00000000..c3c46c06 --- /dev/null +++ b/pyroscope/render_diff.js @@ -0,0 +1,75 @@ +const { parseQuery, toFlamebearer } = require('./render') +const { importStackTraces, newCtxIdx } = require('./merge_stack_traces') +const pprofBin = require('./pprof-bin/pkg') +const querierMessages = require('./querier_pb') +const types = require('./types/v1/types_pb') + +const renderDiff = async (req, res) => { + const [leftQuery, leftFromTimeSec, leftToTimeSec] = + parseParams(req.query.leftQuery, req.query.leftFrom, req.query.leftUntil); + const [rightQuery, rightFromTimeSec, rightToTimeSec] = + parseParams(req.query.rightQuery, req.query.rightFrom, req.query.rightUntil); + if (leftQuery.typeId != rightQuery.typeId) { + res.code(400).send('Different type IDs') + return + } + const leftCtxIdx = newCtxIdx() + await importStackTraces(leftQuery.typeDesc, '{' + leftQuery.labelSelector + '}', leftFromTimeSec, leftToTimeSec, req.log, leftCtxIdx, true) + const rightCtxIdx = newCtxIdx() + await importStackTraces(rightQuery.typeDesc, '{' + rightQuery.labelSelector + '}', rightFromTimeSec, rightToTimeSec, req.log, rightCtxIdx, true) + const flamegraphDiffBin = pprofBin.diff_tree(leftCtxIdx, rightCtxIdx, + `${leftQuery.typeDesc.sampleType}:${leftQuery.typeDesc.sampleUnit}`) + const profileType = new types.ProfileType() + profileType.setId(leftQuery.typeId) + profileType.setName(leftQuery.typeDesc.type) + profileType.setSampleType(leftQuery.typeDesc.sampleType) + profileType.setSampleUnit(leftQuery.typeDesc.sampleUnit) + profileType.setPeriodType(leftQuery.typeDesc.periodType) + profileType.setPeriodUnit(leftQuery.typeDesc.periodUnit) + const diff = querierMessages.FlameGraphDiff.deserializeBinary(flamegraphDiffBin) + return res.code(200).send(diffToFlamegraph(diff, profileType).flamebearerProfileV1) +} + +/** + * + * @param diff + * @param type + */ +const diffToFlamegraph = (diff, type) => { + const fg = new querierMessages.FlameGraph() + fg.setNamesList(diff.getNamesList()) + fg.setLevelsList(diff.getLevelsList()) + fg.setTotal(diff.getTotal()) + fg.setMaxSelf(diff.getMaxSelf()) + const fb = toFlamebearer(fg, type) + fb.flamebearerProfileV1.leftTicks = diff.getLeftticks() + fb.flamebearerProfileV1.rightTicks = diff.getRightticks() + fb.flamebearerProfileV1.metadata = { + ...(fb.flamebearerProfileV1.metadata || {}), + format: 'double' + } + return fb +} + +const parseParams = (query, from, until) => { + const parsedQuery = parseQuery(query) + const fromTimeSec = from + ? Math.floor(parseInt(from) / 1000) + : Math.floor((Date.now() - 1000 * 60 * 60 * 48) / 1000) + const toTimeSec = until + ? Math.floor(parseInt(until) / 1000) + : Math.floor((Date.now() - 1000 * 60 * 60 * 48) / 1000) + if (!parsedQuery) { + throw new Error('Invalid query') + } + return [parsedQuery, fromTimeSec, toTimeSec] +} + +const init = (fastify) => { + fastify.get('/pyroscope/render-diff', renderDiff) +} + +module.exports = { + renderDiff, + init +}