From 1c73a23900ad27a0561ff79d840897916802966b Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Thu, 12 Dec 2024 21:16:11 +0000 Subject: [PATCH] fix union serialisation order in proto (#13709) * fix union serialisation order in proto * clippy * address comments --- datafusion/proto/src/logical_plan/mod.rs | 19 ++++------- .../tests/cases/roundtrip_logical_plan.rs | 34 +++++++++++++++++++ 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 50636048ebc9..addafeb7629d 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -737,23 +737,18 @@ impl AsLogicalPlan for LogicalPlanNode { builder.build() } LogicalPlanType::Union(union) => { - let mut input_plans: Vec = union - .inputs - .iter() - .map(|i| i.try_into_logical_plan(ctx, extension_codec)) - .collect::>()?; - - if input_plans.len() < 2 { + if union.inputs.len() < 2 { return Err( DataFusionError::Internal(String::from( "Protobuf deserialization error, Union was require at least two input.", ))); } + let (first, rest) = union.inputs.split_first().unwrap(); + let mut builder = LogicalPlanBuilder::from( + first.try_into_logical_plan(ctx, extension_codec)?, + ); - let first = input_plans.pop().ok_or_else(|| DataFusionError::Internal(String::from( - "Protobuf deserialization error, Union was require at least two input.", - )))?; - let mut builder = LogicalPlanBuilder::from(first); - for plan in input_plans { + for i in rest { + let plan = i.try_into_logical_plan(ctx, extension_codec)?; builder = builder.union(plan)?; } builder.build() diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 8c150b20dd00..f793e96f612b 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -25,6 +25,8 @@ use arrow::datatypes::{ }; use arrow::util::pretty::pretty_format_batches; use datafusion::datasource::file_format::json::JsonFormatFactory; +use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion; +use datafusion::optimizer::Optimizer; use datafusion_common::parsers::CompressionTypeVariant; use prost::Message; use std::any::Any; @@ -2555,3 +2557,35 @@ async fn roundtrip_recursive_query() { format!("{}", pretty_format_batches(&output_round_trip).unwrap()) ); } + +#[tokio::test] +async fn roundtrip_union_query() -> Result<()> { + let query = "SELECT a FROM t1 + UNION (SELECT a from t1 UNION SELECT a from t2)"; + + let ctx = SessionContext::new(); + ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()) + .await?; + ctx.register_csv("t2", "tests/testdata/test.csv", CsvReadOptions::default()) + .await?; + let dataframe = ctx.sql(query).await?; + let plan = dataframe.into_optimized_plan()?; + + let bytes = logical_plan_to_bytes(&plan)?; + + let ctx = SessionContext::new(); + ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()) + .await?; + ctx.register_csv("t2", "tests/testdata/test.csv", CsvReadOptions::default()) + .await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; + // proto deserialisation only supports 2-way union, hence this plan has nested unions + // apply the flatten unions optimizer rule to be able to compare + let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateNestedUnion::new())]); + let unnested = optimizer.optimize(logical_round_trip, &(ctx.state()), |_x, _y| {})?; + assert_eq!( + format!("{}", plan.display_indent_schema()), + format!("{}", unnested.display_indent_schema()), + ); + Ok(()) +}