diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs index e630408fe7f..cb8e95483c4 100644 --- a/quickwit/quickwit-proto/src/lib.rs +++ b/quickwit/quickwit-proto/src/lib.rs @@ -232,6 +232,7 @@ impl ServiceError for quickwit_actors::AskError } impl search::SortOrder { + #[inline(always)] pub fn compare_opt(&self, this: &Option, other: &Option) -> Ordering { match (this, other) { (Some(this), Some(other)) => self.compare(this, other), diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 70a6e0f6e4e..360e08183c2 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -24,7 +24,8 @@ use itertools::Itertools; use quickwit_common::binary_heap::{SortKeyMapper, TopK}; use quickwit_doc_mapper::{DocMapper, WarmupInfo}; use quickwit_proto::search::{ - LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue, SplitSearchError, + LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortOrder, SortValue, + SplitSearchError, }; use serde::Deserialize; use tantivy::aggregation::agg_req::{get_fast_field_names, Aggregations}; @@ -295,7 +296,7 @@ pub struct QuickwitSegmentCollector { split_id: String, score_extractor: SortingFieldExtractorPair, // PartialHits in this heap don't contain a split_id yet. - top_k_hits: TopK, + top_k_hits: TopK, segment_ord: u32, timestamp_filter_opt: Option, aggregation: Option, @@ -307,13 +308,9 @@ impl QuickwitSegmentCollector { let (sort_value, sort_value2) = self.score_extractor.extract_typed_sort_value(doc_id, score); - let hit = PartialHit { + let hit = SegmentPartialHit { sort_value: sort_value.map(Into::into), sort_value2: sort_value2.map(Into::into), - // we actually know the split_id, but the hit is likely to be discarded, so clonning it - // would cause a probably useless allocation. TODO use an Arc instead? - split_id: String::new(), - segment_ord: self.segment_ord, doc_id, }; self.top_k_hits.add_entry(hit); @@ -328,6 +325,29 @@ impl QuickwitSegmentCollector { } } +#[derive(Copy, Clone, Debug)] +struct SegmentPartialHit { + sort_value: Option, + sort_value2: Option, + doc_id: DocId, +} + +impl SegmentPartialHit { + fn into_partial_hit(self, split_id: String, segment_ord: SegmentOrdinal) -> PartialHit { + PartialHit { + sort_value: self.sort_value.map(|sort_value| SortByValue { + sort_value: Some(sort_value), + }), + sort_value2: self.sort_value2.map(|sort_value| SortByValue { + sort_value: Some(sort_value), + }), + doc_id: self.doc_id, + split_id, + segment_ord, + } + } +} + impl SegmentCollector for QuickwitSegmentCollector { type Fruit = tantivy::Result; @@ -352,14 +372,12 @@ impl SegmentCollector for QuickwitSegmentCollector { } fn harvest(self) -> Self::Fruit { - let split_id = self.split_id; let partial_hits: Vec = self .top_k_hits .finalize() .into_iter() - .map(|mut hit| { - hit.split_id = split_id.clone(); - hit + .map(|segment_partial_hit: SegmentPartialHit| { + segment_partial_hit.into_partial_hit(self.split_id.clone(), self.segment_ord) }) .collect(); @@ -744,11 +762,50 @@ pub(crate) fn make_merge_collector( }) } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +struct SegmentPartialHitSortingKey { + sort_value: Option, + sort_value2: Option, + doc_id: DocId, + // TODO This should not be there. + sort_order: SortOrder, + // TODO This should not be there. + sort_order2: SortOrder, +} + +impl Ord for SegmentPartialHitSortingKey { + fn cmp(&self, other: &SegmentPartialHitSortingKey) -> Ordering { + debug_assert_eq!( + self.sort_order, other.sort_order, + "comparing two PartialHitSortingKey of different ordering" + ); + debug_assert_eq!( + self.sort_order2, other.sort_order2, + "comparing two PartialHitSortingKey of different ordering" + ); + let order = self + .sort_order + .compare_opt(&self.sort_value, &other.sort_value); + let order2 = self + .sort_order2 + .compare_opt(&self.sort_value2, &other.sort_value2); + let order_addr = self.sort_order.compare(&self.doc_id, &other.doc_id); + order.then(order2).then(order_addr) + } +} + +impl PartialOrd for SegmentPartialHitSortingKey { + fn partial_cmp(&self, other: &SegmentPartialHitSortingKey) -> Option { + Some(self.cmp(other)) + } +} + #[derive(Clone, Debug, PartialEq, Eq)] struct PartialHitSortingKey { sort_value: Option, sort_value2: Option, address: GlobalDocAddress, + // TODO remove this sort_order: SortOrder, sort_order2: SortOrder, } @@ -803,6 +860,19 @@ impl SortKeyMapper for HitSortingMapper { } } +impl SortKeyMapper for HitSortingMapper { + type Key = SegmentPartialHitSortingKey; + fn get_sort_key(&self, partial_hit: &SegmentPartialHit) -> SegmentPartialHitSortingKey { + SegmentPartialHitSortingKey { + sort_value: partial_hit.sort_value, + sort_value2: partial_hit.sort_value2, + doc_id: partial_hit.doc_id, + sort_order: self.order1, + sort_order2: self.order2, + } + } +} + /// Incrementally merge segment results. #[derive(Clone)] pub(crate) struct IncrementalCollector {