From 1b426d67bba198d6cbcd0353291ed3e3f1bfd03d Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Thu, 30 May 2024 12:21:01 +0800 Subject: [PATCH 1/2] count optimization for multisplits * optimization requests by passing threshold in leaf search * Execute query.count() instead of QuickwitCollector for count searches We have 100 concurrent split searches by default, but num_cpus worker threads. This means most search futures will wait to be scheduled. When they are scheduled they can check the new threshold from the preceding searches and maybe skip the search. Switches to RWLock for the threshold since we read more often now. Future Work: We run num_cpu full searches in some cases before the threshold kicks in. But in some cases we could statically analyze from which split the best results come and generate count only requests for the others. Addresses https://github.com/quickwit-oss/quickwit/issues/5032 --- quickwit/quickwit-search/src/collector.rs | 12 +++ quickwit/quickwit-search/src/leaf.rs | 123 +++++++++++++++++----- quickwit/quickwit-search/src/root.rs | 16 ++- 3 files changed, 120 insertions(+), 31 deletions(-) diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 928020c84bb..b7a4edfe2d4 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -722,6 +722,18 @@ pub(crate) struct QuickwitCollector { } impl QuickwitCollector { + pub fn is_count_only(&self) -> bool { + self.max_hits == 0 && self.aggregation.is_none() && self.search_after.is_none() + } + /// Updates search parameters affecting the returned documents. + /// Does not update aggregations. 
+ pub fn update_search_param(&mut self, search_request: &SearchRequest) { + let sort_by = sort_by_from_request(search_request); + self.sort_by = sort_by; + self.max_hits = search_request.max_hits as usize; + self.start_offset = search_request.start_offset as usize; + self.search_after.clone_from(&search_request.search_after); + } pub fn fast_field_names(&self) -> HashSet { let mut fast_field_names = HashSet::default(); self.sort_by.first.add_fast_field(&mut fast_field_names); diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index d8ff592338d..bc6c3e51a96 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -21,7 +21,7 @@ use std::collections::{HashMap, HashSet}; use std::ops::Bound; use std::path::PathBuf; use std::str::FromStr; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use anyhow::Context; use futures::future::try_join_all; @@ -47,6 +47,7 @@ use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term}; use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; +use crate::root::is_metadata_count_request_with_ast; use crate::service::{deserialize_doc_mapper, SearcherContext}; use crate::{QuickwitAggregations, SearchError}; @@ -330,6 +331,16 @@ async fn warm_up_fieldnorms(searcher: &Searcher, requires_scoring: bool) -> anyh Ok(()) } +fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { + LeafSearchResponse { + num_hits: count, + partial_hits: Vec::new(), + failed_splits: Vec::new(), + num_attempted_splits: 1, + intermediate_aggregation_result: None, + } +} + /// Apply a leaf search on a single split. 
async fn leaf_search_single_split( searcher_context: &SearcherContext, @@ -337,6 +348,7 @@ async fn leaf_search_single_split( storage: Arc, split: SplitIdAndFooterOffsets, doc_mapper: Arc, + split_filter: Arc>, aggregations_limits: AggregationLimits, ) -> crate::Result { rewrite_request( @@ -362,32 +374,67 @@ async fn leaf_search_single_split( .await?; let split_schema = index.schema(); - let quickwit_collector = - make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?; - let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str()) - .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; - let (query, mut warmup_info) = doc_mapper.query(split_schema, &query_ast, false)?; let reader = index .reader_builder() .reload_policy(ReloadPolicy::Manual) .try_into()?; let searcher = reader.searcher(); - let collector_warmup_info = quickwit_collector.warmup_info(); + let mut collector = + make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?; + let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str()) + .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; + + // CanSplitDoBetter or rewrite_request may have changed the request to be a count only request + // This may be the case for AllQuery with a sort by date, where the current split can't have + // better results. + // + // TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save + // opening the index and execute this earlier. Opening splits is typically served from the + // cache, so there may be no gain adding that info to SplitIdAndFooterOffsets. 
+ if is_metadata_count_request_with_ast(&query_ast, &search_request) { + return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64)); + } + + let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?; + + let collector_warmup_info = collector.warmup_info(); warmup_info.merge(collector_warmup_info); warmup_info.simplify(); warmup(&searcher, &warmup_info).await?; let span = info_span!("tantivy_search"); - let leaf_search_response = crate::search_thread_pool() - .run_cpu_intensive(move || { - let _span_guard = span.enter(); - searcher.search(&query, &quickwit_collector) - }) - .await - .map_err(|_| { - crate::SearchError::Internal(format!("leaf search panicked. split={split_id}")) - })??; + + let (search_request, leaf_search_response) = { + let split = split.clone(); + + crate::search_thread_pool() + .run_cpu_intensive(move || { + let _span_guard = span.enter(); + // Our search execution has been scheduled, let's check if we can improve the + // request based on the results of the preceding searches + check_optimize_search_request(&mut search_request, &split, &split_filter); + collector.update_search_param(&search_request); + if is_metadata_count_request_with_ast(&query_ast, &search_request) { + return Ok(( + search_request, + get_leaf_resp_from_count(searcher.num_docs() as u64), + )); + } + if collector.is_count_only() { + let count = query.count(&searcher)? as u64; + Ok((search_request, get_leaf_resp_from_count(count))) + } else { + searcher + .search(&query, &collector) + .map(|resp| (search_request, resp)) + } + }) + .await + .map_err(|_| { + crate::SearchError::Internal(format!("leaf search panicked. split={split_id}")) + })?? + }; searcher_context .leaf_search_cache @@ -850,7 +897,7 @@ impl CanSplitDoBetter { /// Record the new worst-of-the-top document, that is, the document which would first be /// evicted from the list of best documents, if a better document was found. 
Only call this - /// funciton if you have at least max_hits documents already. + /// function if you have at least max_hits documents already. fn record_new_worst_hit(&mut self, hit: &PartialHit) { match self { CanSplitDoBetter::Uninformative => (), @@ -990,6 +1037,29 @@ async fn resolve_storage_and_leaf_search( .await } +/// Optimizes the search_request based on CanSplitDoBetter +/// Returns a tuple of (the search_request was optimized, split can return better results) +fn check_optimize_search_request( + search_request: &mut SearchRequest, + split: &SplitIdAndFooterOffsets, + split_filter: &Arc>, +) -> bool { + let can_be_better = split_filter.read().unwrap().can_be_better(split); + if !can_be_better { + disable_search_request_hits(search_request); + } + can_be_better +} + +/// Alter the search request so it does not return any docs. +/// +/// This is usually done since it cannot provide better hits results than existing fetched results. +fn disable_search_request_hits(search_request: &mut SearchRequest) { + search_request.max_hits = 0; + search_request.start_offset = 0; + search_request.sort_fields.clear(); +} + /// `leaf` step of search. 
/// /// The leaf search collects all kind of information, and returns a set of @@ -1016,7 +1086,7 @@ pub async fn leaf_search( || (request.aggregation_request.is_some() && !matches!(split_filter, CanSplitDoBetter::FindTraceIdsAggregation(_))); - let split_filter = Arc::new(Mutex::new(split_filter)); + let split_filter = Arc::new(RwLock::new(split_filter)); let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len()); @@ -1034,13 +1104,9 @@ pub async fn leaf_search( let mut request = (*request).clone(); - if !split_filter.lock().unwrap().can_be_better(&split) { - if !run_all_splits { - continue; - } - request.max_hits = 0; - request.start_offset = 0; - request.sort_fields.clear(); + let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter); + if !can_be_better && !run_all_splits { + continue; } leaf_search_single_split_futures.push(tokio::spawn( @@ -1100,7 +1166,7 @@ async fn leaf_search_single_split_wrapper( index_storage: Arc, doc_mapper: Arc, split: SplitIdAndFooterOffsets, - split_filter: Arc>, + split_filter: Arc>, incremental_merge_collector: Arc>, leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit, aggregations_limits: AggregationLimits, @@ -1115,6 +1181,7 @@ async fn leaf_search_single_split_wrapper( index_storage, split.clone(), doc_mapper, + split_filter.clone(), aggregations_limits, ) .await; @@ -1144,10 +1211,10 @@ async fn leaf_search_single_split_wrapper( }), } if let Some(last_hit) = locked_incremental_merge_collector.peek_worst_hit() { - // TODO: we could use a RWLock instead and read the value instead of updateing it + // TODO: we could use the RWLock instead and read the value instead of updating it // unconditionally. 
split_filter - .lock() + .write() .unwrap() .record_new_worst_hit(last_hit.as_ref()); } diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index acd3712e09b..164493bc456 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -590,9 +590,19 @@ async fn search_partial_hits_phase_with_scroll( /// metadata count. /// /// This is done by exclusion, so we will need to keep it up to date if fields are added. -fn is_metadata_count_request(request: &SearchRequest) -> bool { +pub fn is_metadata_count_request(request: &SearchRequest) -> bool { let query_ast: QueryAst = serde_json::from_str(&request.query_ast).unwrap(); - if query_ast != QueryAst::MatchAll { + is_metadata_count_request_with_ast(&query_ast, request) +} + +/// Check if the request is a count request without any filters, so we can just return the split +/// metadata count. +/// +/// This is done by exclusion, so we will need to keep it up to date if fields are added. +/// +/// The passed query_ast should match the one serialized in the request. 
+pub fn is_metadata_count_request_with_ast(query_ast: &QueryAst, request: &SearchRequest) -> bool { + if query_ast != &QueryAst::MatchAll { return false; } if request.max_hits != 0 { @@ -611,7 +621,7 @@ fn is_metadata_count_request(request: &SearchRequest) -> bool { } /// Get a leaf search response that returns the num_docs of the split -fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec { +pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec { split_metadatas .iter() .map(|metadata| LeafSearchResponse { From bcf5dfda9cffe7de1a1754914fd4b9a5ba141ff6 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Fri, 31 May 2024 13:35:38 +0800 Subject: [PATCH 2/2] add comments --- quickwit/Cargo.lock | 58 ++++++++++++++++++----- quickwit/quickwit-search/src/collector.rs | 2 +- quickwit/quickwit-search/src/leaf.rs | 2 +- quickwit/quickwit-search/src/root.rs | 7 ++- 4 files changed, 54 insertions(+), 15 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 5dd4d847fb6..e38f9c1a2d6 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -2565,6 +2565,19 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "windows 0.48.0", +] + [[package]] name = "generator" version = "0.8.1" @@ -2576,7 +2589,7 @@ dependencies = [ "libc", "log", "rustversion", - "windows", + "windows 0.54.0", ] [[package]] @@ -3845,6 +3858,20 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "loom" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" +dependencies = [ + 
"cfg-if", + "generator 0.7.5", + "pin-utils", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "loom" version = "0.7.2" @@ -3852,7 +3879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" dependencies = [ "cfg-if", - "generator", + "generator 0.8.1", "pin-utils", "scoped-tls", "tracing", @@ -4336,19 +4363,19 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "oneshot" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "071d1cf3298ad8e543dca18217d198cb6a3884443d204757b9624b935ef09fa0" +version = "0.1.6" +source = "git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba#c10a3ba32adc189acf68acd579ba9755075ecb4d" dependencies = [ - "loom", + "loom 0.5.6", ] [[package]] name = "oneshot" -version = "0.1.6" -source = "git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba#c10a3ba32adc189acf68acd579ba9755075ecb4d" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071d1cf3298ad8e543dca18217d198cb6a3884443d204757b9624b935ef09fa0" dependencies = [ - "loom", + "loom 0.7.2", ] [[package]] @@ -5885,7 +5912,7 @@ dependencies = [ "libz-sys", "mockall", "once_cell", - "oneshot 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "oneshot 0.1.7", "openssl", "proptest", "prost", @@ -8034,7 +8061,7 @@ dependencies = [ "measure_time", "memmap2", "once_cell", - "oneshot 0.1.6 (git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba)", + "oneshot 0.1.6", "rayon", "regex", "rust-stemmers", @@ -9387,6 +9414,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows" version = "0.54.0" diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index b7a4edfe2d4..97a4a3c75d4 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -723,7 +723,7 @@ pub(crate) struct QuickwitCollector { impl QuickwitCollector { pub fn is_count_only(&self) -> bool { - self.max_hits == 0 && self.aggregation.is_none() && self.search_after.is_none() + self.max_hits == 0 && self.aggregation.is_none() } /// Updates search parameters affecting the returned documents. /// Does not update aggregations. diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index bc6c3e51a96..1e66d70d5bb 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -1038,7 +1038,7 @@ async fn resolve_storage_and_leaf_search( } /// Optimizes the search_request based on CanSplitDoBetter -/// Returns a tuple of (the search_request was optimized, split can return better results) +/// Returns true if the split can return better results fn check_optimize_search_request( search_request: &mut SearchRequest, split: &SplitIdAndFooterOffsets, diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 164493bc456..dc28ed8d5e8 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -609,8 +609,11 @@ pub fn is_metadata_count_request_with_ast(query_ast: &QueryAst, request: &Search return false; } - // TODO: if the start and end timestamp encompass the whole split, it is still a count query - // So some could be checked on metadata + // If the start and end timestamp encompass the whole split, it is still a count query. + // We remove this currently on the leaf level, but not yet on the root level. 
+ // There's a small advantage when we would do this on the root level, since we have the + // counts available on the split. On the leaf it is currently required to open the split + // to get the count. if request.start_timestamp.is_some() || request.end_timestamp.is_some() { return false; }