Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not add redundant subquery ordering into plan #30

Closed
wants to merge 47 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
d5d6cda
do not add redundant subquery ordering into plan
mertak-synnada Aug 6, 2024
eeabaf1
format code
mertak-synnada Aug 6, 2024
85e23e7
add license
mertak-synnada Aug 6, 2024
bd63098
fix test cases with sort plan removing
mertak-synnada Aug 6, 2024
930c204
fix comment
mertak-synnada Aug 6, 2024
9d5f875
keep sorting on ordering mode test cases
mertak-synnada Aug 6, 2024
2d40a8d
protect test intentions with order + limit
mertak-synnada Aug 6, 2024
b6bc6d4
protect test intentions with order + limit
mertak-synnada Aug 6, 2024
510b16c
Tmp
mustafasrepo Aug 7, 2024
6ef4369
Minor changes
mustafasrepo Aug 7, 2024
c3efafc
Minor changes
mustafasrepo Aug 7, 2024
2d1b48f
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 7, 2024
2bf220d
Minor changes
mustafasrepo Aug 7, 2024
eb83917
Implement top down recursion with delete check
mustafasrepo Aug 7, 2024
0b66b15
Minor changes
mustafasrepo Aug 7, 2024
9d3a972
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 7, 2024
c769f9f
Minor changes
mustafasrepo Aug 7, 2024
07dca3a
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 7, 2024
9192ca9
initialize fetch() api for execution plan
mertak-synnada Aug 7, 2024
0ad7063
Address reviews
mustafasrepo Aug 7, 2024
3661f06
Update comments
mustafasrepo Aug 7, 2024
60967c1
Minor changes
mustafasrepo Aug 7, 2024
6b87c4c
Make test deterministic
mustafasrepo Aug 7, 2024
a029d6f
add supports limit push down to union exec
mertak-synnada Aug 8, 2024
74041e7
support limit push down with multi children cases
mertak-synnada Aug 8, 2024
1d73ddb
fix typos
mertak-synnada Aug 8, 2024
8dd7e0a
Add fetch info to the statistics
mustafasrepo Aug 8, 2024
23a33df
optimize tpch test plans
mertak-synnada Aug 8, 2024
15423ae
Enforce distribution use inexact count estimate also.
mustafasrepo Aug 8, 2024
94fb83d
Minor changes
mustafasrepo Aug 8, 2024
9053b9f
Minor changes
mustafasrepo Aug 8, 2024
54fc4b2
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 8, 2024
501f403
Merge branch 'refs/heads/bug_fix/enforce_sorting' into feature/Sort-F…
mertak-synnada Aug 8, 2024
ebda00a
Merge remote-tracking branch 'refs/remotes/origin/apache_main' into f…
mertak-synnada Aug 14, 2024
27342ff
merge with apache main
mertak-synnada Aug 14, 2024
1d04db8
format code
mertak-synnada Aug 14, 2024
ec67b36
fix doc paths
mertak-synnada Aug 14, 2024
4564f4b
fix doc paths
mertak-synnada Aug 14, 2024
8e7d1df
Merge branch 'refs/heads/apache_main' into feature/Sort-For-Subqueries
mertak-synnada Aug 14, 2024
b139b02
remove redundant code block
mertak-synnada Aug 14, 2024
4472e15
if partition count is 1 put GlobalLimitExec
mertak-synnada Aug 14, 2024
eb96912
fix test cases
mertak-synnada Aug 14, 2024
128676e
Apply suggestions from code review
ozankabak Aug 14, 2024
782487c
fix syntax errors
mertak-synnada Aug 14, 2024
ff227a2
Simplify branches
ozankabak Aug 15, 2024
07820cc
Merge branch 'refs/heads/apache_main' into feature/Sort-For-Subqueries
mertak-synnada Aug 19, 2024
c6a9abc
remove redundant limit plans from merge
mertak-synnada Aug 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{not_impl_err, plan_err, DFSchema, Result, TableReference};
use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
use sqlparser::ast::{FunctionArg, FunctionArgExpr, TableFactor};
Expand Down Expand Up @@ -142,10 +143,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
);
}
};

