-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Impl convert_to_state
for GroupsAccumulatorAdapter
(faster median for high cardinality aggregates)
#11827
Impl convert_to_state
for GroupsAccumulatorAdapter
(faster median for high cardinality aggregates)
#11827
Conversation
@@ -342,6 +374,50 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter { | |||
fn size(&self) -> usize { | |||
self.allocation_bytes | |||
} | |||
|
|||
fn convert_to_state( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @Rachelint
This might be interesting to you: #11825
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @Rachelint
This might be interesting to you: #11825
Seems interesting, learning this pr (still not so familiar with arrow ops).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it takes some getting used to thinking in terms of Array
s and masks, etc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for delay, fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found the quick way using filtered_null_mask
+ set_nulls
will just set the filtered row to be null, but not change the row num.
// `filtered_null_mask` + `set_nulls`
left: PrimitiveArray<Int32>
[
null,
null,
6,
null,
null,
null,
]
// `compute::filter`
right: PrimitiveArray<Int32>
[
null,
6,
]
Maybe this will make difference to the correctness for some accumulators? For example, an udf count which thinks a null row as 1?
f29d0bb
to
76f5aba
Compare
76f5aba
to
be5316f
Compare
be5316f
to
0396fc4
Compare
fc2a0b5
to
179f8f8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @Rachelint -- this looks very cool . I am sorry for the delay in the review
It is my understanding that this will allow aggregates that do not yet implement GroupsAccumulator to benefit from the intermediate aggregate state.
Thus the primary benefit of this code is to make aggregates on such queries faster.
Unfortunately I don't think we have any examples of such aggregates in the benchmarks (e.g. calculating median or approx_median). I will make a PR to add some to see if we can measure improvement of this PR
cc @korowa
Sounds great! And we can continue to improve the performance after having such benchmarks. |
I created a proposal in #12438 I tested a little locally on the PR here and it seems like this PR does not improve the performance much. |
Ok, I will check it in my local soon. |
It may be that the query doesn't show the correct pattern of high cardinality intermediate aggregates, btw. I am not sure |
I found I can' t run it successfully in my local... #12438 |
It is probably because there are 17M groups which each is holding multiple aggregates each with non trivial state 🤔 |
Yes... I found the state is much larger than the simple accumulators(e.g.
|
I create a subset of SELECT "WatchID", "ClientIP", COUNT(*) AS c, approx_median("ResponseStartTiming") tmed FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10; |
This is the number from my local test with a subset(15%) of
|
I just ran this with some new queries on #12438 and this branch goes about 2x faster
./datafusion-cli-intermediate-state -f tq1.sql
DataFusion CLI v41.0.0
0 row(s) fetched.
Elapsed 0.019 seconds.
+-------------+---------------------+---+------+------+------+
| ClientIP | WatchID | c | tmin | tp95 | tmax |
+-------------+---------------------+---+------+------+------+
| 1611957945 | 6655575552203051303 | 2 | 0 | 0 | 0 |
| -1402644643 | 8566928176839891583 | 2 | 0 | 0 | 0 |
+-------------+---------------------+---+------+------+------+
2 row(s) fetched.
Elapsed 5.361 seconds.
+-------------+---------------------+---+------+------+------+
| ClientIP | WatchID | c | tmin | tmed | tmax |
+-------------+---------------------+---+------+------+------+
| 1611957945 | 6655575552203051303 | 2 | 0 | 0 | 0 |
| -1402644643 | 8566928176839891583 | 2 | 0 | 0 | 0 |
+-------------+---------------------+---+------+------+------+
2 row(s) fetched.
Elapsed 3.952 seconds.
andrewlamb@Andrews-MacBook-Pro-2 Downloads % datafusion-cli -f tq1.sql
datafusion-cli -f tq1.sql
DataFusion CLI v41.0.0
0 row(s) fetched.
Elapsed 0.020 seconds.
+-------------+---------------------+---+------+------+------+
| ClientIP | WatchID | c | tmin | tp95 | tmax |
+-------------+---------------------+---+------+------+------+
| 1611957945 | 6655575552203051303 | 2 | 0 | 0 | 0 |
| -1402644643 | 8566928176839891583 | 2 | 0 | 0 | 0 |
+-------------+---------------------+---+------+------+------+
2 row(s) fetched.
Elapsed 9.777 seconds.
+-------------+---------------------+---+------+------+------+
| ClientIP | WatchID | c | tmin | tmed | tmax |
+-------------+---------------------+---+------+------+------+
| 1611957945 | 6655575552203051303 | 2 | 0 | 0 | 0 |
| -1402644643 | 8566928176839891583 | 2 | 0 | 0 | 0 |
+-------------+---------------------+---+------+------+------+
2 row(s) fetched.
Elapsed 6.919 seconds. |
convert_to_state
for GroupsAccumulatorAdapter
.convert_to_state
for GroupsAccumulatorAdapter
(faster median for high cardinality aggregates)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @Rachelint and @korowa
While I think the real way to make MEDIAN and APPROX_PERCENTILE_CONT etc faster is to implement GroupsAccumulator, this PR makes them faster for certain cases.
Nice work. Thanks again and sorry for the delay in reviewing while we sorted out benchmarking
let mut results = vec![]; | ||
for row_idx in 0..num_rows { | ||
// Create the empty accumulator for converting | ||
let mut converted_accumulator = (self.factory)()?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as a follow on PR I wonder if we could potentially to improve performance by adding a clear()
or reset()
type function to each accumulator to avoid having to create a new accumulator for each group.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I want to reuse the converted_accumulator
at the beginning, but it is not ensure that the state will be reset after calling state
.
It is clever to add such function to do the reset work explicitly.
The 42.0.0 release has been cut -- let's start the code flowing! |
Thanks again @Rachelint |
Which issue does this PR close?
Closes #11819
Rationale for this change
See #11819
What changes are included in this PR?
Impl
convert_to_state
forGroupsAccumulatorAdapter
.Are these changes tested?
Test by exists.
Are there any user-facing changes?
No.