diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 0b1c4ac81e..10b884955c 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -46,7 +46,7 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the >>> cvtx = vortex.compress(vtx) >>> cvtx.nbytes - 13970 + 13963 >>> cvtx.nbytes / vtx.nbytes 0.099... diff --git a/encodings/dict/benches/dict_compress.rs b/encodings/dict/benches/dict_compress.rs index 2f5aeddb06..445bd65b7b 100644 --- a/encodings/dict/benches/dict_compress.rs +++ b/encodings/dict/benches/dict_compress.rs @@ -4,9 +4,9 @@ use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion use rand::distributions::{Alphanumeric, Uniform}; use rand::prelude::SliceRandom; use rand::{thread_rng, Rng}; -use vortex_array::array::{PrimitiveArray, VarBinArray}; +use vortex_array::array::{PrimitiveArray, VarBinArray, VarBinViewArray}; use vortex_array::{ArrayTrait, IntoArray as _, IntoCanonical as _}; -use vortex_dict::{dict_encode_primitive, dict_encode_varbin, DictArray}; +use vortex_dict::{dict_encode_primitive, dict_encode_varbin, dict_encode_varbinview, DictArray}; fn gen_primitive_dict(len: usize, uniqueness: f64) -> PrimitiveArray { let mut rng = thread_rng(); @@ -17,7 +17,7 @@ fn gen_primitive_dict(len: usize, uniqueness: f64) -> PrimitiveArray { PrimitiveArray::from(data) } -fn gen_varbin_dict(len: usize, uniqueness: f64) -> VarBinArray { +fn gen_varbin_words(len: usize, uniqueness: f64) -> Vec { let mut rng = thread_rng(); let uniq_cnt = (len as f64 * uniqueness) as usize; let dict: Vec = (0..uniq_cnt) @@ -29,10 +29,9 @@ fn gen_varbin_dict(len: usize, uniqueness: f64) -> VarBinArray { .collect() }) .collect(); - let words: Vec<&str> = (0..len) - .map(|_| dict.choose(&mut rng).unwrap().as_str()) - .collect(); - VarBinArray::from(words) + (0..len) + .map(|_| dict.choose(&mut rng).unwrap().clone()) + .collect() } fn dict_encode(c: &mut Criterion) { @@ -44,11 +43,17 @@ fn dict_encode(c: &mut Criterion) { b.iter(|| black_box(dict_encode_primitive(&primitive_arr))); }); - let varbin_arr = gen_varbin_dict(1_000_000, 0.00005); + let varbin_arr = VarBinArray::from(gen_varbin_words(1_000_000, 0.00005)); group.throughput(Throughput::Bytes(varbin_arr.nbytes() as u64)); group.bench_function("dict_encode_varbin", |b| { b.iter(|| black_box(dict_encode_varbin(&varbin_arr))); }); + + let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(1_000_000, 0.00005)); + group.throughput(Throughput::Bytes(varbinview_arr.nbytes() as u64)); + group.bench_function("dict_encode_varbinview", |b| { + b.iter(|| black_box(dict_encode_varbinview(&varbinview_arr))); + }); } fn dict_decode(c: &mut Criterion) { @@ -65,7 +70,7 @@ fn dict_decode(c: &mut Criterion) { ); }); - let varbin_arr = gen_varbin_dict(1_000_000, 0.00005); + let varbin_arr = VarBinArray::from(gen_varbin_words(1_000_000, 0.00005)); let (codes, values) = dict_encode_varbin(&varbin_arr); group.throughput(Throughput::Bytes(varbin_arr.nbytes() as u64)); group.bench_function("dict_decode_varbin", |b| { @@ -75,6 +80,17 @@ fn dict_decode(c: &mut Criterion) { BatchSize::SmallInput, ); }); + + let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(1_000_000, 0.00005)); + let (codes, values) = dict_encode_varbinview(&varbinview_arr); + group.throughput(Throughput::Bytes(varbin_arr.nbytes() as u64)); + group.bench_function("dict_decode_varbinview", |b| { + b.iter_batched( + || DictArray::try_new(codes.clone().into_array(), values.clone().into_array()).unwrap(), + |dict_arr| black_box(dict_arr.into_canonical().unwrap()), + BatchSize::SmallInput, + ); + }); } criterion_group!(benches, dict_encode, dict_decode); diff --git a/encodings/dict/src/compress.rs b/encodings/dict/src/compress.rs index 405ac65df1..3a1485decd 100644 --- a/encodings/dict/src/compress.rs +++ b/encodings/dict/src/compress.rs @@ -1,14 +1,18 @@ use std::hash::{BuildHasher, Hash, Hasher}; -use hashbrown::hash_map::{Entry, RawEntryMut}; -use hashbrown::{DefaultHashBuilder, HashMap}; +use hashbrown::hash_map::Entry; +use hashbrown::HashTable; use num_traits::AsPrimitive; use vortex_array::accessor::ArrayAccessor; -use vortex_array::array::{PrimitiveArray, VarBinArray, VarBinViewArray}; +use vortex_array::aliases::hash_map::{DefaultHashBuilder, HashMap}; +use vortex_array::array::{ + ConstantArray, PrimitiveArray, SparseArray, VarBinArray, VarBinViewArray, +}; use vortex_array::validity::Validity; use vortex_array::{ArrayDType, IntoArray, IntoCanonical}; use vortex_dtype::{match_each_native_ptype, DType, NativePType, ToBytes}; use vortex_error::{VortexExpect as _, VortexUnwrap}; +use vortex_scalar::ScalarValue; /// Statically assigned code for a null value. pub const NULL_CODE: u64 = 0; @@ -41,7 +45,7 @@ pub fn dict_encode_primitive(array: &PrimitiveArray) -> (PrimitiveArray, Primiti pub fn dict_encode_typed_primitive( array: &PrimitiveArray, ) -> (PrimitiveArray, PrimitiveArray) { - let mut lookup_dict: HashMap, u64> = HashMap::new(); + let mut lookup: HashMap, u64> = HashMap::new(); let mut codes: Vec = Vec::new(); let mut values: Vec = Vec::new(); @@ -49,35 +53,28 @@ pub fn dict_encode_typed_primitive( values.push(T::zero()); } - ArrayAccessor::::with_iterator(array, |iter| { - for ov in iter { - match ov { - None => codes.push(NULL_CODE), - Some(&v) => { - let code = match lookup_dict.entry(Value(v)) { - Entry::Occupied(o) => *o.get(), - Entry::Vacant(vac) => { - let next_code = values.len() as u64; - vac.insert(next_code.as_()); - values.push(v); - next_code - } - }; - codes.push(code); + array + .with_iterator(|iter| { + for ov in iter { + match ov { + None => codes.push(NULL_CODE), + Some(&v) => { + codes.push(match lookup.entry(Value(v)) { + Entry::Occupied(o) => *o.get(), + Entry::Vacant(vac) => { + let next_code = values.len() as u64; + vac.insert(next_code.as_()); + values.push(v); + next_code + } + }); + } } } - } - }) - .vortex_expect("Failed to dictionary encode primitive array"); + }) + .vortex_expect("Failed to dictionary encode primitive array"); - let values_validity = if array.dtype().is_nullable() { - let mut validity = vec![true; values.len()]; - validity[0] = false; - - validity.into() - } else { - Validity::NonNullable - }; + let values_validity = dict_values_validity(array.dtype().is_nullable(), values.len()); ( PrimitiveArray::from(codes), @@ -88,14 +85,14 @@ pub fn dict_encode_typed_primitive( /// Dictionary encode varbin array. Specializes for primitive byte arrays to avoid double copying pub fn dict_encode_varbin(array: &VarBinArray) -> (PrimitiveArray, VarBinArray) { array - .with_iterator(|iter| dict_encode_typed_varbin(array.dtype().clone(), iter)) - .vortex_expect("Failed to dictionary encode varbin array") + .with_iterator(|iter| dict_encode_varbin_bytes(array.dtype().clone(), iter)) + .vortex_unwrap() } /// Dictionary encode a VarbinViewArray. pub fn dict_encode_varbinview(array: &VarBinViewArray) -> (PrimitiveArray, VarBinViewArray) { let (codes, values) = array - .with_iterator(|iter| dict_encode_typed_varbin(array.dtype().clone(), iter)) + .with_iterator(|iter| dict_encode_varbin_bytes(array.dtype().clone(), iter)) .vortex_unwrap(); ( codes, @@ -107,28 +104,16 @@ pub fn dict_encode_varbinview(array: &VarBinViewArray) -> (PrimitiveArray, VarBi ) } -fn lookup_bytes<'a, T: NativePType + AsPrimitive>( - offsets: &'a [T], - bytes: &'a [u8], - idx: usize, -) -> &'a [u8] { - let begin: usize = offsets[idx].as_(); - let end: usize = offsets[idx + 1].as_(); - &bytes[begin..end] -} - -fn dict_encode_typed_varbin(dtype: DType, values: I) -> (PrimitiveArray, VarBinArray) -where - I: Iterator>, - U: AsRef<[u8]>, -{ +fn dict_encode_varbin_bytes<'a, I: Iterator>>( + dtype: DType, + values: I, +) -> (PrimitiveArray, VarBinArray) { let (lower, _) = values.size_hint(); let hasher = DefaultHashBuilder::default(); - let mut lookup_dict: HashMap = HashMap::with_hasher(()); + let mut lookup_dict: HashTable = HashTable::new(); let mut codes: Vec = Vec::with_capacity(lower); let mut bytes: Vec = Vec::new(); - let mut offsets: Vec = Vec::new(); - offsets.push(0); + let mut offsets: Vec = vec![0]; if dtype.is_nullable() { offsets.push(0); @@ -136,45 +121,34 @@ where for o_val in values { match o_val { - None => codes.push(0), + None => codes.push(NULL_CODE), Some(val) => { - let byte_ref = val.as_ref(); - let value_hash = hasher.hash_one(byte_ref); - let raw_entry = lookup_dict.raw_entry_mut().from_hash(value_hash, |idx| { - byte_ref == lookup_bytes(offsets.as_slice(), bytes.as_slice(), idx.as_()) - }); - - let code = match raw_entry { - RawEntryMut::Occupied(o) => *o.into_key(), - RawEntryMut::Vacant(vac) => { - let next_code = offsets.len() as u64 - 1; - bytes.extend_from_slice(byte_ref); - offsets.push(bytes.len() as u32); - vac.insert_with_hasher(value_hash, next_code, (), |idx| { + let code = *lookup_dict + .entry( + hasher.hash_one(val), + |idx| val == lookup_bytes(offsets.as_slice(), bytes.as_slice(), idx.as_()), + |idx| { hasher.hash_one(lookup_bytes( offsets.as_slice(), bytes.as_slice(), idx.as_(), )) - }); + }, + ) + .or_insert_with(|| { + let next_code = offsets.len() as u64 - 1; + bytes.extend_from_slice(val); + offsets.push(bytes.len() as u32); next_code - } - }; + }) + .get(); + codes.push(code) } } } - let values_validity = if dtype.is_nullable() { - let mut validity = Vec::with_capacity(offsets.len() - 1); - validity.push(false); - validity.extend(vec![true; offsets.len() - 2]); - - validity.into() - } else { - Validity::NonNullable - }; - + let values_validity = dict_values_validity(dtype.is_nullable(), offsets.len() - 1); ( PrimitiveArray::from(codes), VarBinArray::try_new( @@ -187,6 +161,33 @@ where ) } +fn dict_values_validity(nullable: bool, len: usize) -> Validity { + if nullable { + Validity::Array( + SparseArray::try_new( + ConstantArray::new(0u64, 1).into_array(), + ConstantArray::new(false, 1).into_array(), + len, + ScalarValue::Bool(true), + ) + .vortex_unwrap() + .into_array(), + ) + } else { + Validity::NonNullable + } +} + +fn lookup_bytes<'a, T: AsPrimitive>( + offsets: &'a [T], + bytes: &'a [u8], + idx: usize, +) -> &'a [u8] { + let begin: usize = offsets[idx].as_(); + let end: usize = offsets[idx + 1].as_(); + &bytes[begin..end] +} + #[cfg(test)] mod test { use std::str; diff --git a/vortex-array/src/aliases/hash_map.rs b/vortex-array/src/aliases/hash_map.rs index 68344d0976..4360d3b47d 100644 --- a/vortex-array/src/aliases/hash_map.rs +++ b/vortex-array/src/aliases/hash_map.rs @@ -1,3 +1,5 @@ +pub type DefaultHashBuilder = hashbrown::DefaultHashBuilder; pub type HashMap = hashbrown::HashMap; pub type Entry<'a, K, V, S> = hashbrown::hash_map::Entry<'a, K, V, S>; pub type IntoIter = hashbrown::hash_map::IntoIter; +pub type HashTable = hashbrown::HashTable; diff --git a/vortex-array/src/array/varbinview/accessor.rs b/vortex-array/src/array/varbinview/accessor.rs index 0474bb211f..c7ce4a4903 100644 --- a/vortex-array/src/array/varbinview/accessor.rs +++ b/vortex-array/src/array/varbinview/accessor.rs @@ -16,6 +16,7 @@ impl ArrayAccessor<[u8]> for VarBinViewArray { let bytes: Vec = (0..self.metadata().buffer_lens.len()) .map(|i| self.buffer(i).into_canonical()?.into_primitive()) .try_collect()?; + let bytes_slices: Vec<&[u8]> = bytes.iter().map(|b| b.maybe_null_slice::()).collect(); let views: Vec = self.binary_views()?.collect(); let validity = self.logical_validity().to_null_buffer()?; @@ -23,13 +24,11 @@ impl ArrayAccessor<[u8]> for VarBinViewArray { None => { let mut iter = views.iter().map(|view| { if view.is_inlined() { - Some(unsafe { &view.inlined.data[..view.len() as usize] }) + Some(view.as_inlined().value()) } else { - let offset = unsafe { view._ref.offset as usize }; - let buffer_idx = unsafe { view._ref.buffer_index as usize }; Some( - &bytes[buffer_idx].maybe_null_slice::() - [offset..offset + view.len() as usize], + &bytes_slices[view.as_view().buffer_index() as usize] + [view.as_view().to_range()], ) } }); @@ -39,13 +38,11 @@ impl ArrayAccessor<[u8]> for VarBinViewArray { let mut iter = views.iter().zip(validity.iter()).map(|(view, valid)| { if valid { if view.is_inlined() { - Some(unsafe { &view.inlined.data[..view.len() as usize] }) + Some(view.as_inlined().value()) } else { - let offset = unsafe { view._ref.offset as usize }; - let buffer_idx = unsafe { view._ref.buffer_index as usize }; Some( - &bytes[buffer_idx].maybe_null_slice::() - [offset..offset + view.len() as usize], + &bytes_slices[view.as_view().buffer_index() as usize] + [view.as_view().to_range()], ) } } else { diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index faa1d2d991..a0d1bf9c01 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -1,4 +1,5 @@ use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; use std::sync::Arc; use ::serde::{Deserialize, Serialize}; @@ -88,6 +89,11 @@ impl Ref { pub fn prefix(&self) -> &[u8; 4] { &self.prefix } + + #[inline] + pub fn to_range(&self) -> Range { + self.offset as usize..(self.offset + self.size) as usize + } } #[derive(Clone, Copy)] @@ -160,6 +166,14 @@ impl BinaryView { } } +impl From for BinaryView { + fn from(value: u128) -> Self { + BinaryView { + le_bytes: value.to_le_bytes(), + } + } +} + impl Debug for BinaryView { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut s = f.debug_struct("BinaryView");