-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Optimize SortPreservingMergeExec
to avoid merging non-overlapping partitions
#13296
base: main
Are you sure you want to change the base?
Conversation
// First Fit: | ||
// * Choose the first file group that a file can be placed into. | ||
// * If it fits into no existing file groups, create a new one. | ||
// | ||
// By sorting files by min values and then applying first-fit bin packing, | ||
// we can produce the smallest number of file groups such that | ||
// files within a group are in order and non-overlapping. | ||
// | ||
// Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8 | ||
// https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved this and the relevant code into a new method, MinMaxStatistics::first_fit
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> { | ||
Ok(vec![ | ||
self.statistics()?; | ||
self.properties().partitioning.partition_count() | ||
]) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As stated in the PR description, this is what a proposed API would look like for statistics by partition, though it is certainly not final.
// Helper function to get min/max statistics | ||
let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> { | ||
Ok(projected_statistics | ||
.iter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved this code later so it uses the projected statistics, i.e. it only relies on stats for sorting columns. I was seeing the code error because some columns had unknown statistics. Hopefully this will reduce such cases.
SortPreservingMergeExec
to avoid merging non-overlapping partitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @suremarc -- this looks like a great start to me
/// into chains, such that elements in a chain are non-overlapping and ordered | ||
/// amongst one another. | ||
/// This bin-packing is optimal in the sense that it has the fewest number of chains. | ||
pub fn first_fit(&self) -> Vec<Vec<usize>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we know there are no overlapping ranges here? It seems like we would also have to check if the ranges overlapped and if any did we can't do this packing
This may be checked elsewhere but I didn't see it in a cursory glance
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If no ranges overlapped, they could all be ordered into a single chain. If some ranges do overlap, any ranges that overlap get placed into separate chains. The check for non-overlapping-ness happens in this logic:
datafusion/datafusion/physical-plan/src/statistics.rs
Lines 286 to 294 in 31d3716
let chain_to_insert = chains.iter_mut().find(|chain| { | |
// If our element is non-overlapping and comes _after_ the last element of the chain, | |
// it can be added to this chain. | |
min > self.max( | |
*chain | |
.last() | |
.expect("groups should be nonempty at construction"), | |
) | |
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see 👍
query TT | ||
EXPLAIN | ||
select a from t WHERE partition = 1 | ||
UNION all | ||
select a from t WHERE partition = 2 | ||
ORDER BY a; | ||
---- | ||
logical_plan | ||
01)Sort: t.a ASC NULLS LAST | ||
02)--Union | ||
03)----TableScan: t projection=[a], full_filters=[t.partition = Int32(1)] | ||
04)----TableScan: t projection=[a], full_filters=[t.partition = Int32(2)] | ||
physical_plan | ||
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST], partition_groups=[[2,0],[1]] | ||
02)--UnionExec | ||
03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=1/1_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=1/1_2.parquet]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST] | ||
04)----ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=2/2_2.parquet]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @alamb I was able to implement the statistics_by_partition
for ParquetExec
and UnionExec
and I wrote a little test. It seems to work 🎉
In this case, files 0 (partition=1/1_1.parquet
) and 2 (partition=2/2_2.parquet
) are non-overlapping, but file 1 (partition=1/1_2.parquet
) overlaps file 0, so it gets placed into another chain (group).
I noticed int32 columns didn't seem to have working parquet statistics, so I used a string column. Seems like we will need to plug a lot of holes to make this feature complete.
The implementation is really nice. let result = StreamingMergeBuilder::new()
.with_streams(inputs)
.with_statistics_by_stream(stats)
.build(); // Concat non-overlapping input streams here Now |
FYI an update here is that I don't think I am going to be able to work on Statistics for the next month or two. Though I think @mhilton from InfluxData was thinking of potentially helping (🎣 ). |
Ok. My team is pretty eager to get this optimization in before February-ish, so I think we may be able to spare a helper or two for the statistics-related changes. But obviously that would require someone available to review, also I think we would need a resolution in #13293 before proceeding. |
This didn't occur to me but I think it would be a great change. On the other hand I'm considering if it would make sense in a follow-on PR. But in any case there's a lot of statistics-related work that will need to be done before this PR is mergeable, unfortunately. |
I will find time to review / help it along. Also, given you are willing to help I will help drive a resolution on #13293 |
Which issue does this PR close?
Closes #10316.
Rationale for this change
What changes are included in this PR?
This PR uses the existing
MinMaxStatistics
to reorder the input streams into chains of non-overlapping streams based on the statistics knowledge of its input. This requires some changes:ExecutionPlan::statistics_by_partition
MinMaxStatistics
type todatafusion-physical-plan
MinMaxStatistics
code assumes that the statistics give precise bounds and not (potentially overzealous) estimates.Are these changes tested?
Yes, there is a new sqllogictest,
optimize_sort_preserving_merge.slt
Are there any user-facing changes?
The
MinMaxStatistics
API is made public, as otherwise we can't use it in thecore
crate where it previously was being used.There is a new
ExecutionPlan::statistics_by_partition
method with a default implementation, but it is not breaking.There are also the new
Statistics::merge
andColumnStatistics::merge
functions.