Skip to content

Commit

Permalink
Avoiding base64 translation when serializing Traces with postcard (#4029
Browse files Browse the repository at this point in the history
)

This PR also adds unit tests/proptest checking the serializing of
Vec<Span> with postcard.
  • Loading branch information
fulmicoton authored Oct 26, 2023
1 parent 63a2dca commit 7e61087
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 48 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ quickwit-indexing = { workspace = true, features = ["testsuite"] }
quickwit-ingest = { workspace = true }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-search = { workspace = true, features = ["testsuite"] }
quickwit-storage = { workspace = true }
quickwit-storage = { workspace = true, features = ["testsuite"] }

[features]
testsuite = []
54 changes: 31 additions & 23 deletions quickwit/quickwit-opentelemetry/src/otlp/trace_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ impl TraceId {
Self(bytes)
}

pub fn as_bytes(&self) -> &[u8] {
&self.0
pub fn into_bytes(self) -> [u8; 16] {
self.0
}

pub fn to_vec(&self) -> Vec<u8> {
Expand All @@ -48,34 +48,42 @@ impl TraceId {

impl Serialize for TraceId {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let b64trace_id = BASE64_STANDARD.encode(self.0);
serializer.serialize_str(&b64trace_id)
if serializer.is_human_readable() {
let b64trace_id = BASE64_STANDARD.encode(self.0);
serializer.serialize_str(&b64trace_id)
} else {
self.0.serialize(serializer)
}
}
}

impl<'de> Deserialize<'de> for TraceId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de> {
let b64trace_id = String::deserialize(deserializer)?;

if b64trace_id.len() != TraceId::BASE64_LENGTH {
let message = format!(
"base64 trace ID must be {} bytes long, got {}",
TraceId::BASE64_LENGTH,
b64trace_id.len()
);
return Err(de::Error::custom(message));
if deserializer.is_human_readable() {
let b64trace_id = String::deserialize(deserializer)?;
if b64trace_id.len() != TraceId::BASE64_LENGTH {
let message = format!(
"base64 trace ID must be {} bytes long, got {}",
TraceId::BASE64_LENGTH,
b64trace_id.len()
);
return Err(de::Error::custom(message));
}
let mut trace_id_bytes = [0u8; 16];
BASE64_STANDARD
// Using the unchecked version here because otherwise the engine gets the wrong size
// estimate and fails.
.decode_slice_unchecked(b64trace_id.as_bytes(), &mut trace_id_bytes)
.map_err(|error| {
let message = format!("failed to decode base64 trace ID: {:?}", error);
de::Error::custom(message)
})?;
Ok(TraceId(trace_id_bytes))
} else {
let trace_id_bytes: [u8; 16] = <[u8; 16]>::deserialize(deserializer)?;
Ok(TraceId(trace_id_bytes))
}
let mut trace_id = [0u8; 16];
BASE64_STANDARD
// Using the unchecked version here because otherwise the engine gets the wrong size
// estimate and fails.
.decode_slice_unchecked(b64trace_id.as_bytes(), &mut trace_id)
.map_err(|error| {
let message = format!("failed to decode base64 trace ID: {:?}", error);
de::Error::custom(message)
})?;
Ok(TraceId(trace_id))
}
}

Expand Down
19 changes: 10 additions & 9 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tantivy::fastfield::Column;
use tantivy::{DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};

use crate::filters::{create_timestamp_filter_builder, TimestampFilter, TimestampFilterBuilder};
use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector};
use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector, Span};
use crate::GlobalDocAddress;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -414,7 +414,7 @@ impl SegmentCollector for QuickwitSegmentCollector {

let intermediate_aggregation_result = match self.aggregation {
Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => {
let fruit = collector.harvest();
let fruit: Vec<Span> = collector.harvest();
let serialized =
postcard::to_allocvec(&fruit).expect("Collector fruit should be serializable.");
Some(serialized)
Expand Down Expand Up @@ -627,7 +627,7 @@ fn merge_intermediate_aggregation_result<'a>(
postcard::from_bytes(intermediate_aggregation_result).map_err(map_error)
})
.collect::<Result<_, _>>()?;
let merged_fruit = collector.merge_fruits(fruits)?;
let merged_fruit: Vec<Span> = collector.merge_fruits(fruits)?;
let serialized = postcard::to_allocvec(&merged_fruit).map_err(map_error)?;
Some(serialized)
}
Expand Down Expand Up @@ -670,12 +670,13 @@ fn merge_leaf_responses(
return Ok(leaf_responses.pop().unwrap());
}

