Skip to content

Commit

Permalink
deal with empty partition
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Oct 23, 2024
1 parent 959c158 commit b4aa46c
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/joins/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,14 @@ impl DynamicFilterInfo {
) -> Result<bool, DataFusionError> {
let mut inner = self.inner.lock();

if inner.final_expr.is_some() {
// that indicates this partition of stream may contains no data, so we return
// or we have already have the final_expr
if inner.final_expr.is_some()
|| (inner.processed_partitions.contains(&partition)
&& records.num_rows() == 0)
{
return Ok(true);
}

if !inner.processed_partitions.insert(partition) {
return Ok(false);
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,7 @@ impl HashJoinStream {
build_timer.done();
// Merge the information to dynamic filters (if there is any) and check if it's finalized
let filter_finalized = if let Some(filter_info) = &self.dynamic_filter_info {
filter_info.merge_batch_and_check_finalized(&left_data.batch)?
filter_info.merge_batch_and_check_finalized(left_data.batch())?
} else {
true // If there's no dynamic filter, we consider it as "finalized"
};
Expand Down

0 comments on commit b4aa46c

Please sign in to comment.