Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Transformed enum #8835

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::TreeNode;
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, WindowUDF,
Expand Down Expand Up @@ -95,12 +95,9 @@ impl MyAnalyzerRule {
Ok(match plan {
LogicalPlan::Filter(filter) => {
let predicate = Self::analyze_expr(filter.predicate.clone())?;
Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input,
)?))
LogicalPlan::Filter(Filter::try_new(predicate, filter.input)?)
}
_ => Transformed::No(plan),
_ => plan,
})
})
}
Expand All @@ -111,11 +108,9 @@ impl MyAnalyzerRule {
Ok(match expr {
Expr::Literal(ScalarValue::Int64(i)) => {
// transform to UInt64
Transformed::Yes(Expr::Literal(ScalarValue::UInt64(
i.map(|i| i as u64),
)))
Expr::Literal(ScalarValue::UInt64(i.map(|i| i as u64)))
}
_ => Transformed::No(expr),
_ => expr,
})
})
}
Expand Down Expand Up @@ -175,12 +170,12 @@ fn my_rewrite(expr: Expr) -> Result<Expr> {
let low: Expr = *low;
let high: Expr = *high;
if negated {
Transformed::Yes(expr.clone().lt(low).or(expr.gt(high)))
expr.clone().lt(low).or(expr.gt(high))
} else {
Transformed::Yes(expr.clone().gt_eq(low).and(expr.lt_eq(high)))
expr.clone().gt_eq(low).and(expr.lt_eq(high))
}
}
_ => Transformed::No(expr),
_ => expr,
})
})
}
Expand Down
41 changes: 9 additions & 32 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub trait TreeNode: Sized + Clone {
/// The default tree traversal direction is transform_up(Postorder Traversal).
fn transform<F>(self, op: &F) -> Result<Self>
where
F: Fn(Self) -> Result<Transformed<Self>>,
F: Fn(Self) -> Result<Self>,
{
self.transform_up(op)
}
Expand All @@ -123,9 +123,9 @@ pub trait TreeNode: Sized + Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_down<F>(self, op: &F) -> Result<Self>
where
F: Fn(Self) -> Result<Transformed<Self>>,
F: Fn(Self) -> Result<Self>,
{
let after_op = op(self)?.into();
let after_op = op(self)?;
after_op.map_children(|node| node.transform_down(op))
}

Expand All @@ -134,9 +134,9 @@ pub trait TreeNode: Sized + Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_down_mut<F>(self, op: &mut F) -> Result<Self>
where
F: FnMut(Self) -> Result<Transformed<Self>>,
F: FnMut(Self) -> Result<Self>,
{
let after_op = op(self)?.into();
let after_op = op(self)?;
after_op.map_children(|node| node.transform_down_mut(op))
}

Expand All @@ -145,11 +145,11 @@ pub trait TreeNode: Sized + Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_up<F>(self, op: &F) -> Result<Self>
where
F: Fn(Self) -> Result<Transformed<Self>>,
F: Fn(Self) -> Result<Self>,
{
let after_op_children = self.map_children(|node| node.transform_up(op))?;

let new_node = op(after_op_children)?.into();
let new_node = op(after_op_children)?;
Ok(new_node)
}

Expand All @@ -158,11 +158,11 @@ pub trait TreeNode: Sized + Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_up_mut<F>(self, op: &mut F) -> Result<Self>
where
F: FnMut(Self) -> Result<Transformed<Self>>,
F: FnMut(Self) -> Result<Self>,
{
let after_op_children = self.map_children(|node| node.transform_up_mut(op))?;

let new_node = op(after_op_children)?.into();
let new_node = op(after_op_children)?;
Ok(new_node)
}

Expand Down Expand Up @@ -314,29 +314,6 @@ pub enum VisitRecursion {
Stop,
}

pub enum Transformed<T> {
/// The item was transformed / rewritten somehow
Yes(T),
/// The item was not transformed
No(T),
}

impl<T> Transformed<T> {
pub fn into(self) -> T {
match self {
Transformed::Yes(t) => t,
Transformed::No(t) => t,
}
}

pub fn into_pair(self) -> (T, bool) {
match self {
Transformed::Yes(t) => (t, true),
Transformed::No(t) => (t, false),
}
}
}

/// Helper trait for implementing [`TreeNode`] that have children stored as Arc's
///
/// If some trait object, such as `dyn T`, implements this trait,
Expand Down
9 changes: 3 additions & 6 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
repartition::RepartitionExec, Partitioning,
},
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::TreeNode;
use std::sync::Arc;

