Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specialized / Pre-compiled / Prepared ScalarUDFs #8051

Open
alamb opened this issue Nov 4, 2023 · 19 comments
Open

Specialized / Pre-compiled / Prepared ScalarUDFs #8051

alamb opened this issue Nov 4, 2023 · 19 comments
Labels
enhancement New feature or request

Comments

@alamb
Copy link
Contributor

alamb commented Nov 4, 2023

Is your feature request related to a problem or challenge?

Currently, scalar UDF functions can not be "specialized"

SELECT * FROM t where my_matcher(t.column, '[a-z].*');

What happens is that the regexp string '[a-z].*' gets passed as a literal expression to each invocation of the function, then you have to compile the regex again and again for every batch you process through the UDF.

Since it is expensive to compile these RegExps, it would be nice if there was something that could compile the RegExp once per plan rather than once per batch

Describe the solution you'd like

No response

Describe alternatives you've considered

See comments below

Additional context

Suggested by @thinkharderdev on #8045 (comment)

@alamb alamb added the enhancement New feature or request label Nov 4, 2023
@alamb
Copy link
Contributor Author

alamb commented Nov 4, 2023

@thinkharderdev suggests

it would be useful to be able to serialize constant parameters into the user-defined scalar function themselves rather than pass them in as expressions. So for instance if you had to create a UDF to do something with a regex that you have as a static constant. Currently the way to do that is pass it as a literal expression. But then you have to compile the regex again for every batch you process through the UDF. Ideally you could have something like:

struct MyRegexUdf {
  regex: Regex
}

impl ScalarFunction for MyRegexUdf {
  // use regex on each value somehow
}

The regex would only need to be compiled once during deserialization (or construction) instead of once for each batch.

@alamb alamb changed the title Pre-compiled / Prepared ScalarUDFs Specialized / Pre-compiled / Prepared ScalarUDFs Nov 4, 2023
@alamb
Copy link
Contributor Author

alamb commented Nov 4, 2023

One way to achieve this might be a PhysicalOptimizerRule that replaces relevant instances of ScalarFunctionExpr.

However this is likely somewhat awkward to write and is not clear that these expressions would serialize well (as serialization matches a name to an expr). It is probably possible to do this during derialization with the PhysicalExtensionCodec

@alamb
Copy link
Contributor Author

alamb commented Nov 7, 2023

@2010YOUY01 noted that in order to use the same APIs for built in functions and ScalarUDFs we will need to have some way to handle:

Figure out the interface to let core pass information to function definitions (e.g. now() requires to be passed query start time from core)

This sounds very similar to this method

Maybe we could add a prepare(ctx: &TaskContext) type method on ScalarUDF 🤔 that can potentially return a new version of the function to invoke

@alamb
Copy link
Contributor Author

alamb commented Dec 14, 2023

I think the trick here will be to ensure we can still serialize such "precompiled" functions

What I was thinking was maybe we can make a new PhysicalExpr that is like

/// A function that is "precompiled" in some way 
/// for example, for a regular expression that has a constant argument
/// constant can be pre-compiled into a Regexpr match instance once per query
/// rather than once per batch
///
/// Somtimes precompiling make it hard/impossible to serialize the function again (e.g. the prepared regular expressions)
/// so this structure contains the original PhysicalExpr that can be used to serialize the function
struct PrecompiledExpr {
  precompiled: Arc<dyn PhysicalExpr>,
  original: Arc<dyn PhysicalExpr>
}

@sadboy
Copy link
Contributor

sadboy commented Dec 15, 2023

That seems a bit heavy handed? Why not simply augment the scalar function expr node with an interior mutable "cache" cell, that is invisible to serialization? The cell can be automatically populated as needed by the function implementation upon node instantiation/deserialization.

@alamb
Copy link
Contributor Author

alamb commented Dec 15, 2023

That seems a bit heavy handed? Why not simply augment the scalar function expr node with an interior mutable "cache" cell, that is invisible to serialization? The cell can be automatically populated as needed by the function implementation upon node instantiation/deserialization.

