-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Specialized / Pre-compiled / Prepared ScalarUDFs #8051
Comments
@thinkharderdev suggests it would be useful to be able to serialize constant parameters into the user-defined scalar function themselves rather than pass them in as expressions. So for instance if you had to create a UDF to do something with a regex that you have as a static constant. Currently the way to do that is pass it as a literal expression. But then you have to compile the regex again for every batch you process through the UDF. Ideally you could have something like:
|
One way to achieve this might be a PhysicalOptimizerRule that replaces relevant instances of ScalarFunctionExpr. However this is likely somewhat awkward to write and is not clear that these expressions would serialize well (as serialization matches a name to an expr). It is probably possible to do this during derialization with the PhysicalExtensionCodec |
@2010YOUY01 noted that in order to use the same APIs for built in functions and ScalarUDFs we will need to have some way to handle:
This sounds very similar to this method Maybe we could add a |
I think the trick here will be to ensure we can still serialize such "precompiled" functions What I was thinking was maybe we can make a new /// A function that is "precompiled" in some way
/// for example, for a regular expression that has a constant argument
/// constant can be pre-compiled into a Regexpr match instance once per query
/// rather than once per batch
///
/// Somtimes precompiling make it hard/impossible to serialize the function again (e.g. the prepared regular expressions)
/// so this structure contains the original PhysicalExpr that can be used to serialize the function
struct PrecompiledExpr {
precompiled: Arc<dyn PhysicalExpr>,
original: Arc<dyn PhysicalExpr>
} |
That seems a bit heavy handed? Why not simply augment the scalar function expr node with an interior mutable "cache" cell, that is invisible to serialization? The cell can be automatically populated as needed by the function implementation upon node instantiation/deserialization. |
@sadboy that is a (really) good idea |
BTW I thought more about this and one challenge is that currently the scalar function implementations that are passed around may be shared -- so using interior mutability would get confusing as each function invocation would be from the same instance of the function |
Hmm, not sure I'm following? I was thinking of the
where |
BTW my hope it to prototype how this would work (as a ScalarUDF) by building on top of #8578 It would be pretty rad |
BTW now that @jayzhan211 and I have implemented Note sure if that would meet your requirements @thinkharderdev For example, to implement "precompiled regexp functions" we could do something like this (would be sweet if someone wanted to prototype this): /// A new UDF that has a precompiled pattern
impl PrecompiledRegexpReplace {
precompiled_match: Arc<Pattern>
}
impl ScalarUDFImpl for PrecompiledRegexpReplace {
// invoke function uses `self.precompiled_match` directly
...
}
// Update the existing RegexpReplace function to implement `simplify`
impl ScalarUDFImpl for RegexpReplace {
/// if the pattern argument is a scalar, rewrite the function to a new scalar UDF that
/// contains a pre-compiled regular-expression
fn simplify(&self) .. {
match (args[1], args[2]) {
(ScalarValue::Utf8(pattern), ScalarValue::Utf8(flags)) => {
let pattern = // create regexp match
SImplified::Rewritten(ScalarUdf::new(PrecompiledRegexpMatch { precompiled } )))
.call(args)
},
_ => Simplified::Original(args)
}
} We could then run some gnarly regular expression case, such as what is found on #8492 and see if it helps or not. If it doesn't help performance, then the extra complexity isn't worth it for regexp_replace |
Yeah, although I was mainly using regexes as an example originally. Our particular issues are with some UDFs we have implemented that require some hilarious hacks to pass state as expressions. But the idea does seem good to me. |
FYI there is another discussion about this here: #11146 |
After #9289, it would be good to recap what's tbd in this issue. |
It looks somebody needs to prototype what @alamb suggested in I believe there is small change, some args will be used to create pattern so they should be removed from passing them down to result expression match (args[1], args[2]) {
(ScalarValue::Utf8(pattern), ScalarValue::Utf8(flags)) => {
let pattern = // create regexp match, probably from args[1], and arg[2]
let new_args = vec![args[0]]
SImplified::Rewritten(ScalarUdf::new(PrecompiledRegexpMatch { precompiled } )))
.call(new_args)
}, The only downside I see is that the "state" is not going to be serialised if it has to be distributed in systems like ballista. It would make sense to have generic "serialise to physical plan" at some point, it would help with distributing something like python udfs but, thats probably different discussion. also note: #12270 |
That's because we don't use expressions. Alternatively, we can avoid all this complexity -- at the cost of different complexity, but conceptually simpler. |
Sorry but I'm not sure I understand why do you need thread local storage. What @alamb proposed will work with datafusion execution, no need for any other state handling, state is in the struct's property, which will be re-used for each and every batch this function participate, Or to put it in a different way, if there is no need to serialise your plan to be distributed, like with ballista, I believe proposed solution totally make sense. Do you have use-case where you need to distribute function expression to different executors, like ballista ? |
I am not using ballista currently. I realized the plan serialization concern is easy to address if we separate simplify into phases: the Expr-constraint simplify (e.g. pruning args known to be null, etc) that would be run during plan optimization phase. And then, local execution simplify which allows a function to "compile itself" into most optimal form, without any needs for serialization anymore. Ballista would need to serialize and distribute the plans in between these phases. BTW we focused so far on compiling regular expressions, but we didn't think about memory needs for their execution. Why am I mentioning this? I thought that maybe if we had "scratch space" / "local buffer" support, we wouldn't have need to "compile functions" during planning. |
Perhaps the
|
Copying a comment from #13290 Yes, I was thinking something like trait ScalarUDFImpl {
/// prepares to run the function, returning any state (such as
/// a precompiled regex). Called once per instance of function in the query
fn prepare(&self, args: &ScalarFunctionArgs) -> Option<Box<dyn Any>> { None }
/// `prepared` field in ScalarFunctonArgs has the result of calling `prepare`
fn invoke_with_args(&self, args: &ScalarFunctionArgs) -> ... pub struct ScalarFunctionArgs<'a> {
...
/// The result from a call to `prepare` for this function instance
prepared: Option<Box<dyn Any>>,
} |
Is your feature request related to a problem or challenge?
Currently, scalar UDF functions can not be "specialized"
What happens is that the regexp string
'[a-z].*'
gets passed as a literal expression to each invocation of the function, then you have to compile the regex again and again for every batch you process through the UDF.Since it is expensive to compile these RegExps, it would be nice if there was something that could compile the RegExp once per plan rather than once per batch
Describe the solution you'd like
No response
Describe alternatives you've considered
See comments below
Additional context
Suggested by @thinkharderdev on #8045 (comment)
The text was updated successfully, but these errors were encountered: