Performance: enable array allocation reuse (ScalarFunctionArgs gets owned ColumnReference) #13637

Merged 8 commits on Dec 8, 2024

@alamb alamb commented Dec 4, 2024

Which issue does this PR close?

Rationale for this change

Previously, because functions received their input as &[ColumnarValue], they did not own it (the caller retained a reference), so they had to create new arrays even when the input array was not used anywhere else.

Now that we have changed the signature of invoke to take a struct, it is potentially possible to avoid allocating a new array for each output.

I don't have time myself to apply this pattern to the built-in DataFusion functions, but I wanted to make sure it was possible in case it comes up in the future or others want to really optimize expression evaluation

What changes are included in this PR?

  1. Change ScalarFunctionArgs to pass by value
  2. Update documentation
  3. Update tests
  4. Update advanced_udf.rs example to show how to reuse values

Are these changes tested?

Yes, both by CI (that it compiles) and by the example (which is also run in CI)

I could pull out the example into its own specific test case too if people thought that was valuable

Are there any user-facing changes?

@github-actions github-actions bot added logical-expr Logical plan and expressions physical-expr Changes to the physical-expr crates functions Changes to functions implementation labels Dec 4, 2024
/// This is the same way that functions built into DataFusion are invoked,
/// which permits important special cases when one or both of the arguments
/// are single values (constants). For example `pow(a, 2)`
///1. When one or both of the arguments are single values (constants).
Contributor Author

I updated the advanced_udf example to show how to update the arrays in place

Contributor Author

FYI @findepi who asked about this I think

@@ -191,6 +199,48 @@ impl ScalarUDFImpl for PowUdf {
}
}

/// Evaluate `base ^ exp` *without* allocating a new array, if possible
fn pow_in_place(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> {
Contributor Author

Here is the example of how to write a function that reuses the input argument's allocation

pub args: &'a [ColumnarValue],
// The number of rows in record batch being evaluated
/// The evaluated arguments to the function
pub args: Vec<ColumnarValue>,
Contributor Author

this is an API change, but since this struct has not been released yet, it isn't a user facing change.
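
To illustrate why ownership of the arguments matters, here is a std-only sketch that models an array as an `Arc<Vec<f64>>`. The names `Buf`, `BorrowedArgs`, `OwnedArgs` and both functions are hypothetical stand-ins, not DataFusion types. The only point is that a borrowed slice forces the callee to share the buffer, while an owned `Vec` can hand over the last reference.

```rust
use std::sync::Arc;

// Hypothetical stand-in for an Arrow array: a reference-counted buffer.
type Buf = Arc<Vec<f64>>;

// Old shape (assumed for illustration): the function only borrows the arguments.
struct BorrowedArgs<'a> {
    args: &'a [Buf],
}

// New shape (assumed for illustration): the function owns the arguments.
struct OwnedArgs {
    args: Vec<Buf>,
}

// With borrowed args the caller still holds its reference, so the best the
// callee can do is clone the Arc. The buffer is then shared (count >= 2).
fn refs_seen_borrowed(a: BorrowedArgs<'_>) -> usize {
    let buf = Arc::clone(&a.args[0]);
    Arc::strong_count(&buf)
}

// With owned args the callee can move the Arc out of the Vec. If the caller
// kept no other copy, this is the only reference (count == 1).
fn refs_seen_owned(a: OwnedArgs) -> usize {
    let buf = a.args.into_iter().next().expect("one argument");
    Arc::strong_count(&buf)
}
```

A reference count of 1 is what makes in-place mutation possible at all; the borrowed form can never reach it while the caller's slice is alive.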

@@ -562,7 +562,7 @@ mod tests {
fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
let res = ToLocalTimeFunc::new()
.invoke_with_args(ScalarFunctionArgs {
- args: &[ColumnarValue::Scalar(input)],
+ args: vec![ColumnarValue::Scalar(input)],
Contributor Author

Most of the rest of this PR is changes for the new signature

Contributor

That's interesting, Clippy was suggesting arrays instead of vectors for statically sized immutable collections

Contributor Author

@alamb alamb Dec 5, 2024

In this case I actually changed ScalarFunctionArgs to get an owned Vec

Perhaps clippy suggested [ColumnarValue::Scalar(input)], instead of &vec![ColumnarValue::Scalar(input)] 🤔
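
For context, Clippy's `useless_vec` lint targets a `vec![...]` that is only ever borrowed as a slice. The sketch below uses hypothetical functions (`sum_borrowed`, `sum_owned`, `compare`) to show the difference: when the callee takes ownership of a `Vec`, as `ScalarFunctionArgs` now does, the `vec!` is required rather than redundant.

```rust
// Borrowing callee: a slice is enough, so building a temporary Vec would be
// wasted work. This is the situation the lint is about.
fn sum_borrowed(xs: &[i32]) -> i32 {
    xs.iter().sum()
}

// Owning callee: the parameter type is Vec, so `vec![...]` is needed here.
fn sum_owned(xs: Vec<i32>) -> i32 {
    xs.into_iter().sum()
}

fn compare() -> (i32, i32) {
    // An array literal works for the borrowing callee;
    // `&vec![1, 2, 3]` would also compile but allocates needlessly.
    let a = sum_borrowed(&[1, 2, 3]);
    // The owning callee must be given a Vec.
    let b = sum_owned(vec![1, 2, 3]);
    (a, b)
}
```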

@@ -134,20 +134,20 @@ impl PhysicalExpr for ScalarFunctionExpr {
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
Contributor Author

@alamb alamb Dec 4, 2024

I think we could potentially increase the number of places arrays could be reused by changing this particular API to take batch (owned) and threading that more fully through the physical exprs.

Until there is some compelling use case to do so, however, I don't think we should go mess with the APIs again

.as_primitive::<Float64Type>()
// non-obviously, clone (which increments ref counts, it
// doesn't clone the data) to get an owned, typed array
// so we can drop the original exp_array (untyped) reference
Contributor

does it clone the reference?

Contributor Author

It clones the Float64Array

Underneath this clones several references (like arrow Buffers). I will try and make this clearer in the comments.

The code is not cloning the Arc<ArrayRef>
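
The clone-then-drop step can be modelled with std types only. In this sketch `TypedArray` and `ArrayHandle` are hypothetical stand-ins for a typed Arrow array and `ArrayRef`; the real Arrow types hold more than one buffer, so treat this as an analogy for the reference counting, not as the Arrow implementation.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a typed array: a cheap-to-clone handle around a
// reference-counted value buffer.
#[derive(Clone)]
struct TypedArray {
    values: Arc<Vec<f64>>,
}

// Hypothetical stand-in for `ArrayRef`: an outer Arc around the array.
type ArrayHandle = Arc<TypedArray>;

// Returns (buffer ref count right after the clone, whether the buffer is
// uniquely owned once the outer handle has been dropped).
fn clone_then_drop(outer: ArrayHandle) -> (usize, bool) {
    // Cloning the typed array copies no f64 values; it only increments the
    // buffer's reference count.
    let mut owned: TypedArray = (*outer).clone();
    let count_after_clone = Arc::strong_count(&owned.values);

    // Dropping the outer handle releases its reference to the buffer, but
    // only if nobody else still holds the outer handle.
    drop(outer);

    // `get_mut` succeeds only when `owned.values` is the last reference.
    let unique = Arc::get_mut(&mut owned.values).is_some();
    (count_after_clone, unique)
}
```

Right after the clone the buffer always has two references, which is why the original handle has to be dropped before any attempt at in-place mutation.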

Contributor

@comphead comphead left a comment

lgtm thanks @alamb
wondering whether we should create a small bench to see the performance gain?

I can take care of that if needed

Contributor Author

alamb commented Dec 5, 2024

> lgtm thanks @alamb wondering whether we should create a small bench to see the performance gain?
>
> I can take care of that if needed

Thank you very much for the review @comphead

If we wanted to see a performance gain, I think we would need to

  1. have a query where expression evaluation with a function is a significant portion of the query time
  2. rewrite the function in question to use this array reuse strategy

My motivation for changing this API now (without a realistic driving case) was to avoid releasing a version of ScalarFunctionArgs that passed a reference (rather than a Vec)

Then @findepi recommended ensuring that an owned API could actually be used to avoid allocating during execution

Contributor

comphead commented Dec 5, 2024

I'm keen to test it to get some numbers. Presumably reusing an already allocated region is cheaper, but it would be nice to have numbers for the same method implemented with both approaches so we can share this knowledge. I'll try to run it soon
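
A rough shape for such a comparison, using plain `Vec<f64>` instead of Arrow arrays. The functions `pow_alloc`, `pow_in_place` and `time_both` are hypothetical; a single `Instant` measurement like this is noisy, so a real benchmark would want a proper harness and repeated runs.

```rust
use std::time::{Duration, Instant};

// Approach 1: allocate a fresh output buffer.
fn pow_alloc(base: f64, exp: &[f64]) -> Vec<f64> {
    exp.iter().map(|e| base.powf(*e)).collect()
}

// Approach 2: overwrite the input buffer and return it.
fn pow_in_place(base: f64, mut exp: Vec<f64>) -> Vec<f64> {
    for e in exp.iter_mut() {
        *e = base.powf(*e);
    }
    exp
}

// Runs both approaches once on the same input and reports the two elapsed
// times plus whether the results agree.
fn time_both(n: usize) -> (Duration, Duration, bool) {
    let input: Vec<f64> = (0..n).map(|i| (i % 8) as f64).collect();
    // Copy made outside the timed region so the in-place run owns its input.
    let for_in_place = input.clone();

    let t = Instant::now();
    let a = pow_alloc(2.0, &input);
    let alloc_time = t.elapsed();

    let t = Instant::now();
    let b = pow_in_place(2.0, for_in_place);
    let in_place_time = t.elapsed();

    (alloc_time, in_place_time, a == b)
}
```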

Contributor Author

alamb commented Dec 5, 2024

> I'm keen to test it to get some numbers. Presumably reusing an already allocated region is cheaper, but it would be nice to have numbers for the same method implemented with both approaches so we can share this knowledge. I'll try to run it soon

Thank you 🙏

Comment on lines 202 to 203
/// Evaluate `base ^ exp` *without* allocating a new array, if possible
fn pow_in_place(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> {
Member

so it's "pow maybe in place", not "pow in place"
worth reflecting in the function name (eg with addition of "maybe")? otherwise i'd call this just pow

Contributor Author

Yes, I agree -- renamed to maybe_pow_in_place

// These kernels can only be used if there are no other references to
// the arrays (exp_array has to be the last remaining reference).
let owned_array = exp_array
// as before we downcast to Float64Array
Member

what does "before" refer to here? code above, or previous version of the code?

Contributor Author

The code above (aka the previous example) -- I have clarified

Comment on lines +218 to +219
// Once we have the owned Float64Array we can drop the original
// exp_array (untyped) reference
Member

This function is called with ArrayRef.
How did we ensure this was the only reference?

Contributor Author

I tried to clarify this in the comments -- the call to unary_mut fails if there are other outstanding references.
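
The key point is that uniqueness is checked at runtime, at the moment of mutation, not proven by the function signature. A std-only analogy with a hypothetical `try_double_in_place` is sketched below; `Arc::get_mut` plays the role of the uniqueness check, and the input is handed back unchanged on failure, similar in spirit to the `Err(_orig_array)` case in the example.

```rust
use std::sync::Arc;

// Try to double every value in place. Returns Err with the untouched buffer
// when any other reference to it exists.
fn try_double_in_place(mut buf: Arc<Vec<f64>>) -> Result<Arc<Vec<f64>>, Arc<Vec<f64>>> {
    // `get_mut` yields a mutable borrow only if `buf` is the sole reference.
    if let Some(values) = Arc::get_mut(&mut buf) {
        for v in values.iter_mut() {
            *v *= 2.0;
        }
        Ok(buf)
    } else {
        // Shared: mutation would be visible to the other holders, so refuse.
        Err(buf)
    }
}
```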

Comment on lines 232 to 240
Err(_orig_array) => {
// unary_mut will return the original array if there are other
// references into the underlying buffer (and thus reuse is
// impossible)
//
// In a real implementation, this case should fall back to
// calling `unary` and allocate a new array; in our example
// we will return an error for demonstration purposes
exec_err!("Could not reuse array for pow_in_place")
Member

What if my pow udf is called in a query like

select pow(10, f), f from ..

in such case, re-use shouldn't be possible, unless there was some (redundant) data copying before the pow call.

Contributor Author

yes, that is correct to the best of my understanding
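
The fallback that the code comment recommends for a real implementation could look roughly like this. It is a std-only sketch: `Buf` and this `maybe_pow_in_place` are hypothetical stand-ins working on `Arc<Vec<f64>>`, not the function from `advanced_udf.rs`. The returned flag is included only to make the chosen path observable.

```rust
use std::sync::Arc;

// Hypothetical stand-in for an Arrow array.
type Buf = Arc<Vec<f64>>;

// Computes base^exp element-wise. Reuses the input allocation when this is
// the only reference, otherwise allocates a new buffer.
// Returns the result and whether the input buffer was reused.
fn maybe_pow_in_place(base: f64, exp: Buf) -> (Buf, bool) {
    match Arc::try_unwrap(exp) {
        // Sole owner: overwrite the values where they are. Re-wrapping in a
        // new Arc moves the Vec handle; the f64 data is not copied.
        Ok(mut values) => {
            for v in values.iter_mut() {
                *v = base.powf(*v);
            }
            (Arc::new(values), true)
        }
        // Shared, as in `select pow(10, f), f`: leave the input untouched
        // and write into a fresh allocation instead of failing.
        Err(shared) => {
            let fresh: Vec<f64> = shared.iter().map(|v| base.powf(*v)).collect();
            (Arc::new(fresh), false)
        }
    }
}
```

With this shape a query that also projects the input column still works; it just does not get the allocation saving.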

// is the last remaining reference
drop(exp_array);

// at this point, exp_array is the only reference in this function. However,
Member

at this point, exp_array doesn't exist anymore.
and we still don't know whether we hold the only reference or a shared one.
we know it only from the result returned by compute::unary_mut

Contributor Author

Yes, sorry -- I think this was meant to say "owned_array" is the only reference. I have tried to clarify the wording. This is a good catch

Member

@findepi findepi left a comment

LGTM % code comments in datafusion-examples/examples/advanced_udf.rs

Comment on lines 274 to 290
// You can also invoke pow_in_place by passing a constant base and a
// column `a` as the exponent. If there is only a single
// reference to `a` the code works well
ctx.sql("SELECT pow(2, a) FROM t").await?.show().await?;

// However, if there are multiple references to `a` in the evaluation
// the array storage can not be reused
let err = ctx
.sql("SELECT pow(2, a), pow(3, a) FROM t")
.await?
.show()
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"Execution error: Could not reuse array for pow_in_place"
);
Member

These are good examples

Contributor

Went to test if this will work currently, found a bug in existing code :)

> SELECT pow(2, a), pow(3, a) FROM t;
Arrow error: Arithmetic overflow: Overflow happened on: 3 ^ 41

Contributor

... and the example sql does indeed work currently (as expected) so this would be a breaking change if it were to fail now.

Contributor Author

cool -- I will respond to these comments later today. Thank you

@alamb alamb merged commit 3ee9b3d into apache:main Dec 8, 2024
25 checks passed
Contributor Author

alamb commented Dec 8, 2024

Thanks everyone for your comments 🙏

@alamb alamb deleted the alamb/improve_scalar_udf_invoke branch December 8, 2024 13:28
zhuliquan pushed a commit to zhuliquan/datafusion that referenced this pull request Dec 11, 2024
… owned `ColumnReference`) (apache#13637)

* Improve documentation

* Pass owned args to ScalarFunctionArgs

* Update advanced_udf with example of reusing arrays

* clarify rationale for cloning

* clarify comments

* fix expected output
Labels
functions Changes to functions implementation logical-expr Logical plan and expressions physical-expr Changes to the physical-expr crates
Development

Successfully merging this pull request may close these issues.

Perf: Allow User defined functions to potentially reuse their argument arrays (to avoid new allocations)
4 participants