Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

count optimization for multisplits #5048

Merged
merged 2 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,18 @@ pub(crate) struct QuickwitCollector {
}

impl QuickwitCollector {
/// Returns `true` when this collector only has to count matching documents:
/// no hits are requested (`max_hits` is 0) and no aggregation is set.
pub fn is_count_only(&self) -> bool {
    let no_hits_requested = self.max_hits == 0;
    let no_aggregation = self.aggregation.is_none();
    no_hits_requested && no_aggregation
}
/// Refreshes the parameters that decide which documents are returned, taking them
/// from `search_request`: sort order, `max_hits`, `start_offset` and `search_after`.
///
/// Aggregations are left untouched.
pub fn update_search_param(&mut self, search_request: &SearchRequest) {
    self.sort_by = sort_by_from_request(search_request);
    self.start_offset = search_request.start_offset as usize;
    self.max_hits = search_request.max_hits as usize;
    // `clone_from` lets the existing `search_after` value be reused where possible.
    self.search_after
        .clone_from(&search_request.search_after);
}
pub fn fast_field_names(&self) -> HashSet<String> {
let mut fast_field_names = HashSet::default();
self.sort_by.first.add_fast_field(&mut fast_field_names);
Expand Down
123 changes: 95 additions & 28 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::collections::{HashMap, HashSet};
use std::ops::Bound;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};

use anyhow::Context;
use futures::future::try_join_all;
Expand All @@ -47,6 +47,7 @@ use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
use tracing::*;

use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
use crate::root::is_metadata_count_request_with_ast;
use crate::service::{deserialize_doc_mapper, SearcherContext};
use crate::{QuickwitAggregations, SearchError};

Expand Down Expand Up @@ -330,13 +331,24 @@ async fn warm_up_fieldnorms(searcher: &Searcher, requires_scoring: bool) -> anyh
Ok(())
}

/// Builds the `LeafSearchResponse` of a single split for which only a document count
/// is reported.
///
/// The response carries `count` as `num_hits`, records one attempted split, and has
/// no partial hits, no failed splits and no intermediate aggregation result.
fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
    let num_hits = count;
    LeafSearchResponse {
        num_hits,
        num_attempted_splits: 1,
        partial_hits: Vec::new(),
        failed_splits: Vec::new(),
        intermediate_aggregation_result: None,
    }
}

/// Apply a leaf search on a single split.
async fn leaf_search_single_split(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
doc_mapper: Arc<dyn DocMapper>,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
aggregations_limits: AggregationLimits,
) -> crate::Result<LeafSearchResponse> {
rewrite_request(
Expand All @@ -362,32 +374,67 @@ async fn leaf_search_single_split(
.await?;
let split_schema = index.schema();

let quickwit_collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
let (query, mut warmup_info) = doc_mapper.query(split_schema, &query_ast, false)?;
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let searcher = reader.searcher();

let collector_warmup_info = quickwit_collector.warmup_info();
let mut collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// CanSplitDoBetter or rewrite_request may have changed the request to be a count only request
// This may be the case for AllQuery with a sort by date, where the current split can't have
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great comment.

// better results.
//
// TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save
// opening the index and execute this earlier. Opening splits is typically served from the
// cache, so there may be no gain adding that info to SplitIdAndFooterOffsets.
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64));
}

let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;

let collector_warmup_info = collector.warmup_info();
warmup_info.merge(collector_warmup_info);
warmup_info.simplify();

warmup(&searcher, &warmup_info).await?;
let span = info_span!("tantivy_search");
let leaf_search_response = crate::search_thread_pool()
.run_cpu_intensive(move || {
let _span_guard = span.enter();
searcher.search(&query, &quickwit_collector)
})
.await
.map_err(|_| {
crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
})??;

let (search_request, leaf_search_response) = {
let split = split.clone();

crate::search_thread_pool()
.run_cpu_intensive(move || {
let _span_guard = span.enter();
// Our search execution has been scheduled, let's check if we can improve the
// request based on the results of the preceding searches
check_optimize_search_request(&mut search_request, &split, &split_filter);
collector.update_search_param(&search_request);
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok((
search_request,
get_leaf_resp_from_count(searcher.num_docs() as u64),
));
}
if collector.is_count_only() {
let count = query.count(&searcher)? as u64;
Ok((search_request, get_leaf_resp_from_count(count)))
} else {
searcher
.search(&query, &collector)
.map(|resp| (search_request, resp))
}
})
.await
.map_err(|_| {
crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
})??
};

