diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 4b0243149da..46409fb33ce 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5957,12 +5957,14 @@ dependencies = [ "anyhow", "assert-json-diff 2.0.2", "async-trait", + "base64 0.21.5", "bytes", "bytesize", "elasticsearch-dsl", "futures", "futures-util", "go-parse-duration", + "hex", "http-serde", "humantime", "hyper", diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 6e93ad7a2a9..3584623ff6e 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -12,12 +12,14 @@ documentation = "https://quickwit.io/docs/" [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +base64 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } elasticsearch-dsl = "0.4.15" futures = { workspace = true } futures-util = { workspace = true } go-parse-duration = "0.1.1" +hex = { workspace = true } humantime = { workspace = true } http-serde = { workspace = true } hyper = { workspace = true } diff --git a/quickwit/quickwit-serve/resources/tests/jaeger_ui_trace.json b/quickwit/quickwit-serve/resources/tests/jaeger_ui_trace.json new file mode 100644 index 00000000000..487fd7e8a38 --- /dev/null +++ b/quickwit/quickwit-serve/resources/tests/jaeger_ui_trace.json @@ -0,0 +1,168 @@ +{ + "traceID": "0000000000000001", + "spans": [ + { + "traceID": "0000000000000001", + "spanID": "0000000000000001", + "operationName": "test-general-conversion", + "references": [], + "startTime": 1485467191639875, + "duration": 5, + "flags": 0, + "tags": [], + "logs": [ + { + "timestamp": 1485467191639875, + "fields": [ + { + "key": "event", + "type": "string", + "value": "some-event" + } + ] + }, + { + "timestamp": 1485467191639875, + "fields": [ + { + "key": "x", + "type": "string", + "value": "y" + } + ] + } + ], + "processID": "p1", + "warnings": [] + }, + { + "traceID": "0000000000000001", + "spanID": "0000000000000002", + "operationName": "some-operation", + "references": [], + "flags": 0, + "startTime": 1485467191639875, + "duration": 5, + "tags": [ + { + "key": "peer.service", + "type": "string", + "value": "service-y" + }, + { + "key": "peer.ipv4", + "type": "int64", + "value": 23456 + }, + { + "key": "error", + "type": "bool", + "value": true + }, + { + "key": "temperature", + "type": "float64", + "value": 72.5 + }, + { + "key": "javascript_limit", + "type": "int64", + "value": "9223372036854775222" + }, + { + "key": "blob", + "type": "binary", + "value": "AAAwOQ==" + } + ], + "logs": [], + "processID": "p1", + "warnings": [] + }, + { + "traceID": "0000000000000001", + "spanID": "0000000000000003", + "operationName": "some-operation", + "flags": 0, + "references": [ + { + "refType": "CHILD_OF", + "traceID": "0000000000000001", + "spanID": "0000000000000002" + } + ], + "startTime": 1485467191639875, + "duration": 5, + "tags": [], + "logs": [], + "processID": "p2", + "warnings": [] + }, + { + "traceID": "0000000000000001", + "spanID": "0000000000000004", + "operationName": "reference-test", + "flags": 0, + "references": [ + { + "refType": "CHILD_OF", + "traceID": "00000000000000ff", + "spanID": "00000000000000ff" + }, + { + "refType": "CHILD_OF", + "traceID": "0000000000000001", + "spanID": "0000000000000002" + }, + { + "refType": "FOLLOWS_FROM", + "traceID": "0000000000000001", + "spanID": "0000000000000002" + } + ], + "startTime": 1485467191639875, + "duration": 5, + "tags": [], + "logs": [], + "processID": "p2", + "warnings": [ + "some span warning" + ] + }, + { + "traceID": "0000000000000001", + "spanID": "0000000000000005", + "operationName": "preserveParentID-test", + "flags": 0, + "references": [ + { + "refType": "CHILD_OF", + "traceID": "0000000000000001", + "spanID": "0000000000000004" + } + ], + "startTime": 1485467191639875, + "duration": 4, + "tags": [], + "logs": [], + "processID": "p2", + "warnings": [ + "some span warning" + ] + } + ], + "processes": { + "p1": { + "serviceName": "service-x", + "key": "p1", + "tags": [] + }, + "p2": { + "serviceName": "service-y", + "key": "p2", + "tags": [] + } + }, + "warnings": [ + ] + } diff --git a/quickwit/quickwit-serve/src/jaeger_api/model.rs b/quickwit/quickwit-serve/src/jaeger_api/model.rs index 651b8e3e111..9c6692524d6 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/model.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/model.rs @@ -17,23 +17,32 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::hash_map::DefaultHasher; use std::collections::HashMap; -use std::hash::{Hash, Hasher}; -use std::ops::Add; +use base64::prelude::{Engine, BASE64_STANDARD}; use hyper::StatusCode; use itertools::Itertools; use quickwit_proto::jaeger::api_v2::{KeyValue, Log, Process, Span, SpanRef, ValueType}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; +use serde_with::serde_as; use crate::jaeger_api::util::{ bytes_to_hex_string, from_well_known_duration, from_well_known_timestamp, }; -pub const ALL_OPERATIONS: &str = ""; -pub const DEFAULT_NUMBER_OF_TRACES: i32 = 20; +pub(super) const ALL_OPERATIONS: &str = ""; +pub(super) const DEFAULT_NUMBER_OF_TRACES: i32 = 20; + +pub(super) fn build_jaeger_traces(spans: Vec) -> anyhow::Result> { + let jaeger_traces = spans + .into_iter() + .group_by(|span| span.trace_id.clone()) + .into_iter() + .map(|(span_id, group)| JaegerTrace::new(span_id, group.collect_vec())) + .collect_vec(); + Ok(jaeger_traces) +} #[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] #[serde(deny_unknown_fields)] @@ -61,7 +70,7 @@ pub struct TracesSearchQueryParams { // Jaeger Model for UI // Source: https://github.com/jaegertracing/jaeger/blob/main/model/json/model.go#L82 -#[derive(Clone, Default, Debug, Serialize, Deserialize, utoipa::IntoParams)] +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize, utoipa::IntoParams)] #[serde(rename_all = "camelCase")] pub struct JaegerTrace { #[serde(rename = "traceID")] @@ -73,14 +82,12 @@ pub struct JaegerTrace { impl JaegerTrace { pub fn new(trace_id: String, mut spans: Vec) -> Self { - let mut count = 0; - let mut hashcode_to_processes: HashMap> = HashMap::new(); - Self::handle_span_processes(&mut spans, &mut count, &mut hashcode_to_processes); + let processes = Self::build_process_map(&mut spans); JaegerTrace { trace_id, spans, - processes: Self::create_key_to_process_mapping(&hashcode_to_processes), - warnings: vec![], + processes, + warnings: Vec::new(), } } @@ -89,53 +96,33 @@ impl JaegerTrace { /// processed `JaegerProcess` objects and assigns a new key to each unique `service_name` value. /// The logic has been replicated from /// https://github.com/jaegertracing/jaeger/blob/995231c42cadd70bce2bbbf02579e33f6e6329c8/model/converter/json/process_hashtable.go#L37 - fn handle_span_processes( - spans: &mut [JaegerSpan], - count: &mut i32, - acc: &mut HashMap>, - ) { + /// TODO: use also tags to identify processes. + fn build_process_map(spans: &mut [JaegerSpan]) -> HashMap { + let mut service_name_to_process_id: HashMap = HashMap::new(); + let mut process_map: HashMap = HashMap::new(); + let mut process_counter: i32 = 0; for span in spans.iter_mut() { let mut current_process = span.process.clone(); - let hash = current_process.hash_code(); - if let Some(entries) = acc.get_mut(&hash) { - if let Some(existing_p) = entries - .iter_mut() - .find(|p| p.service_name == current_process.service_name) - { - current_process.key = existing_p.key.clone(); - span.update_process_id(existing_p.key.clone()); - } else { - let new_key = JaegerProcess::next_key(&count.add(1)); - span.update_process_id(new_key.clone()); - current_process.key = new_key.clone(); - entries.push(current_process.clone()); - } + if let Some(process_id) = service_name_to_process_id.get(¤t_process.service_name) + { + span.update_process_id(process_id.clone()); } else { - let new_key = JaegerProcess::next_key(&count.add(1)); - current_process.key = new_key.clone(); - span.update_process_id(new_key.clone()); - acc.insert(hash, vec![current_process.clone()]); - } - } - } - - /// Get the accumulated mapping of `key` to the corresponding `JaegerProcess` - /// The logic has been replicated from - // https://github.com/jaegertracing/jaeger/blob/995231c42cadd70bce2bbbf02579e33f6e6329c8/model/converter/json/process_hashtable.go#L59 - fn create_key_to_process_mapping( - data: &HashMap>, - ) -> HashMap { - let mut result: HashMap = HashMap::new(); - for processes in data.values() { - for process in processes { - result.insert(process.key.clone(), process.clone()); + process_counter += 1; + current_process.key = format!("p{}", process_counter); + span.update_process_id(current_process.key.clone()); + process_map.insert(current_process.key.clone(), current_process.clone()); + service_name_to_process_id.insert( + current_process.service_name.clone(), + current_process.key.clone(), + ); } } - result + process_map } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde_as] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct JaegerSpan { #[serde(rename = "traceID")] @@ -144,43 +131,40 @@ pub struct JaegerSpan { span_id: String, operation_name: String, references: Vec, + #[serde(default)] flags: u32, start_time: i64, // start_time since Unix epoch duration: i64, // microseconds tags: Vec, logs: Vec, + #[serde(default)] #[serde(skip_serializing)] process: JaegerProcess, #[serde(rename = "processID")] - process_id: String, + process_id: Option, pub warnings: Vec, } -impl JaegerSpan { - pub fn find_better_name_for_pb_convert(span: &Span) -> Self { +impl TryFrom for JaegerSpan { + type Error = anyhow::Error; + fn try_from(span: Span) -> Result { let references = span .references .iter() - .map(JaegerSpanRef::convert_from_proto) + .map(JaegerSpanRef::from) .collect_vec(); - let tags = span - .tags - .iter() - .map(JaegerKeyValue::convert_from_proto) - .collect_vec(); + let tags = span.tags.iter().map(JaegerKeyValue::from).collect_vec(); - let logs = span - .logs - .iter() - .map(JaegerLog::convert_from_proto) - .collect_vec(); + let logs = span.logs.iter().map(JaegerLog::from).collect_vec(); // TODO what's the best way to handle unwrap here? - let process: JaegerProcess = - JaegerProcess::convert_from_proto(span.process.clone().unwrap()); + let process: JaegerProcess = span + .process + .map(JaegerProcess::from) + .unwrap_or_else(JaegerProcess::default); - Self { + Ok(Self { trace_id: bytes_to_hex_string(&span.trace_id), span_id: bytes_to_hex_string(&span.span_id), operation_name: span.operation_name.clone(), @@ -191,27 +175,30 @@ impl JaegerSpan { tags, logs, process, - process_id: "no_value".to_string(), /* TODO we need to initialize it somehow to - * mutate it further */ + process_id: None, warnings: span.warnings.iter().map(|s| s.to_string()).collect_vec(), - } + }) } +} +impl JaegerSpan { pub fn update_process_id(&mut self, new_process_id: String) { - self.process_id = new_process_id + self.process_id = Some(new_process_id) } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct JaegerSpanRef { + #[serde(rename = "traceID")] trace_id: String, + #[serde(rename = "spanID")] span_id: String, ref_type: String, } -impl JaegerSpanRef { - fn convert_from_proto(sr: &SpanRef) -> Self { +impl From<&SpanRef> for JaegerSpanRef { + fn from(sr: &SpanRef) -> Self { Self { trace_id: bytes_to_hex_string(sr.trace_id.as_slice()), span_id: bytes_to_hex_string(sr.span_id.as_slice()), @@ -224,105 +211,106 @@ impl JaegerSpanRef { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct JaegerKeyValue { key: String, #[serde(rename = "type")] - type_: String, + value_type: String, value: Value, } -impl JaegerKeyValue { - fn convert_from_proto(kv: &KeyValue) -> Self { +impl From<&KeyValue> for JaegerKeyValue { + fn from(kv: &KeyValue) -> Self { match kv.v_type { // String = 0, 0 => Self { key: kv.key.to_string(), - type_: ValueType::String.as_str_name().to_lowercase(), + value_type: ValueType::String.as_str_name().to_lowercase(), value: json!(kv.v_str.to_string()), }, // Bool = 1, 1 => Self { key: kv.key.to_string(), - type_: ValueType::Bool.as_str_name().to_lowercase(), + value_type: ValueType::Bool.as_str_name().to_lowercase(), value: json!(kv.v_bool), }, // Int64 = 2, - 2 => Self { - key: kv.key.to_string(), - type_: ValueType::Int64.as_str_name().to_lowercase(), - value: json!(kv.v_int64), - }, + 2 => { + if kv.v_int64 > 9007199254740991 { + Self { + key: kv.key.to_string(), + value_type: ValueType::Int64.as_str_name().to_lowercase(), + value: json!(kv.v_int64.to_string()), + } + } else { + Self { + key: kv.key.to_string(), + value_type: ValueType::Int64.as_str_name().to_lowercase(), + value: json!(kv.v_int64), + } + } + } // Float64 = 3, 3 => Self { key: kv.key.to_string(), - type_: ValueType::Float64.as_str_name().to_lowercase(), + value_type: ValueType::Float64.as_str_name().to_lowercase(), value: json!(kv.v_float64), }, // Binary = 4, 4 => Self { key: kv.key.to_string(), - type_: ValueType::Binary.as_str_name().to_lowercase(), - value: json!(kv.v_binary), + value_type: ValueType::Binary.as_str_name().to_lowercase(), + value: serde_json::Value::String(BASE64_STANDARD.encode(kv.v_binary.as_slice())), }, _ => Self { key: "no_value".to_string(), - type_: "unsupported_type".to_string(), + value_type: "unsupported_type".to_string(), value: Default::default(), }, } } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct JaegerLog { timestamp: i64, // microseconds since Unix epoch fields: Vec, } -impl JaegerLog { - fn convert_from_proto(log: &Log) -> Self { +impl From<&Log> for JaegerLog { + fn from(log: &Log) -> Self { Self { timestamp: from_well_known_timestamp(&log.timestamp), - fields: log - .fields - .iter() - .map(JaegerKeyValue::convert_from_proto) - .collect_vec(), + fields: log.fields.iter().map(JaegerKeyValue::from).collect_vec(), } } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct JaegerProcess { service_name: String, - #[serde(skip_serializing)] key: String, tags: Vec, } -impl JaegerProcess { - fn convert_from_proto(process: Process) -> Self { +impl Default for JaegerProcess { + fn default() -> Self { Self { - service_name: process.service_name.to_string(), + service_name: "none".to_string(), key: "".to_string(), - tags: process - .tags - .iter() - .map(JaegerKeyValue::convert_from_proto) - .collect_vec(), + tags: vec![], } } +} - pub fn next_key(count: &i32) -> String { - format!("p{}", count) - } - - pub fn hash_code(&self) -> u64 { - let mut hasher = DefaultHasher::new(); - self.service_name.hash(&mut hasher); - hasher.finish() +impl From for JaegerProcess { + fn from(process: Process) -> Self { + Self { + service_name: process.service_name.to_string(), + key: "".to_string(), + tags: process.tags.iter().map(JaegerKeyValue::from).collect_vec(), + } } } @@ -333,11 +321,230 @@ pub struct JaegerError { pub message: String, } -// impl JaegerError { // TODO remove? -// pub fn internal_jaeger_error() -> Self { -// JaegerError { -// status: StatusCode::INTERNAL_SERVER_ERROR, -// message: "Jaeger is not available".to_string(), -// } -// } -// } +impl From for JaegerError { + fn from(error: anyhow::Error) -> Self { + Self { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: error.to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use quickwit_proto::jaeger::api_v2::Log; + + use crate::jaeger_api::model::{build_jaeger_traces, JaegerSpan}; + + #[test] + fn test_convert_grpc_jaeger_spans_into_jaeger_ui_model() { + let file_content = std::fs::read_to_string(get_jaeger_ui_trace_filepath()).unwrap(); + let expected_jaeger_trace: serde_json::Value = serde_json::from_str(&file_content).unwrap(); + let grpc_spans = create_grpc_spans(); + let jaeger_spans: Vec = grpc_spans + .iter() + .map(|span| super::JaegerSpan::try_from(span.clone()).unwrap()) + .collect(); + let traces = build_jaeger_traces(jaeger_spans).unwrap(); + let trace_json: serde_json::Value = serde_json::to_value(traces[0].clone()).unwrap(); + assert_json_diff::assert_json_eq!(expected_jaeger_trace, trace_json); + } + + fn get_jaeger_ui_trace_filepath() -> String { + format!( + "{}/resources/tests/jaeger_ui_trace.json", + env!("CARGO_MANIFEST_DIR"), + ) + } + + fn create_grpc_spans() -> Vec { + let span_0 = quickwit_proto::jaeger::api_v2::Span { + trace_id: vec![1], + span_id: vec![1], + operation_name: "test-general-conversion".to_string(), + start_time: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + duration: Some(prost_types::Duration { + seconds: 0, + nanos: 5000, + }), + process: Some(quickwit_proto::jaeger::api_v2::Process { + service_name: "service-x".to_string(), + tags: vec![], + }), + logs: vec![ + Log { + timestamp: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + fields: vec![quickwit_proto::jaeger::api_v2::KeyValue { + key: "event".to_string(), + v_type: 0, + v_str: "some-event".to_string(), + ..Default::default() + }], + }, + Log { + timestamp: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + fields: vec![quickwit_proto::jaeger::api_v2::KeyValue { + key: "x".to_string(), + v_type: 0, + v_str: "y".to_string(), + ..Default::default() + }], + }, + ], + ..Default::default() + }; + let span_1 = quickwit_proto::jaeger::api_v2::Span { + operation_name: "some-operation".to_string(), + trace_id: vec![1], + span_id: vec![2], + start_time: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + duration: Some(prost_types::Duration { + seconds: 0, + nanos: 5000, + }), + process: Some(quickwit_proto::jaeger::api_v2::Process { + service_name: "service-x".to_string(), + tags: vec![], + }), + process_id: "".to_string(), + tags: vec![ + quickwit_proto::jaeger::api_v2::KeyValue { + key: "peer.service".to_string(), + v_type: 0, + v_str: "service-y".to_string(), + ..Default::default() + }, + quickwit_proto::jaeger::api_v2::KeyValue { + key: "peer.ipv4".to_string(), + v_type: 2, + v_int64: 23456, + ..Default::default() + }, + quickwit_proto::jaeger::api_v2::KeyValue { + key: "error".to_string(), + v_type: 1, + v_bool: true, + ..Default::default() + }, + quickwit_proto::jaeger::api_v2::KeyValue { + key: "temperature".to_string(), + v_type: 3, + v_float64: 72.5, + ..Default::default() + }, + quickwit_proto::jaeger::api_v2::KeyValue { + key: "javascript_limit".to_string(), + v_type: 2, + v_int64: 9223372036854775222, + ..Default::default() + }, + quickwit_proto::jaeger::api_v2::KeyValue { + key: "blob".to_string(), + v_type: 4, + v_binary: vec![0b0, 0b0, 0b00110000, 0b00111001], + ..Default::default() + }, + ], + ..Default::default() + }; + let span_2 = quickwit_proto::jaeger::api_v2::Span { + operation_name: "some-operation".to_string(), + trace_id: vec![1], + span_id: vec![3], + references: vec![quickwit_proto::jaeger::api_v2::SpanRef { + trace_id: vec![1], + span_id: vec![2], + ref_type: 0, + }], + start_time: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + duration: Some(prost_types::Duration { + seconds: 0, + nanos: 5000, + }), + process: Some(quickwit_proto::jaeger::api_v2::Process { + service_name: "service-y".to_string(), + tags: vec![], + }), + process_id: "".to_string(), + ..Default::default() + }; + let span_3 = quickwit_proto::jaeger::api_v2::Span { + operation_name: "reference-test".to_string(), + trace_id: vec![1], + span_id: vec![4], + references: vec![ + quickwit_proto::jaeger::api_v2::SpanRef { + trace_id: vec![255], + span_id: vec![255], + ref_type: 0, + }, + quickwit_proto::jaeger::api_v2::SpanRef { + trace_id: vec![1], + span_id: vec![2], + ref_type: 0, + }, + quickwit_proto::jaeger::api_v2::SpanRef { + trace_id: vec![1], + span_id: vec![2], + ref_type: 1, + }, + ], + start_time: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + duration: Some(prost_types::Duration { + seconds: 0, + nanos: 5000, + }), + process: Some(quickwit_proto::jaeger::api_v2::Process { + service_name: "service-y".to_string(), + tags: vec![], + }), + process_id: "".to_string(), + warnings: vec!["some span warning".to_string()], + ..Default::default() + }; + let span_4 = quickwit_proto::jaeger::api_v2::Span { + operation_name: "preserveParentID-test".to_string(), + trace_id: vec![1], + span_id: vec![5], + references: vec![quickwit_proto::jaeger::api_v2::SpanRef { + trace_id: vec![1], + span_id: vec![4], + ref_type: 0, + }], + start_time: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + duration: Some(prost_types::Duration { + seconds: 0, + nanos: 4000, + }), + process: Some(quickwit_proto::jaeger::api_v2::Process { + service_name: "service-y".to_string(), + tags: vec![], + }), + process_id: "".to_string(), + warnings: vec!["some span warning".to_string()], + ..Default::default() + }; + vec![span_0, span_1, span_2, span_3, span_4] + } +} diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs index 23ba0124c81..e8316e8dbb8 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs @@ -20,7 +20,6 @@ use hyper::StatusCode; use itertools::Itertools; use quickwit_jaeger::JaegerService; -use quickwit_proto::jaeger::api_v2::Span; use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPlugin; use quickwit_proto::jaeger::storage::v1::{ FindTracesRequest, GetOperationsRequest, GetServicesRequest, GetTraceRequest, @@ -28,17 +27,17 @@ use quickwit_proto::jaeger::storage::v1::{ }; use quickwit_proto::tonic; use quickwit_proto::tonic::Request; +use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tracing::info; use warp::{Filter, Rejection}; +use super::model::build_jaeger_traces; use crate::jaeger_api::model::{ JaegerError, JaegerResponseBody, JaegerSpan, JaegerTrace, TracesSearchQueryParams, ALL_OPERATIONS, DEFAULT_NUMBER_OF_TRACES, }; -use crate::jaeger_api::util::{ - hex_string_to_bytes, parse_duration_with_units, to_well_known_timestamp, -}; +use crate::jaeger_api::util::{parse_duration_with_units, to_well_known_timestamp}; use crate::json_api_response::JsonApiResponse; use crate::{require, BodyFormat}; @@ -192,47 +191,66 @@ async fn jaeger_traces_search( num_traces: search_params.limit.unwrap_or(DEFAULT_NUMBER_OF_TRACES), }; let find_traces_request = FindTracesRequest { query: Some(query) }; - - let mut span_stream = jaeger_service + let spans_chunk_stream = jaeger_service .find_traces(with_tonic(find_traces_request)) .await - .unwrap() + .map_err(|error| { + info!(error = ?error, "failed to fetch traces"); + JaegerError { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: "failed to fetch traces".to_string(), + } + })? .into_inner(); - let SpansResponseChunk { spans } = span_stream.next().await.unwrap().unwrap(); + let jaeger_spans = collect_and_build_jaeger_spans(spans_chunk_stream).await?; + let jaeger_traces: Vec = build_jaeger_traces(jaeger_spans)?; Ok(JaegerResponseBody { - data: create_jaeger_trace(spans), + data: jaeger_traces, }) } +async fn collect_and_build_jaeger_spans( + mut spans_chunk_stream: ReceiverStream>, +) -> anyhow::Result> { + let mut all_spans = Vec::::new(); + while let Some(Ok(SpansResponseChunk { spans })) = spans_chunk_stream.next().await { + let jaeger_spans: Vec = + spans.into_iter().map(JaegerSpan::try_from).try_collect()?; + all_spans.extend(jaeger_spans); + } + Ok(all_spans) +} + async fn jaeger_get_trace_by_id( - trace_id_json: String, + trace_id_string: String, jaeger_service: JaegerService, ) -> Result>, JaegerError> { - let trace_id = hex_string_to_bytes(trace_id_json.as_str()); + let trace_id = hex::decode(trace_id_string).map_err(|error| { + info!(error = ?error, "failed to decode trace id"); + JaegerError { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: "failed to decode trace id".to_string(), + } + })?; let get_trace_request = GetTraceRequest { trace_id }; - let mut span_stream = jaeger_service + let spans_chunk_stream = jaeger_service .get_trace(with_tonic(get_trace_request)) .await - .unwrap() + .map_err(|error| { + info!(error = ?error, "failed to fetch trace"); + JaegerError { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: "failed to fetch trace".to_string(), + } + })? .into_inner(); - let SpansResponseChunk { spans } = span_stream.next().await.unwrap().unwrap(); + let jaeger_spans = collect_and_build_jaeger_spans(spans_chunk_stream).await?; + let jaeger_traces: Vec = build_jaeger_traces(jaeger_spans)?; Ok(JaegerResponseBody { - data: create_jaeger_trace(spans), + data: jaeger_traces, }) } -fn create_jaeger_trace(spans: Vec) -> Vec { - let result: Vec = spans - .iter() - .map(JaegerSpan::find_better_name_for_pb_convert) - .group_by(|span| span.trace_id.clone()) - .into_iter() - .map(|(span_id, group)| JaegerTrace::new(span_id, group.collect_vec())) - .collect_vec(); - info!("traces {:?}", result); - result -} - fn make_jaeger_api_response( jaeger_result: Result, format: BodyFormat, diff --git a/quickwit/quickwit-serve/src/jaeger_api/util.rs b/quickwit/quickwit-serve/src/jaeger_api/util.rs index a3bda9f1f60..ba20f4907d9 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/util.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/util.rs @@ -20,19 +20,8 @@ use go_parse_duration::parse_duration; use prost_types::{Duration as WellKnownDuration, Timestamp as WellKnownTimestamp}; -// TODO move to `TraceId` and simplify if possible -pub fn hex_string_to_bytes(hex_string: &str) -> Vec { - if hex_string.len() % 2 != 0 { - panic!("Hex string must have an even number of characters"); - } - (0..hex_string.len()) - .step_by(2) - .map(|i| u8::from_str_radix(&hex_string[i..i + 2], 16).expect("Failed to parse hex")) - .collect() -} - pub fn bytes_to_hex_string(bytes: &[u8]) -> String { - bytes.iter().map(|b| format!("{:02x}", b)).collect() + format!("{:0>16}", hex::encode(bytes)) } pub fn to_well_known_timestamp(timestamp_nanos: i64) -> WellKnownTimestamp { @@ -43,7 +32,10 @@ pub fn to_well_known_timestamp(timestamp_nanos: i64) -> WellKnownTimestamp { pub fn from_well_known_timestamp(timestamp_opt: &Option) -> i64 { match timestamp_opt { - Some(timestamp) => timestamp.seconds * 1_000_000 + i64::from(timestamp.nanos / 1000), + Some(timestamp) => timestamp + .seconds + .saturating_mul(1_000_000) + .saturating_add(i64::from(timestamp.nanos / 1000)), None => 0i64, } }