Skip to content

Commit

Permalink
Added a count-only optimisation to QuickwitCollector
Browse files Browse the repository at this point in the history
Related to #5032
  • Loading branch information
fulmicoton committed May 26, 2024
1 parent 7b62b40 commit dd97875
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 10 deletions.
4 changes: 4 additions & 0 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,10 @@ pub(crate) struct QuickwitCollector {
}

impl QuickwitCollector {
/// Tells whether this collector only needs to report a number of hits.
///
/// This is the case when no hit is requested (`max_hits == 0`), no aggregation is
/// attached and no `search_after` key is set.
pub fn is_count_only(&self) -> bool {
    // Any requested hit rules the count-only path out right away.
    if self.max_hits != 0 {
        return false;
    }
    let has_no_aggregation = self.aggregation.is_none();
    let has_no_search_after = self.search_after.is_none();
    has_no_aggregation && has_no_search_after
}

pub fn fast_field_names(&self) -> HashSet<String> {
let mut fast_field_names = HashSet::default();
self.sort_by.first.add_fast_field(&mut fast_field_names);
Expand Down
99 changes: 89 additions & 10 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
use tantivy::aggregation::AggregationLimits;
use tantivy::directory::FileSlice;
use tantivy::fastfield::FastFieldReaders;
use tantivy::query::{AllQuery, Bm25StatisticsProvider};
use tantivy::schema::Field;
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
use tracing::*;

use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
use crate::collector::{
make_collector_for_split, make_merge_collector, IncrementalCollector, QuickwitCollector,
};
use crate::service::{deserialize_doc_mapper, SearcherContext};
use crate::{QuickwitAggregations, SearchError};

Expand Down Expand Up @@ -360,20 +363,100 @@ async fn leaf_search_single_split(
true,
)
.await?;
let split_schema = index.schema();

let quickwit_collector =
let collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
let (query, mut warmup_info) = doc_mapper.query(split_schema, &query_ast, false)?;
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let searcher = reader.searcher();

let collector_warmup_info = quickwit_collector.warmup_info();
let leaf_search_response =
leaf_search_single_split_core(query_ast, collector, searcher, &*doc_mapper, &split_id)
.await?;

searcher_context
.leaf_search_cache
.put(split, search_request, leaf_search_response.clone());

Ok(leaf_search_response)
}

/// Optimization for the case where the user only requested the number of hits
/// (no hits & no aggregations).
///
/// The query is built from `query_ast` against the schema of the searcher's index, then the
/// count is computed using the cheapest applicable strategy:
///
/// 1. `AllQuery`: the searcher's total number of docs is returned, without any warmup.
/// 2. `TermQuery`: the document frequency of the term is read from the inverted index of
///    each segment and summed, without any warmup.
/// 3. Any other query: the query's warmup is run, then `Query::count` is executed on the
///    search thread pool.
///
/// # Errors
///
/// Returns an error if the query cannot be built, if reading from the index fails, if the
/// warmup fails, or if the counting task does not complete on the search thread pool.
async fn leaf_search_single_split_count_only(
    query_ast: QueryAst,
    searcher: Searcher,
    doc_mapper: &dyn DocMapper,
    split_id: &str,
) -> crate::Result<u64> {
    let split_schema = searcher.index().schema();

    let (query, mut warmup_info) = doc_mapper.query(split_schema, &query_ast, false)?;

    // If this is the all query, we do not need to read any further information.
    // NOTE(review): this and the term shortcut below presumably do not account for deleted
    // documents -- confirm splits never carry deletes.
    if query.is::<AllQuery>() {
        let count = searcher.total_num_docs()?;
        return Ok(count);
    }

    // If this is a term query, the count is the sum of the term's document frequency over
    // all segments.
    if let Some(term_query) = query.downcast_ref::<tantivy::query::TermQuery>() {
        let mut count = 0u64;
        let term = term_query.term();
        for segment_reader in searcher.segment_readers() {
            let inverted_index = segment_reader.inverted_index(term.field())?;
            // This is a read failure, not a panic: report it as such and keep the
            // underlying error so that it can be diagnosed.
            let doc_freq = inverted_index
                .doc_freq_async(term)
                .await
                .map_err(|err| {
                    crate::SearchError::Internal(format!(
                        "failed to read term doc freq. split={split_id}, error={err}"
                    ))
                })?;
            count += doc_freq as u64;
        }
        return Ok(count);
    }

    // Even outside of these edge cases, we can rely on tantivy's optimization for counting.
    warmup_info.simplify();

    warmup(&searcher, &warmup_info).await?;
    let span = info_span!("tantivy_search");
    crate::search_thread_pool()
        .run_cpu_intensive(move || {
            let _span_guard = span.enter();
            let count = query.count(&searcher)? as u64;
            Ok(count)
        })
        .await
        .map_err(|_| {
            crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
        })?
}

async fn leaf_search_single_split_core(
query_ast: QueryAst,
collector: QuickwitCollector,
searcher: Searcher,
doc_mapper: &dyn DocMapper,
split_id: &str,
) -> crate::Result<LeafSearchResponse> {

if collector.is_count_only() {
let count = leaf_search_single_split_count_only(query_ast, searcher, doc_mapper, split_id).await?;
return Ok(LeafSearchResponse {
num_hits: count,
.. Default::default()
});
}

let split_schema = searcher.index().schema();

let (query, mut warmup_info) = doc_mapper.query(split_schema, &query_ast, false)?;
let collector_warmup_info = collector.warmup_info();
warmup_info.merge(collector_warmup_info);
warmup_info.simplify();

Expand All @@ -382,16 +465,12 @@ async fn leaf_search_single_split(
let leaf_search_response = crate::search_thread_pool()
.run_cpu_intensive(move || {
let _span_guard = span.enter();
searcher.search(&query, &quickwit_collector)
searcher.search(&query, &collector)
})
.await
.map_err(|_| {
crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
})??;

searcher_context
.leaf_search_cache
.put(split, search_request, leaf_search_response.clone());
Ok(leaf_search_response)
}

Expand Down

0 comments on commit dd97875

Please sign in to comment.