searcher_context
.leaf_search_cache
Expand Down Expand Up @@ -850,7 +897,7 @@ impl CanSplitDoBetter {

/// Record the new worst-of-the-top document, that is, the document which would first be
/// evicted from the list of best documents, if a better document was found. Only call this
/// funciton if you have at least max_hits documents already.
/// function if you have at least max_hits documents already.
fn record_new_worst_hit(&mut self, hit: &PartialHit) {
match self {
CanSplitDoBetter::Uninformative => (),
Expand Down Expand Up @@ -990,6 +1037,29 @@ async fn resolve_storage_and_leaf_search(
.await
}

/// Asks the shared `CanSplitDoBetter` filter whether `split` can still return better
/// results, and strips the hits from `search_request` when it cannot.
///
/// Returns `true` if the split can return better results; in that case the request
/// is left unchanged.
fn check_optimize_search_request(
    search_request: &mut SearchRequest,
    split: &SplitIdAndFooterOffsets,
    split_filter: &Arc<RwLock<CanSplitDoBetter>>,
) -> bool {
    // The read guard only lives inside this block, so the lock is released before
    // the request is modified.
    let can_be_better = {
        let filter = split_filter.read().unwrap();
        filter.can_be_better(split)
    };
    if can_be_better {
        return true;
    }
    disable_search_request_hits(search_request);
    false
}

/// Rewrites the search request so that it no longer returns any documents: the sort
/// fields are cleared and both `start_offset` and `max_hits` are reset to 0.
///
/// This is usually done when the request cannot provide better hits than the results
/// that were already fetched.
fn disable_search_request_hits(search_request: &mut SearchRequest) {
    search_request.sort_fields.clear();
    search_request.start_offset = 0;
    search_request.max_hits = 0;
}

/// `leaf` step of search.
///
/// The leaf search collects all kind of information, and returns a set of
Expand All @@ -1016,7 +1086,7 @@ pub async fn leaf_search(
|| (request.aggregation_request.is_some()
&& !matches!(split_filter, CanSplitDoBetter::FindTraceIdsAggregation(_)));

let split_filter = Arc::new(Mutex::new(split_filter));
let split_filter = Arc::new(RwLock::new(split_filter));

let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());

Expand All @@ -1034,13 +1104,9 @@ pub async fn leaf_search(

let mut request = (*request).clone();

if !split_filter.lock().unwrap().can_be_better(&split) {
if !run_all_splits {
continue;
}
request.max_hits = 0;
request.start_offset = 0;
request.sort_fields.clear();
let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
if !can_be_better && !run_all_splits {
continue;
}

leaf_search_single_split_futures.push(tokio::spawn(
Expand Down Expand Up @@ -1100,7 +1166,7 @@ async fn leaf_search_single_split_wrapper(
index_storage: Arc<dyn Storage>,
doc_mapper: Arc<dyn DocMapper>,
split: SplitIdAndFooterOffsets,
split_filter: Arc<Mutex<CanSplitDoBetter>>,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit,
aggregations_limits: AggregationLimits,
Expand All @@ -1115,6 +1181,7 @@ async fn leaf_search_single_split_wrapper(
index_storage,
split.clone(),
doc_mapper,
split_filter.clone(),
aggregations_limits,
)
.await;
Expand Down Expand Up @@ -1144,10 +1211,10 @@ async fn leaf_search_single_split_wrapper(
}),
}
if let Some(last_hit) = locked_incremental_merge_collector.peek_worst_hit() {
// TODO: we could use a RWLock instead and read the value instead of updateing it
// TODO: we could use the RwLock and read the value instead of updating it
// unconditionally.
split_filter
.lock()
.write()
.unwrap()
.record_new_worst_hit(last_hit.as_ref());
}
Expand Down
Loading
Loading