Batching support for row-based bounded window functions #9973
Conversation
`GpuBatchedBoundedWindowExec` is currently identical to `GpuWindowExec`, in that it does no batching yet. After rerouting, the tests all seem to still pass.
Compiling. Yet to test. Signed-off-by: MithunR <[email protected]>
Also built safety guards to disable optimization for very large window extents.
Plus, some minor reformatting.
Still WIP. Need to sort out the …
```scala
override def next(): ColumnarBatch = {
  var outputBatch: ColumnarBatch = null
  while (outputBatch == null && hasNext) {
    withResource(getNextInputBatch) { inputCbSpillable =>
```
We need to have `withRetry` put in here somewhere. The hard part is making sure that we can roll back any of the caching. We can calculate/get the `inputRowCount`, `noMoreInput`, and `numUnprocessedInCache` without needing to get the input batch from `inputCbSpillable`, so that might make it simpler to add in the retry logic. I am fine if this is a follow-on issue, but we need it fixed at some point.
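A structural sketch of this suggestion (the toy `withRetry` below merely stands in for the plugin's real GPU-OOM retry helpers; all names are hypothetical):

```scala
// Structural sketch only (hypothetical names): derive the control state from
// cheap host-side metadata first, and keep the device work inside the retry
// block so a failed attempt has nothing to roll back.
object RetryPlacementSketch {
  // Toy stand-in for the plugin's retry helper: retry `fn` a few times.
  def withRetry[T](attempts: Int)(fn: => T): T =
    try fn catch {
      case _: RuntimeException if attempts > 1 => withRetry(attempts - 1)(fn)
    }

  def main(args: Array[String]): Unit = {
    // 1. Computed WITHOUT materializing the device batch:
    val inputRowCount = 1024L          // e.g. from batch metadata
    val noMoreInput = false
    val numUnprocessedInCache = 100L
    // 2. Device work (and any cache mutation) stays inside the retry block,
    //    materializing the spillable batch as late as possible:
    val result = withRetry(attempts = 3) {
      // val cb = inputCbSpillable.getColumnarBatch()  // late materialization
      inputRowCount + numUnprocessedInCache            // placeholder for GPU work
    }
    println(s"noMoreInput=$noMoreInput, result=$result")
  }
}
```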
I'll have to address this in a follow-up. I'm still trying to sort out the missing rows problem.
#10046 will address the `withRetry` part of the problem.
This now allows for `LEAD()`, `LAG()`, and regular window functions with negative values for `preceding`/`following` window bounds.
This commit fixes the batching. The new exec should not have to receive batched input.
Looks good to me
1. Renamed config '.extent' to '.max'. 2. Fixed documentation for said config. 3. Removed TODOs that were already handled.
I'll rebase and retest this shortly, after regenerating docs.
There seems to be a bug in the handling for …
The failing test is a weird one. I've boiled it down to the following:

```python
# (Assumes the usual spark-rapids integration-test imports: asserts, data_gen,
#  marks, and pyspark.sql's Window and functions-as-f.)
@ignore_order(local=True)
@approximate_float
@pytest.mark.parametrize('batch_size', ['1g'], ids=idfn)
@pytest.mark.parametrize('a_b_gen', [long_gen], ids=meta_idfn('partAndOrderBy:'))
@pytest.mark.parametrize('c_gen', [StructGen(children=[['child_int', IntegerGen()]])], ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_myth_repro(a_b_gen, c_gen, batch_size):
    conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
    base_window_spec = Window.partitionBy('a').orderBy('b', 'c')
    def do_it(spark):
        # Repro FAIL!
        df = spark.read.parquet("/tmp/repro_input_cpu_parquet")
        df = df.withColumn('row_num', f.row_number().over(base_window_spec))
        df = df.withColumn('lead_def_c', f.lead('c', 2, None).over(base_window_spec))
        return df
        # Repro: WORKING!
        # return df.selectExpr(
        #     "ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, c) row_num",
        #     "LEAD(C, 2, NULL) OVER (PARTITION BY a ORDER BY b, c) lead_def_c"
        # )
    assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf)
```

Calling … This is very odd. Instrumentation in the … This does not happen from SQL, for the same query:

```sql
SELECT ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, c) row_num,
       LEAD(c, 2, NULL) OVER (PARTITION BY a ORDER BY b, c) lead_def_c
FROM my_repro_table
```

Nor does this repro from the command line. Nor with …
This is really odd, because `ROW_NUMBER` is supposed to be a running window agg, so they should not be in the same window operation at all. Unless it is something to do with `LEAD` by itself being a problem.
You're right about that. And the plan does indicate that these operations are addressed in separate execs:

```
== Physical Plan ==
GpuColumnarToRow false
+- GpuBatchedBoundedWindow [a#212L, b#213L, c#214, gpulead(c#214, 2, null) gpuwindowspecdefinition(a#212L, b#213L ASC NULLS FIRST, c#214 ASC NULLS FIRST, gpuspecifiedwindowframe(RowFrame, 2, 2)) AS lead_def_c#225, row_num#219], [a#212L], [b#213L ASC NULLS FIRST, c#214 ASC NULLS FIRST]
   +- GpuRunningWindow [a#212L, b#213L, c#214, gpurownumber$() gpuwindowspecdefinition(a#212L, b#213L ASC NULLS FIRST, c#214 ASC NULLS FIRST, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(currentrow$()))) AS row_num#219], [a#212L], [b#213L ASC NULLS FIRST, c#214 ASC NULLS FIRST]
...
```

The progress has been slow, but I was wrong about the following:

> Nor does this repro from the command line.

I have a repro from the shell, not just from …. The operations work fine individually. 😕
@revans2 has cracked it: Looks like reordering the execs causes the output columns to be reordered as well. I'm testing out the fix. I should have an update to this PR shortly.
My only request is that we have a follow-on issue to add retry to this.
I'll raise one and start on it shortly. Edit: I have filed #10046 for the follow-on.
Thank you for the review and guidance, @revans2. This has been merged.
Fixes #1860.
This commit adds support for batched processing of window aggregations where the window-extents are row-based and (finitely) bounded.
Example query:
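An illustrative query of the kind this change targets (hypothetical table and column names; the frame is finite and row-based in both directions):

```sql
SELECT a, b,
       COUNT(1) OVER (PARTITION BY a ORDER BY b
                      ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING) AS cnt
FROM my_table
```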
The algorithm is described at length in #1860. In brief, `GpuBatchedBoundedWindowExec` is used to batch the input into chunks that fit into GPU memory.

Note that window bounds might be specified with negative offsets; these are also supported. As a consequence, `LEAD()` and `LAG()` are supported as well.
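For intuition, here is a toy, CPU-only sketch of the batching idea (simplifying assumptions: a single partition and a windowed sum; hypothetical structure, not the plugin's actual code):

```scala
// Toy sketch of batched, row-bounded window processing (NOT the plugin's code).
// Assumptions: one partition, frame = ROWS BETWEEN p PRECEDING AND f FOLLOWING.
object BoundedWindowBatchingSketch {
  def main(args: Array[String]): Unit = {
    val p = 1; val f = 2
    val batches = Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7, 8, 9))

    var cache = Seq.empty[Int]   // tail rows carried over from prior input
    var emitted = 0              // rows of `cache` already emitted (kept only as `p` context)
    val out = scala.collection.mutable.Buffer[Int]()

    // Windowed sum over one in-memory chunk.
    def windowSums(rows: Seq[Int]): Seq[Int] =
      rows.indices.map(i => rows.slice(math.max(0, i - p), math.min(rows.size, i + f + 1)).sum)

    for ((batch, idx) <- batches.zipWithIndex) {
      val combined = cache ++ batch
      val isLast = idx == batches.size - 1
      // The last `f` rows have incomplete frames until more input arrives.
      val completeUpTo = if (isLast) combined.size else math.max(emitted, combined.size - f)
      out ++= windowSums(combined).slice(emitted, completeUpTo)
      // Carry forward the unemitted tail, plus `p` rows of preceding context.
      val keepFrom = math.max(0, completeUpTo - p)
      cache = combined.drop(keepFrom)
      emitted = completeUpTo - keepFrom
    }
    println(out.mkString(", "))  // identical to computing over all rows at once
  }
}
```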
This implementation falls back to unbatched processing (via `GpuWindowExec`) if a window's preceding/following bounds exceed a configurable maximum (defaulting to 100 rows in either direction). This may be reconfigured via:
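`spark.rapids.sql.window.batched.bounded.row.max` (this key name is inferred from the `.extent` → `.max` rename discussed earlier in the thread; the plugin's generated config docs are authoritative).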