/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
Expand Down Expand Up @@ -71,12 +71,9 @@ impl PhysicalOptimizerRule for CoalesceBatches {
})
.unwrap_or(false);
if wrap_in_coalesce {
Ok(Transformed::Yes(Arc::new(CoalesceBatchesExec::new(
plan,
target_batch_size,
))))
Ok(Arc::new(CoalesceBatchesExec::new(plan, target_batch_size)))
} else {
Ok(Transformed::No(plan))
Ok(plan)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGro
use crate::physical_plan::ExecutionPlan;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::TreeNode;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};

Expand Down Expand Up @@ -109,9 +109,9 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
});

Ok(if let Some(transformed) = transformed {
Transformed::Yes(transformed)
transformed
} else {
Transformed::No(plan)
plan
})
})
}
Expand Down Expand Up @@ -185,9 +185,9 @@ fn discard_column_index(group_expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalEx
None => None,
};
Ok(if let Some(normalized_form) = normalized_form {
Transformed::Yes(normalized_form)
normalized_form
} else {
Transformed::No(expr)
expr
})
})
.unwrap_or(group_expr)
Expand Down
55 changes: 20 additions & 35 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::physical_plan::{
};

use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::TreeNode;
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
Expand Down Expand Up @@ -205,9 +205,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
adjusted.plan
} else {
// Run a bottom-up process
plan.transform_up(&|plan| {
Ok(Transformed::Yes(reorder_join_keys_to_inputs(plan)?))
})?
plan.transform_up(&reorder_join_keys_to_inputs)?
};

