count optimization for multisplits
* Optimize requests by passing the threshold into the leaf search
* Execute query.count() instead of the QuickwitCollector for count-only searches

We allow 100 concurrent split searches by default, but only num_cpus worker
threads. This means most search futures will wait to be scheduled. When they
are scheduled, they can check the threshold updated by the preceding searches
and possibly skip the search.

Switches to RwLock for the threshold, since we now read it more often than we write it.

Future Work:
In some cases we run num_cpus full searches before the threshold kicks in,
but we could statically analyze which split the best results will come from
and generate count-only requests for the others.

Addresses #5032
PSeitz committed May 30, 2024
1 parent 7b62b40 commit b6fb677
Showing 3 changed files with 120 additions and 31 deletions.
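A minimal sketch of the check-at-schedule-time pattern described above; Threshold, can_do_better, and search_split are hypothetical stand-ins for the CanSplitDoBetter machinery, not the actual Quickwit types:

use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the shared threshold state.
struct Threshold {
    /// Sort value of the worst hit currently kept in the top-k.
    worst_top_hit: Option<u64>,
}

impl Threshold {
    /// A split whose best possible sort value cannot beat the current
    /// worst-of-the-top hit cannot contribute new hits.
    fn can_do_better(&self, split_best_value: u64) -> bool {
        self.worst_top_hit.map_or(true, |worst| split_best_value > worst)
    }
}

/// Decide, at the moment the queued search actually runs, whether to run a
/// full top-k search or downgrade to a cheap count-only search.
fn search_split(split_best_value: u64, threshold: &Arc<RwLock<Threshold>>) -> &'static str {
    // Preceding searches may have tightened the threshold while this
    // future was waiting for a worker thread.
    if threshold.read().unwrap().can_do_better(split_best_value) {
        "full top-k search"
    } else {
        "count-only search"
    }
}

fn main() {
    let threshold = Arc::new(RwLock::new(Threshold { worst_top_hit: None }));
    assert_eq!(search_split(10, &threshold), "full top-k search");
    // A finished search records its worst kept hit, raising the bar.
    threshold.write().unwrap().worst_top_hit = Some(42);
    assert_eq!(search_split(10, &threshold), "count-only search");
}

With 100 permits but only num_cpus workers, most futures reach this check only after several searches have completed, which is exactly when the downgrade pays off.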
12 changes: 12 additions & 0 deletions quickwit/quickwit-search/src/collector.rs
@@ -722,6 +722,18 @@ pub(crate) struct QuickwitCollector {
}

impl QuickwitCollector {
pub fn is_count_only(&self) -> bool {
self.max_hits == 0 && self.aggregation.is_none() && self.search_after.is_none()
}
/// Updates search parameters affecting the returned documents.
/// Does not update aggregations.
pub fn update_search_param(&mut self, search_request: &SearchRequest) {
let sort_by = sort_by_from_request(search_request);
self.sort_by = sort_by;
self.max_hits = search_request.max_hits as usize;
self.start_offset = search_request.start_offset as usize;
self.search_after.clone_from(&search_request.search_after);
}
pub fn fast_field_names(&self) -> HashSet<String> {
let mut fast_field_names = HashSet::default();
self.sort_by.first.add_fast_field(&mut fast_field_names);
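The is_count_only() fast path works because tantivy can count matches without collecting them. A sketch against tantivy's public Query::count API (the schema and documents are invented for the example, and a recent tantivy version is assumed):

use tantivy::query::{AllQuery, Query};
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let body = schema_builder.add_text_field("body", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    let mut index_writer = index.writer(50_000_000)?;
    index_writer.add_document(doc!(body => "hello world"))?;
    index_writer.add_document(doc!(body => "hello quickwit"))?;
    index_writer.commit()?;

    let searcher = index.reader()?.searcher();
    // Count-only path: no top-k heap and no document materialization,
    // matching what the leaf search now does when is_count_only() is true.
    let num_hits = AllQuery.count(&searcher)?;
    assert_eq!(num_hits, 2);
    Ok(())
}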
123 changes: 95 additions & 28 deletions quickwit/quickwit-search/src/leaf.rs
@@ -21,7 +21,7 @@ use std::collections::{HashMap, HashSet};
use std::ops::Bound;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};

use anyhow::Context;
use futures::future::try_join_all;
@@ -47,6 +47,7 @@ use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
use tracing::*;

use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
use crate::root::is_metadata_count_request_with_ast;
use crate::service::{deserialize_doc_mapper, SearcherContext};
use crate::{QuickwitAggregations, SearchError};

@@ -330,13 +331,24 @@ async fn warm_up_fieldnorms(searcher: &Searcher, requires_scoring: bool) -> anyh
Ok(())
}

fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
LeafSearchResponse {
num_hits: count,
partial_hits: Vec::new(),
failed_splits: Vec::new(),
num_attempted_splits: 1,
intermediate_aggregation_result: None,
}
}

/// Apply a leaf search on a single split.
async fn leaf_search_single_split(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
doc_mapper: Arc<dyn DocMapper>,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
aggregations_limits: AggregationLimits,
) -> crate::Result<LeafSearchResponse> {
rewrite_request(
@@ -362,32 +374,67 @@ async fn leaf_search_single_split(
.await?;
let split_schema = index.schema();

let quickwit_collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
let (query, mut warmup_info) = doc_mapper.query(split_schema, &query_ast, false)?;
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let searcher = reader.searcher();

let collector_warmup_info = quickwit_collector.warmup_info();
let mut collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// CanSplitDoBetter or rewrite_request may have changed the request into a count-only request.
// This may be the case for an AllQuery with a sort by date, where the current split can't have
// better results.
//
// TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save
// opening the index and execute this earlier. Opening splits is typically served from the
// cache, so there may be no gain adding that info to SplitIdAndFooterOffsets.
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64));
}

let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;

let collector_warmup_info = collector.warmup_info();
warmup_info.merge(collector_warmup_info);
warmup_info.simplify();

warmup(&searcher, &warmup_info).await?;
let span = info_span!("tantivy_search");
let leaf_search_response = crate::search_thread_pool()
.run_cpu_intensive(move || {
let _span_guard = span.enter();
searcher.search(&query, &quickwit_collector)
})
.await
.map_err(|_| {
crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
})??;

let (search_request, leaf_search_response) = {
let split = split.clone();

crate::search_thread_pool()
.run_cpu_intensive(move || {
let _span_guard = span.enter();
// Our search execution has been scheduled; let's check whether we can improve the
// request based on the results of the preceding searches.
check_optimize_search_request(&mut search_request, &split, &split_filter);
collector.update_search_param(&search_request);
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok((
search_request,
get_leaf_resp_from_count(searcher.num_docs() as u64),
));
}
if collector.is_count_only() {
let count = query.count(&searcher)? as u64;
Ok((search_request, get_leaf_resp_from_count(count)))
} else {
searcher
.search(&query, &collector)
.map(|resp| (search_request, resp))
}
})
.await
.map_err(|_| {
crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
})??
};

searcher_context
.leaf_search_cache
@@ -850,7 +897,7 @@ impl CanSplitDoBetter {

/// Record the new worst-of-the-top document, that is, the document which would first be
/// evicted from the list of best documents, if a better document was found. Only call this
/// funciton if you have at least max_hits documents already.
/// function if you have at least max_hits documents already.
fn record_new_worst_hit(&mut self, hit: &PartialHit) {
match self {
CanSplitDoBetter::Uninformative => (),
@@ -990,6 +1037,29 @@ async fn resolve_storage_and_leaf_search(
.await
}

/// Optimizes the search_request based on CanSplitDoBetter.
/// Returns whether the split may return better results than those already found.
fn check_optimize_search_request(
search_request: &mut SearchRequest,
split: &SplitIdAndFooterOffsets,
split_filter: &Arc<RwLock<CanSplitDoBetter>>,
) -> bool {
let can_be_better = split_filter.read().unwrap().can_be_better(split);
if !can_be_better {
disable_search_request_hits(search_request);
}
can_be_better
}

/// Alter the search request so it does not return any docs.
///
/// This is usually done when the split cannot provide better hits than the ones already fetched.
fn disable_search_request_hits(search_request: &mut SearchRequest) {
search_request.max_hits = 0;
search_request.start_offset = 0;
search_request.sort_fields.clear();
}

/// `leaf` step of search.
///
/// The leaf search collects all kind of information, and returns a set of
@@ -1016,7 +1086,7 @@ pub async fn leaf_search(
|| (request.aggregation_request.is_some()
&& !matches!(split_filter, CanSplitDoBetter::FindTraceIdsAggregation(_)));

let split_filter = Arc::new(Mutex::new(split_filter));
let split_filter = Arc::new(RwLock::new(split_filter));

let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());

@@ -1034,13 +1104,9 @@

let mut request = (*request).clone();

if !split_filter.lock().unwrap().can_be_better(&split) {
if !run_all_splits {
continue;
}
request.max_hits = 0;
request.start_offset = 0;
request.sort_fields.clear();
let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
if !can_be_better && !run_all_splits {
continue;
}

leaf_search_single_split_futures.push(tokio::spawn(
@@ -1100,7 +1166,7 @@ async fn leaf_search_single_split_wrapper(
index_storage: Arc<dyn Storage>,
doc_mapper: Arc<dyn DocMapper>,
split: SplitIdAndFooterOffsets,
split_filter: Arc<Mutex<CanSplitDoBetter>>,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit,
aggregations_limits: AggregationLimits,
@@ -1115,6 +1181,7 @@
index_storage,
split.clone(),
doc_mapper,
split_filter.clone(),
aggregations_limits,
)
.await;
@@ -1144,10 +1211,10 @@
}),
}
if let Some(last_hit) = locked_incremental_merge_collector.peek_worst_hit() {
// TODO: we could use a RWLock instead and read the value instead of updating it
// TODO: we could use the RWLock instead and read the value instead of updating it
// unconditionally.
split_filter
.lock()
.write()
.unwrap()
.record_new_worst_hit(last_hit.as_ref());
}
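The Mutex to RwLock switch matches the new access pattern: every scheduled split search now reads the threshold, while only the merge step occasionally writes it. A std-only sketch of that read-mostly pattern (the u64 stands in for CanSplitDoBetter):

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    // Shared threshold: read by every scheduled split search, written only
    // when the merge collector observes a new worst-of-the-top hit.
    let threshold = Arc::new(RwLock::new(0u64));

    let readers: Vec<_> = (0..8)
        .map(|_| {
            let threshold = Arc::clone(&threshold);
            // RwLock lets all these read-side checks proceed in parallel;
            // a Mutex would serialize them even though they never write.
            thread::spawn(move || *threshold.read().unwrap())
        })
        .collect();

    *threshold.write().unwrap() = 42; // the occasional write side
    for reader in readers {
        reader.join().unwrap();
    }
}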
16 changes: 13 additions & 3 deletions quickwit/quickwit-search/src/root.rs
@@ -590,9 +590,19 @@ async fn search_partial_hits_phase_with_scroll(
/// metadata count.
///
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
fn is_metadata_count_request(request: &SearchRequest) -> bool {
pub fn is_metadata_count_request(request: &SearchRequest) -> bool {
let query_ast: QueryAst = serde_json::from_str(&request.query_ast).unwrap();
if query_ast != QueryAst::MatchAll {
is_metadata_count_request_with_ast(&query_ast, request)
}

/// Check if the request is a count request without any filters, so we can just return the split
/// metadata count.
///
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
///
/// The passed query_ast should match the serialized one in the request.
pub fn is_metadata_count_request_with_ast(query_ast: &QueryAst, request: &SearchRequest) -> bool {
if query_ast != &QueryAst::MatchAll {
return false;
}
if request.max_hits != 0 {
@@ -611,7 +621,7 @@ fn is_metadata_count_request(request: &SearchRequest) -> bool {
}

/// Get a leaf search response that returns the num_docs of the split
fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> {
pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> {
split_metadatas
.iter()
.map(|metadata| LeafSearchResponse {
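These root-side helpers support the cheapest path of all: for a MatchAll, filter-free, count-only request, every document in every split matches, so the root can answer from split metadata without touching any leaf. A simplified illustration, with SplitMeta as a made-up stand-in for SplitMetadata:

/// Made-up, trimmed-down split metadata for illustration.
struct SplitMeta {
    num_docs: u64,
}

/// For a MatchAll, no-filter, count-only request every document matches,
/// so the total is just the sum of the per-split metadata counts.
fn count_from_metadata(splits: &[SplitMeta]) -> u64 {
    splits.iter().map(|split| split.num_docs).sum()
}

fn main() {
    let splits = [SplitMeta { num_docs: 100 }, SplitMeta { num_docs: 250 }];
    assert_eq!(count_from_metadata(&splits), 350);
}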
