From 410f44a7b8016a581799499f5c6401309c21f27b Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 16 Feb 2024 16:47:51 +0300 Subject: [PATCH] Simplifications --- .../optimize_projections.rs | 52 ++++++++----------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/optimize_projections.rs b/datafusion/core/src/physical_optimizer/optimize_projections.rs index 236d7fa94efb..98257b303c7f 100644 --- a/datafusion/core/src/physical_optimizer/optimize_projections.rs +++ b/datafusion/core/src/physical_optimizer/optimize_projections.rs @@ -69,7 +69,7 @@ use datafusion_common::DataFusionError; use datafusion_common::{internal_err, JoinSide, JoinType}; use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::utils::collect_columns; -use datafusion_physical_expr::{Partitioning, PhysicalExpr, PhysicalSortExpr}; +use datafusion_physical_expr::{LexOrdering, Partitioning, PhysicalExpr, PhysicalSortExpr}; use datafusion_physical_plan::aggregates::{AggregateExec, PhysicalGroupBy}; use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -671,13 +671,8 @@ impl ProjectionOptimizer { // Rewrite the hashed expressions if there is any with possibly updated column indices. let new_partitioning = if let Partitioning::Hash(exprs, size) = repartition.partitioning() { - Partitioning::Hash( - exprs - .iter() - .map(|expr| update_column_index(expr, &schema_mapping)) - .collect::>(), - *size, - ) + let updated_exprs = update_exprs(exprs, &schema_mapping); + Partitioning::Hash(updated_exprs, *size) } else { repartition.partitioning().clone() }; @@ -711,14 +706,7 @@ impl ProjectionOptimizer { } else { let (new_child, schema_mapping) = self.insert_projection(requirement_map)?; // Rewrite the sort expressions with possibly updated column indices. - let new_sort_exprs = sort - .expr() - .iter() - .map(|sort_expr| PhysicalSortExpr { - expr: update_column_index(&sort_expr.expr, &schema_mapping), - options: sort_expr.options, - }) - .collect::>(); + let new_sort_exprs = update_sort_exprs(sort.expr(), &schema_mapping); let plan = Arc::new( SortExec::new(new_sort_exprs, new_child.plan.clone()) .with_preserve_partitioning(sort.preserve_partitioning()) @@ -754,14 +742,7 @@ impl ProjectionOptimizer { } else { let (new_child, schema_mapping) = self.insert_projection(requirement_map)?; // Rewrite the sort expressions with possibly updated column indices. - let new_sort_exprs = sortp_merge - .expr() - .iter() - .map(|sort_expr| PhysicalSortExpr { - expr: update_column_index(&sort_expr.expr, &schema_mapping), - options: sort_expr.options, - }) - .collect::>(); + let new_sort_exprs = update_sort_exprs(sortp_merge.expr(), &schema_mapping); let plan = Arc::new( SortPreservingMergeExec::new(new_sort_exprs, new_child.plan.clone()) .with_fetch(sortp_merge.fetch()), @@ -2050,12 +2031,9 @@ impl ProjectionOptimizer { .zip(window_usage.clone()) .filter(|(_window_expr, (_window_col, usage))| *usage) .map(|(window_expr, (_window_col, _usage))| { + let new_exprs = update_exprs(&window_expr.expressions(), &schema_mapping); window_expr.clone().with_new_expressions( - window_expr - .expressions() - .iter() - .map(|expr| update_column_index(expr, &schema_mapping)) - .collect(), + new_exprs, ) }) .collect::>>() @@ -3272,6 +3250,22 @@ fn update_expr( new_expr.map(|e| (state == RewriteState::RewrittenValid).then_some(e)) } +fn update_sort_exprs(sort_exprs: &[PhysicalSortExpr], mapping: &HashMap) -> LexOrdering{ + sort_exprs + .iter() + .map(|sort_expr| PhysicalSortExpr { + expr: update_column_index(&sort_expr.expr, mapping), + options: sort_expr.options, + }) + .collect::>() +} +fn update_exprs(exprs: &[Arc], mapping: &HashMap) -> Vec> { + exprs + .iter() + .map(|expr| update_column_index(expr, mapping)) + .collect::>() +} + /// Given mapping representing the initial and new index values, /// it updates the indices of columns in the [`PhysicalExpr`]. fn update_column_index(