Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle ordering of first last aggregation inside aggregator #8662

Merged
merged 10 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 96 additions & 29 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::any::Any;
use std::sync::Arc;

use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields};
use crate::expressions::format_state_name;
use crate::{
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
Expand All @@ -29,7 +29,6 @@ use crate::{
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field};
use arrow_schema::SortOptions;
use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
Expand Down Expand Up @@ -211,10 +210,47 @@ impl FirstValueAccumulator {
}

// Updates state with the values in the given row.
fn update_with_new_row(&mut self, row: &[ScalarValue]) {
self.first = row[0].clone();
self.orderings = row[1..].to_vec();
self.is_set = true;
fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
let value = &row[0];
let orderings = &row[1..];
// Update when
// - no entry in the state
// - There is an earlier entry in according to requirements
if !self.is_set
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, we may be able to use a Option<ScalarValue> instead of ScalarValue and is_set flag, but I don't think it matters for performance and this PR follows the existing implementation as well 👍

|| compare_rows(
&self.orderings,
orderings,
&get_sort_options(&self.ordering_req),
)?
.is_gt()
{
self.first = value.clone();
self.orderings = orderings.to_vec();
self.is_set = true;
}
Ok(())
}

fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
let value = &values[0];
let ordering_values = &values[1..];
assert_eq!(ordering_values.len(), self.ordering_req.len());
if self.ordering_req.is_empty() {
return Ok((!value.is_empty()).then_some(0));
}
let sort_columns = ordering_values
.iter()
.zip(self.ordering_req.iter())
.map(|(values, req)| SortColumn {
values: values.clone(),
options: Some(req.options),
})
.collect::<Vec<_>>();
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
if !indices.is_empty() {
return Ok(Some(indices.value(0) as usize));
}
Ok(None)
}
}

Expand All @@ -227,11 +263,9 @@ impl Accumulator for FirstValueAccumulator {
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
// If we have seen first value, we shouldn't update it
if !values[0].is_empty() && !self.is_set {
let row = get_row_at_idx(values, 0)?;
// Update with first value in the array.
self.update_with_new_row(&row);
if let Some(first_idx) = self.get_first_idx(values)? {
let row = get_row_at_idx(values, first_idx)?;
self.update_with_new_row(&row)?;
}
Ok(())
}
Expand Down Expand Up @@ -265,7 +299,7 @@ impl Accumulator for FirstValueAccumulator {
// Update with first value in the state. Note that we should exclude the
// is_set flag from the state. Otherwise, we will end up with a state
// containing two is_set flags.
self.update_with_new_row(&first_row[0..is_set_idx]);
self.update_with_new_row(&first_row[0..is_set_idx])?;
}
}
Ok(())
Expand Down Expand Up @@ -459,10 +493,52 @@ impl LastValueAccumulator {
}

