Skip to content

Commit

Permalink
remove Transformed
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Jan 11, 2024
1 parent 36877f3 commit 6094565
Show file tree
Hide file tree
Showing 37 changed files with 247 additions and 334 deletions.
21 changes: 8 additions & 13 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::TreeNode;
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, WindowUDF,
Expand Down Expand Up @@ -95,12 +95,9 @@ impl MyAnalyzerRule {
Ok(match plan {
LogicalPlan::Filter(filter) => {
let predicate = Self::analyze_expr(filter.predicate.clone())?;
Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input,
)?))
LogicalPlan::Filter(Filter::try_new(predicate, filter.input)?)
}
_ => Transformed::No(plan),
_ => plan,
})
})
}
Expand All @@ -111,11 +108,9 @@ impl MyAnalyzerRule {
Ok(match expr {
Expr::Literal(ScalarValue::Int64(i)) => {
// transform to UInt64
Transformed::Yes(Expr::Literal(ScalarValue::UInt64(
i.map(|i| i as u64),
)))
Expr::Literal(ScalarValue::UInt64(i.map(|i| i as u64)))
}
_ => Transformed::No(expr),
_ => expr,
})
})
}
Expand Down Expand Up @@ -175,12 +170,12 @@ fn my_rewrite(expr: Expr) -> Result<Expr> {
let low: Expr = *low;
let high: Expr = *high;
if negated {
Transformed::Yes(expr.clone().lt(low).or(expr.gt(high)))
expr.clone().lt(low).or(expr.gt(high))
} else {
Transformed::Yes(expr.clone().gt_eq(low).and(expr.lt_eq(high)))
expr.clone().gt_eq(low).and(expr.lt_eq(high))
}
}
_ => Transformed::No(expr),
_ => expr,
})
})
}
Expand Down
41 changes: 9 additions & 32 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub trait TreeNode: Sized + Clone {
/// The default tree traversal direction is `transform_up` (postorder traversal).
fn transform<F>(self, op: &F) -> Result<Self>
where
F: Fn(Self) -> Result<Transformed<Self>>,
F: Fn(Self) -> Result<Self>,
{
self.transform_up(op)
}
Expand All @@ -123,9 +123,9 @@ pub trait TreeNode: Sized + Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_down<F>(self, op: &F) -> Result<Self>
where
F: Fn(Self) -> Result<Transformed<Self>>,
F: Fn(Self) -> Result<Self>,
{
let after_op = op(self)?.into();
let after_op = op(self)?;
after_op.map_children(|node| node.transform_down(op))
}

Expand All @@ -134,9 +134,9 @@ pub trait TreeNode: Sized + Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_down_mut<F>(self, op: &mut F) -> Result<Self>
where
F: FnMut(Self) -> Result<Transformed<Self>>,
F: FnMut(Self) -> Result<Self>,
{
let after_op = op(self)?.into();
let after_op = op(self)?;
after_op.map_children(|node| node.transform_down_mut(op))
}

Expand All @@ -145,11 +145,11 @@ pub trait TreeNode: Sized + Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_up<F>(self, op: &F) -> Result<Self>
where
F: Fn(Self) -> Result<Transformed<Self>>,
F: Fn(Self) -> Result<Self>,
{
let after_op_children = self.map_children(|node| node.transform_up(op))?;

let new_node = op(after_op_children)?.into();
let new_node = op(after_op_children)?;
Ok(new_node)
}

Expand All @@ -158,11 +158,11 @@ pub trait TreeNode: Sized + Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_up_mut<F>(self, op: &mut F) -> Result<Self>
where
F: FnMut(Self) -> Result<Transformed<Self>>,
F: FnMut(Self) -> Result<Self>,
{
let after_op_children = self.map_children(|node| node.transform_up_mut(op))?;

let new_node = op(after_op_children)?.into();
let new_node = op(after_op_children)?;
Ok(new_node)
}

Expand Down Expand Up @@ -314,29 +314,6 @@ pub enum VisitRecursion {
Stop,
}

/// Wrapper returned by rewrite closures that records whether the wrapped
/// item was changed by the rewrite or passed through as-is.
pub enum Transformed<T> {
    /// The wrapped item was rewritten in some way
    Yes(T),
    /// The wrapped item was left as it was
    No(T),
}

impl<T> Transformed<T> {
    /// Consumes the wrapper and returns the inner item, dropping the
    /// information about whether it was transformed.
    pub fn into(self) -> T {
        // Both variants carry exactly one `T`, so a single irrefutable
        // or-pattern is enough to pull it out.
        let (Transformed::Yes(inner) | Transformed::No(inner)) = self;
        inner
    }

    /// Consumes the wrapper and returns the inner item together with a flag
    /// that is `true` for [`Transformed::Yes`] and `false` for
    /// [`Transformed::No`].
    pub fn into_pair(self) -> (T, bool) {
        // `matches!` only inspects the variant; `self` is not moved here.
        let was_transformed = matches!(self, Transformed::Yes(_));
        (self.into(), was_transformed)
    }
}

/// Helper trait for implementing [`TreeNode`] that have children stored as Arc's
///
/// If some trait object, such as `dyn T`, implements this trait,
Expand Down
9 changes: 3 additions & 6 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
repartition::RepartitionExec, Partitioning,
},
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::TreeNode;
use std::sync::Arc;

