From 826d51faeefc5f20a1a4cb3976fe342dc83218ab Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 7 May 2024 10:34:13 -0400 Subject: [PATCH 1/3] Add `LogicalPlan::recompute_schema` for handling rewrite passes --- datafusion/expr/src/logical_plan/plan.rs | 195 +++++++++++++++++++++++ 1 file changed, 195 insertions(+) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 23f5280377a3..776238952dd7 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -52,6 +52,7 @@ use datafusion_common::{ // backwards compatibility use crate::display::PgJsonVisitor; +use crate::logical_plan::tree_node::unwrap_arc; pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; pub use datafusion_common::{JoinConstraint, JoinType}; @@ -467,6 +468,200 @@ impl LogicalPlan { self.with_new_exprs(self.expressions(), inputs.to_vec()) } + /// Recomputes schema and type information for this LogicalPlan if needed. + /// + /// Some `LogicalPlan`s may need to recompute their schema if the number or + /// type of expressions have been changed (for example due to type + /// coercion). For example [`LogicalPlan::Projection`]s schema depends on + /// its expressions. + /// + /// Some `LogicalPlan`s schema is unaffected by any changes to their + /// expressions. For example [`LogicalPlan::Filter`] schema is always the + /// same as its input schema. + /// + /// # Return value + /// Returns an error if there is some issue recomputing the schema. + /// + /// # Notes + /// + /// * Does not recursively recompute schema for input (child) plans. + pub fn recompute_schema(self) -> Result { + match self { + // Since expr may be different than the previous expr, schema of the projection + // may change. We need to use try_new method instead of try_new_with_schema method. + LogicalPlan::Projection(Projection { + expr, + input, + schema: _, + }) => Projection::try_new(expr, input).map(LogicalPlan::Projection), + LogicalPlan::Dml(_) => Ok(self), + LogicalPlan::Copy(_) => Ok(self), + LogicalPlan::Values(Values { schema, values }) => { + // todo it isn't clear why the schema is not recomputed here + Ok(LogicalPlan::Values(Values { schema, values })) + } + LogicalPlan::Filter(Filter { predicate, input }) => { + // todo: should this logic be moved to Filter::try_new? + + // filter predicates should not contain aliased expressions so we remove any aliases + // before this logic was added we would have aliases within filters such as for + // benchmark q6: + // + // lineitem.l_shipdate >= Date32(\"8766\") + // AND lineitem.l_shipdate < Date32(\"9131\") + // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >= + // Decimal128(Some(49999999999999),30,15) + // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <= + // Decimal128(Some(69999999999999),30,15) + // AND lineitem.l_quantity < Decimal128(Some(2400),15,2) + + let predicate = predicate + .transform_down(|expr| { + match expr { + Expr::Exists { .. } + | Expr::ScalarSubquery(_) + | Expr::InSubquery(_) => { + // subqueries could contain aliases so we don't recurse into those + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } + Expr::Alias(_) => Ok(Transformed::new( + expr.unalias(), + true, + TreeNodeRecursion::Jump, + )), + _ => Ok(Transformed::no(expr)), + } + }) + .data()?; + + Filter::try_new(predicate, input).map(LogicalPlan::Filter) + } + LogicalPlan::Repartition(_) => Ok(self), + LogicalPlan::Window(Window { + input, + window_expr, + schema: _, + }) => Window::try_new(window_expr, input).map(LogicalPlan::Window), + LogicalPlan::Aggregate(Aggregate { + input, + group_expr, + aggr_expr, + schema: _, + }) => Aggregate::try_new(input, group_expr, aggr_expr) + .map(LogicalPlan::Aggregate), + LogicalPlan::Sort(_) => Ok(self), + LogicalPlan::Join(Join { + left, + right, + filter, + join_type, + join_constraint, + on, + schema: _, + null_equals_null, + }) => { + let schema = + build_join_schema(left.schema(), right.schema(), &join_type)?; + + let new_on: Vec<_> = on + .into_iter() + .map(|equi_expr| { + // SimplifyExpression rule may add alias to the equi_expr. + (equi_expr.0.unalias(), equi_expr.1.unalias()) + }) + .collect(); + + Ok(LogicalPlan::Join(Join { + left, + right, + join_type, + join_constraint, + on: new_on, + filter, + schema: DFSchemaRef::new(schema), + null_equals_null, + })) + } + LogicalPlan::CrossJoin(CrossJoin { + left, + right, + schema: _, + }) => { + let join_schema = + build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?; + + Ok(LogicalPlan::CrossJoin(CrossJoin { + left, + right, + schema: join_schema.into(), + })) + } + LogicalPlan::Subquery(_) => Ok(self), + LogicalPlan::SubqueryAlias(SubqueryAlias { + input, + alias, + schema: _, + }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias), + LogicalPlan::Limit(_) => Ok(self), + LogicalPlan::Ddl(_) => Ok(self), + LogicalPlan::Extension(Extension { node }) => { + // todo make an API that does not require cloning + // This requires a copy of the extension nodes expressions and inputs + let expr = node.expressions(); + let inputs: Vec<_> = node.inputs().into_iter().cloned().collect(); + Ok(LogicalPlan::Extension(Extension { + node: node.from_template(&expr, &inputs), + })) + } + LogicalPlan::Union(Union { inputs, schema }) => { + let input_schema = inputs[0].schema(); + // If inputs are not pruned do not change schema + // TODO this seems wrong (shouldn't we always use the schema of the input?) + let schema = if schema.fields().len() == input_schema.fields().len() { + schema.clone() + } else { + input_schema.clone() + }; + Ok(LogicalPlan::Union(Union { inputs, schema })) + } + LogicalPlan::Distinct(distinct) => { + let distinct = match distinct { + Distinct::All(input) => Distinct::All(input), + Distinct::On(DistinctOn { + on_expr, + select_expr, + sort_expr, + input, + schema: _, + }) => Distinct::On(DistinctOn::try_new( + on_expr, + select_expr, + sort_expr, + input, + )?), + }; + Ok(LogicalPlan::Distinct(distinct)) + } + LogicalPlan::RecursiveQuery(_) => Ok(self), + LogicalPlan::Analyze(_) => Ok(self), + LogicalPlan::Explain(_) => Ok(self), + LogicalPlan::Prepare(_) => Ok(self), + LogicalPlan::TableScan(_) => Ok(self), + LogicalPlan::EmptyRelation(_) => Ok(self), + LogicalPlan::Statement(_) => Ok(self), + LogicalPlan::DescribeTable(_) => Ok(self), + LogicalPlan::Unnest(Unnest { + input, + columns, + schema: _, + options, + }) => { + // Update schema with unnested column type. + unnest_with_options(unwrap_arc(input), columns, options) + } + } + } + /// Returns a new `LogicalPlan` based on `self` with inputs and /// expressions replaced. /// From 5ed976bd56c1ea411a288c1ce01a1cc40de2fe2e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 2 May 2024 12:44:25 -0400 Subject: [PATCH 2/3] Stop copying LogicalPlan and Exprs in `TypeCoercion` --- .../optimizer/src/analyzer/type_coercion.rs | 125 ++++++++++++------ 1 file changed, 88 insertions(+), 37 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 10479f29a583..9aa226853cb2 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType, IntervalUnit}; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_common::{ exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema, DataFusionError, Result, ScalarValue, @@ -31,8 +31,8 @@ use datafusion_expr::expr::{ self, AggregateFunctionDefinition, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like, ScalarFunction, WindowFunction, }; -use datafusion_expr::expr_rewriter::rewrite_preserving_name; use datafusion_expr::expr_schema::cast_subquery; +use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::logical_plan::Subquery; use datafusion_expr::type_coercion::binary::{ comparison_coercion, get_input_types, like_coercion, @@ -51,6 +51,7 @@ use datafusion_expr::{ }; use crate::analyzer::AnalyzerRule; +use crate::utils::NamePreserver; #[derive(Default)] pub struct TypeCoercion {} @@ -67,26 +68,28 @@ impl AnalyzerRule for TypeCoercion { } fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - analyze_internal(&DFSchema::empty(), &plan) + let empty_schema = DFSchema::empty(); + + let transformed_plan = plan + .transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))? + .data; + + Ok(transformed_plan) } } +/// use the external schema to handle the correlated subqueries case +/// +/// Assumes that children have already been optimized fn analyze_internal( - // use the external schema to handle the correlated subqueries case external_schema: &DFSchema, - plan: &LogicalPlan, -) -> Result { - // optimize child plans first - let new_inputs = plan - .inputs() - .iter() - .map(|p| analyze_internal(external_schema, p)) - .collect::>>()?; + plan: LogicalPlan, +) -> Result> { // get schema representing all available input fields. This is used for data type // resolution only, so order does not matter here - let mut schema = merge_schema(new_inputs.iter().collect()); + let mut schema = merge_schema(plan.inputs()); - if let LogicalPlan::TableScan(ts) = plan { + if let LogicalPlan::TableScan(ts) = &plan { let source_schema = DFSchema::try_from_qualified_schema( ts.table_name.clone(), &ts.source.schema(), @@ -99,25 +102,75 @@ fn analyze_internal( // select t2.c2 from t1 where t1.c1 in (select t2.c1 from t2 where t2.c2=t1.c3) schema.merge(external_schema); - let mut expr_rewrite = TypeCoercionRewriter { schema: &schema }; - - let new_expr = plan - .expressions() - .into_iter() - .map(|expr| { - // ensure aggregate names don't change: - // https://github.com/apache/datafusion/issues/3555 - rewrite_preserving_name(expr, &mut expr_rewrite) - }) - .collect::>>()?; - - plan.with_new_exprs(new_expr, new_inputs) + let mut expr_rewrite = TypeCoercionRewriter::new(&schema); + + let name_preserver = NamePreserver::new(&plan); + // apply coercion rewrite all expressions in the plan indivdually + plan.map_expressions(|expr| { + let original_name = name_preserver.save(&expr)?; + expr.rewrite(&mut expr_rewrite)? + .map_data(|expr| original_name.restore(expr)) + })? + // coerce join expressions specially + .map_data(|plan| expr_rewrite.coerce_joins(plan))? + // recompute the schema after the expressions have been rewritten as the types may have changed + .map_data(|plan| plan.recompute_schema()) } pub(crate) struct TypeCoercionRewriter<'a> { pub(crate) schema: &'a DFSchema, } +impl<'a> TypeCoercionRewriter<'a> { + fn new(schema: &'a DFSchema) -> Self { + Self { schema } + } + + /// Coerce join equality expressions + /// + /// Joins must be treated specially as their equality expressions are stored + /// as a parallel list of left and right expressions, rather than a single + /// equality expression + /// + /// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored + /// as a list of `(t1.a, t2.b), (t1.x, t2.y)` + fn coerce_joins(&mut self, plan: LogicalPlan) -> Result { + let LogicalPlan::Join(mut join) = plan else { + return Ok(plan); + }; + + join.on = join + .on + .into_iter() + .map(|(lhs, rhs)| { + // coerce the arguments as though they were a single binary equality + // expression + let (lhs, rhs) = self.coerce_binary_op(lhs, Operator::Eq, rhs)?; + Ok((lhs, rhs)) + }) + .collect::>>()?; + + Ok(LogicalPlan::Join(join)) + } + + fn coerce_binary_op( + &self, + left: Expr, + op: Operator, + right: Expr, + ) -> Result<(Expr, Expr)> { + let (left_type, right_type) = get_input_types( + &left.get_type(self.schema)?, + &op, + &right.get_type(self.schema)?, + )?; + Ok(( + left.cast_to(&left_type, self.schema)?, + right.cast_to(&right_type, self.schema)?, + )) + } +} + impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { type Node = Expr; @@ -130,14 +183,15 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { subquery, outer_ref_columns, }) => { - let new_plan = analyze_internal(self.schema, &subquery)?; + let new_plan = analyze_internal(self.schema, unwrap_arc(subquery))?.data; Ok(Transformed::yes(Expr::ScalarSubquery(Subquery { subquery: Arc::new(new_plan), outer_ref_columns, }))) } Expr::Exists(Exists { subquery, negated }) => { - let new_plan = analyze_internal(self.schema, &subquery.subquery)?; + let new_plan = + analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data; Ok(Transformed::yes(Expr::Exists(Exists { subquery: Subquery { subquery: Arc::new(new_plan), @@ -151,7 +205,8 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { subquery, negated, }) => { - let new_plan = analyze_internal(self.schema, &subquery.subquery)?; + let new_plan = + analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data; let expr_type = expr.get_type(self.schema)?; let subquery_type = new_plan.schema().field(0).data_type(); let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(plan_datafusion_err!( @@ -220,15 +275,11 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { )))) } Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - let (left_type, right_type) = get_input_types( - &left.get_type(self.schema)?, - &op, - &right.get_type(self.schema)?, - )?; + let (left, right) = self.coerce_binary_op(*left, op, *right)?; Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new( - Box::new(left.cast_to(&left_type, self.schema)?), + Box::new(left), op, - Box::new(right.cast_to(&right_type, self.schema)?), + Box::new(right), )))) } Expr::Between(Between { From 41ecf4b77a9497807bc280f98aea0d7de4a3b641 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 13 May 2024 11:40:12 -0400 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Oleks V --- datafusion/optimizer/src/analyzer/type_coercion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index ea214c5c7a09..5faef21de230 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -104,7 +104,7 @@ fn analyze_internal( let mut expr_rewrite = TypeCoercionRewriter::new(&schema); let name_preserver = NamePreserver::new(&plan); - // apply coercion rewrite all expressions in the plan indivdually + // apply coercion rewrite all expressions in the plan individually plan.map_expressions(|expr| { let original_name = name_preserver.save(&expr)?; expr.rewrite(&mut expr_rewrite)?