-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Recursive unnest
#11062
Recursive unnest
#11062
Changes from all commits
ffde26d
e5c9e47
d8b8b93
1e58887
731b71b
2112617
c146545
82ac9de
2809297
67a4873
0bcb00f
02eeefe
93fbd26
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,9 @@ use std::collections::HashMap; | |
use arrow_schema::{ | ||
DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE, | ||
}; | ||
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; | ||
use datafusion_common::tree_node::{ | ||
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, | ||
}; | ||
use datafusion_common::{ | ||
exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue, | ||
}; | ||
|
@@ -267,11 +269,13 @@ pub(crate) fn normalize_ident(id: Ident) -> String { | |
/// - For list column: unnest(col) with type list -> unnest(col) with type list::item | ||
/// - For struct column: unnest(struct(field1, field2)) -> unnest(struct).field1, unnest(struct).field2 | ||
/// The transformed exprs will be used in the outer projection | ||
pub(crate) fn recursive_transform_unnest( | ||
/// If along the path from root to bottom, there are multiple unnest expressions, the transformation | ||
/// is done only for the bottom expression | ||
pub(crate) fn transform_bottom_unnest( | ||
input: &LogicalPlan, | ||
unnest_placeholder_columns: &mut Vec<String>, | ||
inner_projection_exprs: &mut Vec<Expr>, | ||
original_expr: Expr, | ||
original_expr: &Expr, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We could avoid the clone at L297 too. |
||
) -> Result<Vec<Expr>> { | ||
let mut transform = | ||
|unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> { | ||
|
@@ -298,35 +302,53 @@ pub(crate) fn recursive_transform_unnest( | |
.collect::<Vec<_>>(); | ||
Ok(expr) | ||
}; | ||
// expr transformed maybe either the same, or different from the originals exprs | ||
// for example: | ||
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2 | ||
// This transformation is only done for list unnest | ||
// struct unnest is done at the root level, and at the later stage | ||
// because the syntax of TreeNode only support transform into 1 Expr, while | ||
// Unnest struct will be transformed into multiple Exprs | ||
// TODO: This can be resolved after this issue is resolved: https://github.com/apache/datafusion/issues/10102 | ||
// | ||
// The transformation looks like: | ||
// - unnest(array_col) will be transformed into unnest(array_col) | ||
// - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1 | ||
|
||
// Specifically handle root level unnest expr, this is the only place | ||
// unnest on struct can be handled | ||
if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr { | ||
return transform(&original_expr, arg); | ||
} | ||
let Transformed { | ||
data: transformed_expr, | ||
transformed, | ||
tnr: _, | ||
} = original_expr.transform_up(|expr: Expr| { | ||
if let Expr::Unnest(Unnest { expr: ref arg }) = expr { | ||
let (data_type, _) = arg.data_type_and_nullable(input.schema())?; | ||
if let DataType::Struct(_) = data_type { | ||
return internal_err!("unnest on struct can ony be applied at the root level of select expression"); | ||
} | ||
let transformed_exprs = transform(&expr, arg)?; | ||
Ok(Transformed::yes(transformed_exprs[0].clone())) | ||
} else { | ||
Ok(Transformed::no(expr)) | ||
data: transformed_expr, | ||
transformed, | ||
tnr: _, | ||
} = original_expr.clone().transform_up(|expr: Expr| { | ||
let is_root_expr = &expr == original_expr; | ||
// Root expr is transformed separately | ||
if is_root_expr { | ||
return Ok(Transformed::no(expr)); | ||
} | ||
if let Expr::Unnest(Unnest { expr: ref arg }) = expr { | ||
let (data_type, _) = arg.data_type_and_nullable(input.schema())?; | ||
|
||
if let DataType::Struct(_) = data_type { | ||
return internal_err!("unnest on struct can ony be applied at the root level of select expression"); | ||
} | ||
})?; | ||
|
||
let mut transformed_exprs = transform(&expr, arg)?; | ||
// root_expr.push(transformed_exprs[0].clone()); | ||
Ok(Transformed::new( | ||
transformed_exprs.swap_remove(0), | ||
true, | ||
TreeNodeRecursion::Stop, | ||
)) | ||
} else { | ||
Ok(Transformed::no(expr)) | ||
} | ||
})?; | ||
|
||
if !transformed { | ||
// Because root expr need to transform separately | ||
// unnest struct is only possible here | ||
// The transformation looks like | ||
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2 | ||
if let Expr::Unnest(Unnest { expr: ref arg }) = transformed_expr { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Rather than having to clone the expr to special-case the root, could you instead check the root initially? Something like: if let Expr::Unnest(Unnest { expr }) = original_expr {
let transformed_expr = expr.transform_up(...);
Ok(transform(transformed_expr))
} And similarly for the others? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I tried that approach, but it will add more logic to the code, something like:
For the clone() part, I don't know how to avoid it, because transform_up takes ownership of the expr. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What you have is good -- thank you for trying this |
||
return transform(&transformed_expr, arg); | ||
} | ||
|
||
if matches!(&transformed_expr, Expr::Column(_)) { | ||
inner_projection_exprs.push(transformed_expr.clone()); | ||
Ok(vec![transformed_expr]) | ||
|
@@ -351,12 +373,13 @@ mod tests { | |
use arrow_schema::Fields; | ||
use datafusion_common::{DFSchema, Result}; | ||
use datafusion_expr::{col, lit, unnest, EmptyRelation, LogicalPlan}; | ||
use datafusion_functions::core::expr_ext::FieldAccessor; | ||
use datafusion_functions_aggregate::expr_fn::count; | ||
|
||
use crate::utils::{recursive_transform_unnest, resolve_positions_to_exprs}; | ||
use crate::utils::{resolve_positions_to_exprs, transform_bottom_unnest}; | ||
|
||
#[test] | ||
fn test_recursive_transform_unnest() -> Result<()> { | ||
fn test_transform_bottom_unnest() -> Result<()> { | ||
let schema = Schema::new(vec![ | ||
Field::new( | ||
"struct_col", | ||
|
@@ -390,11 +413,11 @@ mod tests { | |
|
||
// unnest(struct_col) | ||
let original_expr = unnest(col("struct_col")); | ||
let transformed_exprs = recursive_transform_unnest( | ||
let transformed_exprs = transform_bottom_unnest( | ||
&input, | ||
&mut unnest_placeholder_columns, | ||
&mut inner_projection_exprs, | ||
original_expr, | ||
&original_expr, | ||
)?; | ||
assert_eq!( | ||
transformed_exprs, | ||
|
@@ -413,11 +436,11 @@ mod tests { | |
|
||
// unnest(array_col) + 1 | ||
let original_expr = unnest(col("array_col")).add(lit(1i64)); | ||
let transformed_exprs = recursive_transform_unnest( | ||
let transformed_exprs = transform_bottom_unnest( | ||
&input, | ||
&mut unnest_placeholder_columns, | ||
&mut inner_projection_exprs, | ||
original_expr, | ||
&original_expr, | ||
)?; | ||
assert_eq!( | ||
unnest_placeholder_columns, | ||
|
@@ -440,6 +463,62 @@ mod tests { | |
] | ||
); | ||
|
||
// a nested structure struct[[]] | ||
let schema = Schema::new(vec![ | ||
Field::new( | ||
"struct_col", // {array_col: [1,2,3]} | ||
ArrowDataType::Struct(Fields::from(vec![Field::new( | ||
"matrix", | ||
ArrowDataType::List(Arc::new(Field::new( | ||
"matrix_row", | ||
ArrowDataType::List(Arc::new(Field::new( | ||
"item", | ||
ArrowDataType::Int64, | ||
true, | ||
))), | ||
true, | ||
))), | ||
true, | ||
)])), | ||
false, | ||
), | ||
Field::new("int_col", ArrowDataType::Int32, false), | ||
]); | ||
|
||
let dfschema = DFSchema::try_from(schema)?; | ||
|
||
let input = LogicalPlan::EmptyRelation(EmptyRelation { | ||
produce_one_row: false, | ||
schema: Arc::new(dfschema), | ||
}); | ||
|
||
let mut unnest_placeholder_columns = vec![]; | ||
let mut inner_projection_exprs = vec![]; | ||
|
||
// An expr with multiple unnest | ||
let original_expr = unnest(unnest(col("struct_col").field("matrix"))); | ||
let transformed_exprs = transform_bottom_unnest( | ||
&input, | ||
&mut unnest_placeholder_columns, | ||
&mut inner_projection_exprs, | ||
&original_expr, | ||
)?; | ||
// Only the inner most/ bottom most unnest is transformed | ||
assert_eq!( | ||
transformed_exprs, | ||
vec![unnest(col("unnest(struct_col[matrix])"))] | ||
); | ||
assert_eq!( | ||
unnest_placeholder_columns, | ||
vec!["unnest(struct_col[matrix])"] | ||
); | ||
assert_eq!( | ||
inner_projection_exprs, | ||
vec![col("struct_col") | ||
.field("matrix") | ||
.alias("unnest(struct_col[matrix])"),] | ||
); | ||
|
||
Ok(()) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would transforming List and Struct separately help readability here?
something like