diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index c5c4ee824d61..610aebf75044 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -113,6 +113,52 @@ pub trait TreeNode: Sized { self.transform_up(op) } + /// Transforms the tree using `f_down` and `f_up` closures. `f_down` is applied on a + /// node while traversing the tree top-down (pre-order, before the node's children are + /// visited) while `f_up` is applied on a node while traversing the tree bottom-up + /// (post-order, after the the node's children are visited). + /// + /// The `f_down` closure takes + /// - a `PD` type payload from its parent + /// and returns a tuple made of: + /// - a possibly modified node, + /// - a `PC` type payload to pass to `f_up`, + /// - a `Vec` type payload to propagate down to the node's children + /// (one `PD` element is propagated down to each child). + /// + /// The `f_up` closure takes + /// - a `PC` type payload from `f_down` and + /// - a `Vec` type payload collected from the node's children + /// and returns a tuple made of: + /// - a possibly modified node, + /// - a `PU` type payload to propagate up to the node's parent. + fn transform_with_payload( + self, + f_down: &mut FD, + payload_down: PD, + f_up: &mut FU, + ) -> Result<(Self, PU)> + where + FD: FnMut(Self, PD) -> Result<(Transformed, Vec, PC)>, + FU: FnMut(Self, PC, Vec) -> Result<(Transformed, PU)>, + { + let (new_node, new_payload_down, payload_current) = f_down(self, payload_down)?; + let mut new_payload_down_iter = new_payload_down.into_iter(); + let mut payload_up = vec![]; + let node_with_new_children = new_node.into().map_children(|node| { + let (new_node, p) = node.transform_with_payload( + f_down, + new_payload_down_iter.next().unwrap(), + f_up, + )?; + payload_up.push(p); + Ok(new_node) + })?; + let (new_node, new_payload_up) = + f_up(node_with_new_children, payload_current, payload_up)?; + Ok((new_node.into(), new_payload_up)) + } + /// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the node and all of its /// children(Preorder Traversal). /// When the `op` does not apply to a given node, it is left unchanged. @@ -135,6 +181,26 @@ pub trait TreeNode: Sized { after_op.map_children(|node| node.transform_down_mut(op)) } + /// Transforms the tree using `f` closure. `f` is applied on a node while traversing + /// the tree top-down (pre-order, before the node's children are visited). + /// + /// The `f` closure takes + /// - a `P` type payload from its parent + /// and returns a tuple made of: + /// - a possibly modified node, + /// - a `Vec

` type payload to propagate down to the node's children + /// (one `P` element is propagated down to each child). + fn transform_down_with_payload(self, f: &mut F, payload: P) -> Result + where + F: FnMut(Self, P) -> Result<(Transformed, Vec

)>, + { + let (new_node, new_payload) = f(self, payload)?; + let mut new_payload_iter = new_payload.into_iter(); + new_node.into().map_children(|node| { + node.transform_down_with_payload(f, new_payload_iter.next().unwrap()) + }) + } + /// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its /// children and then itself(Postorder Traversal). /// When the `op` does not apply to a given node, it is left unchanged. @@ -159,6 +225,28 @@ pub trait TreeNode: Sized { Ok(new_node) } + /// Transforms the tree using `f` closure. `f` is applied on a node while traversing + /// the tree bottom-up (post-order, after the the node's children are visited). + /// + /// The `f_up` closure takes + /// - a `Vec

