Skip to content

Commit

Permalink
REE Rust (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Mar 6, 2024
1 parent 5a0c91f commit 456da7d
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 29 deletions.
2 changes: 1 addition & 1 deletion vortex-alp/benches/alp_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ fn alp_compress<T: ALPFloat>(n: usize) -> (Exponents, Vec<T::ALPInt>, Vec<u64>,
// TODO(ngates): remove this
#[divan::bench(args = [100_000, 10_000_000])]
fn alp_compress_array(n: usize) -> ArrayRef {
let array = PrimitiveArray::from_vec(vec![1.234f64; n]);
let array = PrimitiveArray::from(vec![1.234f64; n]);
ALPArray::encode(&array).unwrap()
}
8 changes: 4 additions & 4 deletions vortex-alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ where
let len = values.len();
(
exponents,
PrimitiveArray::from_vec(values).boxed(),
PrimitiveArray::from(values).boxed(),
(!exc.is_empty()).then(|| {
SparseArray::new(
PrimitiveArray::from_vec(exc_pos).boxed(),
PrimitiveArray::from_vec(exc).boxed(),
PrimitiveArray::from(exc_pos).boxed(),
PrimitiveArray::from(exc).boxed(),
len,
)
.boxed()
Expand All @@ -98,7 +98,7 @@ mod tests {

#[test]
fn test_compress() {
let array = PrimitiveArray::from_vec(vec![1.234f32; 1025]);
let array = PrimitiveArray::from(vec![1.234f32; 1025]);
let encoded = alp_encode(&array).unwrap();
println!("Encoded {:?}", encoded);
assert!(encoded.patches().is_none());
Expand Down
107 changes: 83 additions & 24 deletions vortex-ree/src/compress.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use codecz::AlignedAllocator;
use itertools::Itertools;
use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding};
use vortex::array::{Array, ArrayRef, CloneOptionalArray};
use vortex::array::{Array, ArrayRef, Encoding};
use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression};
use vortex::dtype::{DType, IntWidth, Nullability};
use vortex::ptype::match_each_native_ptype;
use vortex::compute::cast::cast_primitive;
use vortex::error::VortexResult;
use vortex::ptype::{match_each_native_ptype, NativePType};
use vortex::stats::Stat;

use crate::downcast::DowncastREE;
Expand All @@ -16,18 +17,19 @@ impl EncodingCompression for REEEncoding {
array: &dyn Array,
config: &CompressConfig,
) -> Option<&'static Compressor> {
if array.encoding().id() != PrimitiveEncoding.id() {
return None;
}

let avg_run_length = array.len() as f32
/ array
.stats()
.get_or_compute_or::<usize>(array.len(), &Stat::RunCount) as f32;

if array.encoding().id() == &PrimitiveEncoding::ID
&& avg_run_length >= config.ree_average_run_threshold
{
return Some(&(ree_compressor as Compressor));
if avg_run_length < config.ree_average_run_threshold {
return None;
}

None
Some(&(ree_compressor as Compressor))
}
}

Expand All @@ -46,24 +48,26 @@ fn ree_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx)
REEArray::new(
compressed_ends,
compressed_values,
primitive_array.validity().clone_optional(),
primitive_array
.validity()
.map(|v| ctx.compress(v, ree_like.and_then(|r| r.validity()))),
array.len(),
)
.boxed()
}

pub fn ree_encode(array: &PrimitiveArray) -> (PrimitiveArray, PrimitiveArray) {
match_each_native_ptype!(array.ptype(), |$P| {
let (values, ends) = codecz::ree::encode(array.buffer().typed_data::<$P>()).unwrap();
let (ends, values) = ree_encode_primitive(array.typed_data::<$P>());

let compressed_values = PrimitiveArray::from_nullable_in::<$P, AlignedAllocator>(values, None);
let compressed_values = PrimitiveArray::from(values);
compressed_values.stats().set(Stat::IsConstant, false.into());
compressed_values.stats().set(Stat::RunCount, compressed_values.len().into());
compressed_values.stats().set_many(&array.stats(), vec![
&Stat::Min, &Stat::Max, &Stat::IsSorted, &Stat::IsStrictSorted,
]);

let compressed_ends = PrimitiveArray::from_vec_in::<u32, AlignedAllocator>(ends);
let compressed_ends = PrimitiveArray::from(ends);
compressed_ends.stats().set(Stat::IsSorted, true.into());
compressed_ends.stats().set(Stat::IsStrictSorted, true.into());
compressed_ends.stats().set(Stat::IsConstant, false.into());
Expand All @@ -74,35 +78,89 @@ pub fn ree_encode(array: &PrimitiveArray) -> (PrimitiveArray, PrimitiveArray) {
})
}

fn ree_encode_primitive<T: NativePType>(elements: &[T]) -> (Vec<u64>, Vec<T>) {
let mut ends = Vec::new();
let mut values = Vec::new();

if elements.is_empty() {
return (ends, values);
}

// Run-end encode the values
let mut last = elements[0];
let mut end = 1;
for &e in elements.iter().skip(1) {
if e != last {
ends.push(end);
values.push(last);
}
last = e;
end += 1;
}
ends.push(end);
values.push(last);

(ends, values)
}

#[allow(dead_code)]
pub fn ree_decode(
ends: &PrimitiveArray,
values: &PrimitiveArray,
validity: Option<ArrayRef>,
) -> PrimitiveArray {
assert!(matches!(
ends.dtype(),
DType::Int(IntWidth::_32, _, Nullability::NonNullable)
));
) -> VortexResult<PrimitiveArray> {
// TODO(ngates): switch over ends without necessarily casting
match_each_native_ptype!(values.ptype(), |$P| {
let decoded = codecz::ree::decode::<$P>(values.buffer().typed_data::<$P>(), ends.buffer().typed_data::<u32>()).unwrap();
PrimitiveArray::from_nullable_in::<$P, AlignedAllocator>(decoded, validity)
Ok(PrimitiveArray::from_nullable(ree_decode_primitive(
cast_primitive(ends, &PType::U64)?.typed_data(),
values.typed_data::<$P>(),
), validity))
})
}

fn ree_decode_primitive<T: NativePType>(run_ends: &[u64], values: &[T]) -> Vec<T> {
let mut decoded = Vec::with_capacity(run_ends.last().map(|x| *x as usize).unwrap_or(0_usize));
for (&end, &value) in run_ends.iter().zip_eq(values) {
decoded.extend(std::iter::repeat(value).take(end as usize - decoded.len()));
}
decoded
}

#[cfg(test)]
mod test {
use arrow::buffer::BooleanBuffer;

use vortex::array::bool::BoolArray;
use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::{Array, CloneOptionalArray};

use crate::compress::ree_decode;
use crate::compress::{ree_decode, ree_encode};
use crate::REEArray;

#[test]
fn encode_nullable() {
fn encode() {
let arr = PrimitiveArray::from(vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
let (ends, values) = ree_encode(&arr);

assert_eq!(ends.typed_data::<u64>(), vec![2, 5, 10]);
assert_eq!(values.typed_data::<i32>(), vec![1, 2, 3]);
}

#[test]
fn decode() {
let ends = PrimitiveArray::from(vec![2, 5, 10]);
let values = PrimitiveArray::from(vec![1i32, 2, 3]);
let decoded = ree_decode(&ends, &values, None).unwrap();

assert_eq!(
decoded.typed_data::<i32>(),
vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]
);
}

#[test]
fn decode_nullable() {
let validity = {
let mut validity = vec![true; 10];
validity[2] = false;
Expand All @@ -120,7 +178,8 @@ mod test {
arr.ends().as_primitive(),
arr.values().as_primitive(),
arr.validity().clone_optional(),
);
)
.unwrap();

assert_eq!(
decoded.buffer().typed_data::<i32>(),
Expand Down

0 comments on commit 456da7d

Please sign in to comment.