From 5bb1805eb202fdb8e96e0f36f31b49cab3000ee3 Mon Sep 17 00:00:00 2001 From: kamille Date: Fri, 30 Aug 2024 15:17:35 +0800 Subject: [PATCH] test new group index. --- .../src/aggregate/groups_accumulator.rs | 40 ++-- .../groups_accumulator/accumulate.rs | 53 ++---- datafusion/functions-aggregate/src/count.rs | 72 +++----- .../src/aggregates/group_values/row.rs | 171 ++++++++---------- 4 files changed, 136 insertions(+), 200 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs index f96fef92d39c3..6efca93cce06d 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs @@ -44,6 +44,7 @@ use datafusion_expr_common::accumulator::Accumulator; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; pub const MAX_PREALLOC_BLOCK_SIZE: usize = 8192; +const GROUP_INDEX_DATA_MASK: u64 = 0x7fffffffffffffff; /// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`] /// @@ -452,37 +453,36 @@ impl EmitToExt for EmitTo { pub struct BlockedGroupIndex { pub block_id: u32, pub block_offset: u64, - pub is_blocked: bool, + pub flag: u8, } impl BlockedGroupIndex { #[inline] - pub fn new_from_parts(block_id: u32, block_offset: u64, is_blocked: bool) -> Self { + pub fn new_from_parts(flag: u8, block_id: u32, block_offset: u64) -> Self { Self { block_id, block_offset, - is_blocked, + flag, } } #[inline] - pub fn new_flat(raw_index: usize) -> Self { - Self { - block_id: 0, - block_offset: raw_index as u64, - is_blocked: false, - } - } - - #[inline] - pub fn new_blocked(raw_index: usize) -> Self { - let block_id = ((raw_index as u64 >> 32) & 0x00000000ffffffff) as u32; - let block_offset = (raw_index as u64) & 0x00000000ffffffff; + pub fn new(raw_index: usize) -> Self { + let raw_index = raw_index as u64; + let flag = raw_index >> 63; + let data = raw_index & GROUP_INDEX_DATA_MASK; + let (highs, lows) = ((data >> 32) as u32, data as u32); + + let block_id = highs * flag as u32; + let block_offset = { + let offset_high = highs as u64 * (1 - flag); + (offset_high << 32) | (lows as u64) + }; Self { block_id, block_offset, - is_blocked: true, + flag: flag as u8, } } @@ -496,12 +496,10 @@ impl BlockedGroupIndex { self.block_offset as usize } + #[inline] pub fn as_packed_index(&self) -> usize { - if self.is_blocked { - (((self.block_id as u64) << 32) | self.block_offset) as usize - } else { - self.block_offset as usize - } + (((self.flag as u64) << 63) | ((self.block_id as u64) << 32) | self.block_offset) + as usize } } diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 835b4df93e0ed..b9e6bf64adf85 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -304,31 +304,16 @@ impl BlockedNullState { ); let seen_values_blocks = &mut self.seen_values_blocks; - if self.block_size.is_some() { - do_blocked_accumulate( - group_indices, - values, - opt_filter, - BlockedGroupIndex::new_blocked, - value_fn, - |index: &BlockedGroupIndex| { - seen_values_blocks[index.block_id()] - .set_bit(index.block_offset(), true); - }, - ) - } else { - do_blocked_accumulate( - group_indices, - values, - opt_filter, - BlockedGroupIndex::new_flat, - value_fn, - |index: &BlockedGroupIndex| { - seen_values_blocks[index.block_id()] - .set_bit(index.block_offset(), true); - }, - ); - } + do_blocked_accumulate( + group_indices, + values, + opt_filter, + value_fn, + |group_index| { + seen_values_blocks[group_index.block_id()] + .set_bit(group_index.block_offset(), true); + }, + ) } /// Similar as [NullState::build] but support the blocked version accumulator @@ -598,16 +583,14 @@ pub fn accumulate_indices( } } -fn do_blocked_accumulate( +fn do_blocked_accumulate( group_indices: &[usize], values: &PrimitiveArray, opt_filter: Option<&BooleanArray>, - group_index_parse_fn: G, mut value_fn: F1, mut set_valid_fn: F2, ) where T: ArrowPrimitiveType + Send, - G: Fn(usize) -> BlockedGroupIndex, F1: FnMut(&BlockedGroupIndex, T::Native) + Send, F2: FnMut(&BlockedGroupIndex) + Send, { @@ -617,7 +600,7 @@ fn do_blocked_accumulate( (false, None) => { let iter = group_indices.iter().zip(data.iter()); for (&group_index, &new_value) in iter { - let blocked_index = group_index_parse_fn(group_index); + let blocked_index = BlockedGroupIndex::new(group_index); set_valid_fn(&blocked_index); value_fn(&blocked_index, new_value); } @@ -645,7 +628,7 @@ fn do_blocked_accumulate( // valid bit was set, real value let is_valid = (mask & index_mask) != 0; if is_valid { - let blocked_index = group_index_parse_fn(group_index); + let blocked_index = BlockedGroupIndex::new(group_index); set_valid_fn(&blocked_index); value_fn(&blocked_index, new_value); } @@ -663,7 +646,7 @@ fn do_blocked_accumulate( .for_each(|(i, (&group_index, &new_value))| { let is_valid = remainder_bits & (1 << i) != 0; if is_valid { - let blocked_index = group_index_parse_fn(group_index); + let blocked_index = BlockedGroupIndex::new(group_index); set_valid_fn(&blocked_index); value_fn(&blocked_index, new_value); } @@ -681,7 +664,7 @@ fn do_blocked_accumulate( .zip(filter.iter()) .for_each(|((&group_index, &new_value), filter_value)| { if let Some(true) = filter_value { - let blocked_index = group_index_parse_fn(group_index); + let blocked_index = BlockedGroupIndex::new(group_index); set_valid_fn(&blocked_index); value_fn(&blocked_index, new_value); } @@ -700,9 +683,9 @@ fn do_blocked_accumulate( .for_each(|((filter_value, &group_index), new_value)| { if let Some(true) = filter_value { if let Some(new_value) = new_value { - let blocked_index = group_index_parse_fn(group_index); + let blocked_index = BlockedGroupIndex::new(group_index); set_valid_fn(&blocked_index); - value_fn(&blocked_index, new_value) + value_fn(&blocked_index, new_value); } } }) @@ -931,9 +914,9 @@ mod test { let block_id = *idx / self.block_size; let block_offset = *idx % self.block_size; BlockedGroupIndex::new_from_parts( + 1, block_id as u32, block_offset as u64, - true, ) .as_packed_index() }) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index d9cd3988129ce..38c050c41609c 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -399,31 +399,17 @@ impl GroupsAccumulator for CountGroupsAccumulator { 0, ); - if self.block_size.is_some() { - accumulate_indices( - group_indices, - values.logical_nulls().as_ref(), - opt_filter, - |group_index| { - let blocked_index = BlockedGroupIndex::new_blocked(group_index); - let count = &mut self.counts[blocked_index.block_id()] - [blocked_index.block_offset()]; - *count += 1; - }, - ); - } else { - accumulate_indices( - group_indices, - values.logical_nulls().as_ref(), - opt_filter, - |group_index| { - let blocked_index = BlockedGroupIndex::new_flat(group_index); - let count = &mut self.counts[blocked_index.block_id()] - [blocked_index.block_offset()]; - *count += 1; - }, - ); - } + accumulate_indices( + group_indices, + values.logical_nulls().as_ref(), + opt_filter, + |group_index| { + let blocked_index = BlockedGroupIndex::new(group_index); + let count = &mut self.counts[blocked_index.block_id()] + [blocked_index.block_offset()]; + *count += 1; + }, + ); Ok(()) } @@ -450,31 +436,17 @@ impl GroupsAccumulator for CountGroupsAccumulator { 0, ); - if self.block_size.is_some() { - do_count_merge_batch( - values, - group_indices, - opt_filter, - |group_index, partial_count| { - let blocked_index = BlockedGroupIndex::new_blocked(group_index); - let count = &mut self.counts[blocked_index.block_id()] - [blocked_index.block_offset()]; - *count += partial_count; - }, - ); - } else { - do_count_merge_batch( - values, - group_indices, - opt_filter, - |group_index, partial_count| { - let blocked_index = BlockedGroupIndex::new_flat(group_index); - let count = &mut self.counts[blocked_index.block_id()] - [blocked_index.block_offset()]; - *count += partial_count; - }, - ); - } + do_count_merge_batch( + values, + group_indices, + opt_filter, + |group_index, partial_count| { + let blocked_index = BlockedGroupIndex::new(group_index); + let count = &mut self.counts[blocked_index.block_id()] + [blocked_index.block_offset()]; + *count += partial_count; + }, + ); Ok(()) } diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index ea55ac5d450d6..313ba315686e1 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -135,79 +135,68 @@ impl GroupValues for GroupValuesRows { batch_hashes.resize(n_rows, 0); create_hashes(cols, &self.random_state, batch_hashes)?; - let mut get_or_create_groups = - |group_index_parse_fn: fn(usize) -> BlockedGroupIndex| { - for (row, &target_hash) in batch_hashes.iter().enumerate() { - let entry = - self.map.get_mut(target_hash, |(exist_hash, group_idx)| { - // Somewhat surprisingly, this closure can be called even if the - // hash doesn't match, so check the hash first with an integer - // comparison first avoid the more expensive comparison with - // group value. https://github.com/apache/datafusion/pull/11718 - if target_hash != *exist_hash { - return false; - } - - // verify that the group that we are inserting with hash is - // actually the same key value as the group in - // existing_idx (aka group_values @ row) - let blocked_index = group_index_parse_fn(*group_idx); - group_rows.row(row) - == group_values[blocked_index.block_id()] - .row(blocked_index.block_offset()) - }); - - let group_idx = match entry { - // Existing group_index for this group value - Some((_hash, group_idx)) => *group_idx, - // 1.2 Need to create new entry for the group - None => { - // Add new entry to aggr_state and save newly created index - if let Some(blk_size) = self.block_size { - if group_values.current().unwrap().num_rows() == blk_size - { - // Use blk_size as offset cap, - // and old block's buffer size as buffer cap - let new_buf_cap = - rows_buffer_size(group_values.current().unwrap()); - let new_blk = self - .row_converter - .empty_rows(blk_size, new_buf_cap); - group_values.push_block(new_blk); - } - } + let flag = self.block_size.is_some() as u8; + for (row, &target_hash) in batch_hashes.iter().enumerate() { + let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| { + // Somewhat surprisingly, this closure can be called even if the + // hash doesn't match, so check the hash first with an integer + // comparison first avoid the more expensive comparison with + // group value. https://github.com/apache/datafusion/pull/11718 + if target_hash != *exist_hash { + return false; + } - let blk_id = group_values.num_blocks() - 1; - let cur_blk = group_values.current_mut().unwrap(); - let blk_offset = cur_blk.num_rows(); - cur_blk.push(group_rows.row(row)); - - let blocked_index = BlockedGroupIndex::new_from_parts( - blk_id as u32, - blk_offset as u64, - self.block_size.is_some(), - ); - let group_idx = blocked_index.as_packed_index(); - - // for hasher function, use precomputed hash value - self.map.insert_accounted( - (target_hash, group_idx), - |(hash, _group_index)| *hash, - &mut self.map_size, - ); - - group_idx + // verify that the group that we are inserting with hash is + // actually the same key value as the group in + // existing_idx (aka group_values @ row) + let blocked_index = BlockedGroupIndex::new(*group_idx); + group_rows.row(row) + == group_values[blocked_index.block_id()] + .row(blocked_index.block_offset()) + }); + + let group_idx = match entry { + // Existing group_index for this group value + Some((_hash, group_idx)) => *group_idx, + // 1.2 Need to create new entry for the group + None => { + // Add new entry to aggr_state and save newly created index + if let Some(blk_size) = self.block_size { + if group_values.current().unwrap().num_rows() == blk_size { + // Use blk_size as offset cap, + // and old block's buffer size as buffer cap + let new_buf_cap = + rows_buffer_size(group_values.current().unwrap()); + let new_blk = + self.row_converter.empty_rows(blk_size, new_buf_cap); + group_values.push_block(new_blk); } - }; - groups.push(group_idx); + } + + let blk_id = group_values.num_blocks() - 1; + let cur_blk = group_values.current_mut().unwrap(); + let blk_offset = cur_blk.num_rows(); + cur_blk.push(group_rows.row(row)); + + let blocked_index = BlockedGroupIndex::new_from_parts( + flag, + blk_id as u32, + blk_offset as u64, + ); + let group_idx = blocked_index.as_packed_index(); + + // for hasher function, use precomputed hash value + self.map.insert_accounted( + (target_hash, group_idx), + |(hash, _group_index)| *hash, + &mut self.map_size, + ); + + group_idx } }; - - if self.block_size.is_some() { - get_or_create_groups(BlockedGroupIndex::new_blocked); - } else { - get_or_create_groups(BlockedGroupIndex::new_flat); - }; + groups.push(group_idx); + } self.group_values = group_values; @@ -294,33 +283,27 @@ impl GroupValues for GroupValuesRows { let cur_blk = group_values.pop_first_block().unwrap(); let output = self.row_converter.convert_rows(cur_blk.iter())?; - let mut shift_down_values = - |group_index_parse_fn: fn(usize) -> BlockedGroupIndex| unsafe { - for bucket in self.map.iter() { - // Decrement group index by n - let group_idx = bucket.as_ref().1; - let old_blk_idx = group_index_parse_fn(group_idx); - match old_blk_idx.block_id().checked_sub(1) { - // Group index was >= n, shift value down - Some(new_blk_id) => { - let new_group_idx = BlockedGroupIndex::new_from_parts( - new_blk_id as u32, - old_blk_idx.block_offset, - true, - ); - bucket.as_mut().1 = new_group_idx.as_packed_index(); - } - // Group index was < n, so remove from table - None => self.map.erase(bucket), + unsafe { + let flag = self.block_size.is_some() as u8; + for bucket in self.map.iter() { + // Decrement group index by n + let group_idx = bucket.as_ref().1; + let old_blk_idx = BlockedGroupIndex::new(group_idx); + match old_blk_idx.block_id().checked_sub(1) { + // Group index was >= n, shift value down + Some(new_blk_id) => { + let new_group_idx = BlockedGroupIndex::new_from_parts( + flag, + new_blk_id as u32, + old_blk_idx.block_offset, + ); + bucket.as_mut().1 = new_group_idx.as_packed_index(); } + // Group index was < n, so remove from table + None => self.map.erase(bucket), } - }; - - if self.block_size.is_some() { - shift_down_values(BlockedGroupIndex::new_blocked); - } else { - shift_down_values(BlockedGroupIndex::new_flat); - }; + } + } output }