` type payload collected from the node's children + /// and returns a tuple made of: + /// - a possibly modified node, + /// - a `P` type payload to propagate up to the node's parent. + fn transform_up_with_payload(self, f: &mut F) -> Result<(Self, P)> + where + F: FnMut(Self, Vec

) -> Result<(Transformed, P)>, + { + let mut payload = vec![]; + let node_with_new_children = self.map_children(|node| { + let (new_node, p) = node.transform_up_with_payload(f)?; + payload.push(p); + Ok(new_node) + })?; + let (new_node, new_payload) = f(node_with_new_children, payload)?; + Ok((new_node.into(), new_payload)) + } + /// Transform the tree node using the given [TreeNodeRewriter] /// It performs a depth first walk of an node and its children. /// diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index fab26c49c2da..8f1dda2df023 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -196,10 +196,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { let adjusted = if top_down_join_key_reordering { // Run a top-down process to adjust input key ordering recursively - let plan_requirements = PlanWithKeyRequirements::new_default(plan); - let adjusted = - plan_requirements.transform_down(&adjust_input_keys_ordering)?; - adjusted.plan + plan.transform_down_with_payload(&mut adjust_input_keys_ordering, vec![])? } else { // Run a bottom-up process plan.transform_up(&|plan| { @@ -267,12 +264,17 @@ impl PhysicalOptimizerRule for EnforceDistribution { /// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements /// 5) For other types of operators, by default, pushdown the parent requirements to children. /// -fn adjust_input_keys_ordering( - mut requirements: PlanWithKeyRequirements, -) -> Result> { - let plan = requirements.plan.clone(); +type RequiredKeyOrdering = Vec>; - if let Some(HashJoinExec { +#[allow(clippy::type_complexity)] +fn adjust_input_keys_ordering( + plan: Arc, + parent_required: RequiredKeyOrdering, +) -> Result<( + Transformed>, + Vec, +)> { + let request_key_ordering = if let Some(HashJoinExec { left, right, on, @@ -301,33 +303,36 @@ fn adjust_input_keys_ordering( .map(|e| Arc::new(e) as _) }; return reorder_partitioned_join_keys( - requirements, + plan.clone(), + &parent_required, on, vec![], &join_constructor, ) - .map(Transformed::Yes); + .map(|(plan, request_key_ordering)| { + (Transformed::Yes(plan), request_key_ordering) + }); } PartitionMode::CollectLeft => { // Push down requirements to the right side - requirements.children[1].data = match join_type { + let new_right_request = match join_type { JoinType::Inner | JoinType::Right => shift_right_required( - &requirements.data, + &parent_required, left.schema().fields().len(), - ) - .unwrap_or_default(), + ), JoinType::RightSemi | JoinType::RightAnti => { - requirements.data.clone() + Some(parent_required.clone()) } JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti - | JoinType::Full => vec![], + | JoinType::Full => None, }; + vec![vec![], new_right_request.unwrap_or_default()] } PartitionMode::Auto => { // Can not satisfy, clear the current requirements and generate new empty requirements - requirements.data.clear(); + vec![vec![]; plan.children().len()] } } } else if let Some(CrossJoinExec { left, .. }) = @@ -335,9 +340,10 @@ fn adjust_input_keys_ordering( { let left_columns_len = left.schema().fields().len(); // Push down requirements to the right side - requirements.children[1].data = - shift_right_required(&requirements.data, left_columns_len) - .unwrap_or_default(); + vec![ + vec![], + shift_right_required(&parent_required, left_columns_len).unwrap_or_default(), + ] } else if let Some(SortMergeJoinExec { left, right, @@ -363,35 +369,45 @@ fn adjust_input_keys_ordering( .map(|e| Arc::new(e) as _) }; return reorder_partitioned_join_keys( - requirements, + plan.clone(), + &parent_required, on, sort_options.clone(), &join_constructor, ) - .map(Transformed::Yes); + .map(|(plan, request_key_ordering)| { + (Transformed::Yes(plan), request_key_ordering) + }); } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::() { - if !requirements.data.is_empty() { + if !parent_required.is_empty() { if aggregate_exec.mode() == &AggregateMode::FinalPartitioned { - return reorder_aggregate_keys(requirements, aggregate_exec) - .map(Transformed::Yes); + return reorder_aggregate_keys( + plan.clone(), + &parent_required, + aggregate_exec, + ) + .map(|(plan, request_key_ordering)| { + (Transformed::Yes(plan), request_key_ordering) + }); } else { - requirements.data.clear(); + vec![vec![]; plan.children().len()] } } else { // Keep everything unchanged - return Ok(Transformed::No(requirements)); + let request_key_ordering = vec![vec![]; plan.children().len()]; + return Ok((Transformed::No(plan), request_key_ordering)); } } else if let Some(proj) = plan.as_any().downcast_ref::() { let expr = proj.expr(); // For Projection, we need to transform the requirements to the columns before the Projection // And then to push down the requirements // Construct a mapping from new name to the the orginal Column - let new_required = map_columns_before_projection(&requirements.data, expr); - if new_required.len() == requirements.data.len() { - requirements.children[0].data = new_required; + let new_required = map_columns_before_projection(&parent_required, expr); + if new_required.len() == parent_required.len() { + vec![new_required] } else { // Can not satisfy, clear the current requirements and generate new empty requirements - requirements.data.clear(); + vec![vec![]; plan.children().len()] } } else if plan.as_any().downcast_ref::().is_some() || plan @@ -400,30 +416,28 @@ fn adjust_input_keys_ordering( .is_some() || plan.as_any().downcast_ref::().is_some() { - requirements.data.clear(); + vec![vec![]; plan.children().len()] } else { // By default, push down the parent requirements to children - for child in requirements.children.iter_mut() { - child.data = requirements.data.clone(); - } - } - Ok(Transformed::Yes(requirements)) + vec![parent_required.clone(); plan.children().len()] + }; + Ok((Transformed::Yes(plan), request_key_ordering)) } fn reorder_partitioned_join_keys( - mut join_plan: PlanWithKeyRequirements, + mut join_plan: Arc, + parent_required: &[Arc], on: &[(PhysicalExprRef, PhysicalExprRef)], sort_options: Vec, join_constructor: &F, -) -> Result +) -> Result<(Arc, Vec)> where F: Fn( (Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec), ) -> Result>, { - let parent_required = &join_plan.data; let join_key_pairs = extract_join_keys(on); - let eq_properties = join_plan.plan.equivalence_properties(); + let eq_properties = join_plan.equivalence_properties(); if let Some(( JoinKeyPairs { @@ -438,25 +452,22 @@ where let new_sort_options = (0..sort_options.len()) .map(|idx| sort_options[new_positions[idx]]) .collect(); - join_plan.plan = join_constructor((new_join_on, new_sort_options))?; + join_plan = join_constructor((new_join_on, new_sort_options))?; } - let mut requirements = join_plan; - requirements.children[0].data = left_keys; - requirements.children[1].data = right_keys; - Ok(requirements) + Ok((join_plan, vec![left_keys, right_keys])) } else { - let mut requirements = join_plan; - requirements.children[0].data = join_key_pairs.left_keys; - requirements.children[1].data = join_key_pairs.right_keys; - Ok(requirements) + Ok(( + join_plan, + vec![join_key_pairs.left_keys, join_key_pairs.right_keys], + )) } } fn reorder_aggregate_keys( - mut agg_node: PlanWithKeyRequirements, + agg_node: Arc, + parent_required: &[Arc], agg_exec: &AggregateExec, -) -> Result { - let parent_required = &agg_node.data; +) -> Result<(Arc, Vec)> { let output_columns = agg_exec .group_by() .expr() @@ -508,18 +519,10 @@ fn reorder_aggregate_keys( new_group_by, agg_exec.aggr_expr().to_vec(), agg_exec.filter_expr().to_vec(), - partial_agg.clone(), + partial_agg, agg_exec.input_schema(), )?); - agg_node.plan = new_final_agg.clone(); - agg_node.data.clear(); - agg_node.children = vec![PlanWithKeyRequirements::new( - partial_agg as _, - vec![], - agg_node.children.swap_remove(0).children, - )]; - // Need to create a new projection to change the expr ordering back let agg_schema = new_final_agg.schema(); let mut proj_exprs = output_columns @@ -538,14 +541,14 @@ fn reorder_aggregate_keys( let plan = Arc::new(Column::new(name, idx)) as _; proj_exprs.push((plan, name.clone())) } - return ProjectionExec::try_new(proj_exprs, new_final_agg).map(|p| { - PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node]) - }); + return ProjectionExec::try_new(proj_exprs, new_final_agg) + .map(|p| (Arc::new(p) as _, vec![vec![]; 1])); } } } - } - Ok(agg_node) + }; + let request_key_ordering = vec![vec![]; agg_node.children().len()]; + Ok((agg_node, request_key_ordering)) } fn shift_right_required( @@ -1295,9 +1298,6 @@ struct JoinKeyPairs { right_keys: Vec>, } -/// Keeps track of parent required key orderings. -type PlanWithKeyRequirements = PlanContext>>; - /// Since almost all of these tests explicitly use `ParquetExec` they only run with the parquet feature flag on #[cfg(feature = "parquet")] #[cfg(test)] @@ -1783,13 +1783,7 @@ pub(crate) mod tests { { let adjusted = if config.optimizer.top_down_join_key_reordering { // Run adjust_input_keys_ordering rule - let plan_requirements = - PlanWithKeyRequirements::new_default($PLAN.clone()); - let adjusted = plan_requirements - .transform_down(&adjust_input_keys_ordering) - .and_then(check_integrity)?; - // TODO: End state payloads will be checked here. - adjusted.plan + $PLAN.clone().transform_down_with_payload(&mut adjust_input_keys_ordering, vec![])? } else { // Run reorder_join_keys_to_inputs rule $PLAN.clone().transform_up(&|plan| { diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 5c46e64a22f6..07533ce5463f 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -36,15 +36,15 @@ use std::sync::Arc; -use super::utils::add_sort_above; +use super::utils::{add_sort_above, add_sort_above_plan_context}; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::replace_with_order_preserving_variants::{ - replace_with_order_preserving_variants, OrderPreservationContext, -}; -use crate::physical_optimizer::sort_pushdown::{ - assign_initial_requirements, pushdown_sorts, SortPushDown, + propagate_order_maintaining_connections_down, + replace_with_order_preserving_variants_up, }; +use crate::physical_optimizer::sort_pushdown::pushdown_sorts; + use crate::physical_optimizer::utils::{ is_coalesce_partitions, is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window, @@ -116,35 +116,6 @@ fn update_sort_ctx_children( node.update_plan_from_children() } -/// This object is used within the [`EnforceSorting`] rule to track the closest -/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data -/// attribute stores whether the plan is a `CoalescePartitionsExec` or is -/// connected to a `CoalescePartitionsExec` via its children. -type PlanWithCorrespondingCoalescePartitions = PlanContext; - -fn update_coalesce_ctx_children( - coalesce_context: &mut PlanWithCorrespondingCoalescePartitions, -) { - let children = &coalesce_context.children; - coalesce_context.data = if children.is_empty() { - // Plan has no children, it cannot be a `CoalescePartitionsExec`. - false - } else if is_coalesce_partitions(&coalesce_context.plan) { - // Initiate a connection: - true - } else { - children.iter().enumerate().any(|(idx, node)| { - // Only consider operators that don't require a single partition, - // and connected to some `CoalescePartitionsExec`: - node.data - && !matches!( - coalesce_context.plan.required_input_distribution()[idx], - Distribution::SinglePartition - ) - }) - }; -} - /// The boolean flag `repartition_sorts` defined in the config indicates /// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades /// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to @@ -160,32 +131,37 @@ impl PhysicalOptimizerRule for EnforceSorting { // remove unnecessary sorts, and optimize sort-sensitive operators: let adjusted = plan_requirements.transform_up(&ensure_sorting)?; let new_plan = if config.optimizer.repartition_sorts { - let plan_with_coalesce_partitions = - PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); - let parallel = - plan_with_coalesce_partitions.transform_up(¶llelize_sorts)?; - parallel.plan + let (parallel, _) = adjusted.plan.transform_with_payload( + &mut propagate_unnecessary_coalesce_connections_down, + false, + &mut parallelize_sorts_up, + )?; + parallel } else { adjusted.plan }; - let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan); - let updated_plan = - plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| { - replace_with_order_preserving_variants( - plan_with_pipeline_fixer, + let (updated_plan, _) = new_plan.transform_with_payload( + &mut |plan, ordering_connection| { + propagate_order_maintaining_connections_down(plan, ordering_connection) + }, + false, + &mut |plan, ordering_connection, order_preserving_children| { + replace_with_order_preserving_variants_up( + plan, + ordering_connection, + order_preserving_children, false, true, config, ) - })?; + }, + )?; // Execute a top-down traversal to exploit sort push-down opportunities // missed by the bottom-up traversal: - let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan); - assign_initial_requirements(&mut sort_pushdown); - let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?; - Ok(adjusted.plan) + let plan = updated_plan.transform_down_with_payload(&mut pushdown_sorts, None)?; + Ok(plan) } fn name(&self) -> &str { @@ -197,82 +173,212 @@ impl PhysicalOptimizerRule for EnforceSorting { } } -/// This function turns plans of the form -/// ```text +/// For a given `plan`, `propagate_unnecessary_coalesce_connections_down` and +/// `parallelize_sorts_up` can be used with `TreeNode.transform_with_payload()` to +/// propagate down/up the information one needs to decide if unnecessary coalesce nodes +/// can be dropped so as to increase parallelism. +/// +/// The algorithm flow is simply like this: +/// 1. During the top-down traversal, keep track of operators that allow eliminating +/// descendant coalesce nodes. +/// There are 2 scenarios that this rule covers: +/// - A one partition sort or sort preserving merge node allow eliminating descendant +/// coalesce nodes that can be reached on connection that doesn't require single +/// partition distribution. In this case the sort node needs to be adjusted to sort +/// preserving merge followed by a sort node. +/// E.g the following plan: +/// ```text /// "SortExec: expr=\[a@0 ASC\]", -/// " CoalescePartitionsExec", -/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", -/// ``` -/// to -/// ```text -/// "SortPreservingMergeExec: \[a@0 ASC\]", -/// " SortExec: expr=\[a@0 ASC\]", -/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", -/// ``` -/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s. -/// By performing sorting in parallel, we can increase performance in some scenarios. -fn parallelize_sorts( - mut requirements: PlanWithCorrespondingCoalescePartitions, -) -> Result> { - update_coalesce_ctx_children(&mut requirements); - - if requirements.children.is_empty() || !requirements.children[0].data { - // We only take an action when the plan is either a `SortExec`, a - // `SortPreservingMergeExec` or a `CoalescePartitionsExec`, and they - // all have a single child. Therefore, if the first child has no - // connection, we can return immediately. - Ok(Transformed::No(requirements)) - } else if (is_sort(&requirements.plan) - || is_sort_preserving_merge(&requirements.plan)) - && requirements.plan.output_partitioning().partition_count() <= 1 +/// " ..." +/// " CoalescePartitionsExec", +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// ``` +/// can be optimized to +/// ```text +/// "SortPreservingMergeExec: \[a@0 ASC\]", +/// " SortExec: expr=\[a@0 ASC\]", +/// " ... +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// ``` +/// - A coalesce node allows eliminating descendant coalesce nodes that can be reached +/// on connection that doesn't require single partition distribution. +/// 2. During the bottom-up traversal, we use the information from the top-down traversal +/// and propagate up an alternative plan that doesn't contain the unnecessary coalesce +/// nodes. +/// - If the node is a one partition sort or sort preserving merge node then the +/// alternative plan is better and is accepted. +/// - If the node is a coalesce node then the alternative plan is better and is +/// accepted. Also, if the node can be reached from its parent via a connection that +/// allows eliminating coalesce nodes then start propagating up continue propagating +/// up the alternative plan without the coalesce node. +/// - If the current node is something else, but we got an alternative plan from its +/// children then extend the alternative plan with the current node. +#[allow(clippy::type_complexity)] +pub(crate) fn propagate_unnecessary_coalesce_connections_down( + plan: Arc, + unnecessary_coalesce_connection: bool, +) -> Result<(Transformed>, Vec, bool)> { + let children_unnecessary_coalesce_connections = if (is_sort(&plan) + && plan.output_partitioning().partition_count() <= 1) + || is_sort_preserving_merge(&plan) + || is_coalesce_partitions(&plan) { - // Take the initial sort expressions and requirements - let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?; - let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs); - let sort_exprs = sort_exprs.to_vec(); - - // If there is a connection between a `CoalescePartitionsExec` and a - // global sort that satisfy the requirements (i.e. intermediate - // executors don't require single partition), then we can replace - // the `CoalescePartitionsExec` + `SortExec` cascade with a `SortExec` - // + `SortPreservingMergeExec` cascade to parallelize sorting. - requirements = remove_corresponding_coalesce_in_sub_plan(requirements)?; - // We also need to remove the self node since `remove_corresponding_coalesce_in_sub_plan` - // deals with the children and their children and so on. - requirements = requirements.children.swap_remove(0); - - if !requirements - .plan - .equivalence_properties() - .ordering_satisfy_requirement(&sort_reqs) + // Start a unnecessary coalesce connection from + // - a single partition sort or + // - a sort preserving merge or, + // - a coalesce + // node down the tree. + vec![true] + } else { + // Keep the connection towards a child that doesn't require single partition + // distribution (coalesce). + plan.required_input_distribution() + .into_iter() + .map(|d| { + unnecessary_coalesce_connection + && !matches!(d, Distribution::SinglePartition) + }) + .collect() + }; + Ok(( + Transformed::No(plan), + children_unnecessary_coalesce_connections, + unnecessary_coalesce_connection, + )) +} + +#[allow(clippy::type_complexity)] +fn parallelize_sorts_up( + plan: Arc, + unnecessary_coalesce_connection: bool, + mut unnecessary_coalesce_eliminated_children: Vec>>, +) -> Result<( + Transformed>, + Option>, +)> { + if (is_sort(&plan) || is_sort_preserving_merge(&plan)) + && plan.output_partitioning().partition_count() <= 1 + { + // If we encounter a sort or sort preserving merge node then we might get an + // alternative plan propagated up from the child node. + // As unnecessary coalesce nodes have been removed from alternative plan which is + // always better than the original so we can accept it, but we need to make sure + // that a sort preserving merge and sort modes are placed on the top of the + // alternative plan. + if let Some(mut unnecessary_coalesce_eliminated_plan) = + unnecessary_coalesce_eliminated_children.swap_remove(0) { - requirements = add_sort_above(requirements, sort_reqs, fetch); + let (sort_exprs, fetch) = get_sort_exprs(&plan)?; + let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs); + let sort_exprs = sort_exprs.to_vec(); + if !unnecessary_coalesce_eliminated_plan + .equivalence_properties() + .ordering_satisfy_requirement(&sort_reqs) + { + unnecessary_coalesce_eliminated_plan = add_sort_above( + &unnecessary_coalesce_eliminated_plan, + sort_reqs, + fetch, + ); + } + let spm = Arc::new( + SortPreservingMergeExec::new( + sort_exprs, + unnecessary_coalesce_eliminated_plan, + ) + .with_fetch(fetch), + ); + Ok((Transformed::Yes(spm), None)) + } else { + Ok((Transformed::No(plan), None)) } - - let spm = SortPreservingMergeExec::new(sort_exprs, requirements.plan.clone()); - Ok(Transformed::Yes( - PlanWithCorrespondingCoalescePartitions::new( - Arc::new(spm.with_fetch(fetch)), - false, - vec![requirements], - ), - )) - } else if is_coalesce_partitions(&requirements.plan) { - // There is an unnecessary `CoalescePartitionsExec` in the plan. - // This will handle the recursive `CoalescePartitionsExec` plans. - requirements = remove_corresponding_coalesce_in_sub_plan(requirements)?; - // For the removal of self node which is also a `CoalescePartitionsExec`. - requirements = requirements.children.swap_remove(0); - - Ok(Transformed::Yes( - PlanWithCorrespondingCoalescePartitions::new( - Arc::new(CoalescePartitionsExec::new(requirements.plan.clone())), - false, - vec![requirements], - ), - )) + } else if is_coalesce_partitions(&plan) { + // Drop coalesce node from the actual and alternative plans if possible. + if let Some(unnecessary_coalesce_eliminated_child_plan) = + unnecessary_coalesce_eliminated_children.swap_remove(0) + { + // If the alternative subplan already propagated up then it accept it as it + // means we manage to eliminate a coalesce node under the current coalesce + // node. + let unnecessary_coalesce_eliminated_plan = if unnecessary_coalesce_connection + { + // If the current node has a connection from its parent that allows + // eliminating unnecessary coalesce nodes then continue propagating up the + // alternative plan without the current node as even the current node + // might not be needed. + Some(unnecessary_coalesce_eliminated_child_plan.clone()) + } else { + None + }; + let new_plan = plan + .clone() + .with_new_children(vec![unnecessary_coalesce_eliminated_child_plan])?; + Ok(( + Transformed::Yes(new_plan), + unnecessary_coalesce_eliminated_plan, + )) + } else { + let unnecessary_coalesce_eliminated_plan = if unnecessary_coalesce_connection + { + // If the current node has a connection from its parent that allows + // eliminating unnecessary coalesce nodes then start propagating up an + // alternative plan without the current node. + Some(plan.children().swap_remove(0)) + } else { + None + }; + Ok((Transformed::No(plan), unnecessary_coalesce_eliminated_plan)) + } + } else if unnecessary_coalesce_connection && is_repartition(&plan) { + // Due to the removal of coalesce nodes duplicate repartition nodes can become + // adjacent in which case we should get rid one of them. + // Please note that although getting rid of adjacent duplicate nodes is useful the + // issue should not be specific to this optimization rule so this part might be at + // a better place in a separate rule. + // But this optimization has been introduced since + // https://github.com/apache/arrow-datafusion/commit/5fc91cc8fbb56b6d2a32e66f8b327a871f7d15ac + // so we can leave it here for now. + + // Drop repartition node from the alternative plans if possible. + let unnecessary_coalesce_eliminated_plan = + unnecessary_coalesce_eliminated_children + .swap_remove(0) + .map(|unnecessary_coalesce_eliminated_child_plan| { + if is_repartition(&unnecessary_coalesce_eliminated_child_plan) + && plan.output_partitioning() + == unnecessary_coalesce_eliminated_child_plan + .output_partitioning() + { + Ok(unnecessary_coalesce_eliminated_child_plan) + } else { + plan.clone().with_new_children(vec![ + unnecessary_coalesce_eliminated_child_plan, + ]) + } + }) + .transpose()?; + Ok((Transformed::No(plan), unnecessary_coalesce_eliminated_plan)) } else { - Ok(Transformed::Yes(requirements)) + // If any of the children propagated up an alternative plan then keep propagating + // up the alternative plan extended with the current node. + let unnecessary_coalesce_eliminated_plan = + if unnecessary_coalesce_eliminated_children + .iter() + .any(|opc| opc.is_some()) + { + Some( + plan.clone().with_new_children( + unnecessary_coalesce_eliminated_children + .into_iter() + .zip(plan.children().into_iter()) + .map(|(opc, c)| opc.unwrap_or(c)) + .collect(), + )?, + ) + } else { + None + }; + Ok((Transformed::No(plan), unnecessary_coalesce_eliminated_plan)) } } @@ -309,7 +415,7 @@ fn ensure_sorting( if physical_ordering.is_some() { child = update_child_to_remove_unnecessary_sort(idx, child, plan)?; } - child = add_sort_above(child, required, None); + child = add_sort_above_plan_context(child, required, None); child = update_sort_ctx_children(child, true)?; } } else if physical_ordering.is_none() @@ -419,7 +525,7 @@ fn adjust_window_sort_removal( // Satisfy the ordering requirement so that the window can run: let mut child_node = window_tree.children.swap_remove(0); - child_node = add_sort_above(child_node, reqs, None); + child_node = add_sort_above_plan_context(child_node, reqs, None); let child_plan = child_node.plan.clone(); window_tree.children.push(child_node); @@ -443,39 +549,6 @@ fn adjust_window_sort_removal( Ok(window_tree) } -/// Removes the [`CoalescePartitionsExec`] from the plan in `node`. -fn remove_corresponding_coalesce_in_sub_plan( - mut requirements: PlanWithCorrespondingCoalescePartitions, -) -> Result { - let plan = &requirements.plan; - let children = &mut requirements.children; - if is_coalesce_partitions(&children[0].plan) { - // We can safely use the 0th index since we have a `CoalescePartitionsExec`. - let mut new_child_node = children[0].children.swap_remove(0); - while new_child_node.plan.output_partitioning() == plan.output_partitioning() - && is_repartition(&new_child_node.plan) - && is_repartition(plan) - { - new_child_node = new_child_node.children.swap_remove(0) - } - children[0] = new_child_node; - } else { - requirements.children = requirements - .children - .into_iter() - .map(|node| { - if node.data { - remove_corresponding_coalesce_in_sub_plan(node) - } else { - Ok(node) - } - }) - .collect::>()?; - } - - requirements.update_plan_from_children() -} - /// Updates child to remove the unnecessary sort below it. fn update_child_to_remove_unnecessary_sort( child_idx: usize, @@ -646,36 +719,34 @@ mod tests { // TODO: End state payloads will be checked here. let new_plan = if state.config_options().optimizer.repartition_sorts { - let plan_with_coalesce_partitions = - PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); - let parallel = plan_with_coalesce_partitions - .transform_up(¶llelize_sorts) - .and_then(check_integrity)?; - // TODO: End state payloads will be checked here. - parallel.plan + let (parallel, _) = adjusted.plan.transform_with_payload( + &mut propagate_unnecessary_coalesce_connections_down, + false, + &mut parallelize_sorts_up, + )?; + parallel } else { adjusted.plan }; - let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan); - let updated_plan = plan_with_pipeline_fixer - .transform_up(&|plan_with_pipeline_fixer| { - replace_with_order_preserving_variants( - plan_with_pipeline_fixer, + let (updated_plan, _) = new_plan.transform_with_payload( + &mut |plan, ordering_connection| { + propagate_order_maintaining_connections_down(plan, ordering_connection) + }, + false, + &mut |plan, ordering_connection, order_preserving_children| { + replace_with_order_preserving_variants_up( + plan, + ordering_connection, + order_preserving_children, false, true, state.config_options(), ) - }) - .and_then(check_integrity)?; - // TODO: End state payloads will be checked here. + }, + )?; - let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan); - assign_initial_requirements(&mut sort_pushdown); - sort_pushdown - .transform_down(&pushdown_sorts) - .and_then(check_integrity)?; - // TODO: End state payloads will be checked here. + updated_plan.transform_down_with_payload(&mut pushdown_sorts, None)?; } let physical_plan = $PLAN; diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 02626056f6cc..50c5dc5797e4 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -27,9 +27,6 @@ use std::sync::Arc; use crate::config::ConfigOptions; use crate::error::Result; -use crate::physical_optimizer::pipeline_checker::{ - children_unbounded, PipelineStatePropagator, -}; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use crate::physical_plan::joins::{ @@ -231,7 +228,6 @@ impl PhysicalOptimizerRule for JoinSelection { plan: Arc, config: &ConfigOptions, ) -> Result> { - let pipeline = PipelineStatePropagator::new_default(plan); // First, we make pipeline-fixing modifications to joins so as to accommodate // unbounded inputs. Each pipeline-fixing subrule, which is a function // of type `PipelineFixerSubrule`, takes a single [`PipelineStatePropagator`] @@ -241,7 +237,10 @@ impl PhysicalOptimizerRule for JoinSelection { Box::new(hash_join_convert_symmetric_subrule), Box::new(hash_join_swap_subrule), ]; - let state = pipeline.transform_up(&|p| apply_subrules(p, &subrules, config))?; + let (plan, _) = + plan.transform_up_with_payload(&mut |p, children_unbounded| { + apply_subrules(p, children_unbounded, &subrules, config) + })?; // Next, we apply another subrule that tries to optimize joins using any // statistics their inputs might have. // - For a hash join with partition mode [`PartitionMode::Auto`], we will @@ -256,7 +255,7 @@ impl PhysicalOptimizerRule for JoinSelection { let config = &config.optimizer; let collect_threshold_byte_size = config.hash_join_single_partition_threshold; let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows; - state.plan.transform_up(&|plan| { + plan.transform_up(&|plan| { statistical_join_selection_subrule( plan, collect_threshold_byte_size, @@ -445,8 +444,11 @@ fn statistical_join_selection_subrule( } /// Pipeline-fixing join selection subrule. -pub type PipelineFixerSubrule = - dyn Fn(PipelineStatePropagator, &ConfigOptions) -> Result; +pub type PipelineFixerSubrule = dyn Fn( + Arc, + &[bool], + &ConfigOptions, +) -> Result<(Arc, bool)>; /// Converts a hash join to a symmetric hash join in the case of infinite inputs on both sides. /// @@ -464,16 +466,18 @@ pub type PipelineFixerSubrule = /// it returns `None`. If applicable, it returns `Some(Ok(...))` with the modified pipeline state, /// or `Some(Err(...))` if an error occurs during the transformation. fn hash_join_convert_symmetric_subrule( - mut input: PipelineStatePropagator, + plan: Arc, + children_unbounded: &[bool], config_options: &ConfigOptions, -) -> Result { +) -> Result<(Arc, bool)> { + let mut unbounded = false; // Check if the current plan node is a HashJoinExec. - if let Some(hash_join) = input.plan.as_any().downcast_ref::() { + if let Some(hash_join) = plan.as_any().downcast_ref::() { // Determine if left and right children are unbounded. - let ub_flags = children_unbounded(&input); - let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]); + let (left_unbounded, right_unbounded) = + (children_unbounded[0], children_unbounded[1]); // Update the unbounded flag of the input. - input.data = left_unbounded || right_unbounded; + unbounded = left_unbounded || right_unbounded; // Process only if both left and right sides are unbounded. if left_unbounded && right_unbounded { // Determine the partition mode based on configuration. @@ -515,11 +519,10 @@ fn hash_join_convert_symmetric_subrule( hash_join.right().schema(), ), }; - let name = schema.field(*index).name(); let col = Arc::new(Column::new(name, *index)) as _; // Check if the column is ordered. - equivalence.get_expr_ordering(col).data + equivalence.get_expr_ordering(col).1 != SortProperties::Unordered }, ) @@ -534,7 +537,6 @@ fn hash_join_convert_symmetric_subrule( }) .flatten() }; - // Determine the sort order for both left and right sides. let left_order = determine_order(JoinSide::Left); let right_order = determine_order(JoinSide::Right); @@ -550,15 +552,11 @@ fn hash_join_convert_symmetric_subrule( right_order, mode, ) - .map(|exec| { - input.plan = Arc::new(exec) as _; - input - }); + .map(|exec| (Arc::new(exec) as _, unbounded)); } } - Ok(input) + Ok((plan, unbounded)) } - /// This subrule will swap build/probe sides of a hash join depending on whether /// one of its inputs may produce an infinite stream of records. The rule ensures /// that the left (build) side of the hash join always operates on an input stream @@ -601,13 +599,15 @@ fn hash_join_convert_symmetric_subrule( /// /// ``` fn hash_join_swap_subrule( - mut input: PipelineStatePropagator, + mut plan: Arc, + children_unbounded: &[bool], _config_options: &ConfigOptions, -) -> Result { - if let Some(hash_join) = input.plan.as_any().downcast_ref::() { - let ub_flags = children_unbounded(&input); - let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]); - input.data = left_unbounded || right_unbounded; +) -> Result<(Arc, bool)> { + let mut unbounded = false; + if let Some(hash_join) = plan.as_any().downcast_ref::() { + let (left_unbounded, right_unbounded) = + (children_unbounded[0], children_unbounded[1]); + unbounded = left_unbounded || right_unbounded; if left_unbounded && !right_unbounded && matches!( @@ -618,10 +618,10 @@ fn hash_join_swap_subrule( | JoinType::LeftAnti ) { - input.plan = swap_join_according_to_unboundedness(hash_join)?; + plan = swap_join_according_to_unboundedness(hash_join)?; } } - Ok(input) + Ok((plan, unbounded)) } /// This function swaps sides of a hash join to make it runnable even if one of @@ -654,16 +654,16 @@ fn swap_join_according_to_unboundedness( /// Apply given `PipelineFixerSubrule`s to a given plan. This plan, along with /// auxiliary boundedness information, is in the `PipelineStatePropagator` object. fn apply_subrules( - mut input: PipelineStatePropagator, + mut plan: Arc, + children_unbounded: Vec, subrules: &Vec>, config_options: &ConfigOptions, -) -> Result> { +) -> Result<(Transformed>, bool)> { for subrule in subrules { - input = subrule(input, config_options)?; + (plan, _) = subrule(plan, children_unbounded.as_slice(), config_options)?; } - input.data = input - .plan - .unbounded_output(&children_unbounded(&input)) + let is_unbounded = plan + .unbounded_output(children_unbounded.as_slice()) // Treat the case where an operator can not run on unbounded data as // if it can and it outputs unbounded data. Do not raise an error yet. // Such operators may be fixed, adjusted or replaced later on during @@ -671,7 +671,7 @@ fn apply_subrules( // etc. If this doesn't happen, the final `PipelineChecker` rule will // catch this and raise an error anyway. .unwrap_or(true); - Ok(Transformed::Yes(input)) + Ok((Transformed::Yes(plan), is_unbounded)) } #[cfg(test)] @@ -680,7 +680,6 @@ mod tests_statistical { use super::*; use crate::{ - physical_optimizer::test_utils::check_integrity, physical_plan::{ displayable, joins::PartitionMode, ColumnStatistics, Statistics, }, @@ -829,19 +828,18 @@ mod tests_statistical { } pub(crate) fn crosscheck_plans(plan: Arc) -> Result<()> { - let pipeline = PipelineStatePropagator::new_default(plan); let subrules: Vec> = vec![ Box::new(hash_join_convert_symmetric_subrule), Box::new(hash_join_swap_subrule), ]; - let state = pipeline - .transform_up(&|p| apply_subrules(p, &subrules, &ConfigOptions::new())) - .and_then(check_integrity)?; - // TODO: End state payloads will be checked here. + let (plan, _) = + plan.transform_up_with_payload(&mut |p, children_unbounded| { + apply_subrules(p, children_unbounded, &subrules, &ConfigOptions::new()) + })?; let config = ConfigOptions::new().optimizer; let collect_left_threshold = config.hash_join_single_partition_threshold; let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows; - let _ = state.plan.transform_up(&|plan| { + let _ = plan.transform_up(&|plan| { statistical_join_selection_subrule( plan, collect_left_threshold, @@ -1404,7 +1402,6 @@ mod hash_join_tests { use arrow::record_batch::RecordBatch; use datafusion_common::utils::DataPtr; use datafusion_common::JoinType; - use datafusion_physical_plan::empty::EmptyExec; use std::sync::Arc; struct TestCase { @@ -1772,18 +1769,13 @@ mod hash_join_tests { false, )?); - let left_child = Arc::new(EmptyExec::new(Arc::new(Schema::empty()))); - let right_child = Arc::new(EmptyExec::new(Arc::new(Schema::empty()))); - let children = vec![ - PipelineStatePropagator::new(left_child, left_unbounded, vec![]), - PipelineStatePropagator::new(right_child, right_unbounded, vec![]), - ]; - let initial_hash_join_state = - PipelineStatePropagator::new(join.clone(), false, children); + let children_unbounded = vec![left_unbounded, right_unbounded]; - let optimized_hash_join = - hash_join_swap_subrule(initial_hash_join_state, &ConfigOptions::new())?; - let optimized_join_plan = optimized_hash_join.plan; + let (optimized_join_plan, _) = hash_join_swap_subrule( + join, + children_unbounded.as_slice(), + &ConfigOptions::new(), + )?; // If swap did happen let projection_added = optimized_join_plan.as_any().is::(); diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index bb0665c10bcc..3d596bb9bb15 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -31,7 +31,6 @@ use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{plan_err, DataFusionError}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; -use datafusion_physical_plan::tree_node::PlanContext; /// The PipelineChecker rule rejects non-runnable query plans that use /// pipeline-breaking operators on infinite input(s). @@ -51,10 +50,11 @@ impl PhysicalOptimizerRule for PipelineChecker { plan: Arc, config: &ConfigOptions, ) -> Result> { - let pipeline = PipelineStatePropagator::new_default(plan); - let state = pipeline - .transform_up(&|p| check_finiteness_requirements(p, &config.optimizer))?; - Ok(state.plan) + let (plan, _) = + plan.transform_up_with_payload(&mut |p, children_unbounded| { + check_finiteness_requirements(p, children_unbounded, &config.optimizer) + })?; + Ok(plan) } fn name(&self) -> &str { @@ -66,21 +66,14 @@ impl PhysicalOptimizerRule for PipelineChecker { } } -/// This object propagates the [`ExecutionPlan`] pipelining information. -pub type PipelineStatePropagator = PlanContext; - -/// Collects unboundedness flags of all the children of the plan in `pipeline`. -pub fn children_unbounded(pipeline: &PipelineStatePropagator) -> Vec { - pipeline.children.iter().map(|c| c.data).collect() -} - /// This function propagates finiteness information and rejects any plan with /// pipeline-breaking operators acting on infinite inputs. pub fn check_finiteness_requirements( - mut input: PipelineStatePropagator, + plan: Arc, + children_unbounded: Vec, optimizer_options: &OptimizerOptions, -) -> Result> { - if let Some(exec) = input.plan.as_any().downcast_ref::() { +) -> Result<(Transformed>, bool)> { + if let Some(exec) = plan.as_any().downcast_ref::() { if !(optimizer_options.allow_symmetric_joins_without_pruning || (exec.check_if_order_information_available()? && is_prunable(exec))) { @@ -88,13 +81,8 @@ pub fn check_finiteness_requirements( the 'allow_symmetric_joins_without_pruning' configuration flag"); } } - input - .plan - .unbounded_output(&children_unbounded(&input)) - .map(|value| { - input.data = value; - Transformed::Yes(input) - }) + plan.unbounded_output(children_unbounded.as_slice()) + .map(|value| (Transformed::No(plan), value)) } /// This function returns whether a given symmetric hash join is amenable to diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index bc9bd0010dc5..70d7012ce043 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -21,211 +21,84 @@ use std::sync::Arc; -use super::utils::{is_repartition, is_sort_preserving_merge}; +use super::utils::is_repartition; use crate::error::Result; use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort}; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use crate::physical_plan::ExecutionPlan; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::Transformed; -use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion_physical_plan::tree_node::PlanContext; use datafusion_physical_plan::unbounded_output; -use itertools::izip; - -/// For a given `plan`, this object carries the information one needs from its -/// descendants to decide whether it is beneficial to replace order-losing (but -/// somewhat faster) variants of certain operators with their order-preserving -/// (but somewhat slower) cousins. -pub type OrderPreservationContext = PlanContext; - -/// Updates order-preservation data for all children of the given node. -pub fn update_children(opc: &mut OrderPreservationContext) { - for PlanContext { - plan, - children, - data, - } in opc.children.iter_mut() - { - let maintains_input_order = plan.maintains_input_order(); - let inspect_child = |idx| { - maintains_input_order[idx] - || is_coalesce_partitions(plan) - || is_repartition(plan) - }; - - // We cut the path towards nodes that do not maintain ordering. - for (idx, c) in children.iter_mut().enumerate() { - c.data &= inspect_child(idx); - } - - let plan_children = plan.children(); - *data = if plan_children.is_empty() { - false - } else if !children[0].data - && ((is_repartition(plan) && !maintains_input_order[0]) - || (is_coalesce_partitions(plan) - && plan_children[0].output_ordering().is_some())) - { - // We either have a RepartitionExec or a CoalescePartitionsExec - // and they lose their input ordering, so initiate connection: - true - } else { - // Maintain connection if there is a child with a connection, - // and operator can possibly maintain that connection (either - // in its current form or when we replace it with the corresponding - // order preserving operator). - children - .iter() - .enumerate() - .any(|(idx, c)| c.data && inspect_child(idx)) - } - } - opc.data = false; -} - -/// Calculates the updated plan by replacing operators that lose ordering -/// inside `sort_input` with their order-preserving variants. This will -/// generate an alternative plan, which will be accepted or rejected later on -/// depending on whether it helps us remove a `SortExec`. -fn plan_with_order_preserving_variants( - mut sort_input: OrderPreservationContext, - // Flag indicating that it is desirable to replace `RepartitionExec`s with - // `SortPreservingRepartitionExec`s: - is_spr_better: bool, - // Flag indicating that it is desirable to replace `CoalescePartitionsExec`s - // with `SortPreservingMergeExec`s: - is_spm_better: bool, -) -> Result { - sort_input.children = sort_input - .children - .into_iter() - .map(|node| { - // Update descendants in the given tree if there is a connection: - if node.data { - plan_with_order_preserving_variants(node, is_spr_better, is_spm_better) - } else { - Ok(node) - } - }) - .collect::>()?; - sort_input.data = false; - - if is_repartition(&sort_input.plan) - && !sort_input.plan.maintains_input_order()[0] - && is_spr_better - { - // When a `RepartitionExec` doesn't preserve ordering, replace it with - // a sort-preserving variant if appropriate: - let child = sort_input.children[0].plan.clone(); - let partitioning = sort_input.plan.output_partitioning(); - sort_input.plan = Arc::new( - RepartitionExec::try_new(child, partitioning)?.with_preserve_order(), - ) as _; - sort_input.children[0].data = true; - return Ok(sort_input); - } else if is_coalesce_partitions(&sort_input.plan) && is_spm_better { - let child = &sort_input.children[0].plan; - if let Some(ordering) = child.output_ordering().map(Vec::from) { - // When the input of a `CoalescePartitionsExec` has an ordering, - // replace it with a `SortPreservingMergeExec` if appropriate: - let spm = SortPreservingMergeExec::new(ordering, child.clone()); - sort_input.plan = Arc::new(spm) as _; - sort_input.children[0].data = true; - return Ok(sort_input); - } - } - - sort_input.update_plan_from_children() -} - -/// Calculates the updated plan by replacing operators that preserve ordering -/// inside `sort_input` with their order-breaking variants. This will restore -/// the original plan modified by [`plan_with_order_preserving_variants`]. -fn plan_with_order_breaking_variants( - mut sort_input: OrderPreservationContext, -) -> Result { - let plan = &sort_input.plan; - sort_input.children = izip!( - sort_input.children, - plan.maintains_input_order(), - plan.required_input_ordering() - ) - .map(|(node, maintains, required_ordering)| { - // Replace with non-order preserving variants as long as ordering is - // not required by intermediate operators: - if maintains - && (is_sort_preserving_merge(plan) - || !required_ordering.map_or(false, |required_ordering| { - node.plan - .equivalence_properties() - .ordering_satisfy_requirement(&required_ordering) - })) - { - plan_with_order_breaking_variants(node) - } else { - Ok(node) - } - }) - .collect::>()?; - sort_input.data = false; - - if is_repartition(plan) && plan.maintains_input_order()[0] { - // When a `RepartitionExec` preserves ordering, replace it with a - // non-sort-preserving variant: - let child = sort_input.children[0].plan.clone(); - let partitioning = plan.output_partitioning(); - sort_input.plan = Arc::new(RepartitionExec::try_new(child, partitioning)?) as _; - } else if is_sort_preserving_merge(plan) { - // Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`: - let child = sort_input.children[0].plan.clone(); - let coalesce = CoalescePartitionsExec::new(child); - sort_input.plan = Arc::new(coalesce) as _; - } else { - return sort_input.update_plan_from_children(); - } - - sort_input.children[0].data = false; - Ok(sort_input) -} - -/// The `replace_with_order_preserving_variants` optimizer sub-rule tries to -/// remove `SortExec`s from the physical plan by replacing operators that do -/// not preserve ordering with their order-preserving variants; i.e. by replacing -/// ordinary `RepartitionExec`s with their sort-preserving variants or by replacing -/// `CoalescePartitionsExec`s with `SortPreservingMergeExec`s. +/// For a given `plan`, `propagate_order_maintaining_connections_down` and +/// `replace_with_order_preserving_variants_up` can be used with +/// `TreeNode.transform_with_payload()` to propagate down/up the information one needs to +/// decide whether it is beneficial to replace order-losing (but somewhat faster) variants +/// of certain operators with their order-preserving (but somewhat slower) cousins. /// -/// If this replacement is helpful for removing a `SortExec`, it updates the plan. -/// Otherwise, it leaves the plan unchanged. +/// The `replace_with_order_preserving_variants_up` optimizer sub-rule tries to remove +/// `SortExec`s from the physical plan by replacing operators that do not preserve +/// ordering with their order-preserving variants; i.e. by replacing `RepartitionExec`s +/// with `SortPreservingRepartitionExec`s or by replacing `CoalescePartitionsExec`s with +/// `SortPreservingMergeExec`s. /// -/// NOTE: This optimizer sub-rule will only produce sort-preserving `RepartitionExec`s -/// if the query is bounded or if the config option `prefer_existing_sort` is -/// set to `true`. +/// Note: this optimizer sub-rule will only produce `SortPreservingRepartitionExec`s +/// if the query is bounded or if the config option `bounded_order_preserving_variants` +/// is set to `true`. /// /// The algorithm flow is simply like this: -/// 1. Visit nodes of the physical plan bottom-up and look for `SortExec` nodes. -/// During the traversal, keep track of operators that maintain ordering (or -/// can maintain ordering when replaced by an order-preserving variant) until -/// a `SortExec` is found. -/// 2. When a `SortExec` is found, update the child of the `SortExec` by replacing -/// operators that do not preserve ordering in the tree with their order -/// preserving variants. -/// 3. Check if the `SortExec` is still necessary in the updated plan by comparing -/// its input ordering with the output ordering it imposes. We do this because -/// replacing operators that lose ordering with their order-preserving variants -/// enables us to preserve the previously lost ordering at the input of `SortExec`. -/// 4. If the `SortExec` in question turns out to be unnecessary, remove it and -/// use updated plan. Otherwise, use the original plan. -/// 5. Continue the bottom-up traversal until another `SortExec` is seen, or the -/// traversal is complete. -pub(crate) fn replace_with_order_preserving_variants( - mut requirements: OrderPreservationContext, - // A flag indicating that replacing `RepartitionExec`s with sort-preserving - // variants is desirable when it helps to remove a `SortExec` from the plan. - // If this flag is `false`, this replacement should only be made to fix the - // pipeline (streaming). +/// 1. During the top-down traversal, keep track of operators that maintain ordering (or +/// can maintain ordering when replaced by an order-preserving variant) starting from a +/// `SortExec` node down the tree. +/// 2. During the bottom-up traversal, we use the order maintaining information from the +/// top-down traversal and propagate up order maintaining alternative of the current +/// plan. +/// - If the node is `SortExec` then check if `SortExec` is still necessary. If the +/// propagated up alternative plan satisfies the ordering needs then `SortExec` can +/// be dropped and the alternative plan can be accepted. If it doesn't satisfy then +/// the alternative can be dropped. +/// - If a node can be reached from its parent via an order maintaining connection and +/// the node is an operator that can be replaced to its order maintaining variant +/// then start propagating up or extend the already propagated up alternative plan +/// with the order maintaining operator variant of the current node. +/// - If the node can't be replaced but we got order maintaining alternative from its +/// children then extend the alternative plan with the current node. +#[allow(clippy::type_complexity)] +pub(crate) fn propagate_order_maintaining_connections_down( + plan: Arc, + ordering_connection: bool, +) -> Result<(Transformed>, Vec, bool)> { + let children_ordering_connections = if is_sort(&plan) { + // Start an order maintaining connection from the sort node down the tree. + vec![true] + } else { + // Keep the connection towards a child if a node maintains ordering to the child + // by default or the node can be replaced to an order maintaining alternative. + let possible_ordering_connection = + is_repartition(&plan) || is_coalesce_partitions(&plan); + plan.maintains_input_order() + .into_iter() + .map(|mio| ordering_connection && (mio || possible_ordering_connection)) + .collect() + }; + Ok(( + Transformed::No(plan), + children_ordering_connections, + ordering_connection, + )) +} + +#[allow(clippy::type_complexity)] +pub(crate) fn replace_with_order_preserving_variants_up( + plan: Arc, + ordering_connection: bool, + mut order_preserving_children: Vec>>, + // A flag indicating that replacing `RepartitionExec`s with + // `SortPreservingRepartitionExec`s is desirable when it helps + // to remove a `SortExec` from the plan. If this flag is `false`, + // this replacement should only be made to fix the pipeline (streaming). is_spr_better: bool, // A flag indicating that replacing `CoalescePartitionsExec`s with // `SortPreservingMergeExec`s is desirable when it helps to remove a @@ -233,40 +106,83 @@ pub(crate) fn replace_with_order_preserving_variants( // should only be made to fix the pipeline (streaming). is_spm_better: bool, config: &ConfigOptions, -) -> Result> { - update_children(&mut requirements); - if !(is_sort(&requirements.plan) && requirements.children[0].data) { - return Ok(Transformed::No(requirements)); - } - - // For unbounded cases, we replace with the order-preserving variant in any - // case, as doing so helps fix the pipeline. Also replace if config allows. +) -> Result<( + Transformed>, + Option>, +)> { + // For unbounded cases, replace with the order-preserving variant in + // any case, as doing so helps fix the pipeline. + // Also do the replacement if opted-in via config options. let use_order_preserving_variant = - config.optimizer.prefer_existing_sort || unbounded_output(&requirements.plan); - - // Create an alternate plan with order-preserving variants: - let mut alternate_plan = plan_with_order_preserving_variants( - requirements.children.swap_remove(0), - is_spr_better || use_order_preserving_variant, - is_spm_better || use_order_preserving_variant, - )?; - - // If the alternate plan makes this sort unnecessary, accept the alternate: - if alternate_plan - .plan - .equivalence_properties() - .ordering_satisfy(requirements.plan.output_ordering().unwrap_or(&[])) - { - for child in alternate_plan.children.iter_mut() { - child.data = false; + config.optimizer.prefer_existing_sort || unbounded_output(&plan); + + if is_sort(&plan) { + if let Some(order_preserving_plan) = order_preserving_children.swap_remove(0) { + // If there is an order preserving alternative available we need to check if + // it satisfies ordering of the original sort operator + if order_preserving_plan + .equivalence_properties() + .ordering_satisfy(plan.output_ordering().unwrap_or(&[])) + { + // If the sort is unnecessary, we should remove it: + Ok((Transformed::Yes(order_preserving_plan), None)) + } else { + Ok((Transformed::No(plan), None)) + } + } else { + Ok((Transformed::No(plan), None)) } - Ok(Transformed::Yes(alternate_plan)) + } else if ordering_connection + && is_repartition(&plan) + && !plan.maintains_input_order()[0] + && (is_spr_better || use_order_preserving_variant) + { + // Replace repartition to its order maintaining variant in the alternative plan. + // If the alternative subplan already propagated up then extend that, if not then + // start a new from the actual plan. + let child = order_preserving_children + .swap_remove(0) + .unwrap_or_else(|| plan.children().swap_remove(0)); + let order_preserving_plan = Arc::new( + RepartitionExec::try_new(child, plan.output_partitioning())? + .with_preserve_order(), + ); + Ok((Transformed::No(plan), Some(order_preserving_plan))) + } else if ordering_connection + && is_coalesce_partitions(&plan) + && (is_spm_better || use_order_preserving_variant) + { + // Replace coalesce to its order maintaining variant in the alternative plan. + // If the alternative subplan already propagated up then extend that, if not then + // start a new from the actual plan. + let child = order_preserving_children + .swap_remove(0) + .unwrap_or_else(|| plan.children().swap_remove(0)); + + // When the input of a `CoalescePartitionsExec` has an ordering, replace it + // with a `SortPreservingMergeExec` if appropriate: + let order_preserving_plan = child.output_ordering().map(|o| { + Arc::new(SortPreservingMergeExec::new(o.to_vec(), child.clone())) as _ + }); + Ok((Transformed::No(plan), order_preserving_plan)) } else { - // The alternate plan does not help, use faster order-breaking variants: - alternate_plan = plan_with_order_breaking_variants(alternate_plan)?; - alternate_plan.data = false; - requirements.children = vec![alternate_plan]; - Ok(Transformed::Yes(requirements)) + // If any of the children propagated up an alternative plan then keep propagating + // up the alternative plan with the current node. + let order_preserving_plan = + if order_preserving_children.iter().any(|opc| opc.is_some()) { + Some( + plan.clone().with_new_children( + order_preserving_children + .into_iter() + .zip(plan.children().into_iter()) + .map(|(opc, c)| opc.unwrap_or(c)) + .collect(), + )?, + ) + } else { + None + }; + Ok((Transformed::No(plan), order_preserving_plan)) } } @@ -277,9 +193,7 @@ mod tests { use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::listing::PartitionedFile; use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; - use crate::physical_optimizer::test_utils::check_integrity; use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; - use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::joins::{HashJoinExec, PartitionMode}; use crate::physical_plan::repartition::RepartitionExec; @@ -301,6 +215,7 @@ mod tests { use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::streaming::StreamingTableExec; + use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use rstest::rstest; /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts @@ -394,9 +309,10 @@ mod tests { // Run the rule top-down let config = SessionConfig::new().with_prefer_existing_sort($PREFER_EXISTING_SORT); - let plan_with_pipeline_fixer = OrderPreservationContext::new_default(physical_plan); - let parallel = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options())).and_then(check_integrity)?; - let optimized_physical_plan = parallel.plan; + let (optimized_physical_plan, _) = physical_plan.transform_with_payload( + &mut |plan, ordering_connection| propagate_order_maintaining_connections_down(plan, ordering_connection), + false, + &mut |plan, ordering_connection, order_preserving_children| replace_with_order_preserving_variants_up(plan, ordering_connection, order_preserving_children, false, false, config.options()))?; // Get string representation of the plan let actual = get_plan_string(&optimized_physical_plan); diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 3413486c6b46..b9dcf156e1ce 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -26,8 +26,6 @@ use crate::physical_plan::joins::utils::calculate_join_output_ordering; use crate::physical_plan::joins::{HashJoinExec, SortMergeJoinExec}; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::sorts::sort::SortExec; -use crate::physical_plan::tree_node::PlanContext; use crate::physical_plan::ExecutionPlan; use datafusion_common::tree_node::Transformed; @@ -37,33 +35,25 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{ LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement, }; +use datafusion_physical_plan::sorts::sort::SortExec; -/// This is a "data class" we use within the [`EnforceSorting`] rule to push -/// down [`SortExec`] in the plan. In some cases, we can reduce the total -/// computational cost by pushing down `SortExec`s through some executors. The /// object carries the parent required ordering as its data. -/// -/// [`EnforceSorting`]: crate::physical_optimizer::enforce_sorting::EnforceSorting -pub type SortPushDown = PlanContext>>; - -/// Assigns the ordering requirement of the root node to the its children. -pub fn assign_initial_requirements(node: &mut SortPushDown) { - let reqs = node.plan.required_input_ordering(); - for (child, requirement) in node.children.iter_mut().zip(reqs) { - child.data = requirement; - } -} - +#[allow(clippy::type_complexity)] pub(crate) fn pushdown_sorts( - mut requirements: SortPushDown, -) -> Result> { - let plan = &requirements.plan; - let parent_reqs = requirements.data.as_deref().unwrap_or(&[]); + mut plan: Arc, + parent_required_ordering: Option>, +) -> Result<( + Transformed>, + Vec>>, +)> { + let parent_reqs = parent_required_ordering.as_deref().unwrap_or(&[]); let satisfy_parent = plan .equivalence_properties() .ordering_satisfy_requirement(parent_reqs); - if let Some(sort_exec) = plan.as_any().downcast_ref::() { + let required_ordering = if let Some(sort_exec) = + plan.as_any().downcast_ref::() + { let required_ordering = plan .output_ordering() .map(PhysicalSortRequirement::from_sort_exprs) @@ -72,50 +62,38 @@ pub(crate) fn pushdown_sorts( if !satisfy_parent { // Make sure this `SortExec` satisfies parent requirements: let fetch = sort_exec.fetch(); - let sort_reqs = requirements.data.unwrap_or_default(); - requirements = requirements.children.swap_remove(0); - requirements = add_sort_above(requirements, sort_reqs, fetch); + let sort_reqs = parent_required_ordering.unwrap_or_default(); + plan = plan.children().swap_remove(0); + plan = add_sort_above(&plan, sort_reqs, fetch); }; // We can safely get the 0th index as we are dealing with a `SortExec`. - let mut child = requirements.children.swap_remove(0); + let child = plan.children().swap_remove(0); if let Some(adjusted) = - pushdown_requirement_to_children(&child.plan, &required_ordering)? + pushdown_requirement_to_children(&child, &required_ordering)? { - for (grand_child, order) in child.children.iter_mut().zip(adjusted) { - grand_child.data = order; - } - // Can push down requirements - child.data = None; - return Ok(Transformed::Yes(child)); + return Ok((Transformed::Yes(child), adjusted)); } else { // Can not push down requirements - requirements.children = vec![child]; - assign_initial_requirements(&mut requirements); + plan.required_input_ordering() } } else if satisfy_parent { // For non-sort operators, immediately return if parent requirements are met: - let reqs = plan.required_input_ordering(); - for (child, order) in requirements.children.iter_mut().zip(reqs) { - child.data = order; - } - } else if let Some(adjusted) = pushdown_requirement_to_children(plan, parent_reqs)? { + plan.required_input_ordering() + } else if let Some(adjusted) = pushdown_requirement_to_children(&plan, parent_reqs)? { // Can not satisfy the parent requirements, check whether we can push // requirements down: - for (child, order) in requirements.children.iter_mut().zip(adjusted) { - child.data = order; - } - requirements.data = None; + adjusted } else { // Can not push down requirements, add new `SortExec`: - let sort_reqs = requirements.data.clone().unwrap_or_default(); - requirements = add_sort_above(requirements, sort_reqs, None); - assign_initial_requirements(&mut requirements); - } - Ok(Transformed::Yes(requirements)) + let sort_reqs = parent_required_ordering.unwrap_or_default(); + plan = add_sort_above(&plan, sort_reqs, None); + plan.required_input_ordering() + }; + Ok((Transformed::Yes(plan), required_ordering)) } -fn pushdown_requirement_to_children( +pub fn pushdown_requirement_to_children( plan: &Arc, parent_required: LexRequirementRef, ) -> Result>>>> { diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 4f4b17345ef8..c4c15ccf995d 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -33,17 +33,29 @@ use datafusion_physical_plan::tree_node::PlanContext; /// This utility function adds a `SortExec` above an operator according to the /// given ordering requirements while preserving the original partitioning. -pub fn add_sort_above( +pub fn add_sort_above_plan_context( node: PlanContext, sort_requirements: LexRequirement, fetch: Option, ) -> PlanContext { + PlanContext::new( + add_sort_above(&node.plan, sort_requirements, fetch), + T::default(), + vec![node], + ) +} + +pub fn add_sort_above( + node: &Arc, + sort_requirements: LexRequirement, + fetch: Option, +) -> Arc { let sort_expr = PhysicalSortRequirement::to_sort_exprs(sort_requirements); - let mut new_sort = SortExec::new(sort_expr, node.plan.clone()).with_fetch(fetch); - if node.plan.output_partitioning().partition_count() > 1 { + let mut new_sort = SortExec::new(sort_expr, node.clone()).with_fetch(fetch); + if node.output_partitioning().partition_count() > 1 { new_sort = new_sort.with_preserve_partitioning(true); } - PlanContext::new(Arc::new(new_sort), T::default(), vec![node]) + Arc::new(new_sort) } /// Checks whether the given operator is a limit; diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 2471d9249e16..2a8be7479e1c 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -24,7 +24,7 @@ use crate::equivalence::{ collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; use crate::expressions::Literal; -use crate::sort_properties::{ExprOrdering, SortProperties}; +use crate::sort_properties::SortProperties; use crate::{ physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, @@ -313,8 +313,7 @@ impl EquivalenceProperties { /// /// Returns `true` if the specified ordering is satisfied, `false` otherwise. fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool { - let expr_ordering = self.get_expr_ordering(req.expr.clone()); - let ExprOrdering { expr, data, .. } = expr_ordering; + let (expr, data) = self.get_expr_ordering(req.expr.clone()); match data { SortProperties::Ordered(options) => { let sort_expr = PhysicalSortExpr { expr, options }; @@ -708,7 +707,7 @@ impl EquivalenceProperties { let ordered_exprs = search_indices .iter() .flat_map(|&idx| { - let ExprOrdering { expr, data, .. } = + let (expr, data) = eq_properties.get_expr_ordering(exprs[idx].clone()); if let SortProperties::Ordered(options) = data { Some((PhysicalSortExpr { expr, options }, idx)) @@ -775,15 +774,29 @@ impl EquivalenceProperties { /// /// Returns an `ExprOrdering` object containing the ordering information for /// the given expression. - pub fn get_expr_ordering(&self, expr: Arc) -> ExprOrdering { - ExprOrdering::new_default(expr.clone()) - .transform_up(&|expr| Ok(update_ordering(expr, self))) - // Guaranteed to always return `Ok`. - .unwrap() + pub fn get_expr_ordering( + &self, + expr: Arc, + ) -> (Arc, SortProperties) { + // The transform is designed to aid in the determination of ordering (represented + // by [`SortProperties`]) for a given [`PhysicalExpr`]. When analyzing the orderings + // of a [`PhysicalExpr`], the process begins by assigning the ordering of its leaf nodes. + // By propagating these leaf node orderings upwards in the expression tree, the overall + // ordering of the entire [`PhysicalExpr`] can be derived. + // + // This struct holds the necessary state information for each expression in the [`PhysicalExpr`]. + // It encapsulates the orderings (`state`) associated with the expression (`expr`), and + // orderings of the children expressions (`children_states`). The `state` of a parent + // expression is determined based on the states of its children expressions. + expr.transform_up_with_payload(&mut |expr, children_states| { + Ok(update_ordering(expr, children_states, self)) + }) + // Guaranteed to always return `Ok`. + .unwrap() } } -/// Calculates the [`SortProperties`] of a given [`ExprOrdering`] node. +/// Calculates the [`SortProperties`] of a given [`PhysicalExpr`] node. /// The node can either be a leaf node, or an intermediate node: /// - If it is a leaf node, we directly find the order of the node by looking /// at the given sort expression and equivalence properties if it is a `Column` @@ -796,29 +809,31 @@ impl EquivalenceProperties { /// sort expression emerges at that node immediately, discarding the recursive /// result coming from its children. fn update_ordering( - mut node: ExprOrdering, + expr: Arc, + children_states: Vec, eq_properties: &EquivalenceProperties, -) -> Transformed { +) -> (Transformed>, SortProperties) { // We have a Column, which is one of the two possible leaf node types: - let normalized_expr = eq_properties.eq_group.normalize_expr(node.expr.clone()); + let normalized_expr = eq_properties.eq_group.normalize_expr(expr.clone()); + + let state; if eq_properties.is_expr_constant(&normalized_expr) { - node.data = SortProperties::Singleton; + state = SortProperties::Singleton; } else if let Some(options) = eq_properties .normalized_oeq_class() .get_options(&normalized_expr) { - node.data = SortProperties::Ordered(options); - } else if !node.expr.children().is_empty() { + state = SortProperties::Ordered(options); + } else if !expr.children().is_empty() { // We have an intermediate (non-leaf) node, account for its children: - let children_orderings = node.children.iter().map(|c| c.data).collect_vec(); - node.data = node.expr.get_ordering(&children_orderings); - } else if node.expr.as_any().is::() { + state = expr.get_ordering(&children_states); + } else if expr.as_any().is::() { // We have a Literal, which is the other possible leaf node type: - node.data = node.expr.get_ordering(&[]); + state = expr.get_ordering(&[]); } else { - return Transformed::No(node); + return (Transformed::No(expr), Default::default()); } - Transformed::Yes(node) + (Transformed::Yes(expr), state) } /// This function determines whether the provided expression is constant @@ -1681,12 +1696,12 @@ mod tests { .iter() .flat_map(|ordering| ordering.first().cloned()) .collect::>(); - let expr_ordering = eq_properties.get_expr_ordering(expr.clone()); + let (_, state) = eq_properties.get_expr_ordering(expr.clone()); let err_msg = format!( "expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}", - expr, expected, expr_ordering.data + expr, expected, state ); - assert_eq!(expr_ordering.data, expected, "{}", err_msg); + assert_eq!(state, expected, "{}", err_msg); } Ok(()) diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index b2403dadf05a..dbd357fbe8b6 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -25,7 +25,7 @@ use super::utils::{ convert_duration_type_to_interval, convert_interval_type_to_duration, get_inverse_op, }; use crate::expressions::Literal; -use crate::utils::{build_dag, ExprTreeNode}; +use crate::utils::build_dag; use crate::PhysicalExpr; use arrow_schema::{DataType, Schema}; @@ -176,11 +176,11 @@ impl ExprIntervalGraphNode { &self.interval } - /// This function creates a DAEG node from Datafusion's [`ExprTreeNode`] + /// This function creates a DAEG node from Datafusion's [`PhysicalExpr`] /// object. Literals are created with definite, singleton intervals while /// any other expression starts with an indefinite interval ([-∞, ∞]). - pub fn make_node(node: &ExprTreeNode, schema: &Schema) -> Result { - let expr = node.expr.clone(); + pub fn make_node(expr: &Arc, schema: &Schema) -> Result { + let expr = expr.clone(); if let Some(literal) = expr.as_any().downcast_ref::() { let value = literal.value(); Interval::try_new(value.clone(), value.clone()) diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index 4df29ced2f01..57b87a8cc749 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -15,11 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::ops::Neg; - -use crate::tree_node::ExprContext; - use arrow_schema::SortOptions; +use std::ops::Neg; /// To propagate [`SortOptions`] across the `PhysicalExpr`, it is insufficient /// to simply use `Option`: There must be a differentiation between @@ -134,15 +131,3 @@ impl Neg for SortProperties { } } } - -/// The `ExprOrdering` struct is designed to aid in the determination of ordering (represented -/// by [`SortProperties`]) for a given `PhysicalExpr`. When analyzing the orderings -/// of a `PhysicalExpr`, the process begins by assigning the ordering of its leaf nodes. -/// By propagating these leaf node orderings upwards in the expression tree, the overall -/// ordering of the entire `PhysicalExpr` can be derived. -/// -/// This struct holds the necessary state information for each expression in the `PhysicalExpr`. -/// It encapsulates the orderings (`data`) associated with the expression (`expr`), and -/// orderings of the children expressions (`children`). The [`ExprOrdering`] of a parent -/// expression is determined based on the [`ExprOrdering`] states of its children expressions. -pub type ExprOrdering = ExprContext; diff --git a/datafusion/physical-expr/src/tree_node.rs b/datafusion/physical-expr/src/tree_node.rs index 42dc6673af6a..6756da7c8969 100644 --- a/datafusion/physical-expr/src/tree_node.rs +++ b/datafusion/physical-expr/src/tree_node.rs @@ -17,12 +17,11 @@ //! This module provides common traits for visiting or rewriting tree nodes easily. -use std::fmt::{self, Display, Formatter}; use std::sync::Arc; use crate::physical_expr::{with_new_children_if_necessary, PhysicalExpr}; -use datafusion_common::tree_node::{ConcreteTreeNode, DynTreeNode}; +use datafusion_common::tree_node::DynTreeNode; use datafusion_common::Result; impl DynTreeNode for dyn PhysicalExpr { @@ -38,63 +37,3 @@ impl DynTreeNode for dyn PhysicalExpr { with_new_children_if_necessary(arc_self, new_children) } } - -/// A node object encapsulating a [`PhysicalExpr`] node with a payload. Since there are -/// two ways to access child plans—directly from the plan and through child nodes—it's -/// recommended to perform mutable operations via [`Self::update_expr_from_children`]. -#[derive(Debug)] -pub struct ExprContext { - /// The physical expression associated with this context. - pub expr: Arc, - /// Custom data payload of the node. - pub data: T, - /// Child contexts of this node. - pub children: Vec, -} - -impl ExprContext { - pub fn new(expr: Arc, data: T, children: Vec) -> Self { - Self { - expr, - data, - children, - } - } - - pub fn update_expr_from_children(mut self) -> Result { - let children_expr = self.children.iter().map(|c| c.expr.clone()).collect(); - self.expr = with_new_children_if_necessary(self.expr, children_expr)?; - Ok(self) - } -} - -impl ExprContext { - pub fn new_default(plan: Arc) -> Self { - let children = plan.children().into_iter().map(Self::new_default).collect(); - Self::new(plan, Default::default(), children) - } -} - -impl Display for ExprContext { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "expr: {:?}", self.expr)?; - write!(f, "data:{}", self.data)?; - write!(f, "") - } -} - -impl ConcreteTreeNode for ExprContext { - fn children(&self) -> Vec<&Self> { - self.children.iter().collect() - } - - fn take_children(mut self) -> (Self, Vec) { - let children = std::mem::take(&mut self.children); - (self, children) - } - - fn with_new_children(mut self, children: Vec) -> Result { - self.children = children; - self.update_expr_from_children() - } -} diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index e14ff2692146..aa907d6ea6e3 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -23,15 +23,12 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use crate::expressions::{BinaryExpr, Column}; -use crate::tree_node::ExprContext; use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; use arrow::compute::{and_kleene, is_not_null, SlicesIterator}; use arrow::datatypes::SchemaRef; -use datafusion_common::tree_node::{ - Transformed, TreeNode, TreeNodeRewriter, VisitRecursion, -}; +use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::Result; use datafusion_expr::Operator; @@ -128,14 +125,10 @@ pub fn get_indices_of_exprs_strict>>( .collect() } -pub type ExprTreeNode = ExprContext>; - -/// This struct facilitates the [TreeNodeRewriter] mechanism to convert a -/// [PhysicalExpr] tree into a DAEG (i.e. an expression DAG) by collecting -/// identical expressions in one node. Caller specifies the node type in the -/// DAEG via the `constructor` argument, which constructs nodes in the DAEG -/// from the [ExprTreeNode] ancillary object. -struct PhysicalExprDAEGBuilder<'a, T, F: Fn(&ExprTreeNode) -> Result> { +/// This struct converts a [PhysicalExpr] tree into a DAEG (i.e. an expression DAG) by +/// collecting identical expressions in one node. Caller specifies the node type in the +/// DAEG via the `constructor` argument. +struct PhysicalExprDAEGBuilder<'a, T, F: Fn(&Arc) -> Result> { // The resulting DAEG (expression DAG). graph: StableGraph, // A vector of visited expression nodes and their corresponding node indices. @@ -144,19 +137,16 @@ struct PhysicalExprDAEGBuilder<'a, T, F: Fn(&ExprTreeNode) -> Result< constructor: &'a F, } -impl<'a, T, F: Fn(&ExprTreeNode) -> Result> TreeNodeRewriter - for PhysicalExprDAEGBuilder<'a, T, F> +impl<'a, T, F: Fn(&Arc) -> Result> + PhysicalExprDAEGBuilder<'a, T, F> { - type N = ExprTreeNode; - // This method mutates an expression node by transforming it to a physical expression - // and adding it to the graph. The method returns the mutated expression node. - fn mutate( + // This method adds an expression to the graph and returns the corresponding node + // index. + fn calculate_node_index( &mut self, - mut node: ExprTreeNode, - ) -> Result> { - // Get the expression associated with the input expression node. - let expr = &node.expr; - + expr: Arc, + children_node_indices: Vec, + ) -> Result<(Transformed>, NodeIndex)> { // Check if the expression has already been visited. let node_idx = match self.visited_plans.iter().find(|(e, _)| expr.eq(e)) { // If the expression has been visited, return the corresponding node index. @@ -165,18 +155,15 @@ impl<'a, T, F: Fn(&ExprTreeNode) -> Result> TreeNodeRewriter // add edges to its child nodes. Add the visited expression to the vector // of visited expressions and return the newly created node index. None => { - let node_idx = self.graph.add_node((self.constructor)(&node)?); - for expr_node in node.children.iter() { - self.graph.add_edge(node_idx, expr_node.data.unwrap(), 0); + let node_idx = self.graph.add_node((self.constructor)(&expr)?); + for child_node_index in children_node_indices.into_iter() { + self.graph.add_edge(node_idx, child_node_index, 0); } self.visited_plans.push((expr.clone(), node_idx)); node_idx } }; - // Set the data field of the input expression node to the corresponding node index. - node.data = Some(node_idx); - // Return the mutated expression node. - Ok(node) + Ok((Transformed::No(expr), node_idx)) } } @@ -186,10 +173,8 @@ pub fn build_dag( constructor: &F, ) -> Result<(NodeIndex, StableGraph)> where - F: Fn(&ExprTreeNode) -> Result, + F: Fn(&Arc) -> Result, { - // Create a new expression tree node from the input expression. - let init = ExprTreeNode::new_default(expr); // Create a new `PhysicalExprDAEGBuilder` instance. let mut builder = PhysicalExprDAEGBuilder { graph: StableGraph::::new(), @@ -197,9 +182,12 @@ where constructor, }; // Use the builder to transform the expression tree node into a DAG. - let root = init.rewrite(&mut builder)?; + let (_, node_index) = + expr.transform_up_with_payload(&mut |expr, children_node_indices| { + builder.calculate_node_index(expr, children_node_indices) + })?; // Return a tuple containing the root node index and the DAG. - Ok((root.data.unwrap(), builder.graph)) + Ok((node_index, builder.graph)) } /// Recursively extract referenced [`Column`]s within a [`PhysicalExpr`]. @@ -346,8 +334,8 @@ mod tests { } } - fn make_dummy_node(node: &ExprTreeNode) -> Result { - let expr = node.expr.clone(); + fn make_dummy_node(expr: &Arc) -> Result { + let expr = expr.clone(); let dummy_property = if expr.as_any().is::() { "Binary" } else if expr.as_any().is::() {