let distribution_context = DistributionContext::new(adjusted);
Expand Down Expand Up @@ -272,7 +270,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
///
fn adjust_input_keys_ordering(
mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
) -> Result<PlanWithKeyRequirements> {
let parent_required = requirements.required_key_ordering.clone();
let plan_any = requirements.plan.as_any();

Expand Down Expand Up @@ -309,7 +307,6 @@ fn adjust_input_keys_ordering(
vec![],
&join_constructor,
)
.map(Transformed::Yes)
}
PartitionMode::CollectLeft => {
let new_right_request = match join_type {
Expand All @@ -329,13 +326,11 @@ fn adjust_input_keys_ordering(
// Push down requirements to the right side
requirements.children[1].required_key_ordering =
new_right_request.unwrap_or(vec![]);
Ok(Transformed::Yes(requirements))
Ok(requirements)
}
PartitionMode::Auto => {
// Can not satisfy, clear the current requirements and generate new empty requirements
Ok(Transformed::Yes(PlanWithKeyRequirements::new(
requirements.plan,
)))
Ok(PlanWithKeyRequirements::new(requirements.plan))
}
}
} else if let Some(CrossJoinExec { left, .. }) =
Expand All @@ -345,7 +340,7 @@ fn adjust_input_keys_ordering(
// Push down requirements to the right side
requirements.children[1].required_key_ordering =
shift_right_required(&parent_required, left_columns_len).unwrap_or_default();
Ok(Transformed::Yes(requirements))
Ok(requirements)
} else if let Some(SortMergeJoinExec {
left,
right,
Expand Down Expand Up @@ -375,23 +370,19 @@ fn adjust_input_keys_ordering(
sort_options.clone(),
&join_constructor,
)
.map(Transformed::Yes)
} else if let Some(aggregate_exec) = plan_any.downcast_ref::<AggregateExec>() {
if !parent_required.is_empty() {
match aggregate_exec.mode() {
AggregateMode::FinalPartitioned => reorder_aggregate_keys(
requirements.plan.clone(),
&parent_required,
aggregate_exec,
)
.map(Transformed::Yes),
_ => Ok(Transformed::Yes(PlanWithKeyRequirements::new(
requirements.plan,
))),
),
_ => Ok(PlanWithKeyRequirements::new(requirements.plan)),
}
} else {
// Keep everything unchanged
Ok(Transformed::No(requirements))
Ok(requirements)
}
} else if let Some(proj) = plan_any.downcast_ref::<ProjectionExec>() {
let expr = proj.expr();
Expand All @@ -401,26 +392,22 @@ fn adjust_input_keys_ordering(
let new_required = map_columns_before_projection(&parent_required, expr);
if new_required.len() == parent_required.len() {
requirements.children[0].required_key_ordering = new_required;
Ok(Transformed::Yes(requirements))
Ok(requirements)
} else {
// Can not satisfy, clear the current requirements and generate new empty requirements
Ok(Transformed::Yes(PlanWithKeyRequirements::new(
requirements.plan,
)))
Ok(PlanWithKeyRequirements::new(requirements.plan))
}
} else if plan_any.downcast_ref::<RepartitionExec>().is_some()
|| plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
|| plan_any.downcast_ref::<WindowAggExec>().is_some()
{
Ok(Transformed::Yes(PlanWithKeyRequirements::new(
requirements.plan,
)))
Ok(PlanWithKeyRequirements::new(requirements.plan))
} else {
// By default, push down the parent requirements to children
requirements.children.iter_mut().for_each(|child| {
child.required_key_ordering = parent_required.clone();
});
Ok(Transformed::Yes(requirements))
Ok(requirements)
}
}

Expand Down Expand Up @@ -1139,11 +1126,11 @@ fn add_sort_preserving_partitions(
fn ensure_distribution(
dist_context: DistributionContext,
config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
) -> Result<DistributionContext> {
let dist_context = dist_context.update_children()?;

if dist_context.plan.children().is_empty() {
return Ok(Transformed::No(dist_context));
return Ok(dist_context);
}

let target_partitions = config.execution.target_partitions;
Expand Down Expand Up @@ -1333,7 +1320,7 @@ fn ensure_distribution(
children_nodes,
};

Ok(Transformed::Yes(new_distribution_context))
Ok(new_distribution_context)
}

/// A struct to keep track of distribution changing operators
Expand Down Expand Up @@ -1395,7 +1382,7 @@ impl DistributionContext {
.collect::<Vec<_>>();

Ok(Self {
plan: with_new_children_if_necessary(self.plan, children_plans)?.into(),
plan: with_new_children_if_necessary(self.plan, children_plans)?,
distribution_connection: false,
children_nodes: self.children_nodes,
})
Expand All @@ -1420,8 +1407,7 @@ impl TreeNode for DistributionContext {
self.plan = with_new_children_if_necessary(
self.plan,
self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
)?
.into();
)?;
}
Ok(self)
}
Expand Down Expand Up @@ -1484,8 +1470,7 @@ impl TreeNode for PlanWithKeyRequirements {
self.plan = with_new_children_if_necessary(
self.plan,
self.children.iter().map(|c| c.plan.clone()).collect(),
)?
.into();
)?;
}
Ok(self)
}
Expand Down Expand Up @@ -1912,7 +1897,7 @@ pub(crate) mod tests {
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.prefer_existing_sort = prefer_existing_sort;
ensure_distribution(distribution_context, &config).map(|item| item.into().plan)
ensure_distribution(distribution_context, &config).map(|item| item.plan)
}

/// Test whether plan matches with expected plan
Expand Down
Loading