/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
Expand Down Expand Up @@ -71,12 +71,9 @@ impl PhysicalOptimizerRule for CoalesceBatches {
})
.unwrap_or(false);
if wrap_in_coalesce {
Ok(Transformed::Yes(Arc::new(CoalesceBatchesExec::new(
plan,
target_batch_size,
))))
Ok(Arc::new(CoalesceBatchesExec::new(plan, target_batch_size)))
} else {
Ok(Transformed::No(plan))
Ok(plan)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGro
use crate::physical_plan::ExecutionPlan;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::TreeNode;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};

Expand Down Expand Up @@ -109,9 +109,9 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
});

Ok(if let Some(transformed) = transformed {
Transformed::Yes(transformed)
transformed
} else {
Transformed::No(plan)
plan
})
})
}
Expand Down Expand Up @@ -185,9 +185,9 @@ fn discard_column_index(group_expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalEx
None => None,
};
Ok(if let Some(normalized_form) = normalized_form {
Transformed::Yes(normalized_form)
normalized_form
} else {
Transformed::No(expr)
expr
})
})
.unwrap_or(group_expr)
Expand Down
55 changes: 20 additions & 35 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::physical_plan::{
};

use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::TreeNode;
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
Expand Down Expand Up @@ -205,9 +205,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
adjusted.plan
} else {
// Run a bottom-up process
plan.transform_up(&|plan| {
Ok(Transformed::Yes(reorder_join_keys_to_inputs(plan)?))
})?
plan.transform_up(&reorder_join_keys_to_inputs)?
};

