-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle ordering of first last aggregation inside aggregator #8662
Changes from 9 commits
edcef77
da1cf71
202936d
05bdc81
416ac3a
06adf25
e208ebf
0ece593
298fcf0
64666df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
use std::any::Any; | ||
use std::sync::Arc; | ||
|
||
use crate::aggregate::utils::{down_cast_any_ref, ordering_fields}; | ||
use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields}; | ||
use crate::expressions::format_state_name; | ||
use crate::{ | ||
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr, | ||
|
@@ -29,9 +29,10 @@ use crate::{ | |
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray}; | ||
use arrow::compute::{self, lexsort_to_indices, SortColumn}; | ||
use arrow::datatypes::{DataType, Field}; | ||
use arrow_schema::SortOptions; | ||
use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx}; | ||
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue}; | ||
use datafusion_common::{ | ||
arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue, | ||
}; | ||
use datafusion_expr::Accumulator; | ||
|
||
/// FIRST_VALUE aggregate expression | ||
|
@@ -211,10 +212,45 @@ impl FirstValueAccumulator { | |
} | ||
|
||
// Updates state with the values in the given row. | ||
fn update_with_new_row(&mut self, row: &[ScalarValue]) { | ||
self.first = row[0].clone(); | ||
self.orderings = row[1..].to_vec(); | ||
self.is_set = true; | ||
fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> { | ||
let [value, orderings @ ..] = row else { | ||
return internal_err!("Empty row in FIRST_VALUE"); | ||
}; | ||
// Update when there is no entry in the state, or we have an "earlier" | ||
// entry according to sort requirements. | ||
if !self.is_set | ||
|| compare_rows( | ||
&self.orderings, | ||
orderings, | ||
&get_sort_options(&self.ordering_req), | ||
)? | ||
.is_gt() | ||
{ | ||
self.first = value.clone(); | ||
self.orderings = orderings.to_vec(); | ||
self.is_set = true; | ||
} | ||
Ok(()) | ||
} | ||
|
||
fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> { | ||
let [value, ordering_values @ ..] = values else { | ||
return internal_err!("Empty row in FIRST_VALUE"); | ||
}; | ||
if self.ordering_req.is_empty() { | ||
// Get first entry according to receive order (0th index) | ||
return Ok((!value.is_empty()).then_some(0)); | ||
} | ||
let sort_columns = ordering_values | ||
.iter() | ||
.zip(self.ordering_req.iter()) | ||
.map(|(values, req)| SortColumn { | ||
values: values.clone(), | ||
options: Some(req.options), | ||
}) | ||
.collect::<Vec<_>>(); | ||
let indices = lexsort_to_indices(&sort_columns, Some(1))?; | ||
Ok((!indices.is_empty()).then_some(indices.value(0) as _)) | ||
} | ||
} | ||
|
||
|
@@ -227,11 +263,9 @@ impl Accumulator for FirstValueAccumulator { | |
} | ||
|
||
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { | ||
// If we have seen first value, we shouldn't update it | ||
if !values[0].is_empty() && !self.is_set { | ||
let row = get_row_at_idx(values, 0)?; | ||
// Update with first value in the array. | ||
self.update_with_new_row(&row); | ||
if let Some(first_idx) = self.get_first_idx(values)? { | ||
let row = get_row_at_idx(values, first_idx)?; | ||
self.update_with_new_row(&row)?; | ||
} | ||
Ok(()) | ||
} | ||
|
@@ -265,7 +299,7 @@ impl Accumulator for FirstValueAccumulator { | |
// Update with first value in the state. Note that we should exclude the | ||
// is_set flag from the state. Otherwise, we will end up with a state | ||
// containing two is_set flags. | ||
self.update_with_new_row(&first_row[0..is_set_idx]); | ||
self.update_with_new_row(&first_row[0..is_set_idx])?; | ||
} | ||
} | ||
Ok(()) | ||
|
@@ -459,10 +493,50 @@ impl LastValueAccumulator { | |
} | ||
|
||
// Updates state with the values in the given row. | ||
fn update_with_new_row(&mut self, row: &[ScalarValue]) { | ||
self.last = row[0].clone(); | ||
self.orderings = row[1..].to_vec(); | ||
self.is_set = true; | ||
fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> { | ||
let [value, orderings @ ..] = row else { | ||
return internal_err!("Empty row in LAST_VALUE"); | ||
}; | ||
// Update when there is no entry in the state, or we have a "later" | ||
// entry (either according to sort requirements or the order of execution). | ||
if !self.is_set | ||
|| self.orderings.is_empty() | ||
|| compare_rows( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm sure you are aware but https://docs.rs/arrow-row/latest/arrow_row/ will be a much faster way to perform row-based comparisons than relying on ScalarValue There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed, however, here we are checking just a single row (row that have lowest value). Hence I don't think it is worth to conversion here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Update: Row format may well be a good idea (not for this PR). I will wait until I have reviewed this code to offer a more informed opinion There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I re-reviewed and I agree that the |
||
&self.orderings, | ||
orderings, | ||
&get_sort_options(&self.ordering_req), | ||
)? | ||
.is_lt() | ||
{ | ||
self.last = value.clone(); | ||
self.orderings = orderings.to_vec(); | ||
self.is_set = true; | ||
} | ||
Ok(()) | ||
} | ||
|
||
fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> { | ||
let [value, ordering_values @ ..] = values else { | ||
return internal_err!("Empty row in LAST_VALUE"); | ||
}; | ||
if self.ordering_req.is_empty() { | ||
// Get last entry according to the order of data: | ||
return Ok((!value.is_empty()).then_some(value.len() - 1)); | ||
} | ||
let sort_columns = ordering_values | ||
.iter() | ||
.zip(self.ordering_req.iter()) | ||
.map(|(values, req)| { | ||
// Take the reverse ordering requirement. This enables us to | ||
// use "fetch = 1" to get the last value. | ||
SortColumn { | ||
values: values.clone(), | ||
options: Some(!req.options), | ||
} | ||
}) | ||
.collect::<Vec<_>>(); | ||
let indices = lexsort_to_indices(&sort_columns, Some(1))?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If there is a min max alternative to this we can use that one also. However, as far as I know there is no util for this support. Maybe @tustvold can answer this, if he is familiar with. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not aware of a min/max kernel that returns the ordinal position of the min/max There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW I had the same basic need (find the position of min/max so I could find a value in a corresponding column) while implementing our special There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think you implementation is more efficient? If that is the case, maybe we can use that code instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think our implementation is (slightly) more efficient, but it is less general (only works for timestamp columns). You can see the basic idea here https://github.com/influxdata/influxdb/blob/main/query_functions/src/selectors.rs And the comparision is here: https://github.com/influxdata/influxdb/blob/acfef87659c9a8c4c49e4628264369569e04cad1/query_functions/src/selectors/internal.rs#L119-L127 I think we should stay with the |
||
Ok((!indices.is_empty()).then_some(indices.value(0) as _)) | ||
} | ||
} | ||
|
||
|
@@ -475,10 +549,9 @@ impl Accumulator for LastValueAccumulator { | |
} | ||
|
||
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { | ||
if !values[0].is_empty() { | ||
let row = get_row_at_idx(values, values[0].len() - 1)?; | ||
// Update with last value in the array. | ||
self.update_with_new_row(&row); | ||
if let Some(last_idx) = self.get_last_idx(values)? { | ||
let row = get_row_at_idx(values, last_idx)?; | ||
self.update_with_new_row(&row)?; | ||
} | ||
Ok(()) | ||
} | ||
|
@@ -515,7 +588,7 @@ impl Accumulator for LastValueAccumulator { | |
// Update with last value in the state. Note that we should exclude the | ||
// is_set flag from the state. Otherwise, we will end up with a state | ||
// containing two is_set flags. | ||
self.update_with_new_row(&last_row[0..is_set_idx]); | ||
self.update_with_new_row(&last_row[0..is_set_idx])?; | ||
} | ||
} | ||
Ok(()) | ||
|
@@ -559,26 +632,18 @@ fn convert_to_sort_cols( | |
.collect::<Vec<_>>() | ||
} | ||
|
||
/// Selects the sort option attribute from all the given `PhysicalSortExpr`s. | ||
fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> { | ||
ordering_req | ||
.iter() | ||
.map(|item| item.options) | ||
.collect::<Vec<_>>() | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::sync::Arc; | ||
|
||
use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator}; | ||
|
||
use arrow::compute::concat; | ||
use arrow_array::{ArrayRef, Int64Array}; | ||
use arrow_schema::DataType; | ||
use datafusion_common::{Result, ScalarValue}; | ||
use datafusion_expr::Accumulator; | ||
|
||
use arrow::compute::concat; | ||
use std::sync::Arc; | ||
|
||
#[test] | ||
fn test_first_last_value_value() -> Result<()> { | ||
let mut first_accumulator = | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,16 +15,20 @@ | |
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg}; | ||
use crate::{PhysicalExpr, PhysicalSortExpr}; | ||
use arrow::datatypes::Field; | ||
use datafusion_common::{not_impl_err, DataFusionError, Result}; | ||
use datafusion_expr::Accumulator; | ||
use std::any::Any; | ||
use std::fmt::Debug; | ||
use std::sync::Arc; | ||
|
||
use self::groups_accumulator::GroupsAccumulator; | ||
use crate::expressions::OrderSensitiveArrayAgg; | ||
use crate::{PhysicalExpr, PhysicalSortExpr}; | ||
|
||
use arrow::datatypes::Field; | ||
use datafusion_common::{not_impl_err, DataFusionError, Result}; | ||
use datafusion_expr::Accumulator; | ||
|
||
mod hyperloglog; | ||
mod tdigest; | ||
|
||
pub(crate) mod approx_distinct; | ||
pub(crate) mod approx_median; | ||
|
@@ -46,19 +50,18 @@ pub(crate) mod median; | |
pub(crate) mod string_agg; | ||
#[macro_use] | ||
pub(crate) mod min_max; | ||
pub mod build_in; | ||
pub(crate) mod groups_accumulator; | ||
mod hyperloglog; | ||
pub mod moving_min_max; | ||
pub(crate) mod regr; | ||
pub(crate) mod stats; | ||
pub(crate) mod stddev; | ||
pub(crate) mod sum; | ||
pub(crate) mod sum_distinct; | ||
mod tdigest; | ||
pub mod utils; | ||
pub(crate) mod variance; | ||
|
||
pub mod build_in; | ||
pub mod moving_min_max; | ||
pub mod utils; | ||
|
||
/// An aggregate expression that: | ||
/// * knows its resulting field | ||
/// * knows how to create its accumulator | ||
|
@@ -134,10 +137,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> { | |
|
||
/// Checks whether the given aggregate expression is order-sensitive. | ||
/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs. | ||
/// However, a `FirstValue` depends on the input ordering (if the order changes, | ||
/// the first value in the list would change). | ||
/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering. | ||
pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool { | ||
aggr_expr.as_any().is::<FirstValue>() | ||
|| aggr_expr.as_any().is::<LastValue>() | ||
|| aggr_expr.as_any().is::<OrderSensitiveArrayAgg>() | ||
aggr_expr.as_any().is::<OrderSensitiveArrayAgg>() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eventually this would be a nice thing to move into the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this idea 👍 |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory, we may be able to use a
Option<ScalarValue>
instead ofScalarValue
andis_set
flag, but I don't think it matters for performance and this PR follows the existing implementation as well 👍