Skip to content

Commit

Permalink
Add LogicalPlan::recompute_schema for handling rewrite passes
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 7, 2024
1 parent 80be0b0 commit 826d51f
Showing 1 changed file with 195 additions and 0 deletions.
195 changes: 195 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use datafusion_common::{

// backwards compatibility
use crate::display::PgJsonVisitor;
use crate::logical_plan::tree_node::unwrap_arc;
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

Expand Down Expand Up @@ -467,6 +468,200 @@ impl LogicalPlan {
self.with_new_exprs(self.expressions(), inputs.to_vec())
}

/// Recomputes schema and type information for this LogicalPlan if needed.
///
/// Some `LogicalPlan`s may need to recompute their schema if the number or
/// type of expressions have been changed (for example due to type
/// coercion). For example [`LogicalPlan::Projection`]s schema depends on
/// its expressions.
///
/// Some `LogicalPlan`s schema is unaffected by any changes to their
/// expressions. For example [`LogicalPlan::Filter`] schema is always the
/// same as its input schema.
///
/// # Return value
/// Returns an error if there is some issue recomputing the schema.
///
/// # Notes
///
/// * Does not recursively recompute schema for input (child) plans.
pub fn recompute_schema(self) -> Result<Self> {
match self {
// Since expr may be different than the previous expr, schema of the projection
// may change. We need to use try_new method instead of try_new_with_schema method.
LogicalPlan::Projection(Projection {
expr,
input,
schema: _,
}) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
LogicalPlan::Dml(_) => Ok(self),
LogicalPlan::Copy(_) => Ok(self),
LogicalPlan::Values(Values { schema, values }) => {
// todo it isn't clear why the schema is not recomputed here
Ok(LogicalPlan::Values(Values { schema, values }))
}
LogicalPlan::Filter(Filter { predicate, input }) => {
// todo: should this logic be moved to Filter::try_new?

// filter predicates should not contain aliased expressions so we remove any aliases
// before this logic was added we would have aliases within filters such as for
// benchmark q6:
//
// lineitem.l_shipdate >= Date32(\"8766\")
// AND lineitem.l_shipdate < Date32(\"9131\")
// AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >=
// Decimal128(Some(49999999999999),30,15)
// AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <=
// Decimal128(Some(69999999999999),30,15)
// AND lineitem.l_quantity < Decimal128(Some(2400),15,2)

let predicate = predicate
.transform_down(|expr| {
match expr {
Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::InSubquery(_) => {
// subqueries could contain aliases so we don't recurse into those
Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump))
}
Expr::Alias(_) => Ok(Transformed::new(
expr.unalias(),
true,
TreeNodeRecursion::Jump,
)),
_ => Ok(Transformed::no(expr)),
}
})
.data()?;

Filter::try_new(predicate, input).map(LogicalPlan::Filter)
}
LogicalPlan::Repartition(_) => Ok(self),
LogicalPlan::Window(Window {
input,
window_expr,
schema: _,
}) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
LogicalPlan::Aggregate(Aggregate {
input,
group_expr,
aggr_expr,
schema: _,
}) => Aggregate::try_new(input, group_expr, aggr_expr)
.map(LogicalPlan::Aggregate),
LogicalPlan::Sort(_) => Ok(self),
LogicalPlan::Join(Join {
left,
right,
filter,
join_type,
join_constraint,
on,
schema: _,
null_equals_null,
}) => {
let schema =
build_join_schema(left.schema(), right.schema(), &join_type)?;

let new_on: Vec<_> = on
.into_iter()
.map(|equi_expr| {
// SimplifyExpression rule may add alias to the equi_expr.
(equi_expr.0.unalias(), equi_expr.1.unalias())
})
.collect();

Ok(LogicalPlan::Join(Join {
left,
right,
join_type,
join_constraint,
on: new_on,
filter,
schema: DFSchemaRef::new(schema),
null_equals_null,
}))
}
LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema: _,
}) => {
let join_schema =
build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?;

Ok(LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema: join_schema.into(),
}))
}
LogicalPlan::Subquery(_) => Ok(self),
LogicalPlan::SubqueryAlias(SubqueryAlias {
input,
alias,
schema: _,
}) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
LogicalPlan::Limit(_) => Ok(self),
LogicalPlan::Ddl(_) => Ok(self),
LogicalPlan::Extension(Extension { node }) => {
// todo make an API that does not require cloning
// This requires a copy of the extension nodes expressions and inputs
let expr = node.expressions();
let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
Ok(LogicalPlan::Extension(Extension {
node: node.from_template(&expr, &inputs),
}))
}
LogicalPlan::Union(Union { inputs, schema }) => {
let input_schema = inputs[0].schema();
// If inputs are not pruned do not change schema
// TODO this seems wrong (shouldn't we always use the schema of the input?)
let schema = if schema.fields().len() == input_schema.fields().len() {
schema.clone()
} else {
input_schema.clone()
};
Ok(LogicalPlan::Union(Union { inputs, schema }))
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Distinct::All(input) => Distinct::All(input),
Distinct::On(DistinctOn {
on_expr,
select_expr,
sort_expr,
input,
schema: _,
}) => Distinct::On(DistinctOn::try_new(
on_expr,
select_expr,
sort_expr,
input,
)?),
};
Ok(LogicalPlan::Distinct(distinct))
}
LogicalPlan::RecursiveQuery(_) => Ok(self),
LogicalPlan::Analyze(_) => Ok(self),
LogicalPlan::Explain(_) => Ok(self),
LogicalPlan::Prepare(_) => Ok(self),
LogicalPlan::TableScan(_) => Ok(self),
LogicalPlan::EmptyRelation(_) => Ok(self),
LogicalPlan::Statement(_) => Ok(self),
LogicalPlan::DescribeTable(_) => Ok(self),
LogicalPlan::Unnest(Unnest {
input,
columns,
schema: _,
options,
}) => {
// Update schema with unnested column type.
unnest_with_options(unwrap_arc(input), columns, options)
}
}
}

/// Returns a new `LogicalPlan` based on `self` with inputs and
/// expressions replaced.
///
Expand Down

0 comments on commit 826d51f

Please sign in to comment.