From 7e61087707d69b8f00280d6a187a1af4a35fea15 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 26 Oct 2023 15:13:14 +0900 Subject: [PATCH] Avoiding base64 translation when serializing Traces with postcard (#4029) This PR also adds unit tests/proptest checking the serializing of Vec with postcard. --- quickwit/quickwit-jaeger/Cargo.toml | 2 +- .../src/otlp/trace_id.rs | 54 ++++++++------ quickwit/quickwit-search/src/collector.rs | 19 ++--- .../src/find_trace_ids_collector.rs | 71 +++++++++++++++---- 4 files changed, 98 insertions(+), 48 deletions(-) diff --git a/quickwit/quickwit-jaeger/Cargo.toml b/quickwit/quickwit-jaeger/Cargo.toml index 66f587fca93..8b6c54bbf09 100644 --- a/quickwit/quickwit-jaeger/Cargo.toml +++ b/quickwit/quickwit-jaeger/Cargo.toml @@ -44,7 +44,7 @@ quickwit-indexing = { workspace = true, features = ["testsuite"] } quickwit-ingest = { workspace = true } quickwit-metastore = { workspace = true, features = ["testsuite"] } quickwit-search = { workspace = true, features = ["testsuite"] } -quickwit-storage = { workspace = true } +quickwit-storage = { workspace = true, features = ["testsuite"] } [features] testsuite = [] diff --git a/quickwit/quickwit-opentelemetry/src/otlp/trace_id.rs b/quickwit/quickwit-opentelemetry/src/otlp/trace_id.rs index 781e717859e..6e30b64b295 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/trace_id.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/trace_id.rs @@ -33,8 +33,8 @@ impl TraceId { Self(bytes) } - pub fn as_bytes(&self) -> &[u8] { - &self.0 + pub fn into_bytes(self) -> [u8; 16] { + self.0 } pub fn to_vec(&self) -> Vec { @@ -48,34 +48,42 @@ impl TraceId { impl Serialize for TraceId { fn serialize(&self, serializer: S) -> Result { - let b64trace_id = BASE64_STANDARD.encode(self.0); - serializer.serialize_str(&b64trace_id) + if serializer.is_human_readable() { + let b64trace_id = BASE64_STANDARD.encode(self.0); + serializer.serialize_str(&b64trace_id) + } else { + self.0.serialize(serializer) + } } } impl<'de> Deserialize<'de> for TraceId { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de> { - let b64trace_id = String::deserialize(deserializer)?; - - if b64trace_id.len() != TraceId::BASE64_LENGTH { - let message = format!( - "base64 trace ID must be {} bytes long, got {}", - TraceId::BASE64_LENGTH, - b64trace_id.len() - ); - return Err(de::Error::custom(message)); + if deserializer.is_human_readable() { + let b64trace_id = String::deserialize(deserializer)?; + if b64trace_id.len() != TraceId::BASE64_LENGTH { + let message = format!( + "base64 trace ID must be {} bytes long, got {}", + TraceId::BASE64_LENGTH, + b64trace_id.len() + ); + return Err(de::Error::custom(message)); + } + let mut trace_id_bytes = [0u8; 16]; + BASE64_STANDARD + // Using the unchecked version here because otherwise the engine gets the wrong size + // estimate and fails. + .decode_slice_unchecked(b64trace_id.as_bytes(), &mut trace_id_bytes) + .map_err(|error| { + let message = format!("failed to decode base64 trace ID: {:?}", error); + de::Error::custom(message) + })?; + Ok(TraceId(trace_id_bytes)) + } else { + let trace_id_bytes: [u8; 16] = <[u8; 16]>::deserialize(deserializer)?; + Ok(TraceId(trace_id_bytes)) } - let mut trace_id = [0u8; 16]; - BASE64_STANDARD - // Using the unchecked version here because otherwise the engine gets the wrong size - // estimate and fails. - .decode_slice_unchecked(b64trace_id.as_bytes(), &mut trace_id) - .map_err(|error| { - let message = format!("failed to decode base64 trace ID: {:?}", error); - de::Error::custom(message) - })?; - Ok(TraceId(trace_id)) } } diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index eb314867704..d91fbf1b8e0 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -37,7 +37,7 @@ use tantivy::fastfield::Column; use tantivy::{DocId, Score, SegmentOrdinal, SegmentReader, TantivyError}; use crate::filters::{create_timestamp_filter_builder, TimestampFilter, TimestampFilterBuilder}; -use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector}; +use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector, Span}; use crate::GlobalDocAddress; #[derive(Clone, Debug)] @@ -414,7 +414,7 @@ impl SegmentCollector for QuickwitSegmentCollector { let intermediate_aggregation_result = match self.aggregation { Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => { - let fruit = collector.harvest(); + let fruit: Vec = collector.harvest(); let serialized = postcard::to_allocvec(&fruit).expect("Collector fruit should be serializable."); Some(serialized) @@ -627,7 +627,7 @@ fn merge_intermediate_aggregation_result<'a>( postcard::from_bytes(intermediate_aggregation_result).map_err(map_error) }) .collect::>()?; - let merged_fruit = collector.merge_fruits(fruits)?; + let merged_fruit: Vec = collector.merge_fruits(fruits)?; let serialized = postcard::to_allocvec(&merged_fruit).map_err(map_error)?; Some(serialized) } @@ -670,12 +670,13 @@ fn merge_leaf_responses( return Ok(leaf_responses.pop().unwrap()); } - let merged_intermediate_aggregation_result = merge_intermediate_aggregation_result( - aggregations_opt, - leaf_responses - .iter() - .filter_map(|leaf_response| leaf_response.intermediate_aggregation_result.as_deref()), - )?; + let merged_intermediate_aggregation_result: Option> = + merge_intermediate_aggregation_result( + aggregations_opt, + leaf_responses.iter().filter_map(|leaf_response| { + leaf_response.intermediate_aggregation_result.as_deref() + }), + )?; let num_attempted_splits = leaf_responses .iter() .map(|leaf_response| leaf_response.num_attempted_splits) diff --git a/quickwit/quickwit-search/src/find_trace_ids_collector.rs b/quickwit/quickwit-search/src/find_trace_ids_collector.rs index d88da060664..f6736edcb0d 100644 --- a/quickwit/quickwit-search/src/find_trace_ids_collector.rs +++ b/quickwit/quickwit-search/src/find_trace_ids_collector.rs @@ -19,7 +19,6 @@ use std::cmp::{Ord, Ordering}; use std::collections::HashSet; -use std::hash::{Hash, Hasher}; use fnv::{FnvHashMap, FnvHashSet}; use itertools::Itertools; @@ -48,12 +47,6 @@ impl Span { } } -impl Hash for Span { - fn hash(&self, state: &mut H) { - self.trace_id.hash(state); - } -} - impl Ord for Span { fn cmp(&self, other: &Self) -> Ordering { self.span_timestamp @@ -185,23 +178,24 @@ impl Collector for FindTraceIdsCollector { } fn merge_segment_fruits(mut segment_fruits: Vec>, num_traces: usize) -> Vec { + // Spans are ordered in reverse order of their timestamp. for segment_fruit in &mut segment_fruits { segment_fruit.sort_unstable() } - let mut trace_ids = Vec::with_capacity(num_traces); - let mut seen_trace_ids = FnvHashSet::default(); + let mut spans: Vec = Vec::with_capacity(num_traces); + let mut seen_trace_ids: FnvHashSet = FnvHashSet::default(); - for trace_id in segment_fruits.into_iter().kmerge() { - if !seen_trace_ids.contains(&trace_id.trace_id) { - seen_trace_ids.insert(trace_id.trace_id); - trace_ids.push(trace_id); + for span in segment_fruits.into_iter().kmerge() { + if !seen_trace_ids.contains(&span.trace_id) { + seen_trace_ids.insert(span.trace_id); + spans.push(span); - if trace_ids.len() == num_traces { + if spans.len() == num_traces { break; } } } - trace_ids + spans } pub struct FindTraceIdsSegmentCollector { @@ -258,6 +252,7 @@ struct SelectTraceIds { select_workbench: Vec, running_term_ord: Option, running_span_timestamp: DateTime, + // This is the lowest timestamp required to enter our top K. span_timestamp_sentinel: DateTime, } @@ -367,6 +362,7 @@ mod serde_datetime { #[cfg(test)] mod tests { use tantivy::time::OffsetDateTime; + use tantivy::DateTime; use super::*; use crate::collector::QuickwitAggregations; @@ -569,4 +565,49 @@ mod tests { ); } } + + use proptest::prelude::*; + + fn span_strategy() -> impl Strategy { + let trace_id_strat = proptest::array::uniform16(any::()); + let span_timestamp_strat = any::(); + (trace_id_strat, span_timestamp_strat).prop_map(|(trace_id, span_timestamp)| { + Span::new( + TraceId::new(trace_id), + tantivy::DateTime::from_timestamp_nanos(span_timestamp), + ) + }) + } + + fn test_postcard_aux Deserialize<'a> + Eq>(item: &I) { + let payload = postcard::to_allocvec(item).unwrap(); + let deserialized_item: I = postcard::from_bytes(&payload).unwrap(); + assert_eq!(item, &deserialized_item); + } + + #[test] + fn test_proptest_spans_postcard_empty_vec() { + test_postcard_aux(&Vec::::new()); + } + + #[test] + fn test_proptest_spans_postcard_extreme_values() { + test_postcard_aux(&vec![Span { + trace_id: TraceId::new([255u8; 16]), + span_timestamp: tantivy::DateTime::from_timestamp_nanos(i64::MIN), + }]); + } + + proptest::proptest! { + + #[test] + fn test_proptest_spans_postcard_serdeser(span in span_strategy()) { + test_postcard_aux(&span); + } + + #[test] + fn test_proptest_spans_vec_postcard_serdeser(spans in proptest::collection::vec(span_strategy(), 0..100)) { + test_postcard_aux(&spans); + } + } }