diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 5dd4d847fb6..e38f9c1a2d6 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -2565,6 +2565,19 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "windows 0.48.0", +] + [[package]] name = "generator" version = "0.8.1" @@ -2576,7 +2589,7 @@ dependencies = [ "libc", "log", "rustversion", - "windows", + "windows 0.54.0", ] [[package]] @@ -3845,6 +3858,20 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "loom" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" +dependencies = [ + "cfg-if", + "generator 0.7.5", + "pin-utils", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "loom" version = "0.7.2" @@ -3852,7 +3879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" dependencies = [ "cfg-if", - "generator", + "generator 0.8.1", "pin-utils", "scoped-tls", "tracing", @@ -4336,19 +4363,19 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "oneshot" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "071d1cf3298ad8e543dca18217d198cb6a3884443d204757b9624b935ef09fa0" +version = "0.1.6" +source = "git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba#c10a3ba32adc189acf68acd579ba9755075ecb4d" dependencies = [ - "loom", + "loom 0.5.6", ] [[package]] name = "oneshot" -version = "0.1.6" -source = 
"git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba#c10a3ba32adc189acf68acd579ba9755075ecb4d" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071d1cf3298ad8e543dca18217d198cb6a3884443d204757b9624b935ef09fa0" dependencies = [ - "loom", + "loom 0.7.2", ] [[package]] @@ -5885,7 +5912,7 @@ dependencies = [ "libz-sys", "mockall", "once_cell", - "oneshot 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "oneshot 0.1.7", "openssl", "proptest", "prost", @@ -8034,7 +8061,7 @@ dependencies = [ "measure_time", "memmap2", "once_cell", - "oneshot 0.1.6 (git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba)", + "oneshot 0.1.6", "rayon", "regex", "rust-stemmers", @@ -9387,6 +9414,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows" version = "0.54.0" diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 928020c84bb..97a4a3c75d4 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -722,6 +722,18 @@ pub(crate) struct QuickwitCollector { } impl QuickwitCollector { + pub fn is_count_only(&self) -> bool { + self.max_hits == 0 && self.aggregation.is_none() + } + /// Updates search parameters affecting the returned documents. + /// Does not update aggregations. 
+ pub fn update_search_param(&mut self, search_request: &SearchRequest) { + let sort_by = sort_by_from_request(search_request); + self.sort_by = sort_by; + self.max_hits = search_request.max_hits as usize; + self.start_offset = search_request.start_offset as usize; + self.search_after.clone_from(&search_request.search_after); + } pub fn fast_field_names(&self) -> HashSet { let mut fast_field_names = HashSet::default(); self.sort_by.first.add_fast_field(&mut fast_field_names); diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index d8ff592338d..1e66d70d5bb 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -21,7 +21,7 @@ use std::collections::{HashMap, HashSet}; use std::ops::Bound; use std::path::PathBuf; use std::str::FromStr; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use anyhow::Context; use futures::future::try_join_all; @@ -47,6 +47,7 @@ use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term}; use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; +use crate::root::is_metadata_count_request_with_ast; use crate::service::{deserialize_doc_mapper, SearcherContext}; use crate::{QuickwitAggregations, SearchError}; @@ -330,6 +331,16 @@ async fn warm_up_fieldnorms(searcher: &Searcher, requires_scoring: bool) -> anyh Ok(()) } +fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { + LeafSearchResponse { + num_hits: count, + partial_hits: Vec::new(), + failed_splits: Vec::new(), + num_attempted_splits: 1, + intermediate_aggregation_result: None, + } +} + /// Apply a leaf search on a single split. 
async fn leaf_search_single_split( searcher_context: &SearcherContext, @@ -337,6 +348,7 @@ async fn leaf_search_single_split( storage: Arc, split: SplitIdAndFooterOffsets, doc_mapper: Arc, + split_filter: Arc>, aggregations_limits: AggregationLimits, ) -> crate::Result { rewrite_request( @@ -362,32 +374,67 @@ async fn leaf_search_single_split( .await?; let split_schema = index.schema(); - let quickwit_collector = - make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?; - let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str()) - .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; - let (query, mut warmup_info) = doc_mapper.query(split_schema, &query_ast, false)?; let reader = index .reader_builder() .reload_policy(ReloadPolicy::Manual) .try_into()?; let searcher = reader.searcher(); - let collector_warmup_info = quickwit_collector.warmup_info(); + let mut collector = + make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?; + let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str()) + .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; + + // CanSplitDoBetter or rewrite_request may have changed the request to be a count only request + // This may be the case for AllQuery with a sort by date, where the current split can't have + // better results. + // + // TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save + // opening the index and execute this earlier. Opening splits is typically served from the + // cache, so there may be no gain adding that info to SplitIdAndFooterOffsets. 
+ if is_metadata_count_request_with_ast(&query_ast, &search_request) { + return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64)); + } + + let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?; + + let collector_warmup_info = collector.warmup_info(); warmup_info.merge(collector_warmup_info); warmup_info.simplify(); warmup(&searcher, &warmup_info).await?; let span = info_span!("tantivy_search"); - let leaf_search_response = crate::search_thread_pool() - .run_cpu_intensive(move || { - let _span_guard = span.enter(); - searcher.search(&query, &quickwit_collector) - }) - .await - .map_err(|_| { - crate::SearchError::Internal(format!("leaf search panicked. split={split_id}")) - })??; + + let (search_request, leaf_search_response) = { + let split = split.clone(); + + crate::search_thread_pool() + .run_cpu_intensive(move || { + let _span_guard = span.enter(); + // Our search execution has been scheduled, let's check if we can improve the + // request based on the results of the preceding searches + check_optimize_search_request(&mut search_request, &split, &split_filter); + collector.update_search_param(&search_request); + if is_metadata_count_request_with_ast(&query_ast, &search_request) { + return Ok(( + search_request, + get_leaf_resp_from_count(searcher.num_docs() as u64), + )); + } + if collector.is_count_only() { + let count = query.count(&searcher)? as u64; + Ok((search_request, get_leaf_resp_from_count(count))) + } else { + searcher + .search(&query, &collector) + .map(|resp| (search_request, resp)) + } + }) + .await + .map_err(|_| { + crate::SearchError::Internal(format!("leaf search panicked. split={split_id}")) + })?? + }; searcher_context .leaf_search_cache @@ -850,7 +897,7 @@ impl CanSplitDoBetter { /// Record the new worst-of-the-top document, that is, the document which would first be /// evicted from the list of best documents, if a better document was found. 
Only call this - /// funciton if you have at least max_hits documents already. + /// function if you have at least max_hits documents already. fn record_new_worst_hit(&mut self, hit: &PartialHit) { match self { CanSplitDoBetter::Uninformative => (), @@ -990,6 +1037,29 @@ async fn resolve_storage_and_leaf_search( .await } +/// Optimizes the search_request based on CanSplitDoBetter +/// Returns true if the split can return better results +fn check_optimize_search_request( + search_request: &mut SearchRequest, + split: &SplitIdAndFooterOffsets, + split_filter: &Arc>, +) -> bool { + let can_be_better = split_filter.read().unwrap().can_be_better(split); + if !can_be_better { + disable_search_request_hits(search_request); + } + can_be_better +} + +/// Alter the search request so it does not return any docs. +/// +/// This is usually done since it cannot provide better hits results than existing fetched results. +fn disable_search_request_hits(search_request: &mut SearchRequest) { + search_request.max_hits = 0; + search_request.start_offset = 0; + search_request.sort_fields.clear(); +} + /// `leaf` step of search. 
/// /// The leaf search collects all kind of information, and returns a set of @@ -1016,7 +1086,7 @@ pub async fn leaf_search( || (request.aggregation_request.is_some() && !matches!(split_filter, CanSplitDoBetter::FindTraceIdsAggregation(_))); - let split_filter = Arc::new(Mutex::new(split_filter)); + let split_filter = Arc::new(RwLock::new(split_filter)); let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len()); @@ -1034,13 +1104,9 @@ pub async fn leaf_search( let mut request = (*request).clone(); - if !split_filter.lock().unwrap().can_be_better(&split) { - if !run_all_splits { - continue; - } - request.max_hits = 0; - request.start_offset = 0; - request.sort_fields.clear(); + let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter); + if !can_be_better && !run_all_splits { + continue; + } leaf_search_single_split_futures.push(tokio::spawn( @@ -1100,7 +1166,7 @@ async fn leaf_search_single_split_wrapper( index_storage: Arc, doc_mapper: Arc, split: SplitIdAndFooterOffsets, - split_filter: Arc>, + split_filter: Arc>, incremental_merge_collector: Arc>, leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit, aggregations_limits: AggregationLimits, @@ -1115,6 +1181,7 @@ async fn leaf_search_single_split_wrapper( index_storage, split.clone(), doc_mapper, + split_filter.clone(), aggregations_limits, ) .await; @@ -1144,10 +1211,10 @@ async fn leaf_search_single_split_wrapper( }), } if let Some(last_hit) = locked_incremental_merge_collector.peek_worst_hit() { - // TODO: we could use a RWLock instead and read the value instead of updateing it + // TODO: we could use the RWLock instead and read the value instead of updating it // unconditionally. 
split_filter - .lock() + .write() .unwrap() .record_new_worst_hit(last_hit.as_ref()); } diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index acd3712e09b..dc28ed8d5e8 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -590,17 +590,30 @@ async fn search_partial_hits_phase_with_scroll( /// metadata count. /// /// This is done by exclusion, so we will need to keep it up to date if fields are added. -fn is_metadata_count_request(request: &SearchRequest) -> bool { +pub fn is_metadata_count_request(request: &SearchRequest) -> bool { let query_ast: QueryAst = serde_json::from_str(&request.query_ast).unwrap(); - if query_ast != QueryAst::MatchAll { + is_metadata_count_request_with_ast(&query_ast, request) +} + +/// Check if the request is a count request without any filters, so we can just return the split +/// metadata count. +/// +/// This is done by exclusion, so we will need to keep it up to date if fields are added. +/// +/// The passed query_ast should match the serialized one in the request. +pub fn is_metadata_count_request_with_ast(query_ast: &QueryAst, request: &SearchRequest) -> bool { + if query_ast != &QueryAst::MatchAll { return false; } if request.max_hits != 0 { return false; } - // TODO: if the start and end timestamp encompass the whole split, it is still a count query - // So some could be checked on metadata + // If the start and end timestamp encompass the whole split, it is still a count query. + // We remove this currently on the leaf level, but not yet on the root level. + // There's a small advantage when we would do this on the root level, since we have the + // counts available on the split. On the leaf it is currently required to open the split + // to get the count. 
if request.start_timestamp.is_some() || request.end_timestamp.is_some() { return false; } @@ -611,7 +624,7 @@ fn is_metadata_count_request(request: &SearchRequest) -> bool { } /// Get a leaf search response that returns the num_docs of the split -fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec { +pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec { split_metadatas .iter() .map(|metadata| LeafSearchResponse {