From a5cf0b8902ae55b81ac86b875c7e94cf1bdc205d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 12 Apr 2024 07:37:06 -0400 Subject: [PATCH] Avoid copies in `InlineTableScan` via TreeNode API (#10038) * Avoid copies in `InlineTableScan` via TreeNode API * Improve variable name --- .../src/analyzer/inline_table_scan.rs | 93 +++++++------------ 1 file changed, 31 insertions(+), 62 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index 88202ffd21f1..cc5f870a9c73 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -17,17 +17,13 @@ //! Analyzed rule to replace TableScan references //! such as DataFrames and Views and inlines the LogicalPlan. -use std::sync::Arc; use crate::analyzer::AnalyzerRule; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Column, Result}; -use datafusion_expr::expr::{Exists, InSubquery}; -use datafusion_expr::{ - logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan, -}; +use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan}; /// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`] /// (DataFrame / ViewTable) @@ -51,65 +47,38 @@ impl AnalyzerRule for InlineTableScan { } fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> { - match plan { - // Match only on scans without filter / projection / fetch - // Views and DataFrames won't have those added - // during the early stage of planning - LogicalPlan::TableScan(TableScan { - table_name, - source, - projection, - filters, - ..
- }) if filters.is_empty() && source.get_logical_plan().is_some() => { - let sub_plan = source.get_logical_plan().unwrap(); - let projection_exprs = generate_projection_expr(&projection, sub_plan)?; - LogicalPlanBuilder::from(sub_plan.clone()) - .project(projection_exprs)? - // Ensures that the reference to the inlined table remains the - // same, meaning we don't have to change any of the parent nodes - // that reference this table. - .alias(table_name)? - .build() - .map(Transformed::yes) - } - LogicalPlan::Filter(filter) => { - let new_expr = filter.predicate.transform(&rewrite_subquery).data()?; - Filter::try_new(new_expr, filter.input) - .map(|e| Transformed::yes(LogicalPlan::Filter(e))) + // rewrite any subqueries in the plan first + let transformed_plan = + plan.map_subqueries(|plan| plan.transform_up(&analyze_internal))?; + + let transformed_plan = transformed_plan.transform_data(|plan| { + match plan { + // Match only on scans without filter / projection / fetch + // Views and DataFrames won't have those added + // during the early stage of planning + LogicalPlan::TableScan(TableScan { + table_name, + source, + projection, + filters, + .. + }) if filters.is_empty() && source.get_logical_plan().is_some() => { + let sub_plan = source.get_logical_plan().unwrap(); + let projection_exprs = generate_projection_expr(&projection, sub_plan)?; + LogicalPlanBuilder::from(sub_plan.clone()) + .project(projection_exprs)? + // Ensures that the reference to the inlined table remains the + // same, meaning we don't have to change any of the parent nodes + // that reference this table. + .alias(table_name)? 
+ .build() + .map(Transformed::yes) + } + _ => Ok(Transformed::no(plan)), } - _ => Ok(Transformed::no(plan)), - } -} + })?; -fn rewrite_subquery(expr: Expr) -> Result<Transformed<Expr>> { - match expr { - Expr::Exists(Exists { subquery, negated }) => { - let plan = subquery.subquery.as_ref().clone(); - let new_plan = plan.transform_up(&analyze_internal).data()?; - let subquery = subquery.with_plan(Arc::new(new_plan)); - Ok(Transformed::yes(Expr::Exists(Exists { subquery, negated }))) - } - Expr::InSubquery(InSubquery { - expr, - subquery, - negated, - }) => { - let plan = subquery.subquery.as_ref().clone(); - let new_plan = plan.transform_up(&analyze_internal).data()?; - let subquery = subquery.with_plan(Arc::new(new_plan)); - Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( - expr, subquery, negated, - )))) - } - Expr::ScalarSubquery(subquery) => { - let plan = subquery.subquery.as_ref().clone(); - let new_plan = plan.transform_up(&analyze_internal).data()?; - let subquery = subquery.with_plan(Arc::new(new_plan)); - Ok(Transformed::yes(Expr::ScalarSubquery(subquery))) - } - _ => Ok(Transformed::no(expr)), - } + Ok(transformed_plan) } fn generate_projection_expr(