From 10af8a73662f4f6aac09a34157b7cf5fee034502 Mon Sep 17 00:00:00 2001 From: Albert Skalt <133099191+askalt@users.noreply.github.com> Date: Fri, 18 Oct 2024 23:41:53 +0300 Subject: [PATCH] Improve performance for physical plan creation with many columns (#12950) * Add a benchmark for physical plan creation with many aggregates * Wrap AggregateFunctionExpr with Arc Patch f5c47fa274d53c1d524a1fb788d9a063bf5240ef removed Arc wrappers for AggregateFunctionExpr. But, it can be inefficient. When physical optimizer decides to replace a node child to other, it clones the node (with `with_new_children`). Assume, that node is `AggregateExec` than contains hundreds aggregates and these aggregates are cloned each time. This patch returns a Arc wrapping to not clone AggregateFunctionExpr itself but clone a pointer. * Do not build mapping if parent does not require any This patch adds a small optimization that can soft the edges on some queries. If there are no parent requirements we do not need to build column mapping. --- datafusion/core/benches/sql_planner.rs | 14 +++ .../physical_optimizer/update_aggr_exprs.rs | 10 +- datafusion/core/src/physical_planner.rs | 5 +- .../core/tests/fuzz_cases/aggregate_fuzz.rs | 1 + .../combine_partial_final_agg.rs | 8 +- .../limited_distinct_aggregation.rs | 16 +-- datafusion/physical-expr/src/aggregate.rs | 2 +- datafusion/physical-expr/src/utils/mod.rs | 4 + .../physical-expr/src/window/aggregate.rs | 8 +- .../src/window/sliding_aggregate.rs | 13 +- .../src/aggregate_statistics.rs | 24 ++-- .../src/combine_partial_final_agg.rs | 2 +- .../physical-plan/src/aggregates/mod.rs | 119 ++++++++++-------- .../physical-plan/src/aggregates/row_hash.rs | 4 +- datafusion/physical-plan/src/windows/mod.rs | 5 +- datafusion/proto/src/physical_plan/mod.rs | 3 +- .../proto/src/physical_plan/to_proto.rs | 2 +- .../tests/cases/roundtrip_physical_plan.rs | 44 ++++--- 18 files changed, 165 insertions(+), 119 deletions(-) diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index 00f6d5916751..e7c35c8d86d6 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -144,6 +144,20 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); + c.bench_function("physical_select_aggregates_from_200", |b| { + let mut aggregates = String::new(); + for i in 0..200 { + if i > 0 { + aggregates.push_str(", "); + } + aggregates.push_str(format!("MAX(a{})", i).as_str()); + } + let query = format!("SELECT {} FROM t1", aggregates); + b.iter(|| { + physical_plan(&ctx, &query); + }); + }); + // --- TPC-H --- let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas()); diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs index c0d9140c025e..26cdd65883e4 100644 --- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs +++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs @@ -131,10 +131,10 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { /// successfully. Any errors occurring during the conversion process are /// passed through. fn try_convert_aggregate_if_better( - aggr_exprs: Vec, + aggr_exprs: Vec>, prefix_requirement: &[PhysicalSortRequirement], eq_properties: &EquivalenceProperties, -) -> Result> { +) -> Result>> { aggr_exprs .into_iter() .map(|aggr_expr| { @@ -154,7 +154,7 @@ fn try_convert_aggregate_if_better( let reqs = concat_slices(prefix_requirement, &aggr_sort_reqs); if eq_properties.ordering_satisfy_requirement(&reqs) { // Existing ordering satisfies the aggregator requirements: - aggr_expr.with_beneficial_ordering(true)? + aggr_expr.with_beneficial_ordering(true)?.map(Arc::new) } else if eq_properties.ordering_satisfy_requirement(&concat_slices( prefix_requirement, &reverse_aggr_req, @@ -163,12 +163,14 @@ fn try_convert_aggregate_if_better( // given the existing ordering (if possible): aggr_expr .reverse_expr() + .map(Arc::new) .unwrap_or(aggr_expr) .with_beneficial_ordering(true)? + .map(Arc::new) } else { // There is no beneficial ordering present -- aggregation // will still work albeit in a less efficient mode. - aggr_expr.with_beneficial_ordering(false)? + aggr_expr.with_beneficial_ordering(false)?.map(Arc::new) } .ok_or_else(|| { plan_datafusion_err!( diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cf2a157b04b6..a4dffd3d0208 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1523,7 +1523,7 @@ pub fn create_window_expr( } type AggregateExprWithOptionalArgs = ( - AggregateFunctionExpr, + Arc, // The filter clause, if any Option>, // Ordering requirements, if any @@ -1587,7 +1587,8 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( .alias(name) .with_ignore_nulls(ignore_nulls) .with_distinct(*distinct) - .build()?; + .build() + .map(Arc::new)?; (agg_expr, filter, physical_sort_exprs) }; diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 34061a64d783..ff512829333a 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -405,6 +405,7 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str .schema(Arc::clone(&schema)) .alias("sum1") .build() + .map(Arc::new) .unwrap(), ]; let expr = group_by_columns diff --git a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs index 24e46b3ad97c..85076abdaf29 100644 --- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs @@ -84,7 +84,7 @@ fn parquet_exec(schema: &SchemaRef) -> Arc { fn partial_aggregate_exec( input: Arc, group_by: PhysicalGroupBy, - aggr_expr: Vec, + aggr_expr: Vec>, ) -> Arc { let schema = input.schema(); let n_aggr = aggr_expr.len(); @@ -104,7 +104,7 @@ fn partial_aggregate_exec( fn final_aggregate_exec( input: Arc, group_by: PhysicalGroupBy, - aggr_expr: Vec, + aggr_expr: Vec>, ) -> Arc { let schema = input.schema(); let n_aggr = aggr_expr.len(); @@ -130,11 +130,12 @@ fn count_expr( expr: Arc, name: &str, schema: &Schema, -) -> AggregateFunctionExpr { +) -> Arc { AggregateExprBuilder::new(count_udaf(), vec![expr]) .schema(Arc::new(schema.clone())) .alias(name) .build() + .map(Arc::new) .unwrap() } @@ -218,6 +219,7 @@ fn aggregations_with_group_combined() -> datafusion_common::Result<()> { .schema(Arc::clone(&schema)) .alias("Sum(b)") .build() + .map(Arc::new) .unwrap(), ]; let groups: Vec<(Arc, String)> = diff --git a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs index 042f6d622565..d6991711f581 100644 --- a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs @@ -347,10 +347,10 @@ fn test_has_aggregate_expression() -> Result<()> { let single_agg = AggregateExec::try_new( AggregateMode::Single, build_group_by(&schema, vec!["a".to_string()]), - vec![agg.count_expr(&schema)], /* aggr_expr */ - vec![None], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ + vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */ + vec![None], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ )?; let limit_exec = LocalLimitExec::new( Arc::new(single_agg), @@ -384,10 +384,10 @@ fn test_has_filter() -> Result<()> { let single_agg = AggregateExec::try_new( AggregateMode::Single, build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![agg.count_expr(&schema)], /* aggr_expr */ - vec![filter_expr], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ + vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */ + vec![filter_expr], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ )?; let limit_exec = LocalLimitExec::new( Arc::new(single_agg), diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 866596d0b690..6330c240241a 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -328,7 +328,7 @@ impl AggregateFunctionExpr { /// not implement the method, returns an error. Order insensitive and hard /// requirement aggregators return `Ok(None)`. pub fn with_beneficial_ordering( - self, + self: Arc, beneficial_ordering: bool, ) -> Result> { let Some(updated_fn) = self diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 4c37db4849a7..4bd022975ac3 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -86,6 +86,10 @@ pub fn map_columns_before_projection( parent_required: &[Arc], proj_exprs: &[(Arc, String)], ) -> Vec> { + if parent_required.is_empty() { + // No need to build mapping. + return vec![]; + } let column_mapping = proj_exprs .iter() .filter_map(|(expr, name)| { diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index d012fef93b67..3fe5d842dfd1 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; /// See comments on [`WindowExpr`] for more details. #[derive(Debug)] pub struct PlainAggregateWindowExpr { - aggregate: AggregateFunctionExpr, + aggregate: Arc, partition_by: Vec>, order_by: Vec, window_frame: Arc, @@ -50,7 +50,7 @@ pub struct PlainAggregateWindowExpr { impl PlainAggregateWindowExpr { /// Create a new aggregate window function expression pub fn new( - aggregate: AggregateFunctionExpr, + aggregate: Arc, partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, @@ -137,14 +137,14 @@ impl WindowExpr for PlainAggregateWindowExpr { let reverse_window_frame = self.window_frame.reverse(); if reverse_window_frame.start_bound.is_unbounded() { Arc::new(PlainAggregateWindowExpr::new( - reverse_expr, + Arc::new(reverse_expr), &self.partition_by.clone(), &reverse_order_bys(&self.order_by), Arc::new(self.window_frame.reverse()), )) as _ } else { Arc::new(SlidingAggregateWindowExpr::new( - reverse_expr, + Arc::new(reverse_expr), &self.partition_by.clone(), &reverse_order_bys(&self.order_by), Arc::new(self.window_frame.reverse()), diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 143d59eb4495..b889ec8c5d98 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; /// See comments on [`WindowExpr`] for more details. #[derive(Debug)] pub struct SlidingAggregateWindowExpr { - aggregate: AggregateFunctionExpr, + aggregate: Arc, partition_by: Vec>, order_by: Vec, window_frame: Arc, @@ -50,7 +50,7 @@ pub struct SlidingAggregateWindowExpr { impl SlidingAggregateWindowExpr { /// Create a new (sliding) aggregate window function expression. pub fn new( - aggregate: AggregateFunctionExpr, + aggregate: Arc, partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, @@ -121,14 +121,14 @@ impl WindowExpr for SlidingAggregateWindowExpr { let reverse_window_frame = self.window_frame.reverse(); if reverse_window_frame.start_bound.is_unbounded() { Arc::new(PlainAggregateWindowExpr::new( - reverse_expr, + Arc::new(reverse_expr), &self.partition_by.clone(), &reverse_order_bys(&self.order_by), Arc::new(self.window_frame.reverse()), )) as _ } else { Arc::new(SlidingAggregateWindowExpr::new( - reverse_expr, + Arc::new(reverse_expr), &self.partition_by.clone(), &reverse_order_bys(&self.order_by), Arc::new(self.window_frame.reverse()), @@ -159,7 +159,10 @@ impl WindowExpr for SlidingAggregateWindowExpr { }) .collect::>(); Some(Arc::new(SlidingAggregateWindowExpr { - aggregate: self.aggregate.with_new_expressions(args, vec![])?, + aggregate: self + .aggregate + .with_new_expressions(args, vec![]) + .map(Arc::new)?, partition_by: partition_bys, order_by: new_order_by, window_frame: Arc::clone(&self.window_frame), diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index fd21362fd3eb..27870c7865f3 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -312,7 +312,7 @@ mod tests { let partial_agg = AggregateExec::try_new( AggregateMode::Partial, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], source, Arc::clone(&schema), @@ -321,7 +321,7 @@ mod tests { let final_agg = AggregateExec::try_new( AggregateMode::Final, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], Arc::new(partial_agg), Arc::clone(&schema), @@ -342,7 +342,7 @@ mod tests { let partial_agg = AggregateExec::try_new( AggregateMode::Partial, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], source, Arc::clone(&schema), @@ -351,7 +351,7 @@ mod tests { let final_agg = AggregateExec::try_new( AggregateMode::Final, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], Arc::new(partial_agg), Arc::clone(&schema), @@ -371,7 +371,7 @@ mod tests { let partial_agg = AggregateExec::try_new( AggregateMode::Partial, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], source, Arc::clone(&schema), @@ -383,7 +383,7 @@ mod tests { let final_agg = AggregateExec::try_new( AggregateMode::Final, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], Arc::new(coalesce), Arc::clone(&schema), @@ -403,7 +403,7 @@ mod tests { let partial_agg = AggregateExec::try_new( AggregateMode::Partial, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], source, Arc::clone(&schema), @@ -415,7 +415,7 @@ mod tests { let final_agg = AggregateExec::try_new( AggregateMode::Final, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], Arc::new(coalesce), Arc::clone(&schema), @@ -446,7 +446,7 @@ mod tests { let partial_agg = AggregateExec::try_new( AggregateMode::Partial, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], filter, Arc::clone(&schema), @@ -455,7 +455,7 @@ mod tests { let final_agg = AggregateExec::try_new( AggregateMode::Final, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], Arc::new(partial_agg), Arc::clone(&schema), @@ -491,7 +491,7 @@ mod tests { let partial_agg = AggregateExec::try_new( AggregateMode::Partial, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], filter, Arc::clone(&schema), @@ -500,7 +500,7 @@ mod tests { let final_agg = AggregateExec::try_new( AggregateMode::Final, PhysicalGroupBy::default(), - vec![agg.count_expr(&schema)], + vec![Arc::new(agg.count_expr(&schema))], vec![None], Arc::new(partial_agg), Arc::clone(&schema), diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs index 4e352e25b52c..86f7e73e9e35 100644 --- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs +++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs @@ -125,7 +125,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { type GroupExprsRef<'a> = ( &'a PhysicalGroupBy, - &'a [AggregateFunctionExpr], + &'a [Arc], &'a [Option>], ); diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 296c5811e577..f36bd920e83c 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -351,7 +351,7 @@ pub struct AggregateExec { /// Group by expressions group_by: PhysicalGroupBy, /// Aggregate expressions - aggr_expr: Vec, + aggr_expr: Vec>, /// FILTER (WHERE clause) expression for each aggregate expression filter_expr: Vec>>, /// Set if the output of this aggregation is truncated by a upstream sort/limit clause @@ -378,7 +378,10 @@ impl AggregateExec { /// Function used in `OptimizeAggregateOrder` optimizer rule, /// where we need parts of the new value, others cloned from the old one /// Rewrites aggregate exec with new aggregate expressions. - pub fn with_new_aggr_exprs(&self, aggr_expr: Vec) -> Self { + pub fn with_new_aggr_exprs( + &self, + aggr_expr: Vec>, + ) -> Self { Self { aggr_expr, // clone the rest of the fields @@ -404,7 +407,7 @@ impl AggregateExec { pub fn try_new( mode: AggregateMode, group_by: PhysicalGroupBy, - aggr_expr: Vec, + aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -435,7 +438,7 @@ impl AggregateExec { fn try_new_with_schema( mode: AggregateMode, group_by: PhysicalGroupBy, - mut aggr_expr: Vec, + mut aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -545,7 +548,7 @@ impl AggregateExec { } /// Aggregate expressions - pub fn aggr_expr(&self) -> &[AggregateFunctionExpr] { + pub fn aggr_expr(&self) -> &[Arc] { &self.aggr_expr } @@ -876,7 +879,7 @@ impl ExecutionPlan for AggregateExec { fn create_schema( input_schema: &Schema, group_by: &PhysicalGroupBy, - aggr_expr: &[AggregateFunctionExpr], + aggr_expr: &[Arc], mode: AggregateMode, ) -> Result { let mut fields = Vec::with_capacity(group_by.num_output_exprs() + aggr_expr.len()); @@ -1006,7 +1009,7 @@ pub fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { /// A `LexRequirement` instance, which is the requirement that satisfies all the /// aggregate requirements. Returns an error in case of conflicting requirements. pub fn get_finer_aggregate_exprs_requirement( - aggr_exprs: &mut [AggregateFunctionExpr], + aggr_exprs: &mut [Arc], group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, agg_mode: &AggregateMode, @@ -1034,7 +1037,7 @@ pub fn get_finer_aggregate_exprs_requirement( // Reverse requirement is satisfied by exiting ordering. // Hence reverse the aggregator requirement = finer_ordering; - *aggr_expr = reverse_aggr_expr; + *aggr_expr = Arc::new(reverse_aggr_expr); continue; } } @@ -1058,7 +1061,7 @@ pub fn get_finer_aggregate_exprs_requirement( // There is a requirement that both satisfies existing requirement and reverse // aggregate requirement. Use updated requirement requirement = finer_ordering; - *aggr_expr = reverse_aggr_expr; + *aggr_expr = Arc::new(reverse_aggr_expr); continue; } } @@ -1080,7 +1083,7 @@ pub fn get_finer_aggregate_exprs_requirement( /// * Partial: AggregateFunctionExpr::expressions /// * Final: columns of `AggregateFunctionExpr::state_fields()` pub fn aggregate_expressions( - aggr_expr: &[AggregateFunctionExpr], + aggr_expr: &[Arc], mode: &AggregateMode, col_idx_base: usize, ) -> Result>>> { @@ -1135,7 +1138,7 @@ fn merge_expressions( pub type AccumulatorItem = Box; pub fn create_accumulators( - aggr_expr: &[AggregateFunctionExpr], + aggr_expr: &[Arc], ) -> Result> { aggr_expr .iter() @@ -1458,10 +1461,12 @@ mod tests { ], ); - let aggregates = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1i8)]) - .schema(Arc::clone(&input_schema)) - .alias("COUNT(1)") - .build()?]; + let aggregates = vec![Arc::new( + AggregateExprBuilder::new(count_udaf(), vec![lit(1i8)]) + .schema(Arc::clone(&input_schema)) + .alias("COUNT(1)") + .build()?, + )]; let task_ctx = if spill { // adjust the max memory size to have the partial aggregate result for spill mode. @@ -1596,13 +1601,12 @@ mod tests { vec![vec![false]], ); - let aggregates: Vec = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) - .schema(Arc::clone(&input_schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates: Vec> = vec![Arc::new( + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) + .schema(Arc::clone(&input_schema)) + .alias("AVG(b)") + .build()?, + )]; let task_ctx = if spill { // set to an appropriate value to trigger spill @@ -1925,17 +1929,16 @@ mod tests { ); // something that allocates within the aggregator - let aggregates_v0: Vec = - vec![test_median_agg_expr(Arc::clone(&input_schema))?]; + let aggregates_v0: Vec> = + vec![Arc::new(test_median_agg_expr(Arc::clone(&input_schema))?)]; // use fast-path in `row_hash.rs`. - let aggregates_v2: Vec = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) - .schema(Arc::clone(&input_schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates_v2: Vec> = vec![Arc::new( + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) + .schema(Arc::clone(&input_schema)) + .alias("AVG(b)") + .build()?, + )]; for (version, groups, aggregates) in [ (0, groups_none, aggregates_v0), @@ -1989,13 +1992,12 @@ mod tests { let groups = PhysicalGroupBy::default(); - let aggregates: Vec = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("AVG(a)") - .build()?, - ]; + let aggregates: Vec> = vec![Arc::new( + AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("AVG(a)") + .build()?, + )]; let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); @@ -2029,13 +2031,12 @@ mod tests { let groups = PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); - let aggregates: Vec = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates: Vec> = vec![Arc::new( + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("AVG(b)") + .build()?, + )]; let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); @@ -2080,7 +2081,7 @@ mod tests { fn test_first_value_agg_expr( schema: &Schema, sort_options: SortOptions, - ) -> Result { + ) -> Result> { let ordering_req = [PhysicalSortExpr { expr: col("b", schema)?, options: sort_options, @@ -2092,13 +2093,14 @@ mod tests { .schema(Arc::new(schema.clone())) .alias(String::from("first_value(b) ORDER BY [b ASC NULLS LAST]")) .build() + .map(Arc::new) } // LAST_VALUE(b ORDER BY b ) fn test_last_value_agg_expr( schema: &Schema, sort_options: SortOptions, - ) -> Result { + ) -> Result> { let ordering_req = [PhysicalSortExpr { expr: col("b", schema)?, options: sort_options, @@ -2109,6 +2111,7 @@ mod tests { .schema(Arc::new(schema.clone())) .alias(String::from("last_value(b) ORDER BY [b ASC NULLS LAST]")) .build() + .map(Arc::new) } // This function either constructs the physical plan below, @@ -2153,7 +2156,7 @@ mod tests { descending: false, nulls_first: false, }; - let aggregates: Vec = if is_first_acc { + let aggregates: Vec> = if is_first_acc { vec![test_first_value_agg_expr(&schema, sort_options)?] } else { vec![test_last_value_agg_expr(&schema, sort_options)?] @@ -2289,6 +2292,7 @@ mod tests { .order_by(ordering_req.to_vec()) .schema(Arc::clone(&test_schema)) .build() + .map(Arc::new) .unwrap() }) .collect::>(); @@ -2318,7 +2322,7 @@ mod tests { }; let groups = PhysicalGroupBy::new_single(vec![(col_a, "a".to_string())]); - let aggregates: Vec = vec![ + let aggregates: Vec> = vec![ test_first_value_agg_expr(&schema, option_desc)?, test_last_value_agg_expr(&schema, option_desc)?, ]; @@ -2376,11 +2380,12 @@ mod tests { ], ); - let aggregates: Vec = + let aggregates: Vec> = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)]) .schema(Arc::clone(&schema)) .alias("1") - .build()?]; + .build() + .map(Arc::new)?]; let input_batches = (0..4) .map(|_| { @@ -2512,7 +2517,8 @@ mod tests { ) .schema(Arc::clone(&batch.schema())) .alias(String::from("SUM(value)")) - .build()?]; + .build() + .map(Arc::new)?]; let input = Arc::new(MemoryExec::try_new( &[vec![batch.clone()]], @@ -2560,7 +2566,8 @@ mod tests { AggregateExprBuilder::new(count_udaf(), vec![col("val", &schema)?]) .schema(Arc::clone(&schema)) .alias(String::from("COUNT(val)")) - .build()?, + .build() + .map(Arc::new)?, ]; let input_data = vec![ @@ -2641,7 +2648,8 @@ mod tests { AggregateExprBuilder::new(count_udaf(), vec![col("val", &schema)?]) .schema(Arc::clone(&schema)) .alias(String::from("COUNT(val)")) - .build()?, + .build() + .map(Arc::new)?, ]; let input_data = vec![ @@ -2728,7 +2736,8 @@ mod tests { AggregateExprBuilder::new(count_udaf(), vec![col("a", &input_schema)?]) .schema(Arc::clone(&input_schema)) .alias("COUNT(a)") - .build()?, + .build() + .map(Arc::new)?, ]; let grouping_set = PhysicalGroupBy::new( diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 624844b6b985..7d21cc2f1944 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -591,7 +591,7 @@ impl GroupedHashAggregateStream { /// that is supported by the aggregate, or a /// [`GroupsAccumulatorAdapter`] if not. pub(crate) fn create_group_accumulator( - agg_expr: &AggregateFunctionExpr, + agg_expr: &Arc, ) -> Result> { if agg_expr.groups_accumulator_supported() { agg_expr.create_groups_accumulator() @@ -601,7 +601,7 @@ pub(crate) fn create_group_accumulator( "Creating GroupsAccumulatorAdapter for {}: {agg_expr:?}", agg_expr.name() ); - let agg_expr_captured = agg_expr.clone(); + let agg_expr_captured = Arc::clone(agg_expr); let factory = move || agg_expr_captured.create_accumulator(); Ok(Box::new(GroupsAccumulatorAdapter::new(factory))) } diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index adf61f27bc6f..f6902fcbe2e7 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -119,7 +119,8 @@ pub fn create_window_expr( .schema(Arc::new(input_schema.clone())) .alias(name) .with_ignore_nulls(ignore_nulls) - .build()?; + .build() + .map(Arc::new)?; window_expr_from_aggregate_expr( partition_by, order_by, @@ -142,7 +143,7 @@ fn window_expr_from_aggregate_expr( partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, - aggregate: AggregateFunctionExpr, + aggregate: Arc, ) -> Arc { // Is there a potentially unlimited sized window frame? let unbounded_window = window_frame.start_bound.is_unbounded(); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 9a6850cb2108..634ae284c955 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -488,7 +488,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { }) .collect::, _>>()?; - let physical_aggr_expr: Vec = hash_agg + let physical_aggr_expr: Vec> = hash_agg .aggr_expr .iter() .zip(hash_agg.aggr_expr_name.iter()) @@ -518,6 +518,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .with_distinct(agg_node.distinct) .order_by(ordering_req) .build() + .map(Arc::new) } } }).transpose()?.ok_or_else(|| { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 6072baca688c..33eca0723103 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -48,7 +48,7 @@ use crate::protobuf::{ use super::PhysicalExtensionCodec; pub fn serialize_physical_aggr_expr( - aggr_expr: AggregateFunctionExpr, + aggr_expr: Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?; diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 025676f790a8..4a9bf6afb49e 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -73,7 +73,6 @@ use datafusion::physical_plan::placeholder_row::PlaceholderRowExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; -use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec}; use datafusion::physical_plan::windows::{ @@ -305,7 +304,8 @@ fn roundtrip_window() -> Result<()> { ) .schema(Arc::clone(&schema)) .alias("avg(b)") - .build()?, + .build() + .map(Arc::new)?, &[], &[], Arc::new(WindowFrame::new(None)), @@ -321,7 +321,8 @@ fn roundtrip_window() -> Result<()> { let sum_expr = AggregateExprBuilder::new(sum_udaf(), args) .schema(Arc::clone(&schema)) .alias("SUM(a) RANGE BETWEEN CURRENT ROW AND UNBOUNDED PRECEEDING") - .build()?; + .build() + .map(Arc::new)?; let sliding_aggr_window_expr = Arc::new(SlidingAggregateWindowExpr::new( sum_expr, @@ -367,13 +368,13 @@ fn rountrip_aggregate() -> Result<()> { .alias("NTH_VALUE(b, 1)") .build()?; - let test_cases: Vec> = vec![ + let test_cases = vec![ // AVG - vec![avg_expr], + vec![Arc::new(avg_expr)], // NTH_VALUE - vec![nth_expr], + vec![Arc::new(nth_expr)], // STRING_AGG - vec![str_agg_expr], + vec![Arc::new(str_agg_expr)], ]; for aggregates in test_cases { @@ -400,12 +401,13 @@ fn rountrip_aggregate_with_limit() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec = + let aggregates = vec![ AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) .alias("AVG(b)") - .build()?, + .build() + .map(Arc::new)?, ]; let agg = AggregateExec::try_new( @@ -429,13 +431,14 @@ fn rountrip_aggregate_with_approx_pencentile_cont() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec = vec![AggregateExprBuilder::new( + let aggregates = vec![AggregateExprBuilder::new( approx_percentile_cont_udaf(), vec![col("b", &schema)?, lit(0.5)], ) .schema(Arc::clone(&schema)) .alias("APPROX_PERCENTILE_CONT(b, 0.5)") - .build()?]; + .build() + .map(Arc::new)?]; let agg = AggregateExec::try_new( AggregateMode::Final, @@ -464,13 +467,14 @@ fn rountrip_aggregate_with_sort() -> Result<()> { }, }]; - let aggregates: Vec = + let aggregates = vec![ AggregateExprBuilder::new(array_agg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) .alias("ARRAY_AGG(b)") .order_by(sort_exprs) - .build()?, + .build() + .map(Arc::new)?, ]; let agg = AggregateExec::try_new( @@ -531,12 +535,13 @@ fn roundtrip_aggregate_udaf() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec = + let aggregates = vec![ AggregateExprBuilder::new(Arc::new(udaf), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) .alias("example_agg") - .build()?, + .build() + .map(Arc::new)?, ]; roundtrip_test_with_context( @@ -1001,7 +1006,8 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { AggregateExprBuilder::new(max_udaf(), vec![udf_expr as Arc]) .schema(schema.clone()) .alias("max") - .build()?; + .build() + .map(Arc::new)?; let window = Arc::new(WindowAggExec::try_new( vec![Arc::new(PlainAggregateWindowExpr::new( @@ -1052,7 +1058,8 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> { let aggr_expr = AggregateExprBuilder::new(Arc::clone(&udaf), aggr_args.clone()) .schema(Arc::clone(&schema)) .alias("aggregate_udf") - .build()?; + .build() + .map(Arc::new)?; let filter = Arc::new(FilterExec::try_new( Arc::new(BinaryExpr::new( @@ -1079,7 +1086,8 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> { .alias("aggregate_udf") .distinct() .ignore_nulls() - .build()?; + .build() + .map(Arc::new)?; let aggregate = Arc::new(AggregateExec::try_new( AggregateMode::Final,