From b4aa46cc2fc97ae12f2599e5485ed4b0570ca3a9 Mon Sep 17 00:00:00 2001 From: Lordworms Date: Wed, 23 Oct 2024 11:10:32 -0700 Subject: [PATCH] deal with empty partition --- datafusion/physical-plan/src/joins/dynamic_filters.rs | 8 ++++++-- datafusion/physical-plan/src/joins/hash_join.rs | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/dynamic_filters.rs b/datafusion/physical-plan/src/joins/dynamic_filters.rs index 667c7fc7f60d1..8517375216b30 100644 --- a/datafusion/physical-plan/src/joins/dynamic_filters.rs +++ b/datafusion/physical-plan/src/joins/dynamic_filters.rs @@ -96,10 +96,14 @@ impl DynamicFilterInfo { ) -> Result { let mut inner = self.inner.lock(); - if inner.final_expr.is_some() { + // that indicates this partition of stream may contains no data, so we return + // or we have already have the final_expr + if inner.final_expr.is_some() + || (inner.processed_partitions.contains(&partition) + && records.num_rows() == 0) + { return Ok(true); } - if !inner.processed_partitions.insert(partition) { return Ok(false); } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 9488d2aabca91..e9bd21e385846 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1335,7 +1335,7 @@ impl HashJoinStream { build_timer.done(); // Merge the information to dynamic filters (if there is any) and check if it's finalized let filter_finalized = if let Some(filter_info) = &self.dynamic_filter_info { - filter_info.merge_batch_and_check_finalized(&left_data.batch)? + filter_info.merge_batch_and_check_finalized(left_data.batch())? } else { true // If there's no dynamic filter, we consider it as "finalized" };