let merged_intermediate_aggregation_result = merge_intermediate_aggregation_result(
aggregations_opt,
leaf_responses
.iter()
.filter_map(|leaf_response| leaf_response.intermediate_aggregation_result.as_deref()),
)?;
let merged_intermediate_aggregation_result: Option<Vec<u8>> =
merge_intermediate_aggregation_result(
aggregations_opt,
leaf_responses.iter().filter_map(|leaf_response| {
leaf_response.intermediate_aggregation_result.as_deref()
}),
)?;
let num_attempted_splits = leaf_responses
.iter()
.map(|leaf_response| leaf_response.num_attempted_splits)
Expand Down
71 changes: 56 additions & 15 deletions quickwit/quickwit-search/src/find_trace_ids_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use std::cmp::{Ord, Ordering};
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
Expand Down Expand Up @@ -48,12 +47,6 @@ impl Span {
}
}

impl Hash for Span {
fn hash<H: Hasher>(&self, state: &mut H) {
self.trace_id.hash(state);
}
}

impl Ord for Span {
fn cmp(&self, other: &Self) -> Ordering {
self.span_timestamp
Expand Down Expand Up @@ -185,23 +178,24 @@ impl Collector for FindTraceIdsCollector {
}

fn merge_segment_fruits(mut segment_fruits: Vec<Vec<Span>>, num_traces: usize) -> Vec<Span> {
// Spans are ordered in reverse order of their timestamp.
for segment_fruit in &mut segment_fruits {
segment_fruit.sort_unstable()
}
let mut trace_ids = Vec::with_capacity(num_traces);
let mut seen_trace_ids = FnvHashSet::default();
let mut spans: Vec<Span> = Vec::with_capacity(num_traces);
let mut seen_trace_ids: FnvHashSet<TraceId> = FnvHashSet::default();

for trace_id in segment_fruits.into_iter().kmerge() {
if !seen_trace_ids.contains(&trace_id.trace_id) {
seen_trace_ids.insert(trace_id.trace_id);
trace_ids.push(trace_id);
for span in segment_fruits.into_iter().kmerge() {
if !seen_trace_ids.contains(&span.trace_id) {
seen_trace_ids.insert(span.trace_id);
spans.push(span);

if trace_ids.len() == num_traces {
if spans.len() == num_traces {
break;
}
}
}
trace_ids
spans
}

pub struct FindTraceIdsSegmentCollector {
Expand Down Expand Up @@ -258,6 +252,7 @@ struct SelectTraceIds {
select_workbench: Vec<TraceIdTermOrd>,
running_term_ord: Option<TermOrd>,
running_span_timestamp: DateTime,
// This is the lowest timestamp required to enter our top K.
span_timestamp_sentinel: DateTime,
}

Expand Down Expand Up @@ -367,6 +362,7 @@ mod serde_datetime {
#[cfg(test)]
mod tests {
use tantivy::time::OffsetDateTime;
use tantivy::DateTime;

use super::*;
use crate::collector::QuickwitAggregations;
Expand Down Expand Up @@ -569,4 +565,49 @@ mod tests {
);
}
}

use proptest::prelude::*;

fn span_strategy() -> impl Strategy<Value = Span> {
let trace_id_strat = proptest::array::uniform16(any::<u8>());
let span_timestamp_strat = any::<i64>();
(trace_id_strat, span_timestamp_strat).prop_map(|(trace_id, span_timestamp)| {
Span::new(
TraceId::new(trace_id),
tantivy::DateTime::from_timestamp_nanos(span_timestamp),
)
})
}

fn test_postcard_aux<I: Serialize + std::fmt::Debug + for<'a> Deserialize<'a> + Eq>(item: &I) {
let payload = postcard::to_allocvec(item).unwrap();
let deserialized_item: I = postcard::from_bytes(&payload).unwrap();
assert_eq!(item, &deserialized_item);
}

#[test]
fn test_proptest_spans_postcard_empty_vec() {
test_postcard_aux(&Vec::<Span>::new());
}

#[test]
fn test_proptest_spans_postcard_extreme_values() {
test_postcard_aux(&vec![Span {
trace_id: TraceId::new([255u8; 16]),
span_timestamp: tantivy::DateTime::from_timestamp_nanos(i64::MIN),
}]);
}

proptest::proptest! {

#[test]
fn test_proptest_spans_postcard_serdeser(span in span_strategy()) {
test_postcard_aux(&span);
}

#[test]
fn test_proptest_spans_vec_postcard_serdeser(spans in proptest::collection::vec(span_strategy(), 0..100)) {
test_postcard_aux(&spans);
}
}
}

0 comments on commit 7e61087

Please sign in to comment.