@sadboy that is a (really) good idea

@alamb
Copy link
Contributor Author

alamb commented Dec 18, 2023

That seems a bit heavy handed? Why not simply augment the scalar function expr node with an interior mutable "cache" cell, that is invisible to serialization? The cell can be automatically populated as needed by the function implementation upon node instantiation/deserialization.

@sadboy that is a (really) good idea

BTW I thought more about this and one challenge is that currently the scalar function implementations that are passed around may be shared -- so using interior mutability would get confusing as each function invocation would be from the same instance of the function

@sadboy
Copy link
Contributor

sadboy commented Dec 20, 2023

the scalar function implementations that are passed around may be shared -- so using interior mutability would get confusing as each function invocation would be from the same instance of the function

Hmm, not sure I'm following? I was thinking of the ScalarFunctionExpr physical node (https://docs.rs/datafusion-physical-expr/34.0.0/src/datafusion_physical_expr/scalar_function.rs.html#51), e.g. adding something like

pub struct ScalarFunctionExpr {
[...]
    args: Vec<Arc<dyn PhysicalExpr>>,
    prepared_args: OnceCell<Vec<Box<dyn PreparedArgs>>>,
[...]
}

where prepared_args is populated by the return value of calling ScalarFunction::prepare on the set of physical args. The function implementation objects themselves do not need to mutate -- they only need to provide an optional implementation of ScalarFunction::prepare method if they wish to pre-process the arguments. prepared_args is purely an optimization construct, it should have no effect on the semantics of ScalarFunctionExpr -- the node should compute the exact same result whether or not prepared_args is populated. This way, any part of the system (e.g. Serialize, Clone, etc.) is always free to simply drop it without affecting correctness.

@alamb
Copy link
Contributor Author

alamb commented Dec 20, 2023

BTW my hope it to prototype how this would work (as a ScalarUDF) by building on top of #8578

It would be pretty rad

@alamb
Copy link
Contributor Author

alamb commented Mar 5, 2024

BTW now that @jayzhan211 and I have implemented ScalarUDF::simplify in #9298 and we have ported the regular_expression functions to use ScalarUDF, I think we could actually use that API to implement precompiled functions

Note sure if that would meet your requirements @thinkharderdev

For example, to implement "precompiled regexp functions" we could do something like this (would be sweet if someone wanted to prototype this):

/// A new UDF that has a precompiled pattern
impl PrecompiledRegexpReplace {
  precompiled_match: Arc<Pattern>
}

impl ScalarUDFImpl for PrecompiledRegexpReplace  {
   // invoke function uses `self.precompiled_match` directly
...
}


// Update the existing RegexpReplace function to implement `simplify`
impl ScalarUDFImpl for RegexpReplace  {

  /// if the pattern argument is a scalar, rewrite the function to a new scalar UDF that
  /// contains a pre-compiled regular-expression
  fn simplify(&self) .. { 
    match (args[1], args[2]) {
       (ScalarValue::Utf8(pattern), ScalarValue::Utf8(flags)) => {
         let pattern = // create regexp match
         SImplified::Rewritten(ScalarUdf::new(PrecompiledRegexpMatch { precompiled } )))
          .call(args)
       }, 
      _ => Simplified::Original(args)
  }
}

We could then run some gnarly regular expression case, such as what is found on #8492 and see if it helps or not.

If it doesn't help performance, then the extra complexity isn't worth it for regexp_replace

@thinkharderdev
Copy link
Contributor

Note sure if that would meet your requirements @thinkharderdev

Yeah, although I was mainly using regexes as an example originally. Our particular issues are with some UDFs we have implemented that require some hilarious hacks to pass state as expressions.

But the idea does seem good to me.

@alamb
Copy link
Contributor Author

alamb commented Jun 27, 2024

FYI there is another discussion about this here: #11146

@findepi
Copy link
Member

findepi commented Sep 12, 2024

After #9289, it would be good to recap what's tbd in this issue.

@milenkovicm
Copy link
Contributor

