Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recursive unnest #11062

Merged
merged 13 commits into from
Jul 2, 2024
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ impl Unnest {
expr: Box::new(expr),
}
}

/// Create a new Unnest expression.
pub fn new_boxed(boxed: Box<Expr>) -> Self {
Self { expr: boxed }
}
}

/// Alias expression
Expand Down
1 change: 0 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,6 @@ pub fn project(
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?),
}
}

validate_unique_names("Projections", projected_expr.iter())?;

Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ impl TreeNode for Expr {
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::ScalarVariable(_, _)
| Expr::Unnest(_)
| Expr::Literal(_) => Transformed::no(self),
Expr::Unnest(Unnest { expr, .. }) => transform_box(expr, &mut f)?
.update_data(|be| Expr::Unnest(Unnest::new_boxed(be))),
Expr::Alias(Alias {
expr,
relation,
Expand Down
98 changes: 56 additions & 42 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ use std::sync::Arc;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::utils::{
check_columns_satisfy_exprs, extract_aliases, rebase_expr,
recursive_transform_unnest, resolve_aliases_to_exprs, resolve_columns,
resolve_positions_to_exprs,
check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
};

use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
Expand Down Expand Up @@ -296,46 +295,61 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: LogicalPlan,
select_exprs: Vec<Expr>,
) -> Result<LogicalPlan> {
let mut unnest_columns = vec![];
// from which column used for projection, before the unnest happen
// including non unnest column and unnest column
let mut inner_projection_exprs = vec![];

// expr returned here maybe different from the originals in inner_projection_exprs
// for example:
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
// - unnest(array_col) will be transformed into unnest(array_col).element
// - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1
let outer_projection_exprs: Vec<Expr> = select_exprs
.into_iter()
.map(|expr| {
recursive_transform_unnest(
&input,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

// Do the final projection
if unnest_columns.is_empty() {
LogicalPlanBuilder::from(input)
.project(inner_projection_exprs)?
.build()
} else {
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
// Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
LogicalPlanBuilder::from(input)
.project(inner_projection_exprs)?
.unnest_columns_with_options(columns, unnest_options)?
.project(outer_projection_exprs)?
.build()
let mut intermediate_plan = input;
let mut intermediate_select_exprs = select_exprs;
// Each expr in select_exprs can contains multiple unnest stage
// The transformation happen bottom up, one at a time for each iteration
// Ony exaust the loop if no more unnest transformation is found
for i in 0.. {
let mut unnest_columns = vec![];
// from which column used for projection, before the unnest happen
// including non unnest column and unnest column
let mut inner_projection_exprs = vec![];

// expr returned here maybe different from the originals in inner_projection_exprs
// for example:
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
// - unnest(array_col) will be transformed into unnest(array_col).element
// - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1
let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
.iter()
.map(|expr| {
transform_bottom_unnest(
&intermediate_plan,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

// No more unnest is possible
if unnest_columns.is_empty() {
// The original expr does not contain any unnest
if i == 0 {
return LogicalPlanBuilder::from(intermediate_plan)
.project(inner_projection_exprs)?
.build();
}
break;
} else {
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
// Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
let plan = LogicalPlanBuilder::from(intermediate_plan)
.project(inner_projection_exprs)?
.unnest_columns_with_options(columns, unnest_options)?
.build()?;
intermediate_plan = plan;
intermediate_select_exprs = outer_projection_exprs;
}
}
LogicalPlanBuilder::from(intermediate_plan)
.project(intermediate_select_exprs)?
.build()
}

fn plan_selection(
Expand Down
143 changes: 111 additions & 32 deletions datafusion/sql/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use std::collections::HashMap;
use arrow_schema::{
DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE,
};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{
exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue,
};
Expand Down Expand Up @@ -267,11 +269,13 @@ pub(crate) fn normalize_ident(id: Ident) -> String {
/// - For list column: unnest(col) with type list -> unnest(col) with type list::item
/// - For struct column: unnest(struct(field1, field2)) -> unnest(struct).field1, unnest(struct).field2
/// The transformed exprs will be used in the outer projection
pub(crate) fn recursive_transform_unnest(
/// If along the path from root to bottom, there are multiple unnest expressions, the transformation
/// is done only for the bottom expression
pub(crate) fn transform_bottom_unnest(
Copy link
Contributor

@jayzhan211 jayzhan211 Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would transform List / Struct separately help readability here?

something like

transform_bottom_unnest() {
 // common logic
 fn transform();

 list:
     transform_list // list specific logic
 struct:
     transform_struct // struct specific logic (only unnest with the top level)
}

input: &LogicalPlan,
unnest_placeholder_columns: &mut Vec<String>,
inner_projection_exprs: &mut Vec<Expr>,
original_expr: Expr,
original_expr: &Expr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could avoid clone in L297 too.

) -> Result<Vec<Expr>> {
let mut transform =
|unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> {
Expand All @@ -298,35 +302,53 @@ pub(crate) fn recursive_transform_unnest(
.collect::<Vec<_>>();
Ok(expr)
};
// expr transformed maybe either the same, or different from the originals exprs
// for example:
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
// This transformation is only done for list unnest
// struct unnest is done at the root level, and at the later stage
// because the syntax of TreeNode only support transform into 1 Expr, while
// Unnest struct will be transformed into multiple Exprs
// TODO: This can be resolved after this issue is resolved: https://github.com/apache/datafusion/issues/10102
//
// The transformation looks like:
// - unnest(array_col) will be transformed into unnest(array_col)
// - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1

// Specifically handle root level unnest expr, this is the only place
// unnest on struct can be handled
if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr {
return transform(&original_expr, arg);
}
let Transformed {
data: transformed_expr,
transformed,
tnr: _,
} = original_expr.transform_up(|expr: Expr| {
if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
let (data_type, _) = arg.data_type_and_nullable(input.schema())?;
if let DataType::Struct(_) = data_type {
return internal_err!("unnest on struct can ony be applied at the root level of select expression");
}
let transformed_exprs = transform(&expr, arg)?;
Ok(Transformed::yes(transformed_exprs[0].clone()))
} else {
Ok(Transformed::no(expr))
data: transformed_expr,
transformed,
tnr: _,
} = original_expr.clone().transform_up(|expr: Expr| {
let is_root_expr = &expr == original_expr;
// Root expr is transformed separately
if is_root_expr {
return Ok(Transformed::no(expr));
}
if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
let (data_type, _) = arg.data_type_and_nullable(input.schema())?;

if let DataType::Struct(_) = data_type {
return internal_err!("unnest on struct can ony be applied at the root level of select expression");
}
})?;

let mut transformed_exprs = transform(&expr, arg)?;
// root_expr.push(transformed_exprs[0].clone());
Ok(Transformed::new(
transformed_exprs.swap_remove(0),
true,
TreeNodeRecursion::Stop,
))
} else {
Ok(Transformed::no(expr))
}
})?;

if !transformed {
// Because root expr need to transform separately
// unnest struct is only possible here
// The transformation looks like
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
if let Expr::Unnest(Unnest { expr: ref arg }) = transformed_expr {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than having to clone the expr to special case the root, could you instead check the root initially?

Something like

if let Expr::Unnest(Unnest { expr })  = original_expr {
  let expr = expr.transform_up(...);
  Ok(transform(transformex_expe)
}

And similarly for the others?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that approach, but it will add more logic to the code, something like:

    if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr {
        let Transformed {
            data: transformed_expr,
            transformed,
            tnr: _,
        } = arg.clone().transform_up(|expr: Expr| {
...
        })?;
        // no transformation eligible in the children
        // only then the root unnest can be transformed
        if !transformed {
            return transform(&transformed_expr, arg);
        } else {
            return Ok(vec![transformed_expr]);
        }
    }

    // from this onward, unnest(struct) is invalid because
    // we are sure that root_expr is not unnest expr
    let Transformed {
        data: transformed_expr,
        transformed,
        tnr: _,
    } = original_expr.clone().transform_up(|expr: Expr| {
...
    if !transformed {
        if matches!(&transformed_expr, Expr::Column(_)) {
            inner_projection_exprs.push(transformed_expr.clone());
            Ok(vec![transformed_expr])
        } else {
            // We need to evaluate the expr in the inner projection,
            // outer projection just select its name
            let column_name = transformed_expr.display_name()?;
            inner_projection_exprs.push(transformed_expr);
            Ok(vec![Expr::Column(Column::from_name(column_name))])
        }
    } else {
        Ok(vec![transformed_expr])
    }

    })?;

For the clone() part, i don't know how to avoid it because transform_up will take the ownership of the expr
Please let me know your opinion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you have is good -- thank you for trying this

return transform(&transformed_expr, arg);
}

if matches!(&transformed_expr, Expr::Column(_)) {
inner_projection_exprs.push(transformed_expr.clone());
Ok(vec![transformed_expr])
Expand All @@ -351,12 +373,13 @@ mod tests {
use arrow_schema::Fields;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{col, lit, unnest, EmptyRelation, LogicalPlan};
use datafusion_functions::core::expr_ext::FieldAccessor;
use datafusion_functions_aggregate::expr_fn::count;

use crate::utils::{recursive_transform_unnest, resolve_positions_to_exprs};
use crate::utils::{resolve_positions_to_exprs, transform_bottom_unnest};

#[test]
fn test_recursive_transform_unnest() -> Result<()> {
fn test_transform_bottom_unnest() -> Result<()> {
let schema = Schema::new(vec![
Field::new(
"struct_col",
Expand Down Expand Up @@ -390,11 +413,11 @@ mod tests {

// unnest(struct_col)
let original_expr = unnest(col("struct_col"));
let transformed_exprs = recursive_transform_unnest(
let transformed_exprs = transform_bottom_unnest(
&input,
&mut unnest_placeholder_columns,
&mut inner_projection_exprs,
original_expr,
&original_expr,
)?;
assert_eq!(
transformed_exprs,
Expand All @@ -413,11 +436,11 @@ mod tests {

// unnest(array_col) + 1
let original_expr = unnest(col("array_col")).add(lit(1i64));
let transformed_exprs = recursive_transform_unnest(
let transformed_exprs = transform_bottom_unnest(
&input,
&mut unnest_placeholder_columns,
&mut inner_projection_exprs,
original_expr,
&original_expr,
)?;
assert_eq!(
unnest_placeholder_columns,
Expand All @@ -440,6 +463,62 @@ mod tests {
]
);

// a nested structure struct[[]]
let schema = Schema::new(vec![
Field::new(
"struct_col", // {array_col: [1,2,3]}
ArrowDataType::Struct(Fields::from(vec![Field::new(
"matrix",
ArrowDataType::List(Arc::new(Field::new(
"matrix_row",
ArrowDataType::List(Arc::new(Field::new(
"item",
ArrowDataType::Int64,
true,
))),
true,
))),
true,
)])),
false,
),
Field::new("int_col", ArrowDataType::Int32, false),
]);

let dfschema = DFSchema::try_from(schema)?;

let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(dfschema),
});

let mut unnest_placeholder_columns = vec![];
let mut inner_projection_exprs = vec![];

// An expr with multiple unnest
let original_expr = unnest(unnest(col("struct_col").field("matrix")));
let transformed_exprs = transform_bottom_unnest(
&input,
&mut unnest_placeholder_columns,
&mut inner_projection_exprs,
&original_expr,
)?;
// Only the inner most/ bottom most unnest is transformed
assert_eq!(
transformed_exprs,
vec![unnest(col("unnest(struct_col[matrix])"))]
);
assert_eq!(
unnest_placeholder_columns,
vec!["unnest(struct_col[matrix])"]
);
assert_eq!(
inner_projection_exprs,
vec![col("struct_col")
.field("matrix")
.alias("unnest(struct_col[matrix])"),]
);

Ok(())
}

Expand Down
Loading