let distribution_context = DistributionContext::new(adjusted);
Expand Down Expand Up @@ -272,7 +270,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
///
fn adjust_input_keys_ordering(
mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
) -> Result<PlanWithKeyRequirements> {
let parent_required = requirements.required_key_ordering.clone();
let plan_any = requirements.plan.as_any();

Expand Down Expand Up @@ -309,7 +307,6 @@ fn adjust_input_keys_ordering(
vec![],
&join_constructor,
)
.map(Transformed::Yes)
}
PartitionMode::CollectLeft => {
let new_right_request = match join_type {
Expand All @@ -329,13 +326,11 @@ fn adjust_input_keys_ordering(
// Push down requirements to the right side
requirements.children[1].required_key_ordering =
new_right_request.unwrap_or(vec![]);
Ok(Transformed::Yes(requirements))
Ok(requirements)
}
PartitionMode::Auto => {
// Cannot satisfy the requirements: clear the current ones and generate new, empty requirements
Ok(Transformed::Yes(PlanWithKeyRequirements::new(
requirements.plan,
)))
Ok(PlanWithKeyRequirements::new(requirements.plan))
}
}
} else if let Some(CrossJoinExec { left, .. }) =
Expand All @@ -345,7 +340,7 @@ fn adjust_input_keys_ordering(
// Push down requirements to the right side
requirements.children[1].required_key_ordering =
shift_right_required(&parent_required, left_columns_len).unwrap_or_default();
Ok(Transformed::Yes(requirements))
Ok(requirements)
} else if let Some(SortMergeJoinExec {
left,
right,
Expand Down Expand Up @@ -375,23 +370,19 @@ fn adjust_input_keys_ordering(
sort_options.clone(),
&join_constructor,
)
.map(Transformed::Yes)
} else if let Some(aggregate_exec) = plan_any.downcast_ref::<AggregateExec>() {
if !parent_required.is_empty() {
match aggregate_exec.mode() {
AggregateMode::FinalPartitioned => reorder_aggregate_keys(
requirements.plan.clone(),
&parent_required,
aggregate_exec,
)
.map(Transformed::Yes),
_ => Ok(Transformed::Yes(PlanWithKeyRequirements::new(
requirements.plan,
))),
),
_ => Ok(PlanWithKeyRequirements::new(requirements.plan)),
}
} else {
// Keep everything unchanged
Ok(Transformed::No(requirements))
Ok(requirements)
}
} else if let Some(proj) = plan_any.downcast_ref::<ProjectionExec>() {
let expr = proj.expr();
Expand All @@ -401,26 +392,22 @@ fn adjust_input_keys_ordering(
let new_required = map_columns_before_projection(&parent_required, expr);
if new_required.len() == parent_required.len() {
requirements.children[0].required_key_ordering = new_required;
Ok(Transformed::Yes(requirements))
Ok(requirements)
} else {
// Cannot satisfy the requirements: clear the current ones and generate new, empty requirements
Ok(Transformed::Yes(PlanWithKeyRequirements::new(
requirements.plan,
)))
Ok(PlanWithKeyRequirements::new(requirements.plan))
}
} else if plan_any.downcast_ref::<RepartitionExec>().is_some()
|| plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
|| plan_any.downcast_ref::<WindowAggExec>().is_some()
{
Ok(Transformed::Yes(PlanWithKeyRequirements::new(
requirements.plan,
)))
Ok(PlanWithKeyRequirements::new(requirements.plan))
} else {
// By default, push down the parent requirements to children
requirements.children.iter_mut().for_each(|child| {
child.required_key_ordering = parent_required.clone();
});
Ok(Transformed::Yes(requirements))
Ok(requirements)
}
}

Expand Down Expand Up @@ -1139,11 +1126,11 @@ fn add_sort_preserving_partitions(
fn ensure_distribution(
dist_context: DistributionContext,
config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
) -> Result<DistributionContext> {
let dist_context = dist_context.update_children()?;

if dist_context.plan.children().is_empty() {
return Ok(Transformed::No(dist_context));
return Ok(dist_context);
}

let target_partitions = config.execution.target_partitions;
Expand Down Expand Up @@ -1333,7 +1320,7 @@ fn ensure_distribution(
children_nodes,
};

Ok(Transformed::Yes(new_distribution_context))
Ok(new_distribution_context)
}

/// A struct to keep track of distribution changing operators
Expand Down Expand Up @@ -1395,7 +1382,7 @@ impl DistributionContext {
.collect::<Vec<_>>();

Ok(Self {
plan: with_new_children_if_necessary(self.plan, children_plans)?.into(),
plan: with_new_children_if_necessary(self.plan, children_plans)?,
distribution_connection: false,
children_nodes: self.children_nodes,
})
Expand All @@ -1420,8 +1407,7 @@ impl TreeNode for DistributionContext {
self.plan = with_new_children_if_necessary(
self.plan,
self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
)?
.into();
)?;
}
Ok(self)
}
Expand Down Expand Up @@ -1484,8 +1470,7 @@ impl TreeNode for PlanWithKeyRequirements {
self.plan = with_new_children_if_necessary(
self.plan,
self.children.iter().map(|c| c.plan.clone()).collect(),
)?
.into();
)?;
}
Ok(self)
}
Expand Down Expand Up @@ -1912,7 +1897,7 @@ pub(crate) mod tests {
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.prefer_existing_sort = prefer_existing_sort;
ensure_distribution(distribution_context, &config).map(|item| item.into().plan)
ensure_distribution(distribution_context, &config).map(|item| item.plan)
}

/// Test whether plan matches with expected plan
Expand Down
Loading

0 comments on commit 6094565

Please sign in to comment.