diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 083ee230c785..9f2d90accc16 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -20,29 +20,32 @@ use crate::dml::CopyOptions; use crate::expr::{Alias, Exists, InSubquery, Placeholder}; use crate::expr_rewriter::create_col_from_scalar_expr; -use crate::expr_vec_fmt; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; use crate::logical_plan::{DmlStatement, Statement}; use crate::utils::{ - enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs, from_plan, + enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs, grouping_set_expr_count, grouping_set_to_exprlist, inspect_expr_pre, }; use crate::{ build_join_schema, Expr, ExprSchemable, TableProviderFilterPushDown, TableSource, }; +use crate::{ + expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, LogicalPlanBuilder, Operator, +}; use super::dml::CopyTo; use super::DdlStatement; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::tree_node::{ - Transformed, TreeNode, TreeNodeVisitor, VisitRecursion, + RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter, TreeNodeVisitor, + VisitRecursion, }; use datafusion_common::{ - aggregate_functional_dependencies, internal_err, plan_err, Column, DFField, DFSchema, - DFSchemaRef, DataFusionError, FunctionalDependencies, OwnedTableReference, Result, - ScalarValue, UnnestOptions, + aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints, + DFField, DFSchema, DFSchemaRef, DataFusionError, FunctionalDependencies, + OwnedTableReference, Result, ScalarValue, UnnestOptions, }; // backwards compatibility pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; @@ -513,6 +516,7 @@ impl LogicalPlan { } } + /// Returns a copy of this `LogicalPlan` 
with the new inputs pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> Result { // with_new_inputs use original expression, // so we don't need to recompute Schema. @@ -544,10 +548,377 @@ impl LogicalPlan { aggr_expr.to_vec(), schema.clone(), )?)), - _ => from_plan(self, &self.expressions(), inputs), + _ => self.with_new_exprs(self.expressions(), inputs), } } + /// Returns a new `LogicalPlan` based on `self` with inputs and + /// expressions replaced. + /// + /// The exprs correspond to the same order of expressions returned + /// by [`Self::expressions`]. This function is used by optimizers + /// to rewrite plans using the following pattern: + /// + /// ```text + /// let new_inputs = optimize_children(..., plan, props); + /// + /// // get the plan's expressions to optimize + /// let exprs = plan.expressions(); + /// + /// // potentially rewrite plan expressions + /// let rewritten_exprs = rewrite_exprs(exprs); + /// + /// // create new plan using rewritten_exprs in same position + /// let new_plan = plan.with_new_exprs(rewritten_exprs, &new_inputs); + /// ``` + /// + /// Note: for some plan types, such as `Projection`, `Aggregate` and `Window`, + /// [`Self::with_new_exprs`] reuses the schema of the original plan and does not change it. + /// Several arms panic (`assert!` / indexing) if `expr` or `inputs` has an unexpected length. + pub fn with_new_exprs( + &self, + mut expr: Vec, + inputs: &[LogicalPlan], + ) -> Result { + match self { + LogicalPlan::Projection(Projection { schema, .. }) => { + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + expr, + Arc::new(inputs[0].clone()), + schema.clone(), + )?)) + } + LogicalPlan::Dml(DmlStatement { + table_name, + table_schema, + op, + .. 
+ }) => Ok(LogicalPlan::Dml(DmlStatement { + table_name: table_name.clone(), + table_schema: table_schema.clone(), + op: op.clone(), + input: Arc::new(inputs[0].clone()), + })), + LogicalPlan::Copy(CopyTo { + input: _, + output_url, + file_format, + copy_options, + single_file_output, + }) => Ok(LogicalPlan::Copy(CopyTo { + input: Arc::new(inputs[0].clone()), + output_url: output_url.clone(), + file_format: file_format.clone(), + single_file_output: *single_file_output, + copy_options: copy_options.clone(), + })), + LogicalPlan::Values(Values { schema, .. }) => { + Ok(LogicalPlan::Values(Values { + schema: schema.clone(), + values: expr + .chunks_exact(schema.fields().len()) + .map(|s| s.to_vec()) + .collect::>(), + })) + } + LogicalPlan::Filter { .. } => { + assert_eq!(1, expr.len()); + let predicate = expr.pop().unwrap(); + + // Filter predicates should not contain aliased expressions, so we remove any aliases. + // Before this logic was added we would have aliases within filters, such as for + // benchmark q6: + // + // lineitem.l_shipdate >= Date32(\"8766\") + // AND lineitem.l_shipdate < Date32(\"9131\") + // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >= + // Decimal128(Some(49999999999999),30,15) + // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <= + // Decimal128(Some(69999999999999),30,15) + // AND lineitem.l_quantity < Decimal128(Some(2400),15,2) + + struct RemoveAliases {} + + impl TreeNodeRewriter for RemoveAliases { + type N = Expr; + + fn pre_visit(&mut self, expr: &Expr) -> Result { + match expr { + Expr::Exists { .. 
} + | Expr::ScalarSubquery(_) + | Expr::InSubquery(_) => { + // Subqueries could contain aliases, so we don't recurse into those + Ok(RewriteRecursion::Stop) + } + Expr::Alias(_) => Ok(RewriteRecursion::Mutate), + _ => Ok(RewriteRecursion::Continue), + } + } + + fn mutate(&mut self, expr: Expr) -> Result { + Ok(expr.unalias()) + } + } + + let mut remove_aliases = RemoveAliases {}; + let predicate = predicate.rewrite(&mut remove_aliases)?; + + Ok(LogicalPlan::Filter(Filter::try_new( + predicate, + Arc::new(inputs[0].clone()), + )?)) + } + LogicalPlan::Repartition(Repartition { + partitioning_scheme, + .. + }) => match partitioning_scheme { + Partitioning::RoundRobinBatch(n) => { + Ok(LogicalPlan::Repartition(Repartition { + partitioning_scheme: Partitioning::RoundRobinBatch(*n), + input: Arc::new(inputs[0].clone()), + })) + } + Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition(Repartition { + partitioning_scheme: Partitioning::Hash(expr, *n), + input: Arc::new(inputs[0].clone()), + })), + Partitioning::DistributeBy(_) => { + Ok(LogicalPlan::Repartition(Repartition { + partitioning_scheme: Partitioning::DistributeBy(expr), + input: Arc::new(inputs[0].clone()), + })) + } + }, + LogicalPlan::Window(Window { + window_expr, + schema, + .. + }) => { + assert_eq!(window_expr.len(), expr.len()); + Ok(LogicalPlan::Window(Window { + input: Arc::new(inputs[0].clone()), + window_expr: expr, + schema: schema.clone(), + })) + } + LogicalPlan::Aggregate(Aggregate { + group_expr, schema, .. + }) => { + // group exprs are the first `group_expr.len()` expressions; the rest are split off as aggregate exprs + let agg_expr = expr.split_off(group_expr.len()); + + Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema( + Arc::new(inputs[0].clone()), + expr, + agg_expr, + schema.clone(), + )?)) + } + LogicalPlan::Sort(Sort { fetch, .. 
}) => Ok(LogicalPlan::Sort(Sort { + expr, + input: Arc::new(inputs[0].clone()), + fetch: *fetch, + })), + LogicalPlan::Join(Join { + join_type, + join_constraint, + on, + null_equals_null, + .. 
+ }) => { + let schema = + build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?; + + let equi_expr_count = on.len(); + assert!(expr.len() >= equi_expr_count); + + // Assume that the one expr beyond the equi-exprs, if any, + // is the filter_expr (non-equality predicate from the ON clause) + let filter_expr = if expr.len() > equi_expr_count { + expr.pop() + } else { + None + }; + + // The remaining part of expr is the equi-exprs, + // and the structure of each equi-expr is like `left-expr = right-expr`. + assert_eq!(expr.len(), equi_expr_count); + let new_on:Vec<(Expr,Expr)> = expr.into_iter().map(|equi_expr| { + // SimplifyExpression rule may add alias to the equi_expr. + let unalias_expr = equi_expr.clone().unalias(); + if let Expr::BinaryExpr(BinaryExpr { left, op: Operator::Eq, right }) = unalias_expr { + Ok((*left, *right)) + } else { + internal_err!( + "The front part expressions should be an binary equality expression, actual:{equi_expr}" + ) + } + }).collect::>>()?; + + Ok(LogicalPlan::Join(Join { + left: Arc::new(inputs[0].clone()), + right: Arc::new(inputs[1].clone()), + join_type: *join_type, + join_constraint: *join_constraint, + on: new_on, + filter: filter_expr, + schema: DFSchemaRef::new(schema), + null_equals_null: *null_equals_null, + })) + } + LogicalPlan::CrossJoin(_) => { + let left = inputs[0].clone(); + let right = inputs[1].clone(); + LogicalPlanBuilder::from(left).cross_join(right)?.build() + } + LogicalPlan::Subquery(Subquery { + outer_ref_columns, .. + }) => { + let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?; + Ok(LogicalPlan::Subquery(Subquery { + subquery: Arc::new(subquery), + outer_ref_columns: outer_ref_columns.clone(), + })) + } + LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { + Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new( + inputs[0].clone(), + alias.clone(), + )?)) + } + LogicalPlan::Limit(Limit { skip, fetch, .. 
}) => { + Ok(LogicalPlan::Limit(Limit { + skip: *skip, + fetch: *fetch, + input: Arc::new(inputs[0].clone()), + })) + } + LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable { + name, + if_not_exists, + or_replace, + .. + })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( + CreateMemoryTable { + input: Arc::new(inputs[0].clone()), + constraints: Constraints::empty(), + name: name.clone(), + if_not_exists: *if_not_exists, + or_replace: *or_replace, + }, + ))), + LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { + name, + or_replace, + definition, + .. + })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { + input: Arc::new(inputs[0].clone()), + name: name.clone(), + or_replace: *or_replace, + definition: definition.clone(), + }))), + LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension { + node: e.node.from_template(&expr, inputs), + })), + LogicalPlan::Union(Union { schema, .. }) => Ok(LogicalPlan::Union(Union { + inputs: inputs.iter().cloned().map(Arc::new).collect(), + schema: schema.clone(), + })), + LogicalPlan::Distinct(Distinct { .. }) => { + Ok(LogicalPlan::Distinct(Distinct { + input: Arc::new(inputs[0].clone()), + })) + } + LogicalPlan::Analyze(a) => { + assert!(expr.is_empty()); + assert_eq!(inputs.len(), 1); + Ok(LogicalPlan::Analyze(Analyze { + verbose: a.verbose, + schema: a.schema.clone(), + input: Arc::new(inputs[0].clone()), + })) + } + LogicalPlan::Explain(_) => { + // Explain should be handled specially in the optimizers; + // If this check cannot pass it means some optimizer pass is + // trying to optimize Explain directly + if expr.is_empty() { + return plan_err!("Invalid EXPLAIN command. Expression is empty"); + } + + if inputs.is_empty() { + return plan_err!("Invalid EXPLAIN command. Inputs are empty"); + } + + Ok(self.clone()) + } + LogicalPlan::Prepare(Prepare { + name, data_types, .. 
+ }) => Ok(LogicalPlan::Prepare(Prepare { + name: name.clone(), + data_types: data_types.clone(), + input: Arc::new(inputs[0].clone()), + })), + LogicalPlan::TableScan(ts) => { + assert!(inputs.is_empty(), "{self:?} should have no inputs"); + Ok(LogicalPlan::TableScan(TableScan { + filters: expr, + ..ts.clone() + })) + } + LogicalPlan::EmptyRelation(_) + | LogicalPlan::Ddl(_) + | LogicalPlan::Statement(_) => { + // All of these plan types have no inputs / exprs so should not be called + assert!(expr.is_empty(), "{self:?} should have no exprs"); + assert!(inputs.is_empty(), "{self:?} should have no inputs"); + Ok(self.clone()) + } + LogicalPlan::DescribeTable(_) => Ok(self.clone()), + LogicalPlan::Unnest(Unnest { + column, + schema, + options, + .. + }) => { + // Update schema with unnested column type. + let input = Arc::new(inputs[0].clone()); + let nested_field = input.schema().field_from_column(column)?; + let unnested_field = schema.field_from_column(column)?; + let fields = input + .schema() + .fields() + .iter() + .map(|f| { + if f == nested_field { + unnested_field.clone() + } else { + f.clone() + } + }) + .collect::>(); + + let schema = Arc::new( + DFSchema::new_with_metadata( + fields, + input.schema().metadata().clone(), + )? 
+ // We can use the existing functional dependencies as is: + .with_functional_dependencies( + input.schema().functional_dependencies().clone(), + ), + ); + + Ok(LogicalPlan::Unnest(Unnest { + input, + column: column.clone(), + schema, + options: options.clone(), + })) + } + } + } /// Convert a prepared [`LogicalPlan`] into its inner logical plan /// with all params replaced with their corresponding values pub fn with_param_values( @@ -751,7 +1122,7 @@ impl LogicalPlan { .map(|inp| inp.replace_params_with_values(param_values)) .collect::>>()?; - from_plan(self, &new_exprs, &new_inputs_with_values) + self.with_new_exprs(new_exprs, &new_inputs_with_values) } /// Walk the logical plan, find any `PlaceHolder` tokens, and return a map of their IDs and DataTypes diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index be48418cb848..cfe751d0969d 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -17,32 +17,19 @@ //! Expression utilities -use crate::dml::CopyTo; use crate::expr::{Alias, Sort, WindowFunction}; -use crate::logical_plan::builder::build_join_schema; -use crate::logical_plan::{ - Aggregate, Analyze, Distinct, Extension, Filter, Join, Limit, Partitioning, Prepare, - Projection, Repartition, Sort as SortPlan, Subquery, SubqueryAlias, Union, Unnest, - Values, Window, -}; +use crate::logical_plan::Aggregate; use crate::signature::{Signature, TypeSignature}; -use crate::{ - BinaryExpr, Cast, CreateMemoryTable, CreateView, DdlStatement, DmlStatement, Expr, - ExprSchemable, GroupingSet, LogicalPlan, LogicalPlanBuilder, Operator, TableScan, - TryCast, -}; +use crate::{Cast, Expr, ExprSchemable, GroupingSet, LogicalPlan, TryCast}; use arrow::datatypes::{DataType, TimeUnit}; -use datafusion_common::tree_node::{ - RewriteRecursion, TreeNode, TreeNodeRewriter, VisitRecursion, -}; +use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{ - internal_err, plan_err, Column, Constraints, DFField, 
DFSchema, DFSchemaRef, - DataFusionError, Result, ScalarValue, TableReference, + internal_err, plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError, + Result, ScalarValue, TableReference, }; use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, WildcardAdditionalOptions}; use std::cmp::Ordering; use std::collections::HashSet; -use std::sync::Arc; /// The value to which `COUNT(*)` is expanded to in /// `COUNT()` expressions @@ -727,326 +714,13 @@ where /// /// Notice: sometimes [from_plan] will use schema of original plan, it don't change schema! /// Such as `Projection/Aggregate/Window` +#[deprecated(since = "31.0.0", note = "use LogicalPlan::with_new_exprs instead")] pub fn from_plan( plan: &LogicalPlan, expr: &[Expr], inputs: &[LogicalPlan], ) -> Result { - match plan { - LogicalPlan::Projection(Projection { schema, .. }) => { - Ok(LogicalPlan::Projection(Projection::try_new_with_schema( - expr.to_vec(), - Arc::new(inputs[0].clone()), - schema.clone(), - )?)) - } - LogicalPlan::Dml(DmlStatement { - table_name, - table_schema, - op, - .. - }) => Ok(LogicalPlan::Dml(DmlStatement { - table_name: table_name.clone(), - table_schema: table_schema.clone(), - op: op.clone(), - input: Arc::new(inputs[0].clone()), - })), - LogicalPlan::Copy(CopyTo { - input: _, - output_url, - file_format, - single_file_output, - copy_options, - }) => Ok(LogicalPlan::Copy(CopyTo { - input: Arc::new(inputs[0].clone()), - output_url: output_url.clone(), - file_format: file_format.clone(), - single_file_output: *single_file_output, - copy_options: copy_options.clone(), - })), - LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { - schema: schema.clone(), - values: expr - .chunks_exact(schema.fields().len()) - .map(|s| s.to_vec()) - .collect::>(), - })), - LogicalPlan::Filter { .. 
} => { - assert_eq!(1, expr.len()); - let predicate = expr[0].clone(); - - // filter predicates should not contain aliased expressions so we remove any aliases - // before this logic was added we would have aliases within filters such as for - // benchmark q6: - // - // lineitem.l_shipdate >= Date32(\"8766\") - // AND lineitem.l_shipdate < Date32(\"9131\") - // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >= - // Decimal128(Some(49999999999999),30,15) - // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <= - // Decimal128(Some(69999999999999),30,15) - // AND lineitem.l_quantity < Decimal128(Some(2400),15,2) - - struct RemoveAliases {} - - impl TreeNodeRewriter for RemoveAliases { - type N = Expr; - - fn pre_visit(&mut self, expr: &Expr) -> Result { - match expr { - Expr::Exists { .. } - | Expr::ScalarSubquery(_) - | Expr::InSubquery(_) => { - // subqueries could contain aliases so we don't recurse into those - Ok(RewriteRecursion::Stop) - } - Expr::Alias(_) => Ok(RewriteRecursion::Mutate), - _ => Ok(RewriteRecursion::Continue), - } - } - - fn mutate(&mut self, expr: Expr) -> Result { - Ok(expr.unalias()) - } - } - - let mut remove_aliases = RemoveAliases {}; - let predicate = predicate.rewrite(&mut remove_aliases)?; - - Ok(LogicalPlan::Filter(Filter::try_new( - predicate, - Arc::new(inputs[0].clone()), - )?)) - } - LogicalPlan::Repartition(Repartition { - partitioning_scheme, - .. 
- }) => match partitioning_scheme { - Partitioning::RoundRobinBatch(n) => { - Ok(LogicalPlan::Repartition(Repartition { - partitioning_scheme: Partitioning::RoundRobinBatch(*n), - input: Arc::new(inputs[0].clone()), - })) - } - Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition(Repartition { - partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n), - input: Arc::new(inputs[0].clone()), - })), - Partitioning::DistributeBy(_) => Ok(LogicalPlan::Repartition(Repartition { - partitioning_scheme: Partitioning::DistributeBy(expr.to_owned()), - input: Arc::new(inputs[0].clone()), - })), - }, - LogicalPlan::Window(Window { - window_expr, - schema, - .. - }) => Ok(LogicalPlan::Window(Window { - input: Arc::new(inputs[0].clone()), - window_expr: expr[0..window_expr.len()].to_vec(), - schema: schema.clone(), - })), - LogicalPlan::Aggregate(Aggregate { - group_expr, schema, .. - }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema( - Arc::new(inputs[0].clone()), - expr[0..group_expr.len()].to_vec(), - expr[group_expr.len()..].to_vec(), - schema.clone(), - )?)), - LogicalPlan::Sort(SortPlan { fetch, .. }) => Ok(LogicalPlan::Sort(SortPlan { - expr: expr.to_vec(), - input: Arc::new(inputs[0].clone()), - fetch: *fetch, - })), - LogicalPlan::Join(Join { - join_type, - join_constraint, - on, - null_equals_null, - .. - }) => { - let schema = - build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?; - - let equi_expr_count = on.len(); - assert!(expr.len() >= equi_expr_count); - - // The preceding part of expr is equi-exprs, - // and the struct of each equi-expr is like `left-expr = right-expr`. - let new_on:Vec<(Expr,Expr)> = expr.iter().take(equi_expr_count).map(|equi_expr| { - // SimplifyExpression rule may add alias to the equi_expr. 
- let unalias_expr = equi_expr.clone().unalias(); - if let Expr::BinaryExpr(BinaryExpr { left, op:Operator::Eq, right }) = unalias_expr { - Ok((*left, *right)) - } else { - internal_err!( - "The front part expressions should be an binary equiality expression, actual:{equi_expr}" - ) - } - }).collect::>>()?; - - // Assume that the last expr, if any, - // is the filter_expr (non equality predicate from ON clause) - let filter_expr = - (expr.len() > equi_expr_count).then(|| expr[expr.len() - 1].clone()); - - Ok(LogicalPlan::Join(Join { - left: Arc::new(inputs[0].clone()), - right: Arc::new(inputs[1].clone()), - join_type: *join_type, - join_constraint: *join_constraint, - on: new_on, - filter: filter_expr, - schema: DFSchemaRef::new(schema), - null_equals_null: *null_equals_null, - })) - } - LogicalPlan::CrossJoin(_) => { - let left = inputs[0].clone(); - let right = inputs[1].clone(); - LogicalPlanBuilder::from(left).cross_join(right)?.build() - } - LogicalPlan::Subquery(Subquery { - outer_ref_columns, .. - }) => { - let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?; - Ok(LogicalPlan::Subquery(Subquery { - subquery: Arc::new(subquery), - outer_ref_columns: outer_ref_columns.clone(), - })) - } - LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { - Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new( - inputs[0].clone(), - alias.clone(), - )?)) - } - LogicalPlan::Limit(Limit { skip, fetch, .. }) => Ok(LogicalPlan::Limit(Limit { - skip: *skip, - fetch: *fetch, - input: Arc::new(inputs[0].clone()), - })), - LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable { - name, - if_not_exists, - or_replace, - .. 
- })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - input: Arc::new(inputs[0].clone()), - constraints: Constraints::empty(), - name: name.clone(), - if_not_exists: *if_not_exists, - or_replace: *or_replace, - }, - ))), - LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - name, - or_replace, - definition, - .. - })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - input: Arc::new(inputs[0].clone()), - name: name.clone(), - or_replace: *or_replace, - definition: definition.clone(), - }))), - LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension { - node: e.node.from_template(expr, inputs), - })), - LogicalPlan::Union(Union { schema, .. }) => Ok(LogicalPlan::Union(Union { - inputs: inputs.iter().cloned().map(Arc::new).collect(), - schema: schema.clone(), - })), - LogicalPlan::Distinct(Distinct { .. }) => Ok(LogicalPlan::Distinct(Distinct { - input: Arc::new(inputs[0].clone()), - })), - LogicalPlan::Analyze(a) => { - assert!(expr.is_empty()); - assert_eq!(inputs.len(), 1); - Ok(LogicalPlan::Analyze(Analyze { - verbose: a.verbose, - schema: a.schema.clone(), - input: Arc::new(inputs[0].clone()), - })) - } - LogicalPlan::Explain(_) => { - // Explain should be handled specially in the optimizers; - // If this check cannot pass it means some optimizer pass is - // trying to optimize Explain directly - if expr.is_empty() { - return plan_err!("Invalid EXPLAIN command. Expression is empty"); - } - - if inputs.is_empty() { - return plan_err!("Invalid EXPLAIN command. Inputs are empty"); - } - - Ok(plan.clone()) - } - LogicalPlan::Prepare(Prepare { - name, data_types, .. 
- }) => Ok(LogicalPlan::Prepare(Prepare { - name: name.clone(), - data_types: data_types.clone(), - input: Arc::new(inputs[0].clone()), - })), - LogicalPlan::TableScan(ts) => { - assert!(inputs.is_empty(), "{plan:?} should have no inputs"); - Ok(LogicalPlan::TableScan(TableScan { - filters: expr.to_vec(), - ..ts.clone() - })) - } - LogicalPlan::EmptyRelation(_) - | LogicalPlan::Ddl(_) - | LogicalPlan::Statement(_) => { - // All of these plan types have no inputs / exprs so should not be called - assert!(expr.is_empty(), "{plan:?} should have no exprs"); - assert!(inputs.is_empty(), "{plan:?} should have no inputs"); - Ok(plan.clone()) - } - LogicalPlan::DescribeTable(_) => Ok(plan.clone()), - LogicalPlan::Unnest(Unnest { - column, - schema, - options, - .. - }) => { - // Update schema with unnested column type. - let input = Arc::new(inputs[0].clone()); - let nested_field = input.schema().field_from_column(column)?; - let unnested_field = schema.field_from_column(column)?; - let fields = input - .schema() - .fields() - .iter() - .map(|f| { - if f == nested_field { - unnested_field.clone() - } else { - f.clone() - } - }) - .collect::>(); - - let schema = Arc::new( - DFSchema::new_with_metadata(fields, input.schema().metadata().clone())? 
- // We can use the existing functional dependencies as is: - .with_functional_dependencies( - input.schema().functional_dependencies().clone(), - ), - ); - - Ok(LogicalPlan::Unnest(Unnest { - input, - column: column.clone(), - schema, - options: options.clone(), - })) - } - } + plan.with_new_exprs(expr.to_vec(), inputs) } /// Find all columns referenced from an aggregate query diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index ba4b5d7b175c..c61a8d3350ee 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -42,7 +42,6 @@ use datafusion_expr::type_coercion::other::{ get_coerce_type_for_case_expression, get_coerce_type_for_list, }; use datafusion_expr::type_coercion::{is_datetime, is_numeric, is_utf8_or_large_utf8}; -use datafusion_expr::utils::from_plan; use datafusion_expr::{ is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, type_coercion, window_function, AggregateFunction, BuiltinScalarFunction, Expr, @@ -112,13 +111,13 @@ fn analyze_internal( }) .collect::>>()?; - // TODO: from_plan can't change the schema, so we need to do this here + // TODO: with_new_exprs can't change the schema, so we need to do this here match &plan { LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new( new_expr, Arc::new(new_inputs[0].clone()), )?)), - _ => from_plan(plan, &new_expr, &new_inputs), + _ => plan.with_new_exprs(new_expr, &new_inputs), } } diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index d5c7641a13c4..571e2146c4e0 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -24,9 +24,7 @@ use datafusion_expr::{ and, expr_rewriter::replace_col, logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union}, - or, - utils::from_plan, - BinaryExpr, Expr, Filter, 
Operator, TableProviderFilterPushDown, + or, BinaryExpr, Expr, Filter, Operator, TableProviderFilterPushDown, }; use itertools::Itertools; use std::collections::{HashMap, HashSet}; @@ -457,7 +455,7 @@ fn push_down_all_join( if !join_conditions.is_empty() { new_exprs.push(join_conditions.into_iter().reduce(Expr::and).unwrap()); } - let plan = from_plan(join_plan, &new_exprs, &[left, right])?; + let plan = join_plan.with_new_exprs(new_exprs, &[left, right])?; if keep_predicates.is_empty() { Ok(plan) diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index c65768bb8b11..e6d66720ee1b 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -23,7 +23,7 @@ use super::{ExprSimplifier, SimplifyContext}; use crate::utils::merge_schema; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{DFSchema, DFSchemaRef, Result}; -use datafusion_expr::{logical_plan::LogicalPlan, utils::from_plan}; +use datafusion_expr::logical_plan::LogicalPlan; use datafusion_physical_expr::execution_props::ExecutionProps; /// Optimizer Pass that simplifies [`LogicalPlan`]s by rewriting @@ -93,7 +93,7 @@ impl SimplifyExpressions { }) .collect::>>()?; - from_plan(plan, &expr, &new_inputs) + plan.with_new_exprs(expr, &new_inputs) } } diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 0be856ca9ba6..963a3dc06fa8 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -31,7 +31,6 @@ use datafusion_common::{ }; use datafusion_expr::expr::{BinaryExpr, Cast, InList, TryCast}; use datafusion_expr::expr_rewriter::rewrite_preserving_name; -use datafusion_expr::utils::from_plan; use datafusion_expr::{ binary_expr, in_list, lit, Expr, ExprSchemable, LogicalPlan, 
Operator, }; @@ -105,11 +104,7 @@ impl OptimizerRule for UnwrapCastInComparison { .collect::>>()?; let inputs: Vec = plan.inputs().into_iter().cloned().collect(); - Ok(Some(from_plan( - plan, - new_exprs.as_slice(), - inputs.as_slice(), - )?)) + Ok(Some(plan.with_new_exprs(new_exprs, inputs.as_slice())?)) } fn name(&self) -> &str {