Skip to content

Commit

Permalink
refactor TreeNode::apply and relatives
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Oct 28, 2023
1 parent d28c79d commit 26b4811
Show file tree
Hide file tree
Showing 18 changed files with 401 additions and 500 deletions.
315 changes: 224 additions & 91 deletions datafusion/common/src/tree_node.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
Expr::Column(Column { ref name, .. }) => {
is_applicable &= col_names.contains(name);
if is_applicable {
VisitRecursion::Skip
VisitRecursion::Prune
} else {
VisitRecursion::Stop
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2036,9 +2036,9 @@ impl<'a> BadPlanVisitor<'a> {
}

impl<'a> TreeNodeVisitor for BadPlanVisitor<'a> {
type N = LogicalPlan;
type Node = LogicalPlan;

fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
fn pre_visit(&mut self, node: &Self::Node) -> Result<VisitRecursion> {
match node {
LogicalPlan::Ddl(ddl) if !self.options.allow_ddl => {
plan_err!("DDL not supported: {}", ddl.name())
Expand Down
28 changes: 7 additions & 21 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ use crate::physical_plan::{
};

use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeIterator, VisitRecursion,
};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
use datafusion_physical_expr::expressions::{Column, NoOp};
Expand Down Expand Up @@ -1503,18 +1505,11 @@ impl DistributionContext {
}

impl TreeNode for DistributionContext {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
fn apply_children<F>(&self, f: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children() {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
self.children().iter().for_each_continue_with(f)
}

fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -1593,20 +1588,11 @@ impl PlanWithKeyRequirements {
}

impl TreeNode for PlanWithKeyRequirements {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
fn apply_children<F>(&self, f: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
let children = self.children();
for child in children {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
self.children().iter().for_each_continue_with(f)
}

fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down
30 changes: 7 additions & 23 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ use crate::physical_plan::windows::{
};
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeIterator, VisitRecursion,
};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::utils::{
ordering_satisfy, ordering_satisfy_requirement_concrete,
Expand Down Expand Up @@ -158,20 +160,11 @@ impl PlanWithCorrespondingSort {
}

impl TreeNode for PlanWithCorrespondingSort {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
fn apply_children<F>(&self, f: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
let children = self.children();
for child in children {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
self.children().iter().for_each_continue_with(f)
}

fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -274,20 +267,11 @@ impl PlanWithCorrespondingCoalescePartitions {
}

impl TreeNode for PlanWithCorrespondingCoalescePartitions {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
fn apply_children<F>(&self, f: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
let children = self.children();
for child in children {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
self.children().iter().for_each_continue_with(f)
}

fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down
20 changes: 8 additions & 12 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use crate::physical_plan::joins::SymmetricHashJoinExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeIterator, VisitRecursion,
};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};

Expand Down Expand Up @@ -86,20 +88,14 @@ impl PipelineStatePropagator {
}

impl TreeNode for PipelineStatePropagator {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
fn apply_children<F>(&self, f: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
let children = self.plan.children();
for child in children {
match op(&PipelineStatePropagator::new(child))? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
self.plan
.children()
.into_iter()
.for_each_continue_with(&mut |c| f(&PipelineStatePropagator::new(c)))
}

fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use super::utils::is_repartition;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeIterator, VisitRecursion,
};
use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_plan::unbounded_output;

Expand Down Expand Up @@ -119,18 +121,11 @@ impl OrderPreservationContext {
}

impl TreeNode for OrderPreservationContext {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
fn apply_children<F>(&self, f: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children() {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
self.children().iter().for_each_continue_with(f)
}

fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down
17 changes: 5 additions & 12 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeIterator, VisitRecursion,
};
use datafusion_common::{
plan_datafusion_err, plan_err, DataFusionError, JoinSide, Result,
};
Expand Down Expand Up @@ -85,20 +87,11 @@ impl SortPushDown {
}

impl TreeNode for SortPushDown {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
fn apply_children<F>(&self, f: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
let children = self.children();
for child in children {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
self.children().iter().for_each_continue_with(f)
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<'a, 'b> IndentVisitor<'a, 'b> {
}

impl<'a, 'b> TreeNodeVisitor for IndentVisitor<'a, 'b> {
type N = LogicalPlan;
type Node = LogicalPlan;

fn pre_visit(
&mut self,
Expand Down Expand Up @@ -171,7 +171,7 @@ impl<'a, 'b> GraphvizVisitor<'a, 'b> {
}

impl<'a, 'b> TreeNodeVisitor for GraphvizVisitor<'a, 'b> {
type N = LogicalPlan;
type Node = LogicalPlan;

fn pre_visit(
&mut self,
Expand Down
Loading

0 comments on commit 26b4811

Please sign in to comment.