HashJoin Checkpoint Upstream Diff #49

Closed

jayzhan-synnada

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Signed-off-by: Jay Zhan <[email protected]>
Collaborator

@berkaysynnada berkaysynnada left a comment

@jayzhan-synnada I have marked each diff as either "remove from this PR" or "keep". The approach should be clear once you look through them: we want to keep the two codebases as similar as possible without polluting the upstream code.

After you update the PR according to my suggestions, you can open the DataFusion PR with the parts that remain in this PR.

@@ -215,7 +215,7 @@ mod tests {

let test2 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Unbounded),
expect_fail: true,

Keep this change

@@ -290,7 +290,7 @@ mod tests {
};
let test2 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Unbounded),
expect_fail: true,

Keep this change

@@ -90,9 +109,11 @@ struct JoinLeftData {
/// Counter of running probe-threads, potentially
/// able to update `visited_indices_bitmap`
probe_threads_counter: AtomicUsize,
/// Memory reservation that tracks memory used by `hash_map` hash table
/// `batch`. Cleared on drop.
_reservation: MemoryReservation,

The reservation removal will be sent upstream, but upstream does not need batches_hash_values; that field will be kept in Aras only.
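
For context on the `_reservation` field in this hunk, here is a minimal sketch of the hold-until-drop pattern its doc comment describes. The `Pool`, `Reservation`, and `LeftData` types are hypothetical stand-ins written for illustration; they are not DataFusion's `MemoryPool` or `MemoryReservation` API, and the byte accounting is simplified.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical memory pool that only counts reserved bytes.
#[derive(Default)]
struct Pool {
    used: AtomicUsize,
}

/// Hypothetical reservation: registers bytes on creation, releases them on drop.
struct Reservation {
    pool: Arc<Pool>,
    size: usize,
}

impl Reservation {
    fn new(pool: Arc<Pool>, size: usize) -> Self {
        pool.used.fetch_add(size, Ordering::SeqCst);
        Self { pool, size }
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        // Give the bytes back to the pool when the owner goes away.
        self.pool.used.fetch_sub(self.size, Ordering::SeqCst);
    }
}

/// Hypothetical build-side data. The reservation is never read; it is held
/// only so that the bytes stay accounted for while `rows` is alive.
struct LeftData {
    rows: Vec<u64>,
    _reservation: Reservation,
}

/// Returns the pool usage while the data is alive and after it is dropped.
fn demo() -> (usize, usize) {
    let pool = Arc::new(Pool::default());
    let rows = vec![1u64, 2, 3];
    let bytes = rows.len() * std::mem::size_of::<u64>();
    let data = LeftData {
        rows,
        _reservation: Reservation::new(Arc::clone(&pool), bytes),
    };
    assert_eq!(data.rows.len(), 3);
    let while_alive = pool.used.load(Ordering::SeqCst);
    drop(data); // dropping the struct drops the reservation too
    let after_drop = pool.used.load(Ordering::SeqCst);
    (while_alive, after_drop)
}
```

In this sketch, calling `demo()` shows the pool reporting the reserved bytes while `LeftData` exists and zero once it is dropped, which is the behavior the "Cleared on drop" comment refers to.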

@@ -102,14 +123,14 @@ impl JoinLeftData {
batch: RecordBatch,
visited_indices_bitmap: SharedBitmapBuilder,
probe_threads_counter: AtomicUsize,
reservation: MemoryReservation,

Keep reservation removal, but remove batches_hash_values

) -> Self {
Self {
hash_map,
batch,
visited_indices_bitmap,
probe_threads_counter,
_reservation: reservation,

Keep reservation removal, but remove batches_hash_values

@@ -902,7 +918,7 @@ async fn collect_left_input(
single_batch,
Mutex::new(visited_indices_bitmap),
AtomicUsize::new(probe_threads_count),
reservation,
batches_hash_values,

Keep reservation removal, but remove batches_hash_values

@@ -1043,7 +1059,10 @@ impl HashJoinStreamState {
}
}

/// THIS STRUCT IS COMMON, MODIFIED BY ARAS
///

Remove this change

/// Container for HashJoinStreamState::ProcessProbeBatch related data
#[derive(Debug, Clone)]

We can send this change upstream to reduce the diff.

batch,
offset: (0, None),
joined_probe_idx: None,
});

Remove these changes

@@ -4083,4 +4102,4 @@ mod tests {
fn columns(schema: &Schema) -> Vec<String> {
schema.fields().iter().map(|f| f.name().clone()).collect()
}
}

Remove this change
