diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 0b9d9176f421..f96429890d46 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -276,8 +276,9 @@ impl GroupValuesColumn { self.vectorized_equal_to_group_indices.clear(); self.empty_buckets.clear(); self.occupied_buckets.clear(); - self.group_values_len = self.group_values[0].len(); + self.remaining_indices.clear(); + self.group_values_len = self.group_values[0].len(); for &row in self.current_indices.iter() { let target_hash = batch_hashes[row]; let entry = self.map.find(target_hash, |(exist_hash, _)| { @@ -456,13 +457,21 @@ impl GroupValuesColumn { return; } + let group_len_before_appending = self.group_values[0].len(); let iter = self.group_values.iter_mut().zip(cols.iter()); for (group_column, col) in iter { group_column.vectorized_append(col, &self.vectorized_append_row_indices); } - assert_eq!(self.group_values[0].len(), self.group_values_len); + let iter = self + .vectorized_append_row_indices + .iter() + .zip(group_len_before_appending..self.group_values_len); + for (&row, group_idx) in iter { + groups[row] = group_idx; + } + // Set back `index_lists_updates`. self.index_lists_updates = index_lists_updates; } @@ -647,7 +656,8 @@ impl GroupValues for GroupValuesColumn { self.group_index_lists[new_idx] = new_next_idx; } - self.group_index_lists.resize(self.group_values[0].len() + 1, 0); + self.group_index_lists + .resize(self.group_values[0].len() + 1, 0); output } @@ -690,3 +700,100 @@ impl GroupValues for GroupValuesColumn { self.vectorized_equal_to_results.clear(); } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{ArrayRef, Int64Array, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + + use crate::aggregates::group_values::{column::GroupValuesColumn, GroupValues}; + + #[test] + fn test() { + // *************************************************************** + // The test group cols, the schema is `a(Int64) + b(String)`. + // It should cover following input rows situations: + // - a: null + b: null + // - a: not null + b: null + // - a: null + b: not null + // - a: not null + b: not null + // + // And it should cover following repeating situations: + // - Rows unique + // - Rows repeating in two `cols` + // - Rows repeating in single `cols` + // *************************************************************** + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + ])); + // // Case 1 + // Cols 1 + let a: ArrayRef = Arc::new(Int64Array::from(vec![ + None, + Some(42), + None, + Some(24), + Some(4224), + ])); + let b: ArrayRef = Arc::new(StringArray::from(vec![ + None, + None, + Some("42"), + Some("24"), + Some("4224"), + ])); + let cols1 = vec![a, b]; + + // Cols 2 + let a: ArrayRef = Arc::new(Int64Array::from(vec![ + None, + Some(42), + None, + Some(24), + Some(2442), + ])); + let b: ArrayRef = Arc::new(StringArray::from(vec![ + None, + None, + Some("42"), + Some("24"), + Some("2442"), + ])); + let cols2 = vec![a, b]; + + // Cols 3 + let a: ArrayRef = Arc::new(Int64Array::from(vec![ + None, + Some(42), + None, + Some(24), + None, + Some(42), + None, + Some(24), + Some(4224), + ])); + let b: ArrayRef = Arc::new(StringArray::from(vec![ + None, + None, + Some("42"), + Some("24"), + None, + None, + Some("42"), + Some("24"), + Some("4224"), + ])); + let cols3 = vec![a, b]; + + let mut group_values = GroupValuesColumn::try_new(schema).unwrap(); + let mut groups = Vec::new(); + group_values.intern(&cols1, &mut groups).unwrap(); + group_values.intern(&cols2, &mut groups).unwrap(); + group_values.intern(&cols3, &mut groups).unwrap(); + } +}