let optimized_plan = optimize_subquery_sort(plan)?.data;
if let Some(alias) = alias {
self.apply_table_alias(plan, alias)
self.apply_table_alias(optimized_plan, alias)
} else {
Ok(plan)
Ok(optimized_plan)
}
}
}

fn optimize_subquery_sort(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
// When a subquery is initialized, we look for the sort options since they might be redundant.
// It's only important if the subquery result is affected with the order by statement.
// Which are the cases of:
// 1. DISTINCT ON / ARRAY_AGG ... => It's handled as an Aggregate plan and keep the sorting
// 2. RANK / ROW_NUMBER ... => It's handled as a WindowAggr plan sorting is preserved
// 3. LIMIT => It's handled as a Sort plan type so that we need to search for it
let mut has_limit = false;
let new_plan = plan.clone().transform_down(|c| {
if let LogicalPlan::Limit(_) = c {
has_limit = true;
return Ok(Transformed::no(c));
}
if let LogicalPlan::Sort(_) = c {
if !has_limit {
has_limit = false;
return Ok(Transformed::yes(c.inputs()[0].clone()));
}
}
Ok(Transformed::no(c))
});
new_plan
}
48 changes: 30 additions & 18 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2527,20 +2527,24 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
SUM(s.amount) AS sum1
FROM (SELECT *
FROM sales_global
ORDER BY country) AS s
ORDER BY country
LIMIT 10) AS s
GROUP BY s.country
----
logical_plan
01)Projection: s.country, array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, sum(s.amount) AS sum1
02)--Aggregate: groupBy=[[s.country]], aggr=[[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
03)----SubqueryAlias: s
04)------Sort: sales_global.country ASC NULLS LAST
05)--------TableScan: sales_global projection=[country, amount]
04)------Limit: skip=0, fetch=10
05)--------Sort: sales_global.country ASC NULLS LAST, fetch=10
06)----------TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1]
02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(s.amount)], ordering_mode=Sorted
03)----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC], preserve_partitioning=[false]
04)------MemoryExec: partitions=1, partition_sizes=[1]
04)------SortExec: TopK(fetch=10), expr=[country@0 ASC NULLS LAST], preserve_partitioning=[false]
05)--------MemoryExec: partitions=1, partition_sizes=[1]


query T?R rowsort
SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
Expand All @@ -2563,20 +2567,23 @@ EXPLAIN SELECT s.country, s.zip_code, ARRAY_AGG(s.amount ORDER BY s.amount DESC)
SUM(s.amount) AS sum1
FROM (SELECT *
FROM sales_global
ORDER BY country) AS s
ORDER BY country
LIMIT 10) AS s
GROUP BY s.country, s.zip_code
----
logical_plan
01)Projection: s.country, s.zip_code, array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, sum(s.amount) AS sum1
02)--Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
03)----SubqueryAlias: s
04)------Sort: sales_global.country ASC NULLS LAST
05)--------TableScan: sales_global projection=[zip_code, country, amount]
04)------Limit: skip=0, fetch=10
05)--------Sort: sales_global.country ASC NULLS LAST, fetch=10
06)----------TableScan: sales_global projection=[zip_code, country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code, array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts, sum(s.amount)@3 as sum1]
02)--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as zip_code], aggr=[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(s.amount)], ordering_mode=PartiallySorted([0])
03)----SortExec: expr=[country@1 ASC NULLS LAST,amount@2 DESC], preserve_partitioning=[false]
04)------MemoryExec: partitions=1, partition_sizes=[1]
04)------SortExec: TopK(fetch=10), expr=[country@1 ASC NULLS LAST], preserve_partitioning=[false]
05)--------MemoryExec: partitions=1, partition_sizes=[1]

