From 52c50be13e26cd2be2953d785f2ba0e6109ba328 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Sun, 17 Mar 2024 10:03:14 +0800 Subject: [PATCH 1/4] collect_block in QuickwitCollector collect_block + using `first_vals` to batch fetch sort values --- quickwit/quickwit-search/src/collector.rs | 234 +++++++++++++++++++++- quickwit/rest-api-tests/run_tests.py | 2 +- 2 files changed, 227 insertions(+), 9 deletions(-) diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index f8b7242110c..ca9c767a34e 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -150,6 +150,20 @@ enum SortingFieldExtractorComponent { } impl SortingFieldExtractorComponent { + fn is_fast_field(&self) -> bool { + matches!(self, SortingFieldExtractorComponent::FastField { .. }) + } + /// Loads the fast field values for the given doc_ids in its u64 representation. The returned + /// u64 representation maintains the ordering of the original value. + #[inline] + fn extract_typed_sort_values_block(&self, doc_ids: &[DocId], values: &mut [Option]) { + // In the collect block case we don't have scores to extract + if let SortingFieldExtractorComponent::FastField { sort_column, .. } = self { + let values = &mut values[..doc_ids.len()]; + sort_column.first_vals(doc_ids, values); + } + } + /// Returns the sort value for the given element in its u64 representation. The returned u64 /// representation maintains the ordering of the original value. /// @@ -369,6 +383,23 @@ pub(crate) struct SortingFieldExtractorPair { } impl SortingFieldExtractorPair { + /// Returns the list of sort values for the given element + /// + /// See also [`SortingFieldExtractorComponent::extract_typed_sort_values_block`] for more + /// information. + #[inline] + fn extract_typed_sort_values( + &self, + doc_ids: &[DocId], + values1: &mut [Option], + values2: &mut [Option], + ) { + self.first + .extract_typed_sort_values_block(doc_ids, &mut values1[..doc_ids.len()]); + if let Some(second) = self.second.as_ref() { + second.extract_typed_sort_values_block(doc_ids, &mut values2[..doc_ids.len()]); + } + } /// Returns the list of sort values for the given element /// /// See also [`SortingFieldExtractorComponent::extract_typed_sort_value_opt`] for more @@ -480,6 +511,10 @@ pub struct QuickwitSegmentCollector { search_after: Option, // Precomputed order for search_after for split_id and segment_ord precomp_search_after_order: Ordering, + // Caches for block fetching + filtered_docs: Box<[DocId; 64]>, + sort_values1: Box<[Option; 64]>, + sort_values2: Box<[Option; 64]>, } /// Search After, but the sort values are converted to the u64 fast field representation. @@ -543,15 +578,107 @@ impl SearchAfterSegment { } impl QuickwitSegmentCollector { - #[inline] - fn collect_top_k(&mut self, doc_id: DocId, score: Score) { - let (sort_value, sort_value2): (Option, Option) = - self.score_extractor.extract_typed_sort_value(doc_id, score); + fn collect_top_k_block(&mut self, num_docs: usize, docs: &[DocId]) { + let doc_ids = get_final_docs( + &self.timestamp_filter_opt, + num_docs, + docs, + &self.filtered_docs, + ); + self.score_extractor.extract_typed_sort_values( + doc_ids, + &mut self.sort_values1[..], + &mut self.sort_values2[..], + ); + if let Some(_search_after) = &self.search_after { + // Search after not optimized for block collection yet + for ((doc_id, sort_value), sort_value2) in doc_ids + .iter() + .cloned() + .zip(self.sort_values1.iter().cloned()) + .zip(self.sort_values2.iter().cloned()) + { + Self::collect_top_k_vals( + doc_id, + sort_value, + sort_value2, + &self.search_after, + self.precomp_search_after_order, + &mut self.top_k_hits, + ); + } + } else { + // Probaly would make sense to check the fence against e.g. sort_values1 earlier, + // before creating the SegmentPartialHit. + // + // Below are different versions to avoid iterating the caches if they are unused. + // + // No sort values loaded. Sort only by doc_id. + if !self.score_extractor.first.is_fast_field() { + for doc_id in doc_ids.iter().cloned() { + let hit = SegmentPartialHit { + sort_value: None, + sort_value2: None, + doc_id, + }; + self.top_k_hits.add_entry(hit); + } + return; + } + let has_no_second_sort = !self + .score_extractor + .second + .as_ref() + .map(|extr| extr.is_fast_field()) + .unwrap_or(false); + // No second sort values => We can skip iterating the second sort values cache. + if has_no_second_sort { + for (doc_id, sort_value) in doc_ids + .iter() + .cloned() + .zip(self.sort_values1.iter().cloned()) + { + let hit = SegmentPartialHit { + sort_value, + sort_value2: None, + doc_id, + }; + self.top_k_hits.add_entry(hit); + } + return; + } - if let Some(search_after) = &self.search_after { + for ((doc_id, sort_value), sort_value2) in doc_ids + .iter() + .cloned() + .zip(self.sort_values1.iter().cloned()) + .zip(self.sort_values2.iter().cloned()) + { + let hit = SegmentPartialHit { + sort_value, + sort_value2, + doc_id, + }; + self.top_k_hits.add_entry(hit); + } + } + } + #[inline] + /// Generic top k collection, that includes search_after handling + /// + /// Outside of the collector to circumvent lifetime issues. + fn collect_top_k_vals( + doc_id: DocId, + sort_value: Option, + sort_value2: Option, + search_after: &Option, + precomp_search_after_order: Ordering, + top_k_hits: &mut TopK, + ) { + if let Some(search_after) = &search_after { let search_after_value1 = search_after.sort_value; let search_after_value2 = search_after.sort_value2; - let orders = &self.top_k_hits.sort_key_mapper; + let orders = &top_k_hits.sort_key_mapper; let mut cmp_result = orders .order1 .compare_opt(&sort_value, &search_after_value1) @@ -565,7 +692,7 @@ impl QuickwitSegmentCollector { // default let order = orders.order1; cmp_result = cmp_result - .then(self.precomp_search_after_order) + .then(precomp_search_after_order) // We compare doc_id only if sort_value1, sort_value2, split_id and segment_ord // are equal. .then_with(|| order.compare(&doc_id, &search_after.doc_id)) @@ -581,7 +708,21 @@ impl QuickwitSegmentCollector { sort_value2, doc_id, }; - self.top_k_hits.add_entry(hit); + top_k_hits.add_entry(hit); + } + + #[inline] + fn collect_top_k(&mut self, doc_id: DocId, score: Score) { + let (sort_value, sort_value2): (Option, Option) = + self.score_extractor.extract_typed_sort_value(doc_id, score); + Self::collect_top_k_vals( + doc_id, + sort_value, + sort_value2, + &self.search_after, + self.precomp_search_after_order, + &mut self.top_k_hits, + ); } #[inline] @@ -593,6 +734,24 @@ impl QuickwitSegmentCollector { } } +/// This is to circumvent lifetime issues and is pretty terrible. +/// +/// We either use `filtered_docs` buffer (computed in `compute_filtered_block`) or return the passed +/// docs. +#[inline] +fn get_final_docs<'a>( + timestamp_filter_opt: &Option, + num_docs: usize, + docs: &'a [DocId], + filtered_docs: &'a [DocId; BUFFER_LEN], +) -> &'a [DocId] { + if timestamp_filter_opt.is_some() { + &filtered_docs[..num_docs] + } else { + &docs[..num_docs] + } +} + #[derive(Copy, Clone, Debug)] struct SegmentPartialHit { /// Normalized to u64, the typed value can be reconstructed with @@ -635,9 +794,65 @@ impl SegmentPartialHit { } } +const BUFFER_LEN: usize = tantivy::COLLECT_BLOCK_BUFFER_LEN; +/// Store the filtered docs in `filtered_docs_buffer` if `timestamp_filter_opt` is present. +/// +/// Returns the number of docs. +/// +/// Ideally we would return just final docs slice, but we can't do that because of the borrow +/// checker. +fn compute_filtered_block<'a>( + timestamp_filter_opt: &Option, + docs: &'a [DocId], + filtered_docs_buffer: &'a mut [DocId; BUFFER_LEN], +) -> usize { + let Some(timestamp_filter) = ×tamp_filter_opt else { + return docs.len(); + }; + let mut len = 0; + for &doc in docs { + filtered_docs_buffer[len] = doc; + len += if timestamp_filter.is_within_range(doc) { + 1 + } else { + 0 + }; + } + len +} + impl SegmentCollector for QuickwitSegmentCollector { type Fruit = tantivy::Result; + #[inline] + fn collect_block(&mut self, docs: &[DocId]) { + let num_docs = + compute_filtered_block(&self.timestamp_filter_opt, docs, &mut self.filtered_docs); + + // Update results + self.num_hits += num_docs as u64; + + if self.top_k_hits.max_len() != 0 { + self.collect_top_k_block(num_docs, docs); + } + + let docs = get_final_docs( + &self.timestamp_filter_opt, + num_docs, + docs, + &self.filtered_docs, + ); + match self.aggregation.as_mut() { + Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => { + collector.collect_block(docs) + } + Some(AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector)) => { + collector.collect_block(docs) + } + None => (), + } + } + #[inline] fn collect(&mut self, doc_id: DocId, score: Score) { if !self.accept_document(doc_id) { @@ -907,6 +1122,9 @@ impl Collector for QuickwitCollector { aggregation, search_after, precomp_search_after_order, + filtered_docs: Box::new([0; 64]), + sort_values1: Box::new([None; 64]), + sort_values2: Box::new([None; 64]), }) } diff --git a/quickwit/rest-api-tests/run_tests.py b/quickwit/rest-api-tests/run_tests.py index 70d0f314e12..5a8b7cbb95f 100755 --- a/quickwit/rest-api-tests/run_tests.py +++ b/quickwit/rest-api-tests/run_tests.py @@ -147,7 +147,7 @@ def check_result_list(result, expected, context_path=""): display_filtered_result = filtered_result[:5] + ['...'] if len(filtered_result) > 5 else filtered_result else: display_filtered_result = filtered_result - raise Exception("Wrong length at context %s. Expected: %s Received: %s,\n Expected \n%s \n Received \n%s" % (context_path, len(expected), len(result), display_filtered_result, expected)) + raise Exception("Wrong length at context %s. Expected: %s Received: %s,\n Expected \n%s \n Received \n%s" % (context_path, len(expected), len(result), expected, display_filtered_result)) raise Exception("Wrong length at context %s. Expected: %s Received: %s" % (context_path, len(expected), len(result))) for (i, (left, right)) in enumerate(zip(result, expected)): check_result(left, right, context_path + "[%s]" % i) From 4991cd346ee7f2eb852d7938659b7cc5e6249c82 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 18 Mar 2024 11:20:44 +0900 Subject: [PATCH 2/4] Isolating the 3 components of the collector. --- quickwit/quickwit-search/src/collector.rs | 167 +++++++++++----------- 1 file changed, 81 insertions(+), 86 deletions(-) diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index ca9c767a34e..898922c41a3 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -500,19 +500,34 @@ enum AggregationSegmentCollectors { /// Quickwit collector working at the scale of the segment. pub struct QuickwitSegmentCollector { + timestamp_filter_opt: Option, + segment_top_k_collector: Option, + // Caches for block fetching + filtered_docs: Box<[DocId; 64]>, + aggregation: Option, num_hits: u64, +} + +impl QuickwitSegmentCollector { + #[inline] + fn accept_document(&self, doc_id: DocId) -> bool { + if let Some(ref timestamp_filter) = self.timestamp_filter_opt { + return timestamp_filter.is_within_range(doc_id); + } + true + } +} + +/// Quickwit collector working at the scale of the segment. +struct QuickwitSegmentTopKCollector { split_id: String, score_extractor: SortingFieldExtractorPair, // PartialHits in this heap don't contain a split_id yet. top_k_hits: TopK, segment_ord: u32, - timestamp_filter_opt: Option, - aggregation: Option, search_after: Option, // Precomputed order for search_after for split_id and segment_ord precomp_search_after_order: Ordering, - // Caches for block fetching - filtered_docs: Box<[DocId; 64]>, sort_values1: Box<[Option; 64]>, sort_values2: Box<[Option; 64]>, } @@ -577,22 +592,16 @@ impl SearchAfterSegment { } } -impl QuickwitSegmentCollector { - fn collect_top_k_block(&mut self, num_docs: usize, docs: &[DocId]) { - let doc_ids = get_final_docs( - &self.timestamp_filter_opt, - num_docs, - docs, - &self.filtered_docs, - ); +impl QuickwitSegmentTopKCollector { + fn collect_top_k_block(&mut self, docs: &[DocId]) { self.score_extractor.extract_typed_sort_values( - doc_ids, + docs, &mut self.sort_values1[..], &mut self.sort_values2[..], ); if let Some(_search_after) = &self.search_after { // Search after not optimized for block collection yet - for ((doc_id, sort_value), sort_value2) in doc_ids + for ((doc_id, sort_value), sort_value2) in docs .iter() .cloned() .zip(self.sort_values1.iter().cloned()) @@ -615,7 +624,7 @@ impl QuickwitSegmentCollector { // // No sort values loaded. Sort only by doc_id. if !self.score_extractor.first.is_fast_field() { - for doc_id in doc_ids.iter().cloned() { + for doc_id in docs.iter().cloned() { let hit = SegmentPartialHit { sort_value: None, sort_value2: None, @@ -633,10 +642,8 @@ impl QuickwitSegmentCollector { .unwrap_or(false); // No second sort values => We can skip iterating the second sort values cache. if has_no_second_sort { - for (doc_id, sort_value) in doc_ids - .iter() - .cloned() - .zip(self.sort_values1.iter().cloned()) + for (doc_id, sort_value) in + docs.iter().cloned().zip(self.sort_values1.iter().cloned()) { let hit = SegmentPartialHit { sort_value, @@ -648,7 +655,7 @@ impl QuickwitSegmentCollector { return; } - for ((doc_id, sort_value), sort_value2) in doc_ids + for ((doc_id, sort_value), sort_value2) in docs .iter() .cloned() .zip(self.sort_values1.iter().cloned()) @@ -724,32 +731,6 @@ impl QuickwitSegmentCollector { &mut self.top_k_hits, ); } - - #[inline] - fn accept_document(&self, doc_id: DocId) -> bool { - if let Some(ref timestamp_filter) = self.timestamp_filter_opt { - return timestamp_filter.is_within_range(doc_id); - } - true - } -} - -/// This is to circumvent lifetime issues and is pretty terrible. -/// -/// We either use `filtered_docs` buffer (computed in `compute_filtered_block`) or return the passed -/// docs. -#[inline] -fn get_final_docs<'a>( - timestamp_filter_opt: &Option, - num_docs: usize, - docs: &'a [DocId], - filtered_docs: &'a [DocId; BUFFER_LEN], -) -> &'a [DocId] { - if timestamp_filter_opt.is_some() { - &filtered_docs[..num_docs] - } else { - &docs[..num_docs] - } } #[derive(Copy, Clone, Debug)] @@ -805,9 +786,9 @@ fn compute_filtered_block<'a>( timestamp_filter_opt: &Option, docs: &'a [DocId], filtered_docs_buffer: &'a mut [DocId; BUFFER_LEN], -) -> usize { +) -> &'a [DocId] { let Some(timestamp_filter) = ×tamp_filter_opt else { - return docs.len(); + return docs; }; let mut len = 0; for &doc in docs { @@ -818,36 +799,35 @@ fn compute_filtered_block<'a>( 0 }; } - len + &filtered_docs_buffer[..len] + // len } impl SegmentCollector for QuickwitSegmentCollector { type Fruit = tantivy::Result; #[inline] - fn collect_block(&mut self, docs: &[DocId]) { - let num_docs = - compute_filtered_block(&self.timestamp_filter_opt, docs, &mut self.filtered_docs); + fn collect_block(&mut self, unfiltered_docs: &[DocId]) { + let filtered_docs: &[DocId] = compute_filtered_block( + &self.timestamp_filter_opt, + unfiltered_docs, + &mut self.filtered_docs, + ); // Update results - self.num_hits += num_docs as u64; + self.num_hits += filtered_docs.len() as u64; - if self.top_k_hits.max_len() != 0 { - self.collect_top_k_block(num_docs, docs); + if let Some(segment_top_k_collector) = self.segment_top_k_collector.as_mut() { + //.top_k_hits.max_len() != 0 { + segment_top_k_collector.collect_top_k_block(filtered_docs); } - let docs = get_final_docs( - &self.timestamp_filter_opt, - num_docs, - docs, - &self.filtered_docs, - ); match self.aggregation.as_mut() { Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => { - collector.collect_block(docs) + collector.collect_block(filtered_docs) } Some(AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector)) => { - collector.collect_block(docs) + collector.collect_block(filtered_docs) } None => (), } @@ -860,7 +840,9 @@ impl SegmentCollector for QuickwitSegmentCollector { } self.num_hits += 1; - self.collect_top_k(doc_id, score); + if let Some(segment_top_k_collector) = self.segment_top_k_collector.as_mut() { + segment_top_k_collector.collect_top_k(doc_id, score); + } match self.aggregation.as_mut() { Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => { @@ -874,19 +856,23 @@ impl SegmentCollector for QuickwitSegmentCollector { } fn harvest(self) -> Self::Fruit { - let partial_hits: Vec = self - .top_k_hits - .finalize() - .into_iter() - .map(|segment_partial_hit: SegmentPartialHit| { - segment_partial_hit.into_partial_hit( - self.split_id.clone(), - self.segment_ord, - &self.score_extractor.first, - &self.score_extractor.second, - ) - }) - .collect(); + let mut partial_hits: Vec = Vec::new(); + if let Some(segment_top_k_collector) = self.segment_top_k_collector { + // TODO put that in a method of segment_top_k_collector + partial_hits = segment_top_k_collector + .top_k_hits + .finalize() + .into_iter() + .map(|segment_partial_hit: SegmentPartialHit| { + segment_partial_hit.into_partial_hit( + segment_top_k_collector.split_id.clone(), + segment_top_k_collector.segment_ord, + &segment_top_k_collector.score_extractor.first, + &segment_top_k_collector.score_extractor.second, + ) + }) + .collect(); + } let intermediate_aggregation_result = match self.aggregation { Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => { @@ -1112,19 +1098,28 @@ impl Collector for QuickwitCollector { // Convert search_after into fast field u64 let search_after = SearchAfterSegment::new(self.search_after.clone(), order1, order2, &score_extractor); + + let segment_top_k_collector = if leaf_max_hits == 0 { + None + } else { + Some(QuickwitSegmentTopKCollector { + split_id: self.split_id.clone(), + score_extractor, + top_k_hits: TopK::new(leaf_max_hits, sort_key_mapper), + segment_ord, + search_after, + precomp_search_after_order, + sort_values1: Box::new([None; 64]), + sort_values2: Box::new([None; 64]), + }) + }; + Ok(QuickwitSegmentCollector { - num_hits: 0u64, - split_id: self.split_id.clone(), - score_extractor, - top_k_hits: TopK::new(leaf_max_hits, sort_key_mapper), - segment_ord, + num_hits: 0, timestamp_filter_opt, + segment_top_k_collector, aggregation, - search_after, - precomp_search_after_order, filtered_docs: Box::new([0; 64]), - sort_values1: Box::new([None; 64]), - sort_values2: Box::new([None; 64]), }) } From e18b1263b356171af9219f4699543402151f217d Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 18 Mar 2024 07:09:09 +0100 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Paul Masurel --- quickwit/quickwit-search/src/collector.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 898922c41a3..33e0365f7d3 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -599,7 +599,7 @@ impl QuickwitSegmentTopKCollector { &mut self.sort_values1[..], &mut self.sort_values2[..], ); - if let Some(_search_after) = &self.search_after { + if self.search_after.is_some() { // Search after not optimized for block collection yet for ((doc_id, sort_value), sort_value2) in docs .iter() @@ -775,7 +775,7 @@ impl SegmentPartialHit { } } -const BUFFER_LEN: usize = tantivy::COLLECT_BLOCK_BUFFER_LEN; +pub use tantivy::COLLECT_BLOCK_BUFFER_LEN; /// Store the filtered docs in `filtered_docs_buffer` if `timestamp_filter_opt` is present. /// /// Returns the number of docs. @@ -800,7 +800,6 @@ fn compute_filtered_block<'a>( }; } &filtered_docs_buffer[..len] - // len } impl SegmentCollector for QuickwitSegmentCollector { @@ -818,7 +817,6 @@ impl SegmentCollector for QuickwitSegmentCollector { self.num_hits += filtered_docs.len() as u64; if let Some(segment_top_k_collector) = self.segment_top_k_collector.as_mut() { - //.top_k_hits.max_len() != 0 { segment_top_k_collector.collect_top_k_block(filtered_docs); } From 026a597e3e82785718a32fb8e3dbc73b03c4b777 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 18 Mar 2024 14:15:36 +0800 Subject: [PATCH 4/4] fix var name --- quickwit/quickwit-search/src/collector.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 33e0365f7d3..8ec9faf31ac 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -785,7 +785,7 @@ pub use tantivy::COLLECT_BLOCK_BUFFER_LEN; fn compute_filtered_block<'a>( timestamp_filter_opt: &Option, docs: &'a [DocId], - filtered_docs_buffer: &'a mut [DocId; BUFFER_LEN], + filtered_docs_buffer: &'a mut [DocId; COLLECT_BLOCK_BUFFER_LEN], ) -> &'a [DocId] { let Some(timestamp_filter) = ×tamp_filter_opt else { return docs;