Skip to content

Commit

Permalink
feat: support unnest multiple arrays (#10044)
Browse files Browse the repository at this point in the history
* Impl find_longest_length

* impl unnesting multi columns

* Change plans

* remove println

* fix tests

* simplify unnested fields

* update doc and tests

* more tests

* add test

* fix comment

* Add test for untyped null

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
jonahgao and alamb authored Apr 15, 2024
1 parent beeb808 commit 34eda15
Show file tree
Hide file tree
Showing 19 changed files with 577 additions and 442 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1168,12 +1168,13 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::Unnest(Unnest { input, column, schema, options }) => {
LogicalPlan::Unnest(Unnest { input, columns, schema, options }) => {
let input = self.create_initial_plan(input, session_state).await?;
let column_exec = schema.index_of_column(column)
.map(|idx| Column::new(&column.name, idx))?;
let column_execs = columns.iter().map(|column| {
schema.index_of_column(column).map(|idx| Column::new(&column.name, idx))
}).collect::<Result<_>>()?;
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
Ok(Arc::new(UnnestExec::new(input, column_exec, schema, options.clone())))
Ok(Arc::new(UnnestExec::new(input, column_execs, schema, options.clone())))
}
LogicalPlan::Ddl(ddl) => {
// There is no default plan for DDL statements --
Expand Down
20 changes: 16 additions & 4 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,16 @@ pub enum Expr {

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
pub exprs: Vec<Expr>,
pub expr: Box<Expr>,
}

impl Unnest {
    /// Construct an `Unnest` expression wrapping the given inner expression.
    ///
    /// The inner expression is boxed so the variant stays small inside the
    /// recursive `Expr` enum.
    pub fn new(expr: Expr) -> Self {
        let expr = Box::new(expr);
        Self { expr }
    }
}

/// Alias expression
Expand Down Expand Up @@ -1567,8 +1576,8 @@ impl fmt::Display for Expr {
}
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
Expr::Unnest(Unnest { exprs }) => {
write!(f, "UNNEST({exprs:?})")
Expr::Unnest(Unnest { expr }) => {
write!(f, "UNNEST({expr:?})")
}
}
}
Expand Down Expand Up @@ -1757,7 +1766,10 @@ fn create_name(e: &Expr) -> Result<String> {
}
}
}
Expr::Unnest(Unnest { exprs }) => create_function_name("unnest", false, exprs),
Expr::Unnest(Unnest { expr }) => {
let expr_name = create_name(expr)?;
Ok(format!("unnest({expr_name})"))
}
Expr::ScalarFunction(fun) => create_function_name(fun.name(), false, &fun.args),
Expr::WindowFunction(WindowFunction {
fun,
Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
using_columns: &[HashSet<Column>],
) -> Result<Expr> {
// Normalize column inside Unnest
if let Expr::Unnest(Unnest { exprs }) = expr {
if let Expr::Unnest(Unnest { expr }) = expr {
let e = normalize_col_with_schemas_and_ambiguity_check(
exprs[0].clone(),
expr.as_ref().clone(),
schemas,
using_columns,
)?;
return Ok(Expr::Unnest(Unnest { exprs: vec![e] }));
return Ok(Expr::Unnest(Unnest { expr: Box::new(e) }));
}

expr.transform(&|expr| {
Expand Down
8 changes: 2 additions & 6 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,8 @@ impl ExprSchemable for Expr {
Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
Expr::Cast(Cast { data_type, .. })
| Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
Expr::Unnest(Unnest { exprs }) => {
let arg_data_types = exprs
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
let arg_data_type = arg_data_types[0].clone();
Expr::Unnest(Unnest { expr }) => {
let arg_data_type = expr.get_type(schema)?;
// Unnest's output type is the inner type of the list
match arg_data_type{
DataType::List(field) | DataType::LargeList(field) | DataType::FixedSizeList(field, _) =>{
Expand Down
77 changes: 46 additions & 31 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ impl LogicalPlanBuilder {

/// Unnest the given column.
pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
Ok(Self::from(unnest(self.plan, column.into())?))
Ok(Self::from(unnest(self.plan, vec![column.into()])?))
}

/// Unnest the given column, using the given [`UnnestOptions`]
Expand All @@ -1123,10 +1123,21 @@ impl LogicalPlanBuilder {
) -> Result<Self> {
Ok(Self::from(unnest_with_options(
self.plan,
column.into(),
vec![column.into()],
options,
)?))
}

/// Unnest the given columns with the given [`UnnestOptions`]
pub fn unnest_columns_with_options(
    self,
    columns: Vec<Column>,
    options: UnnestOptions,
) -> Result<Self> {
    // Delegate to the free function and lift the resulting plan back
    // into a builder.
    unnest_with_options(self.plan, columns, options).map(Self::from)
}
}
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
let mut name_map = HashMap::new();
Expand Down Expand Up @@ -1534,44 +1545,50 @@ impl TableSource for LogicalTableSource {
}

/// Create a [`LogicalPlan::Unnest`] plan
pub fn unnest(input: LogicalPlan, column: Column) -> Result<LogicalPlan> {
unnest_with_options(input, column, UnnestOptions::new())
pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
    // Same as `unnest_with_options` with default (non-preserving) options.
    let options = UnnestOptions::new();
    unnest_with_options(input, columns, options)
}

/// Create a [`LogicalPlan::Unnest`] plan with options
pub fn unnest_with_options(
input: LogicalPlan,
column: Column,
columns: Vec<Column>,
options: UnnestOptions,
) -> Result<LogicalPlan> {
let (unnest_qualifier, unnest_field) =
input.schema().qualified_field_from_column(&column)?;

// Extract the type of the nested field in the list.
let unnested_field = match unnest_field.data_type() {
DataType::List(field)
| DataType::FixedSizeList(field, _)
| DataType::LargeList(field) => Arc::new(Field::new(
unnest_field.name(),
field.data_type().clone(),
unnest_field.is_nullable(),
)),
_ => {
// If the unnest field is not a list type return the input plan.
return Ok(input);
}
};
let mut unnested_fields: HashMap<usize, _> = HashMap::with_capacity(columns.len());
// Add qualifiers to the columns.
let mut qualified_columns = Vec::with_capacity(columns.len());
for c in &columns {
let index = input.schema().index_of_column(c)?;
let (unnest_qualifier, unnest_field) = input.schema().qualified_field(index);
let unnested_field = match unnest_field.data_type() {
DataType::List(field)
| DataType::FixedSizeList(field, _)
| DataType::LargeList(field) => Arc::new(Field::new(
unnest_field.name(),
field.data_type().clone(),
// Unnesting may produce NULLs even if the list is not null.
// For example: unnest([1], []) -> 1, null
true,
)),
_ => {
// If the unnest field is not a list type return the input plan.
return Ok(input);
}
};
qualified_columns.push(Column::from((unnest_qualifier, unnested_field.as_ref())));
unnested_fields.insert(index, unnested_field);
}

// Update the schema with the unnest column type changed to contain the nested type.
// Update the schema with the unnest column types changed to contain the nested types.
let input_schema = input.schema();
let fields = input_schema
.iter()
.map(|(q, f)| {
if f.as_ref() == unnest_field && q == unnest_qualifier {
(unnest_qualifier.cloned(), unnested_field.clone())
} else {
(q.cloned(), f.clone())
}
.enumerate()
.map(|(index, (q, f))| match unnested_fields.get(&index) {
Some(unnested_field) => (q.cloned(), unnested_field.clone()),
None => (q.cloned(), f.clone()),
})
.collect::<Vec<_>>();

Expand All @@ -1580,11 +1597,9 @@ pub fn unnest_with_options(
// We can use the existing functional dependencies:
let deps = input_schema.functional_dependencies().clone();
let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
let column = Column::from((unnest_qualifier, unnested_field.as_ref()));

Ok(LogicalPlan::Unnest(Unnest {
input: Arc::new(input),
column,
columns: qualified_columns,
schema,
options,
}))
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,10 +638,10 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Node Type": "DescribeTable"
})
}
LogicalPlan::Unnest(Unnest { column, .. }) => {
LogicalPlan::Unnest(Unnest { columns, .. }) => {
json!({
"Node Type": "Unnest",
"Column": format!("{}", column)
"Column": expr_vec_fmt!(columns),
})
}
}
Expand Down
56 changes: 8 additions & 48 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;

use super::dml::CopyTo;
use super::DdlStatement;
use crate::builder::change_redundant_column;
use crate::builder::{change_redundant_column, unnest_with_options};
use crate::expr::{Alias, Placeholder, Sort as SortExpr, WindowFunction};
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
Expand Down Expand Up @@ -807,51 +807,11 @@ impl LogicalPlan {
}
LogicalPlan::DescribeTable(_) => Ok(self.clone()),
LogicalPlan::Unnest(Unnest {
column,
schema,
options,
..
columns, options, ..
}) => {
// Update schema with unnested column type.
let input = Arc::new(inputs.swap_remove(0));
let (nested_qualifier, nested_field) =
input.schema().qualified_field_from_column(column)?;
let (unnested_qualifier, unnested_field) =
schema.qualified_field_from_column(column)?;
let qualifiers_and_fields = input
.schema()
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&nested_qualifier)
&& field.as_ref() == nested_field
{
(
unnested_qualifier.cloned(),
Arc::new(unnested_field.clone()),
)
} else {
(qualifier.cloned(), field.clone())
}
})
.collect::<Vec<_>>();

let schema = Arc::new(
DFSchema::new_with_metadata(
qualifiers_and_fields,
input.schema().metadata().clone(),
)?
// We can use the existing functional dependencies as is:
.with_functional_dependencies(
input.schema().functional_dependencies().clone(),
)?,
);

Ok(LogicalPlan::Unnest(Unnest {
input,
column: column.clone(),
schema,
options: options.clone(),
}))
let input = inputs.swap_remove(0);
unnest_with_options(input, columns.clone(), options.clone())
}
}
}
Expand Down Expand Up @@ -1581,8 +1541,8 @@ impl LogicalPlan {
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
write!(f, "DescribeTable")
}
LogicalPlan::Unnest(Unnest { column, .. }) => {
write!(f, "Unnest: {column}")
LogicalPlan::Unnest(Unnest { columns, .. }) => {
write!(f, "Unnest: {}", expr_vec_fmt!(columns))
}
}
}
Expand Down Expand Up @@ -2556,8 +2516,8 @@ pub enum Partitioning {
pub struct Unnest {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The column to unnest
pub column: Column,
/// The columns to unnest
pub columns: Vec<Column>,
/// The output schema, containing the unnested field column(s).
pub schema: DFSchemaRef,
/// Options
Expand Down
27 changes: 9 additions & 18 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,13 +311,13 @@ impl TreeNode for LogicalPlan {
}
LogicalPlan::Unnest(Unnest {
input,
column,
columns,
schema,
options,
}) => rewrite_arc(input, f)?.update_data(|input| {
LogicalPlan::Unnest(Unnest {
input,
column,
columns,
schema,
options,
})
Expand Down Expand Up @@ -507,8 +507,12 @@ impl LogicalPlan {
LogicalPlan::TableScan(TableScan { filters, .. }) => {
filters.iter().apply_until_stop(f)
}
LogicalPlan::Unnest(Unnest { column, .. }) => {
f(&Expr::Column(column.clone()))
LogicalPlan::Unnest(Unnest { columns, .. }) => {
let exprs = columns
.iter()
.map(|c| Expr::Column(c.clone()))
.collect::<Vec<_>>();
exprs.iter().apply_until_stop(f)
}
LogicalPlan::Distinct(Distinct::On(DistinctOn {
on_expr,
Expand Down Expand Up @@ -706,20 +710,6 @@ impl LogicalPlan {
fetch,
})
}),
LogicalPlan::Unnest(Unnest {
input,
column,
schema,
options,
}) => f(Expr::Column(column))?.map_data(|column| match column {
Expr::Column(column) => Ok(LogicalPlan::Unnest(Unnest {
input,
column,
schema,
options,
})),
_ => internal_err!("Transformation should return Column"),
})?,
LogicalPlan::Distinct(Distinct::On(DistinctOn {
on_expr,
select_expr,
Expand All @@ -744,6 +734,7 @@ impl LogicalPlan {
}),
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl TreeNode for Expr {
) -> Result<TreeNodeRecursion> {
let children = match self {
Expr::Alias(Alias{expr,..})
| Expr::Unnest(Unnest{expr})
| Expr::Not(expr)
| Expr::IsNotNull(expr)
| Expr::IsTrue(expr)
Expand All @@ -60,7 +61,6 @@ impl TreeNode for Expr {
GetFieldAccess::NamedStructField { .. } => vec![expr],
}
}
Expr::Unnest(Unnest { exprs }) |
Expr::GroupingSet(GroupingSet::Rollup(exprs))
| Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.iter().collect(),
Expr::ScalarFunction (ScalarFunction{ args, .. } ) => {
Expand Down
Loading

0 comments on commit 34eda15

Please sign in to comment.