Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always add round robin repartitioning to leaves (data sources), benefitting unbalanced / small datasets #13707

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ config_namespace! {
///
/// When false, DataFusion will maximize plan parallelism using
/// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
pub prefer_existing_sort: bool, default = false
pub prefer_existing_sort: bool, default = true

/// When set to true, the logical plan optimizer will produce warning
/// messages if any optimization rules produce errors and then proceed to the next
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,10 @@ fn add_roundrobin_on_top(
n_target: usize,
) -> Result<DistributionContext> {
// Adding repartition is helpful:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Adding repartition is helpful:
// Always perform repartitioning on data sources, as they may be imbalanced or consist of too many small partitions.

A 'why' comment here can help understanding

if input.plan.output_partitioning().partition_count() < n_target {
if input.plan.output_partitioning().partition_count() < n_target
|| (input.plan.children().is_empty()
&& input.plan.output_partitioning().partition_count() > 1)
{
// When there is an existing ordering, we preserve ordering
// during repartition. This will be un-done in the future
// If any of the following conditions is true
Expand Down Expand Up @@ -4541,7 +4544,10 @@ pub(crate) mod tests {
// Since at the start of the rule ordering requirement is satisfied
// EnforceDistribution rule satisfy this requirement also.
"SortRequiredExec: [a@0 ASC]",
// prefer_existing_sort is set to false, it will add another sort
"SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
"ParquetExec: file_groups={10 groups: [[x:0..20], [y:0..20], [x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], [x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];

Expand Down
23 changes: 10 additions & 13 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2239,10 +2239,9 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=false"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=false"];
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=false",];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
Expand Down Expand Up @@ -2295,18 +2294,16 @@ mod tests {

// Expected bounded results with and without flag
let expected_optimized_bounded = vec![
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
"SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];
let expected_optimized_bounded_parallelize_sort = vec![
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];
let (expected_input, expected_optimized, expected_optimized_sort_parallelize) =
if source_unbounded {
Expand Down
33 changes: 15 additions & 18 deletions datafusion/sqllogictest/test_files/agg_func_substitute.slt
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true


query TT
Expand All @@ -65,12 +64,11 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true

query TT
EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result
Expand All @@ -84,12 +82,11 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true

query II
SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result
Expand Down
22 changes: 10 additions & 12 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3913,12 +3913,11 @@ logical_plan
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
physical_plan
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----CoalesceBatchesExec: target_batch_size=2
04)------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
05)--------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
02)--CoalesceBatchesExec: target_batch_size=2
03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true

# drop table multiple_ordered_table_with_pk
statement ok
Expand Down Expand Up @@ -3954,12 +3953,11 @@ logical_plan
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
physical_plan
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----CoalesceBatchesExec: target_batch_size=2
04)------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
05)--------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
02)--CoalesceBatchesExec: target_batch_size=2
03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true

statement ok
set datafusion.execution.target_partitions = 1;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
datafusion.optimizer.max_passes 3
datafusion.optimizer.prefer_existing_sort false
datafusion.optimizer.prefer_existing_sort true
datafusion.optimizer.prefer_existing_union false
datafusion.optimizer.prefer_hash_join true
datafusion.optimizer.repartition_aggregations true
Expand Down Expand Up @@ -338,7 +338,7 @@ datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
datafusion.optimizer.prefer_existing_sort true When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level
Expand Down
Loading
Loading