Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
comphead committed Jul 12, 2024
1 parent 32781db commit 94933ce
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,6 @@ impl SortMergeJoinExec {

impl DisplayAs for SortMergeJoinExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
let display_filter = self.filter.as_ref().map_or_else(
|| "".to_string(),
|f| format!(", filter={}", f.expression()),
);

match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let on = self
Expand All @@ -255,7 +250,12 @@ impl DisplayAs for SortMergeJoinExec {
write!(
f,
"SortMergeJoin: join_type={:?}, on=[{}]{}",
self.join_type, on, display_filter
self.join_type,
on,
self.filter
.as_ref()
.map(|f| format!(", filter={}", f.expression()))
.unwrap_or("".to_string())
)
}
}
Expand Down Expand Up @@ -959,8 +959,7 @@ impl SMJStream {
// Shrink mem usage for non spilled batches only
if buffered_batch.spill_file.is_none() {
self.reservation
.try_shrink(buffered_batch.size_estimation)
.unwrap_or(());
.shrink(buffered_batch.size_estimation);
}
}
} else {
Expand Down Expand Up @@ -2911,6 +2910,10 @@ mod tests {
"Resources exhausted: Failed to allocate additional"
);
assert_contains!(err.to_string(), "SMJStream[0]");
assert!(join.metrics().is_some());
assert_eq!(join.metrics().unwrap().spill_count(), Some(0));
assert_eq!(join.metrics().unwrap().spilled_bytes(), Some(0));
assert_eq!(join.metrics().unwrap().spilled_rows(), Some(0));
}

Ok(())
Expand Down Expand Up @@ -2993,6 +2996,10 @@ mod tests {
"Resources exhausted: Failed to allocate additional"
);
assert_contains!(err.to_string(), "SMJStream[0]");
assert!(join.metrics().is_some());
assert_eq!(join.metrics().unwrap().spill_count(), Some(0));
assert_eq!(join.metrics().unwrap().spilled_bytes(), Some(0));
assert_eq!(join.metrics().unwrap().spilled_rows(), Some(0));
}

Ok(())
Expand Down Expand Up @@ -3049,6 +3056,11 @@ mod tests {

let stream = join.execute(0, task_ctx)?;
let _ = common::collect(stream).await.unwrap();

assert!(join.metrics().is_some());
assert!(join.metrics().unwrap().spill_count().unwrap() > 0);
assert!(join.metrics().unwrap().spilled_bytes().unwrap() > 0);
assert!(join.metrics().unwrap().spilled_rows().unwrap() > 0);
}

Ok(())
Expand Down Expand Up @@ -3125,6 +3137,10 @@ mod tests {

let stream = join.execute(0, task_ctx)?;
let _ = common::collect(stream).await.unwrap();
assert!(join.metrics().is_some());
assert!(join.metrics().unwrap().spill_count().unwrap() > 0);
assert!(join.metrics().unwrap().spilled_bytes().unwrap() > 0);
assert!(join.metrics().unwrap().spilled_rows().unwrap() > 0);
}

Ok(())
Expand Down

0 comments on commit 94933ce

Please sign in to comment.