From 2da06d52c8af8d805b10220c294f730be2f0f625 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 28 Dec 2023 15:35:36 -0800 Subject: [PATCH 1/8] Refactor TreeNode and cleanup some implementations --- datafusion/common/src/tree_node.rs | 26 ++++-- .../enforce_distribution.rs | 31 ++----- .../src/physical_optimizer/enforce_sorting.rs | 21 ++--- .../physical_optimizer/pipeline_checker.rs | 17 +--- .../replace_with_order_preserving_variants.rs | 16 +--- .../src/physical_optimizer/sort_pushdown.rs | 18 +--- datafusion/expr/src/tree_node/expr.rs | 89 +++++++++++-------- datafusion/expr/src/tree_node/plan.rs | 19 +--- .../physical-expr/src/sort_properties.rs | 18 +--- datafusion/physical-expr/src/utils/mod.rs | 17 +--- 10 files changed, 104 insertions(+), 168 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 5da9636ffe18..bc4dc03dabc5 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -33,6 +33,9 @@ use crate::Result; /// [`LogicalPlan`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html /// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html pub trait TreeNode: Sized { + /// Returns all children of the TreeNode + fn children_nodes(&self) -> Vec<&Self>; + /// Use preorder to iterate the node on the tree so that we can /// stop fast for some cases. /// @@ -211,7 +214,17 @@ pub trait TreeNode: Sized { /// Apply the closure `F` to the node's children fn apply_children(&self, op: &mut F) -> Result where - F: FnMut(&Self) -> Result; + F: FnMut(&Self) -> Result, + { + for child in self.children_nodes() { + match op(child)? { + VisitRecursion::Continue => {} + VisitRecursion::Skip => return Ok(VisitRecursion::Continue), + VisitRecursion::Stop => return Ok(VisitRecursion::Stop), + } + } + Ok(VisitRecursion::Continue) + } /// Apply transform `F` to the node's children, the transform `F` might have a direction(Preorder or Postorder) fn map_children(self, transform: F) -> Result @@ -342,18 +355,21 @@ pub trait DynTreeNode { /// Blanket implementation for Arc for any tye that implements /// [`DynTreeNode`] (such as [`Arc`]) impl TreeNode for Arc { + fn children_nodes(&self) -> Vec<&Arc> { + unimplemented!("Call arc_children instead") + } + fn apply_children(&self, op: &mut F) -> Result where F: FnMut(&Self) -> Result, { - for child in self.arc_children() { - match op(&child)? { + for child in &self.arc_children() { + match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), } } - Ok(VisitRecursion::Continue) } @@ -368,7 +384,7 @@ impl TreeNode for Arc { let arc_self = Arc::clone(&self); self.with_new_arc_children(arc_self, new_children?) } else { - Ok(self) + Ok(self.clone()) } } } diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index d5a086227323..f3af6d2c0d34 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -47,7 +47,7 @@ use crate::physical_plan::{ }; use arrow::compute::SortOptions; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; +use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::expressions::{Column, NoOp}; use datafusion_physical_expr::utils::map_columns_before_projection; @@ -1409,18 +1409,8 @@ impl DistributionContext { } impl TreeNode for DistributionContext { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children_nodes { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - Ok(VisitRecursion::Continue) + fn children_nodes(&self) -> Vec<&Self> { + self.children_nodes.iter().collect() } fn map_children(mut self, transform: F) -> Result @@ -1483,19 +1473,8 @@ impl PlanWithKeyRequirements { } impl TreeNode for PlanWithKeyRequirements { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) + fn children_nodes(&self) -> Vec<&Self> { + self.children.iter().collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 77d04a61c59e..27bd71d41393 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -145,11 +145,15 @@ impl PlanWithCorrespondingSort { } impl TreeNode for PlanWithCorrespondingSort { + fn children_nodes(&self) -> Vec<&Self> { + self.children_nodes.iter().collect() + } + fn apply_children(&self, op: &mut F) -> Result where F: FnMut(&Self) -> Result, { - for child in &self.children_nodes { + for child in self.children_nodes() { match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), @@ -237,19 +241,8 @@ impl PlanWithCorrespondingCoalescePartitions { } impl TreeNode for PlanWithCorrespondingCoalescePartitions { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children_nodes { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) + fn children_nodes(&self) -> Vec<&Self> { + self.children_nodes.iter().collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index 9e9f647d073f..cd2f7c716a2e 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -27,7 +27,7 @@ use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use datafusion_common::config::OptimizerOptions; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; +use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{plan_err, DataFusionError}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; @@ -91,19 +91,8 @@ impl PipelineStatePropagator { } impl TreeNode for PipelineStatePropagator { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) + fn children_nodes(&self) -> Vec<&Self> { + self.children.iter().collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 91f3d2abc6ff..f1fb01fc0aee 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -29,7 +29,7 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; +use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_physical_plan::unbounded_output; /// For a given `plan`, this object carries the information one needs from its @@ -104,18 +104,8 @@ impl OrderPreservationContext { } impl TreeNode for OrderPreservationContext { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children_nodes { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - Ok(VisitRecursion::Continue) + fn children_nodes(&self) -> Vec<&Self> { + self.children_nodes.iter().collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index b0013863010a..7b33a526d364 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -28,7 +28,7 @@ use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; +use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{plan_err, DataFusionError, JoinSide, Result}; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; @@ -71,20 +71,10 @@ impl SortPushDown { } impl TreeNode for SortPushDown { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children_nodes { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) + fn children_nodes(&self) -> Vec<&Self> { + self.children_nodes.iter().collect() } + fn map_children(mut self, transform: F) -> Result where F: FnMut(Self) -> Result, diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs index 1098842716b9..939e7b120b27 100644 --- a/datafusion/expr/src/tree_node/expr.rs +++ b/datafusion/expr/src/tree_node/expr.rs @@ -28,11 +28,8 @@ use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{internal_err, DataFusionError, Result}; impl TreeNode for Expr { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - let children = match self { + fn children_nodes(&self) -> Vec<&Self> { + match self { Expr::Alias(Alias{expr,..}) | Expr::Not(expr) | Expr::IsNotNull(expr) @@ -47,15 +44,15 @@ impl TreeNode for Expr { | Expr::Cast(Cast { expr, .. }) | Expr::TryCast(TryCast { expr, .. }) | Expr::Sort(Sort { expr, .. }) - | Expr::InSubquery(InSubquery{ expr, .. }) => vec![expr.as_ref().clone()], + | Expr::InSubquery(InSubquery{ expr, .. }) => vec![expr.as_ref()], Expr::GetIndexedField(GetIndexedField { expr, field }) => { - let expr = expr.as_ref().clone(); + let expr = expr.as_ref(); match field { GetFieldAccess::ListIndex {key} => { - vec![key.as_ref().clone(), expr] + vec![key.as_ref(), expr] }, GetFieldAccess::ListRange {start, stop} => { - vec![start.as_ref().clone(), stop.as_ref().clone(), expr] + vec![start.as_ref(), stop.as_ref(), expr] } GetFieldAccess::NamedStructField {name: _name} => { vec![expr] @@ -63,12 +60,12 @@ impl TreeNode for Expr { } } Expr::GroupingSet(GroupingSet::Rollup(exprs)) - | Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.clone(), + | Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.iter().collect(), Expr::ScalarFunction (ScalarFunction{ args, .. } ) => { - args.clone() + args.iter().collect() } Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => { - lists_of_exprs.clone().into_iter().flatten().collect() + lists_of_exprs.iter().map(|a| a.iter().collect::>()).flatten().collect() } Expr::Column(_) // Treat OuterReferenceColumn as a leaf expression @@ -80,66 +77,80 @@ impl TreeNode for Expr { | Expr::Wildcard {..} | Expr::Placeholder (_) => vec![], Expr::BinaryExpr(BinaryExpr { left, right, .. }) => { - vec![left.as_ref().clone(), right.as_ref().clone()] + vec![left.as_ref(), right.as_ref()] } Expr::Like(Like { expr, pattern, .. }) | Expr::SimilarTo(Like { expr, pattern, .. }) => { - vec![expr.as_ref().clone(), pattern.as_ref().clone()] + vec![expr.as_ref(), pattern.as_ref()] } Expr::Between(Between { - expr, low, high, .. - }) => vec![ - expr.as_ref().clone(), - low.as_ref().clone(), - high.as_ref().clone(), + expr, low, high, .. + }) => vec![ + expr.as_ref(), + low.as_ref(), + high.as_ref(), ], Expr::Case(case) => { let mut expr_vec = vec![]; if let Some(expr) = case.expr.as_ref() { - expr_vec.push(expr.as_ref().clone()); + expr_vec.push(expr.as_ref()); }; for (when, then) in case.when_then_expr.iter() { - expr_vec.push(when.as_ref().clone()); - expr_vec.push(then.as_ref().clone()); + expr_vec.push(when.as_ref()); + expr_vec.push(then.as_ref()); } if let Some(else_expr) = case.else_expr.as_ref() { - expr_vec.push(else_expr.as_ref().clone()); + expr_vec.push(else_expr.as_ref()); } expr_vec } Expr::AggregateFunction(AggregateFunction { args, filter, order_by, .. }) - => { - let mut expr_vec = args.clone(); + => { + let mut expr_vec: Vec<&Expr> = args.iter().collect(); if let Some(f) = filter { - expr_vec.push(f.as_ref().clone()); + expr_vec.push(f.as_ref()); } if let Some(o) = order_by { - expr_vec.extend(o.clone()); + for x in o { + expr_vec.push(x); + } } expr_vec } Expr::WindowFunction(WindowFunction { - args, - partition_by, - order_by, - .. - }) => { - let mut expr_vec = args.clone(); - expr_vec.extend(partition_by.clone()); - expr_vec.extend(order_by.clone()); + args, + partition_by, + order_by, + .. + }) => { + let mut expr_vec: Vec<&Expr> = args.iter().collect(); + for x in partition_by { + expr_vec.push(x); + } + for x in order_by { + expr_vec.push(x); + } expr_vec } Expr::InList(InList { expr, list, .. }) => { let mut expr_vec = vec![]; - expr_vec.push(expr.as_ref().clone()); - expr_vec.extend(list.clone()); + expr_vec.push(expr.as_ref()); + for x in list { + expr_vec.push(x); + } expr_vec } - }; + } + } - for child in children.iter() { + fn apply_children(&self, op: &mut F) -> Result + where + F: FnMut(&Self) -> Result, + { + let children = self.children_nodes(); + for child in children.into_iter() { match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs index c7621bc17833..8cd2ac39b252 100644 --- a/datafusion/expr/src/tree_node/plan.rs +++ b/datafusion/expr/src/tree_node/plan.rs @@ -22,6 +22,10 @@ use datafusion_common::tree_node::{TreeNodeVisitor, VisitRecursion}; use datafusion_common::{tree_node::TreeNode, Result}; impl TreeNode for LogicalPlan { + fn children_nodes(&self) -> Vec<&Self> { + self.inputs() + } + fn apply(&self, op: &mut F) -> Result where F: FnMut(&Self) -> Result, @@ -91,21 +95,6 @@ impl TreeNode for LogicalPlan { visitor.post_visit(self) } - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in self.inputs() { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) - } - fn map_children(self, transform: F) -> Result where F: FnMut(Self) -> Result, diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index 91238e5b04b4..259b4bf89a46 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -20,7 +20,7 @@ use std::{ops::Neg, sync::Arc}; use arrow_schema::SortOptions; use crate::PhysicalExpr; -use datafusion_common::tree_node::{TreeNode, VisitRecursion}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::Result; /// To propagate [`SortOptions`] across the [`PhysicalExpr`], it is insufficient @@ -158,7 +158,7 @@ impl ExprOrdering { /// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states /// for `expr` and its children. pub fn new(expr: Arc) -> Self { - let children = expr.children(); + let children = PhysicalExpr::children(expr.as_ref()); Self { expr, state: Default::default(), @@ -173,18 +173,8 @@ impl ExprOrdering { } impl TreeNode for ExprOrdering { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - Ok(VisitRecursion::Continue) + fn children_nodes(&self) -> Vec<&Self> { + self.children.iter().collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 87ef36558b96..c2335ab979da 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -136,7 +136,7 @@ pub struct ExprTreeNode { impl ExprTreeNode { pub fn new(expr: Arc) -> Self { - let children = expr.children(); + let children = PhysicalExpr::children(expr.as_ref()); ExprTreeNode { expr, data: None, @@ -154,19 +154,8 @@ impl ExprTreeNode { } impl TreeNode for ExprTreeNode { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in self.children() { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) + fn children_nodes(&self) -> Vec<&Self> { + self.children().iter().collect() } fn map_children(mut self, transform: F) -> Result From f7255537736a14ccc8500274cc2d76da97f63b87 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 28 Dec 2023 18:36:45 -0800 Subject: [PATCH 2/8] More --- datafusion/common/src/tree_node.rs | 8 +- .../enforce_distribution.rs | 8 +- .../src/physical_optimizer/enforce_sorting.rs | 25 +--- .../physical_optimizer/pipeline_checker.rs | 4 +- .../replace_with_order_preserving_variants.rs | 4 +- .../src/physical_optimizer/sort_pushdown.rs | 4 +- datafusion/expr/src/tree_node/expr.rs | 114 +++++++----------- datafusion/expr/src/tree_node/plan.rs | 4 +- .../physical-expr/src/sort_properties.rs | 6 +- datafusion/physical-expr/src/utils/mod.rs | 4 +- 10 files changed, 72 insertions(+), 109 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index bc4dc03dabc5..48c04b63e2b5 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -34,7 +34,7 @@ use crate::Result; /// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html pub trait TreeNode: Sized { /// Returns all children of the TreeNode - fn children_nodes(&self) -> Vec<&Self>; + fn children_nodes(&self) -> Vec; /// Use preorder to iterate the node on the tree so that we can /// stop fast for some cases. @@ -217,7 +217,7 @@ pub trait TreeNode: Sized { F: FnMut(&Self) -> Result, { for child in self.children_nodes() { - match op(child)? { + match op(&child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), @@ -355,8 +355,8 @@ pub trait DynTreeNode { /// Blanket implementation for Arc for any tye that implements /// [`DynTreeNode`] (such as [`Arc`]) impl TreeNode for Arc { - fn children_nodes(&self) -> Vec<&Arc> { - unimplemented!("Call arc_children instead") + fn children_nodes(&self) -> Vec> { + self.arc_children() } fn apply_children(&self, op: &mut F) -> Result diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index f3af6d2c0d34..6f35a97fac24 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1409,8 +1409,8 @@ impl DistributionContext { } impl TreeNode for DistributionContext { - fn children_nodes(&self) -> Vec<&Self> { - self.children_nodes.iter().collect() + fn children_nodes(&self) -> Vec { + self.children_nodes.iter().map(|c| c.clone()).collect() } fn map_children(mut self, transform: F) -> Result @@ -1473,8 +1473,8 @@ impl PlanWithKeyRequirements { } impl TreeNode for PlanWithKeyRequirements { - fn children_nodes(&self) -> Vec<&Self> { - self.children.iter().collect() + fn children_nodes(&self) -> Vec { + self.children.iter().map(|c| c.clone()).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 27bd71d41393..52b59969139a 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -57,7 +57,7 @@ use crate::physical_plan::{ with_new_children_if_necessary, Distribution, ExecutionPlan, InputOrderMode, }; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; +use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{plan_err, DataFusionError}; use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; use datafusion_physical_plan::repartition::RepartitionExec; @@ -145,23 +145,8 @@ impl PlanWithCorrespondingSort { } impl TreeNode for PlanWithCorrespondingSort { - fn children_nodes(&self) -> Vec<&Self> { - self.children_nodes.iter().collect() - } - - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in self.children_nodes() { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) + fn children_nodes(&self) -> Vec { + self.children_nodes.iter().map(|c| c.clone()).collect() } fn map_children(mut self, transform: F) -> Result @@ -241,8 +226,8 @@ impl PlanWithCorrespondingCoalescePartitions { } impl TreeNode for PlanWithCorrespondingCoalescePartitions { - fn children_nodes(&self) -> Vec<&Self> { - self.children_nodes.iter().collect() + fn children_nodes(&self) -> Vec { + self.children_nodes.iter().map(|c| c.clone()).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index cd2f7c716a2e..01664e4bbd18 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -91,8 +91,8 @@ impl PipelineStatePropagator { } impl TreeNode for PipelineStatePropagator { - fn children_nodes(&self) -> Vec<&Self> { - self.children.iter().collect() + fn children_nodes(&self) -> Vec { + self.children.iter().map(|c| c.clone()).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index f1fb01fc0aee..ae946195c043 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -104,8 +104,8 @@ impl OrderPreservationContext { } impl TreeNode for OrderPreservationContext { - fn children_nodes(&self) -> Vec<&Self> { - self.children_nodes.iter().collect() + fn children_nodes(&self) -> Vec { + self.children_nodes.iter().map(|c| c.clone()).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 7b33a526d364..441abec040f1 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -71,8 +71,8 @@ impl SortPushDown { } impl TreeNode for SortPushDown { - fn children_nodes(&self) -> Vec<&Self> { - self.children_nodes.iter().collect() + fn children_nodes(&self) -> Vec { + self.children_nodes.iter().map(|c| c.clone()).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs index 939e7b120b27..93904a7d2bc4 100644 --- a/datafusion/expr/src/tree_node/expr.rs +++ b/datafusion/expr/src/tree_node/expr.rs @@ -24,13 +24,13 @@ use crate::expr::{ }; use crate::{Expr, GetFieldAccess}; -use datafusion_common::tree_node::{TreeNode, VisitRecursion}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{internal_err, DataFusionError, Result}; impl TreeNode for Expr { - fn children_nodes(&self) -> Vec<&Self> { + fn children_nodes(&self) -> Vec { match self { - Expr::Alias(Alias{expr,..}) + Expr::Alias(Alias { expr, .. }) | Expr::Not(expr) | Expr::IsNotNull(expr) | Expr::IsTrue(expr) @@ -44,28 +44,26 @@ impl TreeNode for Expr { | Expr::Cast(Cast { expr, .. }) | Expr::TryCast(TryCast { expr, .. }) | Expr::Sort(Sort { expr, .. }) - | Expr::InSubquery(InSubquery{ expr, .. }) => vec![expr.as_ref()], + | Expr::InSubquery(InSubquery { expr, .. }) => vec![expr.as_ref().clone()], Expr::GetIndexedField(GetIndexedField { expr, field }) => { - let expr = expr.as_ref(); + let expr = expr.as_ref().clone(); match field { - GetFieldAccess::ListIndex {key} => { - vec![key.as_ref(), expr] - }, - GetFieldAccess::ListRange {start, stop} => { - vec![start.as_ref(), stop.as_ref(), expr] + GetFieldAccess::ListIndex { key } => { + vec![key.as_ref().clone(), expr] } - GetFieldAccess::NamedStructField {name: _name} => { + GetFieldAccess::ListRange { start, stop } => { + vec![start.as_ref().clone(), stop.as_ref().clone(), expr] + } + GetFieldAccess::NamedStructField { name: _name } => { vec![expr] } } } Expr::GroupingSet(GroupingSet::Rollup(exprs)) - | Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.iter().collect(), - Expr::ScalarFunction (ScalarFunction{ args, .. } ) => { - args.iter().collect() - } + | Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.clone(), + Expr::ScalarFunction(ScalarFunction { args, .. }) => args.clone(), Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => { - lists_of_exprs.iter().map(|a| a.iter().collect::>()).flatten().collect() + lists_of_exprs.clone().into_iter().flatten().collect() } Expr::Column(_) // Treat OuterReferenceColumn as a leaf expression @@ -74,93 +72,73 @@ impl TreeNode for Expr { | Expr::Literal(_) | Expr::Exists { .. } | Expr::ScalarSubquery(_) - | Expr::Wildcard {..} - | Expr::Placeholder (_) => vec![], + | Expr::Wildcard { .. } + | Expr::Placeholder(_) => vec![], Expr::BinaryExpr(BinaryExpr { left, right, .. }) => { - vec![left.as_ref(), right.as_ref()] + vec![left.as_ref().clone(), right.as_ref().clone()] } Expr::Like(Like { expr, pattern, .. }) | Expr::SimilarTo(Like { expr, pattern, .. }) => { - vec![expr.as_ref(), pattern.as_ref()] + vec![expr.as_ref().clone(), pattern.as_ref().clone()] } Expr::Between(Between { - expr, low, high, .. - }) => vec![ - expr.as_ref(), - low.as_ref(), - high.as_ref(), + expr, low, high, .. + }) => vec![ + expr.as_ref().clone(), + low.as_ref().clone(), + high.as_ref().clone(), ], Expr::Case(case) => { let mut expr_vec = vec![]; if let Some(expr) = case.expr.as_ref() { - expr_vec.push(expr.as_ref()); + expr_vec.push(expr.as_ref().clone()); }; for (when, then) in case.when_then_expr.iter() { - expr_vec.push(when.as_ref()); - expr_vec.push(then.as_ref()); + expr_vec.push(when.as_ref().clone()); + expr_vec.push(then.as_ref().clone()); } if let Some(else_expr) = case.else_expr.as_ref() { - expr_vec.push(else_expr.as_ref()); + expr_vec.push(else_expr.as_ref().clone()); } expr_vec } - Expr::AggregateFunction(AggregateFunction { args, filter, order_by, .. }) - => { - let mut expr_vec: Vec<&Expr> = args.iter().collect(); + Expr::AggregateFunction(AggregateFunction { + args, + filter, + order_by, + .. + }) => { + let mut expr_vec = args.clone(); if let Some(f) = filter { - expr_vec.push(f.as_ref()); + expr_vec.push(f.as_ref().clone()); } if let Some(o) = order_by { - for x in o { - expr_vec.push(x); - } + expr_vec.extend(o.clone()); } expr_vec } Expr::WindowFunction(WindowFunction { - args, - partition_by, - order_by, - .. - }) => { - let mut expr_vec: Vec<&Expr> = args.iter().collect(); - for x in partition_by { - expr_vec.push(x); - } - for x in order_by { - expr_vec.push(x); - } + args, + partition_by, + order_by, + .. + }) => { + let mut expr_vec = args.clone(); + expr_vec.extend(partition_by.clone()); + expr_vec.extend(order_by.clone()); expr_vec } Expr::InList(InList { expr, list, .. }) => { let mut expr_vec = vec![]; - expr_vec.push(expr.as_ref()); - for x in list { - expr_vec.push(x); - } + expr_vec.push(expr.as_ref().clone()); + expr_vec.extend(list.clone()); expr_vec } } } - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - let children = self.children_nodes(); - for child in children.into_iter() { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) - } - fn map_children(self, transform: F) -> Result where F: FnMut(Self) -> Result, diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs index 8cd2ac39b252..d994b1367944 100644 --- a/datafusion/expr/src/tree_node/plan.rs +++ b/datafusion/expr/src/tree_node/plan.rs @@ -22,8 +22,8 @@ use datafusion_common::tree_node::{TreeNodeVisitor, VisitRecursion}; use datafusion_common::{tree_node::TreeNode, Result}; impl TreeNode for LogicalPlan { - fn children_nodes(&self) -> Vec<&Self> { - self.inputs() + fn children_nodes(&self) -> Vec { + self.inputs().into_iter().map(|p| p.clone()).collect() } fn apply(&self, op: &mut F) -> Result diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index 259b4bf89a46..f3d158c1a002 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -147,7 +147,7 @@ impl Neg for SortProperties { /// It encapsulates the orderings (`state`) associated with the expression (`expr`), and /// orderings of the children expressions (`children_states`). The [`ExprOrdering`] of a parent /// expression is determined based on the [`ExprOrdering`] states of its children expressions. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ExprOrdering { pub expr: Arc, pub state: SortProperties, @@ -173,8 +173,8 @@ impl ExprOrdering { } impl TreeNode for ExprOrdering { - fn children_nodes(&self) -> Vec<&Self> { - self.children.iter().collect() + fn children_nodes(&self) -> Vec { + self.children.iter().map(|c| c.clone()).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index c2335ab979da..fc59e716487b 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -154,8 +154,8 @@ impl ExprTreeNode { } impl TreeNode for ExprTreeNode { - fn children_nodes(&self) -> Vec<&Self> { - self.children().iter().collect() + fn children_nodes(&self) -> Vec { + self.children().iter().map(|c| c.clone()).collect() } fn map_children(mut self, transform: F) -> Result From e05123ac24ebb9518ecff5a02e7fbefa7b987115 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 28 Dec 2023 18:42:17 -0800 Subject: [PATCH 3/8] More --- datafusion/physical-expr/src/sort_properties.rs | 2 +- datafusion/physical-expr/src/utils/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index f3d158c1a002..7b003f320846 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -158,7 +158,7 @@ impl ExprOrdering { /// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states /// for `expr` and its children. pub fn new(expr: Arc) -> Self { - let children = PhysicalExpr::children(expr.as_ref()); + let children = expr.children(); Self { expr, state: Default::default(), diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index fc59e716487b..35a1cec7c49e 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -136,7 +136,7 @@ pub struct ExprTreeNode { impl ExprTreeNode { pub fn new(expr: Arc) -> Self { - let children = PhysicalExpr::children(expr.as_ref()); + let children = expr.children(); ExprTreeNode { expr, data: None, From 56c39199cc272764255fbcebefe4c6c90398656d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 28 Dec 2023 18:53:51 -0800 Subject: [PATCH 4/8] Fix clippy --- .../core/src/physical_optimizer/enforce_distribution.rs | 4 ++-- datafusion/core/src/physical_optimizer/enforce_sorting.rs | 4 ++-- datafusion/core/src/physical_optimizer/pipeline_checker.rs | 2 +- .../replace_with_order_preserving_variants.rs | 2 +- datafusion/core/src/physical_optimizer/sort_pushdown.rs | 2 +- datafusion/expr/src/tree_node/plan.rs | 2 +- datafusion/physical-expr/src/sort_properties.rs | 2 +- datafusion/physical-expr/src/utils/mod.rs | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 6f35a97fac24..55976d4fa543 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1410,7 +1410,7 @@ impl DistributionContext { impl TreeNode for DistributionContext { fn children_nodes(&self) -> Vec { - self.children_nodes.iter().map(|c| c.clone()).collect() + self.children_nodes.to_vec() } fn map_children(mut self, transform: F) -> Result @@ -1474,7 +1474,7 @@ impl PlanWithKeyRequirements { impl TreeNode for PlanWithKeyRequirements { fn children_nodes(&self) -> Vec { - self.children.iter().map(|c| c.clone()).collect() + self.children.to_vec() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 52b59969139a..270db054e2c9 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -146,7 +146,7 @@ impl PlanWithCorrespondingSort { impl TreeNode for PlanWithCorrespondingSort { fn children_nodes(&self) -> Vec { - self.children_nodes.iter().map(|c| c.clone()).collect() + self.children_nodes.to_vec() } fn map_children(mut self, transform: F) -> Result @@ -227,7 +227,7 @@ impl PlanWithCorrespondingCoalescePartitions { impl TreeNode for PlanWithCorrespondingCoalescePartitions { fn children_nodes(&self) -> Vec { - self.children_nodes.iter().map(|c| c.clone()).collect() + self.children_nodes.to_vec() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index 01664e4bbd18..5d263ad90394 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -92,7 +92,7 @@ impl PipelineStatePropagator { impl TreeNode for PipelineStatePropagator { fn children_nodes(&self) -> Vec { - self.children.iter().map(|c| c.clone()).collect() + self.children.to_vec() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index ae946195c043..b995b06a7b6e 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -105,7 +105,7 @@ impl OrderPreservationContext { impl TreeNode for OrderPreservationContext { fn children_nodes(&self) -> Vec { - self.children_nodes.iter().map(|c| c.clone()).collect() + self.children_nodes.to_vec() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 441abec040f1..532ff9480f28 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -72,7 +72,7 @@ impl SortPushDown { impl TreeNode for SortPushDown { fn children_nodes(&self) -> Vec { - self.children_nodes.iter().map(|c| c.clone()).collect() + self.children_nodes.to_vec() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs index d994b1367944..1ae3be0df6ad 100644 --- a/datafusion/expr/src/tree_node/plan.rs +++ b/datafusion/expr/src/tree_node/plan.rs @@ -23,7 +23,7 @@ use datafusion_common::{tree_node::TreeNode, Result}; impl TreeNode for LogicalPlan { fn children_nodes(&self) -> Vec { - self.inputs().into_iter().map(|p| p.clone()).collect() + self.inputs().into_iter().cloned().collect() } fn apply(&self, op: &mut F) -> Result diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index 7b003f320846..b30e32017af4 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -174,7 +174,7 @@ impl ExprOrdering { impl TreeNode for ExprOrdering { fn children_nodes(&self) -> Vec { - self.children.iter().map(|c| c.clone()).collect() + self.children.to_vec() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 35a1cec7c49e..eacba29cbd76 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -155,7 +155,7 @@ impl ExprTreeNode { impl TreeNode for ExprTreeNode { fn children_nodes(&self) -> Vec { - self.children().iter().map(|c| c.clone()).collect() + self.children().to_vec() } fn map_children(mut self, transform: F) -> Result From ee4c9df0628a2c8f16680815f7c3fcc8322852a1 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Sat, 30 Dec 2023 09:09:44 +0100 Subject: [PATCH 5/8] avoid cloning in `TreeNode.children_nodes()` implementations where possible using `Cow` --- datafusion/common/src/tree_node.rs | 9 ++-- .../enforce_distribution.rs | 9 ++-- .../src/physical_optimizer/enforce_sorting.rs | 9 ++-- .../physical_optimizer/pipeline_checker.rs | 5 +- .../replace_with_order_preserving_variants.rs | 5 +- .../src/physical_optimizer/sort_pushdown.rs | 5 +- datafusion/expr/src/tree_node/expr.rs | 52 +++++++++---------- datafusion/expr/src/tree_node/plan.rs | 5 +- .../physical-expr/src/sort_properties.rs | 5 +- datafusion/physical-expr/src/utils/mod.rs | 6 +-- 10 files changed, 59 insertions(+), 51 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 48c04b63e2b5..f7cf890c7ab0 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -18,6 +18,7 @@ //! This module provides common traits for visiting or rewriting tree //! data structures easily. +use std::borrow::Cow; use std::sync::Arc; use crate::Result; @@ -32,9 +33,9 @@ use crate::Result; /// [`PhysicalExpr`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.PhysicalExpr.html /// [`LogicalPlan`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html /// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html -pub trait TreeNode: Sized { +pub trait TreeNode: Sized + Clone { /// Returns all children of the TreeNode - fn children_nodes(&self) -> Vec; + fn children_nodes(&self) -> Vec>; /// Use preorder to iterate the node on the tree so that we can /// stop fast for some cases. @@ -355,8 +356,8 @@ pub trait DynTreeNode { /// Blanket implementation for Arc for any tye that implements /// [`DynTreeNode`] (such as [`Arc`]) impl TreeNode for Arc { - fn children_nodes(&self) -> Vec> { - self.arc_children() + fn children_nodes(&self) -> Vec> { + self.arc_children().into_iter().map(Cow::Owned).collect() } fn apply_children(&self, op: &mut F) -> Result diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 55976d4fa543..bf5aa7d02272 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -21,6 +21,7 @@ //! according to the configuration), this rule increases partition counts in //! the physical plan. +use std::borrow::Cow; use std::fmt; use std::fmt::Formatter; use std::sync::Arc; @@ -1409,8 +1410,8 @@ impl DistributionContext { } impl TreeNode for DistributionContext { - fn children_nodes(&self) -> Vec { - self.children_nodes.to_vec() + fn children_nodes(&self) -> Vec> { + self.children_nodes.iter().map(Cow::Borrowed).collect() } fn map_children(mut self, transform: F) -> Result @@ -1473,8 +1474,8 @@ impl PlanWithKeyRequirements { } impl TreeNode for PlanWithKeyRequirements { - fn children_nodes(&self) -> Vec { - self.children.to_vec() + fn children_nodes(&self) -> Vec> { + self.children.iter().map(Cow::Borrowed).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 270db054e2c9..f609ddea66cf 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -34,6 +34,7 @@ //! in the physical plan. The first sort is unnecessary since its result is overwritten //! by another [`SortExec`]. Therefore, this rule removes it from the physical plan. +use std::borrow::Cow; use std::sync::Arc; use crate::config::ConfigOptions; @@ -145,8 +146,8 @@ impl PlanWithCorrespondingSort { } impl TreeNode for PlanWithCorrespondingSort { - fn children_nodes(&self) -> Vec { - self.children_nodes.to_vec() + fn children_nodes(&self) -> Vec> { + self.children_nodes.iter().map(Cow::Borrowed).collect() } fn map_children(mut self, transform: F) -> Result @@ -226,8 +227,8 @@ impl PlanWithCorrespondingCoalescePartitions { } impl TreeNode for PlanWithCorrespondingCoalescePartitions { - fn children_nodes(&self) -> Vec { - self.children_nodes.to_vec() + fn children_nodes(&self) -> Vec> { + self.children_nodes.iter().map(Cow::Borrowed).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index 5d263ad90394..e281d0e7c23e 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -19,6 +19,7 @@ //! infinite sources, if there are any. It will reject non-runnable query plans //! that use pipeline-breaking operators on infinite input(s). +use std::borrow::Cow; use std::sync::Arc; use crate::config::ConfigOptions; @@ -91,8 +92,8 @@ impl PipelineStatePropagator { } impl TreeNode for PipelineStatePropagator { - fn children_nodes(&self) -> Vec { - self.children.to_vec() + fn children_nodes(&self) -> Vec> { + self.children.iter().map(Cow::Borrowed).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index b995b06a7b6e..e49b358608aa 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -19,6 +19,7 @@ //! order-preserving variants when it is helpful; either in terms of //! performance or to accommodate unbounded streams by fixing the pipeline. +use std::borrow::Cow; use std::sync::Arc; use super::utils::is_repartition; @@ -104,8 +105,8 @@ impl OrderPreservationContext { } impl TreeNode for OrderPreservationContext { - fn children_nodes(&self) -> Vec { - self.children_nodes.to_vec() + fn children_nodes(&self) -> Vec> { + self.children_nodes.iter().map(Cow::Borrowed).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 532ff9480f28..97ca47baf05f 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::borrow::Cow; use std::sync::Arc; use crate::physical_optimizer::utils::{ @@ -71,8 +72,8 @@ impl SortPushDown { } impl TreeNode for SortPushDown { - fn children_nodes(&self) -> Vec { - self.children_nodes.to_vec() + fn children_nodes(&self) -> Vec> { + self.children_nodes.iter().map(Cow::Borrowed).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs index 93904a7d2bc4..3a12a2cfb8b5 100644 --- a/datafusion/expr/src/tree_node/expr.rs +++ b/datafusion/expr/src/tree_node/expr.rs @@ -23,12 +23,13 @@ use crate::expr::{ ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, WindowFunction, }; use crate::{Expr, GetFieldAccess}; +use std::borrow::Cow; use datafusion_common::tree_node::TreeNode; use datafusion_common::{internal_err, DataFusionError, Result}; impl TreeNode for Expr { - fn children_nodes(&self) -> Vec { + fn children_nodes(&self) -> Vec> { match self { Expr::Alias(Alias { expr, .. }) | Expr::Not(expr) @@ -44,15 +45,15 @@ impl TreeNode for Expr { | Expr::Cast(Cast { expr, .. }) | Expr::TryCast(TryCast { expr, .. }) | Expr::Sort(Sort { expr, .. }) - | Expr::InSubquery(InSubquery { expr, .. }) => vec![expr.as_ref().clone()], + | Expr::InSubquery(InSubquery { expr, .. }) => vec![Cow::Borrowed(expr)], Expr::GetIndexedField(GetIndexedField { expr, field }) => { - let expr = expr.as_ref().clone(); + let expr = Cow::Borrowed(expr.as_ref()); match field { GetFieldAccess::ListIndex { key } => { - vec![key.as_ref().clone(), expr] + vec![Cow::Borrowed(key.as_ref()), expr] } GetFieldAccess::ListRange { start, stop } => { - vec![start.as_ref().clone(), stop.as_ref().clone(), expr] + vec![Cow::Borrowed(start), Cow::Borrowed(stop), expr] } GetFieldAccess::NamedStructField { name: _name } => { vec![expr] @@ -60,10 +61,10 @@ impl TreeNode for Expr { } } Expr::GroupingSet(GroupingSet::Rollup(exprs)) - | Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.clone(), - Expr::ScalarFunction(ScalarFunction { args, .. }) => args.clone(), + | Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.into_iter().map(Cow::Borrowed).collect(), + Expr::ScalarFunction(ScalarFunction { args, .. }) => args.into_iter().map(Cow::Borrowed).collect(), Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => { - lists_of_exprs.clone().into_iter().flatten().collect() + lists_of_exprs.into_iter().flatten().map(Cow::Borrowed).collect() } Expr::Column(_) // Treat OuterReferenceColumn as a leaf expression @@ -75,30 +76,30 @@ impl TreeNode for Expr { | Expr::Wildcard { .. } | Expr::Placeholder(_) => vec![], Expr::BinaryExpr(BinaryExpr { left, right, .. }) => { - vec![left.as_ref().clone(), right.as_ref().clone()] + vec![Cow::Borrowed(left), Cow::Borrowed(right)] } Expr::Like(Like { expr, pattern, .. }) | Expr::SimilarTo(Like { expr, pattern, .. }) => { - vec![expr.as_ref().clone(), pattern.as_ref().clone()] + vec![Cow::Borrowed(expr), Cow::Borrowed(pattern)] } Expr::Between(Between { expr, low, high, .. }) => vec![ - expr.as_ref().clone(), - low.as_ref().clone(), - high.as_ref().clone(), + Cow::Borrowed(expr), + Cow::Borrowed(low), + Cow::Borrowed(high), ], Expr::Case(case) => { let mut expr_vec = vec![]; if let Some(expr) = case.expr.as_ref() { - expr_vec.push(expr.as_ref().clone()); + expr_vec.push(Cow::Borrowed(expr.as_ref())); }; for (when, then) in case.when_then_expr.iter() { - expr_vec.push(when.as_ref().clone()); - expr_vec.push(then.as_ref().clone()); + expr_vec.push(Cow::Borrowed(when)); + expr_vec.push(Cow::Borrowed(then)); } if let Some(else_expr) = case.else_expr.as_ref() { - expr_vec.push(else_expr.as_ref().clone()); + expr_vec.push(Cow::Borrowed(else_expr)); } expr_vec } @@ -108,13 +109,13 @@ impl TreeNode for Expr { order_by, .. }) => { - let mut expr_vec = args.clone(); + let mut expr_vec: Vec<_> = args.into_iter().map(Cow::Borrowed).collect(); if let Some(f) = filter { - expr_vec.push(f.as_ref().clone()); + expr_vec.push(Cow::Borrowed(f)); } if let Some(o) = order_by { - expr_vec.extend(o.clone()); + expr_vec.extend(o.into_iter().map(Cow::Borrowed).collect::>()); } expr_vec @@ -125,15 +126,14 @@ impl TreeNode for Expr { order_by, .. }) => { - let mut expr_vec = args.clone(); - expr_vec.extend(partition_by.clone()); - expr_vec.extend(order_by.clone()); + let mut expr_vec: Vec<_> = args.into_iter().map(Cow::Borrowed).collect(); + expr_vec.extend(partition_by.into_iter().map(Cow::Borrowed).collect::>()); + expr_vec.extend(order_by.into_iter().map(Cow::Borrowed).collect::>()); expr_vec } Expr::InList(InList { expr, list, .. }) => { - let mut expr_vec = vec![]; - expr_vec.push(expr.as_ref().clone()); - expr_vec.extend(list.clone()); + let mut expr_vec = vec![Cow::Borrowed(expr.as_ref())]; + expr_vec.extend(list.into_iter().map(Cow::Borrowed).collect::>()); expr_vec } } diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs index 1ae3be0df6ad..217116530d4a 100644 --- a/datafusion/expr/src/tree_node/plan.rs +++ b/datafusion/expr/src/tree_node/plan.rs @@ -20,10 +20,11 @@ use crate::LogicalPlan; use datafusion_common::tree_node::{TreeNodeVisitor, VisitRecursion}; use datafusion_common::{tree_node::TreeNode, Result}; +use std::borrow::Cow; impl TreeNode for LogicalPlan { - fn children_nodes(&self) -> Vec { - self.inputs().into_iter().cloned().collect() + fn children_nodes(&self) -> Vec> { + self.inputs().into_iter().map(Cow::Borrowed).collect() } fn apply(&self, op: &mut F) -> Result diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index b30e32017af4..0205f85dced4 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::borrow::Cow; use std::{ops::Neg, sync::Arc}; use arrow_schema::SortOptions; @@ -173,8 +174,8 @@ impl ExprOrdering { } impl TreeNode for ExprOrdering { - fn children_nodes(&self) -> Vec { - self.children.to_vec() + fn children_nodes(&self) -> Vec> { + self.children.iter().map(Cow::Borrowed).collect() } fn map_children(mut self, transform: F) -> Result diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index eacba29cbd76..808833b65946 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -18,7 +18,7 @@ mod guarantee; pub use guarantee::{Guarantee, LiteralGuarantee}; -use std::borrow::Borrow; +use std::borrow::{Borrow, Cow}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -154,8 +154,8 @@ impl ExprTreeNode { } impl TreeNode for ExprTreeNode { - fn children_nodes(&self) -> Vec { - self.children().to_vec() + fn children_nodes(&self) -> Vec> { + self.children().into_iter().map(Cow::Borrowed).collect() } fn map_children(mut self, transform: F) -> Result From d4eab3a842cad05463ac688765b41a984bee0a8b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 30 Dec 2023 10:42:36 -0800 Subject: [PATCH 6/8] Remove more unnecessary apply_children --- datafusion/common/src/tree_node.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 48c04b63e2b5..642dc82cde70 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -359,20 +359,6 @@ impl TreeNode for Arc { self.arc_children() } - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.arc_children() { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - Ok(VisitRecursion::Continue) - } - fn map_children(self, transform: F) -> Result where F: FnMut(Self) -> Result, From 2958714ec69032790518a7d74e1006540a5b163a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 30 Dec 2023 16:49:51 -0800 Subject: [PATCH 7/8] Fix clippy --- datafusion/expr/src/tree_node/expr.rs | 18 +++++++++--------- datafusion/physical-expr/src/utils/mod.rs | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs index 3a12a2cfb8b5..56388be58b8a 100644 --- a/datafusion/expr/src/tree_node/expr.rs +++ b/datafusion/expr/src/tree_node/expr.rs @@ -61,10 +61,10 @@ impl TreeNode for Expr { } } Expr::GroupingSet(GroupingSet::Rollup(exprs)) - | Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.into_iter().map(Cow::Borrowed).collect(), - Expr::ScalarFunction(ScalarFunction { args, .. }) => args.into_iter().map(Cow::Borrowed).collect(), + | Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.iter().map(Cow::Borrowed).collect(), + Expr::ScalarFunction(ScalarFunction { args, .. }) => args.iter().map(Cow::Borrowed).collect(), Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => { - lists_of_exprs.into_iter().flatten().map(Cow::Borrowed).collect() + lists_of_exprs.iter().flatten().map(Cow::Borrowed).collect() } Expr::Column(_) // Treat OuterReferenceColumn as a leaf expression @@ -109,13 +109,13 @@ impl TreeNode for Expr { order_by, .. }) => { - let mut expr_vec: Vec<_> = args.into_iter().map(Cow::Borrowed).collect(); + let mut expr_vec: Vec<_> = args.iter().map(Cow::Borrowed).collect(); if let Some(f) = filter { expr_vec.push(Cow::Borrowed(f)); } if let Some(o) = order_by { - expr_vec.extend(o.into_iter().map(Cow::Borrowed).collect::>()); + expr_vec.extend(o.iter().map(Cow::Borrowed).collect::>()); } expr_vec @@ -126,14 +126,14 @@ impl TreeNode for Expr { order_by, .. }) => { - let mut expr_vec: Vec<_> = args.into_iter().map(Cow::Borrowed).collect(); - expr_vec.extend(partition_by.into_iter().map(Cow::Borrowed).collect::>()); - expr_vec.extend(order_by.into_iter().map(Cow::Borrowed).collect::>()); + let mut expr_vec: Vec<_> = args.iter().map(Cow::Borrowed).collect(); + expr_vec.extend(partition_by.iter().map(Cow::Borrowed).collect::>()); + expr_vec.extend(order_by.iter().map(Cow::Borrowed).collect::>()); expr_vec } Expr::InList(InList { expr, list, .. }) => { let mut expr_vec = vec![Cow::Borrowed(expr.as_ref())]; - expr_vec.extend(list.into_iter().map(Cow::Borrowed).collect::>()); + expr_vec.extend(list.iter().map(Cow::Borrowed).collect::>()); expr_vec } } diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 808833b65946..64a62dc7820d 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -155,7 +155,7 @@ impl ExprTreeNode { impl TreeNode for ExprTreeNode { fn children_nodes(&self) -> Vec> { - self.children().into_iter().map(Cow::Borrowed).collect() + self.children().iter().map(Cow::Borrowed).collect() } fn map_children(mut self, transform: F) -> Result From 26736bba215a32153515cbbbef5460c928f6ce4e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 30 Dec 2023 23:10:45 -0800 Subject: [PATCH 8/8] Remove --- datafusion/common/src/tree_node.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 2efb1127953d..5f11c8cc1d11 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -371,7 +371,7 @@ impl TreeNode for Arc { let arc_self = Arc::clone(&self); self.with_new_arc_children(arc_self, new_children?) } else { - Ok(self.clone()) + Ok(self) } } }