From 14c7ec5844625d3ef28c8e887986468b7a3b8a6e Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 13 Dec 2024 11:43:36 +0800 Subject: [PATCH 1/2] Refactor JoinLeftData structure by removing unused memory reservation field in hash join implementation --- datafusion/physical-plan/src/joins/hash_join.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index f7bf039ee7b5..dde0a8b9a759 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -90,9 +90,6 @@ struct JoinLeftData { /// Counter of running probe-threads, potentially /// able to update `visited_indices_bitmap` probe_threads_counter: AtomicUsize, - /// Memory reservation that tracks memory used by `hash_map` hash table - /// `batch`. Cleared on drop. - _reservation: MemoryReservation, } impl JoinLeftData { @@ -102,14 +99,12 @@ impl JoinLeftData { batch: RecordBatch, visited_indices_bitmap: SharedBitmapBuilder, probe_threads_counter: AtomicUsize, - reservation: MemoryReservation, ) -> Self { Self { hash_map, batch, visited_indices_bitmap, probe_threads_counter, - _reservation: reservation, } } @@ -902,7 +897,6 @@ async fn collect_left_input( single_batch, Mutex::new(visited_indices_bitmap), AtomicUsize::new(probe_threads_count), - reservation, ); Ok(data) From bde5edb8519f286d7e1898a66fe71843c93906d0 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 13 Dec 2024 11:53:56 +0800 Subject: [PATCH 2/2] Add Debug and Clone derives for the HashJoinStreamState enum and the ProcessProbeBatchState struct This commit enhances the HashJoinStreamState enum and the ProcessProbeBatchState struct by deriving the Debug and Clone traits, allowing for easier debugging and cloning of these state representations in the hash join implementation. 
--- datafusion/physical-plan/src/joins/hash_join.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index dde0a8b9a759..9fcb39e65d02 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1013,6 +1013,7 @@ impl BuildSide { /// └─ ProcessProbeBatch /// /// ``` +#[derive(Debug, Clone)] enum HashJoinStreamState { /// Initial state for HashJoinStream indicating that build-side data not collected yet WaitBuildSide, @@ -1038,6 +1039,7 @@ impl HashJoinStreamState { } /// Container for HashJoinStreamState::ProcessProbeBatch related data +#[derive(Debug, Clone)] struct ProcessProbeBatchState { /// Current probe-side batch batch: RecordBatch,