diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs
index 928020c84bb..b7a4edfe2d4 100644
--- a/quickwit/quickwit-search/src/collector.rs
+++ b/quickwit/quickwit-search/src/collector.rs
@@ -722,6 +722,18 @@ pub(crate) struct QuickwitCollector {
 }
 
 impl QuickwitCollector {
+    pub fn is_count_only(&self) -> bool {
+        self.max_hits == 0 && self.aggregation.is_none() && self.search_after.is_none()
+    }
+    /// Updates search parameters affecting the returned documents.
+    /// Does not update aggregations.
+    pub fn update_search_param(&mut self, search_request: &SearchRequest) {
+        let sort_by = sort_by_from_request(search_request);
+        self.sort_by = sort_by;
+        self.max_hits = search_request.max_hits as usize;
+        self.start_offset = search_request.start_offset as usize;
+        self.search_after.clone_from(&search_request.search_after);
+    }
     pub fn fast_field_names(&self) -> HashSet<String> {
         let mut fast_field_names = HashSet::default();
         self.sort_by.first.add_fast_field(&mut fast_field_names);
diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs
index d8ff592338d..bc6c3e51a96 100644
--- a/quickwit/quickwit-search/src/leaf.rs
+++ b/quickwit/quickwit-search/src/leaf.rs
@@ -21,7 +21,7 @@ use std::collections::{HashMap, HashSet};
 use std::ops::Bound;
 use std::path::PathBuf;
 use std::str::FromStr;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, RwLock};
 
 use anyhow::Context;
 use futures::future::try_join_all;
@@ -47,6 +47,7 @@ use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
 use tracing::*;
 
 use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
+use crate::root::is_metadata_count_request_with_ast;
 use crate::service::{deserialize_doc_mapper, SearcherContext};
 use crate::{QuickwitAggregations, SearchError};
 
@@ -330,6 +331,16 @@ async fn warm_up_fieldnorms(searcher: &Searcher, requires_scoring: bool) -> anyh
     Ok(())
 }
 
+fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
+    LeafSearchResponse {
+        num_hits: count,
+        partial_hits: Vec::new(),
+        failed_splits: Vec::new(),
+        num_attempted_splits: 1,
+        intermediate_aggregation_result: None,
+    }
+}
+
 /// Apply a leaf search on a single split.
 async fn leaf_search_single_split(
     searcher_context: &SearcherContext,
@@ -337,6 +348,7 @@ async fn leaf_search_single_split(
     storage: Arc<dyn Storage>,
     split: SplitIdAndFooterOffsets,
     doc_mapper: Arc<dyn DocMapper>,
+    split_filter: Arc<RwLock<CanSplitDoBetter>>,
     aggregations_limits: AggregationLimits,
 ) -> crate::Result<LeafSearchResponse> {
     rewrite_request(
@@ -362,32 +374,67 @@
     .await?;
     let split_schema = index.schema();
 
-    let quickwit_collector =
-        make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
-    let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
-        .map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
-    let (query, mut warmup_info) = doc_mapper.query(split_schema, &query_ast, false)?;
     let reader = index
         .reader_builder()
         .reload_policy(ReloadPolicy::Manual)
         .try_into()?;
     let searcher = reader.searcher();
 
-    let collector_warmup_info = quickwit_collector.warmup_info();
+    let mut collector =
+        make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
+    let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
+        .map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
+
+    // CanSplitDoBetter or rewrite_request may have turned the request into a count-only request.
+    // This may be the case for AllQuery with a sort by date, where the current split can't have
+    // better results.
+    //
+    // TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save
+    // opening the index and execute this earlier. Opening splits is typically served from the
+    // cache, so there may be no gain adding that info to SplitIdAndFooterOffsets.
+    if is_metadata_count_request_with_ast(&query_ast, &search_request) {
+        return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64));
+    }
+
+    let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;
+
+    let collector_warmup_info = collector.warmup_info();
     warmup_info.merge(collector_warmup_info);
     warmup_info.simplify();
 
     warmup(&searcher, &warmup_info).await?;
     let span = info_span!("tantivy_search");
-    let leaf_search_response = crate::search_thread_pool()
-        .run_cpu_intensive(move || {
-            let _span_guard = span.enter();
-            searcher.search(&query, &quickwit_collector)
-        })
-        .await
-        .map_err(|_| {
-            crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
-        })??;
+
+    let (search_request, leaf_search_response) = {
+        let split = split.clone();
+
+        crate::search_thread_pool()
+            .run_cpu_intensive(move || {
+                let _span_guard = span.enter();
+                // Our search execution has been scheduled; let's check if we can improve the
+                // request based on the results of the preceding searches.
+                check_optimize_search_request(&mut search_request, &split, &split_filter);
+                collector.update_search_param(&search_request);
+                if is_metadata_count_request_with_ast(&query_ast, &search_request) {
+                    return Ok((
+                        search_request,
+                        get_leaf_resp_from_count(searcher.num_docs() as u64),
+                    ));
+                }
+                if collector.is_count_only() {
+                    let count = query.count(&searcher)? as u64;
+                    Ok((search_request, get_leaf_resp_from_count(count)))
+                } else {
+                    searcher
+                        .search(&query, &collector)
+                        .map(|resp| (search_request, resp))
+                }
+            })
+            .await
+            .map_err(|_| {
+                crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
+            })??
+    };
 
     searcher_context
         .leaf_search_cache
@@ -850,7 +897,7 @@ impl CanSplitDoBetter {
 
     /// Record the new worst-of-the-top document, that is, the document which would first be
    /// evicted from the list of best documents, if a better document was found. Only call this
-    /// funciton if you have at least max_hits documents already.
+    /// function if you have at least max_hits documents already.
     fn record_new_worst_hit(&mut self, hit: &PartialHit) {
         match self {
             CanSplitDoBetter::Uninformative => (),
@@ -990,6 +1037,29 @@ async fn resolve_storage_and_leaf_search(
     .await
 }
 
+/// Optimizes the search_request based on CanSplitDoBetter.
+/// Returns whether the split may return better results than those already found.
+fn check_optimize_search_request(
+    search_request: &mut SearchRequest,
+    split: &SplitIdAndFooterOffsets,
+    split_filter: &Arc<RwLock<CanSplitDoBetter>>,
+) -> bool {
+    let can_be_better = split_filter.read().unwrap().can_be_better(split);
+    if !can_be_better {
+        disable_search_request_hits(search_request);
+    }
+    can_be_better
+}
+
+/// Alter the search request so it does not return any docs.
+///
+/// This is usually done because the split cannot provide better hits than the ones already
+/// fetched.
+fn disable_search_request_hits(search_request: &mut SearchRequest) {
+    search_request.max_hits = 0;
+    search_request.start_offset = 0;
+    search_request.sort_fields.clear();
+}
+
 /// `leaf` step of search.
 ///
 /// The leaf search collects all kind of information, and returns a set of
@@ -1016,7 +1086,7 @@ pub async fn leaf_search(
         || (request.aggregation_request.is_some()
            && !matches!(split_filter, CanSplitDoBetter::FindTraceIdsAggregation(_)));
 
-    let split_filter = Arc::new(Mutex::new(split_filter));
+    let split_filter = Arc::new(RwLock::new(split_filter));
 
     let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());
 
@@ -1034,13 +1104,9 @@ pub async fn leaf_search(
 
         let mut request = (*request).clone();
 
-        if !split_filter.lock().unwrap().can_be_better(&split) {
-            if !run_all_splits {
-                continue;
-            }
-            request.max_hits = 0;
-            request.start_offset = 0;
-            request.sort_fields.clear();
+        let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
+        if !can_be_better && !run_all_splits {
+            continue;
         }
 
         leaf_search_single_split_futures.push(tokio::spawn(
@@ -1100,7 +1166,7 @@ async fn leaf_search_single_split_wrapper(
     index_storage: Arc<dyn Storage>,
     doc_mapper: Arc<dyn DocMapper>,
     split: SplitIdAndFooterOffsets,
-    split_filter: Arc<Mutex<CanSplitDoBetter>>,
+    split_filter: Arc<RwLock<CanSplitDoBetter>>,
     incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
     leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit,
     aggregations_limits: AggregationLimits,
@@ -1115,6 +1181,7 @@ async fn leaf_search_single_split_wrapper(
         index_storage,
         split.clone(),
         doc_mapper,
+        split_filter.clone(),
         aggregations_limits,
     )
     .await;
@@ -1144,10 +1211,10 @@
         }),
     }
     if let Some(last_hit) = locked_incremental_merge_collector.peek_worst_hit() {
-        // TODO: we could use a RWLock instead and read the value instead of updateing it
-        // unconditionally.
+        // TODO: we could use the RWLock instead and read the value instead of updating it
         // unconditionally.
         split_filter
-            .lock()
+            .write()
             .unwrap()
             .record_new_worst_hit(last_hit.as_ref());
     }
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index acd3712e09b..164493bc456 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -590,9 +590,19 @@ async fn search_partial_hits_phase_with_scroll(
 /// metadata count.
 ///
 /// This is done by exclusion, so we will need to keep it up to date if fields are added.
-fn is_metadata_count_request(request: &SearchRequest) -> bool {
+pub fn is_metadata_count_request(request: &SearchRequest) -> bool {
     let query_ast: QueryAst = serde_json::from_str(&request.query_ast).unwrap();
-    if query_ast != QueryAst::MatchAll {
+    is_metadata_count_request_with_ast(&query_ast, request)
+}
+
+/// Check if the request is a count request without any filters, so we can just return the split
+/// metadata count.
+///
+/// This is done by exclusion, so we will need to keep it up to date if fields are added.
+///
+/// The passed query_ast should match the serialized one in the request.
+pub fn is_metadata_count_request_with_ast(query_ast: &QueryAst, request: &SearchRequest) -> bool {
+    if query_ast != &QueryAst::MatchAll {
         return false;
     }
     if request.max_hits != 0 {
@@ -611,7 +621,7 @@ fn is_metadata_count_request(request: &SearchRequest) -> bool {
 }
 
 /// Get a leaf search response that returns the num_docs of the split
-fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> {
+pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> {
     split_metadatas
         .iter()
         .map(|metadata| LeafSearchResponse {