Skip to content

Commit

Permalink
Review
Browse files Browse the repository at this point in the history
  • Loading branch information
ozankabak committed Dec 28, 2023
1 parent 0ece593 commit 298fcf0
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 108 deletions.
58 changes: 27 additions & 31 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field};
use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
use datafusion_common::{
arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::Accumulator;

/// FIRST_VALUE aggregate expression
Expand Down Expand Up @@ -211,11 +213,11 @@ impl FirstValueAccumulator {

// Updates state with the values in the given row.
fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
let value = &row[0];
let orderings = &row[1..];
// Update when
// - no entry in the state
// - There is an earlier entry in according to requirements
let [value, orderings @ ..] = row else {
return internal_err!("Empty row in FIRST_VALUE");
};
// Update when there is no entry in the state, or we have an "earlier"
// entry according to sort requirements.
if !self.is_set
|| compare_rows(
&self.orderings,
Expand All @@ -232,9 +234,9 @@ impl FirstValueAccumulator {
}

fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
let value = &values[0];
let ordering_values = &values[1..];
assert_eq!(ordering_values.len(), self.ordering_req.len());
let [value, ordering_values @ ..] = values else {
return internal_err!("Empty row in FIRST_VALUE");
};
if self.ordering_req.is_empty() {
// Get first entry according to receive order (0th index)
return Ok((!value.is_empty()).then_some(0));
Expand All @@ -248,10 +250,7 @@ impl FirstValueAccumulator {
})
.collect::<Vec<_>>();
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
if !indices.is_empty() {
return Ok(Some(indices.value(0) as usize));
}
Ok(None)
Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
}

Expand Down Expand Up @@ -495,12 +494,11 @@ impl LastValueAccumulator {

// Updates state with the values in the given row.
fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
let value = &row[0];
let orderings = &row[1..];
// Update when
// - no value in the state
// - There is no specific requirement, but a new value (most recent entry in terms of execution)
// - There is a more recent entry in terms of requirement
let [value, orderings @ ..] = row else {
return internal_err!("Empty row in LAST_VALUE");
};
// Update when there is no entry in the state, or we have a "later"
// entry (either according to sort requirements or the order of execution).
if !self.is_set
|| self.orderings.is_empty()
|| compare_rows(
Expand All @@ -518,29 +516,27 @@ impl LastValueAccumulator {
}

/// Returns the row index of the "last" value in `values`, or `None` when
/// there is no row to pick.
///
/// `values[0]` is the value column; the remaining entries are the ordering
/// columns, paired positionally with `self.ordering_req`.
///
/// # Errors
/// Returns an internal error if `values` is empty, and propagates any
/// error raised by `lexsort_to_indices`.
fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
    let [value, ordering_values @ ..] = values else {
        return internal_err!("Empty row in LAST_VALUE");
    };
    if self.ordering_req.is_empty() {
        // Get last entry according to the order of data. `checked_sub`
        // yields `None` for an empty array instead of eagerly evaluating
        // `0 - 1`, which overflows `usize` (a panic in debug builds).
        return Ok(value.len().checked_sub(1));
    }
    let sort_columns = ordering_values
        .iter()
        .zip(self.ordering_req.iter())
        .map(|(values, req)| {
            // Take the reverse ordering requirement. This enables us to
            // use "fetch = 1" to get the last value.
            SortColumn {
                values: values.clone(),
                options: Some(!req.options),
            }
        })
        .collect::<Vec<_>>();
    let indices = lexsort_to_indices(&sort_columns, Some(1))?;
    // Use the lazy `then` rather than `then_some`: `then_some` evaluates
    // its argument unconditionally, so `indices.value(0)` would be called
    // (an out-of-bounds access) even when `indices` is empty.
    Ok((!indices.is_empty()).then(|| indices.value(0) as usize))
}
}

Expand Down Expand Up @@ -638,16 +634,16 @@ fn convert_to_sort_cols(

#[cfg(test)]
mod tests {
use std::sync::Arc;

use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator};

use arrow::compute::concat;
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;

use arrow::compute::concat;
use std::sync::Arc;

#[test]
fn test_first_last_value_value() -> Result<()> {
let mut first_accumulator =
Expand Down
23 changes: 13 additions & 10 deletions datafusion/physical-expr/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use self::groups_accumulator::GroupsAccumulator;
use crate::expressions::OrderSensitiveArrayAgg;
use crate::{PhysicalExpr, PhysicalSortExpr};

use arrow::datatypes::Field;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use self::groups_accumulator::GroupsAccumulator;
mod hyperloglog;
mod tdigest;

pub(crate) mod approx_distinct;
pub(crate) mod approx_median;
Expand All @@ -46,19 +50,18 @@ pub(crate) mod median;
pub(crate) mod string_agg;
#[macro_use]
pub(crate) mod min_max;
pub mod build_in;
pub(crate) mod groups_accumulator;
mod hyperloglog;
pub mod moving_min_max;
pub(crate) mod regr;
pub(crate) mod stats;
pub(crate) mod stddev;
pub(crate) mod sum;
pub(crate) mod sum_distinct;
mod tdigest;
pub mod utils;
pub(crate) mod variance;

pub mod build_in;
pub mod moving_min_max;
pub mod utils;

/// An aggregate expression that:
/// * knows its resulting field
/// * knows how to create its accumulator
Expand Down Expand Up @@ -134,7 +137,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {

/// Checks whether the given aggregate expression is order-sensitive.
///
/// A `SUM` aggregation, for example, does not depend on the order of its
/// inputs, whereas an `ARRAY_AGG` with `ORDER BY` does. This function
/// returns `true` exactly when the concrete type behind `aggr_expr` is
/// `OrderSensitiveArrayAgg`.
pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
    let as_any = aggr_expr.as_any();
    as_any.downcast_ref::<OrderSensitiveArrayAgg>().is_some()
}
16 changes: 7 additions & 9 deletions datafusion/physical-expr/src/aggregate/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@

//! Utilities used in aggregates
use std::any::Any;
use std::sync::Arc;

use crate::{AggregateExpr, PhysicalSortExpr};
use arrow::array::ArrayRef;

use arrow::array::{ArrayRef, ArrowNativeTypeOp};
use arrow_array::cast::AsArray;
use arrow_array::types::{
Decimal128Type, DecimalType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use arrow_array::ArrowNativeTypeOp;
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, Field, SortOptions};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
use std::sync::Arc;

/// Convert scalar values from an accumulator into arrays.
pub fn get_accum_scalar_values_as_arrays(
Expand All @@ -40,7 +41,7 @@ pub fn get_accum_scalar_values_as_arrays(
.state()?
.iter()
.map(|s| s.to_array_of_size(1))
.collect::<Result<Vec<_>>>()
.collect()
}

/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
Expand Down Expand Up @@ -208,8 +209,5 @@ pub(crate) fn ordering_fields(

/// Collects the `options` field of every `PhysicalSortExpr` in
/// `ordering_req`, preserving the input order.
pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
    let mut sort_options = Vec::with_capacity(ordering_req.len());
    sort_options.extend(ordering_req.iter().map(|sort_expr| sort_expr.options));
    sort_options
}
Loading

0 comments on commit 298fcf0

Please sign in to comment.