From 66f4fcb4664fc797ffb046d5b2ebcfca65ba4cd7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 2 Apr 2024 17:21:02 -0400 Subject: [PATCH 1/3] Use `struct` instead of `named_struct` when there are no aliases (#9897) * Revert "use alias (#9894)" This reverts commit 9487ca057353370aa75895453c92bb40b9f33ac6. * Use `struct` instead of `named_struct` when there are no aliases * Update docs * fmt --- datafusion/sql/src/expr/mod.rs | 48 +++++++++++++++++++ .../sqllogictest/test_files/explain.slt | 4 +- datafusion/sqllogictest/test_files/struct.slt | 6 +-- .../source/user-guide/sql/scalar_functions.md | 10 ++++ 4 files changed, 63 insertions(+), 5 deletions(-) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 064578ad51d6..43aea1f2b75a 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -593,6 +593,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } + /// Parses a struct(..) expression fn parse_struct( &self, values: Vec, @@ -603,6 +604,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if !fields.is_empty() { return not_impl_err!("Struct fields are not supported yet"); } + + if values + .iter() + .any(|value| matches!(value, SQLExpr::Named { .. })) + { + self.create_named_struct(values, input_schema, planner_context) + } else { + self.create_struct(values, input_schema, planner_context) + } + } + + // Handles a call to struct(...) where the arguments are named. For example + // `struct (v as foo, v2 as bar)` by creating a call to the `named_struct` function + fn create_named_struct( + &self, + values: Vec, + input_schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { let args = values .into_iter() .enumerate() @@ -647,6 +667,34 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ))) } + // Handles a call to struct(...) where the arguments are not named. For example + // `struct (v, v2)` by creating a call to the `struct` function + // which will create a struct with fields named `c0`, `c1`, etc. + fn create_struct( + &self, + values: Vec, + input_schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + let args = values + .into_iter() + .map(|value| { + self.sql_expr_to_logical_expr(value, input_schema, planner_context) + }) + .collect::>>()?; + let struct_func = self + .context_provider + .get_function_meta("struct") + .ok_or_else(|| { + internal_datafusion_err!("Unable to find expected 'struct' function") + })?; + + Ok(Expr::ScalarFunction(ScalarFunction::new_udf( + struct_func, + args, + ))) + } + fn parse_array_agg( &self, array_agg: ArrayAgg, diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 4653250cf93f..b7ad36dace16 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -390,8 +390,8 @@ query TT explain select struct(1, 2.3, 'abc'); ---- logical_plan -Projection: Struct({c0:1,c1:2.3,c2:abc}) AS named_struct(Utf8("c0"),Int64(1),Utf8("c1"),Float64(2.3),Utf8("c2"),Utf8("abc")) +Projection: Struct({c0:1,c1:2.3,c2:abc}) AS struct(Int64(1),Float64(2.3),Utf8("abc")) --EmptyRelation physical_plan -ProjectionExec: expr=[{c0:1,c1:2.3,c2:abc} as named_struct(Utf8("c0"),Int64(1),Utf8("c1"),Float64(2.3),Utf8("c2"),Utf8("abc"))] +ProjectionExec: expr=[{c0:1,c1:2.3,c2:abc} as struct(Int64(1),Float64(2.3),Utf8("abc"))] --PlaceholderRowExec diff --git a/datafusion/sqllogictest/test_files/struct.slt b/datafusion/sqllogictest/test_files/struct.slt index 2e0b699f6dd6..8a6256add6ac 100644 --- a/datafusion/sqllogictest/test_files/struct.slt +++ b/datafusion/sqllogictest/test_files/struct.slt @@ -85,10 +85,10 @@ query TT explain select struct(a, b, c) from values; ---- logical_plan -Projection: named_struct(Utf8("c0"), values.a, Utf8("c1"), values.b, Utf8("c2"), values.c) +Projection: struct(values.a, values.b, values.c) --TableScan: values projection=[a, b, c] physical_plan -ProjectionExec: expr=[named_struct(c0, a@0, c1, b@1, c2, c@2) as named_struct(Utf8("c0"),values.a,Utf8("c1"),values.b,Utf8("c2"),values.c)] +ProjectionExec: expr=[struct(a@0, b@1, c@2) as struct(values.a,values.b,values.c)] --MemoryExec: partitions=1, partition_sizes=[1] # error on 0 arguments @@ -179,4 +179,4 @@ drop table values; query T select arrow_typeof(named_struct('first', 1, 'second', 2, 'third', 3)); ---- -Struct([Field { name: "first", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "second", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "third", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \ No newline at end of file +Struct([Field { name: "first", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "second", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "third", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index e2e129a2e2d1..62b81ea7ea4b 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -3336,6 +3336,16 @@ select * from t; | 3 | 4 | +---+---+ +-- use default names `c0`, `c1` +❯ select struct(a, b) from t; ++-----------------+ +| struct(t.a,t.b) | ++-----------------+ +| {c0: 1, c1: 2} | +| {c0: 3, c1: 4} | ++-----------------+ + +-- name the first field `field_a` select struct(a as field_a, b) from t; +--------------------------------------------------+ | named_struct(Utf8("field_a"),t.a,Utf8("c1"),t.b) | From f0eec349a1abed14bcb2ee8a9fbf98bbb19b8f9a Mon Sep 17 00:00:00 2001 From: Lordworms <48054792+Lordworms@users.noreply.github.com> Date: Fri, 5 Apr 2024 15:57:48 -0500 Subject: [PATCH 2/3] coercion vec[Dictionary, Utf8] to Dictionary for coalesce function (#9958) * for debug finish remove print add space * fix clippy * finish * fix clippy --- .../expr/src/type_coercion/functions.rs | 57 ++++++++++++------- datafusion/sqllogictest/test_files/scalar.slt | 42 +++++++++++++- 2 files changed, 75 insertions(+), 24 deletions(-) diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index d4095a72fe3e..34b607d0884d 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -311,17 +311,25 @@ fn coerced_from<'a>( type_from: &'a DataType, ) -> Option { use self::DataType::*; - - match type_into { + // match Dictionary first + match (type_into, type_from) { + // coerced dictionary first + (cur_type, Dictionary(_, value_type)) | (Dictionary(_, value_type), cur_type) + if coerced_from(cur_type, value_type).is_some() => + { + Some(type_into.clone()) + } // coerced into type_into - Int8 if matches!(type_from, Null | Int8) => Some(type_into.clone()), - Int16 if matches!(type_from, Null | Int8 | Int16 | UInt8) => { + (Int8, _) if matches!(type_from, Null | Int8) => Some(type_into.clone()), + (Int16, _) if matches!(type_from, Null | Int8 | Int16 | UInt8) => { Some(type_into.clone()) } - Int32 if matches!(type_from, Null | Int8 | Int16 | Int32 | UInt8 | UInt16) => { + (Int32, _) + if matches!(type_from, Null | Int8 | Int16 | Int32 | UInt8 | UInt16) => + { Some(type_into.clone()) } - Int64 + (Int64, _) if matches!( type_from, Null | Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 @@ -329,15 +337,17 @@ fn coerced_from<'a>( { Some(type_into.clone()) } - UInt8 if matches!(type_from, Null | UInt8) => Some(type_into.clone()), - UInt16 if matches!(type_from, Null | UInt8 | UInt16) => Some(type_into.clone()), - UInt32 if matches!(type_from, Null | UInt8 | UInt16 | UInt32) => { + (UInt8, _) if matches!(type_from, Null | UInt8) => Some(type_into.clone()), + (UInt16, _) if matches!(type_from, Null | UInt8 | UInt16) => { + Some(type_into.clone()) + } + (UInt32, _) if matches!(type_from, Null | UInt8 | UInt16 | UInt32) => { Some(type_into.clone()) } - UInt64 if matches!(type_from, Null | UInt8 | UInt16 | UInt32 | UInt64) => { + (UInt64, _) if matches!(type_from, Null | UInt8 | UInt16 | UInt32 | UInt64) => { Some(type_into.clone()) } - Float32 + (Float32, _) if matches!( type_from, Null | Int8 @@ -353,7 +363,7 @@ fn coerced_from<'a>( { Some(type_into.clone()) } - Float64 + (Float64, _) if matches!( type_from, Null | Int8 @@ -371,7 +381,7 @@ fn coerced_from<'a>( { Some(type_into.clone()) } - Timestamp(TimeUnit::Nanosecond, None) + (Timestamp(TimeUnit::Nanosecond, None), _) if matches!( type_from, Null | Timestamp(_, None) | Date32 | Utf8 | LargeUtf8 @@ -379,23 +389,27 @@ fn coerced_from<'a>( { Some(type_into.clone()) } - Interval(_) if matches!(type_from, Utf8 | LargeUtf8) => Some(type_into.clone()), + (Interval(_), _) if matches!(type_from, Utf8 | LargeUtf8) => { + Some(type_into.clone()) + } // Any type can be coerced into strings - Utf8 | LargeUtf8 => Some(type_into.clone()), - Null if can_cast_types(type_from, type_into) => Some(type_into.clone()), + (Utf8 | LargeUtf8, _) => Some(type_into.clone()), + (Null, _) if can_cast_types(type_from, type_into) => Some(type_into.clone()), - List(_) if matches!(type_from, FixedSizeList(_, _)) => Some(type_into.clone()), + (List(_), _) if matches!(type_from, FixedSizeList(_, _)) => { + Some(type_into.clone()) + } // Only accept list and largelist with the same number of dimensions unless the type is Null. // List or LargeList with different dimensions should be handled in TypeSignature or other places before this - List(_) | LargeList(_) + (List(_) | LargeList(_), _) if datafusion_common::utils::base_type(type_from).eq(&Null) || list_ndims(type_from) == list_ndims(type_into) => { Some(type_into.clone()) } // should be able to coerce wildcard fixed size list to non wildcard fixed size list - FixedSizeList(f_into, FIXED_SIZE_LIST_WILDCARD) => match type_from { + (FixedSizeList(f_into, FIXED_SIZE_LIST_WILDCARD), _) => match type_from { FixedSizeList(f_from, size_from) => { match coerced_from(f_into.data_type(), f_from.data_type()) { Some(data_type) if &data_type != f_into.data_type() => { @@ -410,7 +424,7 @@ fn coerced_from<'a>( _ => None, }, - Timestamp(unit, Some(tz)) if tz.as_ref() == TIMEZONE_WILDCARD => { + (Timestamp(unit, Some(tz)), _) if tz.as_ref() == TIMEZONE_WILDCARD => { match type_from { Timestamp(_, Some(from_tz)) => { Some(Timestamp(unit.clone(), Some(from_tz.clone()))) @@ -422,7 +436,7 @@ fn coerced_from<'a>( _ => None, } } - Timestamp(_, Some(_)) + (Timestamp(_, Some(_)), _) if matches!( type_from, Null | Timestamp(_, _) | Date32 | Utf8 | LargeUtf8 @@ -430,7 +444,6 @@ fn coerced_from<'a>( { Some(type_into.clone()) } - // More coerce rules. // Note that not all rules in `comparison_coercion` can be reused here. // For example, all numeric types can be coerced into Utf8 for comparison, diff --git a/datafusion/sqllogictest/test_files/scalar.slt b/datafusion/sqllogictest/test_files/scalar.slt index 20c8b3d25fdd..e0c600c1484c 100644 --- a/datafusion/sqllogictest/test_files/scalar.slt +++ b/datafusion/sqllogictest/test_files/scalar.slt @@ -1779,6 +1779,46 @@ SELECT COALESCE(NULL, 'test') ---- test + +statement ok +create table test1 as values (arrow_cast('foo', 'Dictionary(Int32, Utf8)')), (null); + +# test coercion string +query ? +select coalesce(column1, 'none_set') from test1; +---- +foo +none_set + +# test coercion Int +query I +select coalesce(34, arrow_cast(123, 'Dictionary(Int32, Int8)')); +---- +34 + +# test with Int +query I +select coalesce(arrow_cast(123, 'Dictionary(Int32, Int8)'),34); +---- +123 + +# test with null +query I +select coalesce(null, 34, arrow_cast(123, 'Dictionary(Int32, Int8)')); +---- +34 + +# test with null +query T +select coalesce(null, column1, 'none_set') from test1; +---- +foo +none_set + +statement ok +drop table test1 + + statement ok CREATE TABLE test( c1 INT, @@ -2162,5 +2202,3 @@ query I select strpos('joséésoj', arrow_cast(null, 'Utf8')); ---- NULL - - From e8de1c612a986ae4b0348ce0a9d92f08d93c258c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 10 Apr 2024 11:14:02 -0400 Subject: [PATCH 3/3] fix NamedStructField should be rewritten in OperatorToFunction in subquery --- .../src/analyzer/function_rewrite.rs | 129 ++++++++++++------ .../sqllogictest/test_files/subquery.slt | 55 ++++++++ 2 files changed, 144 insertions(+), 40 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/function_rewrite.rs b/datafusion/optimizer/src/analyzer/function_rewrite.rs index 15b5265df958..70cb54c24f6c 100644 --- a/datafusion/optimizer/src/analyzer/function_rewrite.rs +++ b/datafusion/optimizer/src/analyzer/function_rewrite.rs @@ -21,9 +21,10 @@ use super::AnalyzerRule; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; use datafusion_common::{DFSchema, Result}; +use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::expr_rewriter::{rewrite_preserving_name, FunctionRewrite}; use datafusion_expr::utils::merge_schema; -use datafusion_expr::{Expr, LogicalPlan}; +use datafusion_expr::{Expr, LogicalPlan, Subquery}; use std::sync::Arc; /// Analyzer rule that invokes [`FunctionRewrite`]s on expressions @@ -45,52 +46,66 @@ impl AnalyzerRule for ApplyFunctionRewrites { } fn analyze(&self, plan: LogicalPlan, options: &ConfigOptions) -> Result { - self.analyze_internal(&plan, options) + analyze_internal(&plan, &self.function_rewrites, options) } } -impl ApplyFunctionRewrites { - fn analyze_internal( - &self, - plan: &LogicalPlan, - options: &ConfigOptions, - ) -> Result { - // optimize child plans first - let new_inputs = plan - .inputs() - .iter() - .map(|p| self.analyze_internal(p, options)) - .collect::>>()?; - - // get schema representing all available input fields. This is used for data type - // resolution only, so order does not matter here - let mut schema = merge_schema(new_inputs.iter().collect()); - - if let LogicalPlan::TableScan(ts) = plan { - let source_schema = - DFSchema::try_from_qualified_schema(&ts.table_name, &ts.source.schema())?; - schema.merge(&source_schema); - } +fn analyze_internal( + plan: &LogicalPlan, + function_rewrites: &[Arc], + options: &ConfigOptions, +) -> Result { + // optimize child plans first + let new_inputs = plan + .inputs() + .iter() + .map(|p| analyze_internal(p, function_rewrites, options)) + .collect::>>()?; - let mut expr_rewrite = OperatorToFunctionRewriter { - function_rewrites: &self.function_rewrites, - options, - schema: &schema, - }; + // get schema representing all available input fields. This is used for data type + // resolution only, so order does not matter here + let mut schema = merge_schema(new_inputs.iter().collect()); - let new_expr = plan - .expressions() - .into_iter() - .map(|expr| { - // ensure names don't change: - // https://github.com/apache/arrow-datafusion/issues/3555 - rewrite_preserving_name(expr, &mut expr_rewrite) - }) - .collect::>>()?; - - plan.with_new_exprs(new_expr, new_inputs) + if let LogicalPlan::TableScan(ts) = plan { + let source_schema = DFSchema::try_from_qualified_schema( + ts.table_name.clone(), + &ts.source.schema(), + )?; + schema.merge(&source_schema); } + + let mut expr_rewrite = OperatorToFunctionRewriter { + function_rewrites, + options, + schema: &schema, + }; + + let new_expr = plan + .expressions() + .into_iter() + .map(|expr| { + // ensure names don't change: + // https://github.com/apache/arrow-datafusion/issues/3555 + rewrite_preserving_name(expr, &mut expr_rewrite) + }) + .collect::>>()?; + + plan.with_new_exprs(new_expr, new_inputs) } + +fn rewrite_subquery( + mut subquery: Subquery, + function_rewrites: &[Arc], + options: &ConfigOptions, +) -> Result { + subquery.subquery = Arc::new(analyze_internal( + &subquery.subquery, + function_rewrites, + options, + )?); + Ok(subquery) +} + struct OperatorToFunctionRewriter<'a> { function_rewrites: &'a [Arc], options: &'a ConfigOptions, @@ -111,6 +126,40 @@ impl<'a> TreeNodeRewriter for OperatorToFunctionRewriter<'a> { expr = result.data } + // recurse into subqueries if needed + let expr = match expr { + Expr::ScalarSubquery(subquery) => Expr::ScalarSubquery(rewrite_subquery( + subquery, + self.function_rewrites, + self.options, + )?), + + Expr::Exists(Exists { subquery, negated }) => Expr::Exists(Exists { + subquery: rewrite_subquery( + subquery, + self.function_rewrites, + self.options, + )?, + negated, + }), + + Expr::InSubquery(InSubquery { + expr, + subquery, + negated, + }) => Expr::InSubquery(InSubquery { + expr, + subquery: rewrite_subquery( + subquery, + self.function_rewrites, + self.options, + )?, + negated, + }), + + expr => expr, + }; + Ok(if transformed { Transformed::yes(expr) } else { diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 4fb94cfab523..812f927fcc84 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1060,3 +1060,58 @@ logical_plan Projection: t.a / Int64(2)Int64(2)t.a AS t.a / Int64(2), t.a / Int64(2)Int64(2)t.a AS t.a / Int64(2) + Int64(1) --Projection: t.a / Int64(2) AS t.a / Int64(2)Int64(2)t.a ----TableScan: t projection=[a] + +### +## Ensure that operators are rewritten in subqueries +### + +statement ok +create table foo(x int) as values (1); + +# Show input data +query ? +select struct(1, 'b') +---- +{c0: 1, c1: b} + + +query T +select (select struct(1, 'b')['c1']); +---- +b + +query T +select 'foo' || (select struct(1, 'b')['c1']); +---- +foob + +query I +SELECT * FROM (VALUES (1), (2)) +WHERE column1 IN (SELECT struct(1, 'b')['c0']); +---- +1 + +# also add an expression so the subquery is the output expr +query I +SELECT * FROM (VALUES (1), (2)) +WHERE 1+2 = 3 AND column1 IN (SELECT struct(1, 'b')['c0']); +---- +1 + + +query I +SELECT * FROM foo +WHERE EXISTS (SELECT * FROM (values (1)) WHERE column1 = foo.x AND struct(1, 'b')['c0'] = 1); +---- +1 + +# also add an expression so the subquery is the output expr +query I +SELECT * FROM foo +WHERE 1+2 = 3 AND EXISTS (SELECT * FROM (values (1)) WHERE column1 = foo.x AND struct(1, 'b')['c0'] = 1); +---- +1 + + +statement ok +drop table foo;