fix: EnforceSorting should not remove a needed coalesces #14637
Conversation
Asking for advice from @alamb, @mustafasrepo, or anyone else on the expected behavior. 🙏🏼
Thank you @wiedld -- I agree this is a bug
vec![
    "SortPreservingMergeExec: [a@0 ASC]",
    "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
    "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
I agree this is a bug -- the `AggregateExec` (with `mode=SinglePartitioned`) requires the input to have a single partition.

In the input plan, `CoalescePartitionsExec` makes a single partition. In the new plan, that `CoalescePartitionsExec` is removed incorrectly.

I would expect the `AggregateExec` to report that its input distribution requirement is a single partition, and that the enforce sorting pass would respect this.

However, the actual `AggregateExec::required_input_distribution` seems somewhat more subtle: I think it is saying the input needs to be hash partitioned by the group keys (which this plan has clearly violated).
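To make the distinction concrete, here is a minimal sketch; the `Requirement` enum and `satisfies` helper are simplified stand-ins (not DataFusion's actual `Distribution` machinery), only meant to model the intent that rows with the same group key must end up in the same partition:

// Toy model of the two input-distribution requirements discussed above.
#[derive(Debug, PartialEq)]
enum Requirement {
    SinglePartition,
    HashPartitioned(Vec<String>), // e.g. the GROUP BY keys
}

/// Can an input with `partition_count` partitions, hash-partitioned on
/// `hash_keys` (if at all), satisfy the requirement?
fn satisfies(req: &Requirement, partition_count: usize, hash_keys: Option<&[String]>) -> bool {
    match req {
        Requirement::SinglePartition => partition_count == 1,
        Requirement::HashPartitioned(keys) => {
            // Either the input is hashed on exactly these keys, or there is
            // only one partition (so key co-location holds trivially).
            hash_keys == Some(keys.as_slice()) || partition_count == 1
        }
    }
}

fn main() {
    assert!(satisfies(&Requirement::SinglePartition, 1, None));
    let req = Requirement::HashPartitioned(vec!["a".to_string()]);
    // The problematic plan: 2 partitions that are not hashed on the group key.
    assert!(!satisfies(&req, 2, None));
    // With the CoalescePartitionsExec present there is a single partition,
    // which is why removing it breaks the plan.
    assert!(satisfies(&req, 1, None));
}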
///
/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
///
/// This requires that a bottom-up traversal was previously performed, updating the
/// children first.
pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
I did a refactor. See this commit.
- The `parallelize_sorts` and its helper `remove_bottleneck_in_subplan` removed more nodes than they should, including needed nodes. Those nodes were then added back in a few conditionals.
- Instead, I tightened up the context `PlanWithCorrespondingCoalescePartitions` and how it is built in `update_coalesce_ctx_children` -- then (a) it avoids excess node removal, and (b) we no longer need to add back nodes later.
} else if is_coalesce_partitions(&requirements.plan) {
    // There is an unnecessary `CoalescePartitionsExec` in the plan.
    // This will handle the recursive `CoalescePartitionsExec` plans.
} else if unneeded_coalesce {
    requirements = remove_bottleneck_in_subplan(requirements)?;
    // For the removal of self node which is also a `CoalescePartitionsExec`.
    requirements = requirements.children.swap_remove(0);
    requirements = requirements.update_plan_from_children()?;
|
Ok(Transformed::yes(
    PlanWithCorrespondingCoalescePartitions::new(
        Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))),
        false,
        vec![requirements],
    ),
))
With the context refactor, `remove_bottleneck_in_subplan` is no longer removing too many nodes. As a result, here we no longer have to add back the coalesce again (old line 310).
);
let parent_req_single_partition = matches!(parent.required_input_distribution()[0], Distribution::SinglePartition)
    // handle aggregates with input=hashPartitioning with a single output partition
    || (is_aggregate(parent) && parent.properties().output_partitioning().partition_count() <= 1);
This single line is the fix for the issue we found. It's also isolated in its own commit, with the fixed test case.
@alamb -- lmk if you want the refactor to go in first, in a separate PR, before the reproducer + fix PR.
sorry I don't understand what this is asking -- if it is still relevant can you perhaps clarify what refactor you are referring to?
The title of this PR is still "Demonstrate EnforceSorting can remove a needed coalesce #14637", but it seems like it is actually a bug fix.
I'll make a new issue, once we confirm this reproducer is a bug.
Thank you -- that would be super helpful -- specifically some sort of end-user description would be most helpful. Like "what symptom in what circumstance would a user see this" (does it only affect systems like influxdb_iox that use low level plans?)
I plan to review it tomorrow or next Monday, thank you @wiedld
See new issue #14691
/// ```
/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
/// By performing sorting in parallel, we can increase performance in some scenarios.
///
/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
/// This requires that there are nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
?
Think I need better words. 😆
The context is made to find linked Sort->Coalesce cascades.
SortExec        ctx.data=false (to halt remove_bottleneck_in_subplan)
...nodes...     ctx.data=true  (e.g. are linked in the cascade)
Coalesce        ctx.data=true  (e.g. is a coalesce)
This linkage is then used to say "if we find a sort, remove the linked coalesces from the subplan" (specifically, this code). If the link is broken, a.k.a. if `ctx.data=false`, then we stop going down the subplan looking for coalesces to remove. So the link only exists as long as no nodes break it.
Example of an unlinked Sort->Coalesce, since the aggregate requires the coalesce for single-partitioned input:

SortExec        ctx.data=false (to halt remove_bottleneck_in_subplan)
AggregateExec   ctx.data=false (to stop the link)
...nodes...     ctx.data=true  (e.g. are linked in the cascade)
Coalesce        ctx.data=true  (e.g. is a coalesce)
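If it helps, here is a toy sketch of that bottom-up rule. The `Node` type and `linked_to_coalesce` helper are hypothetical stand-ins (not the actual `update_coalesce_ctx_children` code): a node's flag is true when it is a coalesce or only passes an unbroken coalesce cascade through, while a sort or a node that needs single-partition input breaks the link.

struct Node {
    is_coalesce: bool,
    is_sort: bool,
    requires_single_partition: bool, // e.g. the AggregateExec in the example
    children: Vec<Node>,
}

/// The flag playing the role of `ctx.data`: true means "this node is a
/// coalesce, or is linked to one below it through an unbroken cascade".
fn linked_to_coalesce(node: &Node) -> bool {
    if node.is_coalesce {
        return true;
    }
    // A sort halts the search (the SPM takes over there), and a node that
    // itself needs single-partition input must keep its coalesce, so both
    // break the link.
    if node.is_sort || node.requires_single_partition {
        return false;
    }
    node.children.iter().any(linked_to_coalesce)
}

fn main() {
    let coalesce = Node {
        is_coalesce: true,
        is_sort: false,
        requires_single_partition: false,
        children: vec![],
    };
    // The unlinked example above: the aggregate sits between sort and coalesce.
    let aggregate = Node {
        is_coalesce: false,
        is_sort: false,
        requires_single_partition: true,
        children: vec![coalesce],
    };
    // The link is broken at the aggregate, so its coalesce must not be removed.
    assert!(!linked_to_coalesce(&aggregate));
}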
What would be a better way to say/explain this? 🙏🏼
Maybe I should add docs to `update_coalesce_ctx_children`, which constructs this context? Or improve the wording of the current docs for `PlanWithCorrespondingCoalescePartitions`? 🤔
Oh, I got it.
Now I think the current doc is very good. 😆
Will continue to review after working out.
Generally LGTM
let mut new_reqs = requirements.update_plan_from_children()?;
if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::<RepartitionExec>() {
    let input_partitioning = repartition.input().output_partitioning();
    // We can remove this repartitioning operator if it is now a no-op:
    let mut can_remove = input_partitioning.eq(repartition.partitioning());
    // We can also remove it if we ended up with an ineffective RR:
    if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
        can_remove |= *n_out == input_partitioning.partition_count();
    }
    if can_remove {
        new_reqs = new_reqs.children.swap_remove(0)
    }
}
This is about identifying (and removing) Repartition->Coalesce->Repartition, to reduce it to a single repartition. Since the removal decisions were already being made when the context is built, I consolidated this removal decision into the same place (`update_coalesce_ctx_children`).
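For a concrete (hypothetical) shape of that pattern:

RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
  CoalescePartitionsExec
    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
      DataSourceExec ...

Once the coalesce is removed, the outer round-robin would take 8 partitions back out to 8, so it matches the "ineffective RR" check above and is dropped, leaving a single RepartitionExec over the source.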
Thanks for working on this one. I am almost out of time today for reviews but I can take a look tomorrow (or feel free to merge it in and I will review afterwards)
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
/// "  ...nodes requiring single partitioning..."
/// "    CoalescePartitionsExec",
/// "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
💯
LGTM, I think it's good to go if this PR is blocking something on your side.
I'd like to take a look at this tomorrow, please hold if it's not urgent.
# TODO: Test regression on a new feature, with the same SanityCheckPlan failures as currently existing for this feature.
query error
SELECT x, y FROM t1 UNION ALL BY NAME (SELECT y, z FROM t2 INTERSECT SELECT 2, 2 as two FROM t1 ORDER BY 1) ORDER BY 1;
----
1 1 NULL
3 3 NULL
3 3 NULL
NULL 2 2
DataFusion error: SanityCheckPlan
caused by
In this file, there are 3 other SanityCheckPlan failures in the same (newly added) union-by-name feature.
I'm working on a fix for the union-by-name. So IMO this is not a blocker. Feedback?
Thanks @wiedld and @xudong963
Something about the current fix doesn't seem right if it breaks a test (I realize the test was just recently added). My "gut feeling" (without actual data) is that the fix in this PR is overly specific somehow -- maybe we need to make it more general so that it works in more cases?
3 3 NULL
3 3 NULL
NULL 2 2
# TODO: Test regression on a new feature, with the same SanityCheckPlan failures as currently existing for this feature.
This doesn't seem right to me -- this shows a regression (a query that was working is now no longer working)?
Doesn't that mean that the fix in this PR is likely not correct?
I understand the problem, and your solution seems very specific to this case, as you mentioned. If it doesn’t introduce a regression, I could approve it. However, I believe there must be a more elegant solution, and we should try to find it.
First, instead of blocking the removal of `CoalescePartitions`, I believe converting it to `SortPreservingMerge` is the correct behavior. The `repartition_sorts` flag is true by default, and I didn't see any changes to it in the test setup -- right? The core issue here seems to be that `AggregateExec` receives an ungrouped input. Therefore, the expected plan should be:
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
" RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
Unfortunately, I can’t further debug the reproducer at the moment and can’t pinpoint the exact cause of the issue. However, if you need help, I’ll try to make time.
Thank you @berkaysynnada for the excellent feedback.
I like this idea. +1. I'm going to break out the refactor into a separate PR from the bug fix, to hopefully help speed up the process.
Closing this in favor of smaller PRs, doing it piecemeal.
Which issue does this PR close?
I'll make a new issue, once we confirm this reproducer is a bug.

Closes: #14691
Rationale for this change
We have physical plans which are failing in the sanity checker. Taking the plan as it is when provided to `EnforceSorting`, it is a valid plan (defining "valid" as passing the sanity check). After `EnforceSorting`, it fails due to the removal of the needed coalesce.

I've added a temporary patch to our forked datafusion which adds to this conditional, in order to prevent the removal of the coalesce. However, I'm unclear whether a proper fix should be the replacement of the coalesce with an SPM at the correct location (instead of placing the SPM after the aggregation, which no longer fulfills the distribution requirements of the aggregate).
What changes are included in this PR?
Demonstrate the bug.
Are these changes tested?
Are there any user-facing changes?