Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rewrite agg extended bounds #4989

Merged
merged 5 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ impl SegmentCollector for QuickwitSegmentCollector {
}

/// Available aggregation types.
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(untagged)]
pub enum QuickwitAggregations {
/// Aggregation used by the Jaeger service to find trace IDs that match a
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/find_trace_ids_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Eq for TraceIdTermOrd {}
/// Finds the most recent trace ids among a set of matching spans. Multiple spans belonging to the
/// same trace can be found in the document set. As a result, this problem is akin to finding the
/// top k elements with duplicates
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FindTraceIdsCollector {
/// The number of traces to select.
pub num_traces: usize,
Expand Down
213 changes: 212 additions & 1 deletion quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use quickwit_query::tokenizers::TokenizerManager;
use quickwit_storage::{
wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
};
use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
use tantivy::directory::FileSlice;
use tantivy::fastfield::FastFieldReaders;
use tantivy::schema::Field;
Expand All @@ -44,7 +45,7 @@ use tracing::*;

use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
use crate::service::SearcherContext;
use crate::SearchError;
use crate::{QuickwitAggregations, SearchError};

#[instrument(skip_all)]
async fn get_split_footer_from_cache_or_fetch(
Expand Down Expand Up @@ -410,6 +411,55 @@ fn rewrite_request(
if let Some(timestamp_field) = timestamp_field {
remove_redundant_timestamp_range(search_request, split, timestamp_field);
}
rewrite_aggregation(search_request);
}

/// Rewrites the aggregation request to make it easier to cache.
///
/// This is only valid for options which are handled while merging results, which is
/// mostly `extended_bounds`.
///
/// The `aggregation_request` JSON is parsed as a `QuickwitAggregations`. If it is absent, does
/// not parse, or is not the `TantivyAggregations` variant, the request is left untouched.
/// Otherwise `extended_bounds` is removed from every histogram and date histogram, and the
/// request string is replaced only if at least one bound was actually removed.
fn rewrite_aggregation(search_request: &mut SearchRequest) {
if let Some(aggregation) = &search_request.aggregation_request {
// anything that fails to parse, or is not a tantivy aggregation, is left as is
let Ok(QuickwitAggregations::TantivyAggregations(mut aggregations)) =
serde_json::from_str(aggregation)
else {
return;
};
let modified_something = visit_aggregation_mut(&mut aggregations, &|aggregation_variant| {
match aggregation_variant {
// we take() away the extended bounds, and record whether there was anything to remove
AggregationVariants::Histogram(histogram) => {
histogram.extended_bounds.take().is_some()
}
AggregationVariants::DateHistogram(histogram) => {
histogram.extended_bounds.take().is_some()
}
_ => false,
}
});
// when nothing was removed, the original request string is kept unchanged
if modified_something {
if let Ok(serialized_aggregation) = serde_json::to_string(&aggregations) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use expect here

// it's fine to put a (Tantivy)Aggregations and not a QuickwitAggregations because
// the former is a serde-untagged variant of the latter
search_request.aggregation_request = Some(serialized_aggregation);
} else {
// serialization failed: the request keeps its original aggregation string
warn!("we failed to reserialize aggregation, this shouldn't happen");
}
}
}
}

/// Minimal mutable visitor over an aggregation tree.
///
/// Invokes `callback` on the `agg` of every entry in `aggregations`, then recurses into that
/// entry's `sub_aggregation`. Returns `true` if any callback invocation, at any depth,
/// returned `true`.
fn visit_aggregation_mut(
    aggregations: &mut Aggregations,
    callback: &impl Fn(&mut AggregationVariants) -> bool,
) -> bool {
    aggregations.values_mut().fold(false, |changed_so_far, node| {
        // Both calls are evaluated unconditionally (no short-circuit): every node must be
        // visited even once a change has already been recorded.
        let changed_here = callback(&mut node.agg);
        let changed_below = visit_aggregation_mut(&mut node.sub_aggregation, callback);
        changed_so_far | changed_here | changed_below
    })
}

// equivalent to Bound::map, which is unstable
Expand Down Expand Up @@ -1263,4 +1313,165 @@ mod tests {
}),
);
}

