-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HashJoin Checkpoint Upstream Diff #49
Conversation
Signed-off-by: Jay Zhan <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jayzhan-synnada I have marked each diff as remove from this PR or keep it in. You will understand our approach after looking at them. We both try to keep the codes same as much as possible, and to not spoil the upstream code.
After updating according to my suggestions, you can open the datafusion PR with the remaining parts in this PR.
@@ -215,7 +215,7 @@ mod tests { | |||
|
|||
let test2 = BinaryTestCase { | |||
source_types: (SourceType::Bounded, SourceType::Unbounded), | |||
expect_fail: true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep this change
@@ -290,7 +290,7 @@ mod tests { | |||
}; | |||
let test2 = BinaryTestCase { | |||
source_types: (SourceType::Bounded, SourceType::Unbounded), | |||
expect_fail: true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep this change
@@ -90,9 +109,11 @@ struct JoinLeftData { | |||
/// Counter of running probe-threads, potentially | |||
/// able to update `visited_indices_bitmap` | |||
probe_threads_counter: AtomicUsize, | |||
/// Memory reservation that tracks memory used by `hash_map` hash table | |||
/// `batch`. Cleared on drop. | |||
_reservation: MemoryReservation, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing reservation will be sent to the upstream, but upstream does not need batches_hash_values
, it will be kept in Aras only
@@ -102,14 +123,14 @@ impl JoinLeftData { | |||
batch: RecordBatch, | |||
visited_indices_bitmap: SharedBitmapBuilder, | |||
probe_threads_counter: AtomicUsize, | |||
reservation: MemoryReservation, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep reservation removal, but remove batches_hash_values
) -> Self { | ||
Self { | ||
hash_map, | ||
batch, | ||
visited_indices_bitmap, | ||
probe_threads_counter, | ||
_reservation: reservation, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep reservation removal, but remove batches_hash_values
@@ -902,7 +918,7 @@ async fn collect_left_input( | |||
single_batch, | |||
Mutex::new(visited_indices_bitmap), | |||
AtomicUsize::new(probe_threads_count), | |||
reservation, | |||
batches_hash_values, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep reservation removal, but remove batches_hash_values
@@ -1043,7 +1059,10 @@ impl HashJoinStreamState { | |||
} | |||
} | |||
|
|||
/// THIS STRUCT IS COMMON, MODIFIED BY ARAS | |||
/// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this change
/// Container for HashJoinStreamState::ProcessProbeBatch related data | ||
#[derive(Debug, Clone)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can send this change to decrease diff
batch, | ||
offset: (0, None), | ||
joined_probe_idx: None, | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove these changes
@@ -4083,4 +4102,4 @@ mod tests { | |||
fn columns(schema: &Schema) -> Vec<String> { | |||
schema.fields().iter().map(|f| f.name().clone()).collect() | |||
} | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this change
Which issue does this PR close?
Closes #.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?