diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 7f1893e52ec..b5cc92cd5d2 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -353,6 +353,8 @@ message SplitIdAndFooterOffsets { optional int64 timestamp_start = 4; // The highest timestamp appearing in the split, in seconds since epoch optional int64 timestamp_end = 5; + // The number of docs in the split + uint64 num_docs = 6; } // Hits returned by a FetchDocRequest. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 9b89a4629f6..bee455bccd3 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -303,6 +303,9 @@ pub struct SplitIdAndFooterOffsets { /// The highest timestamp appearing in the split, in seconds since epoch #[prost(int64, optional, tag = "5")] pub timestamp_end: ::core::option::Option<i64>, + /// The number of docs in the split + #[prost(uint64, tag = "6")] + pub num_docs: u64, } /// Hits returned by a FetchDocRequest.
/// diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index fff9b2af281..8d30b6daeab 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -380,6 +380,7 @@ mod tests { split_footer_start: 0, timestamp_start: None, timestamp_end: None, + num_docs: 0, }], ..Default::default() } @@ -406,6 +407,7 @@ mod tests { split_footer_end: 100, timestamp_start: None, timestamp_end: None, + num_docs: 0, }, SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -413,6 +415,7 @@ mod tests { split_footer_end: 100, timestamp_start: None, timestamp_end: None, + num_docs: 0, }, ], }], @@ -441,6 +444,7 @@ mod tests { split_footer_end: 100, timestamp_start: None, timestamp_end: None, + num_docs: 0, }, SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -448,6 +452,7 @@ mod tests { split_footer_end: 100, timestamp_start: None, timestamp_end: None, + num_docs: 0, }, ], } diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 1e66d70d5bb..5f73757ea56 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -363,6 +363,17 @@ async fn leaf_search_single_split( return Ok(cached_answer); } + let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str()) + .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; + + // CanSplitDoBetter or rewrite_request may have changed the request to be a count only request + // This may be the case for AllQuery with a sort by date and time filter, where the current + // split can't have better results. 
+ // + if is_metadata_count_request_with_ast(&query_ast, &search_request) { + return Ok(get_leaf_resp_from_count(split.num_docs)); + } + let split_id = split.split_id.to_string(); let index = open_index_with_caches( searcher_context, @@ -382,19 +393,6 @@ async fn leaf_search_single_split( let mut collector = make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?; - let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str()) - .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; - - // CanSplitDoBetter or rewrite_request may have changed the request to be a count only request - // This may be the case for AllQuery with a sort by date, where the current split can't have - // better results. - // - // TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save - // opening the index and execute this earlier. Opening splits is typically served from the - // cache, so there may be no gain adding that info to SplitIdAndFooterOffsets. - if is_metadata_count_request_with_ast(&query_ast, &search_request) { - return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64)); - } let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?; @@ -808,6 +806,29 @@ pub(crate) fn rewrite_start_end_time_bounds( } } +/// Checks if request is a simple all query. 
+/// Simple in this case would still include sorting +fn is_simple_all_query(search_request: &SearchRequest) -> bool { + if search_request.aggregation_request.is_some() { + return false; + } + + if search_request.search_after.is_some() { + return false; + } + + // TODO: Update the logic to handle start_timestamp end_timestamp ranges + if search_request.start_timestamp.is_some() || search_request.end_timestamp.is_some() { + return false; + } + + let Ok(query_ast) = serde_json::from_str(&search_request.query_ast) else { + return false; + }; + + matches!(query_ast, QueryAst::MatchAll) +} + #[derive(Debug, Clone)] enum CanSplitDoBetter { Uninformative, @@ -879,6 +900,114 @@ impl CanSplitDoBetter { } } + /// This function tries to detect upfront which splits contain the top n hits and convert other + /// split searches to count only searches. It also optimizes split order. + /// + /// Returns the search_requests with their split. + fn optimize( + &self, + request: Arc<SearchRequest>, + mut splits: Vec<SplitIdAndFooterOffsets>, + ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> { + self.optimize_split_order(&mut splits); + + if !is_simple_all_query(&request) { + // no optimization opportunity here. + return Ok(splits + .into_iter() + .map(|split| (split, (*request).clone())) + .collect::<Vec<_>>()); + } + + let num_requested_docs = request.start_offset + request.max_hits; + + // Calculate the number of splits which are guaranteed to deliver enough documents.
+ let min_required_splits = splits + .iter() + .map(|split| split.num_docs) + // computing the partial sum + .scan(0u64, |partial_sum: &mut u64, num_docs_in_split: u64| { + *partial_sum += num_docs_in_split; + Some(*partial_sum) + }) + .take_while(|partial_sum| *partial_sum < num_requested_docs) + .count() + + 1; + + // TODO: we maybe want here some deduplication + Cow logic + let mut split_with_req = splits + .into_iter() + .map(|split| (split, (*request).clone())) + .collect::<Vec<_>>(); + + // reuse the detected sort order in split_filter + // we want to detect cases where we can convert some split queries to count only queries + match self { + CanSplitDoBetter::SplitIdHigher(_) => { + // In this case there is no sort order, we order by split id. + // If the first split has enough documents, we can convert the other queries to + // count only queries + for (_split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits) + { + disable_search_request_hits(request); + } + } + CanSplitDoBetter::Uninformative => {} + CanSplitDoBetter::SplitTimestampLower(_) => { + // We order by timestamp asc. split_with_req is sorted by timestamp_start. + // + // If we know that some splits will deliver enough documents, we can convert the + // others to count only queries. + // Since we only have start and end ranges and don't know the distribution we make + // sure the splits don't overlap, since the distribution of two + // splits could be like this (dot is a timestamp doc on a x axis), for top 2 + // queries. + // ``` + // [. .] Split1 has enough docs, but last doc is not in top 2 + // [.. .]
Split2 first doc is in top2 + // ``` + // Let's get the biggest timestamp_end of the first num_splits splits + let biggest_end_timestamp = split_with_req + .iter() + .take(min_required_splits) + .map(|(split, _)| split.timestamp_end()) + .max() + // if min_required_splits is 0, we choose a value that disables all splits + .unwrap_or(i64::MIN); + for (split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits) + { + if split.timestamp_start() > biggest_end_timestamp { + disable_search_request_hits(request); + } + } + } + CanSplitDoBetter::SplitTimestampHigher(_) => { + // We order by timestamp desc. split_with_req is sorted by timestamp_end desc. + // + // We have the number of splits we need to search to get enough docs, now we need to + // find the splits that don't overlap. + // + // Let's get the smallest timestamp_start of the first num_splits splits + let smallest_start_timestamp = split_with_req + .iter() + .take(min_required_splits) + .map(|(split, _)| split.timestamp_start()) + .min() + // if min_required_splits is 0, we choose a value that disables all splits + .unwrap_or(i64::MAX); + for (split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits) + { + if split.timestamp_end() < smallest_start_timestamp { + disable_search_request_hits(request); + } + } + } + CanSplitDoBetter::FindTraceIdsAggregation(_) => {} + } + + Ok(split_with_req) + } + /// Returns whether the given split can possibly give documents better than the one already /// known to match. 
fn can_be_better(&self, split: &SplitIdAndFooterOffsets) -> bool { @@ -1071,14 +1200,14 @@ pub async fn leaf_search( searcher_context: Arc, request: Arc, index_storage: Arc, - mut splits: Vec, + splits: Vec, doc_mapper: Arc, aggregations_limits: AggregationLimits, ) -> Result { info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(&splits, 5)); let split_filter = CanSplitDoBetter::from_request(&request, doc_mapper.timestamp_field_name()); - split_filter.optimize_split_order(&mut splits); + let split_with_req = split_filter.optimize(request.clone(), splits)?; // if client wants full count, or we are doing an aggregation, we want to run every splits. // However if the aggregation is the tracing aggregation, we don't actually need all splits. @@ -1088,13 +1217,13 @@ pub async fn leaf_search( let split_filter = Arc::new(RwLock::new(split_filter)); - let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len()); + let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(split_with_req.len()); let merge_collector = make_merge_collector(&request, &aggregations_limits)?; let incremental_merge_collector = IncrementalCollector::new(merge_collector); let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector)); - for split in splits { + for (split, mut request) in split_with_req { let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore .clone() .acquire_owned() @@ -1102,8 +1231,6 @@ pub async fn leaf_search( .await .expect("Failed to acquire permit. This should never happen! 
Please, report on https://github.com/quickwit-oss/quickwit/issues."); - let mut request = (*request).clone(); - let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter); if !can_be_better && !run_all_splits { continue; diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index f94f2d64629..86c5496240e 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -207,6 +207,7 @@ mod tests { split_footer_end: 100, timestamp_start: None, timestamp_end: None, + num_docs: 0, }; let split_2 = SplitIdAndFooterOffsets { @@ -215,6 +216,7 @@ mod tests { split_footer_end: 100, timestamp_start: None, timestamp_end: None, + num_docs: 0, }; let query_1 = SearchRequest { @@ -269,6 +271,7 @@ mod tests { split_footer_end: 100, timestamp_start: Some(100), timestamp_end: Some(199), + num_docs: 0, }; let split_2 = SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -276,6 +279,7 @@ mod tests { split_footer_end: 100, timestamp_start: Some(150), timestamp_end: Some(249), + num_docs: 0, }; let split_3 = SplitIdAndFooterOffsets { split_id: "split_3".to_string(), @@ -283,6 +287,7 @@ mod tests { split_footer_end: 100, timestamp_start: Some(150), timestamp_end: Some(249), + num_docs: 0, }; let query_1 = SearchRequest { diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 38d602f1bbc..d43bba9f507 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -173,6 +173,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn .time_range .as_ref() .map(|time_range| *time_range.end()), + num_docs: split_metadata.num_docs as u64, } } diff --git a/quickwit/quickwit-search/src/list_fields_cache.rs b/quickwit/quickwit-search/src/list_fields_cache.rs index 0ab04c584f6..df60258db25 100644 --- a/quickwit/quickwit-search/src/list_fields_cache.rs +++ 
b/quickwit/quickwit-search/src/list_fields_cache.rs @@ -87,6 +87,7 @@ mod tests { split_footer_end: 100, timestamp_start: None, timestamp_end: None, + num_docs: 0, }; let split_2 = SplitIdAndFooterOffsets { @@ -95,6 +96,7 @@ mod tests { split_footer_end: 100, timestamp_start: None, timestamp_end: None, + num_docs: 0, }; let result = ListFieldsEntryResponse { diff --git a/quickwit/quickwit-search/src/retry/mod.rs b/quickwit/quickwit-search/src/retry/mod.rs index 6df601eb3b7..72a9e214c30 100644 --- a/quickwit/quickwit-search/src/retry/mod.rs +++ b/quickwit/quickwit-search/src/retry/mod.rs @@ -133,6 +133,7 @@ mod tests { split_footer_start: 0, timestamp_start: None, timestamp_end: None, + num_docs: 0, }; let client_for_retry = retry_client( &search_job_placer, diff --git a/quickwit/quickwit-search/src/retry/search.rs b/quickwit/quickwit-search/src/retry/search.rs index cc6bce4f9cf..f6932d92d3b 100644 --- a/quickwit/quickwit-search/src/retry/search.rs +++ b/quickwit/quickwit-search/src/retry/search.rs @@ -96,6 +96,7 @@ mod tests { split_footer_end: 100, timestamp_start: None, timestamp_end: None, + num_docs: 0, }, SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -103,6 +104,7 @@ mod tests { split_footer_end: 100, timestamp_start: None, timestamp_end: None, + num_docs: 0, }, ], }], diff --git a/quickwit/quickwit-search/src/retry/search_stream.rs b/quickwit/quickwit-search/src/retry/search_stream.rs index 15b184f890c..d2f825eddb6 100644 --- a/quickwit/quickwit-search/src/retry/search_stream.rs +++ b/quickwit/quickwit-search/src/retry/search_stream.rs @@ -110,6 +110,7 @@ mod tests { split_footer_start: 0, timestamp_start: None, timestamp_end: None, + num_docs: 0, }; let split_2 = SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -117,6 +118,7 @@ mod tests { split_footer_start: 0, timestamp_start: None, timestamp_end: None, + num_docs: 0, }; let retry_policy = LeafSearchStreamRetryPolicy {}; let request = LeafSearchStreamRequest { diff --git 
a/quickwit/rest-api-tests/run_tests.py b/quickwit/rest-api-tests/run_tests.py index 3bd43c6da77..4a690be37b0 100755 --- a/quickwit/rest-api-tests/run_tests.py +++ b/quickwit/rest-api-tests/run_tests.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 +import copy import glob import gzip import http import json import os import requests +import random import shutil import subprocess import sys @@ -97,6 +99,16 @@ def run_request_step(method, step, previous_result): kvargs["data"] = open(body_from_file, 'rb').read() kvargs = resolve_previous_result(kvargs, previous_result) + shuffle_ndjson = step.get("shuffle_ndjson", None) + if shuffle_ndjson is not None: + docs_per_split = distribute_items(shuffle_ndjson, step.get("min_splits", 1), step.get("max_splits", 5), step.get("seed", None)) + + for i, bucket in enumerate(docs_per_split): + new_step = copy.deepcopy(step) + del new_step["shuffle_ndjson"] + new_step["ndjson"] = bucket + run_request_step(method, new_step, previous_result) + return; ndjson = step.get("ndjson", None) if ndjson is not None: # Add a newline at the end to please elasticsearch -> "The bulk request must be terminated by a newline [\\n]". 
@@ -120,6 +132,27 @@ def run_request_step(method, step, previous_result): raise e return json_resp +def distribute_items(items, min_buckets, max_buckets, seed=None): + if seed is None: + seed = random.randint(0, 10000) + random.seed(seed) + + # Determine the number of buckets + num_buckets = random.randint(min_buckets, max_buckets) + + # Initialize empty buckets + buckets = [[] for _ in range(num_buckets)] + + # Distribute items randomly into buckets + for item in items: + random_bucket = random.randint(0, num_buckets - 1) + buckets[random_bucket].append(item) + + # Print the seed for reproducibility + print(f"Seed: {seed}") + + return buckets + def check_result(result, expected, context_path = ""): if type(expected) == dict and "$expect" in expected: expectations = expected["$expect"] diff --git a/quickwit/rest-api-tests/scenarii/multi_splits/0001-request-optimizations.yaml b/quickwit/rest-api-tests/scenarii/multi_splits/0001-request-optimizations.yaml new file mode 100644 index 00000000000..b14c6f99e95 --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/multi_splits/0001-request-optimizations.yaml @@ -0,0 +1,133 @@ +json: + size: 1 + track_total_hits: true + query: + match_all: {} + sort: + - timestamp: + order: asc +expected: + hits: + hits: + - _source: {"timestamp": "2015-01-10T10:00:00Z"} +--- +json: + size: 2 + track_total_hits: true + query: + match_all: {} + sort: + - timestamp: + order: asc +expected: + hits: + hits: + - _source: {"timestamp": "2015-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-10T10:00:00Z"} +--- +json: + size: 3 + track_total_hits: true + query: + match_all: {} + sort: + - timestamp: + order: asc +expected: + hits: + hits: + - _source: {"timestamp": "2015-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-10T10:00:00Z"} +--- +json: + size: 5 + track_total_hits: true + query: + match_all: {} + sort: + - timestamp: + order: asc +expected: + hits: + hits: + - _source: 
{"timestamp": "2015-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-10T12:00:00Z"} +--- # ASC + TIMESTAMP filter +json: + size: 5 + track_total_hits: true + query: + range: + timestamp: + gte: "2015-01-10T12:00:00Z" + sort: + - timestamp: + order: asc +expected: + hits: + hits: + - _source: {"timestamp": "2015-01-10T12:00:00Z"} + - _source: {"timestamp": "2015-01-10T13:00:00Z"} + - _source: {"timestamp": "2015-01-10T14:00:00Z"} + - _source: {"timestamp": "2015-01-11T12:00:00Z"} + - _source: {"timestamp": "2015-01-11T12:00:00Z"} +--- # ASC + TIMESTAMP filter +json: + size: 5 + track_total_hits: true + query: + range: + timestamp: + lt: "2015-01-10T12:00:00Z" + sort: + - timestamp: + order: asc +expected: + hits: + hits: + - _source: {"timestamp": "2015-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-10T10:00:00Z"} +--- # DESC +json: + size: 6 + track_total_hits: true + query: + match_all: {} + sort: + - timestamp: + order: desc +expected: + hits: + hits: + - _source: {"timestamp": "2016-01-11T12:00:00Z"} + - _source: {"timestamp": "2016-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-11T12:00:00Z"} + - _source: {"timestamp": "2015-01-11T12:00:00Z"} + - _source: {"timestamp": "2015-01-11T12:00:00Z"} + - _source: {"timestamp": "2015-01-11T12:00:00Z"} +--- +json: + size: 7 + track_total_hits: true + query: + match_all: {} + sort: + - timestamp: + order: desc +expected: + hits: + hits: + - _source: {"timestamp": "2016-01-11T12:00:00Z"} + - _source: {"timestamp": "2016-01-10T10:00:00Z"} + - _source: {"timestamp": "2015-01-11T12:00:00Z"} + - _source: {"timestamp": "2015-01-11T12:00:00Z"} + - _source: {"timestamp": "2015-01-11T12:00:00Z"} + - _source: {"timestamp": "2015-01-11T12:00:00Z"} + - _source: 
{"timestamp": "2015-01-10T14:00:00Z"} + diff --git a/quickwit/rest-api-tests/scenarii/multi_splits/_ctx.yaml b/quickwit/rest-api-tests/scenarii/multi_splits/_ctx.yaml new file mode 100644 index 00000000000..bb619032c8b --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/multi_splits/_ctx.yaml @@ -0,0 +1,5 @@ +method: [GET] +endpoint: "multi_splits/_search" +# The entire suite is just for Quickwit +engines: [quickwit] +api_root: "http://localhost:7280/api/v1/_elastic/" diff --git a/quickwit/rest-api-tests/scenarii/multi_splits/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/multi_splits/_setup.quickwit.yaml new file mode 100644 index 00000000000..48c0f215b4d --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/multi_splits/_setup.quickwit.yaml @@ -0,0 +1,46 @@ +# Delete possibly remaining index +method: DELETE +endpoint: indexes/multi_splits +status_code: null +--- +# Create index +method: POST +api_root: http://localhost:7280/api/v1/ +endpoint: indexes/ +json: + version: "0.7" + index_id: multi_splits + doc_mapping: + mode: dynamic + timestamp_field: timestamp + field_mappings: + - name: timestamp + type: datetime + input_formats: + - rfc3339 + fast: true +sleep_after: 3 +--- +method: POST +api_root: http://localhost:7280/api/v1/ +endpoint: multi_splits/ingest +params: + commit: force +min_splits: 1 +max_splits: 10 +#seed: 3694 +shuffle_ndjson: + - {"timestamp": "2015-01-10T10:00:00Z"} + - {"timestamp": "2015-01-11T12:00:00Z"} + - {"timestamp": "2015-01-10T10:00:00Z"} + - {"timestamp": "2015-01-10T13:00:00Z"} + - {"timestamp": "2015-01-11T12:00:00Z"} + - {"timestamp": "2015-01-10T10:00:00Z"} + - {"timestamp": "2015-01-10T14:00:00Z"} # 1h later than 2.doc + - {"timestamp": "2015-01-11T12:00:00Z"} + - {"timestamp": "2015-01-10T10:00:00Z"} + - {"timestamp": "2015-01-10T12:00:00Z"} # 1h earlier than 2. 
doc + - {"timestamp": "2015-01-11T12:00:00Z"} + - {"timestamp": "2016-01-10T10:00:00Z"} + - {"timestamp": "2016-01-11T12:00:00Z"} + diff --git a/quickwit/rest-api-tests/scenarii/multi_splits/_teardown.quickwit.yaml b/quickwit/rest-api-tests/scenarii/multi_splits/_teardown.quickwit.yaml new file mode 100644 index 00000000000..68aeddec4bf --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/multi_splits/_teardown.quickwit.yaml @@ -0,0 +1,4 @@ +method: DELETE +api_root: http://localhost:7280/api/v1/ +endpoint: indexes/multi_splits +status_code: null