Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Remove memory reservation in JoinLeftData used in HashJoin #13751

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ struct JoinLeftData {
/// Counter of running probe-threads, potentially
/// able to update `visited_indices_bitmap`
probe_threads_counter: AtomicUsize,
/// Memory reservation that tracks memory used by `hash_map` hash table
/// `batch`. Cleared on drop.
_reservation: MemoryReservation,
Copy link
Contributor

@korowa korowa Dec 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case reservation is not stored in JoinLeftData, it'll be dropped right after collect_left_input due to being unneeded anymore. And due to MemoryReservation calls free on its drop, it looks like that with this patch, during the whole join execution build side data will be untracked by memory pool.

So the initial idea of this attribute was to make reservation to live as long as JoinLeftData exists.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UPD: some "logs" (additional prints 😞 ) just to check join execution before/after this patch:

// Before
---- joins::hash_join::tests::join_inner_one_no_shared_column_names stdout ----
Waiting build side
Reserving 324 bytes
Reserving 124 bytes
Fetching probe batch
Processing probe batch
Fetching probe batch
Processing unmatched build side
Freeing 448 bytes

// After
---- joins::hash_join::tests::join_inner_one_no_shared_column_names stdout ----
Waiting build side
Reserving 324 bytes
Reserving 124 bytes
Freeing 448 bytes
Fetching probe batch
Processing probe batch
Fetching probe batch
Processing unmatched build side

so yes, the memory is "freed" before join completes its execution, which doesn't seem to be an expected behavior.

Copy link
Contributor

@jayzhan211 jayzhan211 Dec 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I think this is not an issue is because it only matters when we run 'collect_left_join', we track the memory and free after the build side is done. Although, the memory is clean while the hash map is not freed. BUT, since we don't have expected memory change in the probe stage, the memory is not helpful for the probe stage. That's why I think this will not cause any issues if we drop the reservation early.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, the memory is clean while the hash map is not freed

That actually is the issue, because there is single memory pool for a RuntimeEnv, and it's intended to be used by multiple sessions or at least by the multiple operators within the same query. Even in case of single query like select * from a join b order by a.field untracked build side may cause OOM due to SortExec will consider MemoryPool as being empty, while there is a build side data in memory.

}

impl JoinLeftData {
Expand All @@ -102,14 +99,12 @@ impl JoinLeftData {
batch: RecordBatch,
visited_indices_bitmap: SharedBitmapBuilder,
probe_threads_counter: AtomicUsize,
reservation: MemoryReservation,
) -> Self {
Self {
hash_map,
batch,
visited_indices_bitmap,
probe_threads_counter,
_reservation: reservation,
}
}

Expand Down Expand Up @@ -902,7 +897,6 @@ async fn collect_left_input(
single_batch,
Mutex::new(visited_indices_bitmap),
AtomicUsize::new(probe_threads_count),
reservation,
);

Ok(data)
Expand Down Expand Up @@ -1019,6 +1013,7 @@ impl BuildSide {
/// └─ ProcessProbeBatch
///
/// ```
#[derive(Debug, Clone)]
enum HashJoinStreamState {
/// Initial state for HashJoinStream indicating that build-side data not collected yet
WaitBuildSide,
Expand All @@ -1044,6 +1039,7 @@ impl HashJoinStreamState {
}

/// Container for HashJoinStreamState::ProcessProbeBatch related data
#[derive(Debug, Clone)]
struct ProcessProbeBatchState {
/// Current probe-side batch
batch: RecordBatch,
Expand Down
Loading