diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 3c8d08ee32d44..ff9189b5d0b89 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1416,6 +1416,7 @@ pub(crate) mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec}; use crate::physical_optimizer::enforce_sorting::EnforceSorting; + use crate::physical_optimizer::sanity_checker::check_plan_sanity; use crate::physical_optimizer::test_utils::{ check_integrity, coalesce_partitions_exec, repartition_exec, }; @@ -1426,11 +1427,14 @@ pub(crate) mod tests { use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics}; + use datafusion_execution::{SendableRecordBatchStream, TaskContext}; + use datafusion_functions_aggregate::sum::sum_udaf; + use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_optimizer::output_requirements::OutputRequirements; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::ScalarValue; - use datafusion_expr::Operator; + use datafusion_expr::{AggregateUDF, Operator}; use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; use datafusion_physical_expr::{ expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr, @@ -1526,8 +1530,8 @@ pub(crate) mod tests { fn execute( &self, _partition: usize, - _context: Arc, - ) -> Result { + _context: Arc, + ) -> Result { unreachable!(); } @@ -1643,6 +1647,14 @@ pub(crate) mod tests { fn aggregate_exec_with_alias( input: Arc, alias_pairs: Vec<(String, String)>, + ) -> Arc { + aggregate_exec_with_aggr_expr_and_alias(input, vec![], alias_pairs) + } + + fn aggregate_exec_with_aggr_expr_and_alias( + input: Arc, + aggr_expr: Vec<(Arc, Vec>)>, + alias_pairs: Vec<(String, String)>, ) -> Arc { let schema = schema(); let mut group_by_expr: Vec<(Arc, String)> = vec![]; @@ -1664,18 +1676,33 @@ pub(crate) mod tests { .collect::>(); let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr); + let aggr_expr = aggr_expr + .into_iter() + .map(|(udaf, exprs)| { + AggregateExprBuilder::new(udaf.clone(), exprs) + .alias(udaf.name()) + .schema(Arc::clone(&schema)) + .build() + .map(Arc::new) + .unwrap() + }) + .collect::>(); + let filter_exprs = std::iter::repeat(None) + .take(aggr_expr.len()) + .collect::>(); + Arc::new( AggregateExec::try_new( AggregateMode::FinalPartitioned, final_grouping, - vec![], - vec![], + aggr_expr.clone(), + filter_exprs.clone(), Arc::new( AggregateExec::try_new( AggregateMode::Partial, group_by, - vec![], - vec![], + aggr_expr, + filter_exprs, input, schema.clone(), ) @@ -3436,6 +3463,373 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn repartitions_for_aggregate_after_union() -> Result<()> { + let union = union_exec(vec![parquet_exec(); 2]); + let plan = + aggregate_exec_with_alias(union, vec![("a".to_string(), "a1".to_string())]); + + // distribution error without repartitioning + let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err(); + assert!(err.message().contains("ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)")); + + let expected = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + "UnionExec", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + + assert_optimized!(expected, plan.clone(), true); + assert_optimized!(expected, plan.clone(), false); + + Ok(()) + } + + #[derive(Debug, Clone)] + struct MyExtensionNode { + input: Arc, + cache: PlanProperties, + } + impl MyExtensionNode { + fn new(input: Arc) -> Self { + let cache = PlanProperties::new( + EquivalenceProperties::new(schema()), + Partitioning::UnknownPartitioning(1), // from our extension node + input.pipeline_behavior(), + input.boundedness(), + ); + Self { cache, input } + } + } + impl ExecutionPlan for MyExtensionNode { + fn required_input_distribution(&self) -> Vec { + // from our extension node + vec![Distribution::SinglePartition] + } + fn name(&self) -> &str { + "MyExtensionNode" + } + fn as_any(&self) -> &dyn std::any::Any { + self + } + fn properties(&self) -> &PlanProperties { + &self.cache + } + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + assert_eq!(children.len(), 1); + Ok(Arc::new(Self::new(children[0].clone()))) + } + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + } + impl DisplayAs for MyExtensionNode { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "MyExtensionNode") + } + } + + #[test] + fn repartitions_for_extension_node_with_aggregate_after_union() -> Result<()> { + let union = union_exec(vec![parquet_exec(); 2]); + let plan = + aggregate_exec_with_alias(union, vec![("a".to_string(), "a1".to_string())]); + let plan: Arc = Arc::new(MyExtensionNode::new(plan)); + + // same plan as before, but with the extension node on top + let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err(); + assert!(err.message().contains("ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]\"] does not satisfy distribution requirements: SinglePartition. Child-0 output partitioning: UnknownPartitioning(2)")); + + let expected = &[ + "MyExtensionNode", + "CoalescePartitionsExec", + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + "UnionExec", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + + assert_optimized!(expected, plan.clone(), true); + assert_optimized!(expected, plan.clone(), false); + + Ok(()) + } + + #[test] + fn repartitions_for_aggregate_after_sorted_union() -> Result<()> { + let union = union_exec(vec![parquet_exec(); 2]); + let schema = schema(); + let sort_key = LexOrdering::new(vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]); + let sort = sort_exec(sort_key, union, false); + let plan = + aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]); + + // with the sort, there is no error + let checker = check_plan_sanity(plan.clone(), &Default::default()); + assert!(checker.is_ok()); + + // it still repartitions + let expected_after_first_run = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortPreservingMergeExec: [a@0 ASC]", + "UnionExec", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_optimized!(expected_after_first_run, plan.clone(), true); + + let expected_after_second_run = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", + "SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", // adds another sort + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + "SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", // removes the SPM + "UnionExec", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_optimized!(expected_after_second_run, plan.clone(), false); + + Ok(()) + } + + #[test] + fn repartition_for_aggregate_after_sorted_union_projection() -> Result<()> { + let union = union_exec(vec![parquet_exec(); 2]); + let projection = projection_exec_with_alias( + union, + vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "value".to_string()), + ], + ); + let schema = schema(); + let sort_key = LexOrdering::new(vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]); + let sort = sort_exec(sort_key, projection, false); + let plan = + aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]); + + // with the sort, there is no error + let checker = check_plan_sanity(plan.clone(), &Default::default()); + assert!(checker.is_ok()); + + // it still repartitions + let expected_after_first_run = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortPreservingMergeExec: [a@0 ASC]", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + "ProjectionExec: expr=[a@0 as a, b@1 as value]", + "UnionExec", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_optimized!(expected_after_first_run, plan.clone(), true); + + let expected_after_second_run = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", // adds another sort + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + // removes the SPM + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "CoalescePartitionsExec", // adds the coalesce + "ProjectionExec: expr=[a@0 as a, b@1 as value]", + "UnionExec", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_optimized!(expected_after_second_run, plan.clone(), false); + + Ok(()) + } + + #[test] + fn repartition_for_aggregate_sum_after_sorted_union_projection() -> Result<()> { + let union = union_exec(vec![parquet_exec(); 2]); + let projection = projection_exec_with_alias( + union, + vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "b".to_string()), + ], + ); + let schema = schema(); + let sort_key = LexOrdering::new(vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]); + let sort = sort_exec(sort_key, projection, false); + let plan = aggregate_exec_with_aggr_expr_and_alias( + sort, + vec![(sum_udaf(), vec![col("b", &schema)?])], + vec![("a".to_string(), "a1".to_string())], + ); + let plan: Arc = Arc::new(MyExtensionNode::new(plan)); + + // with the sort, there is no error + let checker = check_plan_sanity(plan.clone(), &Default::default()); + assert!(checker.is_ok()); + + // it still repartitions + let expected_after_first_run = &[ + "MyExtensionNode", + "CoalescePartitionsExec", + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[sum], ordering_mode=Sorted", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[sum], ordering_mode=Sorted", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortPreservingMergeExec: [a@0 ASC]", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + "ProjectionExec: expr=[a@0 as a, b@1 as b]", + "UnionExec", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_optimized!(expected_after_first_run, plan.clone(), true); + + let expected_after_second_run = &[ + "MyExtensionNode", + "CoalescePartitionsExec", + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[sum], ordering_mode=Sorted", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[sum], ordering_mode=Sorted", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "CoalescePartitionsExec", + "ProjectionExec: expr=[a@0 as a, b@1 as b]", + "UnionExec", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_optimized!(expected_after_second_run, plan.clone(), false); + + Ok(()) + } + + #[test] + fn dlw() -> Result<()> { + let union = union_exec(vec![parquet_exec(); 2]); + let projection = projection_exec_with_alias( + union, + vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "b".to_string()), + ], + ); + let schema = schema(); + let sort_key = LexOrdering::new(vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]); + let sort = sort_exec(sort_key.clone(), projection, false); + let plan = aggregate_exec_with_aggr_expr_and_alias( + sort, + vec![(sum_udaf(), vec![col("b", &schema)?])], + vec![("a".to_string(), "a1".to_string())], + ); + let plan = Arc::new(MyExtensionNode::new(plan)); + // let plan = projection_exec_with_alias( + // plan, + // vec![ + // ("a1".to_string(), "a1".to_string()), + // ("sum".to_string(), "sum".to_string()), + // ], + // ); + let plan = sort_exec(sort_key, plan, false); + println!("START PLAN:\n{}", displayable(plan.as_ref()).indent(true)); + + // with the sort, there is no error + let checker = check_plan_sanity(plan.clone(), &Default::default()); + assert!(checker.is_ok()); + + // it still repartitions + let expected_after_first_run = &[ + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "MyExtensionNode", + "CoalescePartitionsExec", + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[sum], ordering_mode=Sorted", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[sum], ordering_mode=Sorted", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortPreservingMergeExec: [a@0 ASC]", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + "ProjectionExec: expr=[a@0 as a, b@1 as b]", + "UnionExec", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_optimized!(expected_after_first_run, plan.clone(), true); + + let expected_after_second_run = &[ + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "MyExtensionNode", + "CoalescePartitionsExec", + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[sum], ordering_mode=Sorted", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[sum], ordering_mode=Sorted", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "CoalescePartitionsExec", + "ProjectionExec: expr=[a@0 as a, b@1 as b]", + "UnionExec", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_optimized!(expected_after_second_run, plan.clone(), false); + + Ok(()) + } + #[test] fn repartition_through_sort_preserving_merge() -> Result<()> { // sort preserving merge with non-sorted input