Do not add redundant subquery ordering into plan #30

Closed
wants to merge 47 commits into from
Changes from 1 commit

Commits (47):
d5d6cda
do not add redundant subquery ordering into plan
mertak-synnada Aug 6, 2024
eeabaf1
format code
mertak-synnada Aug 6, 2024
85e23e7
add license
mertak-synnada Aug 6, 2024
bd63098
fix test cases with sort plan removing
mertak-synnada Aug 6, 2024
930c204
fix comment
mertak-synnada Aug 6, 2024
9d5f875
keep sorting on ordering mode test cases
mertak-synnada Aug 6, 2024
2d40a8d
protect test intentions with order + limit
mertak-synnada Aug 6, 2024
b6bc6d4
protect test intentions with order + limit
mertak-synnada Aug 6, 2024
510b16c
Tmp
mustafasrepo Aug 7, 2024
6ef4369
Minor changes
mustafasrepo Aug 7, 2024
c3efafc
Minor changes
mustafasrepo Aug 7, 2024
2d1b48f
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 7, 2024
2bf220d
Minor changes
mustafasrepo Aug 7, 2024
eb83917
Implement top down recursion with delete check
mustafasrepo Aug 7, 2024
0b66b15
Minor changes
mustafasrepo Aug 7, 2024
9d3a972
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 7, 2024
c769f9f
Minor changes
mustafasrepo Aug 7, 2024
07dca3a
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 7, 2024
9192ca9
initialize fetch() api for execution plan
mertak-synnada Aug 7, 2024
0ad7063
Address reviews
mustafasrepo Aug 7, 2024
3661f06
Update comments
mustafasrepo Aug 7, 2024
60967c1
Minor changes
mustafasrepo Aug 7, 2024
6b87c4c
Make test deterministic
mustafasrepo Aug 7, 2024
a029d6f
add supports limit push down to union exec
mertak-synnada Aug 8, 2024
74041e7
support limit push down with multi children cases
mertak-synnada Aug 8, 2024
1d73ddb
fix typos
mertak-synnada Aug 8, 2024
8dd7e0a
Add fetch info to the statistics
mustafasrepo Aug 8, 2024
23a33df
optimize tpch test plans
mertak-synnada Aug 8, 2024
15423ae
Enforce distribution use inexact count estimate also.
mustafasrepo Aug 8, 2024
94fb83d
Minor changes
mustafasrepo Aug 8, 2024
9053b9f
Minor changes
mustafasrepo Aug 8, 2024
54fc4b2
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 8, 2024
501f403
Merge branch 'refs/heads/bug_fix/enforce_sorting' into feature/Sort-F…
mertak-synnada Aug 8, 2024
ebda00a
Merge remote-tracking branch 'refs/remotes/origin/apache_main' into f…
mertak-synnada Aug 14, 2024
27342ff
merge with apache main
mertak-synnada Aug 14, 2024
1d04db8
format code
mertak-synnada Aug 14, 2024
ec67b36
fix doc paths
mertak-synnada Aug 14, 2024
4564f4b
fix doc paths
mertak-synnada Aug 14, 2024
8e7d1df
Merge branch 'refs/heads/apache_main' into feature/Sort-For-Subqueries
mertak-synnada Aug 14, 2024
b139b02
remove redundant code block
mertak-synnada Aug 14, 2024
4472e15
if partition count is 1 put GlobalLimitExec
mertak-synnada Aug 14, 2024
eb96912
fix test cases
mertak-synnada Aug 14, 2024
128676e
Apply suggestions from code review
ozankabak Aug 14, 2024
782487c
fix syntax errors
mertak-synnada Aug 14, 2024
ff227a2
Simplify branches
ozankabak Aug 15, 2024
07820cc
Merge branch 'refs/heads/apache_main' into feature/Sort-For-Subqueries
mertak-synnada Aug 19, 2024
c6a9abc
remove redundant limit plans from merge
mertak-synnada Aug 19, 2024

Commit 510b16c8f4aaaf434ee63e94d7215cfe578afba3 ("Tmp")
mustafasrepo committed Aug 7, 2024
65 changes: 59 additions & 6 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -64,7 +64,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_plan::{displayable, ExecutionPlanProperties};
use datafusion_physical_expr::Partitioning;

use datafusion_physical_optimizer::PhysicalOptimizerRule;
use itertools::izip;
@@ -148,6 +149,13 @@ fn update_coalesce_ctx_children(
};
}

fn print_plan(plan: &Arc<dyn ExecutionPlan>) {
let formatted = displayable(plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
println!("{:#?}", actual);
}


/// The boolean flag `repartition_sorts` defined in the config indicates
/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to
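(The truncated comment above describes what `repartition_sorts` enables: instead of merging all partitions and then sorting everything, sort each partition in parallel and merge with a sort-preserving operator. A toy sketch of that cascade swap, using a hypothetical Plan enum rather than DataFusion's ExecutionPlan tree:

#[derive(Debug, PartialEq)]
enum Plan {
    Sort(Box<Plan>),                // sorts its input into one total order
    CoalescePartitions(Box<Plan>),  // merges partitions, order not preserved
    SortPreservingMerge(Box<Plan>), // merges partitions, order preserved
    Scan,
}

fn parallelize_sort(plan: Plan) -> Plan {
    match plan {
        Plan::Sort(inner) => match *inner {
            // The interesting case: push the sort below the merge point.
            Plan::CoalescePartitions(input) => {
                Plan::SortPreservingMerge(Box::new(Plan::Sort(input)))
            }
            other => Plan::Sort(Box::new(other)),
        },
        other => other,
    }
}

fn main() {
    let plan = Plan::Sort(Box::new(Plan::CoalescePartitions(Box::new(Plan::Scan))));
    assert_eq!(
        parallelize_sort(plan),
        Plan::SortPreservingMerge(Box::new(Plan::Sort(Box::new(Plan::Scan))))
    );
}
)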
@@ -162,6 +170,10 @@ impl PhysicalOptimizerRule for EnforceSorting {
// Execute a bottom-up traversal to enforce sorting requirements,
// remove unnecessary sorts, and optimize sort-sensitive operators:
let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
if false {
println!("After ensure sorting");
print_plan(&adjusted.plan);
}
let new_plan = if config.optimizer.repartition_sorts {
let plan_with_coalesce_partitions =
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
@@ -172,7 +184,10 @@
} else {
adjusted.plan
};

if false {
println!("After parallelize_sorts");
print_plan(&new_plan);
}
let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan);
let updated_plan = plan_with_pipeline_fixer
.transform_up(|plan_with_pipeline_fixer| {
@@ -513,8 +528,20 @@ fn remove_corresponding_coalesce_in_sub_plan(
})
.collect::<Result<_>>()?;
}

requirements.update_plan_from_children()
let mut new_req = requirements.update_plan_from_children()?;
if let Some(repartition) = new_req.plan.as_any().downcast_ref::<RepartitionExec>() {
let mut can_remove = false;
if repartition.input().output_partitioning().eq(repartition.partitioning()) {
// The input's partitioning is already identical
can_remove = true;
} else if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
can_remove = *n_out == repartition.input().output_partitioning().partition_count();
}
if can_remove {
new_req = new_req.children.swap_remove(0)
}
}
Ok(new_req)
}

/// Updates child to remove the unnecessary sort below it.
@@ -540,8 +567,12 @@ fn remove_corresponding_sort_from_sub_plan(
requires_single_partition: bool,
) -> Result<PlanWithCorrespondingSort> {
// A `SortExec` is always at the bottom of the tree.
if is_sort(&node.plan) {
node = node.children.swap_remove(0);
if let Some(sort_exec) = node.plan.as_any().downcast_ref::<SortExec>() {
if sort_exec.fetch().is_none() {
node = node.children.swap_remove(0);
} else {
// Do not remove a sort with a fetch: it limits row count, not just order
}
} else {
let mut any_connection = false;
let required_dist = node.plan.required_input_distribution();
@@ -1049,6 +1080,28 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_remove_unnecessary_sort6() -> Result<()> {
let schema = create_test_schema()?;
let source = memory_exec(&schema);
// let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)], source);
let input = Arc::new(SortExec::new(vec![sort_expr("non_nullable_col", &schema)], source).with_fetch(Some(2)));
let physical_plan = sort_exec(vec![sort_expr("non_nullable_col", &schema), sort_expr("nullable_col", &schema)], input);

let expected_input = [
"SortExec: expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
" SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = [
"SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
}

#[tokio::test]
async fn test_remove_unnecessary_spm1() -> Result<()> {
let schema = create_test_schema()?;
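(The behavioral core of this file's changes is the new guard in remove_corresponding_sort_from_sub_plan: a SortExec that carries a fetch is a TopK and also limits row count, so removing it would change results, not just ordering; this is what test_remove_unnecessary_sort6 protects. A minimal sketch of the rule, with a hypothetical Plan type rather than DataFusion's:

#[allow(dead_code)]
#[derive(Debug)]
enum Plan {
    Sort {
        exprs: Vec<String>,
        fetch: Option<usize>,
        child: Box<Plan>,
    },
    Scan {
        name: String,
    },
}

/// Remove a sort whose ordering a parent re-establishes anyway.
fn remove_redundant_sort(plan: Plan) -> Plan {
    match plan {
        // Safe: the node only reorders rows, it does not drop any.
        Plan::Sort { fetch: None, child, .. } => *child,
        // Keep TopK sorts (fetch = Some(_)) and every other node.
        other => other,
    }
}

fn main() {
    let topk = Plan::Sort {
        exprs: vec!["non_nullable_col ASC".into()],
        fetch: Some(2),
        child: Box::new(Plan::Scan { name: "memory".into() }),
    };
    // The TopK survives: removing it would return more than 2 rows.
    assert!(matches!(
        remove_redundant_sort(topk),
        Plan::Sort { fetch: Some(2), .. }
    ));
}
)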
29 changes: 27 additions & 2 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -37,6 +37,7 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_physical_plan::displayable;

/// This is a "data class" we use within the [`EnforceSorting`] rule to push
/// down [`SortExec`] in the plan. In some cases, we can reduce the total
@@ -54,6 +55,12 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) {
}
}

fn print_plan(plan: &Arc<dyn ExecutionPlan>) {
let formatted = displayable(plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
println!("{:#?}", actual);
}

pub(crate) fn pushdown_sorts(
mut requirements: SortPushDown,
) -> Result<Transformed<SortPushDown>> {
@@ -64,11 +71,13 @@
.ordering_satisfy_requirement(parent_reqs);

if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
println!("sort operator");
print_plan(&plan);
let required_ordering = plan
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)
.unwrap_or_default();

println!("required_ordering: {:?}", required_ordering);
if !satisfy_parent {
// Make sure this `SortExec` satisfies parent requirements:
let fetch = sort_exec.fetch();
@@ -85,9 +94,13 @@
for (grand_child, order) in child.children.iter_mut().zip(adjusted) {
grand_child.data = order;
}
println!("pushed down requirements plan");
print_plan(&requirements.plan);
println!("pushed down child plan");
print_plan(&child.plan);
// Can push down requirements
child.data = None;
return Ok(Transformed::yes(child));
requirements = child;
} else {
// Can not push down requirements
requirements.children = vec![child];
@@ -132,6 +145,18 @@ fn pushdown_requirement_to_children(
RequirementsCompatibility::Compatible(adjusted) => Ok(Some(vec![adjusted])),
RequirementsCompatibility::NonCompatible => Ok(None),
}
} else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
println!("trying to pushdown sort exec");
println!("parent_required: {:?}", parent_required);
println!("sort_exec.properties().eq_properties: {:?}", sort_exec.properties().eq_properties);
let sort_req = PhysicalSortRequirement::from_sort_exprs(sort_exec.properties().output_ordering().unwrap_or(&[]));
if sort_exec.properties().eq_properties.requirements_compatible(parent_required, &sort_req) {
println!("compatible, parent_required: {:?}", parent_required);
debug_assert!(!parent_required.is_empty());
Ok(Some(vec![Some(parent_required.to_vec())]))
} else {
Ok(None)
}
} else if is_union(plan) {
// UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
// propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
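(The new SortExec branch in pushdown_requirement_to_children hands the parent's ordering requirement down to a child sort when the two are compatible; together with the fetch-preserving removal above, this is what collapses the SortExec + TopK pair in test_remove_unnecessary_sort6 into a single TopK on the combined keys. A simplified sketch in which compatibility is reduced to a prefix check; the real code consults EquivalenceProperties::requirements_compatible, and the SortReq alias and both functions below are hypothetical stand-ins:

type SortReq = Vec<&'static str>; // e.g. ["a ASC", "b ASC"]

/// Re-sorting on `parent` still yields the child's ordering when the
/// child's keys form a prefix of the parent's requirement.
fn requirements_compatible(parent: &SortReq, sort_output: &SortReq) -> bool {
    sort_output.len() <= parent.len()
        && sort_output.iter().zip(parent).all(|(c, p)| c == p)
}

/// The requirement to hand down to the child sort, if pushdown is legal.
fn pushdown_to_sort(parent: &SortReq, sort_output: &SortReq) -> Option<SortReq> {
    (!parent.is_empty() && requirements_compatible(parent, sort_output))
        .then(|| parent.clone())
}

fn main() {
    // Mirrors test_remove_unnecessary_sort6: the TopK on [non_nullable_col]
    // absorbs the parent's two-key requirement, and the parent sort vanishes.
    let child = vec!["non_nullable_col ASC"];
    let parent = vec!["non_nullable_col ASC", "nullable_col ASC"];
    assert_eq!(pushdown_to_sort(&parent, &child), Some(parent.clone()));

    // A requirement that disagrees on the leading key cannot be pushed down.
    let other = vec!["nullable_col ASC"];
    assert_eq!(pushdown_to_sort(&other, &child), None);
}
)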
22 changes: 11 additions & 11 deletions datafusion/sqllogictest/test_files/window.slt
@@ -1777,17 +1777,17 @@
02)--AggregateExec: mode=Final, gby=[], aggr=[count(*)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2
06)----------ProjectionExec: expr=[]
07)------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
08)--------------CoalesceBatchesExec: target_batch_size=4096
09)----------------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
10)------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
11)--------------------ProjectionExec: expr=[c1@0 as c1]
12)----------------------CoalesceBatchesExec: target_batch_size=4096
13)------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
14)--------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
15)----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
05)--------ProjectionExec: expr=[]
06)----------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
07)------------CoalesceBatchesExec: target_batch_size=4096
08)--------------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
09)----------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
10)------------------ProjectionExec: expr=[c1@0 as c1]
11)--------------------CoalesceBatchesExec: target_batch_size=4096
12)----------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
13)------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
14)--------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true


query I
SELECT count(*) as global_count FROM
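(The plan diff above loses exactly one operator: "RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2" disappears, because a round-robin repartition into the same number of partitions the input already has adds no parallelism. A standalone sketch of that redundancy test, with hypothetical types mirroring the check added in remove_corresponding_coalesce_in_sub_plan:

#[derive(Clone, Debug, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<String>, usize),
}

impl Partitioning {
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::RoundRobinBatch(n) | Partitioning::Hash(_, n) => *n,
        }
    }
}

/// A repartition is a no-op when it reproduces the input partitioning, or
/// when a round-robin targets the partition count the input already has.
fn repartition_is_redundant(input: &Partitioning, output: &Partitioning) -> bool {
    input == output
        || matches!(output, Partitioning::RoundRobinBatch(n)
            if *n == input.partition_count())
}

fn main() {
    // As in the plan above: RoundRobinBatch(2) over a hash-partitioned input
    // that already has 2 partitions adds no parallelism, so it is dropped.
    let input = Partitioning::Hash(vec!["c1".into()], 2);
    assert!(repartition_is_redundant(&input, &Partitioning::RoundRobinBatch(2)));

    // A genuine fan-out from 1 to 2 partitions must stay.
    let single = Partitioning::RoundRobinBatch(1);
    assert!(!repartition_is_redundant(&single, &Partitioning::RoundRobinBatch(2)));
}
)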