From bf0073c03ace1e4212f5895c529592d9925bf28d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Metehan=20Y=C4=B1ld=C4=B1r=C4=B1m?= <100111937+metesynnada@users.noreply.github.com> Date: Fri, 15 Dec 2023 16:10:12 +0200 Subject: [PATCH] [MINOR]: Some code changes and a new empty batch guard for SHJ (#8557) * minor changes * Fix imports --------- Co-authored-by: Mehmet Ozan Kabak --- .../src/joins/stream_join_utils.rs | 83 ++++++++++++++++++- .../src/joins/symmetric_hash_join.rs | 64 +------------- 2 files changed, 83 insertions(+), 64 deletions(-) diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 5083f96b01fb..2f74bd1c4bb2 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -23,8 +23,9 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::usize; -use crate::handle_async_state; use crate::joins::utils::{JoinFilter, JoinHashMapType}; +use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; +use crate::{handle_async_state, metrics}; use arrow::compute::concat_batches; use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch}; @@ -824,6 +825,10 @@ pub trait EagerJoinStream { ) -> Result>> { match self.right_stream().next().await { Some(Ok(batch)) => { + if batch.num_rows() == 0 { + return Ok(StreamJoinStateResult::Continue); + } + self.set_state(EagerJoinStreamState::PullLeft); self.process_batch_from_right(batch) } @@ -849,6 +854,9 @@ pub trait EagerJoinStream { ) -> Result>> { match self.left_stream().next().await { Some(Ok(batch)) => { + if batch.num_rows() == 0 { + return Ok(StreamJoinStateResult::Continue); + } self.set_state(EagerJoinStreamState::PullRight); self.process_batch_from_left(batch) } @@ -874,7 +882,12 @@ pub trait EagerJoinStream { &mut self, ) -> Result>> { match self.left_stream().next().await { - Some(Ok(batch)) => self.process_batch_after_right_end(batch), + Some(Ok(batch)) => { + if batch.num_rows() == 0 { + return Ok(StreamJoinStateResult::Continue); + } + self.process_batch_after_right_end(batch) + } Some(Err(e)) => Err(e), None => { self.set_state(EagerJoinStreamState::BothExhausted { @@ -899,7 +912,12 @@ pub trait EagerJoinStream { &mut self, ) -> Result>> { match self.right_stream().next().await { - Some(Ok(batch)) => self.process_batch_after_left_end(batch), + Some(Ok(batch)) => { + if batch.num_rows() == 0 { + return Ok(StreamJoinStateResult::Continue); + } + self.process_batch_after_left_end(batch) + } Some(Err(e)) => Err(e), None => { self.set_state(EagerJoinStreamState::BothExhausted { @@ -1020,6 +1038,65 @@ pub trait EagerJoinStream { fn state(&mut self) -> EagerJoinStreamState; } +#[derive(Debug)] +pub struct StreamJoinSideMetrics { + /// Number of batches consumed by this operator + pub(crate) input_batches: metrics::Count, + /// Number of rows consumed by this operator + pub(crate) input_rows: metrics::Count, +} + +/// Metrics for HashJoinExec +#[derive(Debug)] +pub struct StreamJoinMetrics { + /// Number of left batches/rows consumed by this operator + pub(crate) left: StreamJoinSideMetrics, + /// Number of right batches/rows consumed by this operator + pub(crate) right: StreamJoinSideMetrics, + /// Memory used by sides in bytes + pub(crate) stream_memory_usage: metrics::Gauge, + /// Number of batches produced by this operator + pub(crate) output_batches: metrics::Count, + /// Number of rows produced by this operator + pub(crate) output_rows: metrics::Count, +} + +impl StreamJoinMetrics { + pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self { + let input_batches = + MetricBuilder::new(metrics).counter("input_batches", partition); + let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); + let left = StreamJoinSideMetrics { + input_batches, + input_rows, + }; + + let input_batches = + MetricBuilder::new(metrics).counter("input_batches", partition); + let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); + let right = StreamJoinSideMetrics { + input_batches, + input_rows, + }; + + let stream_memory_usage = + MetricBuilder::new(metrics).gauge("stream_memory_usage", partition); + + let output_batches = + MetricBuilder::new(metrics).counter("output_batches", partition); + + let output_rows = MetricBuilder::new(metrics).output_rows(partition); + + Self { + left, + right, + output_batches, + stream_memory_usage, + output_rows, + } + } +} + #[cfg(test)] pub mod tests { use std::sync::Arc; diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 95f15877b960..00a7f23ebae7 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -37,7 +37,8 @@ use crate::joins::stream_join_utils::{ calculate_filter_expr_intervals, combine_two_batches, convert_sort_expr_with_filter_schema, get_pruning_anti_indices, get_pruning_semi_indices, record_visited_indices, EagerJoinStream, - EagerJoinStreamState, PruningJoinHashMap, SortedFilterExpr, StreamJoinStateResult, + EagerJoinStreamState, PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics, + StreamJoinStateResult, }; use crate::joins::utils::{ build_batch_from_indices, build_join_schema, check_join_is_valid, @@ -47,7 +48,7 @@ use crate::joins::utils::{ use crate::{ expressions::{Column, PhysicalSortExpr}, joins::StreamJoinPartitionMode, - metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, DisplayAs, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }; @@ -184,65 +185,6 @@ pub struct SymmetricHashJoinExec { mode: StreamJoinPartitionMode, } -#[derive(Debug)] -pub struct StreamJoinSideMetrics { - /// Number of batches consumed by this operator - pub(crate) input_batches: metrics::Count, - /// Number of rows consumed by this operator - pub(crate) input_rows: metrics::Count, -} - -/// Metrics for HashJoinExec -#[derive(Debug)] -pub struct StreamJoinMetrics { - /// Number of left batches/rows consumed by this operator - pub(crate) left: StreamJoinSideMetrics, - /// Number of right batches/rows consumed by this operator - pub(crate) right: StreamJoinSideMetrics, - /// Memory used by sides in bytes - pub(crate) stream_memory_usage: metrics::Gauge, - /// Number of batches produced by this operator - pub(crate) output_batches: metrics::Count, - /// Number of rows produced by this operator - pub(crate) output_rows: metrics::Count, -} - -impl StreamJoinMetrics { - pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self { - let input_batches = - MetricBuilder::new(metrics).counter("input_batches", partition); - let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); - let left = StreamJoinSideMetrics { - input_batches, - input_rows, - }; - - let input_batches = - MetricBuilder::new(metrics).counter("input_batches", partition); - let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); - let right = StreamJoinSideMetrics { - input_batches, - input_rows, - }; - - let stream_memory_usage = - MetricBuilder::new(metrics).gauge("stream_memory_usage", partition); - - let output_batches = - MetricBuilder::new(metrics).counter("output_batches", partition); - - let output_rows = MetricBuilder::new(metrics).output_rows(partition); - - Self { - left, - right, - output_batches, - stream_memory_usage, - output_rows, - } - } -} - impl SymmetricHashJoinExec { /// Tries to create a new [SymmetricHashJoinExec]. /// # Error