// Updates state with the values in the given row.
fn update_with_new_row(&mut self, row: &[ScalarValue]) {
self.last = row[0].clone();
self.orderings = row[1..].to_vec();
self.is_set = true;
fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
let value = &row[0];
let orderings = &row[1..];
// Update when
// - no value in the state
// - There is no specific requirement, but a new value (most recent entry in terms of execution)
// - There is a more recent entry in terms of requirement
if !self.is_set
|| self.orderings.is_empty()
|| compare_rows(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure you are aware but https://docs.rs/arrow-row/latest/arrow_row/ will be a much faster way to perform row-based comparisons than relying on ScalarValue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, however, here we are checking just a single row (row that have lowest value). Hence I don't think it is worth to conversion here.

Copy link
Contributor

@alamb alamb Dec 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that since it is a single column max comparison this is probably fine (and no worse than the current implementation). If we need to optimize performance we could probably implement specialized implementations (like FirstValue<ArrowPrimitiveType> and skip the copying entirely.

That is likely a premature optimization at this point

Update: Row format may well be a good idea (not for this PR). I will wait until I have reviewed this code to offer a more informed opinion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-reviewed and I agree that the RowFormat is not needed here (and in fact it may actually be slower) because, as @mustafasrepo points out, this code uses ScalarValue to compare a single row per batch (it finds the largest/smallest row per batch using lexsort_to_indices). We would have to benchmark to be sure.

&self.orderings,
orderings,
&get_sort_options(&self.ordering_req),
)?
.is_lt()
{
self.last = value.clone();
self.orderings = orderings.to_vec();
self.is_set = true;
}
Ok(())
}

fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
let value = &values[0];
let ordering_values = &values[1..];
assert_eq!(ordering_values.len(), self.ordering_req.len());
if self.ordering_req.is_empty() {
return Ok((!value.is_empty()).then_some(value.len() - 1));
}
let sort_columns = ordering_values
.iter()
.zip(self.ordering_req.iter())
.map(|(values, req)| {
// Take reverse ordering requirement this would enable us to use fetch=1 for last value.
SortColumn {
values: values.clone(),
options: Some(!req.options),
}
})
.collect::<Vec<_>>();
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Copy link
Contributor Author

@mustafasrepo mustafasrepo Dec 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a min max alternative to this we can use that one also. However, as far as I know there is no util for this support. Maybe @tustvold can answer this, if he is familiar with.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not aware of a min/max kernel that returns the ordinal position of the min/max

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW I had the same basic need (find the position of min/max so I could find a value in a corresponding column) while implementing our special selector_first, selector_last, etc functions in InfluxDB 3.0 (I also had to code them specially)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think you implementation is more efficient? If that is the case, maybe we can use that code instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think our implementation is (slightly) more efficient, but it is less general (only works for timestamp columns). You can see the basic idea here

https://github.com/influxdata/influxdb/blob/main/query_functions/src/selectors.rs

And the comparision is here: https://github.com/influxdata/influxdb/blob/acfef87659c9a8c4c49e4628264369569e04cad1/query_functions/src/selectors/internal.rs#L119-L127

I think we should stay with the ScalarValue implementation unless we find some query where this calculation is taking most of the time

if !indices.is_empty() {
return Ok(Some(indices.value(0) as usize));
}
Ok(None)
}
}

Expand All @@ -475,10 +551,9 @@ impl Accumulator for LastValueAccumulator {
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if !values[0].is_empty() {
let row = get_row_at_idx(values, values[0].len() - 1)?;
// Update with last value in the array.
self.update_with_new_row(&row);
if let Some(last_idx) = self.get_last_idx(values)? {
let row = get_row_at_idx(values, last_idx)?;
self.update_with_new_row(&row)?;
}
Ok(())
}
Expand Down Expand Up @@ -515,7 +590,7 @@ impl Accumulator for LastValueAccumulator {
// Update with last value in the state. Note that we should exclude the
// is_set flag from the state. Otherwise, we will end up with a state
// containing two is_set flags.
self.update_with_new_row(&last_row[0..is_set_idx]);
self.update_with_new_row(&last_row[0..is_set_idx])?;
}
}
Ok(())
Expand Down Expand Up @@ -559,14 +634,6 @@ fn convert_to_sort_cols(
.collect::<Vec<_>>()
}

/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
ordering_req
.iter()
.map(|item| item.options)
.collect::<Vec<_>>()
}

#[cfg(test)]
mod tests {
use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator};
Expand Down
9 changes: 3 additions & 6 deletions datafusion/physical-expr/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg};
use crate::expressions::OrderSensitiveArrayAgg;
use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::datatypes::Field;
use datafusion_common::{not_impl_err, DataFusionError, Result};
Expand Down Expand Up @@ -134,10 +134,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {

/// Checks whether the given aggregate expression is order-sensitive.
/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
/// However, a `FirstValue` depends on the input ordering (if the order changes,
/// the first value in the list would change).
/// However, a `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
aggr_expr.as_any().is::<FirstValue>()
|| aggr_expr.as_any().is::<LastValue>()
|| aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually this would be a nice thing to move into the AggregateExpr trait directly so we could override it and avoid special casing built in functions. Not for this PR though :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea 👍

}
10 changes: 9 additions & 1 deletion datafusion/physical-expr/src/aggregate/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use arrow_array::types::{
};
use arrow_array::ArrowNativeTypeOp;
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, Field};
use arrow_schema::{DataType, Field, SortOptions};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
Expand Down Expand Up @@ -205,3 +205,11 @@ pub(crate) fn ordering_fields(
})
.collect()
}

/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
ordering_req
.iter()
.map(|item| item.options)
.collect::<Vec<_>>()
}
Loading
Loading