Skip to content

Commit

Permalink
Simplifications
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo committed Feb 16, 2024
1 parent 4233028 commit 410f44a
Showing 1 changed file with 23 additions and 29 deletions.
52 changes: 23 additions & 29 deletions datafusion/core/src/physical_optimizer/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use datafusion_common::DataFusionError;
use datafusion_common::{internal_err, JoinSide, JoinType};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{Partitioning, PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_expr::{LexOrdering, Partitioning, PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
Expand Down Expand Up @@ -671,13 +671,8 @@ impl ProjectionOptimizer {
// Rewrite the hashed expressions if there is any with possibly updated column indices.
let new_partitioning =
if let Partitioning::Hash(exprs, size) = repartition.partitioning() {
Partitioning::Hash(
exprs
.iter()
.map(|expr| update_column_index(expr, &schema_mapping))
.collect::<Vec<_>>(),
*size,
)
let updated_exprs = update_exprs(exprs, &schema_mapping);
Partitioning::Hash(updated_exprs, *size)
} else {
repartition.partitioning().clone()
};
Expand Down Expand Up @@ -711,14 +706,7 @@ impl ProjectionOptimizer {
} else {
let (new_child, schema_mapping) = self.insert_projection(requirement_map)?;
// Rewrite the sort expressions with possibly updated column indices.
let new_sort_exprs = sort
.expr()
.iter()
.map(|sort_expr| PhysicalSortExpr {
expr: update_column_index(&sort_expr.expr, &schema_mapping),
options: sort_expr.options,
})
.collect::<Vec<_>>();
let new_sort_exprs = update_sort_exprs(sort.expr(), &schema_mapping);
let plan = Arc::new(
SortExec::new(new_sort_exprs, new_child.plan.clone())
.with_preserve_partitioning(sort.preserve_partitioning())
Expand Down Expand Up @@ -754,14 +742,7 @@ impl ProjectionOptimizer {
} else {
let (new_child, schema_mapping) = self.insert_projection(requirement_map)?;
// Rewrite the sort expressions with possibly updated column indices.
let new_sort_exprs = sortp_merge
.expr()
.iter()
.map(|sort_expr| PhysicalSortExpr {
expr: update_column_index(&sort_expr.expr, &schema_mapping),
options: sort_expr.options,
})
.collect::<Vec<_>>();
let new_sort_exprs = update_sort_exprs(sortp_merge.expr(), &schema_mapping);
let plan = Arc::new(
SortPreservingMergeExec::new(new_sort_exprs, new_child.plan.clone())
.with_fetch(sortp_merge.fetch()),
Expand Down Expand Up @@ -2050,12 +2031,9 @@ impl ProjectionOptimizer {
.zip(window_usage.clone())
.filter(|(_window_expr, (_window_col, usage))| *usage)
.map(|(window_expr, (_window_col, _usage))| {
let new_exprs = update_exprs(&window_expr.expressions(), &schema_mapping);
window_expr.clone().with_new_expressions(
window_expr
.expressions()
.iter()
.map(|expr| update_column_index(expr, &schema_mapping))
.collect(),
new_exprs,
)
})
.collect::<Option<Vec<_>>>()
Expand Down Expand Up @@ -3272,6 +3250,22 @@ fn update_expr(
new_expr.map(|e| (state == RewriteState::RewrittenValid).then_some(e))
}

fn update_sort_exprs(sort_exprs: &[PhysicalSortExpr], mapping: &HashMap<Column, Column>) -> LexOrdering{
sort_exprs
.iter()
.map(|sort_expr| PhysicalSortExpr {
expr: update_column_index(&sort_expr.expr, mapping),
options: sort_expr.options,
})
.collect::<Vec<_>>()
}
fn update_exprs(exprs: &[Arc<dyn PhysicalExpr>], mapping: &HashMap<Column, Column>) -> Vec<Arc<dyn PhysicalExpr>> {
exprs
.iter()
.map(|expr| update_column_index(expr, mapping))
.collect::<Vec<_>>()
}

/// Given mapping representing the initial and new index values,
/// it updates the indices of columns in the [`PhysicalExpr`].
fn update_column_index(
Expand Down

0 comments on commit 410f44a

Please sign in to comment.