// Checks `rewrite_aggregation` on three request layouts, each in a version with
// `extended_bounds` and a version without:
// - a request without bounds must come out strictly unchanged,
// - a request with bounds must come out structurally equal to its no-bounds counterpart.
#[test]
fn test_remove_extended_bounds_from_histogram() {
// date histogram with extended bounds, directly at the root
let histo_at_root = r#"
{
"date_histo": {
"date_histogram": {
"extended_bounds": {
"max": 1425254400000,
"min": 1420070400000
},
"field": "date",
"fixed_interval": "30d",
"offset": "-4d"
}
}
}
"#;

// same as `histo_at_root`, without the extended bounds
let histo_at_root_no_bounds = r#"
{
"date_histo": {
"date_histogram": {
"field": "date",
"fixed_interval": "30d",
"offset": "-4d"
}
}
}
"#;

// date histogram with extended bounds at the root, with a percentiles entry under "aggs"
let histo_at_root_with_sibling = r#"
{
"metrics": {
"aggs": {
"response": {
"percentiles": {
"field": "response",
"keyed": false,
"percents": [
85
]
}
}
},
"date_histogram": {
"extended_bounds": {
"max": 1425254400000,
"min": 1420070400000
},
"field": "date",
"fixed_interval": "30d",
"offset": "-4d"
}
}
}
"#;

// same as `histo_at_root_with_sibling`, without the extended bounds
let histo_at_root_with_sibling_no_bounds = r#"
{
"metrics": {
"aggs": {
"response": {
"percentiles": {
"field": "response",
"keyed": false,
"percents": [
85
]
}
}
},
"date_histogram": {
"field": "date",
"fixed_interval": "30d",
"offset": "-4d"
}
}
}
"#;
// date histogram with extended bounds nested under "aggs" of a root percentiles entry
let histo_at_leaf = r#"
{
"metrics": {
"aggs": {
"response": {
"date_histogram": {
"extended_bounds": {
"max": 1425254400000,
"min": 1420070400000
},
"field": "date",
"fixed_interval": "30d",
"offset": "-4d"
}
}
},
"percentiles": {
"field": "response",
"keyed": false,
"percents": [
85
]
}
}
}
"#;

// same as `histo_at_leaf`, without the extended bounds
let histo_at_leaf_no_bounds = r#"
{
"metrics": {
"aggs": {
"response": {
"date_histogram": {
"field": "date",
"fixed_interval": "30d",
"offset": "-4d"
}
}
},
"percentiles": {
"field": "response",
"keyed": false,
"percents": [
85
]
}
}
}
"#;
for (bounds, no_bounds) in [
(histo_at_root, histo_at_root_no_bounds),
(
histo_at_root_with_sibling,
histo_at_root_with_sibling_no_bounds,
),
(histo_at_leaf, histo_at_leaf_no_bounds),
] {
// first assert we do nothing when there are no bounds
let request_no_bounds = SearchRequest {
aggregation_request: Some(no_bounds.to_string()),
..SearchRequest::default()
};
let mut request_no_bounds_clone = request_no_bounds.clone();
rewrite_aggregation(&mut request_no_bounds_clone);
assert_eq!(request_no_bounds, request_no_bounds_clone);

// then rewrite the request that does carry extended bounds
let mut request_bounds = SearchRequest {
aggregation_request: Some(bounds.to_string()),
..SearchRequest::default()
};
rewrite_aggregation(&mut request_bounds);
// we can't just compare bounds and no_bounds, they must be structurally equal, but not
// necessarily identical (field order, null vs absent...). So we parse both and verify
// the results are equal instead
let no_bounds_agg: QuickwitAggregations =
serde_json::from_str(&request_no_bounds.aggregation_request.unwrap()).unwrap();
let rewrote_bounds_agg: QuickwitAggregations =
serde_json::from_str(&request_bounds.aggregation_request.unwrap()).unwrap();
assert_eq!(rewrote_bounds_agg, no_bounds_agg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ endpoint: _elastic/aggregations/_search
json:
query: { match_all: {} }
aggs:
date_histo:
date_histogram:
date_histo:
date_histogram:
field: "date"
fixed_interval: "30d"
offset: "-4d"
Expand All @@ -17,7 +17,31 @@ expected:
buckets:
- { "doc_count": 5, "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" }
- { "doc_count": 2, "key": 1422662400000.0, "key_as_string": "2015-01-31T00:00:00Z" }
---
---
# Test date histogram with extended bounds
method: [GET]
engines:
- quickwit
endpoint: _elastic/aggregations/_search
json:
query: { match_all: {} }
aggs:
date_histo:
date_histogram:
field: "date"
fixed_interval: "30d"
offset: "-4d"
extended_bounds:
min: 1420070400000
max: 1425254400000
expected:
aggregations:
date_histo:
buckets:
- { "doc_count": 5, "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" }
- { "doc_count": 2, "key": 1422662400000.0, "key_as_string": "2015-01-31T00:00:00Z" }
- { "doc_count": 0, "key": 1425254400000.0, "key_as_string": "2015-03-02T00:00:00Z" }
---
# Test date histogram aggregation and sub-aggregation
method: [GET]
engines:
Expand Down
Loading