From 63114e2cfe39847c2c9505a0aa5b89911b3709b0 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Wed, 17 Apr 2024 23:41:47 +0800 Subject: [PATCH 1/9] feat: `DataFrame` supports unnesting multiple columns --- datafusion/core/src/dataframe/mod.rs | 35 ++++- datafusion/core/tests/dataframe/mod.rs | 137 +++++++++++++++--- datafusion/sqllogictest/test_files/unnest.slt | 18 +++ 3 files changed, 163 insertions(+), 27 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 683cb809a5b1..6dc7ce09334d 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -238,6 +238,26 @@ impl DataFrame { } /// Expand each list element of a column to multiple rows. + #[deprecated(since = "37.0.0", note = "use unnest_columns instead")] + pub fn unnest_column(self, column: &str) -> Result { + self.unnest_columns(&[column]) + } + + /// Expand each list element of a column to multiple rows, with + /// behavior controlled by [`UnnestOptions`]. + /// + /// Please see the documentation on [`UnnestOptions`] for more + /// details about the meaning of unnest. + #[deprecated(since = "37.0.0", note = "use unnest_columns_with_options instead")] + pub fn unnest_column_with_options( + self, + column: &str, + options: UnnestOptions, + ) -> Result { + self.unnest_columns_with_options(&[column], options) + } + + /// Expands multiple list columns into a set of rows. /// /// See also: /// @@ -252,26 +272,27 @@ impl DataFrame { /// # async fn main() -> Result<()> { /// let ctx = SessionContext::new(); /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; - /// let df = df.unnest_column("a")?; + /// let df = df.unnest_columns(&["a", "b"])?; /// # Ok(()) /// # } /// ``` - pub fn unnest_column(self, column: &str) -> Result { - self.unnest_column_with_options(column, UnnestOptions::new()) + pub fn unnest_columns(self, columns: &[&str]) -> Result { + self.unnest_columns_with_options(columns, UnnestOptions::new()) } - /// Expand each list element of a column to multiple rows, with + /// Expand multiple list columns into a set of rows, with /// behavior controlled by [`UnnestOptions`]. /// /// Please see the documentation on [`UnnestOptions`] for more /// details about the meaning of unnest. - pub fn unnest_column_with_options( + pub fn unnest_columns_with_options( self, - column: &str, + columns: &[&str], options: UnnestOptions, ) -> Result { + let columns = columns.iter().map(|c| Column::from(*c)).collect(); let plan = LogicalPlanBuilder::from(self.plan) - .unnest_column_with_options(column, options)? + .unnest_columns_with_options(columns, options)? .build()?; Ok(DataFrame::new(self.session_state, plan)) } diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 305a7e69fdb2..0be327cdbc3a 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -19,12 +19,13 @@ mod dataframe_functions; mod describe; -use arrow::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field, Float32Type, Int32Type, Schema, UInt64Type}; use arrow::util::pretty::pretty_format_batches; use arrow::{ array::{ - ArrayRef, FixedSizeListBuilder, Int32Array, Int32Builder, ListBuilder, - StringArray, StringBuilder, StructBuilder, UInt32Array, UInt32Builder, + ArrayRef, FixedSizeListArray, FixedSizeListBuilder, Int32Array, Int32Builder, + LargeListArray, ListArray, ListBuilder, StringArray, StringBuilder, + StructBuilder, UInt32Array, UInt32Builder, }, record_batch::RecordBatch, }; @@ -896,7 +897,7 @@ async fn unnest_columns() -> Result<()> { // Unnest tags let df = table_with_nested_types(NUM_ROWS).await?; - let results = df.unnest_column("tags")?.collect().await?; + let results = df.unnest_columns(&["tags"])?.collect().await?; let expected = [ "+----------+------------------------------------------------+------+", "| shape_id | points | tags |", @@ -914,12 +915,12 @@ async fn unnest_columns() -> Result<()> { // Test aggregate results for tags. let df = table_with_nested_types(NUM_ROWS).await?; - let count = df.unnest_column("tags")?.count().await?; + let count = df.unnest_columns(&["tags"])?.count().await?; assert_eq!(count, results.iter().map(|r| r.num_rows()).sum::()); // Unnest points let df = table_with_nested_types(NUM_ROWS).await?; - let results = df.unnest_column("points")?.collect().await?; + let results = df.unnest_columns(&["points"])?.collect().await?; let expected = [ "+----------+-----------------+--------------------+", "| shape_id | points | tags |", @@ -938,14 +939,14 @@ async fn unnest_columns() -> Result<()> { // Test aggregate results for points. let df = table_with_nested_types(NUM_ROWS).await?; - let count = df.unnest_column("points")?.count().await?; + let count = df.unnest_columns(&["points"])?.count().await?; assert_eq!(count, results.iter().map(|r| r.num_rows()).sum::()); // Unnest both points and tags. let df = table_with_nested_types(NUM_ROWS).await?; let results = df - .unnest_column("points")? - .unnest_column("tags")? + .unnest_columns(&["points"])? + .unnest_columns(&["tags"])? .collect() .await?; let expected = vec![ @@ -972,8 +973,8 @@ async fn unnest_columns() -> Result<()> { // Test aggregate results for points and tags. let df = table_with_nested_types(NUM_ROWS).await?; let count = df - .unnest_column("points")? - .unnest_column("tags")? + .unnest_columns(&["points"])? + .unnest_columns(&["tags"])? .count() .await?; assert_eq!(count, results.iter().map(|r| r.num_rows()).sum::()); @@ -1002,7 +1003,7 @@ async fn unnest_column_nulls() -> Result<()> { let results = df .clone() - .unnest_column_with_options("list", options)? + .unnest_columns_with_options(&["list"], options)? .collect() .await?; let expected = [ @@ -1019,7 +1020,7 @@ async fn unnest_column_nulls() -> Result<()> { let options = UnnestOptions::new().with_preserve_nulls(false); let results = df - .unnest_column_with_options("list", options)? + .unnest_columns_with_options(&["list"], options)? .collect() .await?; let expected = [ @@ -1062,7 +1063,7 @@ async fn unnest_fixed_list() -> Result<()> { let options = UnnestOptions::new().with_preserve_nulls(true); let results = df - .unnest_column_with_options("tags", options)? + .unnest_columns_with_options(&["tags"], options)? .collect() .await?; let expected = vec![ @@ -1112,7 +1113,7 @@ async fn unnest_fixed_list_drop_nulls() -> Result<()> { let options = UnnestOptions::new().with_preserve_nulls(false); let results = df - .unnest_column_with_options("tags", options)? + .unnest_columns_with_options(&["tags"], options)? .collect() .await?; let expected = [ @@ -1178,7 +1179,7 @@ async fn unnest_fixed_list_nonull() -> Result<()> { let options = UnnestOptions::new().with_preserve_nulls(true); let results = df - .unnest_column_with_options("tags", options)? + .unnest_columns_with_options(&["tags"], options)? .collect() .await?; let expected = vec![ @@ -1225,7 +1226,7 @@ async fn unnest_aggregate_columns() -> Result<()> { let df = table_with_nested_types(NUM_ROWS).await?; let results = df - .unnest_column("tags")? + .unnest_columns(&["tags"])? .aggregate(vec![], vec![count(col("tags"))])? .collect() .await?; @@ -1308,7 +1309,7 @@ async fn unnest_array_agg() -> Result<()> { vec![col("shape_id")], vec![array_agg(col("tag_id")).alias("tag_id")], )? - .unnest_column("tag_id")? + .unnest_columns(&["tag_id"])? .collect() .await?; let expected = vec![ @@ -1377,7 +1378,7 @@ async fn unnest_with_redundant_columns() -> Result<()> { vec![col("shape_id")], vec![array_agg(col("shape_id")).alias("shape_id2")], )? - .unnest_column("shape_id2")? + .unnest_columns(&["shape_id2"])? .select(vec![col("shape_id")])?; let optimized_plan = df.clone().into_optimized_plan()?; @@ -1422,7 +1423,7 @@ async fn unnest_analyze_metrics() -> Result<()> { let df = table_with_nested_types(NUM_ROWS).await?; let results = df - .unnest_column("tags")? + .unnest_columns(&["tags"])? .explain(false, true)? .collect() .await?; @@ -1437,6 +1438,61 @@ async fn unnest_analyze_metrics() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn unnest_multiple_columns() -> Result<()> { + let df = table_with_mixed_lists().await?; + // Default behavior is to preserve nulls. + let results = df + .clone() + .unnest_columns(&["list", "large_list", "fixed_list"])? + .collect() + .await?; + let expected = [ + "+------+------------+------------+--------+", + "| list | large_list | fixed_list | string |", + "+------+------------+------------+--------+", + "| | | | d |", + "| | | 3 | c |", + "| | | 4 | c |", + "| | 2.2 | 1 | b |", + "| | 3.3 | 2 | b |", + "| | 4.4 | | b |", + "| 1 | | | a |", + "| 2 | 1.1 | | a |", + "| 3 | | | a |", + "+------+------------+------------+--------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + // Test with `preserve_nulls = false`` + let results = df + .clone() + .unnest_columns_with_options( + &["list", "large_list", "fixed_list"], + UnnestOptions::new().with_preserve_nulls(false), + )? + .collect() + .await?; + let expected = [ + "+------+------------+------------+--------+", + "| list | large_list | fixed_list | string |", + "+------+------------+------------+--------+", + "| | | 3 | c |", + "| | | 4 | c |", + "| | 2.2 | 1 | b |", + "| | 3.3 | 2 | b |", + "| | 4.4 | | b |", + "| 1 | | | a |", + "| 2 | 1.1 | | a |", + "| 3 | | | a |", + "+------+------------+------------+--------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + #[tokio::test] async fn test_read_batches() -> Result<()> { let config = SessionConfig::new(); @@ -1731,6 +1787,47 @@ fn get_fixed_list_batch() -> Result { Ok(batch) } +/// Create a table with different types of list columns and a string column. +async fn table_with_mixed_lists() -> Result { + let list_array = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + None, + Some(vec![None]), + None, + ]); + + let large_list_array = + LargeListArray::from_iter_primitive::(vec![ + Some(vec![None, Some(1.1)]), + Some(vec![Some(2.2), Some(3.3), Some(4.4)]), + None, + Some(vec![]), + ]); + + let fixed_list_array = FixedSizeListArray::from_iter_primitive::( + vec![ + None, + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3), Some(4)]), + None, + ], + 2, + ); + + let string_array = StringArray::from(vec!["a", "b", "c", "d"]); + + let batch = RecordBatch::try_from_iter(vec![ + ("list", Arc::new(list_array) as ArrayRef), + ("large_list", Arc::new(large_list_array) as ArrayRef), + ("fixed_list", Arc::new(fixed_list_array) as ArrayRef), + ("string", Arc::new(string_array) as ArrayRef), + ])?; + + let ctx = SessionContext::new(); + ctx.register_batch("mixed_lists", batch)?; + ctx.table("mixed_lists").await +} + /// A a data frame that a list of integers and string IDs async fn table_with_lists_and_nulls() -> Result { let mut list_builder = ListBuilder::new(UInt32Builder::new()); diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 38207fa7d1d6..8c5a44b769d1 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -383,5 +383,23 @@ select unnest(array_remove(column1, 3)) - 1 as c1, column3 from unnest_table; 5 3 11 NULL +# Unnest with non-nullable lists +statement ok +create table non_nullable_list( + c1 int[] not null, + c2 int[] not null +) as values ([1,2,3], [4]); + +# Unnesting may produce NULLs even if the lists are non-nullable +query II +select unnest(c1), unnest(c2) from non_nullable_list; +---- +1 4 +2 NULL +3 NULL + statement ok drop table unnest_table; + +statement ok +drop table non_nullable_list; From c47f0cabeeab717331c24f794540c8c5d3862eb8 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Thu, 18 Apr 2024 00:03:31 +0800 Subject: [PATCH 2/9] replace test with unnest_non_nullable_list --- datafusion/core/tests/dataframe/mod.rs | 30 +++++++++++++++++++ datafusion/sqllogictest/test_files/unnest.slt | 20 +------------ 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 0be327cdbc3a..e2d4f11c9b8a 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1493,6 +1493,36 @@ async fn unnest_multiple_columns() -> Result<()> { Ok(()) } +/// Test unnesting a non-nullable list. +#[tokio::test] +async fn unnest_non_nullable_list() -> Result<()> { + let list_array = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2)]), + Some(vec![None]), + ]); + let schema = Arc::new(Schema::new(vec![Field::new( + "c1", + DataType::new_list(DataType::Int32, true), + false, + )])); + + let batch = RecordBatch::try_new(schema, vec![Arc::new(list_array)])?; + let ctx = SessionContext::new(); + let results = ctx + .read_batches(vec![batch])? + .unnest_columns(&["c1"])? + .collect() + .await?; + + // Unnesting may produce NULLs even if the lists are non-nullable. + let expected = [ + "+----+", "| c1 |", "+----+", "| |", "| 1 |", "| 2 |", "+----+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + #[tokio::test] async fn test_read_batches() -> Result<()> { let config = SessionConfig::new(); diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 8c5a44b769d1..292f97b331c1 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -383,23 +383,5 @@ select unnest(array_remove(column1, 3)) - 1 as c1, column3 from unnest_table; 5 3 11 NULL -# Unnest with non-nullable lists statement ok -create table non_nullable_list( - c1 int[] not null, - c2 int[] not null -) as values ([1,2,3], [4]); - -# Unnesting may produce NULLs even if the lists are non-nullable -query II -select unnest(c1), unnest(c2) from non_nullable_list; ----- -1 4 -2 NULL -3 NULL - -statement ok -drop table unnest_table; - -statement ok -drop table non_nullable_list; +drop table unnest_table; \ No newline at end of file From 481bfc2877803f1ae78eb1f898af932ab802f4d0 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Thu, 18 Apr 2024 00:04:15 +0800 Subject: [PATCH 3/9] restore unnest.slt --- datafusion/sqllogictest/test_files/unnest.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 292f97b331c1..38207fa7d1d6 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -384,4 +384,4 @@ select unnest(array_remove(column1, 3)) - 1 as c1, column3 from unnest_table; 11 NULL statement ok -drop table unnest_table; \ No newline at end of file +drop table unnest_table; From 7e8c9462101c6075ada64dd0020aa6dd7a75bf16 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Thu, 18 Apr 2024 00:05:13 +0800 Subject: [PATCH 4/9] fix comment --- datafusion/core/tests/dataframe/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index e2d4f11c9b8a..73b40a55b898 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1514,7 +1514,7 @@ async fn unnest_non_nullable_list() -> Result<()> { .collect() .await?; - // Unnesting may produce NULLs even if the lists are non-nullable. + // Unnesting may produce NULLs even if the list is non-nullable. let expected = [ "+----+", "| c1 |", "+----+", "| |", "| 1 |", "| 2 |", "+----+", ]; From 015a0184b59546b574e7e2eb78344e4a02215441 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Thu, 18 Apr 2024 00:32:06 +0800 Subject: [PATCH 5/9] fix typo --- datafusion/core/src/dataframe/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 6dc7ce09334d..e3e77385919c 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -257,7 +257,7 @@ impl DataFrame { self.unnest_columns_with_options(&[column], options) } - /// Expands multiple list columns into a set of rows. + /// Expand multiple list columns into a set of rows. /// /// See also: /// From 11fd52456ce393f06197e07eb2620456dabbf077 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Thu, 18 Apr 2024 09:51:11 +0800 Subject: [PATCH 6/9] make tests straightforward --- datafusion/core/tests/dataframe/mod.rs | 32 ++++++++++++++++---------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 73b40a55b898..8c2aa0f6fc6c 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1448,22 +1448,26 @@ async fn unnest_multiple_columns() -> Result<()> { .unnest_columns(&["list", "large_list", "fixed_list"])? .collect() .await?; + // list: [1,2,3], null, [null], null, + // large_list: [null, 1.1], [2.2, 3.3, 4.4], null, [], + // fixed_list: null, [1,2], [3,4], null + // string: a, b, c, d let expected = [ "+------+------------+------------+--------+", "| list | large_list | fixed_list | string |", "+------+------------+------------+--------+", - "| | | | d |", - "| | | 3 | c |", - "| | | 4 | c |", - "| | 2.2 | 1 | b |", - "| | 3.3 | 2 | b |", - "| | 4.4 | | b |", "| 1 | | | a |", "| 2 | 1.1 | | a |", "| 3 | | | a |", + "| | 2.2 | 1 | b |", + "| | 3.3 | 2 | b |", + "| | 4.4 | | b |", + "| | | 3 | c |", + "| | | 4 | c |", + "| | | | d |", "+------+------------+------------+--------+", ]; - assert_batches_sorted_eq!(expected, &results); + assert_batches_eq!(expected, &results); // Test with `preserve_nulls = false`` let results = df @@ -1474,18 +1478,22 @@ async fn unnest_multiple_columns() -> Result<()> { )? .collect() .await?; + // list: [1,2,3], null, [null], null, + // large_list: [null, 1.1], [2.2, 3.3, 4.4], null, [], + // fixed_list: null, [1,2], [3,4], null + // string: a, b, c, d let expected = [ "+------+------------+------------+--------+", "| list | large_list | fixed_list | string |", "+------+------------+------------+--------+", - "| | | 3 | c |", - "| | | 4 | c |", - "| | 2.2 | 1 | b |", - "| | 3.3 | 2 | b |", - "| | 4.4 | | b |", "| 1 | | | a |", "| 2 | 1.1 | | a |", "| 3 | | | a |", + "| | 2.2 | 1 | b |", + "| | 3.3 | 2 | b |", + "| | 4.4 | | b |", + "| | | 3 | c |", + "| | | 4 | c |", "+------+------------+------------+--------+", ]; assert_batches_sorted_eq!(expected, &results); From a9be76cc63dcad910c576df5d21479c8123f46d9 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Thu, 18 Apr 2024 09:52:41 +0800 Subject: [PATCH 7/9] use assert_batches_eq --- datafusion/core/tests/dataframe/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 8c2aa0f6fc6c..3e78cf923b1a 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1496,7 +1496,7 @@ async fn unnest_multiple_columns() -> Result<()> { "| | | 4 | c |", "+------+------------+------------+--------+", ]; - assert_batches_sorted_eq!(expected, &results); + assert_batches_eq!(expected, &results); Ok(()) } From 2f334ceb72f2c5e94f70386b463f8dbd043083e1 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Thu, 18 Apr 2024 09:54:58 +0800 Subject: [PATCH 8/9] format test --- datafusion/core/tests/dataframe/mod.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 3e78cf923b1a..957ed67c2b4b 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1523,10 +1523,17 @@ async fn unnest_non_nullable_list() -> Result<()> { .await?; // Unnesting may produce NULLs even if the list is non-nullable. + #[rustfmt::skip] let expected = [ - "+----+", "| c1 |", "+----+", "| |", "| 1 |", "| 2 |", "+----+", + "+----+", + "| c1 |", + "+----+", + "| 1 |", + "| 2 |", + "| |", + "+----+", ]; - assert_batches_sorted_eq!(expected, &results); + assert_batches_eq!(expected, &results); Ok(()) } From 15b34ef0d37b2c8de89ba5722556a531aed41911 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Thu, 18 Apr 2024 10:04:15 +0800 Subject: [PATCH 9/9] remove clone --- datafusion/core/tests/dataframe/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 957ed67c2b4b..6576655a8f15 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1471,7 +1471,6 @@ async fn unnest_multiple_columns() -> Result<()> { // Test with `preserve_nulls = false`` let results = df - .clone() .unnest_columns_with_options( &["list", "large_list", "fixed_list"], UnnestOptions::new().with_preserve_nulls(false),