Skip to content

Commit

Permalink
Simple refactoring of the Quickwit collector. (#3973)
Browse files Browse the repository at this point in the history
This introduces a SegmentPartialHit to make the work of the collector
lighter.

I measured a 25% performance improvement on real-life data.

There is still a lot of headroom left over.
  • Loading branch information
fulmicoton authored Oct 18, 2023
1 parent acd59ac commit e467667
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 11 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ impl<E: fmt::Debug + ServiceError> ServiceError for quickwit_actors::AskError<E>
}

impl search::SortOrder {
#[inline(always)]
pub fn compare_opt<T: Ord>(&self, this: &Option<T>, other: &Option<T>) -> Ordering {
match (this, other) {
(Some(this), Some(other)) => self.compare(this, other),
Expand Down
92 changes: 81 additions & 11 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use itertools::Itertools;
use quickwit_common::binary_heap::{SortKeyMapper, TopK};
use quickwit_doc_mapper::{DocMapper, WarmupInfo};
use quickwit_proto::search::{
LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue, SplitSearchError,
LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortOrder, SortValue,
SplitSearchError,
};
use serde::Deserialize;
use tantivy::aggregation::agg_req::{get_fast_field_names, Aggregations};
Expand Down Expand Up @@ -295,7 +296,7 @@ pub struct QuickwitSegmentCollector {
split_id: String,
score_extractor: SortingFieldExtractorPair,
// PartialHits in this heap don't contain a split_id yet.
top_k_hits: TopK<PartialHit, PartialHitSortingKey, HitSortingMapper>,
top_k_hits: TopK<SegmentPartialHit, SegmentPartialHitSortingKey, HitSortingMapper>,
segment_ord: u32,
timestamp_filter_opt: Option<TimestampFilter>,
aggregation: Option<AggregationSegmentCollectors>,
Expand All @@ -307,13 +308,9 @@ impl QuickwitSegmentCollector {
let (sort_value, sort_value2) =
self.score_extractor.extract_typed_sort_value(doc_id, score);

let hit = PartialHit {
let hit = SegmentPartialHit {
sort_value: sort_value.map(Into::into),
sort_value2: sort_value2.map(Into::into),
// we actually know the split_id, but the hit is likely to be discarded, so cloning it
// would probably cause a useless allocation. TODO use an Arc<str> instead?
split_id: String::new(),
segment_ord: self.segment_ord,
doc_id,
};
self.top_k_hits.add_entry(hit);
Expand All @@ -328,6 +325,29 @@ impl QuickwitSegmentCollector {
}
}

/// Lightweight hit kept in the segment collector's top-K heap.
///
/// It only carries the two optional sort values and the `doc_id`. The `split_id` and the
/// segment ordinal are not stored here; they are supplied when the hit is converted with
/// `into_partial_hit`.
#[derive(Copy, Clone, Debug)]
struct SegmentPartialHit {
/// Primary sort value, if any.
sort_value: Option<SortValue>,
/// Secondary sort value, if any.
sort_value2: Option<SortValue>,
/// Id of the matching document (presumably local to the segment; confirm against the
/// collector).
doc_id: DocId,
}

impl SegmentPartialHit {
fn into_partial_hit(self, split_id: String, segment_ord: SegmentOrdinal) -> PartialHit {
PartialHit {
sort_value: self.sort_value.map(|sort_value| SortByValue {
sort_value: Some(sort_value),
}),
sort_value2: self.sort_value2.map(|sort_value| SortByValue {
sort_value: Some(sort_value),
}),
doc_id: self.doc_id,
split_id,
segment_ord,
}
}
}

impl SegmentCollector for QuickwitSegmentCollector {
type Fruit = tantivy::Result<LeafSearchResponse>;

Expand All @@ -352,14 +372,12 @@ impl SegmentCollector for QuickwitSegmentCollector {
}

fn harvest(self) -> Self::Fruit {
let split_id = self.split_id;
let partial_hits: Vec<PartialHit> = self
.top_k_hits
.finalize()
.into_iter()
.map(|mut hit| {
hit.split_id = split_id.clone();
hit
.map(|segment_partial_hit: SegmentPartialHit| {
segment_partial_hit.into_partial_hit(self.split_id.clone(), self.segment_ord)
})
.collect();

Expand Down Expand Up @@ -744,11 +762,50 @@ pub(crate) fn make_merge_collector(
})
}

/// Sort key built from a `SegmentPartialHit` and used to order hits in the segment
/// collector's top-K heap.
///
/// Besides the values being compared, every key also carries the two `SortOrder`s that its
/// `Ord` implementation applies.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct SegmentPartialHitSortingKey {
/// Primary sort value, if any.
sort_value: Option<SortValue>,
/// Secondary sort value, if any.
sort_value2: Option<SortValue>,
/// Document id, used as the final tie-breaker by the `Ord` implementation.
doc_id: DocId,
// TODO This should not be there: it is copied from the mapper into every key, and keys
// being compared are expected to share the same value.
sort_order: SortOrder,
// TODO This should not be there (same reason as `sort_order`).
sort_order2: SortOrder,
}

impl Ord for SegmentPartialHitSortingKey {
    /// Total order over segment-local sort keys.
    ///
    /// Keys are compared by primary sort value (using `sort_order`), then by secondary sort
    /// value (using `sort_order2`), and finally by `doc_id` as a tie-breaker. The `doc_id`
    /// tie-breaker is compared using the primary `sort_order`.
    ///
    /// Both keys are expected to carry the same sort orders; this is only checked in debug
    /// builds.
    fn cmp(&self, other: &SegmentPartialHitSortingKey) -> Ordering {
        debug_assert_eq!(
            self.sort_order, other.sort_order,
            "comparing two SegmentPartialHitSortingKey of different primary ordering"
        );
        debug_assert_eq!(
            self.sort_order2, other.sort_order2,
            "comparing two SegmentPartialHitSortingKey of different secondary ordering"
        );
        // `then_with` keeps the later comparisons lazy: they only run when every earlier
        // criterion reported `Ordering::Equal`.
        self.sort_order
            .compare_opt(&self.sort_value, &other.sort_value)
            .then_with(|| {
                self.sort_order2
                    .compare_opt(&self.sort_value2, &other.sort_value2)
            })
            .then_with(|| self.sort_order.compare(&self.doc_id, &other.doc_id))
    }
}

impl PartialOrd for SegmentPartialHitSortingKey {
fn partial_cmp(&self, other: &SegmentPartialHitSortingKey) -> Option<Ordering> {
Some(self.cmp(other))
}
}

/// Sort key associated with a full `PartialHit`.
///
/// Like the segment-local key, it stores the two sort values together with the sort orders
/// to apply to them.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PartialHitSortingKey {
/// Primary sort value, if any.
sort_value: Option<SortValue>,
/// Secondary sort value, if any.
sort_value2: Option<SortValue>,
/// Address of the hit (presumably split, segment and doc id; confirm against the
/// definition of `GlobalDocAddress`).
address: GlobalDocAddress,
// TODO remove this
sort_order: SortOrder,
sort_order2: SortOrder,
}
Expand Down Expand Up @@ -803,6 +860,19 @@ impl SortKeyMapper<PartialHit> for HitSortingMapper {
}
}

impl SortKeyMapper<SegmentPartialHit> for HitSortingMapper {
type Key = SegmentPartialHitSortingKey;
fn get_sort_key(&self, partial_hit: &SegmentPartialHit) -> SegmentPartialHitSortingKey {
SegmentPartialHitSortingKey {
sort_value: partial_hit.sort_value,
sort_value2: partial_hit.sort_value2,
doc_id: partial_hit.doc_id,
sort_order: self.order1,
sort_order2: self.order2,
}
}
}

/// Incrementally merge segment results.
#[derive(Clone)]
pub(crate) struct IncrementalCollector {
Expand Down

0 comments on commit e467667

Please sign in to comment.