Skip to content

Commit

Permalink
NestedLoopJoin Projection Pushdown (apache#14120)
Browse files Browse the repository at this point in the history
* nlj proj pushdown

Signed-off-by: Jay Zhan <[email protected]>

* fmt

Signed-off-by: Jay Zhan <[email protected]>

* move swap proj to util

Signed-off-by: Jay Zhan <[email protected]>

* fmt

Signed-off-by: Jay Zhan <[email protected]>

* fix proto

Signed-off-by: Jay Zhan <[email protected]>

* fmt

Signed-off-by: Jay Zhan <[email protected]>

* use none

Signed-off-by: Jay Zhan <[email protected]>

* proto fix

Signed-off-by: Jay Zhan <[email protected]>

* fix slt

Signed-off-by: Jay Zhan <[email protected]>

* Update projection_pushdown.rs

* refactor: streamline projection pushdown logic for join operations

* minor

* fmt

Signed-off-by: Jay Zhan <[email protected]>

---------

Signed-off-by: Jay Zhan <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
  • Loading branch information
jayzhan-synnada and berkaysynnada authored Jan 16, 2025
1 parent 3cd31af commit 05f4e5a
Show file tree
Hide file tree
Showing 14 changed files with 395 additions and 177 deletions.
296 changes: 193 additions & 103 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,7 @@ impl DefaultPhysicalPlanner {
physical_right,
join_filter,
join_type,
None,
)?)
}
} else if session_state.config().target_partitions() > 1
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ impl JoinFuzzTestCase {
let filter = JoinFilter::new(expression, column_indices, intermediate_schema);

Arc::new(
NestedLoopJoinExec::try_new(left, right, Some(filter), &self.join_type)
NestedLoopJoinExec::try_new(left, right, Some(filter), &self.join_type, None)
.unwrap(),
)
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-optimizer/src/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ mod tests_statistical {
Arc::clone(&small),
nl_join_filter(),
&join_type,
None,
)
.unwrap(),
);
Expand Down Expand Up @@ -1123,6 +1124,7 @@ mod tests_statistical {
Arc::clone(&small),
nl_join_filter(),
&join_type,
None,
)
.unwrap(),
);
Expand Down
38 changes: 3 additions & 35 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{any::Any, vec};

use super::utils::{
asymmetric_join_output_partitioning, get_final_indices_from_shared_bitmap,
reorder_output_after_swap,
reorder_output_after_swap, swap_join_projection,
};
use super::{
utils::{OnceAsync, OnceFut},
Expand Down Expand Up @@ -477,7 +477,7 @@ impl HashJoinExec {
}

/// Return whether the join contains a projection
pub fn contain_projection(&self) -> bool {
pub fn contains_projection(&self) -> bool {
self.projection.is_some()
}

Expand Down Expand Up @@ -626,38 +626,6 @@ impl HashJoinExec {
}
}

/// Remaps a join's optional output projection so that it refers to the same
/// columns once the join's left and right inputs have been swapped.
///
/// `left_schema_len` and `right_schema_len` are the column counts of the left
/// and right inputs before the swap. Returns `None` when no projection is
/// given.
fn swap_join_projection(
    left_schema_len: usize,
    right_schema_len: usize,
    projection: Option<&Vec<usize>>,
    join_type: &JoinType,
) -> Option<Vec<usize>> {
    // Semi/Anti joins keep the same output schema after a swap, so their
    // projection indices are carried over untouched.
    if matches!(
        join_type,
        JoinType::LeftAnti
            | JoinType::LeftSemi
            | JoinType::RightAnti
            | JoinType::RightSemi
    ) {
        return projection.cloned();
    }

    let indices = projection?;
    let remapped = indices
        .iter()
        .map(|&idx| match idx.checked_sub(left_schema_len) {
            // `idx >= left_schema_len`: the column comes from the right
            // input, which now sits first, so drop the left-side offset.
            Some(right_offset) => right_offset,
            // `idx < left_schema_len`: the column comes from the left input,
            // which now follows all of the right-side columns.
            None => idx + right_schema_len,
        })
        .collect();
    Some(remapped)
}
impl DisplayAs for HashJoinExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
Expand All @@ -666,7 +634,7 @@ impl DisplayAs for HashJoinExec {
|| "".to_string(),
|f| format!(", filter={}", f.expression()),
);
let display_projections = if self.contain_projection() {
let display_projections = if self.contains_projection() {
format!(
", projection=[{}]",
self.projection
Expand Down
Loading

0 comments on commit 05f4e5a

Please sign in to comment.