
feat(aggregators/metric): Add a top_hits aggregator #2198

Merged
32 commits merged on Jan 26, 2024
Changes from 24 commits
39b1684
feat(aggregators/metric): Implement a top_hits aggregator
ditsuke Sep 29, 2023
453ac23
fix: Expose get_fields
ditsuke Oct 3, 2023
d574384
fix: Serializer for top_hits request
ditsuke Oct 3, 2023
ba5e23f
chore: Avert panick on parsing invalid top_hits query
ditsuke Oct 8, 2023
00cce7c
refactor: Allow multiple field names from aggregations
ditsuke Oct 8, 2023
a7cf3f0
perf: Replace binary heap with TopNComputer
ditsuke Oct 8, 2023
b4de0c6
fix: Avoid comparator inversion by ComparableDoc
ditsuke Oct 10, 2023
ab49acc
fix: Rank missing field values lower than present values
ditsuke Oct 10, 2023
4e1b9c4
refactor: Make KeyOrder a struct
ditsuke Oct 10, 2023
4238c63
feat: Rough attempt at docvalue_fields
ditsuke Oct 15, 2023
9858402
feat: Complete stab at docvalue_fields
ditsuke Oct 18, 2023
21a8b1d
test(unit): Add tests for top_hits aggregator
ditsuke Oct 19, 2023
34df32d
fix: docfield_value field globbing
ditsuke Oct 19, 2023
cdfe4de
test(unit): Include dynamic fields
ditsuke Oct 19, 2023
f9430e6
fix: Value -> OwnedValue
ditsuke Oct 20, 2023
22407c0
fix: Use OwnedValue's native Null variant
ditsuke Oct 20, 2023
0e7dea9
chore: Improve readability of test asserts
ditsuke Oct 23, 2023
0d89133
chore: Remove DocAddress from top_hits result
ditsuke Oct 25, 2023
2fb0018
docs: Update aggregator doc
ditsuke Oct 25, 2023
ae4a9e5
Merge `tantivy/main` into `feat/aggregators/top-hits`
ditsuke Oct 26, 2023
de9f113
revert: accidental doc test
ditsuke Oct 26, 2023
537cebd
chore: enable time macros only for tests
ditsuke Nov 4, 2023
9dea259
chore: Apply suggestions from review
ditsuke Nov 4, 2023
57a811c
chore: Apply suggestions from review
ditsuke Nov 16, 2023
d760c6c
fix: Retrieve all values for fields
ditsuke Nov 16, 2023
bc8a4cf
test(unit): Update for multi-value retrieval
ditsuke Nov 17, 2023
5162e14
chore: Assert term existence
ditsuke Nov 17, 2023
33b12a8
feat: Include all columns for a column name
ditsuke Nov 17, 2023
46d1cf7
fix: Resolve json fields
ditsuke Nov 27, 2023
467b70b
chore: Address review on mutability
ditsuke Jan 16, 2024
da13a1c
chore: s/segment_id/segment_ordinal instances of SegmentOrdinal
ditsuke Jan 18, 2024
e2ba462
chore: Revert erroneous grammar change
ditsuke Jan 18, 2024
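Taken together, these commits add an Elasticsearch-style `top_hits` metric aggregation, with sorting on fast fields, pagination, and `docvalue_fields` retrieval (including field globbing). A request might look like the following sketch — the field names are illustrative, and the exact accepted shape should be confirmed against the merged aggregation docs:

```json
{
  "top_products": {
    "top_hits": {
      "sort": [{ "rating": "desc" }],
      "size": 2,
      "docvalue_fields": ["product_id", "price.*"]
    }
  }
}
```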
1 change: 1 addition & 0 deletions Cargo.toml
@@ -80,6 +80,7 @@
paste = "1.0.11"
more-asserts = "0.3.1"
rand_distr = "0.4.3"
time = { version = "0.3.10", features = ["serde-well-known", "macros"] }

[target.'cfg(not(windows))'.dev-dependencies]
criterion = "0.5"
4 changes: 2 additions & 2 deletions src/aggregation/agg_limits.rs
@@ -73,9 +73,9 @@
/// Create a new ResourceLimitGuard, that will release the memory when dropped.
pub fn new_guard(&self) -> ResourceLimitGuard {
ResourceLimitGuard {
/// The counter which is shared between the aggregations for one request.
// The counter which is shared between the aggregations for one request.
memory_consumption: Arc::clone(&self.memory_consumption),
/// The memory_limit in bytes
// The memory_limit in bytes
memory_limit: self.memory_limit,
allocated_with_the_guard: 0,
}
39 changes: 24 additions & 15 deletions src/aggregation/agg_req.rs
@@ -35,7 +35,7 @@
};
use super::metric::{
AverageAggregation, CountAggregation, MaxAggregation, MinAggregation,
PercentilesAggregationReq, StatsAggregation, SumAggregation,
PercentilesAggregationReq, StatsAggregation, SumAggregation, TopHitsAggregation,
};

/// The top-level aggregation request structure, which contains [`Aggregation`] and their user
@@ -93,7 +93,12 @@
}

fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
fast_field_names.insert(self.agg.get_fast_field_name().to_string());
fast_field_names.extend(
self.agg
.get_fast_field_names()
.iter()
.map(|s| s.to_string()),
);
fast_field_names.extend(get_fast_field_names(&self.sub_aggregation));
}
}
@@ -147,23 +152,27 @@
/// Computes the sum of the extracted values.
#[serde(rename = "percentiles")]
Percentiles(PercentilesAggregationReq),
/// Finds the top k values matching some order
#[serde(rename = "top_hits")]
TopHits(TopHitsAggregation),
}

impl AggregationVariants {
/// Returns the name of the field used by the aggregation.
pub fn get_fast_field_name(&self) -> &str {
/// Returns the name of the fields used by the aggregation.
pub fn get_fast_field_names(&self) -> Vec<&str> {
match self {
AggregationVariants::Terms(terms) => terms.field.as_str(),
AggregationVariants::Range(range) => range.field.as_str(),
AggregationVariants::Histogram(histogram) => histogram.field.as_str(),
AggregationVariants::DateHistogram(histogram) => histogram.field.as_str(),
AggregationVariants::Average(avg) => avg.field_name(),
AggregationVariants::Count(count) => count.field_name(),
AggregationVariants::Max(max) => max.field_name(),
AggregationVariants::Min(min) => min.field_name(),
AggregationVariants::Stats(stats) => stats.field_name(),
AggregationVariants::Sum(sum) => sum.field_name(),
AggregationVariants::Percentiles(per) => per.field_name(),
AggregationVariants::Terms(terms) => vec![terms.field.as_str()],
AggregationVariants::Range(range) => vec![range.field.as_str()],
AggregationVariants::Histogram(histogram) => vec![histogram.field.as_str()],
AggregationVariants::DateHistogram(histogram) => vec![histogram.field.as_str()],
AggregationVariants::Average(avg) => vec![avg.field_name()],
AggregationVariants::Count(count) => vec![count.field_name()],
AggregationVariants::Max(max) => vec![max.field_name()],
AggregationVariants::Min(min) => vec![min.field_name()],
AggregationVariants::Stats(stats) => vec![stats.field_name()],
AggregationVariants::Sum(sum) => vec![sum.field_name()],
AggregationVariants::Percentiles(per) => vec![per.field_name()],
AggregationVariants::TopHits(top_hits) => top_hits.field_names(),
}
}

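The agg_req.rs change above generalizes `get_fast_field_name` (one field per aggregation) into `get_fast_field_names` (a `Vec`), since `top_hits` can reference several sort fields. A self-contained sketch of the collection pattern, using simplified stand-in types rather than the real tantivy enum:

```rust
use std::collections::HashSet;

// Simplified stand-in for AggregationVariants: most variants use a single
// fast field, but a top_hits-like variant may reference several.
enum Agg {
    Average { field: String },
    TopHits { sort_fields: Vec<String> },
}

impl Agg {
    // Mirrors the PR's signature change: every variant now returns a Vec.
    fn get_fast_field_names(&self) -> Vec<&str> {
        match self {
            Agg::Average { field } => vec![field.as_str()],
            Agg::TopHits { sort_fields } => {
                sort_fields.iter().map(|s| s.as_str()).collect()
            }
        }
    }
}

// Mirrors Aggregation::get_fast_field_names: extend one shared HashSet so
// duplicate field names across aggregations collapse to a single entry.
fn collect_fast_field_names(aggs: &[Agg]) -> HashSet<String> {
    let mut names = HashSet::new();
    for agg in aggs {
        names.extend(agg.get_fast_field_names().iter().map(|s| s.to_string()));
    }
    names
}

fn main() {
    let aggs = [
        Agg::Average { field: "price".to_string() },
        Agg::TopHits { sort_fields: vec!["rating".into(), "price".into()] },
    ];
    let names = collect_fast_field_names(&aggs);
    // "price" appears in both aggregations but is stored once.
    assert_eq!(names.len(), 2);
    assert!(names.contains("rating"));
}
```

Returning a `Vec` from every arm keeps the single-field variants allocation-light while letting `top_hits` contribute an arbitrary number of fields to the same set.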
145 changes: 116 additions & 29 deletions src/aggregation/agg_req_with_accessor.rs
@@ -1,6 +1,8 @@
//! This will enhance the request tree with access to the fastfield and metadata.

use columnar::{Column, ColumnBlockAccessor, ColumnType, StrColumn};
use std::collections::HashMap;

use columnar::{Column, ColumnBlockAccessor, ColumnType, DynamicColumn, StrColumn};

use super::agg_limits::ResourceLimitGuard;
use super::agg_req::{Aggregation, AggregationVariants, Aggregations};
@@ -14,7 +16,7 @@
use super::segment_agg_result::AggregationLimits;
use super::VecWithNames;
use crate::aggregation::{f64_to_fastfield_u64, Key};
use crate::SegmentReader;
use crate::{SegmentOrdinal, SegmentReader, TantivyError};

#[derive(Default)]
pub(crate) struct AggregationsWithAccessor {
@@ -32,6 +34,7 @@
}

pub struct AggregationWithAccessor {
pub(crate) segment_id: SegmentOrdinal,
/// In general there can be buckets without fast field access, e.g. buckets that are created
/// based on search terms. That is not the case currently, but eventually this needs to be
/// Option or moved.
@@ -44,10 +47,16 @@
pub(crate) limits: ResourceLimitGuard,
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
/// Used for missing term aggregation, which checks all columns for existence.
/// And also for `top_hits` aggregation, which may sort on multiple fields.
/// By convention the missing aggregation is chosen when this property is set
/// (instead of being set in `agg`).
/// If this needs to be used by other aggregations, we need to refactor this.
pub(crate) accessors: Vec<Column<u64>>,
// NOTE: we can make all other aggregations use this instead of the `accessor` and `field_type`
// (making them obsolete) But will it have a performance impact?
pub(crate) accessors: Vec<(Column<u64>, ColumnType)>,
/// Map field names to dynamic column accessors.
/// This field is used for `docvalue_fields`, which is currently only supported for `top_hits`.
pub(crate) value_accessors: HashMap<String, DynamicColumn>,
pub(crate) agg: Aggregation,
}

@@ -57,19 +66,57 @@
agg: &Aggregation,
sub_aggregation: &Aggregations,
reader: &SegmentReader,
segment_id: SegmentOrdinal,
limits: AggregationLimits,
) -> crate::Result<Vec<AggregationWithAccessor>> {
let add_agg_with_accessor = |accessor: Column<u64>,
column_type: ColumnType,
aggs: &mut Vec<AggregationWithAccessor>|
-> crate::Result<()> {
let res = AggregationWithAccessor {
segment_id,
accessor,
accessors: Vec::new(),
accessors: Default::default(),
value_accessors: Default::default(),
field_type: column_type,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
sub_aggregation,
reader,
segment_id,
&limits,
)?,
agg: agg.clone(),
limits: limits.new_guard(),
missing_value_for_accessor: None,
str_dict_column: None,
column_block_accessor: Default::default(),
};
aggs.push(res);
Ok(())
};

let add_agg_with_accessors = |accessors: Vec<(Column<u64>, ColumnType)>,
aggs: &mut Vec<AggregationWithAccessor>,
value_accessors: HashMap<String, DynamicColumn>,
agg_override: Option<&Aggregation>|
-> crate::Result<()> {
let agg = if let Some(agg_override) = agg_override {
agg_override
} else {
agg
};
let (accessor, field_type) = accessors.get(0).expect("at least one accessor");
let res = AggregationWithAccessor {
segment_id,
// TODO: We should do away with the `accessor` field altogether
accessor: accessor.clone(),
value_accessors,
field_type: field_type.clone(),
accessors,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
sub_aggregation,
reader,
segment_id,
&limits,
)?,
agg: agg.clone(),
@@ -162,24 +209,11 @@
let column_and_types =
get_all_ff_reader_or_empty(reader, field_name, None, fallback_type)?;

let accessors: Vec<Column> =
column_and_types.iter().map(|(a, _)| a.clone()).collect();
let agg_wit_acc = AggregationWithAccessor {
missing_value_for_accessor: None,
accessor: accessors[0].clone(),
accessors,
field_type: ColumnType::U64,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
sub_aggregation,
reader,
&limits,
)?,
agg: agg.clone(),
str_dict_column: str_dict_column.clone(),
limits: limits.new_guard(),
column_block_accessor: Default::default(),
};
res.push(agg_wit_acc);
let accessors = column_and_types
.iter()
.map(|c_t| (c_t.0.clone(), c_t.1))
.collect();
add_agg_with_accessors(accessors, &mut res, Default::default(), None)?;
}

for (accessor, column_type) in column_and_types {
@@ -189,21 +223,25 @@
missing.clone()
};

let missing_value_for_accessor =
if let Some(missing) = missing_value_term_agg.as_ref() {
get_missing_val(column_type, missing, agg.agg.get_fast_field_name())?
} else {
None
};
let missing_value_for_accessor = if let Some(missing) =
missing_value_term_agg.as_ref()
{
get_missing_val(column_type, missing, agg.agg.get_fast_field_names()[0])?
} else {
None
};

let agg = AggregationWithAccessor {
segment_id,
missing_value_for_accessor,
accessor,
accessors: Vec::new(),
accessors: Default::default(),
value_accessors: Default::default(),
field_type: column_type,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
sub_aggregation,
reader,
segment_id,
&limits,
)?,
agg: agg.clone(),
@@ -244,6 +282,39 @@
)?;
add_agg_with_accessor(accessor, column_type, &mut res)?;
}
TopHits(top_hits) => {
let (agg, top_hits) = {
let top_hits =
top_hits.validate_and_resolve(reader.fast_fields().columnar())?;
(
Aggregation {
agg: AggregationVariants::TopHits(top_hits.clone()),
sub_aggregation: agg.sub_aggregation().clone(),
},
top_hits,
)
};
let accessors: Vec<(Column<u64>, ColumnType)> = top_hits
.field_names()
.iter()
.map(|field| {
get_ff_reader(reader, field, Some(get_numeric_or_date_column_types()))
})
.collect::<crate::Result<_>>()?;

let value_accessors = top_hits
.value_field_names()
.iter()
.map(|field_name| {
Ok((
field_name.to_string(),
get_dynamic_column(reader, field_name)?,
))
})
.collect::<crate::Result<_>>()?;

add_agg_with_accessors(accessors, &mut res, value_accessors, Some(&agg))?;
}
};

Ok(res)
@@ -284,6 +355,7 @@
pub(crate) fn get_aggs_with_segment_accessor_and_validate(
aggs: &Aggregations,
reader: &SegmentReader,
segment_id: SegmentOrdinal,
limits: &AggregationLimits,
) -> crate::Result<AggregationsWithAccessor> {
let mut aggss = Vec::new();
@@ -292,6 +364,7 @@
agg,
agg.sub_aggregation(),
reader,
segment_id,
limits.clone(),
)?;
for agg in aggs {
@@ -321,6 +394,20 @@
Ok(ff_field_with_type)
}

fn get_dynamic_column(
reader: &SegmentReader,
field_name: &str,
) -> crate::Result<columnar::DynamicColumn> {
let ff_fields = reader.fast_fields();
Ok(ff_fields
.dynamic_column_handles(field_name)?
.get(0)
.ok_or(TantivyError::FieldNotFound(format!(
"field {field_name} not found"
)))?
.open()?)
}

/// Get all fast field reader or empty as default.
///
/// Is guaranteed to return at least one column.
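The new `get_dynamic_column` helper resolves a field name to the first of possibly several dynamic column handles, converting an empty result into `TantivyError::FieldNotFound`. A simplified, self-contained sketch of that lookup pattern (the real version opens a columnar handle; the types here are stand-ins):

```rust
// Stand-in for columnar::DynamicColumn; the real type wraps a column handle.
#[derive(Clone, Debug, PartialEq)]
struct DynamicColumn(&'static str);

// Stand-in for the relevant TantivyError variant.
#[derive(Debug, PartialEq)]
enum TantivyError {
    FieldNotFound(String),
}

// Mirrors get_dynamic_column: a field may map to zero or more column
// handles; take the first, and surface FieldNotFound when there are none.
fn open_first_column(
    handles: &[DynamicColumn],
    field_name: &str,
) -> Result<DynamicColumn, TantivyError> {
    handles
        .get(0)
        .cloned()
        .ok_or(TantivyError::FieldNotFound(format!(
            "field {field_name} not found"
        )))
}

fn main() {
    assert_eq!(
        open_first_column(&[DynamicColumn("price")], "price"),
        Ok(DynamicColumn("price"))
    );
    assert!(open_first_column(&[], "ghost").is_err());
}
```

Taking only the first handle matches the helper in the diff; fields backed by multiple columns (as handled elsewhere via `dynamic_column_handles`) would need the full list.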
9 changes: 7 additions & 2 deletions src/aggregation/agg_result.rs
@@ -8,7 +8,7 @@
use serde::{Deserialize, Serialize};

use super::bucket::GetDocCount;
use super::metric::{PercentilesMetricResult, SingleMetricResult, Stats};
use super::metric::{PercentilesMetricResult, SingleMetricResult, Stats, TopHitsMetricResult};
use super::{AggregationError, Key};
use crate::TantivyError;

@@ -90,8 +90,10 @@
Stats(Stats),
/// Sum metric result.
Sum(SingleMetricResult),
/// Sum metric result.
/// Percentiles metric result.
Percentiles(PercentilesMetricResult),
/// Top hits metric result
TopHits(TopHitsMetricResult),
}

impl MetricResult {
@@ -106,6 +108,9 @@
MetricResult::Percentiles(_) => Err(TantivyError::AggregationError(
AggregationError::InvalidRequest("percentiles can't be used to order".to_string()),
)),
MetricResult::TopHits(_) => Err(TantivyError::AggregationError(
AggregationError::InvalidRequest("top_hits can't be used to order".to_string()),
)),
}
}
}
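The agg_result.rs change extends `MetricResult` with a `TopHits` variant that, like `Percentiles`, cannot be used to order buckets. A reduced sketch of that pattern with stand-in types:

```rust
// Simplified model of MetricResult: single-value metrics can be used to
// order buckets, while multi-value results like top_hits cannot.
#[derive(Debug, PartialEq)]
enum MetricResult {
    Sum(f64),
    TopHits(Vec<u64>),
}

impl MetricResult {
    // Mirrors the PR's pattern: non-orderable variants return an error
    // instead of an ordering value.
    fn get_value(&self) -> Result<f64, String> {
        match self {
            MetricResult::Sum(v) => Ok(*v),
            MetricResult::TopHits(_) => {
                Err("top_hits can't be used to order".to_string())
            }
        }
    }
}

fn main() {
    assert_eq!(MetricResult::Sum(42.0).get_value(), Ok(42.0));
    assert!(MetricResult::TopHits(vec![1, 2]).get_value().is_err());
}
```

Rejecting the ordering request at this level keeps invalid `order` clauses from silently producing arbitrary bucket orderings.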
2 changes: 1 addition & 1 deletion src/aggregation/bucket/term_agg.rs
@@ -110,7 +110,7 @@
#[serde(alias = "shard_size")]
pub split_size: Option<u32>,

/// The get more accurate results, we fetch more than `size` from each segment.
/// To get more accurate results, we fetch more than `size` from each segment.
///
/// Increasing this value will increase the cost for more accuracy.
///
5 changes: 4 additions & 1 deletion src/aggregation/bucket/term_missing_agg.rs
@@ -90,7 +90,10 @@
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let agg = &mut agg_with_accessor.aggs.values[self.accessor_idx];
let has_value = agg.accessors.iter().any(|acc| acc.index.has_value(doc));
let has_value = agg
.accessors
.iter()
.any(|(acc, _)| acc.index.has_value(doc));
if !has_value {
self.missing_count += 1;
if let Some(sub_agg) = self.sub_agg.as_mut() {
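With accessors now stored as `(Column<u64>, ColumnType)` pairs, `TermMissingAgg` destructures the tuple in its existence check. A minimal stand-alone sketch of the missing-check logic, with simplified stand-ins for the columnar types:

```rust
// Stand-in for Column<u64>: tracks which docs have a value.
struct Column {
    docs_with_value: Vec<u32>,
}

impl Column {
    fn has_value(&self, doc: u32) -> bool {
        self.docs_with_value.contains(&doc)
    }
}

#[derive(Clone, Copy)]
enum ColumnType {
    U64,
    Str,
}

// Mirrors TermMissingAgg::collect after the accessor change: accessors
// carry (Column, ColumnType) pairs, so the existence check destructures
// the tuple and ignores the type. A doc is "missing" only if no column
// for the field has a value for it.
fn is_missing(accessors: &[(Column, ColumnType)], doc: u32) -> bool {
    !accessors.iter().any(|(acc, _)| acc.has_value(doc))
}

fn main() {
    let accessors = vec![
        (Column { docs_with_value: vec![0, 2] }, ColumnType::U64),
        (Column { docs_with_value: vec![1] }, ColumnType::Str),
    ];
    assert!(!is_missing(&accessors, 1)); // present in the second column
    assert!(is_missing(&accessors, 3)); // in no column: counts as missing
}
```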