-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug Fix]: Deem hash repartition unnecessary when input and output has 1 partition #10095
Conversation
cc @echai58, this fix should solve the problem in the issue. Feel free to review, if you have time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix!
Thanks @mustafasrepo and @ozankabak |
…s 1 partition (apache#10095) * Add input partition number check * Minor changes
@mustafasrepo good catch on UNION -- I previously suggested that this kind of plan (with partitions > config.target_partitions) is "illegal" in DF. |
I agree, this behaviour is a bit counter intuitive. However, with current implementation of the |
11)------------ProjectionExec: expr=[1 as c, 3 as d] | ||
12)--------------PlaceholderRowExec | ||
|
||
query IIII |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this query is non determisitic and fails sometimes on CI as it doesn't have an ORDER BY
and isn't annotated with rowsort
. Here is a PR to fix that: #10120
Probably better solution would be planning union inputs execution according to total available partitions -- e.g select l_linenumber as f
from lineitem
union all
select l_orderkey as f
from lineitem with target_partitions = 4, could plan 2 threads for each ParquetExec (ideally we could also use byte/row statistics and plan according to them -- not only 2-2, but probably 1-3 if there is significant data skew across inputs/files). Currently, with target_partitions = 4, it's planned as 4 threads per ParquetExec, and 8 output partitions for UNION. And on top of it, when target_partitions is less then number of UNION inputs (e.g. UNION has 10 inputs, target_partitions = 4, and we need at least 1 thread for each input) there could be RepartitionExec. |
That might work. However, this approach cannot solve all cases I guess. For the following query select * from table
union all
select * from table
union all
select * from table when |
Hi guys, not familiar with datafusion's release process - is there a estimate of when this will be released in a new datafusion version? |
Which issue does this PR close?
Closes #9928.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Yes
Are there any user-facing changes?