query TI?R rowsort
SELECT s.country, s.zip_code, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
Expand All @@ -2599,19 +2606,21 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC) AS amounts
SUM(s.amount) AS sum1
FROM (SELECT *
FROM sales_global
ORDER BY country) AS s
ORDER BY country
LIMIT 10) AS s
GROUP BY s.country
----
logical_plan
01)Projection: s.country, array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST] AS amounts, sum(s.amount) AS sum1
02)--Aggregate: groupBy=[[s.country]], aggr=[[array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
03)----SubqueryAlias: s
04)------Sort: sales_global.country ASC NULLS LAST
05)--------TableScan: sales_global projection=[country, amount]
04)------Limit: skip=0, fetch=10
05)--------Sort: sales_global.country ASC NULLS LAST, fetch=10
06)----------TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1]
02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST], sum(s.amount)], ordering_mode=Sorted
03)----SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[false]
03)----SortExec: TopK(fetch=10), expr=[country@0 ASC NULLS LAST], preserve_partitioning=[false]
04)------MemoryExec: partitions=1, partition_sizes=[1]

query T?R rowsort
Expand All @@ -2634,20 +2643,24 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount D
SUM(s.amount) AS sum1
FROM (SELECT *
FROM sales_global
ORDER BY country) AS s
ORDER BY country
LIMIT 10) AS s
GROUP BY s.country
----
logical_plan
01)Projection: s.country, array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST] AS amounts, sum(s.amount) AS sum1
02)--Aggregate: groupBy=[[s.country]], aggr=[[array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
03)----SubqueryAlias: s
04)------Sort: sales_global.country ASC NULLS LAST
05)--------TableScan: sales_global projection=[country, amount]
04)------Limit: skip=0, fetch=10
05)--------Sort: sales_global.country ASC NULLS LAST, fetch=10
06)----------TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1]
02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], sum(s.amount)], ordering_mode=Sorted
03)----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC], preserve_partitioning=[false]
04)------MemoryExec: partitions=1, partition_sizes=[1]
04)------SortExec: TopK(fetch=10), expr=[country@0 ASC NULLS LAST], preserve_partitioning=[false]
05)--------MemoryExec: partitions=1, partition_sizes=[1]


query T?R rowsort
SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC) AS amounts,
Expand Down Expand Up @@ -2801,8 +2814,7 @@ EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
logical_plan
01)Projection: sales_global.country, first_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, sum(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
02)--Aggregate: groupBy=[[sales_global.country]], aggr=[[first_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], sum(CAST(sales_global.amount AS Float64)) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
03)----Sort: sales_global.ts ASC NULLS LAST
04)------TableScan: sales_global projection=[country, ts, amount]
03)----TableScan: sales_global projection=[country, ts, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, first_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, sum(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[first_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], sum(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]
Expand Down
26 changes: 16 additions & 10 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3898,6 +3898,7 @@ SELECT * FROM (
) as lhs RIGHT JOIN (
SELECT * from right_table_no_nulls
ORDER BY b
LIMIT 10
) AS rhs ON lhs.b=rhs.b
----
11 1 21 1
Expand All @@ -3911,23 +3912,26 @@ EXPLAIN SELECT * FROM (
) as lhs RIGHT JOIN (
SELECT * from right_table_no_nulls
ORDER BY b
LIMIT 10
) AS rhs ON lhs.b=rhs.b
----
logical_plan
01)Right Join: lhs.b = rhs.b
02)--SubqueryAlias: lhs
03)----TableScan: left_table_no_nulls projection=[a, b]
04)--SubqueryAlias: rhs
05)----Sort: right_table_no_nulls.b ASC NULLS LAST
06)------TableScan: right_table_no_nulls projection=[a, b]
05)----Limit: skip=0, fetch=10
06)------Sort: right_table_no_nulls.b ASC NULLS LAST, fetch=10
07)--------TableScan: right_table_no_nulls projection=[a, b]
physical_plan
01)CoalesceBatchesExec: target_batch_size=3
02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)]
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----SortExec: expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
04)----SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
05)------MemoryExec: partitions=1, partition_sizes=[1]



