ParallelizeSorts, a subrule of EnforceSorting optimizer, should not remove necessary coalesce. #14691

Closed
wiedld opened this issue Feb 16, 2025 · 5 comments
Labels
bug Something isn't working


@wiedld
Contributor

wiedld commented Feb 16, 2025

Describe the bug

During the EnforceSorting optimizer run, a valid plan may be turned invalid because a necessary coalesce is removed. The result is a planning-time failure in the SanityChecker with the error: does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2).

We start with a valid input plan:

"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
"  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
"    CoalescePartitionsExec",
"      ProjectionExec: expr=[a@0 as a, b@1 as value]",
"        UnionExec",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"

EnforceSorting then removes the coalesce, which makes the plan invalid:

"SortPreservingMergeExec: [a@0 ASC]",
"  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
"    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
"      ProjectionExec: expr=[a@0 as a, b@1 as value]",
"        UnionExec",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"
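The SanityChecker error comes down to whether a child's output partitioning satisfies its parent's required distribution. Below is a simplified, hypothetical model of that check; `Partitioning`, `Distribution`, and `satisfies` are illustrative names, not DataFusion's API. It assumes, as we understand DataFusion's `Partitioning::satisfy` to behave, that an input with exactly one partition satisfies any hash requirement. Under that assumption the plan with the CoalescePartitionsExec passes, while the rewritten plan, where the AggregateExec sees the UnionExec's two partitions directly, does not.

```rust
// Hypothetical, simplified model of the distribution check (not DataFusion's types).

/// Output partitioning of a plan node.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// Hash partitioned on the given columns into `n` partitions.
    Hash(Vec<String>, usize),
    /// `n` partitions with no known layout.
    Unknown(usize),
}

impl Partitioning {
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::Hash(_, n) | Partitioning::Unknown(n) => *n,
        }
    }
}

/// Distribution a parent requires from its child.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    Unspecified,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

/// Returns true if `output` satisfies `required`.
fn satisfies(output: &Partitioning, required: &Distribution) -> bool {
    match required {
        Distribution::Unspecified => true,
        Distribution::SinglePartition => output.partition_count() == 1,
        // Assumption: a single partition trivially keeps equal keys together.
        Distribution::HashPartitioned(_) if output.partition_count() == 1 => true,
        Distribution::HashPartitioned(cols) => {
            matches!(output, Partitioning::Hash(hash_cols, _) if hash_cols == cols)
        }
    }
}

fn main() {
    // AggregateExec (mode=SinglePartitioned, gby=[a]) requires hash partitioning on `a`.
    let required = Distribution::HashPartitioned(vec!["a".to_string()]);
    // Input plan: CoalescePartitionsExec merges the union's partitions into one.
    let with_coalesce = Partitioning::Unknown(1);
    // Rewritten plan: UnionExec over two single-group scans yields two partitions
    // with no known layout (ProjectionExec passes this through unchanged).
    let without_coalesce = Partitioning::Unknown(2);
    // A hash repartition on `a` would also satisfy the requirement.
    let hash_on_a = Partitioning::Hash(vec!["a".to_string()], 2);

    println!("with coalesce:    {}", satisfies(&with_coalesce, &required)); // true
    println!("without coalesce: {}", satisfies(&without_coalesce, &required)); // false
    println!("hash on a:        {}", satisfies(&hash_on_a, &required)); // true
}
```

In this model, removing the coalesce is only safe when something else (for example, a hash repartition on the grouping key) provides the distribution the aggregate needs.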

To Reproduce

A test case demonstrates this: 670eff3

We discovered this bug with our own constructed (and valid) LogicalPlans, which were converted to a physical plan by the default planner. The EnforceDistribution optimizer pass then inserted a coalesce to handle the UnionExec -> AggregateExec distribution requirements, and EnforceSorting subsequently removed that coalesce, rendering the plan invalid again. We do not believe this is unique to our plans; rather, this behavior in EnforceSorting is a bug that could affect others too.

Expected behavior

EnforceSorting should not take a valid plan and make it invalid, which causes a failure in the planning sanity check.

Additional context

We already have a proposed solution: #14637

While debugging, I did a minor refactor of parallelize_sorts and its helper remove_bottleneck_in_subplan. The reason for the refactor (also summarized here) was that I noticed a pattern of several necessary nodes being removed and then added back later. I elected to simplify the code by tightening up how we build the PlanWithCorrespondingCoalescePartitions, in order to correctly identify which nodes should be removed in the first place, and then remove only those nodes. The refactor is isolated in this commit: 0661ed7
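One way to picture "identifying which nodes should be removed in the first place" is a check that runs before a coalesce is dropped: walk the nodes between the SortExec and the CoalescePartitionsExec and ask whether any of them would lose a distribution requirement. This is a hypothetical sketch of that idea, not the logic of parallelize_sorts, remove_bottleneck_in_subplan, or the proposed fix in #14637; `Requirement`, `Node`, `Input`, and `blocking_node` are illustrative names.

```rust
// Hypothetical, simplified model; none of these names are DataFusion APIs.

/// What a node needs from its input partitioning.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Requirement {
    Any,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

/// A node on the path between the SortExec and the CoalescePartitionsExec.
#[derive(Debug, Clone)]
struct Node {
    name: &'static str,
    required_input: Requirement,
}

/// Partitioning of the coalesce's input, i.e. what the nodes above it
/// would see if the coalesce were removed.
#[derive(Debug, Clone)]
struct Input {
    partitions: usize,
    hash_cols: Option<Vec<String>>,
}

fn satisfied(input: &Input, req: &Requirement) -> bool {
    match req {
        Requirement::Any => true,
        // One partition satisfies both single-partition and hash requirements.
        _ if input.partitions == 1 => true,
        Requirement::SinglePartition => false,
        Requirement::HashPartitioned(cols) => input.hash_cols.as_ref() == Some(cols),
    }
}

/// Returns the first node whose input requirement would break if the coalesce
/// were removed, or `None` if removal looks safe. Assumes the nodes below each
/// path entry preserve partitioning (true for ProjectionExec here), so every
/// node sees the coalesce's input partitioning.
fn blocking_node(path: &[Node], coalesce_input: &Input) -> Option<&'static str> {
    path.iter()
        .find(|node| !satisfied(coalesce_input, &node.required_input))
        .map(|node| node.name)
}

fn main() {
    // Path from the issue, top-down below the SortExec.
    let path = vec![
        Node {
            name: "AggregateExec",
            required_input: Requirement::HashPartitioned(vec!["a".to_string()]),
        },
        Node { name: "ProjectionExec", required_input: Requirement::Any },
    ];
    // UnionExec of two single-group scans: two partitions, no hash layout.
    let union_output = Input { partitions: 2, hash_cols: None };

    match blocking_node(&path, &union_output) {
        Some(name) => println!("keep the coalesce: {name} needs it"),
        None => println!("coalesce can be removed"),
    }
}
```

Returning the name of the blocking node rather than a bare `bool` makes it easier to see which requirement kept the coalesce in place.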

Update:

  • not refactoring the building of the coalesce context.
  • instead, we have added docs to EnforceSorting, its subrules, and some of the helper methods.
  • we still have a refactor of the pushdown_sorts subrule.
@wiedld wiedld added the bug Something isn't working label Feb 16, 2025
@wiedld
Contributor Author

wiedld commented Feb 16, 2025

take

@alamb
Contributor

alamb commented Feb 28, 2025

I believe in https://github.com/apache/datafusion/pull/14919/files#r1975931108 @berkaysynnada is saying that the input plan is not valid:

"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
"  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",  <--- since this is SinglePartitioned, it should have more than one input partition
"    CoalescePartitionsExec",
"      ProjectionExec: expr=[a@0 as a, b@1 as value]",
"        UnionExec",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"

@wiedld
Contributor Author

wiedld commented Feb 28, 2025

Update:

@wiedld wiedld closed this as completed Feb 28, 2025
@alamb
Contributor

alamb commented Feb 28, 2025

So does this mean that whatever issue we were hitting has been fixed on main (in DataFusion 46?)

Update:

@wiedld
Contributor Author

wiedld commented Feb 28, 2025

So does this mean that whatever issue we were hitting has been fixed on main (in Datafusion 46?)

yes, it should be.
