Skip to content

Commit

Permalink
fix: special case to not remove the needed coalesce
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Feb 12, 2025
1 parent f792cfa commit 202860b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
23 changes: 11 additions & 12 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
use std::sync::Arc;

use super::utils::{add_sort_above, add_sort_above_with_check};
use super::utils::{add_sort_above, add_sort_above_with_check, is_aggregation};
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::replace_with_order_preserving_variants::{
Expand Down Expand Up @@ -516,7 +516,7 @@ fn remove_bottleneck_in_subplan(
) -> Result<PlanWithCorrespondingCoalescePartitions> {
let plan = &requirements.plan;
let children = &mut requirements.children;
if is_coalesce_partitions(&children[0].plan) {
if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) {
// We can safely use the 0th index since we have a `CoalescePartitionsExec`.
let mut new_child_node = children[0].children.swap_remove(0);
while new_child_node.plan.output_partitioning() == plan.output_partitioning()
Expand Down Expand Up @@ -2261,21 +2261,20 @@ mod tests {
get_plan_string(&optimized),
vec![
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
" UnionExec",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
" UnionExec",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
);

// Plan is now invalid.
// Plan is valid.
let checker = SanityCheckPlan::new();
let err = checker
.optimize(optimized, &Default::default())
.unwrap_err();
assert!(err.message().contains(" does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)"));
let checker = checker.optimize(optimized, &Default::default());
assert!(checker.is_ok());

Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,8 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<RepartitionExec>()
}

/// Checks whether the given operator is a [`AggregateExec`].
pub fn is_aggregation(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<AggregateExec>()
}

0 comments on commit 202860b

Please sign in to comment.