Skip to content

Commit

Permalink
Merge branch 'main' into user_doc
Browse files Browse the repository at this point in the history
  • Loading branch information
jonahgao committed Dec 8, 2024
2 parents bd70243 + 9fbf39b commit b1cc263
Show file tree
Hide file tree
Showing 31 changed files with 323 additions and 235 deletions.
126 changes: 100 additions & 26 deletions datafusion-examples/examples/advanced_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::{internal_err, ScalarValue};
use datafusion_common::{exec_err, internal_err, ScalarValue};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
};

/// This example shows how to use the full ScalarUDFImpl API to implement a user
/// defined function. As in the `simple_udf.rs` example, this struct implements
Expand Down Expand Up @@ -83,23 +85,27 @@ impl ScalarUDFImpl for PowUdf {
Ok(DataType::Float64)
}

/// This is the function that actually calculates the results.
/// This function actually calculates the results of the scalar function.
///
/// This is the same way that functions provided with DataFusion are invoked,
/// which permits important special cases:
///
/// This is the same way that functions built into DataFusion are invoked,
/// which permits important special cases when one or both of the arguments
/// are single values (constants). For example `pow(a, 2)`
///1. When one or both of the arguments are single values (constants).
/// For example `pow(a, 2)`
/// 2. When the input arrays can be reused (avoid allocating a new output array)
///
/// However, it also means the implementation is more complex than when
/// using `create_udf`.
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
// The other fields of the `args` struct are used for more specialized
// uses, and are not needed in this example
let ScalarFunctionArgs { mut args, .. } = args;
// DataFusion has arranged for the correct inputs to be passed to this
// function, but we check again to make sure
assert_eq!(args.len(), 2);
let (base, exp) = (&args[0], &args[1]);
// take ownership of arguments by popping in reverse order
let exp = args.pop().unwrap();
let base = args.pop().unwrap();
assert_eq!(base.data_type(), DataType::Float64);
assert_eq!(exp.data_type(), DataType::Float64);

Expand All @@ -118,7 +124,7 @@ impl ScalarUDFImpl for PowUdf {
) => {
// compute the output. Note DataFusion treats `None` as NULL.
let res = match (base, exp) {
(Some(base), Some(exp)) => Some(base.powf(*exp)),
(Some(base), Some(exp)) => Some(base.powf(exp)),
// one or both arguments were NULL
_ => None,
};
Expand All @@ -140,31 +146,33 @@ impl ScalarUDFImpl for PowUdf {
// kernel creates very fast "vectorized" code and
// handles things like null values for us.
let res: Float64Array =
compute::unary(base_array, |base| base.powf(*exp));
compute::unary(base_array, |base| base.powf(exp));
Arc::new(res)
}
};
Ok(ColumnarValue::Array(result_array))
}

