Skip to content

Commit

Permalink
adding alias for table scan filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Sep 13, 2024
1 parent f7efd2d commit 3a63ca0
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 14 deletions.
22 changes: 22 additions & 0 deletions datafusion/core/tests/expr_api/simplification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion_functions::{math, string};
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyExpressions};
use datafusion_optimizer::{OptimizerContext, OptimizerRule};
use datafusion_sql::unparser::plan_to_sql;
use std::sync::Arc;

/// In order to simplify expressions, DataFusion must have information
Expand Down Expand Up @@ -693,3 +694,24 @@ fn test_simplify_cycles() {
let expected = lit(true);
test_simplify_with_cycle_count(expr, expected, 3);
}

/// Verifies that filters pushed down into an aliased table scan are unparsed
/// with the alias as the column qualifier rather than the original table name.
#[test]
fn test_alias_pushdown() -> Result<()> {
    // Two non-nullable Int64 columns are enough to express `id > age`.
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("age", DataType::Int64, false),
    ];
    let schema = Schema::new(fields);

    // The predicate lives inside the scan itself (pushdown filter), not in a
    // separate filter node built on top of it.
    let pushed_down = vec![col("id").gt(col("age"))];
    let scan = table_scan_with_filters(Some("t1"), &schema, None, pushed_down)?;
    let aliased_plan = scan.alias("ta")?.build()?;

    let sql = plan_to_sql(&aliased_plan)?.to_string();
    assert_eq!(sql, "SELECT * FROM t1 AS ta WHERE (ta.id > ta.age)");

    Ok(())
}
36 changes: 24 additions & 12 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use crate::unparser::utils::unproject_agg_exprs;
use datafusion_common::{
internal_err, not_impl_err, plan_err, Column, DataFusionError, Result, TableReference,
internal_err, not_impl_err, plan_err,
tree_node::{TransformedResult, TreeNode},
Column, DataFusionError, Result, TableReference,
};
use datafusion_expr::{
expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
Expand All @@ -34,7 +36,7 @@ use super::{
rewrite::{
inject_column_aliases, normalize_union_schema,
rewrite_plan_for_sort_on_non_projected_fields,
subquery_alias_inner_query_and_columns,
subquery_alias_inner_query_and_columns, TableAliasRewriter,
},
utils::{find_agg_node_within_select, unproject_window_exprs, AggVariant},
Unparser,
Expand Down Expand Up @@ -554,13 +556,11 @@ impl Unparser<'_> {
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::TableScan(table_scan) => {
// TODO: support filters for table scan with alias. Remove this check after #12368 issue.
// see the issue: https://github.com/apache/datafusion/issues/12368
if alias.is_some() && !table_scan.filters.is_empty() {
return not_impl_err!(
"Subquery alias is not supported for table scan with pushdown filters"
);
}
let mut filter_alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
table_schema: table_scan.source.schema(),
alias_name: alias_name.clone(),
});

let mut builder = LogicalPlanBuilder::scan(
table_scan.table_name.clone(),
Expand All @@ -587,12 +587,24 @@ impl Unparser<'_> {
builder = builder.project(project_columns)?;
}

let filter_expr = table_scan
let filter_expr: Result<Option<Expr>> = table_scan
.filters
.iter()
.cloned()
.reduce(|acc, expr| acc.and(expr));
if let Some(filter) = filter_expr {
.map(|expr| {
if let Some(ref mut rewriter) = filter_alias_rewriter {
expr.rewrite(rewriter).data()
} else {
Ok(expr)
}
})
.reduce(|acc, expr_result| match (acc, expr_result) {
(Ok(acc_expr), Ok(expr)) => Ok(acc_expr.and(expr)),
(Err(e), _) | (_, Err(e)) => Err(e),
})
.transpose();

if let Some(filter) = filter_expr? {
builder = builder.filter(filter)?;
}

Expand Down
29 changes: 27 additions & 2 deletions datafusion/sql/src/unparser/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use std::{
sync::Arc,
};

use arrow_schema::SchemaRef;
use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode},
Result,
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter},
Column, Result, TableReference,
};
use datafusion_expr::{expr::Alias, tree_node::transform_sort_vec};
use datafusion_expr::{Expr, LogicalPlan, Projection, Sort, SortExpr};
Expand Down Expand Up @@ -300,3 +301,27 @@ fn find_projection(logical_plan: &LogicalPlan) -> Option<&Projection> {
_ => None,
}
}

/// Expression rewriter that re-qualifies column references with a table alias.
///
/// Its [`TreeNodeRewriter`] implementation replaces every `Expr::Column` whose
/// name is found in `table_schema` with a column of the same name qualified by
/// `alias_name`; all other expressions are left unchanged.
pub struct TableAliasRewriter {
/// Schema used to decide, by field name, whether a column gets rewritten.
pub table_schema: SchemaRef,
/// Qualifier assigned to every rewritten column.
pub alias_name: TableReference,
}

impl TreeNodeRewriter for TableAliasRewriter {
    type Node = Expr;

    /// Re-qualifies a column reference with the configured alias.
    ///
    /// A column is rewritten only when its name resolves to a field of
    /// `table_schema`; the resulting column keeps that name and takes
    /// `alias_name` as its relation. Any other expression, including a column
    /// the schema does not know, is returned as-is and reported as
    /// not transformed.
    fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
        match expr {
            Expr::Column(column)
                if self.table_schema.field_with_name(&column.name).is_ok() =>
            {
                // The schema lookup is by exact name, so the matched field's
                // name equals `column.name`; reuse the owned string.
                let qualifier = Some(self.alias_name.clone());
                let aliased = Column::new(qualifier, column.name);
                Ok(Transformed::yes(Expr::Column(aliased)))
            }
            untouched => Ok(Transformed::no(untouched)),
        }
    }
}

0 comments on commit 3a63ca0

Please sign in to comment.