Skip to content

Commit

Permalink
port range function and change gen_series logic (apache#9352)
Browse files Browse the repository at this point in the history
* port range function and change gen_series logic

* fix failed test

* delete useless and add tests

* change parameter

* fix document

* remove useless

* change doc

* delete space
  • Loading branch information
Lordworms authored Feb 29, 2024
1 parent f68864b commit ca37ce3
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 109 deletions.
15 changes: 0 additions & 15 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ pub enum BuiltinScalarFunction {
MakeArray,
/// Flatten
Flatten,
/// Range
Range,

// struct functions
/// struct
Expand Down Expand Up @@ -421,7 +419,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayIntersect => Volatility::Immutable,
BuiltinScalarFunction::ArrayUnion => Volatility::Immutable,
BuiltinScalarFunction::ArrayResize => Volatility::Immutable,
BuiltinScalarFunction::Range => Volatility::Immutable,
BuiltinScalarFunction::Cardinality => Volatility::Immutable,
BuiltinScalarFunction::MakeArray => Volatility::Immutable,
BuiltinScalarFunction::Ascii => Volatility::Immutable,
Expand Down Expand Up @@ -632,9 +629,6 @@ impl BuiltinScalarFunction {
(dt, _) => Ok(dt),
}
}
BuiltinScalarFunction::Range => {
Ok(List(Arc::new(Field::new("item", Int64, true))))
}
BuiltinScalarFunction::ArrayExcept => {
match (input_expr_types[0].clone(), input_expr_types[1].clone()) {
(DataType::Null, _) | (_, DataType::Null) => {
Expand Down Expand Up @@ -962,14 +956,6 @@ impl BuiltinScalarFunction {
Signature::variadic_any(self.volatility())
}

BuiltinScalarFunction::Range => Signature::one_of(
vec![
Exact(vec![Int64]),
Exact(vec![Int64, Int64]),
Exact(vec![Int64, Int64, Int64]),
],
self.volatility(),
),
BuiltinScalarFunction::Struct => Signature::variadic_any(self.volatility()),
BuiltinScalarFunction::Concat
| BuiltinScalarFunction::ConcatWithSeparator => {
Expand Down Expand Up @@ -1587,7 +1573,6 @@ impl BuiltinScalarFunction {
&["array_intersect", "list_intersect"]
}
BuiltinScalarFunction::OverLay => &["overlay"],
BuiltinScalarFunction::Range => &["range", "generate_series"],

// struct functions
BuiltinScalarFunction::Struct => &["struct"],
Expand Down
6 changes: 0 additions & 6 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,12 +764,6 @@ scalar_expr!(
"Returns an array of the elements in the intersection of array1 and array2."
);

nary_scalar_expr!(
Range,
gen_range,
"Returns a list of values in the range between start and stop with step."
);

// string functions
scalar_expr!(Ascii, ascii, chr, "ASCII code value of the character");
scalar_expr!(
Expand Down
71 changes: 69 additions & 2 deletions datafusion/functions-array/src/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ use arrow::array::{
StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::datatypes::DataType;
use datafusion_common::cast::{as_large_list_array, as_list_array, as_string_array};
use datafusion_common::cast::{
as_int64_array, as_large_list_array, as_list_array, as_string_array,
};
use datafusion_common::{exec_err, DataFusionError};
use std::any::type_name;
use std::sync::Arc;

macro_rules! downcast_arg {
($ARG:expr, $ARRAY_TYPE:ident) => {{
$ARG.as_any().downcast_ref::<$ARRAY_TYPE>().ok_or_else(|| {
Expand Down Expand Up @@ -252,3 +253,69 @@ pub(super) fn array_to_string(args: &[ArrayRef]) -> datafusion_common::Result<Ar

Ok(Arc::new(string_arr))
}

use arrow::array::ListArray;
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::Field;
/// Generates, for every input row, a list of `Int64` values running from `start`
/// towards `stop` in increments of `step`.
///
/// This function takes 1 to 3 ArrayRefs as arguments, representing start, stop, and step values.
/// It returns a `Result<ArrayRef>` holding a `ListArray` with one list per input row.
///
/// # Arguments
///
/// * `args` - 1 to 3 `Int64` arrays, read as `(stop)`, `(start, stop)` or
///   `(start, stop, step)`. A missing `start` defaults to 0 and a missing `step` to 1.
///   A null `stop` is treated as 0.
/// * `include_upper` - how far the `stop` bound is moved in the direction of travel
///   before the values are generated: `0` keeps `stop` exclusive, `1` makes it inclusive.
///
/// # Errors
///
/// Returns an execution error when
/// * the number of arguments is not 1, 2 or 3, or an argument is not an `Int64` array,
/// * a `step` of 0 is encountered,
/// * adjusting the `stop` bound overflows `i64`, or
/// * the total number of generated values does not fit in an `i32` list offset.
///
/// # Examples
///
/// ```text
/// gen_range(3)       with include_upper = 0 => [0, 1, 2]
/// gen_range(1, 4)    with include_upper = 0 => [1, 2, 3]
/// gen_range(1, 7, 2) with include_upper = 0 => [1, 3, 5]
/// gen_range(1, 4)    with include_upper = 1 => [1, 2, 3, 4]
/// ```
pub fn gen_range(
    args: &[ArrayRef],
    include_upper: i64,
) -> datafusion_common::Result<ArrayRef> {
    let (start_array, stop_array, step_array) = match args.len() {
        1 => (None, as_int64_array(&args[0])?, None),
        2 => (
            Some(as_int64_array(&args[0])?),
            as_int64_array(&args[1])?,
            None,
        ),
        3 => (
            Some(as_int64_array(&args[0])?),
            as_int64_array(&args[1])?,
            Some(as_int64_array(&args[2])?),
        ),
        _ => return exec_err!("gen_range expects 1 to 3 arguments"),
    };

    let mut values = vec![];
    let mut offsets = vec![0];
    for (idx, stop) in stop_array.iter().enumerate() {
        let stop = stop.unwrap_or(0);
        // NOTE(review): `value(idx)` is read without consulting the validity bitmap, so a
        // null start/step is consumed as whatever the slot holds - confirm this is intended.
        let start = start_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(0);
        let step = step_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(1);
        if step == 0 {
            return exec_err!("step can't be 0 for function range(start [, stop, step]");
        }
        // `unsigned_abs` cannot overflow (unlike `-step` for `i64::MIN`); a stride that
        // does not fit in `usize` saturates, which still yields at most one element.
        let stride = usize::try_from(step.unsigned_abs()).unwrap_or(usize::MAX);
        if step < 0 {
            // Decreasing range: walk down from `start` to the smallest admissible value,
            // i.e. `stop + 1 - include_upper`. The inclusive range avoids `start + 1`.
            let lowest = 1i64
                .checked_sub(include_upper)
                .and_then(|delta| stop.checked_add(delta));
            let Some(lowest) = lowest else {
                return exec_err!("stop bound {stop} overflows Int64 in gen_range");
            };
            values.extend((lowest..=start).rev().step_by(stride));
        } else {
            // Increasing range: the exclusive end is `stop + include_upper`.
            let Some(end) = stop.checked_add(include_upper) else {
                return exec_err!("stop bound {stop} overflows Int64 in gen_range");
            };
            values.extend((start..end).step_by(stride));
        }

        // List offsets are i32; refuse to build a corrupt offset buffer on overflow.
        let Ok(offset) = i32::try_from(values.len()) else {
            return exec_err!(
                "gen_range produced {} values, which exceeds the capacity of a List array",
                values.len()
            );
        };
        offsets.push(offset);
    }
    let arr = Arc::new(ListArray::try_new(
        Arc::new(Field::new("item", DataType::Int64, true)),
        OffsetBuffer::new(offsets.into()),
        Arc::new(Int64Array::from(values)),
        None,
    )?);
    Ok(arr)
}
8 changes: 7 additions & 1 deletion datafusion/functions-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ use std::sync::Arc;
/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
pub use super::udf::array_to_string;
pub use super::udf::gen_series;
pub use super::udf::range;
}

/// Registers all enabled packages with a [`FunctionRegistry`]
pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
let functions: Vec<Arc<ScalarUDF>> = vec![udf::array_to_string_udf()];
let functions: Vec<Arc<ScalarUDF>> = vec![
udf::array_to_string_udf(),
udf::range_udf(),
udf::gen_series_udf(),
];
functions.into_iter().try_for_each(|udf| {
let existing_udf = registry.register_udf(udf)?;
if let Some(existing_udf) = existing_udf {
Expand Down
123 changes: 121 additions & 2 deletions datafusion/functions-array/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@
//! [`ScalarUDFImpl`] definitions for array functions.
use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use datafusion_common::plan_err;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
use datafusion_expr::TypeSignature::Exact;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use std::any::Any;

use std::sync::Arc;
// Create static instances of ScalarUDFs for each function
make_udf_function!(ArrayToString,
array_to_string,
array delimiter, // arg name
"converts each element to its text representation.", // doc
array_to_string_udf // internal function name
);

#[derive(Debug)]
pub(super) struct ArrayToString {
signature: Signature,
Expand Down Expand Up @@ -83,3 +84,121 @@ impl ScalarUDFImpl for ArrayToString {
&self.aliases
}
}

// Declares the `range` function through the crate's `make_udf_function!` macro.
// Arguments, in order: the implementing struct, the name of the `expr_fn` helper,
// the helper's argument names, its doc string, and the internal function name
// (`range_udf`) that `register_all` calls to obtain the UDF.
make_udf_function!(
Range,
range,
start stop step,
"create a list of values in the range between start and stop",
range_udf
);
#[derive(Debug)]
pub(super) struct Range {
signature: Signature,
aliases: Vec<String>,
}
impl Range {
pub fn new() -> Self {
use DataType::*;
Self {
signature: Signature::one_of(
vec![
Exact(vec![Int64]),
Exact(vec![Int64, Int64]),
Exact(vec![Int64, Int64, Int64]),
],
Volatility::Immutable,
),
aliases: vec![String::from("range")],
}
}
}
impl ScalarUDFImpl for Range {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"range"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
use DataType::*;
Ok(List(Arc::new(Field::new(
"item",
arg_types[0].clone(),
true,
))))
}

fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
crate::kernels::gen_range(&args, 0).map(ColumnarValue::Array)
}

fn aliases(&self) -> &[String] {
&self.aliases
}
}

// Declares the `generate_series` function through the crate's `make_udf_function!` macro.
// Arguments, in order: the implementing struct, the name of the `expr_fn` helper,
// the helper's argument names, its doc string, and the internal function name
// (`gen_series_udf`) that `register_all` calls to obtain the UDF.
make_udf_function!(
GenSeries,
gen_series,
start stop step,
"create a list of values in the range between start and stop, include upper bound",
gen_series_udf
);
#[derive(Debug)]
pub(super) struct GenSeries {
signature: Signature,
aliases: Vec<String>,
}
impl GenSeries {
pub fn new() -> Self {
use DataType::*;
Self {
signature: Signature::one_of(
vec![
Exact(vec![Int64]),
Exact(vec![Int64, Int64]),
Exact(vec![Int64, Int64, Int64]),
],
Volatility::Immutable,
),
aliases: vec![String::from("generate_series")],
}
}
}
impl ScalarUDFImpl for GenSeries {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"generate_series"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
use DataType::*;
Ok(List(Arc::new(Field::new(
"item",
arg_types[0].clone(),
true,
))))
}

fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
crate::kernels::gen_range(&args, 1).map(ColumnarValue::Array)
}

fn aliases(&self) -> &[String] {
&self.aliases
}
}
59 changes: 0 additions & 59 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err,
DataFusionError, Result, ScalarValue,
};

use itertools::Itertools;

macro_rules! downcast_arg {
Expand Down Expand Up @@ -887,64 +886,6 @@ where
)?))
}

/// Generates, for every input row, a list of `Int64` values running from `start`
/// towards `stop` (exclusive) in increments of `step`.
///
/// This function takes 1 to 3 ArrayRefs as arguments, representing start, stop, and step values.
/// It returns a `Result<ArrayRef>` holding a `ListArray` with one list per input row.
///
/// # Arguments
///
/// * `args` - 1 to 3 `Int64` arrays, read as `(stop)`, `(start, stop)` or
///   `(start, stop, step)`. A missing `start` defaults to 0 and a missing `step` to 1.
///   A null `stop` is treated as 0.
///
/// # Errors
///
/// Returns an execution error when
/// * the number of arguments is not 1, 2 or 3, or an argument is not an `Int64` array,
/// * a `step` of 0 is encountered,
/// * `stop + 1` overflows `i64` for a decreasing range, or
/// * the total number of generated values does not fit in an `i32` list offset.
///
/// # Examples
///
/// ```text
/// gen_range(3) => [0, 1, 2]
/// gen_range(1, 4) => [1, 2, 3]
/// gen_range(1, 7, 2) => [1, 3, 5]
/// ```
pub fn gen_range(args: &[ArrayRef]) -> Result<ArrayRef> {
    let (start_array, stop_array, step_array) = match args.len() {
        1 => (None, as_int64_array(&args[0])?, None),
        2 => (
            Some(as_int64_array(&args[0])?),
            as_int64_array(&args[1])?,
            None,
        ),
        3 => (
            Some(as_int64_array(&args[0])?),
            as_int64_array(&args[1])?,
            Some(as_int64_array(&args[2])?),
        ),
        _ => return exec_err!("gen_range expects 1 to 3 arguments"),
    };

    let mut values = vec![];
    let mut offsets = vec![0];
    for (idx, stop) in stop_array.iter().enumerate() {
        let stop = stop.unwrap_or(0);
        // NOTE(review): `value(idx)` is read without consulting the validity bitmap, so a
        // null start/step is consumed as whatever the slot holds - confirm this is intended.
        let start = start_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(0);
        let step = step_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(1);
        if step == 0 {
            return exec_err!("step can't be 0 for function range(start [, stop, step]");
        }
        // `unsigned_abs` cannot overflow (unlike `-step` for `i64::MIN`); a stride that
        // does not fit in `usize` saturates, which still yields at most one element.
        let stride = usize::try_from(step.unsigned_abs()).unwrap_or(usize::MAX);
        if step < 0 {
            // Decreasing range: walk down from `start` to `stop + 1`. The inclusive
            // range avoids computing `start + 1`.
            let Some(lowest) = stop.checked_add(1) else {
                return exec_err!("stop bound {stop} overflows Int64 in gen_range");
            };
            values.extend((lowest..=start).rev().step_by(stride));
        } else {
            // Increasing range
            values.extend((start..stop).step_by(stride));
        }

        // List offsets are i32; refuse to build a corrupt offset buffer on overflow.
        let Ok(offset) = i32::try_from(values.len()) else {
            return exec_err!(
                "gen_range produced {} values, which exceeds the capacity of a List array",
                values.len()
            );
        };
        offsets.push(offset);
    }
    let arr = Arc::new(ListArray::try_new(
        Arc::new(Field::new("item", DataType::Int64, true)),
        OffsetBuffer::new(offsets.into()),
        Arc::new(Int64Array::from(values)),
        None,
    )?);
    Ok(arr)
}

/// Array_sort SQL function
pub fn array_sort(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.is_empty() || args.len() > 3 {
Expand Down
3 changes: 0 additions & 3 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,6 @@ pub fn create_physical_fun(
BuiltinScalarFunction::ArrayIntersect => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_intersect)(args)
}),
BuiltinScalarFunction::Range => Arc::new(|args| {
make_scalar_function_inner(array_expressions::gen_range)(args)
}),
BuiltinScalarFunction::Cardinality => Arc::new(|args| {
make_scalar_function_inner(array_expressions::cardinality)(args)
}),
Expand Down
Loading

0 comments on commit ca37ce3

Please sign in to comment.