From aac10a4d880dddba59d5fab2b18a01be1f3bd8b3 Mon Sep 17 00:00:00 2001 From: Lordworms <48054792+Lordworms@users.noreply.github.com> Date: Sun, 15 Sep 2024 06:58:13 -0700 Subject: [PATCH] init (#12453) --- datafusion/sql/src/unparser/plan.rs | 37 ++++++++++++++------- datafusion/sql/src/unparser/rewrite.rs | 40 +++++++++++++++++++++-- datafusion/sql/tests/cases/plan_to_sql.rs | 26 +++++++-------- 3 files changed, 76 insertions(+), 27 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 802d4762574d..dc746b472a7e 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -17,7 +17,9 @@ use crate::unparser::utils::unproject_agg_exprs; use datafusion_common::{ - internal_err, not_impl_err, plan_err, Column, DataFusionError, Result, TableReference, + internal_err, not_impl_err, plan_err, + tree_node::{TransformedResult, TreeNode}, + Column, DataFusionError, Result, TableReference, }; use datafusion_expr::{ expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, @@ -34,7 +36,7 @@ use super::{ rewrite::{ inject_column_aliases, normalize_union_schema, rewrite_plan_for_sort_on_non_projected_fields, - subquery_alias_inner_query_and_columns, + subquery_alias_inner_query_and_columns, TableAliasRewriter, }, utils::{find_agg_node_within_select, unproject_window_exprs, AggVariant}, Unparser, @@ -554,13 +556,11 @@ impl Unparser<'_> { ) -> Result { match plan { LogicalPlan::TableScan(table_scan) => { - // TODO: support filters for table scan with alias. Remove this check after #12368 issue. 
- // see the issue: https://github.com/apache/datafusion/issues/12368 - if alias.is_some() && !table_scan.filters.is_empty() { - return not_impl_err!( - "Subquery alias is not supported for table scan with pushdown filters" - ); - } + let mut filter_alias_rewriter = + alias.as_ref().map(|alias_name| TableAliasRewriter { + table_schema: table_scan.source.schema(), + alias_name: alias_name.clone(), + }); let mut builder = LogicalPlanBuilder::scan( table_scan.table_name.clone(), @@ -587,12 +587,25 @@ impl Unparser<'_> { builder = builder.project(project_columns)?; } - let filter_expr = table_scan + let filter_expr: Result<Option<Expr>> = table_scan .filters .iter() .cloned() - .reduce(|acc, expr| acc.and(expr)); - if let Some(filter) = filter_expr { + .map(|expr| { + if let Some(ref mut rewriter) = filter_alias_rewriter { + expr.rewrite(rewriter).data() + } else { + Ok(expr) + } + }) + .reduce(|acc, expr_result| { + acc.and_then(|acc_expr| { + expr_result.map(|expr| acc_expr.and(expr)) + }) + }) + .transpose(); + + if let Some(filter) = filter_expr? 
{ builder = builder.filter(filter)?; } diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index 0f5cf5abe64b..e43c2eae238e 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -20,9 +20,10 @@ use std::{ sync::Arc, }; +use arrow_schema::SchemaRef; use datafusion_common::{ - tree_node::{Transformed, TransformedResult, TreeNode}, - Result, + tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter}, + Column, Result, TableReference, }; use datafusion_expr::{expr::Alias, tree_node::transform_sort_vec}; use datafusion_expr::{Expr, LogicalPlan, Projection, Sort, SortExpr}; @@ -300,3 +301,38 @@ fn find_projection(logical_plan: &LogicalPlan) -> Option<&Projection> { _ => None, } } +/// A `TreeNodeRewriter` implementation that rewrites `Expr::Column` expressions by +/// replacing the column's table qualifier with an alias if the column exists in the provided schema. +/// +/// This is typically used to apply table aliases in query plans, ensuring that +/// the column references in the expressions use the correct table alias. +/// +/// # Fields +/// +/// * `table_schema`: The schema (`SchemaRef`) representing the table structure +/// from which the columns are referenced. This is used to look up columns by their names. +/// * `alias_name`: The alias (`TableReference`) that will replace the table name +/// in the column references when applicable. 
+pub struct TableAliasRewriter { + pub table_schema: SchemaRef, + pub alias_name: TableReference, +} + +impl TreeNodeRewriter for TableAliasRewriter { + type Node = Expr; + + fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> { + match expr { + Expr::Column(column) => { + if let Ok(field) = self.table_schema.field_with_name(&column.name) { + let new_column = + Column::new(Some(self.alias_name.clone()), field.name().clone()); + Ok(Transformed::yes(Expr::Column(new_column))) + } else { + Ok(Transformed::no(Expr::Column(column))) + } + } + _ => Ok(Transformed::no(expr)), + } + } +} diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index bd338e440e36..caec1e9c9d70 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -705,19 +705,19 @@ fn test_table_scan_pushdown() -> Result<()> { "SELECT * FROM t1 WHERE ((t1.id > 1) AND (t1.age < 2))" ); - // TODO: support filters for table scan with alias. Enable this test after #12368 issue is fixed - // see the issue: https://github.com/apache/datafusion/issues/12368 - // let table_scan_with_filter_alias = table_scan_with_filters( - // Some("t1"), - // &schema, - // None, - // vec![col("id").gt(col("age"))], - // )?.alias("ta")?.build()?; - // let table_scan_with_filter_alias = plan_to_sql(&table_scan_with_filter_alias)?; - // assert_eq!( - // format!("{}", table_scan_with_filter_alias), - // "SELECT * FROM t1 AS ta WHERE (ta.id > ta.age)" - // ); + let table_scan_with_filter_alias = table_scan_with_filters( + Some("t1"), + &schema, + None, + vec![col("id").gt(col("age"))], + )? + .alias("ta")? + .build()?; + let table_scan_with_filter_alias = plan_to_sql(&table_scan_with_filter_alias)?; + assert_eq!( + format!("{}", table_scan_with_filter_alias), + "SELECT * FROM t1 AS ta WHERE (ta.id > ta.age)" + ); let table_scan_with_projection_and_filter = table_scan_with_filters( Some("t1"),