// special case if the base is a constant (note this code is quite
// similar to the previous case, so we omit comments)
// special case if the base is a constant.
//
// Note this case is very similar to the previous case, so we could
// use the same pattern. However, for this case we demonstrate an
// even more advanced pattern to potentially avoid allocating a new array
(
ColumnarValue::Scalar(ScalarValue::Float64(base)),
ColumnarValue::Array(exp_array),
) => {
let res = match base {
None => new_null_array(exp_array.data_type(), exp_array.len()),
Some(base) => {
let exp_array = exp_array.as_primitive::<Float64Type>();
let res: Float64Array =
compute::unary(exp_array, |exp| base.powf(exp));
Arc::new(res)
}
Some(base) => maybe_pow_in_place(base, exp_array)?,
};
Ok(ColumnarValue::Array(res))
}
// Both arguments are arrays so we have to perform the calculation for every row
// Both arguments are arrays so we have to perform the calculation
// for every row
//
// Note this could also be done in place using `binary_mut` as
// is done in `maybe_pow_in_place` but here we use binary for simplicity
(ColumnarValue::Array(base_array), ColumnarValue::Array(exp_array)) => {
let res: Float64Array = compute::binary(
base_array.as_primitive::<Float64Type>(),
Expand All @@ -191,6 +199,52 @@ impl ScalarUDFImpl for PowUdf {
}
}

/// Evaluate `base ^ exp` *without* allocating a new array, if possible
fn maybe_pow_in_place(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> {
// Calling `unary` creates a new array for the results. Avoiding
// allocations is a common optimization in performance critical code.
// arrow-rs allows this optimization via the `unary_mut`
// and `binary_mut` kernels in certain cases
//
// These kernels can only be used if there are no other references to
// the arrays (exp_array has to be the last remaining reference).
let owned_array = exp_array
// as in the previous example, we first downcast to &Float64Array
.as_primitive::<Float64Type>()
// non-obviously, we call clone here to get an owned `Float64Array`.
// Calling clone() is relatively inexpensive as it increments
// some ref counts but doesn't clone the data)
//
// Once we have the owned Float64Array we can drop the original
// exp_array (untyped) reference
.clone();

// We *MUST* drop the reference to `exp_array` explicitly so that
// owned_array is the only reference remaining in this function.
//
// Note that depending on the query there may still be other references
// to the underlying buffers, which would prevent reuse. The only way to
// know for sure is the result of `compute::unary_mut`
drop(exp_array);

// If we have the only reference, compute the result directly into the same
// allocation as was used for the input array
match compute::unary_mut(owned_array, |exp| base.powf(exp)) {
Err(_orig_array) => {
// unary_mut will return the original array if there are other
// references into the underling buffer (and thus reuse is
// impossible)
//
// In a real implementation, this case should fall back to
// calling `unary` and allocate a new array; In this example
// we will return an error for demonstration purposes
exec_err!("Could not reuse array for maybe_pow_in_place")
}
// a result of OK means the operation was run successfully
Ok(res) => Ok(Arc::new(res)),
}
}

/// In this example we register `PowUdf` as a user defined function
/// and invoke it via the DataFrame API and SQL
#[tokio::main]
Expand All @@ -215,9 +269,29 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// You can also invoke both pow(2, 10) and its alias my_pow(a, b) using SQL
let sql_df = ctx.sql("SELECT pow(2, 10), my_pow(a, b) FROM t").await?;
sql_df.show().await?;
// You can also invoke both pow(2, 10) and its alias my_pow(a, b) using SQL
ctx.sql("SELECT pow(2, 10), my_pow(a, b) FROM t")
.await?
.show()
.await?;

// You can also invoke pow_in_place by passing a constant base and a
// column `a` as the exponent . If there is only a single
// reference to `a` the code works well
ctx.sql("SELECT pow(2, a) FROM t").await?.show().await?;

// However, if there are multiple references to `a` in the evaluation
// the array storage can not be reused
let err = ctx
.sql("SELECT pow(2, a), pow(3, a) FROM t")
.await?
.show()
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"Execution error: Could not reuse array for maybe_pow_in_place"
);

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/regexp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async fn main() -> Result<()> {

// invalid flags will result in an error
let result = ctx
.sql(r"select regexp_like('\b4(?!000)\d\d\d\b', 4010, 'g')")
.sql(r"select regexp_like('\b4(?!000)\d\d\d\b', '4010', 'g')")
.await?
.collect()
.await;
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ xz2 = { version = "0.1", optional = true, features = ["static"] }
zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
# Temporary fix for https://github.com/apache/datafusion/issues/13686
# TODO: Remove it once the upstream has a fix
lexical-write-integer = { version = "=1.0.2" }

arrow-buffer = { workspace = true }
async-trait = { workspace = true }
criterion = { version = "0.5", features = ["async_tokio"] }
Expand Down
14 changes: 8 additions & 6 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,15 @@ where
}
}

/// Arguments passed to [`ScalarUDFImpl::invoke_with_args`] when invoking a
/// scalar function.
pub struct ScalarFunctionArgs<'a> {
// The evaluated arguments to the function
pub args: &'a [ColumnarValue],
// The number of rows in record batch being evaluated
/// The evaluated arguments to the function
pub args: Vec<ColumnarValue>,
/// The number of rows in record batch being evaluated
pub number_rows: usize,
// The return type of the scalar function returned (from `return_type` or `return_type_from_exprs`)
// when creating the physical expression from the logical expression
/// The return type of the scalar function returned (from `return_type` or `return_type_from_exprs`)
/// when creating the physical expression from the logical expression
pub return_type: &'a DataType,
}

Expand Down Expand Up @@ -539,7 +541,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
/// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
/// to arrays, which will likely be simpler code, but be slower.
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
self.invoke_batch(args.args, args.number_rows)
self.invoke_batch(&args.args, args.number_rows)
}

