Skip to content

Commit

Permalink
Avoid copies in InlineTableScan via TreeNode API (#10038)
Browse files Browse the repository at this point in the history
* Avoid copies in `InlineTableScan` via TreeNode API

* Improve variable name
  • Loading branch information
alamb authored Apr 12, 2024
1 parent e161cd6 commit a5cf0b8
Showing 1 changed file with 31 additions and 62 deletions.
93 changes: 31 additions & 62 deletions datafusion/optimizer/src/analyzer/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@

//! Analyzer rule that replaces TableScan references to inlinable sources,
//! such as DataFrames and Views, with their underlying LogicalPlan.
use std::sync::Arc;
use crate::analyzer::AnalyzerRule;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column, Result};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::{
logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan,
};
use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan};

/// Analyzer rule that inlines any TableScan whose source provides a [`LogicalPlan`]
/// (DataFrame / ViewTable)
Expand All @@ -51,65 +47,38 @@ impl AnalyzerRule for InlineTableScan {
}

fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
match plan {
// Match only on scans without filter / projection / fetch
// Views and DataFrames won't have those added
// during the early stage of planning
LogicalPlan::TableScan(TableScan {
table_name,
source,
projection,
filters,
..
}) if filters.is_empty() && source.get_logical_plan().is_some() => {
let sub_plan = source.get_logical_plan().unwrap();
let projection_exprs = generate_projection_expr(&projection, sub_plan)?;
LogicalPlanBuilder::from(sub_plan.clone())
.project(projection_exprs)?
// Ensures that the reference to the inlined table remains the
// same, meaning we don't have to change any of the parent nodes
// that reference this table.
.alias(table_name)?
.build()
.map(Transformed::yes)
}
LogicalPlan::Filter(filter) => {
let new_expr = filter.predicate.transform(&rewrite_subquery).data()?;
Filter::try_new(new_expr, filter.input)
.map(|e| Transformed::yes(LogicalPlan::Filter(e)))
// rewrite any subqueries in the plan first
let transformed_plan =
plan.map_subqueries(|plan| plan.transform_up(&analyze_internal))?;

let transformed_plan = transformed_plan.transform_data(|plan| {
match plan {
// Match only on scans without filter / projection / fetch
// Views and DataFrames won't have those added
// during the early stage of planning
LogicalPlan::TableScan(TableScan {
table_name,
source,
projection,
filters,
..
}) if filters.is_empty() && source.get_logical_plan().is_some() => {
let sub_plan = source.get_logical_plan().unwrap();
let projection_exprs = generate_projection_expr(&projection, sub_plan)?;
LogicalPlanBuilder::from(sub_plan.clone())
.project(projection_exprs)?
// Ensures that the reference to the inlined table remains the
// same, meaning we don't have to change any of the parent nodes
// that reference this table.
.alias(table_name)?
.build()
.map(Transformed::yes)
}
_ => Ok(Transformed::no(plan)),
}
_ => Ok(Transformed::no(plan)),
}
}
})?;

fn rewrite_subquery(expr: Expr) -> Result<Transformed<Expr>> {
match expr {
Expr::Exists(Exists { subquery, negated }) => {
let plan = subquery.subquery.as_ref().clone();
let new_plan = plan.transform_up(&analyze_internal).data()?;
let subquery = subquery.with_plan(Arc::new(new_plan));
Ok(Transformed::yes(Expr::Exists(Exists { subquery, negated })))
}
Expr::InSubquery(InSubquery {
expr,
subquery,
negated,
}) => {
let plan = subquery.subquery.as_ref().clone();
let new_plan = plan.transform_up(&analyze_internal).data()?;
let subquery = subquery.with_plan(Arc::new(new_plan));
Ok(Transformed::yes(Expr::InSubquery(InSubquery::new(
expr, subquery, negated,
))))
}
Expr::ScalarSubquery(subquery) => {
let plan = subquery.subquery.as_ref().clone();
let new_plan = plan.transform_up(&analyze_internal).data()?;
let subquery = subquery.with_plan(Arc::new(new_plan));
Ok(Transformed::yes(Expr::ScalarSubquery(subquery)))
}
_ => Ok(Transformed::no(expr)),
}
Ok(transformed_plan)
}

fn generate_projection_expr(
Expand Down

0 comments on commit a5cf0b8

Please sign in to comment.