From 456da7d1e9b880ee9e5b4213691f2fa67c614631 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 6 Mar 2024 09:58:27 +0000 Subject: [PATCH] REE Rust (#74) --- vortex-alp/benches/alp_compress.rs | 2 +- vortex-alp/src/compress.rs | 8 +-- vortex-ree/src/compress.rs | 107 ++++++++++++++++++++++------- 3 files changed, 88 insertions(+), 29 deletions(-) diff --git a/vortex-alp/benches/alp_compress.rs b/vortex-alp/benches/alp_compress.rs index d0cb986d91..54d0998fc1 100644 --- a/vortex-alp/benches/alp_compress.rs +++ b/vortex-alp/benches/alp_compress.rs @@ -15,6 +15,6 @@ fn alp_compress(n: usize) -> (Exponents, Vec, Vec, // TODO(ngates): remove this #[divan::bench(args = [100_000, 10_000_000])] fn alp_compress_array(n: usize) -> ArrayRef { - let array = PrimitiveArray::from_vec(vec![1.234f64; n]); + let array = PrimitiveArray::from(vec![1.234f64; n]); ALPArray::encode(&array).unwrap() } diff --git a/vortex-alp/src/compress.rs b/vortex-alp/src/compress.rs index 83018a53c6..0a12214e30 100644 --- a/vortex-alp/src/compress.rs +++ b/vortex-alp/src/compress.rs @@ -71,11 +71,11 @@ where let len = values.len(); ( exponents, - PrimitiveArray::from_vec(values).boxed(), + PrimitiveArray::from(values).boxed(), (!exc.is_empty()).then(|| { SparseArray::new( - PrimitiveArray::from_vec(exc_pos).boxed(), - PrimitiveArray::from_vec(exc).boxed(), + PrimitiveArray::from(exc_pos).boxed(), + PrimitiveArray::from(exc).boxed(), len, ) .boxed() @@ -98,7 +98,7 @@ mod tests { #[test] fn test_compress() { - let array = PrimitiveArray::from_vec(vec![1.234f32; 1025]); + let array = PrimitiveArray::from(vec![1.234f32; 1025]); let encoded = alp_encode(&array).unwrap(); println!("Encoded {:?}", encoded); assert!(encoded.patches().is_none()); diff --git a/vortex-ree/src/compress.rs b/vortex-ree/src/compress.rs index 598bc91ae8..e819906319 100644 --- a/vortex-ree/src/compress.rs +++ b/vortex-ree/src/compress.rs @@ -1,10 +1,11 @@ -use codecz::AlignedAllocator; +use itertools::Itertools; use vortex::array::downcast::DowncastArrayBuiltin; use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding}; -use vortex::array::{Array, ArrayRef, CloneOptionalArray}; +use vortex::array::{Array, ArrayRef, Encoding}; use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; -use vortex::dtype::{DType, IntWidth, Nullability}; -use vortex::ptype::match_each_native_ptype; +use vortex::compute::cast::cast_primitive; +use vortex::error::VortexResult; +use vortex::ptype::{match_each_native_ptype, NativePType}; use vortex::stats::Stat; use crate::downcast::DowncastREE; @@ -16,18 +17,19 @@ impl EncodingCompression for REEEncoding { array: &dyn Array, config: &CompressConfig, ) -> Option<&'static Compressor> { + if array.encoding().id() != PrimitiveEncoding.id() { + return None; + } + let avg_run_length = array.len() as f32 / array .stats() .get_or_compute_or::(array.len(), &Stat::RunCount) as f32; - - if array.encoding().id() == &PrimitiveEncoding::ID - && avg_run_length >= config.ree_average_run_threshold - { - return Some(&(ree_compressor as Compressor)); + if avg_run_length < config.ree_average_run_threshold { + return None; } - None + Some(&(ree_compressor as Compressor)) } } @@ -46,7 +48,9 @@ fn ree_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) REEArray::new( compressed_ends, compressed_values, - primitive_array.validity().clone_optional(), + primitive_array + .validity() + .map(|v| ctx.compress(v, ree_like.and_then(|r| r.validity()))), array.len(), ) .boxed() @@ -54,16 +58,16 @@ fn ree_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) pub fn ree_encode(array: &PrimitiveArray) -> (PrimitiveArray, PrimitiveArray) { match_each_native_ptype!(array.ptype(), |$P| { - let (values, ends) = codecz::ree::encode(array.buffer().typed_data::<$P>()).unwrap(); + let (ends, values) = ree_encode_primitive(array.typed_data::<$P>()); - let compressed_values = PrimitiveArray::from_nullable_in::<$P, AlignedAllocator>(values, None); + let compressed_values = PrimitiveArray::from(values); compressed_values.stats().set(Stat::IsConstant, false.into()); compressed_values.stats().set(Stat::RunCount, compressed_values.len().into()); compressed_values.stats().set_many(&array.stats(), vec![ &Stat::Min, &Stat::Max, &Stat::IsSorted, &Stat::IsStrictSorted, ]); - let compressed_ends = PrimitiveArray::from_vec_in::(ends); + let compressed_ends = PrimitiveArray::from(ends); compressed_ends.stats().set(Stat::IsSorted, true.into()); compressed_ends.stats().set(Stat::IsStrictSorted, true.into()); compressed_ends.stats().set(Stat::IsConstant, false.into()); @@ -74,35 +78,89 @@ pub fn ree_encode(array: &PrimitiveArray) -> (PrimitiveArray, PrimitiveArray) { }) } +fn ree_encode_primitive(elements: &[T]) -> (Vec, Vec) { + let mut ends = Vec::new(); + let mut values = Vec::new(); + + if elements.is_empty() { + return (ends, values); + } + + // Run-end encode the values + let mut last = elements[0]; + let mut end = 1; + for &e in elements.iter().skip(1) { + if e != last { + ends.push(end); + values.push(last); + } + last = e; + end += 1; + } + ends.push(end); + values.push(last); + + (ends, values) +} + #[allow(dead_code)] pub fn ree_decode( ends: &PrimitiveArray, values: &PrimitiveArray, validity: Option, -) -> PrimitiveArray { - assert!(matches!( - ends.dtype(), - DType::Int(IntWidth::_32, _, Nullability::NonNullable) - )); +) -> VortexResult { + // TODO(ngates): switch over ends without necessarily casting match_each_native_ptype!(values.ptype(), |$P| { - let decoded = codecz::ree::decode::<$P>(values.buffer().typed_data::<$P>(), ends.buffer().typed_data::()).unwrap(); - PrimitiveArray::from_nullable_in::<$P, AlignedAllocator>(decoded, validity) + Ok(PrimitiveArray::from_nullable(ree_decode_primitive( + cast_primitive(ends, &PType::U64)?.typed_data(), + values.typed_data::<$P>(), + ), validity)) }) } +fn ree_decode_primitive(run_ends: &[u64], values: &[T]) -> Vec { + let mut decoded = Vec::with_capacity(run_ends.last().map(|x| *x as usize).unwrap_or(0_usize)); + for (&end, &value) in run_ends.iter().zip_eq(values) { + decoded.extend(std::iter::repeat(value).take(end as usize - decoded.len())); + } + decoded +} + #[cfg(test)] mod test { use arrow::buffer::BooleanBuffer; use vortex::array::bool::BoolArray; use vortex::array::downcast::DowncastArrayBuiltin; + use vortex::array::primitive::PrimitiveArray; use vortex::array::{Array, CloneOptionalArray}; - use crate::compress::ree_decode; + use crate::compress::{ree_decode, ree_encode}; use crate::REEArray; #[test] - fn encode_nullable() { + fn encode() { + let arr = PrimitiveArray::from(vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]); + let (ends, values) = ree_encode(&arr); + + assert_eq!(ends.typed_data::(), vec![2, 5, 10]); + assert_eq!(values.typed_data::(), vec![1, 2, 3]); + } + + #[test] + fn decode() { + let ends = PrimitiveArray::from(vec![2, 5, 10]); + let values = PrimitiveArray::from(vec![1i32, 2, 3]); + let decoded = ree_decode(&ends, &values, None).unwrap(); + + assert_eq!( + decoded.typed_data::(), + vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3] + ); + } + + #[test] + fn decode_nullable() { let validity = { let mut validity = vec![true; 10]; validity[2] = false; @@ -120,7 +178,8 @@ mod test { arr.ends().as_primitive(), arr.values().as_primitive(), arr.validity().clone_optional(), - ); + ) + .unwrap(); assert_eq!( decoded.buffer().typed_data::(),