Skip to content

Commit

Permalink
[Document] Adding UDF by impl ScalarUDFImpl (#9172)
Browse files Browse the repository at this point in the history
* doc: update scalar udf

* Update docs/source/library-user-guide/adding-udfs.md

Co-authored-by: Andrew Lamb <[email protected]>

* Update docs/source/library-user-guide/adding-udfs.md

Co-authored-by: Andrew Lamb <[email protected]>

* apply suggestions and format

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
yyy1000 and alamb authored Feb 10, 2024
1 parent b2ff63f commit afb169c
Showing 1 changed file with 82 additions and 5 deletions.
87 changes: 82 additions & 5 deletions docs/source/library-user-guide/adding-udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,87 @@ First we'll talk about adding an Scalar UDF end-to-end, then we'll talk about th

## Adding a Scalar UDF

A Scalar UDF is a function that takes a row of data and returns a single value. For example, this function takes a single i64 and returns a single i64 with 1 added to it:
A Scalar UDF is a function that takes a row of data and returns a single value. In order for good performance
such functions are "vectorized" in DataFusion, meaning they get one or more Arrow Arrays as input and produce
an Arrow Array with the same number of rows as output.

To create a Scalar UDF, you

1. Implement the `ScalarUDFImpl` trait to tell DataFusion about your function such as what types of arguments it takes and how to calculate the results.
2. Create a `ScalarUDF` and register it with `SessionContext::register_udf` so it can be invoked by name.

In the following example, we will add a function takes a single i64 and returns a single i64 with 1 added to it:

For brevity, we'll skipped some error handling, but e.g. you may want to check that `args.len()` is the expected number of arguments.

### Adding by `impl ScalarUDFImpl`

This a lower level API with more functionality but is more complex, also documented in [`advanced_udf.rs`].

```rust
use std::any::Any;
use arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, plan_err, Result};
use datafusion_expr::{col, ColumnarValue, Signature, Volatility};
use datafusion_expr::{ScalarUDFImpl, ScalarUDF};

#[derive(Debug)]
struct AddOne {
signature: Signature
};

impl AddOne {
fn new() -> Self {
Self {
signature: Signature::uniform(1, vec![DataType::Int32], Volatility::Immutable)
}
}
}

/// Implement the ScalarUDFImpl trait for AddOne
impl ScalarUDFImpl for AddOne {
fn as_any(&self) -> &dyn Any { self }
fn name(&self) -> &str { "add_one" }
fn signature(&self) -> &Signature { &self.signature }
fn return_type(&self, args: &[DataType]) -> Result<DataType> {
if !matches!(args.get(0), Some(&DataType::Int32)) {
return plan_err!("add_one only accepts Int32 arguments");
}
Ok(DataType::Int32)
}
// The actual implementation would add one to the argument
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = columnar_values_to_array(args)?;
let i64s = as_int64_array(&args[0])?;

let new_array = i64s
.iter()
.map(|array_elem| array_elem.map(|value| value + 1))
.collect::<Int64Array>();
Ok(Arc::new(new_array))
}
}
```

We now need to register the function with DataFusion so that it can be used in the context of a query.

```rust
// Create a new ScalarUDF from the implementation
let add_one = ScalarUDF::from(AddOne::new());

// register the UDF with the context so it can be invoked by name and from SQL
let mut ctx = SessionContext::new();
ctx.register_udf(add_one.clone());

// Call the function `add_one(col)`
let expr = add_one.call(vec![col("a")]);
```

### Adding a Scalar UDF by [`create_udf`]

There is a an older, more concise, but also more limited API [`create_udf`] available as well

#### Adding a Scalar UDF

```rust
use std::sync::Arc;
Expand All @@ -58,8 +138,6 @@ pub fn add_one(args: &[ColumnarValue]) -> Result<ArrayRef> {
}
```

For brevity, we'll skipped some error handling, but e.g. you may want to check that `args.len()` is the expected number of arguments.

This "works" in isolation, i.e. if you have a slice of `ArrayRef`s, you can call `add_one` and it will return a new `ArrayRef` with 1 added to each value.

```rust
Expand All @@ -74,11 +152,10 @@ assert_eq!(result, &Int64Array::from(vec![Some(2), None, Some(4)]));

The challenge however is that DataFusion doesn't know about this function. We need to register it with DataFusion so that it can be used in the context of a query.

### Registering a Scalar UDF
#### Registering a Scalar UDF

To register a Scalar UDF, you need to wrap the function implementation in a [`ScalarUDF`] struct and then register it with the `SessionContext`.
DataFusion provides the [`create_udf`] and helper functions to make this easier.
There is a lower level API with more functionality but is more complex, that is documented in [`advanced_udf.rs`].

```rust
use datafusion::logical_expr::{Volatility, create_udf};
Expand Down

0 comments on commit afb169c

Please sign in to comment.