optimize topn requests (#5075)
* optimize topn requests

add logic to detect which splits will deliver the top n results for a
request. This is only supported for match_all queries, with an optional
sort by the timestamp field (a sketch of the idea is shown below).

start_timestamp, end_timestamp, and filters on the timestamp field are
not currently supported, but could be.

* move to function, refactor
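
For context, here is a minimal, self-contained sketch of the pruning idea, assuming the splits are already sorted in the detected sort order. The `Split` struct, `count_only` flag, and `prune_to_count_only` function are hypothetical stand-ins for this commit's `SplitIdAndFooterOffsets` and `disable_search_request_hits`; the real implementation lives in `CanSplitDoBetter::optimize` in the diff below.

```rust
// Minimal sketch, not the code from this commit: `Split` and `count_only`
// are hypothetical stand-ins for `SplitIdAndFooterOffsets` and
// `disable_search_request_hits`.
struct Split {
    num_docs: u64,
    count_only: bool,
}

/// Marks every split beyond those guaranteed to contain the top
/// `num_requested_docs` hits as count-only. Assumes `splits` is already
/// sorted in the detected sort order (e.g. by timestamp).
fn prune_to_count_only(splits: &mut [Split], num_requested_docs: u64) {
    // Number of leading splits whose cumulative doc count first reaches
    // the requested number of documents.
    let min_required_splits = splits
        .iter()
        // running partial sum of num_docs
        .scan(0u64, |sum, split| {
            *sum += split.num_docs;
            Some(*sum)
        })
        .take_while(|partial_sum| *partial_sum < num_requested_docs)
        .count()
        + 1;
    for split in splits.iter_mut().skip(min_required_splits) {
        split.count_only = true;
    }
}

fn main() {
    let mut splits = vec![
        Split { num_docs: 60, count_only: false },
        Split { num_docs: 50, count_only: false },
        Split { num_docs: 40, count_only: false },
    ];
    prune_to_count_only(&mut splits, 100);
    // The first two splits already cover 110 >= 100 docs, so only the
    // third split is downgraded to a count-only search.
    assert!(!splits[0].count_only && !splits[1].count_only && splits[2].count_only);
}
```

The downgraded splits are still searched, but only to count matching documents, so the reported total hit count stays exact.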
PSeitz authored Jul 5, 2024
1 parent cb63d32 commit 7845137
Showing 15 changed files with 390 additions and 19 deletions.
2 changes: 2 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -353,6 +353,8 @@ message SplitIdAndFooterOffsets {
optional int64 timestamp_start = 4;
// The highest timestamp appearing in the split, in seconds since epoch
optional int64 timestamp_end = 5;
// The number of docs in the split
uint64 num_docs = 6;
}

// Hits returned by a FetchDocRequest.

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions quickwit/quickwit-search/src/cluster_client.rs
@@ -380,6 +380,7 @@ mod tests {
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
}],
..Default::default()
}
@@ -406,13 +407,15 @@
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
],
}],
@@ -441,13 +444,15 @@
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
],
}
165 changes: 146 additions & 19 deletions quickwit/quickwit-search/src/leaf.rs
@@ -363,6 +363,17 @@ async fn leaf_search_single_split(
return Ok(cached_answer);
}

let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// CanSplitDoBetter or rewrite_request may have changed the request into a count-only
// request. This may be the case for an AllQuery with a sort by date and a time filter,
// where the current split can't have better results. Now that num_docs is carried on
// SplitIdAndFooterOffsets, this check can run before the index is even opened.
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok(get_leaf_resp_from_count(split.num_docs));
}

let split_id = split.split_id.to_string();
let index = open_index_with_caches(
searcher_context,
@@ -382,19 +393,6 @@

let mut collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// CanSplitDoBetter or rewrite_request may have changed the request to be a count only request
// This may be the case for AllQuery with a sort by date, where the current split can't have
// better results.
//
// TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save
// opening the index and execute this earlier. Opening splits is typically served from the
// cache, so there may be no gain adding that info to SplitIdAndFooterOffsets.
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64));
}

let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;

@@ -808,6 +806,29 @@ pub(crate) fn rewrite_start_end_time_bounds(
}
}

/// Checks if the request is a simple all query.
/// "Simple" in this case still allows sorting.
fn is_simple_all_query(search_request: &SearchRequest) -> bool {
if search_request.aggregation_request.is_some() {
return false;
}

if search_request.search_after.is_some() {
return false;
}

// TODO: Update the logic to handle start_timestamp end_timestamp ranges
if search_request.start_timestamp.is_some() || search_request.end_timestamp.is_some() {
return false;
}

let Ok(query_ast) = serde_json::from_str(&search_request.query_ast) else {
return false;
};

matches!(query_ast, QueryAst::MatchAll)
}
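
// For illustration: a request whose query_ast deserializes to QueryAst::MatchAll and
// that has no aggregation, no search_after, and no start/end timestamp qualifies as a
// "simple all query". A sort (e.g. by the timestamp field) is still allowed; sorted
// requests are handled by the split-level optimization below.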

#[derive(Debug, Clone)]
enum CanSplitDoBetter {
Uninformative,
@@ -879,6 +900,114 @@ impl CanSplitDoBetter {
}
}

/// This function tries to detect upfront which splits contain the top n hits and converts
/// the other split searches to count-only searches. It also optimizes the split order.
///
/// Returns the search requests with their split.
fn optimize(
&self,
request: Arc<SearchRequest>,
mut splits: Vec<SplitIdAndFooterOffsets>,
) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
self.optimize_split_order(&mut splits);

if !is_simple_all_query(&request) {
// no optimization opportunity here.
return Ok(splits
.into_iter()
.map(|split| (split, (*request).clone()))
.collect::<Vec<_>>());
}

let num_requested_docs = request.start_offset + request.max_hits;

// Calculate the number of splits which are guaranteed to deliver enough documents.
let min_required_splits = splits
.iter()
.map(|split| split.num_docs)
// computing the partial sum
.scan(0u64, |partial_sum: &mut u64, num_docs_in_split: u64| {
*partial_sum += num_docs_in_split;
Some(*partial_sum)
})
.take_while(|partial_sum| *partial_sum < num_requested_docs)
.count()
+ 1;
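// Worked example (hypothetical numbers): three splits of 30 docs each and
// num_requested_docs = 50 give partial sums 30, 60, 90; take_while keeps only
// 30, so min_required_splits = 1 + 1 = 2.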

// TODO: we may want some deduplication + Cow logic here
let mut split_with_req = splits
.into_iter()
.map(|split| (split, (*request).clone()))
.collect::<Vec<_>>();

// Reuse the sort order detected in split_filter:
// we want to detect cases where some split queries can be converted to count-only queries.
match self {
CanSplitDoBetter::SplitIdHigher(_) => {
// In this case there is no sort order, we order by split id.
// If the first min_required_splits splits contain enough documents, we can
// convert the remaining queries to count-only queries.
for (_split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits)
{
disable_search_request_hits(request);
}
}
CanSplitDoBetter::Uninformative => {}
CanSplitDoBetter::SplitTimestampLower(_) => {
// We order by timestamp asc. split_with_req is sorted by timestamp_start.
//
// If we know that some splits will deliver enough documents, we can convert the
// others to count-only queries.
// Since we only have start and end ranges and don't know the distribution, we
// make sure the splits don't overlap, since the distribution of two
// splits could look like this (each dot is a doc's timestamp on an x axis), for a
// top-2 query.
// ```
// [. .] Split1 has enough docs, but last doc is not in top 2
// [.. .] Split2 first doc is in top2
// ```
// Let's get the biggest timestamp_end of the first min_required_splits splits
let biggest_end_timestamp = split_with_req
.iter()
.take(min_required_splits)
.map(|(split, _)| split.timestamp_end())
.max()
// if there are no splits, we choose a value that disables all splits
.unwrap_or(i64::MIN);
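// Only splits that start strictly after this timestamp can be disabled:
// any split overlapping it could still contribute documents to the top n.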
for (split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits)
{
if split.timestamp_start() > biggest_end_timestamp {
disable_search_request_hits(request);
}
}
}
CanSplitDoBetter::SplitTimestampHigher(_) => {
// We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
//
// We have the number of splits we need to search to get enough docs; now we need
// to find the splits that don't overlap.
//
// Let's get the smallest timestamp_start of the first min_required_splits splits
let smallest_start_timestamp = split_with_req
.iter()
.take(min_required_splits)
.map(|(split, _)| split.timestamp_start())
.min()
// if there are no splits, we choose a value that disables all splits
.unwrap_or(i64::MAX);
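// Mirror of the ascending case: only splits that end strictly before this
// timestamp are guaranteed to hold no top-n candidates.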
for (split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits)
{
if split.timestamp_end() < smallest_start_timestamp {
disable_search_request_hits(request);
}
}
}
CanSplitDoBetter::FindTraceIdsAggregation(_) => {}
}

Ok(split_with_req)
}

/// Returns whether the given split can possibly give documents better than the one already
/// known to match.
fn can_be_better(&self, split: &SplitIdAndFooterOffsets) -> bool {
@@ -1071,14 +1200,14 @@ pub async fn leaf_search(
searcher_context: Arc<SearcherContext>,
request: Arc<SearchRequest>,
index_storage: Arc<dyn Storage>,
mut splits: Vec<SplitIdAndFooterOffsets>,
splits: Vec<SplitIdAndFooterOffsets>,
doc_mapper: Arc<dyn DocMapper>,
aggregations_limits: AggregationLimits,
) -> Result<LeafSearchResponse, SearchError> {
info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(&splits, 5));

let split_filter = CanSplitDoBetter::from_request(&request, doc_mapper.timestamp_field_name());
split_filter.optimize_split_order(&mut splits);
let split_with_req = split_filter.optimize(request.clone(), splits)?;

// If the client wants a full count, or we are doing an aggregation, we want to run every
// split. However, if the aggregation is the tracing aggregation, we don't actually need
// all splits.
@@ -1088,22 +1217,20 @@

let split_filter = Arc::new(RwLock::new(split_filter));

let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());
let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(split_with_req.len());

let merge_collector = make_merge_collector(&request, &aggregations_limits)?;
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));

for split in splits {
for (split, mut request) in split_with_req {
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
.instrument(info_span!("waiting_for_leaf_search_split_semaphore"))
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");

let mut request = (*request).clone();

let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
if !can_be_better && !run_all_splits {
continue;
5 changes: 5 additions & 0 deletions quickwit/quickwit-search/src/leaf_cache.rs
@@ -207,6 +207,7 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};

let split_2 = SplitIdAndFooterOffsets {
@@ -215,6 +216,7 @@
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};

let query_1 = SearchRequest {
@@ -269,20 +271,23 @@
split_footer_end: 100,
timestamp_start: Some(100),
timestamp_end: Some(199),
num_docs: 0,
};
let split_2 = SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: Some(150),
timestamp_end: Some(249),
num_docs: 0,
};
let split_3 = SplitIdAndFooterOffsets {
split_id: "split_3".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: Some(150),
timestamp_end: Some(249),
num_docs: 0,
};

let query_1 = SearchRequest {
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/lib.rs
@@ -173,6 +173,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAndFooterOffsets {
.time_range
.as_ref()
.map(|time_range| *time_range.end()),
num_docs: split_metadata.num_docs as u64,
}
}

2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/list_fields_cache.rs
@@ -87,6 +87,7 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};

let split_2 = SplitIdAndFooterOffsets {
@@ -95,6 +96,7 @@
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};

let result = ListFieldsEntryResponse {
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/retry/mod.rs
@@ -133,6 +133,7 @@ mod tests {
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};
let client_for_retry = retry_client(
&search_job_placer,
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/retry/search.rs
@@ -96,13 +96,15 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
],
}],
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/retry/search_stream.rs
@@ -110,13 +110,15 @@ mod tests {
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};
let split_2 = SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_end: 100,
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};
let retry_policy = LeafSearchStreamRetryPolicy {};
let request = LeafSearchStreamRequest {