Skip to content

Commit

Permalink
plug exact count optimization (#4019)
Browse files Browse the repository at this point in the history
* plug exact count optimization

* add tests for count_all query param
  • Loading branch information
trinity-1686a authored Oct 26, 2023
1 parent 0274fe1 commit e1dc0fb
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 17 deletions.
3 changes: 2 additions & 1 deletion quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
use quickwit_indexing::IndexingPipeline;
use quickwit_metastore::{IndexMetadata, Split, SplitState};
use quickwit_proto::search::{SortField, SortOrder};
use quickwit_proto::search::{CountHits, SortField, SortOrder};
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::rest_client::{CommitType, IngestEvent};
use quickwit_search::SearchResponseRest;
Expand Down Expand Up @@ -856,6 +856,7 @@ pub async fn search_index(args: SearchIndexArgs) -> anyhow::Result<SearchRespons
max_hits: args.max_hits as u64,
start_offset: args.start_offset as u64,
sort_by,
count_all: CountHits::CountAll,
..Default::default()
};
let qw_client = args.client_args.search_client();
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use quickwit_indexing::IndexingPipeline;
use quickwit_ingest::IngesterPool;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::SearchResponse;
use quickwit_proto::search::{CountHits, SearchResponse};
use quickwit_proto::types::NodeId;
use quickwit_search::{single_node_search, SearchResponseRest};
use quickwit_serve::{
Expand Down Expand Up @@ -551,6 +551,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
aggs,
format: BodyFormat::Json,
sort_by,
count_all: CountHits::CountAll,
};
let search_request =
search_request_from_api_request(vec![args.index_id], search_request_query_string)?;
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use quickwit_proto::jaeger::storage::v1::{
SpansResponseChunk, TraceQueryParameters,
};
use quickwit_proto::opentelemetry::proto::trace::v1::status::StatusCode as OtlpStatusCode;
use quickwit_proto::search::{ListTermsRequest, SearchRequest};
use quickwit_proto::search::{CountHits, ListTermsRequest, SearchRequest};
use quickwit_query::query_ast::{BoolQuery, QueryAst, RangeQuery, TermQuery};
use quickwit_search::{FindTraceIdsCollector, SearchService};
use serde::Deserialize;
Expand Down Expand Up @@ -261,6 +261,7 @@ impl JaegerService {
max_hits,
start_timestamp: min_span_start_timestamp_secs_opt,
end_timestamp: max_span_start_timestamp_secs_opt,
count_hits: CountHits::Underestimate.into(),
..Default::default()
};
let search_response = self.search_service.root_search(search_request).await?;
Expand Down Expand Up @@ -308,6 +309,7 @@ impl JaegerService {
start_timestamp: Some(*search_window.start()),
end_timestamp: Some(*search_window.end()),
max_hits: self.max_fetch_spans,
count_hits: CountHits::Underestimate.into(),
..Default::default()
};
let search_response = match self.search_service.root_search(search_request).await {
Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ message SearchRequest {
// enable pagination.
// If split_id is empty, no comparison with _shard_doc should be done
optional PartialHit search_after = 16;

CountHits count_hits = 17;
}

// Selects how the number of hits matching a search request is counted.
enum CountHits {
// Count all hits, querying all splits.
COUNT_ALL = 0;
// Give an underestimate of the number of hits, possibly skipping entire
// splits if they are otherwise not needed to fulfill a query.
UNDERESTIMATE = 1;
}

message SortField {
Expand Down
33 changes: 33 additions & 0 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ pub struct SearchRequest {
/// If split_id is empty, no comparison with _shard_doc should be done
#[prost(message, optional, tag = "16")]
pub search_after: ::core::option::Option<PartialHit>,
#[prost(enumeration = "CountHits", tag = "17")]
pub count_hits: i32,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[derive(Eq, Hash)]
Expand Down Expand Up @@ -501,6 +503,37 @@ pub struct LeafSearchStreamResponse {
#[serde(rename_all = "snake_case")]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
/// Selects how the number of hits matching a search request is counted.
pub enum CountHits {
/// Count all hits, querying all splits.
CountAll = 0,
/// Give an underestimate of the number of hits, possibly skipping entire
/// splits if they are otherwise not needed to fulfill a query.
Underestimate = 1,
}
impl CountHits {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
CountHits::CountAll => "COUNT_ALL",
CountHits::Underestimate => "UNDERESTIMATE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"COUNT_ALL" => Some(Self::CountAll),
"UNDERESTIMATE" => Some(Self::Underestimate),
_ => None,
}
}
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "snake_case")]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum SortOrder {
/// Ascending order.
Asc = 0,
Expand Down
9 changes: 6 additions & 3 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use quickwit_common::PrettySample;
use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
use quickwit_proto::search::{
LeafListTermsResponse, LeafSearchResponse, ListTermsRequest, PartialHit, SearchRequest,
SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError,
CountHits, LeafListTermsResponse, LeafSearchResponse, ListTermsRequest, PartialHit,
SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_query::query_ast::QueryAst;
use quickwit_storage::{
Expand Down Expand Up @@ -549,7 +549,8 @@ pub async fn leaf_search(

// In the future this should become `request.aggregation_request.is_some() ||
// request.exact_count == true`
let run_all_splits = true;
let run_all_splits =
request.aggregation_request.is_some() || request.count_hits() == CountHits::CountAll;

let split_filter = CanSplitDoBetter::from_request(&request, doc_mapper.timestamp_field_name());
split_filter.optimize_split_order(&mut splits);
Expand Down Expand Up @@ -597,6 +598,8 @@ pub async fn leaf_search(
));
}

// TODO we could cancel running splits when !run_all_splits and the running split can no longer
// give better results after some other split answered.
let split_search_results: Vec<Result<(), _>> =
futures::future::join_all(leaf_search_single_split_futures).await;

Expand Down
13 changes: 11 additions & 2 deletions quickwit/quickwit-search/src/leaf_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@
use std::ops::Bound;

use prost::Message;
use quickwit_proto::search::{LeafSearchResponse, SearchRequest, SplitIdAndFooterOffsets};
use quickwit_proto::search::{
CountHits, LeafSearchResponse, SearchRequest, SplitIdAndFooterOffsets,
};
use quickwit_storage::{MemorySizedCache, OwnedBytes};

/// A cache to memoize `leaf_search_single_split` results.
pub struct LeafSearchCache {
content: MemorySizedCache<CacheKey>,
}

// TODO we could be smarted about search_after. If we have a cached request with a search_after
// TODO we could be smarter about search_after. If we have a cached request with a search_after
// (possibly equal to None) A, and a corresponding response with the 1st element having the value
// B, and we receive a 2nd request with a search_after such that A <= C < B, we can serve from
// cache directly. Only the case A = C < B is currently handled.
// TODO if we don't request counting all results, have no aggregation, and we get a request we can
// match, the merged_time_range is strictly smaller, and every hit we had fits in the new
// timebound, we can reply from cache, saying we hit only result.partial_hits.len() results. This
// always undercounts, and necessarily returns the right hits.

impl LeafSearchCache {
pub fn new(capacity: usize) -> LeafSearchCache {
Expand Down Expand Up @@ -89,6 +95,9 @@ impl CacheKey {

search_request.start_timestamp = None;
search_request.end_timestamp = None;
// it doesn't matter whether or not we count all hits at the scale of a
// single split: either we did process it and got everything, or we didn't.
search_request.count_hits = CountHits::CountAll.into();

CacheKey {
split_id: split_info.split_id,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result<
// We remove the scroll ttl parameter. It is irrelevant to process later request
scroll_ttl_secs: None,
search_after: None,
count_hits: req.count_hits,
})
}

Expand Down
11 changes: 10 additions & 1 deletion quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use hyper::StatusCode;
use itertools::Itertools;
use quickwit_common::truncate_str;
use quickwit_config::{validate_index_id_pattern, NodeConfig};
use quickwit_proto::search::{PartialHit, ScrollRequest, SearchResponse, SortByValue};
use quickwit_proto::search::{CountHits, PartialHit, ScrollRequest, SearchResponse, SortByValue};
use quickwit_proto::ServiceErrorCode;
use quickwit_query::query_ast::{QueryAst, UserInputQuery};
use quickwit_query::BooleanOperand;
Expand All @@ -46,6 +46,7 @@ use super::model::{
ElasticSearchError, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse,
MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams,
};
use super::TrackTotalHits;
use crate::format::BodyFormat;
use crate::json_api_response::{make_json_api_response, ApiError, JsonApiResponse};
use crate::{with_arg, BuildInfo};
Expand Down Expand Up @@ -156,6 +157,13 @@ fn build_request_for_es_api(

let max_hits = search_params.size.or(search_body.size).unwrap_or(10);
let start_offset = search_params.from.or(search_body.from).unwrap_or(0);
let count_hits = match search_params.track_total_hits {
None => CountHits::Underestimate,
Some(TrackTotalHits::Track(false)) => CountHits::Underestimate,
Some(TrackTotalHits::Count(count)) if count <= max_hits as i64 => CountHits::Underestimate,
Some(TrackTotalHits::Track(true) | TrackTotalHits::Count(_)) => CountHits::CountAll,
}
.into();

let sort_fields: Vec<quickwit_proto::search::SortField> = search_params
.sort_fields()?
Expand Down Expand Up @@ -193,6 +201,7 @@ fn build_request_for_es_api(
snippet_fields: Vec::new(),
scroll_ttl_secs,
search_after,
count_hits,
},
has_doc_id_field,
))
Expand Down
88 changes: 80 additions & 8 deletions quickwit/quickwit-serve/src/search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use hyper::header::HeaderValue;
use hyper::HeaderMap;
use once_cell::sync::Lazy;
use quickwit_config::validate_index_id_pattern;
use quickwit_proto::search::{OutputFormat, SortField, SortOrder};
use quickwit_proto::search::{CountHits, OutputFormat, SortField, SortOrder};
use quickwit_proto::ServiceError;
use quickwit_query::query_ast::query_ast_from_user_text;
use quickwit_search::{SearchError, SearchResponseRest, SearchService};
Expand Down Expand Up @@ -217,6 +217,39 @@ pub struct SearchRequestQueryString {
#[serde(skip_serializing_if = "SortBy::is_empty")]
#[param(value_type = String)]
pub sort_by: SortBy,
#[param(value_type = bool)]
#[schema(value_type = bool)]
#[serde(with = "count_hits_from_bool")]
#[serde(default = "count_hits_from_bool::default")]
pub count_all: CountHits,
}

/// Serde adapter exposing a `CountHits` value as an optional boolean `count_all` flag.
///
/// `true` or an absent value maps to `CountHits::CountAll`; `false` maps to
/// `CountHits::Underestimate`.
mod count_hits_from_bool {
    use quickwit_proto::search::CountHits;
    use serde::{self, Deserialize, Deserializer, Serializer};

    /// Writes `false` for `Underestimate` and a "none" value for `CountAll`.
    ///
    /// `CountAll` is also what `deserialize` yields for an absent value, so emitting
    /// none for it round-trips.
    pub fn serialize<S>(count_hits: &CountHits, serializer: S) -> Result<S::Ok, S::Error>
    where S: Serializer {
        match count_hits {
            CountHits::Underestimate => serializer.serialize_bool(false),
            CountHits::CountAll => serializer.serialize_none(),
        }
    }

    /// Reads an optional boolean; only an explicit `false` selects `Underestimate`.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<CountHits, D::Error>
    where D: Deserializer<'de> {
        let requested = Option::<bool>::deserialize(deserializer)?;
        let count_hits = match requested {
            Some(false) => CountHits::Underestimate,
            // An absent value behaves like an explicit `true`.
            Some(true) | None => CountHits::CountAll,
        };
        Ok(count_hits)
    }

    /// Value used when the field is missing from the input altogether.
    pub fn default() -> CountHits {
        CountHits::CountAll
    }
}

pub fn search_request_from_api_request(
Expand All @@ -242,6 +275,7 @@ pub fn search_request_from_api_request(
sort_fields: search_request.sort_by.sort_fields,
scroll_ttl_secs: None,
search_after: None,
count_hits: search_request.count_all.into(),
};
Ok(search_request)
}
Expand Down Expand Up @@ -548,7 +582,7 @@ mod tests {
let rest_search_api_filter = search_post_filter();
let (indexes, req) = warp::test::request()
.method("POST")
.path("/quickwit-demo-index/search?query=*&max_hits=10")
.path("/quickwit-demo-index/search")
.json(&true)
.body(r#"{"query": "*", "max_hits":10, "aggs": {"range":[]} }"#)
.filter(&rest_search_api_filter)
Expand All @@ -565,6 +599,7 @@ mod tests {
format: BodyFormat::default(),
sort_by: SortBy::default(),
aggs: Some(json!({"range":[]})),
count_all: CountHits::CountAll,
..Default::default()
}
);
Expand All @@ -575,10 +610,7 @@ mod tests {
let rest_search_api_filter = search_post_filter();
let (indexes, req) = warp::test::request()
.method("POST")
.path(
"/quickwit-demo-index,quickwit-demo,quickwit-demo-index-*/search?query=*&\
max_hits=10",
)
.path("/quickwit-demo-index,quickwit-demo,quickwit-demo-index-*/search")
.json(&true)
.body(r#"{"query": "*", "max_hits":10, "aggs": {"range":[]} }"#)
.filter(&rest_search_api_filter)
Expand Down Expand Up @@ -612,7 +644,7 @@ mod tests {
let rest_search_api_filter = search_post_filter();
let bad_pattern_rejection = warp::test::request()
.method("POST")
.path("/quickwit-demo-index**/search?query=*&max_hits=10")
.path("/quickwit-demo-index**/search")
.json(&true)
.body(r#"{"query": "*", "max_hits":10, "aggs": {"range":[]} }"#)
.filter(&rest_search_api_filter)
Expand Down Expand Up @@ -656,6 +688,46 @@ mod tests {
);
}

#[tokio::test]
async fn test_rest_search_api_route_count_all() {
    // Each case pairs the raw `count_all` query-string value with the `CountHits` variant
    // the parsed request is expected to carry.
    let cases = [
        ("true", CountHits::CountAll),
        ("false", CountHits::Underestimate),
    ];
    for (raw_count_all, expected_count_hits) in cases {
        let request_path = format!(
            "/quickwit-demo-index/search?query=*&count_all={}",
            raw_count_all
        );
        let rest_search_api_filter = search_get_filter();
        let (indexes, req) = warp::test::request()
            .path(&request_path)
            .filter(&rest_search_api_filter)
            .await
            .unwrap();
        assert_eq!(indexes, vec!["quickwit-demo-index".to_string()]);
        let expected_request = super::SearchRequestQueryString {
            query: "*".to_string(),
            format: BodyFormat::default(),
            sort_by: SortBy::default(),
            max_hits: 20,
            count_all: expected_count_hits,
            ..Default::default()
        };
        assert_eq!(&req, &expected_request);
    }
}

#[tokio::test]
async fn test_rest_search_api_route_simple_default_num_hits_default_offset() {
let rest_search_api_filter = search_get_filter();
Expand Down Expand Up @@ -850,7 +922,7 @@ mod tests {
async fn test_rest_search_api_route_post_with_invalid_payload() -> anyhow::Result<()> {
let resp = warp::test::request()
.method("POST")
.path("/quickwit-demo-index/search?query=*&max_hits=10")
.path("/quickwit-demo-index/search")
.json(&true)
.body(r#"{"query": "*", "bad_param":10, "aggs": {"range":[]} }"#)
.reply(&search_handler(MockSearchService::new()))
Expand Down

0 comments on commit e1dc0fb

Please sign in to comment.