Skip to content

Commit

Permalink
minor
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Jan 16, 2025
1 parent a1bba91 commit 47fbc05
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 19 deletions.
54 changes: 39 additions & 15 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,12 @@ fn collect_column_indices(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> Vec<usiz
indices
}

struct JoinData(ProjectionExec, ProjectionExec, Option<JoinFilter>, JoinOn);
struct JoinData {
projected_left_child: ProjectionExec,
projected_right_child: ProjectionExec,
join_filter: Option<JoinFilter>,
join_on: JoinOn,
}

fn try_pushdown_through_join(
projection: &ProjectionExec,
Expand All @@ -640,8 +645,7 @@ fn try_pushdown_through_join(
schema: SchemaRef,
filter: Option<&JoinFilter>,
) -> Result<Option<JoinData>> {
// Convert projected expressions to columns. We can not proceed if this is
// not possible.
// Convert projected expressions to columns. We can not proceed if this is not possible.
let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
return Ok(None);
};
Expand Down Expand Up @@ -689,18 +693,32 @@ fn try_pushdown_through_join(
join_right,
)?;

Ok(Some(JoinData(new_left, new_right, new_filter, new_on)))
Ok(Some(JoinData {
projected_left_child: new_left,
projected_right_child: new_right,
join_filter: new_filter,
join_on: new_on,
}))
}

/// Tries to push `projection` down through `nested_loop_join`. If possible, performs the
/// pushdown and returns a new [`NestedLoopJoinExec`] as the top plan which has projections
/// as its children. Otherwise, returns `None`.
fn try_pushdown_through_nested_loop_join(
projection: &ProjectionExec,
nl_join: &NestedLoopJoinExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if nl_join.contain_projection() {
// TODO: currently if there is projection in NestedLoopJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later.
if nl_join.contains_projection() {
return Ok(None);
}

if let Some(JoinData(left, right, filter, _)) = try_pushdown_through_join(
if let Some(JoinData {
projected_left_child,
projected_right_child,
join_filter,
..
}) = try_pushdown_through_join(
projection,
nl_join.left(),
nl_join.right(),
Expand All @@ -709,10 +727,11 @@ fn try_pushdown_through_nested_loop_join(
nl_join.filter(),
)? {
Ok(Some(Arc::new(NestedLoopJoinExec::try_new(
Arc::new(left),
Arc::new(right),
filter,
Arc::new(projected_left_child),
Arc::new(projected_right_child),
join_filter,
nl_join.join_type(),
// Returned early if projection is not None
None,
)?)))
} else {
Expand All @@ -728,11 +747,16 @@ fn try_pushdown_through_hash_join(
hash_join: &HashJoinExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
// TODO: currently if there is projection in HashJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later.
if hash_join.contain_projection() {
if hash_join.contains_projection() {
return Ok(None);
}

if let Some(JoinData(left, right, filter, join_on)) = try_pushdown_through_join(
if let Some(JoinData {
projected_left_child,
projected_right_child,
join_filter,
join_on,
}) = try_pushdown_through_join(
projection,
hash_join.left(),
hash_join.right(),
Expand All @@ -741,12 +765,12 @@ fn try_pushdown_through_hash_join(
hash_join.filter(),
)? {
Ok(Some(Arc::new(HashJoinExec::try_new(
Arc::new(left),
Arc::new(right),
Arc::new(projected_left_child),
Arc::new(projected_right_child),
join_on,
filter,
join_filter,
hash_join.join_type(),
// Early return if projection is not None
// Returned early if projection is not None
None,
*hash_join.partition_mode(),
hash_join.null_equals_null,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ impl HashJoinExec {
}

/// Return whether the join contains a projection
pub fn contain_projection(&self) -> bool {
pub fn contains_projection(&self) -> bool {
self.projection.is_some()
}

Expand Down Expand Up @@ -626,7 +626,7 @@ impl DisplayAs for HashJoinExec {
|| "".to_string(),
|f| format!(", filter={}", f.expression()),
);
let display_projections = if self.contain_projection() {
let display_projections = if self.contains_projection() {
format!(
", projection=[{}]",
self.projection
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl NestedLoopJoinExec {
]
}

pub fn contain_projection(&self) -> bool {
pub fn contains_projection(&self) -> bool {
self.projection.is_some()
}

Expand Down Expand Up @@ -398,7 +398,7 @@ impl DisplayAs for NestedLoopJoinExec {
|| "".to_string(),
|f| format!(", filter={}", f.expression()),
);
let display_projections = if self.contain_projection() {
let display_projections = if self.contains_projection() {
format!(
", projection=[{}]",
self.projection
Expand Down

0 comments on commit 47fbc05

Please sign in to comment.