# Missing probe index in the middle of the batch:
statement ok
CREATE TABLE left_table_missing_probe(a INT UNSIGNED, b INT UNSIGNED)
Expand All @@ -3953,6 +3957,7 @@ SELECT * FROM (
) as lhs RIGHT JOIN (
SELECT * from right_table_missing_probe
ORDER BY b
LIMIT 10
) AS rhs ON lhs.b=rhs.b
----
11 1 21 1
Expand All @@ -3974,14 +3979,12 @@ logical_plan
02)--SubqueryAlias: lhs
03)----TableScan: left_table_no_nulls projection=[a, b]
04)--SubqueryAlias: rhs
05)----Sort: right_table_no_nulls.b ASC NULLS LAST
06)------TableScan: right_table_no_nulls projection=[a, b]
05)----TableScan: right_table_no_nulls projection=[a, b]
physical_plan
01)CoalesceBatchesExec: target_batch_size=3
02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)]
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----SortExec: expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
05)------MemoryExec: partitions=1, partition_sizes=[1]
04)----MemoryExec: partitions=1, partition_sizes=[1]


# Null build indices:
Expand Down Expand Up @@ -4009,6 +4012,7 @@ SELECT * FROM (
) as lhs RIGHT JOIN (
SELECT * from right_table_append_null_build
ORDER BY b
LIMIT 10
) AS rhs ON lhs.b=rhs.b
----
NULL NULL 21 4
Expand All @@ -4025,18 +4029,20 @@ EXPLAIN SELECT * FROM (
) as lhs RIGHT JOIN (
SELECT * from right_table_no_nulls
ORDER BY b
LIMIT 10
) AS rhs ON lhs.b=rhs.b
----
logical_plan
01)Right Join: lhs.b = rhs.b
02)--SubqueryAlias: lhs
03)----TableScan: left_table_no_nulls projection=[a, b]
04)--SubqueryAlias: rhs
05)----Sort: right_table_no_nulls.b ASC NULLS LAST
06)------TableScan: right_table_no_nulls projection=[a, b]
05)----Limit: skip=0, fetch=10
06)------Sort: right_table_no_nulls.b ASC NULLS LAST, fetch=10
07)--------TableScan: right_table_no_nulls projection=[a, b]
physical_plan
01)CoalesceBatchesExec: target_batch_size=3
02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)]
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----SortExec: expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
04)----SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
05)------MemoryExec: partitions=1, partition_sizes=[1]
30 changes: 17 additions & 13 deletions datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -990,13 +990,13 @@ FROM (
) AS a
) AS b
----
a 5 -101
a 5 -54
a 5 -38
a 5 -54
a 6 36
a 6 -31
a 5 65
a 5 -101
a 6 -101
a 6 -31
a 6 36

# nested select without aliases
query TII
Expand All @@ -1011,13 +1011,13 @@ FROM (
)
)
----
a 5 -101
a 5 -54
a 5 -38
a 5 -54
a 6 36
a 6 -31
a 5 65
a 5 -101
a 6 -101
a 6 -31
a 6 36

# select with join unaliased subqueries
query TIITII
Expand Down Expand Up @@ -1461,22 +1461,26 @@ query TT
EXPLAIN SELECT c2, COUNT(*)
FROM (SELECT c2
FROM aggregate_test_100
ORDER BY c1, c2)
ORDER BY c1, c2
LIMIT 4)
GROUP BY c2;
----
logical_plan
01)Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[count(Int64(1)) AS count(*)]]
02)--Projection: aggregate_test_100.c2
03)----Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST
04)------Projection: aggregate_test_100.c2, aggregate_test_100.c1
05)--------TableScan: aggregate_test_100 projection=[c1, c2]
03)----Limit: skip=0, fetch=4
04)------Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST, fetch=4
05)--------Projection: aggregate_test_100.c2, aggregate_test_100.c1
06)----------TableScan: aggregate_test_100 projection=[c1, c2]
physical_plan
01)AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[count(*)]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----RepartitionExec: partitioning=Hash([c2@0], 2), input_partitions=2
04)------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[count(*)]
05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2], has_header=true
06)----------ProjectionExec: expr=[c2@0 as c2]
07)------------SortExec: TopK(fetch=4), expr=[c1@1 ASC NULLS LAST,c2@0 ASC NULLS LAST], preserve_partitioning=[false]
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c1], has_header=true

# FilterExec can track equality of non-column expressions.
# plan below shouldn't have a SortExec because given column 'a' is ordered.
Expand Down
Loading
Loading