fix: Distribution error failing in the SanityCheck, for a specific influxql plan. #58

Draft · wiedld wants to merge 4 commits into patched-df-dec25-ver44.0.0 from dlw/debug-distribution-error
Conversation

@wiedld (Collaborator) commented Feb 10, 2025

Temporary fix for https://github.com/influxdata/influxdb_iox/issues/13310.

This PR branches off our current patched DF branch. It adds a few commits to handle the above issue.

Confirmed that it fixes the bug reproducer in iox.

Changes made

  • Commit 1 = recreates the insertion of the coalesce in the EnforceDistribution optimization pass.

    • Demonstrates that we get a coalesce (not a repartition) inserted into our plan.
    • In order to show this, I had to update the test suite to handle more "real world" parameters, which enables distribution decisions based on statistics.
  • Commit 2 = reproducer of the SanityCheck failure after EnforceSorting removes the added coalesce.

  • Commit 3 = a special-cased fix that checks for an AggregateExec parent. (A more general fix will be made upstream.)

    • This fixes the test case seen in commit 2.
  • Commit 4 = unrelated. It fixes a wasm build CI error by cherry-picking the upstream fix.

@github-actions bot added the core label on Feb 10, 2025
@wiedld force-pushed the dlw/debug-distribution-error branch from af2856a to 27f2a3a on February 12, 2025 03:01
* demonstrate the insertion of coalesce after the use of column estimates, and the removal of the test scenario's forcing of rr repartitioning
…the coalesce added in the EnforceDistribution
@wiedld force-pushed the dlw/debug-distribution-error branch from 27f2a3a to 202860b on February 12, 2025 15:51
@wiedld changed the title from "test: debugging distribution enforcement bug." to "fix: Distribution error failing in the SanityCheck, for a specific influxql plan." on Feb 12, 2025
@@ -516,7 +516,7 @@ fn remove_bottleneck_in_subplan(
 ) -> Result<PlanWithCorrespondingCoalescePartitions> {
     let plan = &requirements.plan;
     let children = &mut requirements.children;
-    if is_coalesce_partitions(&children[0].plan) {
+    if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) {
@wiedld (Collaborator, Author) commented Feb 12, 2025

This is the fix, for now.

A proper fix (if we decide to fix it at this conditional) should compare the partitioning needs of the coalesce's parent against those of the coalesce's children (sketched below). My initial attempt to do so caused other DF tests to fail.

I didn't proceed further since I'm unsure of the correct solution at a higher level.
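
For illustration only, here is a rough sketch of that comparison. The helper name and shape are hypothetical (not what this PR implements); it only captures the idea of keeping the coalesce whenever its parent still needs a single partition that the coalesce's input cannot provide on its own:

use std::sync::Arc;
use datafusion::physical_plan::{Distribution, ExecutionPlan, ExecutionPlanProperties};

// Hypothetical helper: returns true when the CoalescePartitionsExec must stay,
// i.e. its parent requires a single partition but the coalesce's input is
// still split across multiple partitions.
fn coalesce_still_needed(
    parent: &Arc<dyn ExecutionPlan>,
    coalesce_input: &Arc<dyn ExecutionPlan>,
) -> bool {
    let input_partitions = coalesce_input.output_partitioning().partition_count();
    parent
        .required_input_distribution()
        .iter()
        .any(|dist| matches!(dist, Distribution::SinglePartition) && input_partitions > 1)
}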

A collaborator replied:

Should this be checking that it is an aggregation and that the aggregation mode is SinglePartitioned? 🤔
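
For illustration, a minimal sketch of that stricter check (the helper name is hypothetical; this is not what the PR currently does):

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};

// Hypothetical helper: true only for an AggregateExec that genuinely needs a
// single input partition, i.e. one running in SinglePartitioned mode.
fn is_single_partitioned_aggregation(plan: &Arc<dyn ExecutionPlan>) -> bool {
    plan.as_any()
        .downcast_ref::<AggregateExec>()
        .map(|agg| matches!(agg.mode(), AggregateMode::SinglePartitioned))
        .unwrap_or(false)
}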

@wiedld (Collaborator, Author) commented Feb 12, 2025

We have a bug where the SanityCheck fails due to mismatched distribution needs between a union and an aggregate. After chasing it down (and recreating it in this PR), it appears that a change made in the earlier EnforceDistribution optimizer run is undone in the later EnforceSorting optimizer run.

Here is the plan before the EnforceDistribution:

OutputRequirementExec
  SortExec: expr=[time@1 ASC NULLS LAST], preserve_partitioning=[false]
    ProjectionExec: expr=[t as iox::measurement, time@0 as time, sum(Value)@1 as Value]
      GapFillExec: group_expr=[time@0], aggr_expr=[sum(Value)@1], stride=IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time_range=Included("0")..Included("59999999999")
        AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[sum(Value)]
          AggregateExec: mode=Partial, gby=[date_bin_wallclock(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time@0, 0) as time], aggr=[sum(Value)]
            SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[false]
              ProjectionExec: expr=[time@1 as time, f@0 as Value]
                UnionExec
                  ParquetExec: 
                    file_groups={1 group: [[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/b5e32e0e-ffa8-4831-aef2-4e16e32a1264.parquet]]}, projection=[f, time], predicate=time@2 >= 0 AND time@2 <= 59999999999 AND f@1 IS NOT NULL, pruning_predicate=time_null_count@1 != time_row_count@2 AND time_max@0 >= 0 AND time_null_count@1 != time_row_count@2 AND time_min@3 <= 59999999999 AND f_null_count@5 != f_row_count@4, required_guarantees=[]
                  FilterExec: f@0 IS NOT NULL
                    ProjectionExec: expr=[f@0 as f, time@1 as time]
                      DeduplicateExec: [x@2 ASC,time@1 ASC]
                        FilterExec: time@1 >= 0 AND time@1 <= 59999999999
                          RecordBatchesExec: chunks=1, projection=[f, time, x, __chunk_order]

Here is the plan after the EnforceDistribution. Note the insertion of the coalesce between the union and aggregate.

OutputRequirementExec
  SortExec: expr=[time@1 ASC NULLS LAST], preserve_partitioning=[false]
    ProjectionExec: expr=[t as iox::measurement, time@0 as time, sum(Value)@1 as Value]
      GapFillExec: group_expr=[time@0], aggr_expr=[sum(Value)@1], stride=IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time_range=Included("0")..Included("59999999999")
        AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[sum(Value)]
          AggregateExec: mode=Partial, gby=[date_bin_wallclock(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time@0, 0) as time], aggr=[sum(Value)]
            SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[false]
              CoalescePartitionsExec
                ProjectionExec: expr=[time@1 as time, f@0 as Value]
                  UnionExec
                    ParquetExec: 
                        file_groups={1 group: [[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/b5e32e0e-ffa8-4831-aef2-4e16e32a1264.parquet]]}, projection=[f, time], predicate=time@2 >= 0 AND time@2 <= 59999999999 AND f@1 IS NOT NULL, pruning_predicate=time_null_count@1 != time_row_count@2 AND time_max@0 >= 0 AND time_null_count@1 != time_row_count@2 AND time_min@3 <= 59999999999 AND f_null_count@5 != f_row_count@4, required_guarantees=[]
                    FilterExec: f@0 IS NOT NULL
                      ProjectionExec: expr=[f@0 as f, time@1 as time]
                        DeduplicateExec: [x@2 ASC,time@1 ASC]
                          FilterExec: time@1 >= 0 AND time@1 <= 59999999999
                            RecordBatchesExec: chunks=1, projection=[f, time, x, __chunk_order]

The partial and final aggregates are later combined by the CombinePartialFinalAggregate pass. The plan then handed to EnforceSorting is:

OutputRequirementExec
  SortExec: expr=[time@1 ASC NULLS LAST], preserve_partitioning=[false]
    ProjectionExec: expr=[t as iox::measurement, time@0 as time, sum(Value)@1 as Value]
      GapFillExec: group_expr=[time@0], aggr_expr=[sum(Value)@1], stride=IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time_range=Included("0")..Included("59999999999")
        AggregateExec: mode=SinglePartitioned, gby=[date_bin_wallclock(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time@0, 0) as time], aggr=[sum(Value)]
          SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[false]
            CoalescePartitionsExec
              ProjectionExec: expr=[time@1 as time, f@0 as Value]
                UnionExec
                  ParquetExec: file_groups={1 group: [[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/0d80ea75-da4d-4d01-ba58-5169be3df839.parquet]]}, projection=[f, time], predicate=time@2 >= 0 AND time@2 <= 59999999999 AND f@1 IS NOT NULL, pruning_predicate=time_null_count@1 != time_row_count@2 AND time_max@0 >= 0 AND time_null_count@1 != time_row_count@2 AND time_min@3 <= 59999999999 AND f_null_count@5 != f_row_count@4, required_guarantees=[]
                  FilterExec: f@0 IS NOT NULL
                    ProjectionExec: expr=[f@0 as f, time@1 as time]
                      DeduplicateExec: [x@2 ASC,time@1 ASC]
                        FilterExec: time@1 >= 0 AND time@1 <= 59999999999
                          RecordBatchesExec: chunks=1, projection=[f, time, x, __chunk_order]

EnforceSorting removes the needed coalesce and replaces it with an SPM (SortPreservingMergeExec). But the SPM is inserted further up the plan (above the aggregate node):

OutputRequirementExec
  SortExec: expr=[time@1 ASC NULLS LAST], preserve_partitioning=[false]
    ProjectionExec: expr=[t as iox::measurement, time@0 as time, sum(Value)@1 as Value]
      GapFillExec: group_expr=[time@0], aggr_expr=[sum(Value)@1], stride=IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time_range=Included("0")..Included("59999999999")
        SortPreservingMergeExec: [time@0 ASC]
          SortExec: expr=[time@0 ASC], preserve_partitioning=[true]
            AggregateExec: mode=SinglePartitioned, gby=[date_bin_wallclock(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time@0, 0) as time], aggr=[sum(Value)]
              ProjectionExec: expr=[time@1 as time, f@0 as Value]
                UnionExec
                  ParquetExec: file_groups={1 group: [[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/0d80ea75-da4d-4d01-ba58-5169be3df839.parquet]]}, projection=[f, time], predicate=time@2 >= 0 AND time@2 <= 59999999999 AND f@1 IS NOT NULL, pruning_predicate=time_null_count@1 != time_row_count@2 AND time_max@0 >= 0 AND time_null_count@1 != time_row_count@2 AND time_min@3 <= 59999999999 AND f_null_count@5 != f_row_count@4, required_guarantees=[]
                  FilterExec: f@0 IS NOT NULL
                    ProjectionExec: expr=[f@0 as f, time@1 as time]
                      DeduplicateExec: [x@2 ASC,time@1 ASC]
                        SortExec: expr=[x@2 ASC, time@1 ASC, __chunk_order@3 ASC], preserve_partitioning=[false]
                          FilterExec: time@1 >= 0 AND time@1 <= 59999999999
                            RecordBatchesExec: chunks=1, projection=[f, time, x, __chunk_order]

This plan then fails the SanityCheck pass.

Possible solutions

This PR demonstrates how the error occurs (based upon the changes in the two optimizer runs) and adds a temporary fix to unblock iox. As for the proper fix, I'm unclear on what to do. Ideas include, but are not limited to:

  • have the EnforceSorting selectively not remove the coalesce
  • let the EnforceSorting remove it, but update the code to add the SPM in the correct place
    • right now the decision to remove the coalesce and the decision to add the SPM happen in different transformations of the plan. We could either change the rules for SPM insertion, or combine the SPM insertion with the coalesce removal.

There could also be other solutions. I'll make an upstream PR and ping for advice.
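
Whatever the final fix is, a regression test could take the same shape as the SanityChecker usage shown further down: run EnforceDistribution and EnforceSorting over the reproducer plan (the real pipeline also runs CombinePartialFinalAggregate in between, as described above), then check that SanityCheckPlan accepts the result. A minimal sketch, assuming the optimizer rules and the reproducer plan are in scope as in the existing test helpers:

// Sketch only, inside a test returning Result<()>: `plan` is the
// union-under-aggregate reproducer plan built by the test helpers in this PR.
let plan = EnforceDistribution::new().optimize(plan, &Default::default())?;
let plan = EnforceSorting::new().optimize(plan, &Default::default())?;
// With a correct fix, the sanity checker accepts the plan instead of returning
// the "does not satisfy distribution requirements" error.
SanityCheckPlan::new().optimize(plan, &Default::default())?;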

Comment on lines -1853 to -1854
-        // Use a small batch size, to trigger RoundRobin in tests
-        config.execution.batch_size = 1;
@wiedld (Collaborator, Author) commented Feb 12, 2025

Note the removal of the forced insertion of round robin repartitioning. This was preventing the coalesce insertion (as seen in our real world reproducer).

Comment on lines +1587 to +1599
pub(crate) fn parquet_exec_with_stats() -> Arc<ParquetExec> {
    let mut statistics = Statistics::new_unknown(&schema());
    statistics.num_rows = Precision::Inexact(10);
    statistics.column_statistics = column_stats();

    let config =
        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
            .with_file(PartitionedFile::new("x".to_string(), 10000))
            .with_statistics(statistics);
    assert_eq!(config.statistics.num_rows, Precision::Inexact(10));

    ParquetExec::builder(config).build_arc()
}
@wiedld (Collaborator, Author) commented Feb 12, 2025

Note that we are now providing statistics in the parquet exec.

Previously, the missing statistics were preventing the coalesce insertion (as seen in our real-world reproducer): in the absence of parquet stats, we always got a round-robin repartition because roundrobin_beneficial_stats always evaluated to true.

.optimize(optimized, &Default::default())
.unwrap_err();
assert!(err.message().contains(" does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)"));
let checker = checker.optimize(optimized, &Default::default());
A collaborator replied:

I really like the idea of running the SanityChecker as part of the tests

@alamb (Collaborator) left a comment:

I have one suggestion about making the check even more specific, but otherwise I think this is great

Thanks @wiedld

Comment on lines +3740 to +3745
/// Same as [`repartitions_for_aggregate_after_sorted_union`], but adds a projection
/// as well between the union and aggregate. This changes the outcome:
///
/// * we no longer get repartitioning, and instead get coalescing.
#[test]
fn coalesces_for_aggregate_after_sorted_union_projection() -> Result<()> {
@wiedld (Collaborator, Author) commented Feb 28, 2025

This is how we get the coalesce inserted in the EnforceDistribution run.
