Skip to content

Commit

Permalink
Simplify try_projection_insertion
Browse files Browse the repository at this point in the history
  • Loading branch information
ozankabak committed May 6, 2024
1 parent 2c7af01 commit c5d8d97
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions datafusion/core/src/physical_optimizer/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,66 +574,65 @@ impl ProjectionOptimizer {

// These plans preserve the input schema, and do not add new requirements.
if is_plan_schema_agnostic(&plan) {
self = self.try_insert_below_schema_agnostic()?;
self.try_insert_below_schema_agnostic()
}
// ------------------------------------------------------------------------
// These plans also preserve the input schema, but may extend requirements.
else if is_plan_requirement_extender(&plan) {
self = self.try_insert_below_req_extender()?;
self.try_insert_below_req_extender()
}
// ------------------------------------------------------------------------
// Preserves schema and do not change requirements, but have multi-child.
else if plan.as_any().downcast_ref::<UnionExec>().is_some()
|| plan.as_any().downcast_ref::<InterleaveExec>().is_some()
{
self = self.try_insert_below_union()?;
self.try_insert_below_union()
}
// ------------------------------------------------------------------------
// Concatenates schemas and do not change requirements.
else if let Some(cj) = plan.as_any().downcast_ref::<CrossJoinExec>() {
self = self.try_insert_below_cross_join(cj)?
self.try_insert_below_cross_join(cj)
}
// ------------------------------------------------------------------------
// Joins and aggregations require special attention.
else if let Some(hj) = plan.as_any().downcast_ref::<HashJoinExec>() {
self = self.try_insert_below_hash_join(hj)?
self.try_insert_below_hash_join(hj)
} else if let Some(nlj) = plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
self = self.try_insert_below_nested_loop_join(nlj)?
self.try_insert_below_nested_loop_join(nlj)
} else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
self = self.try_insert_below_sort_merge_join(smj)?
self.try_insert_below_sort_merge_join(smj)
} else if let Some(shj) = plan.as_any().downcast_ref::<SymmetricHashJoinExec>() {
self = self.try_insert_below_symmetric_hash_join(shj)?
self.try_insert_below_symmetric_hash_join(shj)
} else if let Some(agg) = plan.as_any().downcast_ref::<AggregateExec>() {
if !is_agg_expr_rewritable(agg.aggr_expr()) {
self.children_nodes[0].required_columns =
collect_columns_in_plan_schema(&self.children_nodes[0].plan);
return Ok(self);
}
self = self.try_insert_below_aggregate(agg)?
self.try_insert_below_aggregate(agg)
} else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
|| plan
.as_any()
.downcast_ref::<BoundedWindowAggExec>()
.is_some()
{
let window_exprs = collect_window_expressions(&self.plan);
self = match self.try_insert_below_window_execs(window_exprs)? {
optimized if optimized.transformed => optimized.data,
match self.try_insert_below_window_execs(window_exprs)? {
optimized if optimized.transformed => Ok(optimized.data),
mut no_change => {
no_change.data.children_nodes[0].required_columns =
collect_columns_in_plan_schema(
&no_change.data.children_nodes[0].plan,
);
return Ok(no_change.data);
Ok(no_change.data)
}
}
} else {
self.children_nodes.iter_mut().for_each(|c| {
c.required_columns = collect_columns_in_plan_schema(&c.plan)
});
return Ok(self);
Ok(self)
}
Ok(self)
}

fn try_insert_below_schema_agnostic(mut self) -> Result<ProjectionOptimizer> {
Expand Down

0 comments on commit c5d8d97

Please sign in to comment.