-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Performance: enable array allocation reuse (ScalarFunctionArgs
gets owned ColumnReference
)
#13637
Conversation
/// This is the same way that functions built into DataFusion are invoked, | ||
/// which permits important special cases when one or both of the arguments | ||
/// are single values (constants). For example `pow(a, 2)` | ||
///1. When one or both of the arguments are single values (constants). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the advanced_udf example to show how to update the arrays in place
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI @findepi who asked about this I think
@@ -191,6 +199,48 @@ impl ScalarUDFImpl for PowUdf { | |||
} | |||
} | |||
|
|||
/// Evaluate `base ^ exp` *without* allocating a new array, if possible | |||
fn pow_in_place(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the example of how to write a function that reuses the input argument's allocation
pub args: &'a [ColumnarValue], | ||
// The number of rows in record batch being evaluated | ||
/// The evaluated arguments to the function | ||
pub args: Vec<ColumnarValue>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is an API change, but since this struct has not been released yet, it isn't a user facing change.
@@ -562,7 +562,7 @@ mod tests { | |||
fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) { | |||
let res = ToLocalTimeFunc::new() | |||
.invoke_with_args(ScalarFunctionArgs { | |||
args: &[ColumnarValue::Scalar(input)], | |||
args: vec![ColumnarValue::Scalar(input)], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the rest of this PR is changes for the new signature
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thats interesting, Clippy was suggesting to use arrays for static sized immutable collections instead of vectors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case I actually changed ScalarFunctionArgs
to get an owned Vec
Perhaps clippy suggested [ColumnarValue::Scalar(input)],
instead of &vec![ColumnarValue::Scalar(input)]
🤔
@@ -134,20 +134,20 @@ impl PhysicalExpr for ScalarFunctionExpr { | |||
} | |||
|
|||
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could potentially increase the number of places arrays could be reused by changing this particular API to take batch
(owned) and threading that more fully through the physical exprs.
Until there is some compelling usecase to do so, however, I don't think we should go mess with the APIs again
.as_primitive::<Float64Type>() | ||
// non-obviously, clone (which increments ref counts, it | ||
// doesn't clone the data) to get an typed own array | ||
// so we drop the original exp_array (untyped) reference |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it clone the reference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It clones the Float64Array
Underneath this clones several references (like arrow Buffer
s). I will try and make this clearer in the comments.
The code is not closing the Arc<ArrayRef>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm thanks @alamb
wondering should we create a small bench to see the performance gain?
I can take care on that if needed
Thank you very much for the review @comphead If wanted to see a perfomance gain, I think we need to
My motivation for changing this API now (without a realistic driving case) was to avoid releasing a version of ScalarFunctionArgs that passed a reference (rather than a Then @findepi recommeneded ensuring that an owned API could actually be used to avoid allocating during execution |
I'm keen to test it to get some numbers, apparently reuse already allocated zone is cheaper, but if we have some numbers for same method implemented with 2 approaches would be nice and we can share this knowledge. I'll try to run it soon |
Thank you 🙏 |
/// Evaluate `base ^ exp` *without* allocating a new array, if possible | ||
fn pow_in_place(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so it's "pow maybe in place", not "pow in place"
worth reflecting in the function name (eg with addition of "maybe")? otherwise i'd call this just pow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree -- renamed to maybe_pow_in_place
// These kernels can only be used if there are no other references to | ||
// the arrays (exp_array has to be the last remaining reference). | ||
let owned_array = exp_array | ||
// as before we downcast to Float64Array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does "before" refer to here? code above, or previous version of the code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code above (aka the previous example) -- I have clarified
// Once we have the owned Float64Array we can drop the original | ||
// exp_array (untyped) reference |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is called with ArrayRef
.
How did we ensure this was the only reference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to clarify this in the comments -- the call to unary_mut
fails if there are other outstanding references.
Err(_orig_array) => { | ||
// unary_mut will return the original array if there are other | ||
// references into the underling buffer (and thus reuse is | ||
// impossible) | ||
// | ||
// In a real implementation, this case should fall back to | ||
// calling `unary` and allocate a new array; In our example | ||
// we will return an error for demonstration purposes | ||
exec_err!("Could not reuse array for pow_in_place") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if my pow udf is called in a query like
select pow(10, f), f from ..
in such case, re-use shouldn't be possible, unless there was some (redundant) data copying before the pow call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that is correct to the best of my understanding
// is the last remaining reference | ||
drop(exp_array); | ||
|
||
// at this point, exp_array is the only reference in this function. However, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at this point, exp_array doesn't exist anymore.
and we still don't know whether we have only reference, or shared.
we know it only based on return result from compute::unary_mut
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, sorry -- I think this meant to say "owned_array" is the only reference. I have tried to clarify the wording. This is a good catch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % code comments in datafusion-examples/examples/advanced_udf.rs
// You can also invoke pow_in_place by passing a constant base and a | ||
// column `a` as the exponent . If there is only a single | ||
// reference to `a` the code works well | ||
ctx.sql("SELECT pow(2, a) FROM t").await?.show().await?; | ||
|
||
// However, if there are multiple references to `a` in the evaluation | ||
// the array storage can not be reused | ||
let err = ctx | ||
.sql("SELECT pow(2, a), pow(3, a) FROM t") | ||
.await? | ||
.show() | ||
.await | ||
.unwrap_err(); | ||
assert_eq!( | ||
err.to_string(), | ||
"Execution error: Could not reuse array for pow_in_place" | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This are good examples
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went to test if this will work currently, found a bug in existing code :)
> SELECT pow(2, a), pow(3, a) FROM t;
Arrow error: Arithmetic overflow: Overflow happened on: 3 ^ 41
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... and the example sql does indeed work currently (as expected) so this would be a breaking change if it were to fail now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool -- I will respond to these comments later today. Thank you
Thanks everyone for your comments 🙏 |
… owned `ColumnReference`) (apache#13637) * Improve documentation * Pass owned args to ScalarFunctionArgs * Update advanced_udf with example of reusing arrays * clarify rationale for cloning * clarify comments * fix expected output
… owned `ColumnReference`) (apache#13637) * Improve documentation * Pass owned args to ScalarFunctionArgs * Update advanced_udf with example of reusing arrays * clarify rationale for cloning * clarify comments * fix expected output
Which issue does this PR close?
ScalarUDFImpl::invoke_with_args
to support passing the return type created for the udf instance #13290Rationale for this change
Previously, because the functions get the input as
&[ColumnarValues]
, they don't own them (the caller retains a reference) which requires creating new arrays even when the input array is not used anywhere else.Now that we have changed the signature of
invoke
to get a struct, it is possible to potentially avoid allocating a new array for each output.I don't have time myself to apply this pattern to the built in DataFusion functions, but I wanted to make sure it was possible in case it comes up in the future or others want to really optimize expression evaluation
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
Yes, both by CI (that it compiles) as well as by the example (which is also run in CI)
I could pull out the example into its own specific test case too if people thought that was valuable