diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 447b69e414cf..a38fd7f1fe28 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1273,6 +1273,7 @@ dependencies = [ "itertools", "log", "md-5", + "rand", "regex", "sha2", "unicode-segmentation", @@ -1355,7 +1356,6 @@ dependencies = [ "md-5", "paste", "petgraph", - "rand", "regex", "sha2", ] diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 42514537e28d..bb268e048d9a 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -292,6 +292,23 @@ pub trait TreeNode: Sized { ) } + /// Returns true if `f` returns true for node in the tree. + /// + /// Stops recursion as soon as a matching node is found + fn exists bool>(&self, mut f: F) -> bool { + let mut found = false; + self.apply(&mut |n| { + Ok(if f(n) { + found = true; + TreeNodeRecursion::Stop + } else { + TreeNodeRecursion::Continue + }) + }) + .unwrap(); + found + } + /// Apply the closure `F` to the node's children. fn apply_children Result>( &self, diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 19e71a92a706..dc7e0529decb 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -105,19 +105,23 @@ pub trait PruningStatistics { fn num_containers(&self) -> usize; /// Return the number of null values for the named column as an - /// `Option`. + /// [`UInt64Array`] /// /// See [`Self::min_values`] for when to return `None` and null values. /// /// Note: the returned array must contain [`Self::num_containers`] rows + /// + /// [`UInt64Array`]: arrow::array::UInt64Array fn null_counts(&self, column: &Column) -> Option; /// Return the number of rows for the named column in each container - /// as an `Option`. + /// as an [`UInt64Array`]. /// /// See [`Self::min_values`] for when to return `None` and null values. /// /// Note: the returned array must contain [`Self::num_containers`] rows + /// + /// [`UInt64Array`]: arrow::array::UInt64Array fn row_counts(&self, column: &Column) -> Option; /// Returns [`BooleanArray`] where each row represents information known @@ -1519,6 +1523,7 @@ mod tests { array::{BinaryArray, Int32Array, Int64Array, StringArray}, datatypes::{DataType, TimeUnit}, }; + use arrow_array::UInt64Array; use datafusion_common::{ScalarValue, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::InList; @@ -1684,10 +1689,10 @@ mod tests { /// there are containers fn with_null_counts( mut self, - counts: impl IntoIterator>, + counts: impl IntoIterator>, ) -> Self { let null_counts: ArrayRef = - Arc::new(counts.into_iter().collect::()); + Arc::new(counts.into_iter().collect::()); self.assert_invariants(); self.null_counts = Some(null_counts); @@ -1698,10 +1703,10 @@ mod tests { /// there are containers fn with_row_counts( mut self, - counts: impl IntoIterator>, + counts: impl IntoIterator>, ) -> Self { let row_counts: ArrayRef = - Arc::new(counts.into_iter().collect::()); + Arc::new(counts.into_iter().collect::()); self.assert_invariants(); self.row_counts = Some(row_counts); @@ -1753,13 +1758,13 @@ mod tests { self } - /// Add null counts for the specified columm. + /// Add null counts for the specified column. /// There must be the same number of null counts as /// there are containers fn with_null_counts( mut self, name: impl Into, - counts: impl IntoIterator>, + counts: impl IntoIterator>, ) -> Self { let col = Column::from_name(name.into()); @@ -1775,13 +1780,13 @@ mod tests { self } - /// Add row counts for the specified columm. + /// Add row counts for the specified column. /// There must be the same number of row counts as /// there are containers fn with_row_counts( mut self, name: impl Into, - counts: impl IntoIterator>, + counts: impl IntoIterator>, ) -> Self { let col = Column::from_name(name.into()); @@ -1797,7 +1802,7 @@ mod tests { self } - /// Add contained information for the specified columm. + /// Add contained information for the specified column. fn with_contained( mut self, name: impl Into, diff --git a/datafusion/core/tests/simplification.rs b/datafusion/core/tests/simplification.rs index 5a2f040c09d8..a0bcdda84d64 100644 --- a/datafusion/core/tests/simplification.rs +++ b/datafusion/core/tests/simplification.rs @@ -28,9 +28,10 @@ use datafusion_expr::expr::ScalarFunction; use datafusion_expr::logical_plan::builder::table_scan_with_filters; use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{ - expr, table_scan, BuiltinScalarFunction, Cast, ColumnarValue, Expr, ExprSchemable, - LogicalPlan, LogicalPlanBuilder, ScalarUDF, Volatility, + expr, table_scan, Cast, ColumnarValue, Expr, ExprSchemable, LogicalPlan, + LogicalPlanBuilder, ScalarUDF, Volatility, }; +use datafusion_functions::math; use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyExpressions}; use datafusion_optimizer::{OptimizerContext, OptimizerRule}; use std::sync::Arc; @@ -383,17 +384,17 @@ fn test_const_evaluator_scalar_functions() { // volatile / stable functions should not be evaluated // rand() + (1 + 2) --> rand() + 3 - let fun = BuiltinScalarFunction::Random; - assert_eq!(fun.volatility(), Volatility::Volatile); - let rand = Expr::ScalarFunction(ScalarFunction::new(fun, vec![])); + let fun = math::random(); + assert_eq!(fun.signature().volatility, Volatility::Volatile); + let rand = Expr::ScalarFunction(ScalarFunction::new_udf(fun, vec![])); let expr = rand.clone() + (lit(1) + lit(2)); let expected = rand + lit(3); test_evaluate(expr, expected); // parenthesization matters: can't rewrite // (rand() + 1) + 2 --> (rand() + 1) + 2) - let fun = BuiltinScalarFunction::Random; - let rand = Expr::ScalarFunction(ScalarFunction::new(fun, vec![])); + let fun = math::random(); + let rand = Expr::ScalarFunction(ScalarFunction::new_udf(fun, vec![])); let expr = (rand + lit(1)) + lit(2); test_evaluate(expr.clone(), expr); } diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index a6795e99d751..43cb0c3e0a50 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -45,8 +45,6 @@ pub enum BuiltinScalarFunction { Exp, /// factorial Factorial, - /// nanvl - Nanvl, // string functions /// concat Concat, @@ -56,8 +54,6 @@ pub enum BuiltinScalarFunction { EndsWith, /// initcap InitCap, - /// random - Random, } /// Maps the sql function name to `BuiltinScalarFunction` @@ -114,14 +110,10 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Coalesce => Volatility::Immutable, BuiltinScalarFunction::Exp => Volatility::Immutable, BuiltinScalarFunction::Factorial => Volatility::Immutable, - BuiltinScalarFunction::Nanvl => Volatility::Immutable, BuiltinScalarFunction::Concat => Volatility::Immutable, BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable, BuiltinScalarFunction::EndsWith => Volatility::Immutable, BuiltinScalarFunction::InitCap => Volatility::Immutable, - - // Volatile builtin functions - BuiltinScalarFunction::Random => Volatility::Volatile, } } @@ -152,16 +144,10 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::InitCap => { utf8_to_str_type(&input_expr_types[0], "initcap") } - BuiltinScalarFunction::Random => Ok(Float64), BuiltinScalarFunction::EndsWith => Ok(Boolean), BuiltinScalarFunction::Factorial => Ok(Int64), - BuiltinScalarFunction::Nanvl => match &input_expr_types[0] { - Float32 => Ok(Float32), - _ => Ok(Float64), - }, - BuiltinScalarFunction::Ceil | BuiltinScalarFunction::Exp => { match input_expr_types[0] { Float32 => Ok(Float32), @@ -199,11 +185,6 @@ impl BuiltinScalarFunction { ], self.volatility(), ), - BuiltinScalarFunction::Random => Signature::exact(vec![], self.volatility()), - BuiltinScalarFunction::Nanvl => Signature::one_of( - vec![Exact(vec![Float32, Float32]), Exact(vec![Float64, Float64])], - self.volatility(), - ), BuiltinScalarFunction::Factorial => { Signature::uniform(1, vec![Int64], self.volatility()) } @@ -240,8 +221,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Ceil => &["ceil"], BuiltinScalarFunction::Exp => &["exp"], BuiltinScalarFunction::Factorial => &["factorial"], - BuiltinScalarFunction::Nanvl => &["nanvl"], - BuiltinScalarFunction::Random => &["random"], // conditional functions BuiltinScalarFunction::Coalesce => &["coalesce"], diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index b11636d831b1..c7c50d871902 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use crate::expr_fn::binary_expr; use crate::logical_plan::Subquery; -use crate::utils::{expr_to_columns, find_out_reference_exprs}; +use crate::utils::expr_to_columns; use crate::window_frame; use crate::{ aggregate_function, built_in_function, built_in_window_function, udaf, @@ -1232,7 +1232,7 @@ impl Expr { /// Return true when the expression contains out reference(correlated) expressions. pub fn contains_outer(&self) -> bool { - !find_out_reference_exprs(self).is_empty() + self.exists(|expr| matches!(expr, Expr::OuterReferenceColumn { .. })) } /// Recursively find all [`Expr::Placeholder`] expressions, and @@ -1903,8 +1903,8 @@ mod test { use crate::expr::Cast; use crate::expr_fn::col; use crate::{ - case, lit, BuiltinScalarFunction, ColumnarValue, Expr, ScalarFunctionDefinition, - ScalarUDF, ScalarUDFImpl, Signature, Volatility, + case, lit, ColumnarValue, Expr, ScalarFunctionDefinition, ScalarUDF, + ScalarUDFImpl, Signature, Volatility, }; use arrow::datatypes::DataType; use datafusion_common::Column; @@ -2018,13 +2018,6 @@ mod test { #[test] fn test_is_volatile_scalar_func_definition() { - // BuiltIn - assert!( - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::Random) - .is_volatile() - .unwrap() - ); - // UDF #[derive(Debug)] struct TestScalarUDF { diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 1e28e27af1e0..6a28275ebfcf 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -297,11 +297,6 @@ pub fn concat_ws(sep: Expr, values: Vec) -> Expr { )) } -/// Returns a random value in the range 0.0 <= x < 1.0 -pub fn random() -> Expr { - Expr::ScalarFunction(ScalarFunction::new(BuiltinScalarFunction::Random, vec![])) -} - /// Returns the approximate number of distinct input values. /// This function provides an approximation of count(DISTINCT x). /// Zero is returned if all input values are null. @@ -550,7 +545,6 @@ nary_scalar_expr!( "concatenates several strings, placing a seperator between each one" ); nary_scalar_expr!(Concat, concat_expr, "concatenates several strings"); -scalar_expr!(Nanvl, nanvl, x y, "returns x if x is not NaN otherwise returns y"); /// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression. pub fn case(expr: Expr) -> CaseBuilder { @@ -922,7 +916,6 @@ mod test { test_unary_scalar_expr!(Factorial, factorial); test_unary_scalar_expr!(Ceil, ceil); test_unary_scalar_expr!(Exp, exp); - test_scalar_expr!(Nanvl, nanvl, x, y); test_scalar_expr!(InitCap, initcap, string); test_scalar_expr!(EndsWith, ends_with, string, characters); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ca8d718ec090..02d65973a50b 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1321,6 +1321,21 @@ impl LogicalPlan { | LogicalPlan::Extension(_) => None, } } + + /// If this node's expressions contains any references to an outer subquery + pub fn contains_outer_reference(&self) -> bool { + let mut contains = false; + self.apply_expressions(|expr| { + Ok(if expr.contains_outer() { + contains = true; + TreeNodeRecursion::Stop + } else { + TreeNodeRecursion::Continue + }) + }) + .unwrap(); + contains + } } /// This macro is used to determine continuation during combined transforming @@ -1446,7 +1461,7 @@ impl LogicalPlan { /// Calls `f` on all subqueries referenced in expressions of the current /// `LogicalPlan` node. - fn apply_subqueries Result>( + pub fn apply_subqueries Result>( &self, mut f: F, ) -> Result { @@ -1469,7 +1484,7 @@ impl LogicalPlan { /// using `f`. /// /// Returns the current node. - fn map_subqueries Result>>( + pub fn map_subqueries Result>>( self, mut f: F, ) -> Result> { diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 482fc96b519b..ce26cac7970b 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -17,12 +17,19 @@ //! Tree node implementation for logical plan -use crate::LogicalPlan; +use crate::{ + Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin, DdlStatement, Distinct, + DistinctOn, DmlStatement, Explain, Extension, Filter, Join, Limit, LogicalPlan, + Prepare, Projection, RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias, + Union, Unnest, Window, +}; +use std::sync::Arc; +use crate::dml::CopyTo; use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion, }; -use datafusion_common::Result; +use datafusion_common::{map_until_stop_and_collect, Result}; impl TreeNode for LogicalPlan { fn apply_children Result>( @@ -32,23 +39,359 @@ impl TreeNode for LogicalPlan { self.inputs().into_iter().apply_until_stop(f) } - fn map_children Result>>( - self, - f: F, - ) -> Result> { - let new_children = self - .inputs() - .into_iter() - .cloned() - .map_until_stop_and_collect(f)?; - // Propagate up `new_children.transformed` and `new_children.tnr` - // along with the node containing transformed children. - if new_children.transformed { - new_children.map_data(|new_children| { - self.with_new_exprs(self.expressions(), new_children) - }) - } else { - Ok(new_children.update_data(|_| self)) - } + /// Applies `f` to each child (input) of this plan node, rewriting them *in place.* + /// + /// # Notes + /// + /// Inputs include ONLY direct children, not embedded `LogicalPlan`s for + /// subqueries, for example such as are in [`Expr::Exists`]. + /// + /// [`Expr::Exists`]: crate::Expr::Exists + fn map_children(self, mut f: F) -> Result> + where + F: FnMut(Self) -> Result>, + { + Ok(match self { + LogicalPlan::Projection(Projection { + expr, + input, + schema, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Projection(Projection { + expr, + input, + schema, + }) + }), + LogicalPlan::Filter(Filter { predicate, input }) => rewrite_arc(input, f)? + .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })), + LogicalPlan::Repartition(Repartition { + input, + partitioning_scheme, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Repartition(Repartition { + input, + partitioning_scheme, + }) + }), + LogicalPlan::Window(Window { + input, + window_expr, + schema, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Window(Window { + input, + window_expr, + schema, + }) + }), + LogicalPlan::Aggregate(Aggregate { + input, + group_expr, + aggr_expr, + schema, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Aggregate(Aggregate { + input, + group_expr, + aggr_expr, + schema, + }) + }), + LogicalPlan::Sort(Sort { expr, input, fetch }) => rewrite_arc(input, f)? + .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })), + LogicalPlan::Join(Join { + left, + right, + on, + filter, + join_type, + join_constraint, + schema, + null_equals_null, + }) => map_until_stop_and_collect!( + rewrite_arc(left, &mut f), + right, + rewrite_arc(right, &mut f) + )? + .update_data(|(left, right)| { + LogicalPlan::Join(Join { + left, + right, + on, + filter, + join_type, + join_constraint, + schema, + null_equals_null, + }) + }), + LogicalPlan::CrossJoin(CrossJoin { + left, + right, + schema, + }) => map_until_stop_and_collect!( + rewrite_arc(left, &mut f), + right, + rewrite_arc(right, &mut f) + )? + .update_data(|(left, right)| { + LogicalPlan::CrossJoin(CrossJoin { + left, + right, + schema, + }) + }), + LogicalPlan::Limit(Limit { skip, fetch, input }) => rewrite_arc(input, f)? + .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })), + LogicalPlan::Subquery(Subquery { + subquery, + outer_ref_columns, + }) => rewrite_arc(subquery, f)?.update_data(|subquery| { + LogicalPlan::Subquery(Subquery { + subquery, + outer_ref_columns, + }) + }), + LogicalPlan::SubqueryAlias(SubqueryAlias { + input, + alias, + schema, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::SubqueryAlias(SubqueryAlias { + input, + alias, + schema, + }) + }), + LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)? + .update_data(LogicalPlan::Extension), + LogicalPlan::Union(Union { inputs, schema }) => rewrite_arcs(inputs, f)? + .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })), + LogicalPlan::Distinct(distinct) => match distinct { + Distinct::All(input) => rewrite_arc(input, f)?.update_data(Distinct::All), + Distinct::On(DistinctOn { + on_expr, + select_expr, + sort_expr, + input, + schema, + }) => rewrite_arc(input, f)?.update_data(|input| { + Distinct::On(DistinctOn { + on_expr, + select_expr, + sort_expr, + input, + schema, + }) + }), + } + .update_data(LogicalPlan::Distinct), + LogicalPlan::Explain(Explain { + verbose, + plan, + stringified_plans, + schema, + logical_optimization_succeeded, + }) => rewrite_arc(plan, f)?.update_data(|plan| { + LogicalPlan::Explain(Explain { + verbose, + plan, + stringified_plans, + schema, + logical_optimization_succeeded, + }) + }), + LogicalPlan::Analyze(Analyze { + verbose, + input, + schema, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Analyze(Analyze { + verbose, + input, + schema, + }) + }), + LogicalPlan::Dml(DmlStatement { + table_name, + table_schema, + op, + input, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Dml(DmlStatement { + table_name, + table_schema, + op, + input, + }) + }), + LogicalPlan::Copy(CopyTo { + input, + output_url, + partition_by, + format_options, + options, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Copy(CopyTo { + input, + output_url, + partition_by, + format_options, + options, + }) + }), + LogicalPlan::Ddl(ddl) => { + match ddl { + DdlStatement::CreateMemoryTable(CreateMemoryTable { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + }) => rewrite_arc(input, f)?.update_data(|input| { + DdlStatement::CreateMemoryTable(CreateMemoryTable { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + }) + }), + DdlStatement::CreateView(CreateView { + name, + input, + or_replace, + definition, + }) => rewrite_arc(input, f)?.update_data(|input| { + DdlStatement::CreateView(CreateView { + name, + input, + or_replace, + definition, + }) + }), + // no inputs in these statements + DdlStatement::CreateExternalTable(_) + | DdlStatement::CreateCatalogSchema(_) + | DdlStatement::CreateCatalog(_) + | DdlStatement::DropTable(_) + | DdlStatement::DropView(_) + | DdlStatement::DropCatalogSchema(_) + | DdlStatement::CreateFunction(_) + | DdlStatement::DropFunction(_) => Transformed::no(ddl), + } + .update_data(LogicalPlan::Ddl) + } + LogicalPlan::Unnest(Unnest { + input, + column, + schema, + options, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Unnest(Unnest { + input, + column, + schema, + options, + }) + }), + LogicalPlan::Prepare(Prepare { + name, + data_types, + input, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Prepare(Prepare { + name, + data_types, + input, + }) + }), + LogicalPlan::RecursiveQuery(RecursiveQuery { + name, + static_term, + recursive_term, + is_distinct, + }) => map_until_stop_and_collect!( + rewrite_arc(static_term, &mut f), + recursive_term, + rewrite_arc(recursive_term, &mut f) + )? + .update_data(|(static_term, recursive_term)| { + LogicalPlan::RecursiveQuery(RecursiveQuery { + name, + static_term, + recursive_term, + is_distinct, + }) + }), + // plans without inputs + LogicalPlan::TableScan { .. } + | LogicalPlan::Statement { .. } + | LogicalPlan::EmptyRelation { .. } + | LogicalPlan::Values { .. } + | LogicalPlan::DescribeTable(_) => Transformed::no(self), + }) } } + +/// Converts a `Arc` without copying, if possible. Copies the plan +/// if there is a shared reference +fn unwrap_arc(plan: Arc) -> LogicalPlan { + Arc::try_unwrap(plan) + // if None is returned, there is another reference to this + // LogicalPlan, so we can not own it, and must clone instead + .unwrap_or_else(|node| node.as_ref().clone()) +} + +/// Applies `f` to rewrite a `Arc` without copying, if possible +fn rewrite_arc( + plan: Arc, + mut f: F, +) -> Result>> +where + F: FnMut(LogicalPlan) -> Result>, +{ + f(unwrap_arc(plan))?.map_data(|new_plan| Ok(Arc::new(new_plan))) +} + +/// rewrite a `Vec` of `Arc` without copying, if possible +fn rewrite_arcs( + input_plans: Vec>, + mut f: F, +) -> Result>>> +where + F: FnMut(LogicalPlan) -> Result>, +{ + input_plans + .into_iter() + .map_until_stop_and_collect(|plan| rewrite_arc(plan, &mut f)) +} + +/// Rewrites all inputs for an Extension node "in place" +/// (it currently has to copy values because there are no APIs for in place modification) +/// +/// Should be removed when we have an API for in place modifications of the +/// extension to avoid these copies +fn rewrite_extension_inputs( + extension: Extension, + f: F, +) -> Result> +where + F: FnMut(LogicalPlan) -> Result>, +{ + let Extension { node } = extension; + + node.inputs() + .into_iter() + .cloned() + .map_until_stop_and_collect(f)? + .map_data(|new_inputs| { + let exprs = node.expressions(); + Ok(Extension { + node: node.from_template(&exprs, &new_inputs), + }) + }) +} diff --git a/datafusion/expr/src/signature.rs b/datafusion/expr/src/signature.rs index 89f456f337f9..e2505d6fd65f 100644 --- a/datafusion/expr/src/signature.rs +++ b/datafusion/expr/src/signature.rs @@ -51,7 +51,7 @@ pub enum Volatility { Stable, /// A volatile function may change the return value from evaluation to evaluation. /// Multiple invocations of a volatile function may return different results when used in the - /// same query. An example of this is [super::BuiltinScalarFunction::Random]. DataFusion + /// same query. An example of this is the random() function. DataFusion /// can not evaluate such functions during planning. /// In the query `select col1, random() from t1`, `random()` function will be evaluated /// for each output row, resulting in a unique random value for each row. diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index ef7d2c9b1892..a6847f3327c0 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -77,6 +77,7 @@ hex = { version = "0.4", optional = true } itertools = { workspace = true } log = { workspace = true } md-5 = { version = "^0.10.0", optional = true } +rand = { workspace = true } regex = { version = "1.8", optional = true } sha2 = { version = "^0.10.1", optional = true } unicode-segmentation = { version = "^1.7.1", optional = true } diff --git a/datafusion/functions/src/math/mod.rs b/datafusion/functions/src/math/mod.rs index 544de04e4a98..c83a98cb1913 100644 --- a/datafusion/functions/src/math/mod.rs +++ b/datafusion/functions/src/math/mod.rs @@ -27,8 +27,10 @@ pub mod iszero; pub mod lcm; pub mod log; pub mod nans; +pub mod nanvl; pub mod pi; pub mod power; +pub mod random; pub mod round; pub mod trunc; @@ -55,9 +57,11 @@ make_udf_function!(lcm::LcmFunc, LCM, lcm); make_math_unary_udf!(LnFunc, LN, ln, ln, Some(vec![Some(true)])); make_math_unary_udf!(Log2Func, LOG2, log2, log2, Some(vec![Some(true)])); make_math_unary_udf!(Log10Func, LOG10, log10, log10, Some(vec![Some(true)])); +make_udf_function!(nanvl::NanvlFunc, NANVL, nanvl); make_udf_function!(pi::PiFunc, PI, pi); make_udf_function!(power::PowerFunc, POWER, power); make_math_unary_udf!(RadiansFunc, RADIANS, radians, to_radians, None); +make_udf_function!(random::RandomFunc, RANDOM, random); make_udf_function!(round::RoundFunc, ROUND, round); make_math_unary_udf!(SignumFunc, SIGNUM, signum, signum, None); make_math_unary_udf!(SinFunc, SIN, sin, sin, None); @@ -180,6 +184,11 @@ pub mod expr_fn { super::log10().call(vec![num]) } + #[doc = "returns x if x is not NaN otherwise returns y"] + pub fn nanvl(x: Expr, y: Expr) -> Expr { + super::nanvl().call(vec![x, y]) + } + #[doc = "Returns an approximate value of π"] pub fn pi() -> Expr { super::pi().call(vec![]) @@ -195,6 +204,11 @@ pub mod expr_fn { super::radians().call(vec![num]) } + #[doc = "Returns a random value in the range 0.0 <= x < 1.0"] + pub fn random() -> Expr { + super::random().call(vec![]) + } + #[doc = "round to nearest integer"] pub fn round(args: Vec) -> Expr { super::round().call(args) @@ -261,9 +275,11 @@ pub fn functions() -> Vec> { log(), log2(), log10(), + nanvl(), pi(), power(), radians(), + random(), round(), signum(), sin(), diff --git a/datafusion/functions/src/math/nanvl.rs b/datafusion/functions/src/math/nanvl.rs new file mode 100644 index 000000000000..d81a690843b6 --- /dev/null +++ b/datafusion/functions/src/math/nanvl.rs @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{ArrayRef, Float32Array, Float64Array}; +use arrow::datatypes::DataType; +use arrow::datatypes::DataType::{Float32, Float64}; + +use datafusion_common::{exec_err, DataFusionError, Result}; +use datafusion_expr::ColumnarValue; +use datafusion_expr::TypeSignature::Exact; +use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; + +use crate::utils::make_scalar_function; + +#[derive(Debug)] +pub struct NanvlFunc { + signature: Signature, +} + +impl Default for NanvlFunc { + fn default() -> Self { + NanvlFunc::new() + } +} + +impl NanvlFunc { + pub fn new() -> Self { + use DataType::*; + Self { + signature: Signature::one_of( + vec![Exact(vec![Float32, Float32]), Exact(vec![Float64, Float64])], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for NanvlFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "nanvl" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + match &arg_types[0] { + Float32 => Ok(Float32), + _ => Ok(Float64), + } + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + make_scalar_function(nanvl, vec![])(args) + } +} + +/// Nanvl SQL function +fn nanvl(args: &[ArrayRef]) -> Result { + match args[0].data_type() { + Float64 => { + let compute_nanvl = |x: f64, y: f64| { + if x.is_nan() { + y + } else { + x + } + }; + + Ok(Arc::new(make_function_inputs2!( + &args[0], + &args[1], + "x", + "y", + Float64Array, + { compute_nanvl } + )) as ArrayRef) + } + Float32 => { + let compute_nanvl = |x: f32, y: f32| { + if x.is_nan() { + y + } else { + x + } + }; + + Ok(Arc::new(make_function_inputs2!( + &args[0], + &args[1], + "x", + "y", + Float32Array, + { compute_nanvl } + )) as ArrayRef) + } + other => exec_err!("Unsupported data type {other:?} for function nanvl"), + } +} + +#[cfg(test)] +mod test { + use crate::math::nanvl::nanvl; + use arrow::array::{ArrayRef, Float32Array, Float64Array}; + use datafusion_common::cast::{as_float32_array, as_float64_array}; + use std::sync::Arc; + + #[test] + fn test_nanvl_f64() { + let args: Vec = vec![ + Arc::new(Float64Array::from(vec![1.0, f64::NAN, 3.0, f64::NAN])), // y + Arc::new(Float64Array::from(vec![5.0, 6.0, f64::NAN, f64::NAN])), // x + ]; + + let result = nanvl(&args).expect("failed to initialize function nanvl"); + let floats = + as_float64_array(&result).expect("failed to initialize function nanvl"); + + assert_eq!(floats.len(), 4); + assert_eq!(floats.value(0), 1.0); + assert_eq!(floats.value(1), 6.0); + assert_eq!(floats.value(2), 3.0); + assert!(floats.value(3).is_nan()); + } + + #[test] + fn test_nanvl_f32() { + let args: Vec = vec![ + Arc::new(Float32Array::from(vec![1.0, f32::NAN, 3.0, f32::NAN])), // y + Arc::new(Float32Array::from(vec![5.0, 6.0, f32::NAN, f32::NAN])), // x + ]; + + let result = nanvl(&args).expect("failed to initialize function nanvl"); + let floats = + as_float32_array(&result).expect("failed to initialize function nanvl"); + + assert_eq!(floats.len(), 4); + assert_eq!(floats.value(0), 1.0); + assert_eq!(floats.value(1), 6.0); + assert_eq!(floats.value(2), 3.0); + assert!(floats.value(3).is_nan()); + } +} diff --git a/datafusion/functions/src/math/random.rs b/datafusion/functions/src/math/random.rs new file mode 100644 index 000000000000..2c1ad4136702 --- /dev/null +++ b/datafusion/functions/src/math/random.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::iter; +use std::sync::Arc; + +use arrow::array::Float64Array; +use arrow::datatypes::DataType; +use arrow::datatypes::DataType::Float64; +use rand::{thread_rng, Rng}; + +use datafusion_common::{exec_err, Result}; +use datafusion_expr::ColumnarValue; +use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; + +#[derive(Debug)] +pub struct RandomFunc { + signature: Signature, +} + +impl Default for RandomFunc { + fn default() -> Self { + RandomFunc::new() + } +} + +impl RandomFunc { + pub fn new() -> Self { + Self { + signature: Signature::exact(vec![], Volatility::Volatile), + } + } +} + +impl ScalarUDFImpl for RandomFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "random" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(Float64) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + random(args) + } +} + +/// Random SQL function +fn random(args: &[ColumnarValue]) -> Result { + let len: usize = match &args[0] { + ColumnarValue::Array(array) => array.len(), + _ => return exec_err!("Expect random function to take no param"), + }; + let mut rng = thread_rng(); + let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len); + let array = Float64Array::from_iter_values(values); + Ok(ColumnarValue::Array(Arc::new(array))) +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow::array::NullArray; + + use datafusion_common::cast::as_float64_array; + use datafusion_expr::ColumnarValue; + + use crate::math::random::random; + + #[test] + fn test_random_expression() { + let args = vec![ColumnarValue::Array(Arc::new(NullArray::new(1)))]; + let array = random(&args) + .expect("failed to initialize function random") + .into_array(1) + .expect("Failed to convert to array"); + let floats = + as_float64_array(&array).expect("failed to initialize function random"); + + assert_eq!(floats.len(), 1); + assert!(0.0 <= floats.value(0) && floats.value(0) < 1.0); + } +} diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index 79375e52da1f..002885266e2f 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -140,7 +140,7 @@ fn check_inner_plan( is_aggregate: bool, can_contain_outer_ref: bool, ) -> Result<()> { - if !can_contain_outer_ref && contains_outer_reference(inner_plan) { + if !can_contain_outer_ref && inner_plan.contains_outer_reference() { return plan_err!("Accessing outer reference columns is not allowed in the plan"); } // We want to support as many operators as possible inside the correlated subquery @@ -233,13 +233,6 @@ fn check_inner_plan( } } -fn contains_outer_reference(inner_plan: &LogicalPlan) -> bool { - inner_plan - .expressions() - .iter() - .any(|expr| expr.contains_outer()) -} - fn check_aggregation_in_scalar_subquery( inner_plan: &LogicalPlan, agg: &Aggregate, diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index dbcf02b26ba6..7eda45fb563c 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -91,7 +91,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { _ => Ok(Transformed::no(plan)), } } - _ if plan.expressions().iter().any(|expr| expr.contains_outer()) => { + _ if plan.contains_outer_reference() => { // the unsupported cases, the plan expressions contain out reference columns(like window expressions) self.can_pull_up = false; Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump)) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index ff24df259adf..f3ce8bbcde72 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1028,6 +1028,7 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { #[cfg(test)] mod tests { + use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -1038,15 +1039,17 @@ mod tests { use crate::OptimizerContext; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion_common::{DFSchema, DFSchemaRef}; + use datafusion_common::{DFSchema, DFSchemaRef, ScalarValue}; use datafusion_expr::logical_plan::table_scan; use datafusion_expr::{ - and, col, in_list, in_subquery, lit, logical_plan::JoinType, or, random, sum, - BinaryExpr, Expr, Extension, LogicalPlanBuilder, Operator, TableSource, - TableType, UserDefinedLogicalNodeCore, + and, col, in_list, in_subquery, lit, logical_plan::JoinType, or, sum, BinaryExpr, + ColumnarValue, Expr, Extension, LogicalPlanBuilder, Operator, ScalarUDF, + ScalarUDFImpl, Signature, TableSource, TableType, UserDefinedLogicalNodeCore, + Volatility, }; use async_trait::async_trait; + use datafusion_expr::expr::ScalarFunction; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { crate::test::assert_optimized_plan_eq( @@ -2859,17 +2862,44 @@ Projection: a, b assert_optimized_plan_eq(&plan, expected) } + #[derive(Debug)] + struct TestScalarUDF { + signature: Signature, + } + + impl ScalarUDFImpl for TestScalarUDF { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "TestScalarUDF" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Int32) + } + + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + Ok(ColumnarValue::Scalar(ScalarValue::from(1))) + } + } + #[test] fn test_push_down_volatile_function_in_aggregate() -> Result<()> { - // SELECT t.a, t.r FROM (SELECT a, SUM(b), random()+1 AS r FROM test1 GROUP BY a) AS t WHERE t.a > 5 AND t.r > 0.5; + // SELECT t.a, t.r FROM (SELECT a, SUM(b), TestScalarUDF()+1 AS r FROM test1 GROUP BY a) AS t WHERE t.a > 5 AND t.r > 0.5; let table_scan = test_table_scan_with_name("test1")?; + let fun = ScalarUDF::new_from_impl(TestScalarUDF { + signature: Signature::exact(vec![], Volatility::Volatile), + }); + let expr = Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(fun), vec![])); + let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("a")], vec![sum(col("b"))])? - .project(vec![ - col("a"), - sum(col("b")), - add(random(), lit(1)).alias("r"), - ])? + .project(vec![col("a"), sum(col("b")), add(expr, lit(1)).alias("r")])? .alias("t")? .filter(col("t.a").gt(lit(5)).and(col("t.r").gt(lit(0.5))))? .project(vec![col("t.a"), col("t.r")])? @@ -2878,7 +2908,7 @@ Projection: a, b let expected_before = "Projection: t.a, t.r\ \n Filter: t.a > Int32(5) AND t.r > Float64(0.5)\ \n SubqueryAlias: t\ - \n Projection: test1.a, SUM(test1.b), random() + Int32(1) AS r\ + \n Projection: test1.a, SUM(test1.b), TestScalarUDF() + Int32(1) AS r\ \n Aggregate: groupBy=[[test1.a]], aggr=[[SUM(test1.b)]]\ \n TableScan: test1"; assert_eq!(format!("{plan:?}"), expected_before); @@ -2886,7 +2916,7 @@ Projection: a, b let expected_after = "Projection: t.a, t.r\ \n SubqueryAlias: t\ \n Filter: r > Float64(0.5)\ - \n Projection: test1.a, SUM(test1.b), random() + Int32(1) AS r\ + \n Projection: test1.a, SUM(test1.b), TestScalarUDF() + Int32(1) AS r\ \n Aggregate: groupBy=[[test1.a]], aggr=[[SUM(test1.b)]]\ \n TableScan: test1, full_filters=[test1.a > Int32(5)]"; assert_optimized_plan_eq(&plan, expected_after) @@ -2894,8 +2924,12 @@ Projection: a, b #[test] fn test_push_down_volatile_function_in_join() -> Result<()> { - // SELECT t.a, t.r FROM (SELECT test1.a AS a, random() AS r FROM test1 join test2 ON test1.a = test2.a) AS t WHERE t.r > 0.5; + // SELECT t.a, t.r FROM (SELECT test1.a AS a, TestScalarUDF() AS r FROM test1 join test2 ON test1.a = test2.a) AS t WHERE t.r > 0.5; let table_scan = test_table_scan_with_name("test1")?; + let fun = ScalarUDF::new_from_impl(TestScalarUDF { + signature: Signature::exact(vec![], Volatility::Volatile), + }); + let expr = Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(fun), vec![])); let left = LogicalPlanBuilder::from(table_scan).build()?; let right_table_scan = test_table_scan_with_name("test2")?; let right = LogicalPlanBuilder::from(right_table_scan).build()?; @@ -2909,7 +2943,7 @@ Projection: a, b ), None, )? - .project(vec![col("test1.a").alias("a"), random().alias("r")])? + .project(vec![col("test1.a").alias("a"), expr.alias("r")])? .alias("t")? .filter(col("t.r").gt(lit(0.8)))? .project(vec![col("t.a"), col("t.r")])? @@ -2918,7 +2952,7 @@ Projection: a, b let expected_before = "Projection: t.a, t.r\ \n Filter: t.r > Float64(0.8)\ \n SubqueryAlias: t\ - \n Projection: test1.a AS a, random() AS r\ + \n Projection: test1.a AS a, TestScalarUDF() AS r\ \n Inner Join: test1.a = test2.a\ \n TableScan: test1\ \n TableScan: test2"; @@ -2927,7 +2961,7 @@ Projection: a, b let expected = "Projection: t.a, t.r\ \n SubqueryAlias: t\ \n Filter: r > Float64(0.8)\ - \n Projection: test1.a AS a, random() AS r\ + \n Projection: test1.a AS a, TestScalarUDF() AS r\ \n Inner Join: test1.a = test2.a\ \n TableScan: test1\ \n TableScan: test2"; diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 72fac5370ae0..423087d2182b 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -70,7 +70,6 @@ log = { workspace = true } md-5 = { version = "^0.10.0", optional = true } paste = "^1.0" petgraph = "0.6.2" -rand = { workspace = true } regex = { version = "1.8", optional = true } sha2 = { version = "^0.10.1", optional = true } diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 2be85a69d7da..c237e2070675 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -184,10 +184,6 @@ pub fn create_physical_fun( BuiltinScalarFunction::Factorial => { Arc::new(|args| make_scalar_function_inner(math_expressions::factorial)(args)) } - BuiltinScalarFunction::Nanvl => { - Arc::new(|args| make_scalar_function_inner(math_expressions::nanvl)(args)) - } - BuiltinScalarFunction::Random => Arc::new(math_expressions::random), // string functions BuiltinScalarFunction::Coalesce => Arc::new(conditional_expressions::coalesce), BuiltinScalarFunction::Concat => Arc::new(string_expressions::concat), @@ -542,19 +538,6 @@ mod tests { Ok(()) } - #[test] - fn test_empty_arguments() -> Result<()> { - let execution_props = ExecutionProps::new(); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - - let funs = [BuiltinScalarFunction::Random]; - - for fun in funs.iter() { - create_physical_expr_with_type_coercion(fun, &[], &schema, &execution_props)?; - } - Ok(()) - } - // Helper function just for testing. // Returns `expressions` coerced to types compatible with // `signature`, if possible. diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index 55fb54563787..004a9abe7f0b 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -18,14 +18,12 @@ //! Math expressions use std::any::type_name; -use std::iter; use std::sync::Arc; use arrow::array::ArrayRef; use arrow::array::{BooleanArray, Float32Array, Float64Array, Int64Array}; use arrow::datatypes::DataType; use arrow_array::Array; -use rand::{thread_rng, Rng}; use datafusion_common::{exec_err, ScalarValue}; use datafusion_common::{DataFusionError, Result}; @@ -113,33 +111,6 @@ macro_rules! make_function_scalar_inputs { }}; } -macro_rules! make_function_inputs2 { - ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE:ident, $FUNC: block) => {{ - let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE); - let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE); - - arg1.iter() - .zip(arg2.iter()) - .map(|(a1, a2)| match (a1, a2) { - (Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().ok()?)), - _ => None, - }) - .collect::<$ARRAY_TYPE>() - }}; - ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE1:ident, $ARRAY_TYPE2:ident, $FUNC: block) => {{ - let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE1); - let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE2); - - arg1.iter() - .zip(arg2.iter()) - .map(|(a1, a2)| match (a1, a2) { - (Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().ok()?)), - _ => None, - }) - .collect::<$ARRAY_TYPE1>() - }}; -} - macro_rules! make_function_scalar_inputs_return_type { ($ARG: expr, $NAME:expr, $ARGS_TYPE:ident, $RETURN_TYPE:ident, $FUNC: block) => {{ let arg = downcast_arg!($ARG, $NAME, $ARGS_TYPE); @@ -169,51 +140,6 @@ pub fn factorial(args: &[ArrayRef]) -> Result { } } -/// Nanvl SQL function -pub fn nanvl(args: &[ArrayRef]) -> Result { - match args[0].data_type() { - DataType::Float64 => { - let compute_nanvl = |x: f64, y: f64| { - if x.is_nan() { - y - } else { - x - } - }; - - Ok(Arc::new(make_function_inputs2!( - &args[0], - &args[1], - "x", - "y", - Float64Array, - { compute_nanvl } - )) as ArrayRef) - } - - DataType::Float32 => { - let compute_nanvl = |x: f32, y: f32| { - if x.is_nan() { - y - } else { - x - } - }; - - Ok(Arc::new(make_function_inputs2!( - &args[0], - &args[1], - "x", - "y", - Float32Array, - { compute_nanvl } - )) as ArrayRef) - } - - other => exec_err!("Unsupported data type {other:?} for function nanvl"), - } -} - /// Isnan SQL function pub fn isnan(args: &[ArrayRef]) -> Result { match args[0].data_type() { @@ -237,42 +163,14 @@ pub fn isnan(args: &[ArrayRef]) -> Result { } } -/// Random SQL function -pub fn random(args: &[ColumnarValue]) -> Result { - let len: usize = match &args[0] { - ColumnarValue::Array(array) => array.len(), - _ => return exec_err!("Expect random function to take no param"), - }; - let mut rng = thread_rng(); - let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len); - let array = Float64Array::from_iter_values(values); - Ok(ColumnarValue::Array(Arc::new(array))) -} - #[cfg(test)] mod tests { - use arrow::array::{Float64Array, NullArray}; + use arrow::array::Float64Array; - use datafusion_common::cast::{ - as_boolean_array, as_float32_array, as_float64_array, as_int64_array, - }; + use datafusion_common::cast::{as_boolean_array, as_int64_array}; use super::*; - #[test] - fn test_random_expression() { - let args = vec![ColumnarValue::Array(Arc::new(NullArray::new(1)))]; - let array = random(&args) - .expect("failed to initialize function random") - .into_array(1) - .expect("Failed to convert to array"); - let floats = - as_float64_array(&array).expect("failed to initialize function random"); - - assert_eq!(floats.len(), 1); - assert!(0.0 <= floats.value(0) && floats.value(0) < 1.0); - } - #[test] fn test_factorial_i64() { let args: Vec = vec![ @@ -288,42 +186,6 @@ mod tests { assert_eq!(ints, &expected); } - #[test] - fn test_nanvl_f64() { - let args: Vec = vec![ - Arc::new(Float64Array::from(vec![1.0, f64::NAN, 3.0, f64::NAN])), // y - Arc::new(Float64Array::from(vec![5.0, 6.0, f64::NAN, f64::NAN])), // x - ]; - - let result = nanvl(&args).expect("failed to initialize function nanvl"); - let floats = - as_float64_array(&result).expect("failed to initialize function nanvl"); - - assert_eq!(floats.len(), 4); - assert_eq!(floats.value(0), 1.0); - assert_eq!(floats.value(1), 6.0); - assert_eq!(floats.value(2), 3.0); - assert!(floats.value(3).is_nan()); - } - - #[test] - fn test_nanvl_f32() { - let args: Vec = vec![ - Arc::new(Float32Array::from(vec![1.0, f32::NAN, 3.0, f32::NAN])), // y - Arc::new(Float32Array::from(vec![5.0, 6.0, f32::NAN, f32::NAN])), // x - ]; - - let result = nanvl(&args).expect("failed to initialize function nanvl"); - let floats = - as_float32_array(&result).expect("failed to initialize function nanvl"); - - assert_eq!(floats.len(), 4); - assert_eq!(floats.value(0), 1.0); - assert_eq!(floats.value(1), 6.0); - assert_eq!(floats.value(2), 3.0); - assert!(floats.value(3).is_nan()); - } - #[test] fn test_isnan_f64() { let args: Vec = vec![Arc::new(Float64Array::from(vec![ diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index c7c0d9b5a656..e1bcf33b8254 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -579,7 +579,7 @@ enum ScalarFunction { // 35 was MD5 // 36 was NullIf // 37 was OctetLength - Random = 38; + // 38 was Random // 39 was RegexpReplace // 40 was Repeat // 41 was Replace @@ -650,7 +650,7 @@ enum ScalarFunction { // 108 was ArrayReplaceN // 109 was ArrayRemoveAll // 110 was ArrayReplaceAll - Nanvl = 111; + // 111 was Nanvl // 112 was Flatten // 113 was IsNan // 114 was Iszero diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index c8a1fba40765..7beaeef0e58b 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22797,10 +22797,8 @@ impl serde::Serialize for ScalarFunction { Self::Concat => "Concat", Self::ConcatWithSeparator => "ConcatWithSeparator", Self::InitCap => "InitCap", - Self::Random => "Random", Self::Coalesce => "Coalesce", Self::Factorial => "Factorial", - Self::Nanvl => "Nanvl", Self::EndsWith => "EndsWith", }; serializer.serialize_str(variant) @@ -22819,10 +22817,8 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Concat", "ConcatWithSeparator", "InitCap", - "Random", "Coalesce", "Factorial", - "Nanvl", "EndsWith", ]; @@ -22870,10 +22866,8 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Concat" => Ok(ScalarFunction::Concat), "ConcatWithSeparator" => Ok(ScalarFunction::ConcatWithSeparator), "InitCap" => Ok(ScalarFunction::InitCap), - "Random" => Ok(ScalarFunction::Random), "Coalesce" => Ok(ScalarFunction::Coalesce), "Factorial" => Ok(ScalarFunction::Factorial), - "Nanvl" => Ok(ScalarFunction::Nanvl), "EndsWith" => Ok(ScalarFunction::EndsWith), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index facf24219810..042c794e19de 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2878,7 +2878,7 @@ pub enum ScalarFunction { /// 35 was MD5 /// 36 was NullIf /// 37 was OctetLength - Random = 38, + /// 38 was Random /// 39 was RegexpReplace /// 40 was Repeat /// 41 was Replace @@ -2949,7 +2949,7 @@ pub enum ScalarFunction { /// 108 was ArrayReplaceN /// 109 was ArrayRemoveAll /// 110 was ArrayReplaceAll - Nanvl = 111, + /// 111 was Nanvl /// 112 was Flatten /// 113 was IsNan /// 114 was Iszero @@ -2992,10 +2992,8 @@ impl ScalarFunction { ScalarFunction::Concat => "Concat", ScalarFunction::ConcatWithSeparator => "ConcatWithSeparator", ScalarFunction::InitCap => "InitCap", - ScalarFunction::Random => "Random", ScalarFunction::Coalesce => "Coalesce", ScalarFunction::Factorial => "Factorial", - ScalarFunction::Nanvl => "Nanvl", ScalarFunction::EndsWith => "EndsWith", } } @@ -3008,10 +3006,8 @@ impl ScalarFunction { "Concat" => Some(Self::Concat), "ConcatWithSeparator" => Some(Self::ConcatWithSeparator), "InitCap" => Some(Self::InitCap), - "Random" => Some(Self::Random), "Coalesce" => Some(Self::Coalesce), "Factorial" => Some(Self::Factorial), - "Nanvl" => Some(Self::Nanvl), "EndsWith" => Some(Self::EndsWith), _ => None, } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index e9eb53e45199..057690aacee6 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -41,9 +41,8 @@ use datafusion_expr::{ expr::{self, InList, Sort, WindowFunction}, factorial, initcap, logical_plan::{PlanType, StringifiedPlan}, - nanvl, random, AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, - BuiltinScalarFunction, Case, Cast, Expr, GetFieldAccess, GetIndexedField, - GroupingSet, + AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, BuiltinScalarFunction, + Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet, GroupingSet::GroupingSets, JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, @@ -426,9 +425,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::ConcatWithSeparator => Self::ConcatWithSeparator, ScalarFunction::EndsWith => Self::EndsWith, ScalarFunction::InitCap => Self::InitCap, - ScalarFunction::Random => Self::Random, ScalarFunction::Coalesce => Self::Coalesce, - ScalarFunction::Nanvl => Self::Nanvl, } } } @@ -1298,7 +1295,6 @@ pub fn parse_expr( ScalarFunction::InitCap => { Ok(initcap(parse_expr(&args[0], registry, codec)?)) } - ScalarFunction::Random => Ok(random()), ScalarFunction::Concat => { Ok(concat_expr(parse_exprs(args, registry, codec)?)) } @@ -1312,10 +1308,6 @@ pub fn parse_expr( ScalarFunction::Coalesce => { Ok(coalesce(parse_exprs(args, registry, codec)?)) } - ScalarFunction::Nanvl => Ok(nanvl( - parse_expr(&args[0], registry, codec)?, - parse_expr(&args[1], registry, codec)?, - )), } } ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index ed5e7a302b20..358eea785713 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1414,9 +1414,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::ConcatWithSeparator => Self::ConcatWithSeparator, BuiltinScalarFunction::EndsWith => Self::EndsWith, BuiltinScalarFunction::InitCap => Self::InitCap, - BuiltinScalarFunction::Random => Self::Random, BuiltinScalarFunction::Coalesce => Self::Coalesce, - BuiltinScalarFunction::Nanvl => Self::Nanvl, }; Ok(scalar_function)