From a1645c4c1d6578d425e6a86648e32f13bca5cd43 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 5 Aug 2024 20:29:56 -0400 Subject: [PATCH] Minor: refactor probe check into function `should_skip_aggregation` (#11821) --- .../physical-plan/src/aggregates/row_hash.rs | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 62ed79dad4aa..1b84befb0269 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -635,11 +635,7 @@ impl Stream for GroupedHashAggregateStream { ( if self.input_done { ExecutionState::Done - } else if self - .skip_aggregation_probe - .as_ref() - .is_some_and(|probe| probe.should_skip()) - { + } else if self.should_skip_aggregation() { ExecutionState::SkippingAggregation } else { ExecutionState::ReadingInput @@ -955,12 +951,13 @@ impl GroupedHashAggregateStream { Ok(()) } - // Updates skip aggregation probe state. - // In case stream has any spills, the probe is forcefully set to - // forbid aggregation skipping, and locked, since spilling resets - // total number of unique groups. - // - // Note: currently spilling is not supported for Partial aggregation + /// Updates skip aggregation probe state. + /// + /// In case stream has any spills, the probe is forcefully set to + /// forbid aggregation skipping, and locked, since spilling resets + /// total number of unique groups. + /// + /// Note: currently spilling is not supported for Partial aggregation fn update_skip_aggregation_probe(&mut self, input_rows: usize) { if let Some(probe) = self.skip_aggregation_probe.as_mut() { if !self.spill_state.spills.is_empty() { @@ -971,8 +968,8 @@ impl GroupedHashAggregateStream { }; } - // In case the probe indicates that aggregation may be - // skipped, forces stream to produce currently accumulated output. + /// In case the probe indicates that aggregation may be + /// skipped, forces stream to produce currently accumulated output. fn switch_to_skip_aggregation(&mut self) -> Result<()> { if let Some(probe) = self.skip_aggregation_probe.as_mut() { if probe.should_skip() { @@ -984,7 +981,15 @@ impl GroupedHashAggregateStream { Ok(()) } - // Transforms input batch to intermediate aggregate state, without grouping it + /// Returns true if the aggregation probe indicates that aggregation + /// should be skipped. + fn should_skip_aggregation(&self) -> bool { + self.skip_aggregation_probe + .as_ref() + .is_some_and(|probe| probe.should_skip()) + } + + /// Transforms input batch to intermediate aggregate state, without grouping it fn transform_to_states(&self, batch: RecordBatch) -> Result { let group_values = evaluate_group_by(&self.group_by, &batch)?; let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;