UDFs marked as volatile do not appear to evaluate multiple times for each output row. #8866

Closed
dadepo opened this issue Jan 14, 2024 · 19 comments · Fixed by #8878
Labels
bug Something isn't working

Comments

@dadepo

dadepo commented Jan 14, 2024

Describe the bug

It seems a UDF with no arguments is only called once, even if its signature is defined with Volatility::Volatile and it is queried in the context of a table with multiple rows.

By this I mean, for example: select random_udf() from many_rows_table

There is a minimal repro here https://github.com/dadepo/df-repro

When run, the output could be:

+-------+------+-----+-------+
| index | uint | int | float |
+-------+------+-----+-------+
| 1     | 2    | -2  | 1.0   |
| 2     | 3    | 3   | 3.3   |
| 3     |      |     |       |
+-------+------+-----+-------+
+---------------+
| random_normal |
+---------------+
| 25.0          |
| 25.0          |
| 25.0          |
+---------------+

In the run above, 25.0 is the result of the UDF, and it is repeated for every row.

To Reproduce

A minimal reproduction can be found here https://github.com/dadepo/df-repro

Expected behavior

The UDF should be evaluated once per output row.

Additional context

No response

@dadepo dadepo added the bug Something isn't working label Jan 14, 2024
@dadepo dadepo changed the title UDF marked as Volatile seem not to be evaluating multiple times for each output row UDFs marked as volatile do not appear to evaluate multiple times for each output row. Jan 15, 2024
@alamb
Contributor

alamb commented Jan 15, 2024

I agree this is a bug. Thank you for the report @dadepo

Maybe @guojidan you have time to look into this issue as you have been looking at UDFs recently 🙏

@guojidan
Contributor

I will handle this issue 😄

@alamb
Contributor

alamb commented Jan 15, 2024

cc @viirya as he fixed #8518

In fact, now that I write this I wonder if the issue @dadepo is hitting is actually the same as #8518 (which is not yet released, but should be in the next few days #8863 ) 🤔

@dadepo
Author

dadepo commented Jan 15, 2024

In fact, now that I write this I wonder if the issue @dadepo is hitting is actually the same as #8518 (which is not yet released, but should be in the next few days #8863 ) 🤔

@alamb Could be. My issue is observed in UDFs, while in the other ticket the issue is seen in native functions. It could very well be that the fix in the planned release also fixes this. I can confirm this in a bit.

@dadepo
Author

dadepo commented Jan 15, 2024

@alamb I updated the repo to depend on the main branch; see dadepo/df-repro@38acb9f

But when I ran it, I still got output indicating that the issue persists.

+-------+------+-----+-------+
| index | uint | int | float |
+-------+------+-----+-------+
| 1     | 2    | -2  | 1.0   |
| 2     | 3    | 3   | 3.3   |
| 3     |      |     |       |
+-------+------+-----+-------+
+---------------+
| random_normal |
+---------------+
| 38.0          |
| 38.0          |
| 38.0          |
+---------------+

@dadepo
Author

dadepo commented Jan 15, 2024

@alamb I updated it to include the native random function: dadepo/df-repro@00feaed

and running that I get

+---------------+--------------------+
| random_normal | native_random      |
+---------------+--------------------+
| 82.0          | 0.2617618435183655 |
| 82.0          | 0.3157177219693452 |
| 82.0          | 0.3350644908486391 |
+---------------+--------------------+

This indicates that it works fine with the native random function.

Could it also be the way UDFs work? From my understanding, when a UDF with no arguments is called, the arguments it receives are set to:

[
    NullArray(1),
]

i.e. one array of length 1, regardless of the number of rows in the table. Could this be a factor?

@viirya
Member

viirya commented Jan 15, 2024

It is not related to whether the function is volatile or not.

For a ScalarUDF that takes no arguments, or is called with only scalar inputs, DataFusion will assume its output is a scalar (you also return a one-element array from the random_normal UDF, but even if you didn't, DataFusion would take the element at index 0 to create a ScalarValue as output).

That's why random_normal returns the same value for all input rows in the batch: it is treated as a scalar UDF that returns a single scalar value for all input rows.
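To make the mechanism concrete, here is a std-only Rust model of the wrapping logic described above. Columnar and call_like_make_scalar_function are hypothetical stand-ins, not DataFusion's ColumnarValue or make_scalar_function; the sketch only assumes the behaviour stated in the comment: if no argument is an array, the inner function runs once and only element 0 of its result is kept as a scalar, which is then repeated for every row.

```rust
/// Hypothetical, simplified stand-in for DataFusion's ColumnarValue.
#[derive(Debug, Clone, PartialEq)]
pub enum Columnar {
    /// One value per row of the batch.
    Array(Vec<f64>),
    /// A single value that stands for every row of the batch.
    Scalar(f64),
}

/// Sketch of the wrapping behaviour described above (not DataFusion's code):
/// scalars are expanded to the length of an array argument (or 1 if there is
/// none), the inner function runs once, and if no argument was an array only
/// element 0 of the result is kept, as a scalar.
pub fn call_like_make_scalar_function<F>(inner: F, args: &[Columnar]) -> Columnar
where
    F: Fn(&[Vec<f64>]) -> Vec<f64>,
{
    // Length of an array argument, if there is one.
    let len = args.iter().find_map(|arg| match arg {
        Columnar::Array(values) => Some(values.len()),
        Columnar::Scalar(_) => None,
    });
    let is_scalar = len.is_none();
    let inferred_len = len.unwrap_or(1);

    // Expand every scalar argument into `inferred_len` copies.
    let arrays: Vec<Vec<f64>> = args
        .iter()
        .map(|arg| match arg {
            Columnar::Array(values) => values.clone(),
            Columnar::Scalar(v) => vec![*v; inferred_len],
        })
        .collect();

    let result = inner(arrays.as_slice());
    if is_scalar {
        // Only index 0 survives; this panics if `inner` returned no values.
        Columnar::Scalar(result[0])
    } else {
        Columnar::Array(result)
    }
}

/// How a result is laid out in an output column of `num_rows` rows:
/// a scalar is simply repeated.
pub fn broadcast(value: &Columnar, num_rows: usize) -> Vec<f64> {
    match value {
        Columnar::Array(values) => values.clone(),
        Columnar::Scalar(v) => vec![*v; num_rows],
    }
}
```

With zero arguments the inner function runs exactly once per batch, so every row sees the same value, which matches the repeated 25.0 in the report.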

@viirya
Member

viirya commented Jan 15, 2024

If you want random_normal to not output scalar for all input rows, simply add an input argument e.g., index column to it.
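A minimal sketch of this workaround, assuming the UDF body receives the index column as its only argument. random_per_row and Lcg are hypothetical names; the LCG (a simple linear congruential generator producing uniform values in [0, 1), not a normal distribution) stands in for a real RNG crate only to keep the example dependency-free. The point is that the argument's length, not its values, tells the function how many rows to produce.

```rust
/// Hypothetical tiny linear congruential generator, used instead of a real
/// RNG crate so the sketch needs only the standard library.
pub struct Lcg(u64);

impl Lcg {
    pub fn new(seed: u64) -> Self {
        Lcg(seed)
    }

    /// Returns a uniform value in [0, 1).
    pub fn next_unit(&mut self) -> f64 {
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        // Keep the top 53 bits so the value fits exactly in an f64 mantissa.
        (self.0 >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Hypothetical inner function for a UDF called as `random_normal(index)`.
/// The values of `index` are ignored; only its length matters, because it
/// tells the function how many rows the batch has.
pub fn random_per_row(index: &[i64], rng: &mut Lcg) -> Vec<f64> {
    index.iter().map(|_| rng.next_unit()).collect()
}
```

Because an array argument is present, a wrapper like the one described above would return the whole array instead of collapsing it to element 0.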

@dadepo
Author

dadepo commented Jan 15, 2024

If you want random_normal to not output scalar for all input rows, simply add an input argument e.g., index column to it.

But this means passing an argument to a UDF not because the function needs it, but because you want the UDF to be evaluated per output row in the query? This sounds like a limitation.

The native function did not require passing such an argument to be evaluated per output row, so I was expecting there would be a way to define a UDF with this behaviour.

@viirya
Member

viirya commented Jan 15, 2024

Well, if you don't use make_scalar_function (the behavior I described above is defined there) to create the scalar function, you can define a ScalarUDF that does what you want. For example, you can implement the UDF by implementing ScalarUDFImpl.

@viirya
Member

viirya commented Jan 15, 2024

Or, if you want to stick with a function definition instead of ScalarUDFImpl, you can define your UDF directly as a ScalarFunctionImplementation instead of using make_scalar_function.

@alamb
Contributor

alamb commented Jan 16, 2024

Maybe we need to update the code that invokes a scalar UDF so that, if the UDF is marked as volatile, its arguments are expanded into arrays before it is invoked 🤔 (or so that it knows enough to do that expansion itself).
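One way to read this proposal, sketched as a std-only Rust model. Columnar, Volatility and prepare_args are hypothetical stand-ins (DataFusion does define Immutable, Stable and Volatile volatility levels, but this is not its code). For volatile functions, scalar arguments are expanded to one entry per row, and a zero-argument call receives an all-NULL placeholder array whose length is the row count.

```rust
/// Hypothetical stand-in for ColumnarValue; `None` plays the role of NULL.
#[derive(Debug, Clone, PartialEq)]
pub enum Columnar {
    Array(Vec<Option<f64>>),
    Scalar(Option<f64>),
}

/// Mirrors the three volatility levels a function can declare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Volatility {
    Immutable,
    Stable,
    Volatile,
}

/// Hypothetical pre-invocation step: for volatile functions, turn every
/// scalar argument into an array with one entry per row, and give a
/// zero-argument call a NULL placeholder array of length `num_rows`.
pub fn prepare_args(args: Vec<Columnar>, volatility: Volatility, num_rows: usize) -> Vec<Columnar> {
    if volatility != Volatility::Volatile {
        // Non-volatile functions keep the existing scalar fast path.
        return args;
    }
    if args.is_empty() {
        // The placeholder's length is the only information it carries.
        return vec![Columnar::Array(vec![None; num_rows])];
    }
    args.into_iter()
        .map(|arg| match arg {
            Columnar::Scalar(v) => Columnar::Array(vec![v; num_rows]),
            array => array,
        })
        .collect()
}
```

The trade-off, raised in the replies, is that volatile functions would then always pay for materializing full arrays, and volatility would carry an extra meaning beyond "may return different results per call".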

@viirya
Member

viirya commented Jan 16, 2024

I feel it is not reasonable to tie the nature of a volatile scalar function to a special way of expanding its arguments or to how DataFusion treats its output. The definition of a volatile scalar function doesn't include such a specification, so by doing this we might create behavior that is hard to understand outside DataFusion.

The special treatment of arguments/output is specific to make_scalar_function, which is useful for special cases of built-in scalar functions to save implementation time. However, this assumption doesn't apply to all use cases, and that's the problem: users can easily misuse it.

The good news is that we now have ScalarUDFImpl and are moving to deprecate the previous function-based definition of scalar UDFs. I think for this issue we just need to deprecate make_scalar_function and make its documentation clearer, as I proposed in #8878.

@alamb
Contributor

alamb commented Jan 16, 2024

I think the key thing a UDF writer needs to have is access to the number of output rows to produce. As long as they have this information, we can leave it to them to implement the volatile semantics internally.

@dadepo
Author

dadepo commented Jan 28, 2024

I think the key thing a UDF writer needs to have is access to the number of output rows to produce.

@alamb Any pointers on how to get this information? I have switched to an implementation that does not use make_scalar_function and instead implements the UDF directly as a ScalarFunctionImplementation, but I now run into a different situation, where:

  • the UDF returns only one row when used with no arguments (even if the table it is queried against has many rows)
  • if the UDF is used in the same query both with and without a column argument, the query fails with Error: ArrowError(InvalidArgumentError("all columns in a record batch must have the same length"), None)
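The second failure is consistent with a row-count mismatch: presumably the zero-argument call yields a 1-row array while the call with a column argument yields one value per row, and a record batch cannot hold columns of different lengths. The hypothetical check below only models that rule; it is not arrow-rs's implementation, and the column names are illustrative.

```rust
/// Hypothetical model of the record-batch invariant behind the error above:
/// every column must have the same number of rows. Returns that row count.
pub fn check_batch_columns(columns: &[(&str, usize)]) -> Result<usize, String> {
    let mut expected: Option<usize> = None;
    for (name, len) in columns {
        if let Some(rows) = expected {
            if rows != *len {
                return Err(format!(
                    "all columns in a record batch must have the same length ({name}: {len} rows, expected {rows})"
                ));
            }
        } else {
            // The first column fixes the expected row count.
            expected = Some(*len);
        }
    }
    Ok(expected.unwrap_or(0))
}
```

So a 1-row output from random_normal() next to a 3-row output from random_normal(index) cannot be assembled into one batch.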

@dadepo
Author

dadepo commented Jan 28, 2024

I switched the implementation to ScalarUDFImpl, and its documentation says:

    /// # Zero Argument Functions
    /// If the function has zero parameters (e.g. `now()`) it will be passed a
    /// single element slice which is a a null array to indicate the batch's row
    /// count (so the function can know the resulting array size).

See https://github.com/apache/arrow-datafusion/blob/51b898288830c224b825523f9be1d54974f15d2f/datafusion/expr/src/udf.rs#L257

When I dbg! the args passed to a zero-parameter UDF I am implementing, I get:

&args = [
    Scalar(
        NULL,
    ),
]

This suggests to me that a Scalar value is being passed, not an Array (I was expecting a null array, as the documentation suggests), and hence the length cannot be deduced from it.

@alamb
Contributor

alamb commented Jan 28, 2024

🤔 -- here is how the built-in random function does this: https://github.com/apache/arrow-datafusion/blob/51b898288830c224b825523f9be1d54974f15d2f/datafusion/physical-expr/src/math_expressions.rs#L375-L383

And it seems to work as expected:

select x, random() from foo;
+---+---------------------+
| x | random()            |
+---+---------------------+
| 1 | 0.8579996222450448  |
| 2 | 0.11611126693245999 |
+---+---------------------+
2 rows in set. Query took 0.003 seconds.

I wonder if you can follow what random is doing / getting? If scalar UDFs aren't working the same way for some reason, we should fix them so they do.

I checked and I don't see any obvious special cases for Random 🤔

https://github.com/search?q=repo%3Aapache%2Farrow-datafusion+BuiltinScalarFunction%3A%3ARandom&type=code
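Based on the linked code, the built-in random appears to size its output from the length of the array it receives as its single argument. The std-only sketch below models that pattern; random_like and its inline LCG are hypothetical, and Columnar stands in for ColumnarValue. Its error branch corresponds to the situation dadepo reports, where a Scalar(NULL) arrives instead of an array and no row count can be read.

```rust
/// Hypothetical stand-in for ColumnarValue; `None` plays the role of NULL.
#[derive(Debug, Clone, PartialEq)]
pub enum Columnar {
    Array(Vec<Option<f64>>),
    Scalar(Option<f64>),
}

/// Hypothetical random-style function: one fresh value per row, where the
/// row count is the length of the placeholder array argument.
pub fn random_like(args: &[Columnar], state: &mut u64) -> Result<Columnar, String> {
    let len = match args.first() {
        Some(Columnar::Array(placeholder)) => placeholder.len(),
        // A Scalar carries no row count, so the output cannot be sized.
        _ => return Err("expected an array argument carrying the row count".to_string()),
    };
    let values: Vec<Option<f64>> = (0..len)
        .map(|_| {
            // Simple LCG step, used only to keep the sketch dependency-free.
            *state = state
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            Some((*state >> 11) as f64 / (1u64 << 53) as f64)
        })
        .collect();
    Ok(Columnar::Array(values))
}
```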

@dadepo
Author

dadepo commented Jan 28, 2024

So I updated the reproduction repo to implement ScalarUDFImpl, modeling the implementation on the built-in random function.

See the diff here dadepo/df-repro@d97cf9c

When I run this, it fails:

    Running `target/debug/df-repro`
+-------+------+-----+-------+
| index | uint | int | float |
+-------+------+-----+-------+
| 1     | 2    | -2  | 1.0   |
| 2     | 3    | 3   | 3.3   |
| 3     |      |     |       |
+-------+------+-----+-------+
[src/main.rs:70] &args[0] = Scalar(
    NULL,
)
thread 'main' panicked at src/main.rs:74:25:
Opsies
stack backtrace:
   0: rust_begin_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
...

And it does look like a ColumnarValue::Scalar(..) is being passed in instead of a ColumnarValue::Array(..).

@viirya
Member

viirya commented Jan 28, 2024

@alamb @dadepo I can reproduce the reported issue. Because it is a separate issue, I created #9032 for it. I also created a PR #9031 to fix it.
