Skip to content

Commit

Permalink
move to function, refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed Jun 6, 2024
1 parent 14af027 commit e8057e5
Showing 1 changed file with 58 additions and 52 deletions.
110 changes: 58 additions & 52 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,29 @@ pub(crate) fn rewrite_start_end_time_bounds(
}
}

/// Checks if request is a simple all query.
/// Simple in this case would still including sorting
fn is_simple_all_query(search_request: &SearchRequest) -> bool {
if search_request.aggregation_request.is_some() {
return false;
}

if search_request.search_after.is_some() {
return false;
}

// TODO: Update the logic to handle start_timestamp end_timestamp ranges
if search_request.start_timestamp.is_some() || search_request.end_timestamp.is_some() {
return false;
}

let Ok(query_ast) = serde_json::from_str(&search_request.query_ast) else {
return false;
};

matches!(query_ast, QueryAst::MatchAll)
}

#[derive(Debug, Clone)]
enum CanSplitDoBetter {
Uninformative,
Expand Down Expand Up @@ -887,66 +910,51 @@ impl CanSplitDoBetter {
mut splits: Vec<SplitIdAndFooterOffsets>,
) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
self.optimize_split_order(&mut splits);
// TODO: we maybe want here some deduplication + Cow logic
let mut split_with_req = splits
.into_iter()
.map(|split| (split, (*request).clone()))
.collect::<Vec<_>>();

if request.aggregation_request.is_some() || request.search_after.is_some() {
return Ok(split_with_req);
if !is_simple_all_query(&request) {
// no optimization opportunity here.
return Ok(splits
.into_iter()
.map(|split| (split, (*request).clone()))
.collect::<Vec<_>>());
}
let query_ast: QueryAst = serde_json::from_str(request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// TODO: Update the logic to handle start_timestamp end_timestamp ranges
if request.start_timestamp.is_some() || request.end_timestamp.is_some() {
return Ok(split_with_req);
}
let num_requested_docs = request.start_offset + request.max_hits;

if !matches!(query_ast, QueryAst::MatchAll) {
return Ok(split_with_req);
}
// Calculate the number of splits which are guaranteed to deliver enough documents.
let min_required_splits = splits
.iter()
.map(|split| split.num_docs)
// computing the partial sum
.scan(0u64, |partial_sum: &mut u64, num_docs_in_split: u64| {
*partial_sum += num_docs_in_split;
Some(*partial_sum)
})
.take_while(|partial_sum| *partial_sum < num_requested_docs)
.count()
+ 1;

// Count the number of splits which contain enough documents
let count_required_splits =
move |split_with_req: &[(SplitIdAndFooterOffsets, SearchRequest)],
num_requested_docs: u64| {
let mut num_docs = 0;
split_with_req
.iter()
.take_while(|(split, _req)| {
let need_more_docs = num_docs < num_requested_docs;
num_docs += split.num_docs;
need_more_docs
})
.count()
};
// TODO: we maybe want here some deduplication + Cow logic
let mut split_with_req = splits
.into_iter()
.map(|split| (split, (*request).clone()))
.collect::<Vec<_>>();

// reuse the detected sort order in split_filter
// we want to detect cases where we can convert some split queries to count only queries
let num_requested_docs = request.start_offset + request.max_hits;
match self {
CanSplitDoBetter::SplitIdHigher(_) => {
// In this case there is no sort order, we order by split id.
// If the the first split has enough documents, we can convert the other queries to
// count only queries
let num_splits = count_required_splits(&split_with_req, num_requested_docs);
for (_split, ref mut request) in &mut split_with_req[num_splits..] {
for (_split, ref mut request) in &mut split_with_req[min_required_splits..] {
disable_search_request_hits(request);
}
}
CanSplitDoBetter::Uninformative => {}
CanSplitDoBetter::SplitTimestampLower(_) => {
// We order by timestamp asc. split_with_req is sorted by timestamp_start.
//
// Calculate the number of splits which are guaranteed to deliver enough documents.
let num_splits = count_required_splits(&split_with_req, num_requested_docs);
assert!(
num_splits > 0,
"We should always have at least one split to search"
);
//
// If we know that some splits will deliver enough documents, we can convert the
// others to count only queries.
// Since we only have start and end ranges and don't know the distribution we make
Expand All @@ -960,11 +968,13 @@ impl CanSplitDoBetter {
// Let's get the biggest timestamp_end of the first num_splits splits
let biggest_end_timestamp = split_with_req
.iter()
.take(num_splits)
.take(min_required_splits)
.map(|(split, _)| split.timestamp_end())
.max()
.unwrap();
for (split, ref mut request) in split_with_req.iter_mut().skip(num_splits) {
// if min_required_splits is 0, we choose a value that disables all splits
.unwrap_or(i64::MIN);
for (split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits)
{
if split.timestamp_start() > biggest_end_timestamp {
disable_search_request_hits(request);
}
Expand All @@ -973,23 +983,19 @@ impl CanSplitDoBetter {
CanSplitDoBetter::SplitTimestampHigher(_) => {
// We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
//
// Calculate the number of splits which are guaranteed to deliver enough documents.
let num_splits = count_required_splits(&split_with_req, num_requested_docs);
assert!(
num_splits > 0,
"We should always have at least one split to search"
);
// We have the number of splits we need to search to get enough docs, now we need to
// find the splits that don't overlap.
//
// Let's get the smallest timestamp_start of the first num_splits splits
let smallest_start_timestamp = split_with_req
.iter()
.take(num_splits)
.take(min_required_splits)
.map(|(split, _)| split.timestamp_start())
.min()
.unwrap();
for (split, ref mut request) in split_with_req.iter_mut().skip(num_splits) {
// if min_required_splits is 0, we choose a value that disables all splits
.unwrap_or(i64::MAX);
for (split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits)
{
if split.timestamp_end() < smallest_start_timestamp {
disable_search_request_hits(request);
}
Expand Down

0 comments on commit e8057e5

Please sign in to comment.