From 826d51faeefc5f20a1a4cb3976fe342dc83218ab Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 7 May 2024 10:34:13 -0400 Subject: [PATCH] Add `LogicalPlan::recompute_schema` for handling rewrite passes --- datafusion/expr/src/logical_plan/plan.rs | 195 +++++++++++++++++++++++ 1 file changed, 195 insertions(+) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 23f5280377a3..776238952dd7 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -52,6 +52,7 @@ use datafusion_common::{ // backwards compatibility use crate::display::PgJsonVisitor; +use crate::logical_plan::tree_node::unwrap_arc; pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; pub use datafusion_common::{JoinConstraint, JoinType}; @@ -467,6 +468,200 @@ impl LogicalPlan { self.with_new_exprs(self.expressions(), inputs.to_vec()) } + /// Recomputes schema and type information for this LogicalPlan if needed. + /// + /// Some `LogicalPlan`s may need to recompute their schema if the number or + /// type of expressions have been changed (for example due to type + /// coercion). For example [`LogicalPlan::Projection`]s schema depends on + /// its expressions. + /// + /// Some `LogicalPlan`s schema is unaffected by any changes to their + /// expressions. For example [`LogicalPlan::Filter`] schema is always the + /// same as its input schema. + /// + /// # Return value + /// Returns an error if there is some issue recomputing the schema. + /// + /// # Notes + /// + /// * Does not recursively recompute schema for input (child) plans. + pub fn recompute_schema(self) -> Result { + match self { + // Since expr may be different than the previous expr, schema of the projection + // may change. We need to use try_new method instead of try_new_with_schema method. + LogicalPlan::Projection(Projection { + expr, + input, + schema: _, + }) => Projection::try_new(expr, input).map(LogicalPlan::Projection), + LogicalPlan::Dml(_) => Ok(self), + LogicalPlan::Copy(_) => Ok(self), + LogicalPlan::Values(Values { schema, values }) => { + // todo it isn't clear why the schema is not recomputed here + Ok(LogicalPlan::Values(Values { schema, values })) + } + LogicalPlan::Filter(Filter { predicate, input }) => { + // todo: should this logic be moved to Filter::try_new? + + // filter predicates should not contain aliased expressions so we remove any aliases + // before this logic was added we would have aliases within filters such as for + // benchmark q6: + // + // lineitem.l_shipdate >= Date32(\"8766\") + // AND lineitem.l_shipdate < Date32(\"9131\") + // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >= + // Decimal128(Some(49999999999999),30,15) + // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <= + // Decimal128(Some(69999999999999),30,15) + // AND lineitem.l_quantity < Decimal128(Some(2400),15,2) + + let predicate = predicate + .transform_down(|expr| { + match expr { + Expr::Exists { .. } + | Expr::ScalarSubquery(_) + | Expr::InSubquery(_) => { + // subqueries could contain aliases so we don't recurse into those + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } + Expr::Alias(_) => Ok(Transformed::new( + expr.unalias(), + true, + TreeNodeRecursion::Jump, + )), + _ => Ok(Transformed::no(expr)), + } + }) + .data()?; + + Filter::try_new(predicate, input).map(LogicalPlan::Filter) + } + LogicalPlan::Repartition(_) => Ok(self), + LogicalPlan::Window(Window { + input, + window_expr, + schema: _, + }) => Window::try_new(window_expr, input).map(LogicalPlan::Window), + LogicalPlan::Aggregate(Aggregate { + input, + group_expr, + aggr_expr, + schema: _, + }) => Aggregate::try_new(input, group_expr, aggr_expr) + .map(LogicalPlan::Aggregate), + LogicalPlan::Sort(_) => Ok(self), + LogicalPlan::Join(Join { + left, + right, + filter, + join_type, + join_constraint, + on, + schema: _, + null_equals_null, + }) => { + let schema = + build_join_schema(left.schema(), right.schema(), &join_type)?; + + let new_on: Vec<_> = on + .into_iter() + .map(|equi_expr| { + // SimplifyExpression rule may add alias to the equi_expr. + (equi_expr.0.unalias(), equi_expr.1.unalias()) + }) + .collect(); + + Ok(LogicalPlan::Join(Join { + left, + right, + join_type, + join_constraint, + on: new_on, + filter, + schema: DFSchemaRef::new(schema), + null_equals_null, + })) + } + LogicalPlan::CrossJoin(CrossJoin { + left, + right, + schema: _, + }) => { + let join_schema = + build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?; + + Ok(LogicalPlan::CrossJoin(CrossJoin { + left, + right, + schema: join_schema.into(), + })) + } + LogicalPlan::Subquery(_) => Ok(self), + LogicalPlan::SubqueryAlias(SubqueryAlias { + input, + alias, + schema: _, + }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias), + LogicalPlan::Limit(_) => Ok(self), + LogicalPlan::Ddl(_) => Ok(self), + LogicalPlan::Extension(Extension { node }) => { + // todo make an API that does not require cloning + // This requires a copy of the extension nodes expressions and inputs + let expr = node.expressions(); + let inputs: Vec<_> = node.inputs().into_iter().cloned().collect(); + Ok(LogicalPlan::Extension(Extension { + node: node.from_template(&expr, &inputs), + })) + } + LogicalPlan::Union(Union { inputs, schema }) => { + let input_schema = inputs[0].schema(); + // If inputs are not pruned do not change schema + // TODO this seems wrong (shouldn't we always use the schema of the input?) + let schema = if schema.fields().len() == input_schema.fields().len() { + schema.clone() + } else { + input_schema.clone() + }; + Ok(LogicalPlan::Union(Union { inputs, schema })) + } + LogicalPlan::Distinct(distinct) => { + let distinct = match distinct { + Distinct::All(input) => Distinct::All(input), + Distinct::On(DistinctOn { + on_expr, + select_expr, + sort_expr, + input, + schema: _, + }) => Distinct::On(DistinctOn::try_new( + on_expr, + select_expr, + sort_expr, + input, + )?), + }; + Ok(LogicalPlan::Distinct(distinct)) + } + LogicalPlan::RecursiveQuery(_) => Ok(self), + LogicalPlan::Analyze(_) => Ok(self), + LogicalPlan::Explain(_) => Ok(self), + LogicalPlan::Prepare(_) => Ok(self), + LogicalPlan::TableScan(_) => Ok(self), + LogicalPlan::EmptyRelation(_) => Ok(self), + LogicalPlan::Statement(_) => Ok(self), + LogicalPlan::DescribeTable(_) => Ok(self), + LogicalPlan::Unnest(Unnest { + input, + columns, + schema: _, + options, + }) => { + // Update schema with unnested column type. + unnest_with_options(unwrap_arc(input), columns, options) + } + } + } + /// Returns a new `LogicalPlan` based on `self` with inputs and /// expressions replaced. ///