Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 10, 2024
1 parent 32346bc commit f78bea7
Showing 1 changed file with 99 additions and 11 deletions.
110 changes: 99 additions & 11 deletions datafusion/physical-plan/src/aggregates/group_values/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use ahash::RandomState;
use crate::aggregates::group_values::GroupValues;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_array::cast::AsArray;
use arrow_buffer::BufferBuilder;
use arrow_schema::DataType;
use datafusion_common::hash_utils::create_hashes;
use datafusion_physical_expr::EmitTo;

/// A [`GroupValues`] storing single column of Utf8/LargeUtf8/Binary/LargeBinary values
Expand All @@ -28,16 +32,45 @@ pub struct GroupValuesBinary {
/// The data type of the output array
data_type: DataType,
/// Stores the group index based on the hash of its value
///
/// Each entry is a [`StringKey`], which keeps the hash of its value so the
/// table can be resized without hashing the string bytes again
//map: RawTable<usize>,
map: RawTable<StringKey>,
/// The group index of the null value if any
null_group: Option<usize>,
// /// The values for each group index
//values: Vec<T::Native>,
// /// The random state used to generate hashes
//random_state: RandomState,
/// The total number of groups so far (used to assign group_index)
num_groups: usize,
/// Total size of the map in bytes
map_size: usize,
/// The raw string value for each group (becomes the variable length data in
/// the output array)
buffer: BufferBuilder<u8>,
/// The random state used to generate hashes
random_state: RandomState,
// buffer to be reused to store hashes
hashes_buffer: Vec<u64>,
}

/// Describes one distinct string value tracked by the group map.
///
/// The bytes of the value are not held here: `offset` and `len` locate them
/// inside a separate byte buffer that is passed to [`StringKey::check_eq`].
///
/// NOTE(review): the original comment says these entries are also meant to
/// produce the offsets of the output array; that code is not part of this
/// block, so confirm against `emit`.
struct StringKey {
    /// Hash of the string value, kept so it does not have to be recomputed
    /// from the bytes (e.g. when the table is resized)
    hash: u64,
    /// The logical group index assigned to this string value
    group_index: usize,
    /// Length of the string, in bytes
    len: u64,
    /// Offset into the byte buffer of the first byte of the string
    offset: u64,
}

impl StringKey {
    /// Returns `true` if the bytes this key points at are exactly `value`.
    ///
    /// Intended to be called only after the hashes are already known to
    /// match, i.e. as the collision check of a hash table probe, which is why
    /// `hash` is not consulted here.
    ///
    /// * `buffer` - the byte buffer that `offset` / `len` index into
    /// * `value` - the candidate string bytes to compare against
    ///
    /// A key whose `offset..offset + len` range does not fit inside `buffer`
    /// compares unequal instead of panicking.
    fn check_eq(&self, buffer: &[u8], value: &[u8]) -> bool {
        // Cheap rejection first: different lengths can never be equal, and
        // this avoids touching the buffer at all for most collisions.
        if self.len != value.len() as u64 {
            return false;
        }

        match self.offset.checked_add(self.len) {
            // `end` is bounded by `buffer.len()`, so both casts back to
            // `usize` are lossless and the slice is in bounds.
            Some(end) if end <= buffer.len() as u64 => {
                &buffer[self.offset as usize..end as usize] == value
            }
            // Overflowing or out-of-range key: cannot match anything.
            _ => false,
        }
    }
}

impl GroupValuesBinary {
Expand All @@ -55,19 +88,74 @@ impl GroupValues for GroupValuesBinary {
cols: &[ArrayRef],
groups: &mut Vec<usize>,
) -> datafusion_common::Result<()> {
todo!()
assert_eq!(cols.len() == 1);

// step 1: compute hashes for the strings
let batch_hashes = &mut self.hashes_buffer;
batch_hashes.clear();
batch_hashes.resize(n_rows, 0);
create_hashes(cols, &self.random_state, batch_hashes)?;

// Step 2: look up / add entries in the table
let arr = cols[0].as_string();

assert_eq!(arr.len(), batch_hashes.len());

for (row, &hash) in batch_hashes.iter().enumerate() {
// SAFETY: `row` comes from enumerating `batch_hashes`, whose length was just asserted to equal `arr.len()`, so it is in bounds
let cur_string = unsafe {
arr.value_unchecked(row)
};
// Check whether this value already has an entry in the map
let entry = self.map.get_mut(hash, |string_key| {
// check for collisions
string_key.check_eq(self.buffer.as_slice(), cur_string.as_bytes())
});

let group_idx = match entry {
// Existing group_index for this group value
Some(string_key) => string_key.group_index,
// 1.2 Need to create new entry for the group
None => {
let group_idx = self.num_groups;
self.num_groups = self.num_groups + 1;

// add the string to the buffer
let string_key = StringKey {
hash: *hash,
group_index: group_idx,
len: cur_string.len() as u64,
offset: self.buffer.len() as u64,
};

self.buffer.append_slice(cur_string.as_bytes());

// for hasher function, use precomputed hash value
self.map.insert_accounted(
string_key,
|string_key| string_key.hash,
&mut self.map_size,
);
group_idx
}
};
groups.push(group_idx);
}


Ok(groups)
}

/// Returns an estimate, in bytes, of the memory held by this structure.
///
/// The estimate is the sum of:
/// * `map_size`, the running byte count of the hash table
/// * the capacity of `buffer` (a `u8` builder, so one byte per element)
/// * the capacity of the reusable `hashes_buffer`, at 8 bytes per `u64`
fn size(&self) -> usize {
    self.map_size
        + self.buffer.capacity()
        + self.hashes_buffer.capacity() * std::mem::size_of::<u64>()
}

fn is_empty(&self) -> bool {
todo!()
self.num_groups == 0
}

fn len(&self) -> usize {
todo!()
self.num_groups
}

fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
Expand Down

0 comments on commit f78bea7

Please sign in to comment.