Skip to content

Commit

Permalink
[MINOR]: Some code changes and a new empty batch guard for SHJ (#8557)
Browse files Browse the repository at this point in the history
* minor changes

* Fix imports

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
metesynnada and ozankabak authored Dec 15, 2023
1 parent 82235ae commit bf0073c
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 64 deletions.
83 changes: 80 additions & 3 deletions datafusion/physical-plan/src/joins/stream_join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::usize;

use crate::handle_async_state;
use crate::joins::utils::{JoinFilter, JoinHashMapType};
use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
use crate::{handle_async_state, metrics};

use arrow::compute::concat_batches;
use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch};
Expand Down Expand Up @@ -824,6 +825,10 @@ pub trait EagerJoinStream {
) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
match self.right_stream().next().await {
Some(Ok(batch)) => {
if batch.num_rows() == 0 {
return Ok(StreamJoinStateResult::Continue);
}

self.set_state(EagerJoinStreamState::PullLeft);
self.process_batch_from_right(batch)
}
Expand All @@ -849,6 +854,9 @@ pub trait EagerJoinStream {
) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
match self.left_stream().next().await {
Some(Ok(batch)) => {
if batch.num_rows() == 0 {
return Ok(StreamJoinStateResult::Continue);
}
self.set_state(EagerJoinStreamState::PullRight);
self.process_batch_from_left(batch)
}
Expand All @@ -874,7 +882,12 @@ pub trait EagerJoinStream {
&mut self,
) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
match self.left_stream().next().await {
Some(Ok(batch)) => self.process_batch_after_right_end(batch),
Some(Ok(batch)) => {
if batch.num_rows() == 0 {
return Ok(StreamJoinStateResult::Continue);
}
self.process_batch_after_right_end(batch)
}
Some(Err(e)) => Err(e),
None => {
self.set_state(EagerJoinStreamState::BothExhausted {
Expand All @@ -899,7 +912,12 @@ pub trait EagerJoinStream {
&mut self,
) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
match self.right_stream().next().await {
Some(Ok(batch)) => self.process_batch_after_left_end(batch),
Some(Ok(batch)) => {
if batch.num_rows() == 0 {
return Ok(StreamJoinStateResult::Continue);
}
self.process_batch_after_left_end(batch)
}
Some(Err(e)) => Err(e),
None => {
self.set_state(EagerJoinStreamState::BothExhausted {
Expand Down Expand Up @@ -1020,6 +1038,65 @@ pub trait EagerJoinStream {
fn state(&mut self) -> EagerJoinStreamState;
}

/// Input counters for a single side of a stream join.
///
/// [`StreamJoinMetrics`] holds one instance of this struct for the left
/// input and one for the right input.
#[derive(Debug)]
pub struct StreamJoinSideMetrics {
/// Number of batches consumed by this operator from this side
pub(crate) input_batches: metrics::Count,
/// Number of rows consumed by this operator from this side
pub(crate) input_rows: metrics::Count,
}

/// Metrics for stream join operators (e.g. `SymmetricHashJoinExec`).
///
/// Groups the per-side input counters together with the memory gauge and
/// the output counters of the operator.
#[derive(Debug)]
pub struct StreamJoinMetrics {
/// Number of left batches/rows consumed by this operator
pub(crate) left: StreamJoinSideMetrics,
/// Number of right batches/rows consumed by this operator
pub(crate) right: StreamJoinSideMetrics,
/// Memory used by sides in bytes
pub(crate) stream_memory_usage: metrics::Gauge,
/// Number of batches produced by this operator
pub(crate) output_batches: metrics::Count,
/// Number of rows produced by this operator
pub(crate) output_rows: metrics::Count,
}

impl StreamJoinMetrics {
    /// Registers every stream join metric for `partition` in `metrics` and
    /// returns the resulting handles.
    ///
    /// Metrics are registered in this order: left input counters, right
    /// input counters, the `stream_memory_usage` gauge, `output_batches`,
    /// and finally the builder's standard output-rows counter.
    ///
    /// NOTE(review): the left and right sides register their counters under
    /// the same names (`input_batches` / `input_rows`), so the two sides
    /// cannot be told apart by metric name alone — confirm this is intended.
    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        // Each call registers a fresh pair of input counters; it is invoked
        // once per join side, left first.
        let register_side = || StreamJoinSideMetrics {
            input_batches: MetricBuilder::new(metrics)
                .counter("input_batches", partition),
            input_rows: MetricBuilder::new(metrics).counter("input_rows", partition),
        };

        // Struct-literal fields are evaluated top to bottom, which keeps the
        // registration order described above.
        Self {
            left: register_side(),
            right: register_side(),
            stream_memory_usage: MetricBuilder::new(metrics)
                .gauge("stream_memory_usage", partition),
            output_batches: MetricBuilder::new(metrics)
                .counter("output_batches", partition),
            output_rows: MetricBuilder::new(metrics).output_rows(partition),
        }
    }
}

#[cfg(test)]
pub mod tests {
use std::sync::Arc;
Expand Down
64 changes: 3 additions & 61 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use crate::joins::stream_join_utils::{
calculate_filter_expr_intervals, combine_two_batches,
convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
get_pruning_semi_indices, record_visited_indices, EagerJoinStream,
EagerJoinStreamState, PruningJoinHashMap, SortedFilterExpr, StreamJoinStateResult,
EagerJoinStreamState, PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
StreamJoinStateResult,
};
use crate::joins::utils::{
build_batch_from_indices, build_join_schema, check_join_is_valid,
Expand All @@ -47,7 +48,7 @@ use crate::joins::utils::{
use crate::{
expressions::{Column, PhysicalSortExpr},
joins::StreamJoinPartitionMode,
metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayAs, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan,
Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
Expand Down Expand Up @@ -184,65 +185,6 @@ pub struct SymmetricHashJoinExec {
mode: StreamJoinPartitionMode,
}

/// Input counters for a single side of a stream join.
///
/// [`StreamJoinMetrics`] holds one instance of this struct for the left
/// input and one for the right input.
#[derive(Debug)]
pub struct StreamJoinSideMetrics {
/// Number of batches consumed by this operator from this side
pub(crate) input_batches: metrics::Count,
/// Number of rows consumed by this operator from this side
pub(crate) input_rows: metrics::Count,
}

/// Metrics for stream join operators (e.g. `SymmetricHashJoinExec`).
///
/// Groups the per-side input counters together with the memory gauge and
/// the output counters of the operator.
#[derive(Debug)]
pub struct StreamJoinMetrics {
/// Number of left batches/rows consumed by this operator
pub(crate) left: StreamJoinSideMetrics,
/// Number of right batches/rows consumed by this operator
pub(crate) right: StreamJoinSideMetrics,
/// Memory used by sides in bytes
pub(crate) stream_memory_usage: metrics::Gauge,
/// Number of batches produced by this operator
pub(crate) output_batches: metrics::Count,
/// Number of rows produced by this operator
pub(crate) output_rows: metrics::Count,
}

impl StreamJoinMetrics {
    /// Registers every stream join metric for `partition` in `metrics` and
    /// returns the resulting handles.
    ///
    /// Metrics are registered in this order: left input counters, right
    /// input counters, the `stream_memory_usage` gauge, `output_batches`,
    /// and finally the builder's standard output-rows counter.
    ///
    /// NOTE(review): the left and right sides register their counters under
    /// the same names (`input_batches` / `input_rows`), so the two sides
    /// cannot be told apart by metric name alone — confirm this is intended.
    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        // Each call registers a fresh pair of input counters; it is invoked
        // once per join side, left first.
        let register_side = || StreamJoinSideMetrics {
            input_batches: MetricBuilder::new(metrics)
                .counter("input_batches", partition),
            input_rows: MetricBuilder::new(metrics).counter("input_rows", partition),
        };

        // Struct-literal fields are evaluated top to bottom, which keeps the
        // registration order described above.
        Self {
            left: register_side(),
            right: register_side(),
            stream_memory_usage: MetricBuilder::new(metrics)
                .gauge("stream_memory_usage", partition),
            output_batches: MetricBuilder::new(metrics)
                .counter("output_batches", partition),
            output_rows: MetricBuilder::new(metrics).output_rows(partition),
        }
    }
}

impl SymmetricHashJoinExec {
/// Tries to create a new [SymmetricHashJoinExec].
/// # Error
Expand Down

0 comments on commit bf0073c

Please sign in to comment.