Skip to content

Commit

Permalink
tpch fails
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Feb 20, 2024
1 parent 56b3136 commit 26542ba
Show file tree
Hide file tree
Showing 30 changed files with 1,009 additions and 662 deletions.
668 changes: 508 additions & 160 deletions datafusion/core/src/physical_optimizer/optimize_projections.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
/// The order of the given expressions is taken into account while replacing.
fn with_new_expressions(
self: Arc<Self>,
expressions: Vec<Arc<dyn PhysicalExpr>>,
_expressions: Vec<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn AggregateExpr>> {
None
}
Expand Down
6 changes: 2 additions & 4 deletions datafusion/physical-expr/src/window/window_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ use arrow::compute::kernels::sort::SortColumn;
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
use datafusion_common::{
internal_err, not_impl_err, DataFusionError, Result, ScalarValue,
};
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::window_state::{
PartitionBatchState, WindowAggState, WindowFrameContext,
};
Expand Down Expand Up @@ -134,7 +132,7 @@ pub trait WindowExpr: Send + Sync + Debug {
/// The order of the given expressions is taken into account while replacing.
fn with_new_expressions(
self: Arc<Self>,
expressions: Vec<Arc<dyn PhysicalExpr>>,
_expressions: Vec<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn WindowExpr>> {
None
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl ProjectionExec {
expr = expr
.iter()
.enumerate()
.map(|(expr_idx, (expression, name))| {
.map(|(_expr_idx, (expression, name))| {
expression
.clone()
.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
Expand Down
3 changes: 1 addition & 2 deletions datafusion/sqllogictest/test_files/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ Sort: balances.time ASC NULLS LAST, balances.name ASC NULLS LAST, balances.accou
physical_plan
SortExec: expr=[time@0 ASC NULLS LAST,name@1 ASC NULLS LAST,account_balance@2 ASC NULLS LAST]
--RecursiveQueryExec: name=balances, is_distinct=false
----ProjectionExec: expr=[time@0 as time, name@1 as name, account_balance@2 as account_balance]
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/recursive_cte/balance.csv]]}, projection=[time, name, account_balance], has_header=true
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/recursive_cte/balance.csv]]}, projection=[time, name, account_balance], has_header=true
----CoalescePartitionsExec
------ProjectionExec: expr=[time@0 + 1 as time, name@1 as name, account_balance@2 + 10 as account_balance]
--------CoalesceBatchesExec: target_batch_size=2
Expand Down
100 changes: 51 additions & 49 deletions datafusion/sqllogictest/test_files/group_by.slt

Large diffs are not rendered by default.

178 changes: 93 additions & 85 deletions datafusion/sqllogictest/test_files/joins.slt

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions datafusion/sqllogictest/test_files/tpch/q1.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS
------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)]
--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
----------------CoalesceBatchesExec: target_batch_size=8192
------------------FilterExec: l_shipdate@6 <= 10471
--------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false
------------------ProjectionExec: expr=[l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
--------------------FilterExec: l_shipdate@6 <= 10471
----------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false

query TTRRRRRRRI
select
Expand Down
20 changes: 10 additions & 10 deletions datafusion/sqllogictest/test_files/tpch/q10.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -78,33 +78,33 @@ GlobalLimitExec: skip=0, fetch=10
----------CoalesceBatchesExec: target_batch_size=8192
------------RepartitionExec: partitioning=Hash([c_custkey@0, c_name@1, c_acctbal@2, c_phone@3, n_name@4, c_address@5, c_comment@6], 4), input_partitions=4
--------------AggregateExec: mode=Partial, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@4 as c_acctbal, c_phone@3 as c_phone, n_name@8 as n_name, c_address@2 as c_address, c_comment@5 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
----------------ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@7 as l_extendedprice, l_discount@8 as l_discount, n_name@10 as n_name]
------------------CoalesceBatchesExec: target_batch_size=8192
----------------CoalesceBatchesExec: target_batch_size=8192
------------------ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@7 as l_extendedprice, l_discount@8 as l_discount, n_name@10 as n_name]
--------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c_nationkey@3, n_nationkey@0)]
----------------------CoalesceBatchesExec: target_batch_size=8192
------------------------RepartitionExec: partitioning=Hash([c_nationkey@3], 4), input_partitions=4
--------------------------ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@9 as l_extendedprice, l_discount@10 as l_discount]
----------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------CoalesceBatchesExec: target_batch_size=8192
----------------------------ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@9 as l_extendedprice, l_discount@10 as l_discount]
------------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(o_orderkey@7, l_orderkey@0)]
--------------------------------CoalesceBatchesExec: target_batch_size=8192
----------------------------------RepartitionExec: partitioning=Hash([o_orderkey@7], 4), input_partitions=4
------------------------------------ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, o_orderkey@7 as o_orderkey]
--------------------------------------CoalesceBatchesExec: target_batch_size=8192
------------------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------------------ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, o_orderkey@7 as o_orderkey]
----------------------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c_custkey@0, o_custkey@1)]
------------------------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4
----------------------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment], has_header=false
------------------------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------------------------RepartitionExec: partitioning=Hash([o_custkey@1], 4), input_partitions=4
----------------------------------------------ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey]
------------------------------------------------CoalesceBatchesExec: target_batch_size=8192
----------------------------------------------CoalesceBatchesExec: target_batch_size=8192
------------------------------------------------ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey]
--------------------------------------------------FilterExec: o_orderdate@2 >= 8674 AND o_orderdate@2 < 8766
----------------------------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_orderkey, o_custkey, o_orderdate], has_header=false
--------------------------------CoalesceBatchesExec: target_batch_size=8192
----------------------------------RepartitionExec: partitioning=Hash([l_orderkey@0], 4), input_partitions=4
------------------------------------ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]
--------------------------------------CoalesceBatchesExec: target_batch_size=8192
------------------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------------------ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]
----------------------------------------FilterExec: l_returnflag@3 = R
------------------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag], has_header=false
----------------------CoalesceBatchesExec: target_batch_size=8192
Expand Down
24 changes: 12 additions & 12 deletions datafusion/sqllogictest/test_files/tpch/q11.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ GlobalLimitExec: skip=0, fetch=10
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4
----------------AggregateExec: mode=Partial, gby=[ps_partkey@0 as ps_partkey], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
------------------ProjectionExec: expr=[ps_partkey@0 as ps_partkey, ps_availqty@1 as ps_availqty, ps_supplycost@2 as ps_supplycost]
--------------------CoalesceBatchesExec: target_batch_size=8192
------------------CoalesceBatchesExec: target_batch_size=8192
--------------------ProjectionExec: expr=[ps_partkey@0 as ps_partkey, ps_availqty@1 as ps_availqty, ps_supplycost@2 as ps_supplycost]
----------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@3, n_nationkey@0)]
------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------RepartitionExec: partitioning=Hash([s_nationkey@3], 4), input_partitions=4
----------------------------ProjectionExec: expr=[ps_partkey@0 as ps_partkey, ps_availqty@2 as ps_availqty, ps_supplycost@3 as ps_supplycost, s_nationkey@5 as s_nationkey]
------------------------------CoalesceBatchesExec: target_batch_size=8192
----------------------------CoalesceBatchesExec: target_batch_size=8192
------------------------------ProjectionExec: expr=[ps_partkey@0 as ps_partkey, ps_availqty@2 as ps_availqty, ps_supplycost@3 as ps_supplycost, s_nationkey@5 as s_nationkey]
--------------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_suppkey@1, s_suppkey@0)]
----------------------------------CoalesceBatchesExec: target_batch_size=8192
------------------------------------RepartitionExec: partitioning=Hash([ps_suppkey@1], 4), input_partitions=4
Expand All @@ -100,22 +100,22 @@ GlobalLimitExec: skip=0, fetch=10
----------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_nationkey], has_header=false
------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------RepartitionExec: partitioning=Hash([n_nationkey@0], 4), input_partitions=4
----------------------------ProjectionExec: expr=[n_nationkey@0 as n_nationkey]
------------------------------CoalesceBatchesExec: target_batch_size=8192
----------------------------CoalesceBatchesExec: target_batch_size=8192
------------------------------ProjectionExec: expr=[n_nationkey@0 as n_nationkey]
--------------------------------FilterExec: n_name@1 = GERMANY
----------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], has_header=false
----------ProjectionExec: expr=[CAST(CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Float64) * 0.0001 AS Decimal128(38, 15)) as SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)]
------------AggregateExec: mode=Final, gby=[], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
--------------CoalescePartitionsExec
----------------AggregateExec: mode=Partial, gby=[], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
------------------ProjectionExec: expr=[ps_availqty@0 as ps_availqty, ps_supplycost@1 as ps_supplycost]
--------------------CoalesceBatchesExec: target_batch_size=8192
------------------CoalesceBatchesExec: target_batch_size=8192
--------------------ProjectionExec: expr=[ps_availqty@0 as ps_availqty, ps_supplycost@1 as ps_supplycost]
----------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@2, n_nationkey@0)]
------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------RepartitionExec: partitioning=Hash([s_nationkey@2], 4), input_partitions=4
----------------------------ProjectionExec: expr=[ps_availqty@1 as ps_availqty, ps_supplycost@2 as ps_supplycost, s_nationkey@4 as s_nationkey]
------------------------------CoalesceBatchesExec: target_batch_size=8192
----------------------------CoalesceBatchesExec: target_batch_size=8192
------------------------------ProjectionExec: expr=[ps_availqty@1 as ps_availqty, ps_supplycost@2 as ps_supplycost, s_nationkey@4 as s_nationkey]
--------------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_suppkey@0, s_suppkey@0)]
----------------------------------CoalesceBatchesExec: target_batch_size=8192
------------------------------------RepartitionExec: partitioning=Hash([ps_suppkey@0], 4), input_partitions=4
Expand All @@ -126,8 +126,8 @@ GlobalLimitExec: skip=0, fetch=10
----------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_nationkey], has_header=false
------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------RepartitionExec: partitioning=Hash([n_nationkey@0], 4), input_partitions=4
----------------------------ProjectionExec: expr=[n_nationkey@0 as n_nationkey]
------------------------------CoalesceBatchesExec: target_batch_size=8192
----------------------------CoalesceBatchesExec: target_batch_size=8192
------------------------------ProjectionExec: expr=[n_nationkey@0 as n_nationkey]
--------------------------------FilterExec: n_name@1 = GERMANY
----------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], has_header=false
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/tpch/q12.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([l_shipmode@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
--------------ProjectionExec: expr=[l_shipmode@1 as l_shipmode, o_orderpriority@3 as o_orderpriority]
----------------CoalesceBatchesExec: target_batch_size=8192
--------------CoalesceBatchesExec: target_batch_size=8192
----------------ProjectionExec: expr=[l_shipmode@1 as l_shipmode, o_orderpriority@3 as o_orderpriority]
------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_orderkey@0, o_orderkey@0)]
--------------------CoalesceBatchesExec: target_batch_size=8192
----------------------RepartitionExec: partitioning=Hash([l_orderkey@0], 4), input_partitions=4
------------------------ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_shipmode@4 as l_shipmode]
--------------------------CoalesceBatchesExec: target_batch_size=8192
------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_shipmode@4 as l_shipmode]
----------------------------FilterExec: (l_shipmode@4 = MAIL OR l_shipmode@4 = SHIP) AND l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 8766 AND l_receiptdate@3 < 9131
------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], has_header=false
--------------------CoalesceBatchesExec: target_batch_size=8192
Expand Down
Loading

0 comments on commit 26542ba

Please sign in to comment.