From ca37ce37933f7874d404364cb8c23438baceb46d Mon Sep 17 00:00:00 2001 From: Lordworms <48054792+Lordworms@users.noreply.github.com> Date: Wed, 28 Feb 2024 19:01:10 -0600 Subject: [PATCH] port range function and change gen_series logic (#9352) * port range function and change gen_series logic * fix failed test * delete useless and add tests * change parameter * fix document * remove useless * change doc * delete space --- datafusion/expr/src/built_in_function.rs | 15 --- datafusion/expr/src/expr_fn.rs | 6 - datafusion/functions-array/src/kernels.rs | 71 +++++++++- datafusion/functions-array/src/lib.rs | 8 +- datafusion/functions-array/src/udf.rs | 123 +++++++++++++++++- .../physical-expr/src/array_expressions.rs | 59 --------- datafusion/physical-expr/src/functions.rs | 3 - datafusion/proto/proto/datafusion.proto | 2 +- datafusion/proto/src/generated/pbjson.rs | 3 - datafusion/proto/src/generated/prost.rs | 4 +- .../proto/src/logical_plan/from_proto.rs | 11 +- datafusion/proto/src/logical_plan/to_proto.rs | 1 - datafusion/sqllogictest/test_files/array.slt | 8 +- .../source/user-guide/sql/scalar_functions.md | 23 +++- 14 files changed, 228 insertions(+), 109 deletions(-) diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index c5f0e17c3fec..b7f089846a11 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -180,8 +180,6 @@ pub enum BuiltinScalarFunction { MakeArray, /// Flatten Flatten, - /// Range - Range, // struct functions /// struct @@ -421,7 +419,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayIntersect => Volatility::Immutable, BuiltinScalarFunction::ArrayUnion => Volatility::Immutable, BuiltinScalarFunction::ArrayResize => Volatility::Immutable, - BuiltinScalarFunction::Range => Volatility::Immutable, BuiltinScalarFunction::Cardinality => Volatility::Immutable, BuiltinScalarFunction::MakeArray => Volatility::Immutable, BuiltinScalarFunction::Ascii => 
Volatility::Immutable, @@ -632,9 +629,6 @@ impl BuiltinScalarFunction { (dt, _) => Ok(dt), } } - BuiltinScalarFunction::Range => { - Ok(List(Arc::new(Field::new("item", Int64, true)))) - } BuiltinScalarFunction::ArrayExcept => { match (input_expr_types[0].clone(), input_expr_types[1].clone()) { (DataType::Null, _) | (_, DataType::Null) => { @@ -962,14 +956,6 @@ impl BuiltinScalarFunction { Signature::variadic_any(self.volatility()) } - BuiltinScalarFunction::Range => Signature::one_of( - vec![ - Exact(vec![Int64]), - Exact(vec![Int64, Int64]), - Exact(vec![Int64, Int64, Int64]), - ], - self.volatility(), - ), BuiltinScalarFunction::Struct => Signature::variadic_any(self.volatility()), BuiltinScalarFunction::Concat | BuiltinScalarFunction::ConcatWithSeparator => { @@ -1587,7 +1573,6 @@ impl BuiltinScalarFunction { &["array_intersect", "list_intersect"] } BuiltinScalarFunction::OverLay => &["overlay"], - BuiltinScalarFunction::Range => &["range", "generate_series"], // struct functions BuiltinScalarFunction::Struct => &["struct"], diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 0a472e2ba507..63f3af8868bb 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -764,12 +764,6 @@ scalar_expr!( "Returns an array of the elements in the intersection of array1 and array2." ); -nary_scalar_expr!( - Range, - gen_range, - "Returns a list of values in the range between start and stop with step." 
-); - // string functions scalar_expr!(Ascii, ascii, chr, "ASCII code value of the character"); scalar_expr!( diff --git a/datafusion/functions-array/src/kernels.rs b/datafusion/functions-array/src/kernels.rs index 1b96e01d8b9a..b9a68b466605 100644 --- a/datafusion/functions-array/src/kernels.rs +++ b/datafusion/functions-array/src/kernels.rs @@ -23,11 +23,12 @@ use arrow::array::{ StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use arrow::datatypes::DataType; -use datafusion_common::cast::{as_large_list_array, as_list_array, as_string_array}; +use datafusion_common::cast::{ + as_int64_array, as_large_list_array, as_list_array, as_string_array, +}; use datafusion_common::{exec_err, DataFusionError}; use std::any::type_name; use std::sync::Arc; - macro_rules! downcast_arg { ($ARG:expr, $ARRAY_TYPE:ident) => {{ $ARG.as_any().downcast_ref::<$ARRAY_TYPE>().ok_or_else(|| { @@ -252,3 +253,69 @@ pub(super) fn array_to_string(args: &[ArrayRef]) -> datafusion_common::Result` representing the resulting ListArray after the operation. +/// +/// # Arguments +/// +/// * `args` - An array of 1 to 3 ArrayRefs representing start, stop, and step(step value can not be zero.) values. 
+/// +/// # Examples +/// +/// gen_range(3) => [0, 1, 2] +/// gen_range(1, 4) => [1, 2, 3] +/// gen_range(1, 7, 2) => [1, 3, 5] +pub fn gen_range( + args: &[ArrayRef], + include_upper: i64, +) -> datafusion_common::Result { + let (start_array, stop_array, step_array) = match args.len() { + 1 => (None, as_int64_array(&args[0])?, None), + 2 => ( + Some(as_int64_array(&args[0])?), + as_int64_array(&args[1])?, + None, + ), + 3 => ( + Some(as_int64_array(&args[0])?), + as_int64_array(&args[1])?, + Some(as_int64_array(&args[2])?), + ), + _ => return exec_err!("gen_range expects 1 to 3 arguments"), + }; + + let mut values = vec![]; + let mut offsets = vec![0]; + for (idx, stop) in stop_array.iter().enumerate() { + let mut stop = stop.unwrap_or(0); + let start = start_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(0); + let step = step_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(1); + if step == 0 { + return exec_err!("step can't be 0 for function range(start [, stop, step]"); + } + if step < 0 { + // Decreasing range + stop -= include_upper; + values.extend((stop + 1..start + 1).rev().step_by((-step) as usize)); + } else { + // Increasing range + stop += include_upper; + values.extend((start..stop).step_by(step as usize)); + } + + offsets.push(values.len() as i32); + } + let arr = Arc::new(ListArray::try_new( + Arc::new(Field::new("item", DataType::Int64, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(Int64Array::from(values)), + None, + )?); + Ok(arr) +} diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index 84997ed10e32..e3515ccf9f72 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -40,11 +40,17 @@ use std::sync::Arc; /// Fluent-style API for creating `Expr`s pub mod expr_fn { pub use super::udf::array_to_string; + pub use super::udf::gen_series; + pub use super::udf::range; } /// Registers all enabled packages with a [`FunctionRegistry`] pub fn 
register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { - let functions: Vec> = vec![udf::array_to_string_udf()]; + let functions: Vec> = vec![ + udf::array_to_string_udf(), + udf::range_udf(), + udf::gen_series_udf(), + ]; functions.into_iter().try_for_each(|udf| { let existing_udf = registry.register_udf(udf)?; if let Some(existing_udf) = existing_udf { diff --git a/datafusion/functions-array/src/udf.rs b/datafusion/functions-array/src/udf.rs index 79fb83c059a4..17769419c0b2 100644 --- a/datafusion/functions-array/src/udf.rs +++ b/datafusion/functions-array/src/udf.rs @@ -18,12 +18,14 @@ //! [`ScalarUDFImpl`] definitions for array functions. use arrow::datatypes::DataType; +use arrow::datatypes::Field; use datafusion_common::plan_err; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::Expr; +use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; use std::any::Any; - +use std::sync::Arc; // Create static instances of ScalarUDFs for each function make_udf_function!(ArrayToString, array_to_string, @@ -31,7 +33,6 @@ make_udf_function!(ArrayToString, "converts each element to its text representation.", // doc array_to_string_udf // internal function name ); - #[derive(Debug)] pub(super) struct ArrayToString { signature: Signature, @@ -83,3 +84,121 @@ impl ScalarUDFImpl for ArrayToString { &self.aliases } } + +make_udf_function!( + Range, + range, + start stop step, + "create a list of values in the range between start and stop", + range_udf +); +#[derive(Debug)] +pub(super) struct Range { + signature: Signature, + aliases: Vec, +} +impl Range { + pub fn new() -> Self { + use DataType::*; + Self { + signature: Signature::one_of( + vec![ + Exact(vec![Int64]), + Exact(vec![Int64, Int64]), + Exact(vec![Int64, Int64, Int64]), + ], + Volatility::Immutable, + ), + aliases: vec![String::from("range")], + } + } +} +impl ScalarUDFImpl for Range { + fn as_any(&self) -> &dyn Any { + self + } + 
fn name(&self) -> &str { + "range" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + use DataType::*; + Ok(List(Arc::new(Field::new( + "item", + arg_types[0].clone(), + true, + )))) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(args)?; + crate::kernels::gen_range(&args, 0).map(ColumnarValue::Array) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +make_udf_function!( + GenSeries, + gen_series, + start stop step, + "create a list of values in the range between start and stop, include upper bound", + gen_series_udf +); +#[derive(Debug)] +pub(super) struct GenSeries { + signature: Signature, + aliases: Vec, +} +impl GenSeries { + pub fn new() -> Self { + use DataType::*; + Self { + signature: Signature::one_of( + vec![ + Exact(vec![Int64]), + Exact(vec![Int64, Int64]), + Exact(vec![Int64, Int64, Int64]), + ], + Volatility::Immutable, + ), + aliases: vec![String::from("generate_series")], + } + } +} +impl ScalarUDFImpl for GenSeries { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "generate_series" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + use DataType::*; + Ok(List(Arc::new(Field::new( + "item", + arg_types[0].clone(), + true, + )))) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(args)?; + crate::kernels::gen_range(&args, 1).map(ColumnarValue::Array) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 38a4359b4f4b..01b2ae13c8d4 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ 
b/datafusion/physical-expr/src/array_expressions.rs @@ -39,7 +39,6 @@ use datafusion_common::{ exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DataFusionError, Result, ScalarValue, }; - use itertools::Itertools; macro_rules! downcast_arg { @@ -887,64 +886,6 @@ where )?)) } -/// Generates an array of integers from start to stop with a given step. -/// -/// This function takes 1 to 3 ArrayRefs as arguments, representing start, stop, and step values. -/// It returns a `Result` representing the resulting ListArray after the operation. -/// -/// # Arguments -/// -/// * `args` - An array of 1 to 3 ArrayRefs representing start, stop, and step(step value can not be zero.) values. -/// -/// # Examples -/// -/// gen_range(3) => [0, 1, 2] -/// gen_range(1, 4) => [1, 2, 3] -/// gen_range(1, 7, 2) => [1, 3, 5] -pub fn gen_range(args: &[ArrayRef]) -> Result { - let (start_array, stop_array, step_array) = match args.len() { - 1 => (None, as_int64_array(&args[0])?, None), - 2 => ( - Some(as_int64_array(&args[0])?), - as_int64_array(&args[1])?, - None, - ), - 3 => ( - Some(as_int64_array(&args[0])?), - as_int64_array(&args[1])?, - Some(as_int64_array(&args[2])?), - ), - _ => return exec_err!("gen_range expects 1 to 3 arguments"), - }; - - let mut values = vec![]; - let mut offsets = vec![0]; - for (idx, stop) in stop_array.iter().enumerate() { - let stop = stop.unwrap_or(0); - let start = start_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(0); - let step = step_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(1); - if step == 0 { - return exec_err!("step can't be 0 for function range(start [, stop, step]"); - } - if step < 0 { - // Decreasing range - values.extend((stop + 1..start + 1).rev().step_by((-step) as usize)); - } else { - // Increasing range - values.extend((start..stop).step_by(step as usize)); - } - - offsets.push(values.len() as i32); - } - let arr = Arc::new(ListArray::try_new( - Arc::new(Field::new("item", DataType::Int64, true)), - 
OffsetBuffer::new(offsets.into()), - Arc::new(Int64Array::from(values)), - None, - )?); - Ok(arr) -} - /// Array_sort SQL function pub fn array_sort(args: &[ArrayRef]) -> Result { if args.is_empty() || args.len() > 3 { diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index c90349753570..2552381a79b0 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -405,9 +405,6 @@ pub fn create_physical_fun( BuiltinScalarFunction::ArrayIntersect => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_intersect)(args) }), - BuiltinScalarFunction::Range => Arc::new(|args| { - make_scalar_function_inner(array_expressions::gen_range)(args) - }), BuiltinScalarFunction::Cardinality => Arc::new(|args| { make_scalar_function_inner(array_expressions::cardinality)(args) }), diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index f75488163008..a4a06bab854c 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -667,7 +667,7 @@ enum ScalarFunction { ArrayIntersect = 119; ArrayUnion = 120; OverLay = 121; - Range = 122; + /// 122 is Range ArrayExcept = 123; ArrayPopFront = 124; Levenshtein = 125; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index a266d1b07886..443597bebc20 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22435,7 +22435,6 @@ impl serde::Serialize for ScalarFunction { Self::ArrayIntersect => "ArrayIntersect", Self::ArrayUnion => "ArrayUnion", Self::OverLay => "OverLay", - Self::Range => "Range", Self::ArrayExcept => "ArrayExcept", Self::ArrayPopFront => "ArrayPopFront", Self::Levenshtein => "Levenshtein", @@ -22575,7 +22574,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrayIntersect", "ArrayUnion", "OverLay", - "Range", "ArrayExcept", 
"ArrayPopFront", "Levenshtein", @@ -22744,7 +22742,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrayIntersect" => Ok(ScalarFunction::ArrayIntersect), "ArrayUnion" => Ok(ScalarFunction::ArrayUnion), "OverLay" => Ok(ScalarFunction::OverLay), - "Range" => Ok(ScalarFunction::Range), "ArrayExcept" => Ok(ScalarFunction::ArrayExcept), "ArrayPopFront" => Ok(ScalarFunction::ArrayPopFront), "Levenshtein" => Ok(ScalarFunction::Levenshtein), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index b520e84fb12d..c0d234443c94 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2755,7 +2755,7 @@ pub enum ScalarFunction { ArrayIntersect = 119, ArrayUnion = 120, OverLay = 121, - Range = 122, + /// / 122 is Range ArrayExcept = 123, ArrayPopFront = 124, Levenshtein = 125, @@ -2892,7 +2892,6 @@ impl ScalarFunction { ScalarFunction::ArrayIntersect => "ArrayIntersect", ScalarFunction::ArrayUnion => "ArrayUnion", ScalarFunction::OverLay => "OverLay", - ScalarFunction::Range => "Range", ScalarFunction::ArrayExcept => "ArrayExcept", ScalarFunction::ArrayPopFront => "ArrayPopFront", ScalarFunction::Levenshtein => "Levenshtein", @@ -3026,7 +3025,6 @@ impl ScalarFunction { "ArrayIntersect" => Some(Self::ArrayIntersect), "ArrayUnion" => Some(Self::ArrayUnion), "OverLay" => Some(Self::OverLay), - "Range" => Some(Self::Range), "ArrayExcept" => Some(Self::ArrayExcept), "ArrayPopFront" => Some(Self::ArrayPopFront), "Levenshtein" => Some(Self::Levenshtein), diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 25aa6adcff45..c89b3d1ed0f2 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -57,8 +57,8 @@ use datafusion_expr::{ concat_expr, concat_ws_expr, cos, cosh, cot, current_date, current_time, date_bin, date_part, date_trunc, degrees, digest, ends_with, exp, 
expr::{self, InList, Sort, WindowFunction}, - factorial, find_in_set, flatten, floor, from_unixtime, gcd, gen_range, initcap, - instr, iszero, lcm, left, levenshtein, ln, log, log10, log2, + factorial, find_in_set, flatten, floor, from_unixtime, gcd, initcap, instr, iszero, + lcm, left, levenshtein, ln, log, log10, log2, logical_plan::{PlanType, StringifiedPlan}, lower, lpad, ltrim, md5, nanvl, now, octet_length, overlay, pi, power, radians, random, regexp_like, regexp_replace, repeat, replace, reverse, right, round, rpad, @@ -507,7 +507,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::ArrayIntersect => Self::ArrayIntersect, ScalarFunction::ArrayUnion => Self::ArrayUnion, ScalarFunction::ArrayResize => Self::ArrayResize, - ScalarFunction::Range => Self::Range, ScalarFunction::Cardinality => Self::Cardinality, ScalarFunction::Array => Self::MakeArray, ScalarFunction::DatePart => Self::DatePart, @@ -1462,12 +1461,6 @@ pub fn parse_expr( parse_expr(&args[2], registry)?, parse_expr(&args[3], registry)?, )), - ScalarFunction::Range => Ok(gen_range( - args.to_owned() - .iter() - .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?, - )), ScalarFunction::Cardinality => { Ok(cardinality(parse_expr(&args[0], registry)?)) } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 560adc1307b6..b98be075f314 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1488,7 +1488,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::ArraySlice => Self::ArraySlice, BuiltinScalarFunction::ArrayIntersect => Self::ArrayIntersect, BuiltinScalarFunction::ArrayUnion => Self::ArrayUnion, - BuiltinScalarFunction::Range => Self::Range, BuiltinScalarFunction::Cardinality => Self::Cardinality, BuiltinScalarFunction::MakeArray => Self::Array, BuiltinScalarFunction::DatePart => Self::DatePart, 
diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index e64346537150..640bf82b5520 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -5544,13 +5544,15 @@ select range(5), ---- [0, 1, 2, 3, 4] [2, 3, 4] [2, 5, 8] [] [] [1, 0, -1, -2, -3, -4] -query ??? +query ????? select generate_series(5), generate_series(2, 5), - generate_series(2, 10, 3) + generate_series(2, 10, 3), + generate_series(1, 5, 1), + generate_series(5, 1, -1) ; ---- -[0, 1, 2, 3, 4] [2, 3, 4] [2, 5, 8] +[0, 1, 2, 3, 4, 5] [2, 3, 4, 5] [2, 5, 8] [1, 2, 3, 4, 5] [5, 4, 3, 2, 1] ## array_except diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 4300cc62f3de..cd1fbdabea1c 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -2924,7 +2924,28 @@ empty(array) ### `generate_series` -_Alias of [range](#range)._ +Similar to the range function, but it includes the upper bound. + +``` +generate_series(start, stop, step) +``` + +#### Arguments + +- **start**: start of the range +- **stop**: end of the range (included) +- **step**: increase by step (cannot be 0) + +#### Example + +``` +❯ select generate_series(1,3); ++------------------------------------+ +| generate_series(Int64(1),Int64(3)) | ++------------------------------------+ +| [1, 2, 3] | ++------------------------------------+ +``` ### `list_append`