It looks somebody needs to prototype what @alamb suggested in

#8051 (comment)

I believe there is small change, some args will be used to create pattern so they should be removed from passing them down to result expression

match (args[1], args[2]) {
     (ScalarValue::Utf8(pattern), ScalarValue::Utf8(flags)) => {
       let pattern = // create regexp match, probably from  args[1], and arg[2]
       let new_args = vec![args[0]]
       SImplified::Rewritten(ScalarUdf::new(PrecompiledRegexpMatch { precompiled } )))
        .call(new_args)
},

The only downside I see is that the "state" is not going to be serialised if it has to be distributed in systems like ballista. It would make sense to have generic "serialise to physical plan" at some point, it would help with distributing something like python udfs but, thats probably different discussion.

also note: #12270

@findepi
Copy link
Member

findepi commented Sep 13, 2024

The only downside I see is that the "state" is not going to be serialised if it has to be distributed in systems like ballista.

That's because we don't use expressions.
What about using simplify all the way down? The PrecompiledRegexpMatch could get dumped as a bytes buffer (varbinary) into an expression and then cast back to PrecompiledRegexpMatch. This will work as long as it's a flat structure. It won't work when it's something that has pointers internally and requires actual serialization.

Alternatively, we can avoid all this complexity -- at the cost of different complexity, but conceptually simpler.
Let's imagine ScalarUDF invoke gets an option to create a thread local scratch space that it can reuse on all invocations. That would make reusing compiled pattern easier without having to serialize it in the plan.
The downside would be that the implementation would need to explicitly check whether the pattern is the same on every invocation (equality check once per batch).

@milenkovicm
Copy link
Contributor

Sorry but I'm not sure I understand why do you need thread local storage.

What @alamb proposed will work with datafusion execution, no need for any other state handling, state is in the struct's property, which will be re-used for each and every batch this function participate, PrecompiledRegexpMatch will be "specialised" for a given logical plan.

Or to put it in a different way, if there is no need to serialise your plan to be distributed, like with ballista, I believe proposed solution totally make sense.

Do you have use-case where you need to distribute function expression to different executors, like ballista ?

@findepi
Copy link
Member

findepi commented Sep 13, 2024

I am not using ballista currently. I realized the plan serialization concern is easy to address if we separate simplify into phases: the Expr-constraint simplify (e.g. pruning args known to be null, etc) that would be run during plan optimization phase. And then, local execution simplify which allows a function to "compile itself" into most optimal form, without any needs for serialization anymore. Ballista would need to serialize and distribute the plans in between these phases.

BTW we focused so far on compiling regular expressions, but we didn't think about memory needs for their execution.
Internally regex::Regex::is_match uses a synchronized pool of "caches" (regex execution scratch space) underneath. I don't know if this is a perf problem (probably not!), but let me use this as an example. It would probably be good if at runtime a scalar could have its own thread local "scratch space" / "local buffer". And without having to use thread locals which aren't great if DF is embedded and doesn't control thread creation.

Why am I mentioning this? I thought that maybe if we had "scratch space" / "local buffer" support, we wouldn't have need to "compile functions" during planning.

@alamb
Copy link
Contributor Author

alamb commented Nov 21, 2024

Perhaps the ScalarFunctionArgs added in

@alamb
Copy link
Contributor Author

alamb commented Nov 21, 2024

Copying a comment from #13290

Yes, I was thinking something like

trait ScalarUDFImpl { 
  /// prepares to run the function, returning any state (such as 
  /// a precompiled regex). Called once per instance of function in the query
  fn prepare(&self, args: &ScalarFunctionArgs) -> Option<Box<dyn Any>> { None }

  /// `prepared` field in ScalarFunctonArgs has the result of calling `prepare`
  fn invoke_with_args(&self, args: &ScalarFunctionArgs) -> ...
pub struct ScalarFunctionArgs<'a> {
  ...
  /// The result from a call to `prepare` for this function instance
  prepared: Option<Box<dyn Any>>,
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

5 participants