Skip to content

Commit

Permalink
Fix clearing of `remaining_indices` and filling of `groups`.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Oct 27, 2024
1 parent 150248f commit 37d68e6
Showing 1 changed file with 110 additions and 3 deletions.
113 changes: 110 additions & 3 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,9 @@ impl GroupValuesColumn {
self.vectorized_equal_to_group_indices.clear();
self.empty_buckets.clear();
self.occupied_buckets.clear();
self.group_values_len = self.group_values[0].len();
self.remaining_indices.clear();

self.group_values_len = self.group_values[0].len();
for &row in self.current_indices.iter() {
let target_hash = batch_hashes[row];
let entry = self.map.find(target_hash, |(exist_hash, _)| {
Expand Down Expand Up @@ -456,13 +457,21 @@ impl GroupValuesColumn {
return;
}

let group_len_before_appending = self.group_values[0].len();
let iter = self.group_values.iter_mut().zip(cols.iter());
for (group_column, col) in iter {
group_column.vectorized_append(col, &self.vectorized_append_row_indices);
}

assert_eq!(self.group_values[0].len(), self.group_values_len);

let iter = self
.vectorized_append_row_indices
.iter()
.zip(group_len_before_appending..self.group_values_len);
for (&row, group_idx) in iter {
groups[row] = group_idx;
}

// Set back `index_lists_updates`.
self.index_lists_updates = index_lists_updates;
}
Expand Down Expand Up @@ -647,7 +656,8 @@ impl GroupValues for GroupValuesColumn {

self.group_index_lists[new_idx] = new_next_idx;
}
self.group_index_lists.resize(self.group_values[0].len() + 1, 0);
self.group_index_lists
.resize(self.group_values[0].len() + 1, 0);

output
}
Expand Down Expand Up @@ -690,3 +700,100 @@ impl GroupValues for GroupValuesColumn {
self.vectorized_equal_to_results.clear();
}
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    use crate::aggregates::group_values::{column::GroupValuesColumn, GroupValues};

    /// One input row of the test schema `a(Int64) + b(String)`.
    type Row = (Option<i64>, Option<&'static str>);

    /// Number of distinct rows across all test batches:
    /// `(null, null)`, `(42, null)`, `(null, "42")`, `(24, "24")`,
    /// `(4224, "4224")` and `(2442, "2442")`.
    const DISTINCT_ROWS: usize = 6;

    /// Builds the two group columns (`a`, then `b`) from row-oriented test data.
    fn build_cols(rows: &[Row]) -> Vec<ArrayRef> {
        let a: ArrayRef = Arc::new(Int64Array::from(
            rows.iter().map(|&(a, _)| a).collect::<Vec<_>>(),
        ));
        let b: ArrayRef = Arc::new(StringArray::from(
            rows.iter().map(|&(_, b)| b).collect::<Vec<_>>(),
        ));
        vec![a, b]
    }

    /// Interns three batches and checks that every row is mapped to a stable,
    /// unique group index.
    #[test]
    fn test() {
        // ***************************************************************
        // The test group cols, the schema is `a(Int64) + b(String)`.
        // It should cover the following input row situations:
        //   - a: null + b: null
        //   - a: not null + b: null
        //   - a: null + b: not null
        //   - a: not null + b: not null
        //
        // And it should cover the following repeating situations:
        //   - Rows unique
        //   - Rows repeating across two `cols` batches
        //   - Rows repeating within a single `cols` batch
        // ***************************************************************

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        let batches: [Vec<Row>; 3] = [
            // Cols 1: all rows unique.
            vec![
                (None, None),
                (Some(42), None),
                (None, Some("42")),
                (Some(24), Some("24")),
                (Some(4224), Some("4224")),
            ],
            // Cols 2: the first four rows repeat rows of cols 1, the last is new.
            vec![
                (None, None),
                (Some(42), None),
                (None, Some("42")),
                (Some(24), Some("24")),
                (Some(2442), Some("2442")),
            ],
            // Cols 3: rows repeat both within this batch and across batches.
            vec![
                (None, None),
                (Some(42), None),
                (None, Some("42")),
                (Some(24), Some("24")),
                (None, None),
                (Some(42), None),
                (None, Some("42")),
                (Some(24), Some("24")),
                (Some(4224), Some("4224")),
            ],
        ];

        let mut group_values = GroupValuesColumn::try_new(schema).unwrap();
        let mut groups = Vec::new();
        // Group index observed for each distinct row so far.
        let mut seen: HashMap<Row, usize> = HashMap::new();

        for (batch_idx, rows) in batches.iter().enumerate() {
            let cols = build_cols(rows);
            group_values.intern(&cols, &mut groups).unwrap();

            // NOTE(review): assumes `intern` resets `groups` to exactly one
            // entry per input row - confirm against the `GroupValues` docs.
            assert_eq!(
                groups.len(),
                rows.len(),
                "batch {}: expected one group index per input row",
                batch_idx
            );

            // Equal rows must always resolve to the same group index, both
            // inside one batch and across batches.
            for (row_idx, (row, &group_idx)) in rows.iter().zip(groups.iter()).enumerate()
            {
                let expected = *seen.entry(*row).or_insert(group_idx);
                assert_eq!(
                    group_idx, expected,
                    "batch {}, row {}: {:?} changed its group index",
                    batch_idx, row_idx, row
                );
            }
        }

        // Distinct rows must own distinct indices, and because a new group's
        // index is its position in the stored group values, the indices must
        // be exactly `0..DISTINCT_ROWS`.
        let mut indices: Vec<usize> = seen.values().copied().collect();
        indices.sort_unstable();
        assert_eq!(indices, (0..DISTINCT_ROWS).collect::<Vec<_>>());
    }
}

0 comments on commit 37d68e6

Please sign in to comment.