Skip to content

Commit

Permalink
define GroupIndexContext.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Oct 21, 2024
1 parent 13c9489 commit 5fd63e8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 9 deletions.
48 changes: 40 additions & 8 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,42 @@ use datafusion_physical_expr::binary_map::OutputType;
use datafusion_physical_expr_common::datum::compare_with_eq;
use hashbrown::raw::RawTable;

/// Group index context for performing `vectorized compare` and `vectorized append`
struct GroupIndexContext {
/// It is possible that hash value collision exists,
/// and we will chain the `group indices` with same hash value
///
/// The chained indices is like:
/// `latest group index -> older group index -> even older group index -> ...`
prev_group_index: usize,

/// It is possible that rows with same hash values exist in `input cols`.
/// And if we `vectorized compare` and `vectorized append` them
/// in the same round, some fault cases will occur especially when
/// they are totally the repeated rows...
///
/// For example:
/// - Two repeated rows exist in `input cols`.
///
/// - We found their hash values equal to one exist group
///
/// - We then perform `vectorized compare` for them to the exist group,
/// and found their values not equal to the exist one
///
/// - Finally when perform `vectorized append`, we decide to build two
/// respective new groups for them, even we actually just need one
/// new group...
///
/// So for solving such cases simply, if some rows with same hash value
/// in `input cols`, just allow to process one of them in a round,
/// and this flag is used to represent that one of them is processing
/// in current round.
///
checking: bool,
}

/// A [`GroupValues`] that stores multiple columns of group values.
///
///
pub struct GroupValuesColumn {
/// The output schema
schema: SchemaRef,
Expand All @@ -62,6 +95,11 @@ pub struct GroupValuesColumn {
/// values: (hash, group_index)
map: RawTable<(u64, usize)>,

group_index_ctxs: Vec<GroupIndexContext>,

/// Some
remaining_indices: Vec<usize>,

/// The size of `map` in bytes
map_size: usize,

Expand Down Expand Up @@ -94,6 +132,7 @@ impl GroupValuesColumn {
Ok(Self {
schema,
map,
group_index_ctxs: Vec::new(),
map_size: 0,
group_values: vec![],
hashes_buffer: Default::default(),
Expand Down Expand Up @@ -160,13 +199,6 @@ macro_rules! instantiate_primitive {
};
}

fn append_col_value<C>(mut core: C, array: &ArrayRef, row: usize)
where
C: FnMut(&ArrayRef, usize),
{
core(array, row);
}

impl GroupValues for GroupValuesColumn {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
let n_rows = cols[0].len();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
all_non_null: bool,
) {
let arr = array.as_byte_view::<B>();

if all_non_null {
self.nulls.append_n(rows.len(), false);
for &row in rows {
Expand Down

0 comments on commit 5fd63e8

Please sign in to comment.