Skip to content

Commit

Permalink
Do not pushdown projections making calculations
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Apr 30, 2024
1 parent b826d29 commit fb8bf5a
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,12 @@ impl ProjectionOptimizer {
return Ok(Transformed::no(self));
};
// If the projection does not narrow the schema, we should not try to push it down:
if projection.expr().len() >= projection.input().schema().fields().len() {
if projection.expr().len() >= projection.input().schema().fields().len()
|| !projection.expr().iter().all(|(expr, _)| {
expr.as_any().downcast_ref::<Column>().is_some()
|| expr.as_any().downcast_ref::<Literal>().is_some()
})
{
return Ok(Transformed::no(self));
}

Expand Down
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1451,8 +1451,8 @@ logical_plan
03)----TableScan: join_t1 projection=[t1_id, t1_name, t1_int]
04)----TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
physical_plan
01)CoalesceBatchesExec: target_batch_size=2
02)--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
01)ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
02)--CoalesceBatchesExec: target_batch_size=2
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)], projection=[t1_id@0, t1_name@1, t1_int@2, t2_id@4, t2_name@5, t2_int@6]
04)------CoalescePartitionsExec
05)--------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
Expand All @@ -1477,8 +1477,8 @@ logical_plan
03)----TableScan: join_t1 projection=[t1_id, t1_name, t1_int]
04)----TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
physical_plan
01)CoalesceBatchesExec: target_batch_size=2
02)--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
01)ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
02)--CoalesceBatchesExec: target_batch_size=2
03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)], projection=[t1_id@0, t1_name@1, t1_int@2, t2_id@4, t2_name@5, t2_int@6]
04)------CoalesceBatchesExec: target_batch_size=2
05)--------RepartitionExec: partitioning=Hash([join_t1.t1_id + Int64(11)@3], 2), input_partitions=2
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/tpch/q7.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ physical_plan
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([supp_nation@0, cust_nation@1, l_year@2], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[supp_nation@0 as supp_nation, cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[SUM(shipping.volume)]
08)--------------CoalesceBatchesExec: target_batch_size=8192
09)----------------ProjectionExec: expr=[n_name@3 as supp_nation, n_name@4 as cust_nation, date_part(YEAR, l_shipdate@2) as l_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume]
08)--------------ProjectionExec: expr=[n_name@3 as supp_nation, n_name@4 as cust_nation, date_part(YEAR, l_shipdate@2) as l_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c_nationkey@3, n_nationkey@0)], filter=n_name@0 = FRANCE AND n_name@1 = GERMANY OR n_name@0 = GERMANY AND n_name@1 = FRANCE, projection=[l_extendedprice@0, l_discount@1, l_shipdate@2, n_name@4, n_name@6]
11)--------------------CoalesceBatchesExec: target_batch_size=8192
12)----------------------RepartitionExec: partitioning=Hash([c_nationkey@3], 4), input_partitions=4
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/tpch/q8.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ physical_plan
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([o_year@0], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[o_year@0 as o_year], aggr=[SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), SUM(all_nations.volume)]
08)--------------CoalesceBatchesExec: target_batch_size=8192
09)----------------ProjectionExec: expr=[date_part(YEAR, o_orderdate@2) as o_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume, n_name@3 as nation]
08)--------------ProjectionExec: expr=[date_part(YEAR, o_orderdate@2) as o_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume, n_name@3 as nation]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(n_regionkey@3, r_regionkey@0)], projection=[l_extendedprice@0, l_discount@1, o_orderdate@2, n_name@4]
11)--------------------CoalesceBatchesExec: target_batch_size=8192
12)----------------------RepartitionExec: partitioning=Hash([n_regionkey@3], 4), input_partitions=4
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/tpch/q9.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ physical_plan
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------RepartitionExec: partitioning=Hash([nation@0, o_year@1], 4), input_partitions=4
08)--------------AggregateExec: mode=Partial, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------ProjectionExec: expr=[n_name@5 as nation, date_part(YEAR, o_orderdate@4) as o_year, l_extendedprice@1 * (Some(1),20,0 - l_discount@2) - ps_supplycost@3 * l_quantity@0 as amount]
09)----------------ProjectionExec: expr=[n_name@5 as nation, date_part(YEAR, o_orderdate@4) as o_year, l_extendedprice@1 * (Some(1),20,0 - l_discount@2) - ps_supplycost@3 * l_quantity@0 as amount]
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@3, n_nationkey@0)], projection=[l_quantity@0, l_extendedprice@1, l_discount@2, ps_supplycost@4, o_orderdate@5, n_name@7]
12)----------------------CoalesceBatchesExec: target_batch_size=8192
13)------------------------RepartitionExec: partitioning=Hash([s_nationkey@3], 4), input_partitions=4
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ physical_plan
12)----------ProjectionExec: expr=[name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
13)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
14)--------------MemoryExec: partitions=1, partition_sizes=[1]
15)--CoalesceBatchesExec: target_batch_size=2
16)----ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
15)--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
16)----CoalesceBatchesExec: target_batch_size=2
17)------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1]
18)--------CoalesceBatchesExec: target_batch_size=2
19)----------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2, name@1], 4), input_partitions=4
Expand Down

0 comments on commit fb8bf5a

Please sign in to comment.