
fix: EnforceSorting should not remove a needed coalesce #14637

Closed

Conversation

@wiedld (Contributor) commented Feb 12, 2025

Which issue does this PR close?

I'll make a new issue, once we confirm this reproducer is a bug.

Closes: #14691

Rationale for this change

We have physical plans which fail in the sanity checker. Taking the plan as provided to EnforceSorting, it is a valid plan ("valid" meaning it passes the sanity check). After EnforceSorting runs, it fails due to the removal of the needed coalesce.

I've added a temporary patch to our forked DataFusion which adds to this conditional, in order to prevent the removal of the coalesce. However, I'm unclear whether the proper fix should instead replace the coalesce with an SPM at the correct location (rather than placing the SPM after the aggregation, where it no longer fulfills the distribution requirements of the aggregate).
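
For illustration, a minimal sketch of the problematic rewrite, assuming the reproducer's plan shape (the "after" plan matches the test expectation quoted further down; the "before" is an approximation):

Before EnforceSorting (valid -- the CoalescePartitionsExec gives the aggregate a single partition):

SortExec: expr=[a@0 ASC]
  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]
    CoalescePartitionsExec
      ...multi-partition input...

After EnforceSorting (fails the sanity check -- the aggregate now sees multiple partitions):

SortPreservingMergeExec: [a@0 ASC]
  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]
      ...multi-partition input...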

What changes are included in this PR?

Demonstrate the bug.

Are these changes tested?

Are there any user-facing changes?

@github-actions bot added the core (Core DataFusion crate) label on Feb 12, 2025
@wiedld (Contributor, Author) commented Feb 12, 2025

Asking for advice from @alamb, @mustafasrepo, or anyone else on the expected behavior. 🙏🏼

@alamb (Contributor) left a comment

Thank you @wiedld -- I agree this is a bug

cc @berkaysynnada

vec![
    "SortPreservingMergeExec: [a@0 ASC]",
    "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
    "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
@alamb (Contributor) commented on the snippet above:

I agree this is a bug -- the AggregateExec (with mode=SinglePartitioned) requires the input to have a single partition

In the input plan, CoalescePartitionsExec makes a single partition

In the new plan, that CoalescePartitionsExec is removed incorrectly.

I would expect the AggregateExec to report that its input distribution requirement is a single partition, and that the enforce sorting pass would respect this.

However, the actual AggregateExec::required_input_distribution seems somewhat more subtle:

https://docs.rs/datafusion-physical-plan/45.0.0/src/datafusion_physical_plan/aggregates/mod.rs.html#812-824

I think it is saying the input needs to be hash partitioned by the group keys (which this plan has clearly violated)
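
For reference, the method in question looks roughly like this in the linked 45.0.0 source (paraphrased, not copied verbatim; the link above is authoritative):

fn required_input_distribution(&self) -> Vec<Distribution> {
    match &self.mode {
        AggregateMode::Partial => {
            // Partial aggregates accept any input distribution.
            vec![Distribution::UnspecifiedDistribution]
        }
        AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => {
            // Input must be hash-partitioned on the group-by keys --
            // the requirement the reproducer's plan violates.
            vec![Distribution::HashPartitioned(self.group_by.input_exprs())]
        }
        AggregateMode::Final | AggregateMode::Single => {
            vec![Distribution::SinglePartition]
        }
    }
}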

Comment on lines +129 to 134
///
/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
///
/// This requires that a bottom-up traversal was previously performed, which
/// updated the children first.
pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
@wiedld (Contributor, Author) commented Feb 15, 2025

I did a refactor. See this commit.

  • parallelize_sorts and its helper remove_bottleneck_in_subplan removed more nodes than they should, including needed nodes. Those nodes were then added back in a few conditionals.
  • Instead, if I tighten up the context PlanWithCorrespondingCoalescePartitions and how it is built in update_coalesce_ctx_children, then (a) it avoids excess node removal, and (b) we no longer need to add back nodes later. (A sketch follows this list.)
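
A minimal sketch of the tightened bottom-up propagation (an approximation of the refactored update_coalesce_ctx_children, not the exact diff):

// Bottom-up: mark a node `true` when a removable CoalescePartitionsExec
// sits below it and the Sort->Coalesce link is still unbroken.
fn update_coalesce_ctx_children(
    ctx: &mut PlanWithCorrespondingCoalescePartitions,
) {
    ctx.data = if ctx.children.is_empty() {
        false // a leaf cannot be a coalesce, nor sit above one
    } else if is_coalesce_partitions(&ctx.plan) {
        true // the coalesce itself starts a link
    } else {
        // Propagate the link only through child slots that do not require
        // a single partition; an operator that needs the single partition
        // (e.g. the aggregate in the reproducer) breaks the link here.
        ctx.children.iter().enumerate().any(|(idx, child)| {
            child.data
                && !matches!(
                    ctx.plan.required_input_distribution()[idx],
                    Distribution::SinglePartition
                )
        })
    };
}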

Comment on lines -301 to -377
} else if is_coalesce_partitions(&requirements.plan) {
    // There is an unnecessary `CoalescePartitionsExec` in the plan.
    // This will handle the recursive `CoalescePartitionsExec` plans.
} else if unneeded_coalesce {
    requirements = remove_bottleneck_in_subplan(requirements)?;
    // For the removal of self node which is also a `CoalescePartitionsExec`.
    requirements = requirements.children.swap_remove(0);
    requirements = requirements.update_plan_from_children()?;

    Ok(Transformed::yes(
        PlanWithCorrespondingCoalescePartitions::new(
            Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))),
            false,
            vec![requirements],
        ),
    ))
@wiedld (Contributor, Author) commented:

With the context refactor, remove_bottleneck_in_subplan no longer removes too many nodes.
As a result, we no longer have to add the coalesce back here (old line 310).

@wiedld force-pushed the wiedld/enforce-sorting-removes-coalesce branch from 05ff553 to 411d882 on February 15, 2025 04:47
@wiedld force-pushed the wiedld/enforce-sorting-removes-coalesce branch from 411d882 to 89556c2 on February 15, 2025 04:49
);
let parent_req_single_partition = matches!(
    parent.required_input_distribution()[0],
    Distribution::SinglePartition
)
    // handle aggregates with input=hashPartitioning with a single output partition
    || (is_aggregate(parent)
        && parent.properties().output_partitioning().partition_count() <= 1);
@wiedld (Contributor, Author) commented:

This single line is the fix for the issue we found.
It's also isolated in its own commit, with the fixed test case.

@wiedld (Contributor, Author) commented:

@alamb -- lmk if you want the refactor to go in first, in a separate PR, before the reproducer + fix PR.

@alamb (Contributor) commented:

sorry I don't understand what this is asking -- if it is still relevant can you perhaps clarify what refactor you are referring to?

@wiedld marked this pull request as ready for review on February 15, 2025 05:31
@alamb (Contributor) left a comment

The title of this PR is still

Demonstrate EnforceSorting can remove a needed coalesce #14637

But it seems like it is actually a bug fix

I'll make a new issue, once we confirm this reproducer is a bug.

Thank you -- that would be super helpful -- specifically some sort of end-user description would be most helpful. Like "what symptom in what circumstance would a user see this" (does it only affect systems like influxdb_iox that use low level plans?)

@xudong963 (Member) commented:
I plan to review it tomorrow or next Monday, thank you @wiedld

@wiedld changed the title from "Demonstrate EnforceSorting can remove a needed coalesce" to "fix: EnforceSorting should not remove a needed coalesce" on Feb 15, 2025
@wiedld (Contributor, Author) commented Feb 16, 2025

I'll make a new issue, once we confirm this reproducer is a bug.

Thank you -- that would be super helpful -- specifically some sort of end-user description would be most helpful. Like "what symptom in what circumstance would a user see this" (does it only affect systems like influxdb_iox that use low level plans?)

See new issue #14691

/// ```
/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
/// By performing sorting in parallel, we can increase performance in some scenarios.
///
/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
@xudong963 (Member) commented:

Suggested change
/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
/// This requires that there are nodes between the [`SortExec`] and [`CoalescePartitionsExec`]

?

@wiedld (Contributor, Author) commented Feb 18, 2025

Think I need better words. 😆

The context is made to find linked Sort->Coalesce cascades.

SortExec             ctx.data=false (to halt remove_bottleneck_in_subplan)
   ...nodes...       ctx.data=true (linked in the cascade)
      Coalesce       ctx.data=true (is a coalesce)

This linkage is then used to say "if we find a sort, remove the linked coalesces from the subplan". Specifically, this code. If the link is broken (i.e. if ctx.data=false), we stop descending the subplan looking for coalesces to remove.

So the link only exists as long as "no nodes" break the link.

Example of an unlinked Sort->Coalesce, since the aggregate requires the coalesce for single-partitioned input:

SortExec                ctx.data=false (to halt remove_bottleneck_in_subplan)
   AggregateExec        ctx.data=false (to stop the link)
      ...nodes...       ctx.data=true (linked in the cascade)
         Coalesce       ctx.data=true (is a coalesce)

@wiedld (Contributor, Author) commented Feb 18, 2025

What would be a better way to say/explain this? 🙏🏼

Maybe I should add docs to the update_coalesce_ctx_children which constructs this context? Or the wording on the current docs for PlanWithCorrespondingCoalescePartitions? 🤔

@xudong963 (Member) commented:

Oh, I got it.

Now I think the current doc is very good. 😆

@xudong963 (Member) commented:
Will continue to review after working out.

@xudong963 (Member) left a comment

Generally LGTM

Comment on lines -549 to -629
let mut new_reqs = requirements.update_plan_from_children()?;
if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::<RepartitionExec>() {
    let input_partitioning = repartition.input().output_partitioning();
    // We can remove this repartitioning operator if it is now a no-op:
    let mut can_remove = input_partitioning.eq(repartition.partitioning());
    // We can also remove it if we ended up with an ineffective RR:
    if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
        can_remove |= *n_out == input_partitioning.partition_count();
    }
    if can_remove {
        new_reqs = new_reqs.children.swap_remove(0)
    }
}
@wiedld (Contributor, Author) commented Feb 18, 2025

This is about identifying (and removing) Repartition->Coalesce->Repartition, reducing it to a single repartition.

Since the removal decisions were already being made when the context is built, I consolidated this removal decision into the same place (update_coalesce_ctx_children).
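
An illustrative shape of that pattern (hypothetical plan, not taken from the test suite):

Before -- the coalesce makes the inner repartition useless work:

RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
  CoalescePartitionsExec
    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
      ...single-partition input...

After -- collapsed to a single repartition:

RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
  ...single-partition input...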

@alamb (Contributor) commented Feb 18, 2025

Thanks for working on this one. I am almost out of time today for reviews but I can take a look tomorrow (or feel free to merge it in and I will review afterwards)

Comment on lines 301 to 306
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
/// " ...nodes requiring single partitioning..."
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
@xudong963 (Member) commented:

💯

@xudong963 (Member) left a comment

LGTM, I think it's good to go if the PR blocks something from your side.

@berkaysynnada (Contributor) commented:
I'd like to take a look at this tomorrow, please hold if it's not urgent.

@github-actions bot added the sqllogictest (SQL Logic Tests (.slt)) label on Feb 22, 2025
@wiedld force-pushed the wiedld/enforce-sorting-removes-coalesce branch from b3b871f to 29d799c on February 22, 2025 00:20
Comment on lines 247 to 252
# TODO: Test regression on a new feature, with the same SanityCheckPlan failures as currently existing for this feature.
query error
SELECT x, y FROM t1 UNION ALL BY NAME (SELECT y, z FROM t2 INTERSECT SELECT 2, 2 as two FROM t1 ORDER BY 1) ORDER BY 1;
----
1 1 NULL
3 3 NULL
3 3 NULL
NULL 2 2
DataFusion error: SanityCheckPlan
caused by
@wiedld (Contributor, Author) commented Feb 22, 2025

In this file, there are 3 other SanityCheckPlan failures in the same (newly added) union-by-name feature.

I'm working on a fix for the union-by-name. So IMO this is not a blocker. Feedback?

@alamb (Contributor) left a comment

Thanks @wiedld and @xudong963

Something about the current fix doesn't seem right if it breaks a test (I realize the test was just recently added). My "gut feeling" (without actual data) is that the fix in this PR is overly specific somehow -- maybe we need to make it more general so that it works in more cases?

3 3 NULL
3 3 NULL
NULL 2 2
# TODO: Test regression on a new feature, with the same SanityCheckPlan failures as currently existing for this feature.
@alamb (Contributor) commented:

This doesn't seem right to me -- this shows a regression (a query that was working is now no longer working)?

Doesn't that mean that the fix in this PR is likely not correct?

@berkaysynnada (Contributor) left a comment

I understand the problem, and your solution seems very specific to this case, as you mentioned. If it doesn’t introduce a regression, I could approve it. However, I believe there must be a more elegant solution, and we should try to find it.

First, instead of blocking the removal of CoalescePartitions, I believe converting it to SortPreservingMerge is the correct behavior. The repartition_sorts flag is true by default, and I didn't see any changes to it in the test setup -- right? The core issue here seems to be that AggregateExec receives an ungrouped input. Therefore, the expected plan should be:

"SortPreservingMergeExec: [a@0 ASC]",
"  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
"    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
"      RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
"        ProjectionExec: expr=[a@0 as a, b@1 as value]",
"          UnionExec",
"            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
"            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",

Unfortunately, I can’t further debug the reproducer at the moment and can’t pinpoint the exact cause of the issue. However, if you need help, I’ll try to make time.
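
A minimal sketch of that conversion (sort_exprs and coalesce are placeholders for the ordering and node at hand; this is not the PR's actual code):

// Rather than deleting the CoalescePartitionsExec, merge its multi-partition
// input while preserving the sort order; the output is still one partition.
let spm: Arc<dyn ExecutionPlan> = Arc::new(SortPreservingMergeExec::new(
    sort_exprs,
    Arc::clone(coalesce.input()),
));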

@wiedld (Contributor, Author) commented Feb 24, 2025

Thank you @berkaysynnada for the excellent feedback.

First, instead of blocking the removal of CoalescePartitions, I believe converting it to SortPreservingMerge is the correct behavior.

I like this idea. +1

I'm going to break out the refactor into a separate PR, from the bug fix, to hopefully help speed up the process.

@wiedld marked this pull request as draft on February 24, 2025 15:18
@wiedld (Contributor, Author) commented Feb 27, 2025

Separate refactoring PR: #14907

Breaking this up further into even smaller chunks:
  • adding docs = #14907
  • adding a test case demonstrating the bug = #14919

@wiedld (Contributor, Author) commented Feb 27, 2025

Closing this in favor of smaller PRs, doing it piecemeal.

@wiedld closed this on Feb 27, 2025
Labels: core (Core DataFusion crate), optimizer (Optimizer rules), sqllogictest (SQL Logic Tests (.slt))
Successfully merging this pull request may close these issues.

ParallelizeSorts, a subrule of EnforceSorting optimizer, should not remove necessary coalesce.
4 participants