-
Notifications
You must be signed in to change notification settings - Fork 914
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add basic multi-partition GroupBy
support to cuDF-Polars
#17503
base: branch-25.02
Are you sure you want to change the base?
Add basic multi-partition GroupBy
support to cuDF-Polars
#17503
Conversation
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here. |
/ok to test |
# Check that we are grouping on element-wise | ||
# keys (is this already guaranteed?) | ||
for ne in ir.keys: | ||
if not isinstance(ne.value, Col): # pragma: no cover | ||
return _single_fallback(ir, children, partition_info) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by elementwise keys? It's certainly not the case that we always group on columns. But I think it is the case that the group keys (if expressions) are trivially elementwise (e.g. a + b
as a key is fine, but a.unique()
or a.sort()
is not)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. I'm being extra cautious by requiring the keys to be Col
. This comment is essentially asking: "can we drop this check altogether? ie. Will the keys always be element-wise?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe so, yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opened pola-rs/polars#20152 as well
agg_requests_pwise = [] # Partition-wise requests | ||
agg_requests_tree = [] # Tree-node requests | ||
|
||
for ne in ir.agg_requests: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to think about this (and possibly reorganise what we're doing in the single-partition case) to make this easier to handle.
For example, I think it is going to do the wrong thing for .agg(a.max() + b.min())
I think what you're trying to do here is turn a GroupBy(df, keys, aggs)
into Reduce(LocalGroupBy(df, keys, agg_exprs), keys, transformed_aggs)
And what does this look like, I think once we've determined the "leaf" aggregations we're performing (e.g. col.max()
) then we must concat and combine to get the full leaf aggregations, followed by evaluation of the column expressions that produce the final result.
So suppose we have determined what the leaf aggs are, and then what the post-aggregation expressions are, for a single-partition this is effectively Select(GroupBy(df, keys, leaf_aggs), keys, post_agg_exprs)
where post_agg_exprs
are all guaranteed elementwise (for now).
thought: Would it be easier for you here if the GroupBy
IR nodes really only held aggregation expressions that are "leaf" aggregations (with the post-processing done in a Select
)?
I think it would, because then the transform becomes something like:
Select(
GroupByCombine(GroupBy(df, keys, leaf_aggs), keys, post_aggs),
keys, post_agg_exprs
)
Where groupbycombine
emits the tree-reduction tasks with the post aggregations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thought: Would it be easier for you here if the GroupBy IR nodes really only held aggregation expressions that are "leaf" aggregations (with the post-processing done in a Select)?
I'm pretty sure the answer is "yes" :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quick follow-up: I totally agree that we probably want to revise the upstream GroupBy
design to make the decomposition here a bit simpler. With that said, I don't think we are doing anything "wrong" here. Rather, the code would just need to become unnecessarily messy if we wanted to do much more than "simple" mean/count/min/max aggregations.
For example, I think it is going to do the wrong thing for .agg(a.max() + b.min())
We won't do the "wrong" thing here - We will just raise an error. E.g.:
polars.exceptions.ComputeError: NotImplementedError: GroupBy does not support multiple partitions for this expression:
BinOp(<pylibcudf.types.DataType object at 0x7f06ebcc63b0>, <binary_operator.ADD: 0>, Cast(<pylibcudf.types.DataType object at 0x7f06ebcc63b0>, Agg(<pylibcudf.types.DataType object at 0x7f06ebcc6370>, 'max', False, Col(<pylibcudf.types.DataType object at 0x7f06ebcc6370>, 'x'))), Agg(<pylibcudf.types.DataType object at 0x7f06ebcc63b0>, 'max', False, Col(<pylibcudf.types.DataType object at 0x7f06ebcc63b0>, 'z')))
This PR is pretty-much "ready" - I don't think it makes sense to build more groupby logic directly on top of this. It would be much better to revise the underlying |
Description
Adds multi-partition support for simple
GroupBy
aggregations (following the same design as #17441)Checklist