/// Invoke the function without `args`, instead the number of rows are provided,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/datetime/to_local_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ mod tests {
fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
let res = ToLocalTimeFunc::new()
.invoke_with_args(ScalarFunctionArgs {
args: &[ColumnarValue::Scalar(input)],
args: vec![ColumnarValue::Scalar(input)],
number_rows: 1,
return_type: &expected.data_type(),
})
Expand Down
50 changes: 29 additions & 21 deletions datafusion/functions/src/regex/regexplike.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,7 @@ impl RegexpLikeFunc {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![Utf8View, Utf8]),
TypeSignature::Exact(vec![Utf8View, Utf8View]),
TypeSignature::Exact(vec![Utf8View, LargeUtf8]),
TypeSignature::Exact(vec![Utf8, Utf8]),
TypeSignature::Exact(vec![Utf8, Utf8View]),
TypeSignature::Exact(vec![Utf8, LargeUtf8]),
TypeSignature::Exact(vec![LargeUtf8, Utf8]),
TypeSignature::Exact(vec![LargeUtf8, Utf8View]),
TypeSignature::Exact(vec![LargeUtf8, LargeUtf8]),
TypeSignature::Exact(vec![Utf8View, Utf8, Utf8]),
TypeSignature::Exact(vec![Utf8View, Utf8View, Utf8]),
TypeSignature::Exact(vec![Utf8View, LargeUtf8, Utf8]),
TypeSignature::Exact(vec![Utf8, Utf8, Utf8]),
TypeSignature::Exact(vec![Utf8, Utf8View, Utf8]),
TypeSignature::Exact(vec![Utf8, LargeUtf8, Utf8]),
TypeSignature::Exact(vec![LargeUtf8, Utf8, Utf8]),
TypeSignature::Exact(vec![LargeUtf8, Utf8View, Utf8]),
TypeSignature::Exact(vec![LargeUtf8, LargeUtf8, Utf8]),
],
vec![TypeSignature::String(2), TypeSignature::String(3)],
Volatility::Immutable,
),
}
Expand Down Expand Up @@ -211,7 +192,34 @@ pub fn regexp_like(args: &[ArrayRef]) -> Result<ArrayRef> {
match args.len() {
2 => handle_regexp_like(&args[0], &args[1], None),
3 => {
let flags = args[2].as_string::<i32>();
let flags = match args[2].data_type() {
Utf8 => args[2].as_string::<i32>(),
LargeUtf8 => {
let large_string_array = args[2].as_string::<i64>();
let string_vec: Vec<Option<&str>> = (0..large_string_array.len()).map(|i| {
if large_string_array.is_null(i) {
None
} else {
Some(large_string_array.value(i))
}
})
.collect();

&GenericStringArray::<i32>::from(string_vec)
},
_ => {
let string_view_array = args[2].as_string_view();
let string_vec: Vec<Option<String>> = (0..string_view_array.len()).map(|i| {
if string_view_array.is_null(i) {
None
} else {
Some(string_view_array.value(i).to_string())
}
})
.collect();
&GenericStringArray::<i32>::from(string_vec)
},
};

if flags.iter().any(|s| s == Some("g")) {
return plan_err!("regexp_like() does not support the \"global\" option");
Expand Down
6 changes: 3 additions & 3 deletions datafusion/functions/src/string/ascii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod tests {
($INPUT:expr, $EXPECTED:expr) => {
test_function!(
AsciiFunc::new(),
&[ColumnarValue::Scalar(ScalarValue::Utf8($INPUT))],
vec![ColumnarValue::Scalar(ScalarValue::Utf8($INPUT))],
$EXPECTED,
i32,
Int32,
Expand All @@ -166,7 +166,7 @@ mod tests {

test_function!(
AsciiFunc::new(),
&[ColumnarValue::Scalar(ScalarValue::LargeUtf8($INPUT))],
vec![ColumnarValue::Scalar(ScalarValue::LargeUtf8($INPUT))],
$EXPECTED,
i32,
Int32,
Expand All @@ -175,7 +175,7 @@ mod tests {

test_function!(
AsciiFunc::new(),
&[ColumnarValue::Scalar(ScalarValue::Utf8View($INPUT))],
vec![ColumnarValue::Scalar(ScalarValue::Utf8View($INPUT))],
$EXPECTED,
i32,
Int32,
Expand Down
Loading

0 comments on commit b1cc263

Please sign in to comment.