diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs index 887e20fcd7..621983b2ae 100644 --- a/bench-vortex/benches/compress_benchmark.rs +++ b/bench-vortex/benches/compress_benchmark.rs @@ -38,6 +38,7 @@ fn download_taxi_data() -> &'static Path { fn compress(array: ArrayRef) -> usize { CompressCtx::default() .compress(array.as_ref(), None) + .unwrap() .nbytes() } diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 6832fdad2f..6ceb56714c 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -91,6 +91,7 @@ mod test { .unwrap(); } + #[ignore] #[test] fn compression_ratio() { setup_logger(); @@ -118,7 +119,9 @@ mod test { HashSet::default(), ); println!("Compression config {cfg:?}"); - let compressed = CompressCtx::new(&cfg).compress(array.as_ref(), None); + let compressed = CompressCtx::new(&cfg) + .compress(array.as_ref(), None) + .unwrap(); println!("Compressed array {compressed}"); println!( "NBytes {}, Ratio {}", diff --git a/pyvortex/src/compress.rs b/pyvortex/src/compress.rs index 46833c9d5a..741d5a6ab6 100644 --- a/pyvortex/src/compress.rs +++ b/pyvortex/src/compress.rs @@ -4,6 +4,7 @@ use pyo3::{pyclass, pyfunction, pymethods, Py, PyResult, Python}; use vortex::compress::{CompressConfig, CompressCtx}; use crate::array::PyArray; +use crate::error::PyVortexError; #[derive(Clone)] #[pyclass(name = "CompressConfig", module = "vortex")] @@ -33,6 +34,8 @@ pub fn compress( ) -> PyResult> { let compress_opts = opts.map(|o| o.inner).unwrap_or_default(); let ctx = CompressCtx::new(&compress_opts); - let compressed = py.allow_threads(|| ctx.compress(arr.unwrap(), None)); + let compressed = py + .allow_threads(|| ctx.compress(arr.unwrap(), None)) + .map_err(PyVortexError::map_err)?; PyArray::wrap(py, compressed) } diff --git a/vortex-alp/src/compress.rs b/vortex-alp/src/compress.rs index 0a12214e30..3b9ad9a4d8 100644 --- a/vortex-alp/src/compress.rs +++ b/vortex-alp/src/compress.rs @@ -35,7 +35,11 @@ impl EncodingCompression for ALPEncoding { } } -fn alp_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn alp_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let like_alp = like.map(|like_array| like_array.as_alp()); // TODO(ngates): fill forward nulls @@ -49,14 +53,16 @@ fn alp_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) let compressed_encoded = ctx .next_level() - .compress(encoded.as_ref(), like_alp.map(|a| a.encoded())); + .compress(encoded.as_ref(), like_alp.map(|a| a.encoded()))?; - let compressed_patches = patches.map(|p| { - ctx.next_level() - .compress(p.as_ref(), like_alp.and_then(|a| a.patches())) - }); + let compressed_patches = patches + .map(|p| { + ctx.next_level() + .compress(p.as_ref(), like_alp.and_then(|a| a.patches())) + }) + .transpose()?; - ALPArray::new(compressed_encoded, exponents, compressed_patches).boxed() + Ok(ALPArray::new(compressed_encoded, exponents, compressed_patches).boxed()) } fn encode_to_array( diff --git a/vortex-array/src/array/bool/compress.rs b/vortex-array/src/array/bool/compress.rs deleted file mode 100644 index 588bed8a38..0000000000 --- a/vortex-array/src/array/bool/compress.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::array::bool::BoolEncoding; -use crate::array::{Array, ArrayRef}; -use crate::compress::{ - sampled_compression, CompressConfig, CompressCtx, Compressor, EncodingCompression, -}; - -impl EncodingCompression for BoolEncoding { - fn compressor( - &self, - array: &dyn Array, - _config: &CompressConfig, - ) -> Option<&'static Compressor> { - if array.encoding().id() == &BoolEncoding::ID { - Some(&(bool_compressor as Compressor)) - } else { - None - } - } -} - -fn bool_compressor(array: &dyn Array, _like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { - sampled_compression(array, ctx) -} diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index 8ea3ef06ee..65cd19be96 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -7,7 +7,6 @@ use arrow::buffer::{BooleanBuffer, NullBuffer}; use linkme::distributed_slice; use crate::arrow::CombineChunks; -use crate::compress::EncodingCompression; use crate::compute::scalar_at::scalar_at; use crate::dtype::{DType, Nullability}; use crate::error::VortexResult; @@ -20,7 +19,6 @@ use super::{ EncodingId, EncodingRef, ENCODINGS, }; -mod compress; mod compute; mod serde; mod stats; @@ -166,10 +164,6 @@ impl Encoding for BoolEncoding { &Self::ID } - fn compression(&self) -> Option<&dyn EncodingCompression> { - Some(self) - } - fn serde(&self) -> Option<&dyn EncodingSerde> { Some(self) } diff --git a/vortex-array/src/array/chunked/compress.rs b/vortex-array/src/array/chunked/compress.rs index a5bf11aa54..235d06ccf0 100644 --- a/vortex-array/src/array/chunked/compress.rs +++ b/vortex-array/src/array/chunked/compress.rs @@ -1,8 +1,11 @@ +use itertools::Itertools; +use std::ops::Deref; + use crate::array::chunked::{ChunkedArray, ChunkedEncoding}; use crate::array::downcast::DowncastArrayBuiltin; use crate::array::{Array, ArrayRef}; use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; -use itertools::Itertools; +use crate::error::VortexResult; impl EncodingCompression for ChunkedEncoding { fn compressor( @@ -10,34 +13,29 @@ impl EncodingCompression for ChunkedEncoding { array: &dyn Array, _config: &CompressConfig, ) -> Option<&'static Compressor> { - if array.encoding().id() == &Self::ID { - Some(&(chunked_compressor as Compressor)) - } else { - None - } + (array.encoding().id() == &Self::ID).then_some(&(chunked_compressor as Compressor)) } } -fn chunked_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn chunked_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let chunked_array = array.as_chunked(); let chunked_like = like.map(|like_array| like_array.as_chunked()); - let compressed_chunks = chunked_like - .map(|c_like| { - chunked_array - .chunks() - .iter() - .zip_eq(c_like.chunks()) - .map(|(chunk, chunk_like)| ctx.compress(chunk.as_ref(), Some(chunk_like.as_ref()))) - .collect() + let compressed_chunks = chunked_array + .chunks() + .iter() + .enumerate() + .map(|(i, chunk)| { + let like_chunk = chunked_like + .and_then(|c_like| c_like.chunks().get(i)) + .map(Deref::deref); + ctx.compress(chunk.deref(), like_chunk) }) - .unwrap_or_else(|| { - chunked_array - .chunks() - .iter() - .map(|chunk| ctx.compress(chunk.as_ref(), None)) - .collect() - }); + .try_collect()?; - ChunkedArray::new(compressed_chunks, array.dtype().clone()).boxed() + Ok(ChunkedArray::new(compressed_chunks, array.dtype().clone()).boxed()) } diff --git a/vortex-array/src/array/constant/compress.rs b/vortex-array/src/array/constant/compress.rs deleted file mode 100644 index 2e3687e71c..0000000000 --- a/vortex-array/src/array/constant/compress.rs +++ /dev/null @@ -1,27 +0,0 @@ -use crate::array::constant::{ConstantArray, ConstantEncoding}; -use crate::array::{Array, ArrayRef}; -use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; -use crate::compute::scalar_at::scalar_at; -use crate::stats::Stat; - -impl EncodingCompression for ConstantEncoding { - fn compressor( - &self, - array: &dyn Array, - _config: &CompressConfig, - ) -> Option<&'static Compressor> { - if array.stats().get_or_compute_or(false, &Stat::IsConstant) { - Some(&(constant_compressor as Compressor)) - } else { - None - } - } -} - -fn constant_compressor( - array: &dyn Array, - _like: Option<&dyn Array>, - _ctx: CompressCtx, -) -> ArrayRef { - ConstantArray::new(scalar_at(array, 0).unwrap(), array.len()).boxed() -} diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs index 1cdeede607..9becc699c9 100644 --- a/vortex-array/src/array/constant/mod.rs +++ b/vortex-array/src/array/constant/mod.rs @@ -9,7 +9,6 @@ use crate::array::{ ENCODINGS, }; use crate::arrow::compute::repeat; -use crate::compress::EncodingCompression; use crate::dtype::DType; use crate::error::VortexResult; use crate::formatter::{ArrayDisplay, ArrayFormatter}; @@ -17,7 +16,6 @@ use crate::scalar::{Scalar, ScalarRef}; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsSet}; -mod compress; mod compute; mod serde; mod stats; @@ -132,10 +130,6 @@ impl Encoding for ConstantEncoding { &Self::ID } - fn compression(&self) -> Option<&dyn EncodingCompression> { - Some(self) - } - fn serde(&self) -> Option<&dyn EncodingSerde> { Some(self) } diff --git a/vortex-array/src/array/mod.rs b/vortex-array/src/array/mod.rs index 072afc4b50..e364fa2765 100644 --- a/vortex-array/src/array/mod.rs +++ b/vortex-array/src/array/mod.rs @@ -139,10 +139,14 @@ pub trait Encoding: Debug + Send + Sync + 'static { fn id(&self) -> &EncodingId; /// Implementation of the array compression trait - fn compression(&self) -> Option<&dyn EncodingCompression>; + fn compression(&self) -> Option<&dyn EncodingCompression> { + None + } /// Array serialization - fn serde(&self) -> Option<&dyn EncodingSerde>; + fn serde(&self) -> Option<&dyn EncodingSerde> { + None + } } pub type EncodingRef = &'static dyn Encoding; diff --git a/vortex-array/src/array/primitive/compress.rs b/vortex-array/src/array/primitive/compress.rs deleted file mode 100644 index 528d436abf..0000000000 --- a/vortex-array/src/array/primitive/compress.rs +++ /dev/null @@ -1,44 +0,0 @@ -use crate::array::primitive::PrimitiveEncoding; -use crate::array::{Array, ArrayRef}; -use crate::compress::{ - sampled_compression, CompressConfig, CompressCtx, Compressor, EncodingCompression, -}; - -impl EncodingCompression for PrimitiveEncoding { - fn compressor( - &self, - array: &dyn Array, - _config: &CompressConfig, - ) -> Option<&'static Compressor> { - if array.encoding().id() == &Self::ID { - Some(&(primitive_compressor as Compressor)) - } else { - None - } - } -} - -fn primitive_compressor( - array: &dyn Array, - _like: Option<&dyn Array>, - ctx: CompressCtx, -) -> ArrayRef { - sampled_compression(array, ctx) -} - -#[cfg(test)] -mod test { - use crate::array::constant::ConstantEncoding; - use crate::array::primitive::PrimitiveArray; - use crate::array::Encoding; - use crate::compress::CompressCtx; - use crate::compute::scalar_at::scalar_at; - - #[test] - pub fn compress_constant() { - let arr = PrimitiveArray::from(vec![1, 1, 1, 1]); - let res = CompressCtx::default().compress(arr.as_ref(), None); - assert_eq!(res.encoding().id(), ConstantEncoding.id()); - assert_eq!(scalar_at(res.as_ref(), 3).unwrap().try_into(), Ok(1)); - } -} diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index d7b92f32b1..ee2ae84606 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -18,7 +18,6 @@ use crate::array::{ EncodingId, EncodingRef, ENCODINGS, }; use crate::arrow::CombineChunks; -use crate::compress::EncodingCompression; use crate::compute::scalar_at::scalar_at; use crate::dtype::DType; use crate::error::VortexResult; @@ -27,7 +26,6 @@ use crate::ptype::{match_each_native_ptype, NativePType, PType}; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsSet}; -mod compress; mod compute; mod serde; mod stats; @@ -247,10 +245,6 @@ impl Encoding for PrimitiveEncoding { &Self::ID } - fn compression(&self) -> Option<&dyn EncodingCompression> { - Some(self) - } - fn serde(&self) -> Option<&dyn EncodingSerde> { Some(self) } diff --git a/vortex-array/src/array/sparse/compress.rs b/vortex-array/src/array/sparse/compress.rs index 219cdb49f5..e7300358cb 100644 --- a/vortex-array/src/array/sparse/compress.rs +++ b/vortex-array/src/array/sparse/compress.rs @@ -2,6 +2,7 @@ use crate::array::downcast::DowncastArrayBuiltin; use crate::array::sparse::{SparseArray, SparseEncoding}; use crate::array::{Array, ArrayRef}; use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; +use crate::error::VortexResult; impl EncodingCompression for SparseEncoding { fn compressor( @@ -9,21 +10,21 @@ impl EncodingCompression for SparseEncoding { array: &dyn Array, _config: &CompressConfig, ) -> Option<&'static Compressor> { - if array.encoding().id() == &Self::ID { - Some(&(sparse_compressor as Compressor)) - } else { - None - } + (array.encoding().id() == &Self::ID).then_some(&(sparse_compressor as Compressor)) } } -fn sparse_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn sparse_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let sparse_array = array.as_sparse(); let sparse_like = like.map(|la| la.as_sparse()); - SparseArray::new( - ctx.compress(sparse_array.indices(), sparse_like.map(|sa| sa.indices())), - ctx.compress(sparse_array.values(), sparse_like.map(|sa| sa.values())), + Ok(SparseArray::new( + ctx.compress(sparse_array.indices(), sparse_like.map(|sa| sa.indices()))?, + ctx.compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?, sparse_array.len(), ) - .boxed() + .boxed()) } diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs index 5cfdc0b40b..ab757dff4f 100644 --- a/vortex-array/src/array/sparse/mod.rs +++ b/vortex-array/src/array/sparse/mod.rs @@ -21,12 +21,11 @@ use crate::error::{VortexError, VortexResult}; use crate::formatter::{ArrayDisplay, ArrayFormatter}; use crate::match_arrow_numeric_type; use crate::serde::{ArraySerde, EncodingSerde}; -use crate::stats::{Stats, StatsSet}; +use crate::stats::{Stats, StatsCompute, StatsSet}; mod compress; mod compute; mod serde; -mod stats; #[derive(Debug, Clone)] pub struct SparseArray { @@ -190,6 +189,8 @@ impl Array for SparseArray { } } +impl StatsCompute for SparseArray {} + impl<'arr> AsRef<(dyn Array + 'arr)> for SparseArray { fn as_ref(&self) -> &(dyn Array + 'arr) { self diff --git a/vortex-array/src/array/sparse/stats.rs b/vortex-array/src/array/sparse/stats.rs deleted file mode 100644 index b12669ee59..0000000000 --- a/vortex-array/src/array/sparse/stats.rs +++ /dev/null @@ -1,9 +0,0 @@ -use crate::array::sparse::SparseArray; -use crate::error::VortexResult; -use crate::stats::{Stat, StatsCompute, StatsSet}; - -impl StatsCompute for SparseArray { - fn compute(&self, _stat: &Stat) -> VortexResult { - todo!() - } -} diff --git a/vortex-array/src/array/struct_/compress.rs b/vortex-array/src/array/struct_/compress.rs index 09755c33d6..16b7b1cf7d 100644 --- a/vortex-array/src/array/struct_/compress.rs +++ b/vortex-array/src/array/struct_/compress.rs @@ -2,9 +2,9 @@ use crate::array::downcast::DowncastArrayBuiltin; use crate::array::struct_::{StructArray, StructEncoding}; use crate::array::{Array, ArrayRef}; use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; -use rayon::iter::IndexedParallelIterator; -use rayon::iter::IntoParallelRefIterator; -use rayon::iter::ParallelIterator; +use crate::error::VortexResult; +use itertools::Itertools; +use std::ops::Deref; impl EncodingCompression for StructEncoding { fn compressor( @@ -12,34 +12,29 @@ impl EncodingCompression for StructEncoding { array: &dyn Array, _config: &CompressConfig, ) -> Option<&'static Compressor> { - if array.encoding().id() == &Self::ID { - Some(&(struct_compressor as Compressor)) - } else { - None - } + (array.encoding().id() == &Self::ID).then_some(&(struct_compressor as Compressor)) } } -fn struct_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn struct_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let struct_array = array.as_struct(); let struct_like = like.map(|like_array| like_array.as_struct()); - let fields = struct_like - .map(|s_like| { - struct_array - .fields() - .par_iter() - .zip_eq(s_like.fields()) - .map(|(field, field_like)| ctx.compress(field.as_ref(), Some(field_like.as_ref()))) - .collect() + let fields = struct_array + .fields() + .iter() + .enumerate() + .map(|(i, chunk)| { + let like_chunk = struct_like + .and_then(|c_like| c_like.fields().get(i)) + .map(Deref::deref); + ctx.compress(chunk.deref(), like_chunk) }) - .unwrap_or_else(|| { - struct_array - .fields() - .par_iter() - .map(|field| ctx.compress(field.as_ref(), None)) - .collect() - }); + .try_collect()?; - StructArray::new(struct_array.names().clone(), fields).boxed() + Ok(StructArray::new(struct_array.names().clone(), fields).boxed()) } diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index 8ea1b16fd6..d007925194 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -13,7 +13,7 @@ use crate::dtype::{DType, FieldNames}; use crate::error::VortexResult; use crate::formatter::{ArrayDisplay, ArrayFormatter}; use crate::serde::{ArraySerde, EncodingSerde}; -use crate::stats::{Stats, StatsSet}; +use crate::stats::{Stats, StatsCompute, StatsSet}; use super::{ check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef, @@ -23,7 +23,6 @@ use super::{ mod compress; mod compute; mod serde; -mod stats; #[derive(Debug, Clone)] pub struct StructArray { @@ -167,6 +166,8 @@ impl<'arr> AsRef<(dyn Array + 'arr)> for StructArray { } } +impl StatsCompute for StructArray {} + #[derive(Debug)] pub struct StructEncoding; diff --git a/vortex-array/src/array/struct_/stats.rs b/vortex-array/src/array/struct_/stats.rs deleted file mode 100644 index 6d3e2b7e78..0000000000 --- a/vortex-array/src/array/struct_/stats.rs +++ /dev/null @@ -1,9 +0,0 @@ -use crate::array::struct_::StructArray; -use crate::error::VortexResult; -use crate::stats::{Stat, StatsCompute, StatsSet}; - -impl StatsCompute for StructArray { - fn compute(&self, _stat: &Stat) -> VortexResult { - todo!() - } -} diff --git a/vortex-array/src/array/typed/compress.rs b/vortex-array/src/array/typed/compress.rs index eea51184d6..96f3e50b72 100644 --- a/vortex-array/src/array/typed/compress.rs +++ b/vortex-array/src/array/typed/compress.rs @@ -2,6 +2,7 @@ use crate::array::downcast::DowncastArrayBuiltin; use crate::array::typed::{TypedArray, TypedEncoding}; use crate::array::{Array, ArrayRef}; use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; +use crate::error::VortexResult; impl EncodingCompression for TypedEncoding { fn compressor( @@ -9,24 +10,24 @@ impl EncodingCompression for TypedEncoding { array: &dyn Array, _config: &CompressConfig, ) -> Option<&'static Compressor> { - if array.encoding().id() == &Self::ID { - Some(&(typed_compressor as Compressor)) - } else { - None - } + (array.encoding().id() == &Self::ID).then_some(&(typed_compressor as Compressor)) } } -fn typed_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn typed_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let typed_array = array.as_typed(); let typed_like = like.map(|like_array| like_array.as_typed()); - TypedArray::new( + Ok(TypedArray::new( ctx.compress( typed_array.untyped_array(), typed_like.map(|typed_arr| typed_arr.untyped_array()), - ), + )?, array.dtype().clone(), ) - .boxed() + .boxed()) } diff --git a/vortex-array/src/array/varbin/compress.rs b/vortex-array/src/array/varbin/compress.rs index ea74370e2b..0ffb38e363 100644 --- a/vortex-array/src/array/varbin/compress.rs +++ b/vortex-array/src/array/varbin/compress.rs @@ -2,6 +2,7 @@ use crate::array::downcast::DowncastArrayBuiltin; use crate::array::varbin::{VarBinArray, VarBinEncoding}; use crate::array::{Array, ArrayRef}; use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; +use crate::error::VortexResult; impl EncodingCompression for VarBinEncoding { fn compressor( @@ -9,31 +10,32 @@ impl EncodingCompression for VarBinEncoding { array: &dyn Array, _config: &CompressConfig, ) -> Option<&'static Compressor> { - if array.encoding().id() == &Self::ID { - Some(&(varbin_compressor as Compressor)) - } else { - None - } + (array.encoding().id() == &Self::ID).then_some(&(varbin_compressor as Compressor)) } } -fn varbin_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn varbin_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let varbin_array = array.as_varbin(); let varbin_like = like.map(|like_array| like_array.as_varbin()); - VarBinArray::new( + Ok(VarBinArray::new( ctx.compress( varbin_array.offsets(), varbin_like.map(|typed_arr| typed_arr.offsets()), - ), + )?, ctx.compress( varbin_array.bytes(), varbin_like.map(|typed_arr| typed_arr.bytes()), - ), + )?, array.dtype().clone(), varbin_array .validity() - .map(|v| ctx.compress(v.as_ref(), varbin_like.and_then(|vblike| vblike.validity()))), + .map(|v| ctx.compress(v.as_ref(), varbin_like.and_then(|v| v.validity()))) + .transpose()?, ) - .boxed() + .boxed()) } diff --git a/vortex-array/src/array/varbinview/compress.rs b/vortex-array/src/array/varbinview/compress.rs index 9c384e5c39..2d73bacb9a 100644 --- a/vortex-array/src/array/varbinview/compress.rs +++ b/vortex-array/src/array/varbinview/compress.rs @@ -2,7 +2,9 @@ use crate::array::downcast::DowncastArrayBuiltin; use crate::array::varbinview::{VarBinViewArray, VarBinViewEncoding}; use crate::array::{Array, ArrayRef}; use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; -use rayon::prelude::*; +use crate::error::VortexResult; +use itertools::Itertools; +use std::ops::Deref; impl EncodingCompression for VarBinViewEncoding { fn compressor( @@ -10,11 +12,7 @@ impl EncodingCompression for VarBinViewEncoding { array: &dyn Array, _config: &CompressConfig, ) -> Option<&'static Compressor> { - if array.encoding().id() == &Self::ID { - Some(&(varbinview_compressor as Compressor)) - } else { - None - } + (array.encoding().id() == &Self::ID).then_some(&(varbinview_compressor as Compressor)) } } @@ -22,36 +20,36 @@ fn varbinview_compressor( array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx, -) -> ArrayRef { +) -> VortexResult { let varbinview_array = array.as_varbinview(); let varbinview_like = like.map(|like_array| like_array.as_varbinview()); - VarBinViewArray::new( + Ok(VarBinViewArray::new( // TODO(robert): Can we compress views? Not right now dyn_clone::clone_box(varbinview_array.views()), - varbinview_like - .map(|vbvlike| { - varbinview_array - .data() - .par_iter() - .zip_eq(vbvlike.data()) - .map(|(d, dlike)| ctx.compress(d.as_ref(), Some(dlike.as_ref()))) - .collect() + varbinview_array + .data() + .iter() + .enumerate() + .map(|(i, d)| { + ctx.compress( + d.as_ref(), + varbinview_like + .and_then(|v| v.data().get(i)) + .map(Deref::deref), + ) }) - .unwrap_or_else(|| { - varbinview_array - .data() - .par_iter() - .map(|d| ctx.compress(d.as_ref(), None)) - .collect() - }), + .try_collect()?, array.dtype().clone(), - varbinview_array.validity().map(|v| { - ctx.compress( - v.as_ref(), - varbinview_like.and_then(|vbvlike| vbvlike.validity()), - ) - }), + varbinview_array + .validity() + .map(|v| { + ctx.compress( + v.as_ref(), + varbinview_like.and_then(|vbvlike| vbvlike.validity()), + ) + }) + .transpose()?, ) - .boxed() + .boxed()) } diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index 518e31fc32..5d291674a6 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -4,17 +4,21 @@ use std::fmt::Debug; use log::debug; use once_cell::sync::Lazy; -use crate::array::constant::ConstantEncoding; +use crate::array::constant::{ConstantArray, ConstantEncoding}; use crate::array::{Array, ArrayRef, Encoding, EncodingId, ENCODINGS}; use crate::compute; +use crate::compute::scalar_at::scalar_at; +use crate::error::VortexResult; use crate::sampling::stratified_slices; +use crate::stats::Stat; pub trait EncodingCompression { fn compressor(&self, array: &dyn Array, config: &CompressConfig) -> Option<&'static Compressor>; } -pub type Compressor = fn(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef; +pub type Compressor = + fn(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> VortexResult; #[derive(Debug, Clone)] pub struct CompressConfig { @@ -86,25 +90,24 @@ impl<'a> CompressCtx<'a> { Self { options, depth: 0 } } - pub fn compress(&self, arr: &dyn Array, like: Option<&dyn Array>) -> ArrayRef { + pub fn compress(&self, arr: &dyn Array, like: Option<&dyn Array>) -> VortexResult { if arr.is_empty() { - return dyn_clone::clone_box(arr); + return Ok(dyn_clone::clone_box(arr)); } if self.depth >= self.options.max_depth { - return dyn_clone::clone_box(arr); + return Ok(dyn_clone::clone_box(arr)); } - let encoding = like.unwrap_or(arr).encoding(); - if self.options.is_enabled(encoding.id()) { - encoding + // Attempt to compress using the "like" array, otherwise fall back to sampled compression + match like { + Some(l) => l + .encoding() .compression() - .and_then(|compression| compression.compressor(arr, self.options)) + .and_then(|c| c.compressor(arr, self.options)) .map(|compressor| compressor(arr, like, self.clone())) - .unwrap_or_else(|| dyn_clone::clone_box(arr)) - } else { - debug!("Skipping {}: disabled", encoding.id()); - dyn_clone::clone_box(arr) + .unwrap_or_else(|| Ok(dyn_clone::clone_box(arr))), + None => sampled_compression(arr, self.clone()), } } @@ -126,16 +129,19 @@ impl Default for CompressCtx<'_> { } } -pub fn sampled_compression(array: &dyn Array, ctx: CompressCtx) -> ArrayRef { - // First, we try constant compression - if let Some(compressor) = ConstantEncoding.compressor(array, ctx.options()) { - return compressor(array, None, ctx); +pub fn sampled_compression(array: &dyn Array, ctx: CompressCtx) -> VortexResult { + // First, we try constant compression and shortcut any sampling. + if !array.is_empty() + && array + .stats() + .get_or_compute_as::(&Stat::IsConstant) + .unwrap_or(false) + { + return Ok(ConstantArray::new(scalar_at(array, 0)?, array.len()).boxed()); } let candidate_compressors: Vec<&Compressor> = ENCODINGS .iter() - // TODO(robert): Avoid own encoding to avoid infinite recursion - .filter(|encoding| encoding.id().name() != array.encoding().id().name()) .filter(|encoding| ctx.options().is_enabled(encoding.id())) .filter_map(|encoding| encoding.compression()) .filter_map(|compression| compression.compressor(array, ctx.options())) @@ -147,25 +153,27 @@ pub fn sampled_compression(array: &dyn Array, ctx: CompressCtx) -> ArrayRef { array.dtype(), array.encoding().id(), ); - return dyn_clone::clone_box(array); + return Ok(dyn_clone::clone_box(array)); } if array.len() < ctx.options.block_size as usize { // We're either in a sample or we're operating over a sufficiently small array. - let (_, compressed_sample) = candidate_compressors.iter().fold( - (array.nbytes(), None), - |(compressed_bytes, curr_best), compressor| { - let compressed = compressor(array, None, ctx.next_level()); - - if compressed.nbytes() < compressed_bytes { - (compressed.nbytes(), Some(compressed)) - } else { - (compressed_bytes, curr_best) - } - }, - ); + let sampling_result: VortexResult<(usize, Option)> = + candidate_compressors.iter().try_fold( + (array.nbytes(), None), + |(compressed_bytes, curr_best), compressor| { + let compressed = compressor(array, None, ctx.next_level())?; + + if compressed.nbytes() < compressed_bytes { + Ok((compressed.nbytes(), Some(compressed))) + } else { + Ok((compressed_bytes, curr_best)) + } + }, + ); + let (_, compressed_sample) = sampling_result?; - return compressed_sample + return Ok(compressed_sample .map(|s| { debug!( "Compressed small array with dtype: {} and encoding: {}, using: {}", @@ -175,7 +183,7 @@ pub fn sampled_compression(array: &dyn Array, ctx: CompressCtx) -> ArrayRef { ); s }) - .unwrap_or_else(|| dyn_clone::clone_box(array)); + .unwrap_or_else(|| dyn_clone::clone_box(array))); } // Otherwise, take the sample and try each compressor on it. @@ -188,32 +196,29 @@ pub fn sampled_compression(array: &dyn Array, ctx: CompressCtx) -> ArrayRef { .into_iter() .map(|(start, stop)| array.slice(start, stop).unwrap()) .collect(), - ) - // FIXME(ngates): errors - .unwrap(); - - let compression_ratios: Vec<(ArrayRef, f32)> = candidate_compressors - .into_iter() - .map(|compressor| { - let compressed_sample = compressor(sample.as_ref(), None, ctx.clone()); - let compression_ratio = compressed_sample.nbytes() as f32 / sample.nbytes() as f32; - (compressed_sample, compression_ratio) - }) - .collect(); + )?; + + let mut best_sample = None; + let mut best_ratio = 1.0; + for compressor in candidate_compressors { + let compressed_sample = compressor(sample.as_ref(), None, ctx.clone())?; + let compression_ratio = compressed_sample.nbytes() as f32 / sample.nbytes() as f32; + if compression_ratio < best_ratio { + best_sample = Some(compressed_sample); + best_ratio = compression_ratio; + } + } - compression_ratios - .into_iter() - .filter(|(_, ratio)| *ratio < 1.0) - .min_by(|(_, first_ratio), (_, second_ratio)| first_ratio.total_cmp(second_ratio)) - .map(|(sample, _)| { - let c = ctx.next_level().compress(array, Some(sample.as_ref())); + best_sample + .map(|s| { debug!( - "Compressed array with dtype: {} and encoding: {} using: {}", + "Compressing array with dtype: {} and encoding: {}, using: {}", array.dtype(), array.encoding().id(), - c.encoding().id() + s.encoding().id() ); - c + // TODO(ngates): is this next_level? + ctx.next_level().compress(array, Some(s.as_ref())) }) - .unwrap_or_else(|| dyn_clone::clone_box(array)) + .unwrap_or_else(|| Ok(dyn_clone::clone_box(array))) } diff --git a/vortex-array/src/stats.rs b/vortex-array/src/stats.rs index e1fb4cfe4e..0ccf75a21e 100644 --- a/vortex-array/src/stats.rs +++ b/vortex-array/src/stats.rs @@ -209,7 +209,9 @@ impl StatsSet { } pub trait StatsCompute { - fn compute(&self, stat: &Stat) -> VortexResult; + fn compute(&self, _stat: &Stat) -> VortexResult { + Ok(StatsSet::new()) + } } pub struct Stats<'a> { diff --git a/vortex-dict/src/compress.rs b/vortex-dict/src/compress.rs index b6264512de..ce6ef03372 100644 --- a/vortex-dict/src/compress.rs +++ b/vortex-dict/src/compress.rs @@ -13,6 +13,7 @@ use vortex::array::{Array, ArrayKind, ArrayRef, CloneOptionalArray}; use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; use vortex::compute::scalar_at::scalar_at; use vortex::dtype::DType; +use vortex::error::VortexResult; use vortex::match_each_native_ptype; use vortex::ptype::NativePType; use vortex::scalar::AsBytes; @@ -56,7 +57,11 @@ impl PartialEq for Value { impl Eq for Value {} -fn dict_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn dict_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let dict_like = like.map(|like_arr| like_arr.as_dict()); let (codes, dict) = match ArrayKind::from(array) { @@ -64,25 +69,25 @@ fn dict_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx let (codes, dict) = dict_encode_primitive(p); ( ctx.next_level() - .compress(codes.as_ref(), dict_like.map(|dict| dict.codes())), + .compress(codes.as_ref(), dict_like.map(|dict| dict.codes()))?, ctx.next_level() - .compress(dict.as_ref(), dict_like.map(|dict| dict.dict())), + .compress(dict.as_ref(), dict_like.map(|dict| dict.dict()))?, ) } ArrayKind::VarBin(vb) => { let (codes, dict) = dict_encode_varbin(vb); ( ctx.next_level() - .compress(codes.as_ref(), dict_like.map(|dict| dict.codes())), + .compress(codes.as_ref(), dict_like.map(|dict| dict.codes()))?, ctx.next_level() - .compress(dict.as_ref(), dict_like.map(|dict| dict.dict())), + .compress(dict.as_ref(), dict_like.map(|dict| dict.dict()))?, ) } _ => unreachable!("This array kind should have been filtered out"), }; - DictArray::new(codes, dict).boxed() + Ok(DictArray::new(codes, dict).boxed()) } // TODO(robert): Use distinct count instead of len for width estimation diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index af7f287e92..187a4bbeef 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -7,6 +7,7 @@ use vortex::array::primitive::PrimitiveArray; use vortex::array::sparse::SparseArray; use vortex::array::{Array, ArrayRef}; use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; +use vortex::error::VortexResult; use vortex::match_each_integer_ptype; use vortex::ptype::{NativePType, PType}; use vortex::scalar::ListScalarVec; @@ -55,7 +56,11 @@ impl EncodingCompression for BitPackedEncoding { } } -fn bitpacked_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn bitpacked_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let parray = array.as_primitive(); let bit_width_freq = parray .stats() @@ -79,27 +84,27 @@ fn bitpacked_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: Compre let validity = parray .validity() - .map(|v| ctx.compress(v.as_ref(), like_bp.and_then(|bp| bp.validity()))); + .map(|v| ctx.compress(v.as_ref(), like_bp.and_then(|bp| bp.validity()))) + .transpose()?; let patches = if num_exceptions > 0 { Some(ctx.compress( bitpack_patches(parray, bit_width, num_exceptions).as_ref(), like_bp.and_then(|bp| bp.patches()), - )) + )?) } else { None }; - return BitPackedArray::try_new( + Ok(BitPackedArray::try_new( packed, validity, patches, bit_width, parray.dtype().clone(), parray.len(), - ) - .unwrap() - .boxed(); + )? + .boxed()) } fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> ArrayRef { @@ -219,10 +224,12 @@ mod test { ); let ctx = CompressCtx::new(&cfg); - let compressed = ctx.compress( - &PrimitiveArray::from(Vec::from_iter((0..10_000).map(|i| (i % 63) as u8))), - None, - ); + let compressed = ctx + .compress( + &PrimitiveArray::from(Vec::from_iter((0..10_000).map(|i| (i % 63) as u8))), + None, + ) + .unwrap(); assert_eq!(compressed.encoding().id(), BitPackedEncoding.id()); let bp = compressed .as_any() diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs index e8480e7b68..397f464d39 100644 --- a/vortex-fastlanes/src/for/compress.rs +++ b/vortex-fastlanes/src/for/compress.rs @@ -5,6 +5,7 @@ use vortex::array::downcast::DowncastArrayBuiltin; use vortex::array::primitive::PrimitiveArray; use vortex::array::{Array, ArrayRef}; use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; +use vortex::error::VortexResult; use vortex::match_each_integer_ptype; use vortex::stats::Stat; @@ -39,7 +40,11 @@ impl EncodingCompression for FoREncoding { } } -fn for_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn for_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let parray = array.as_primitive(); let child = match_each_integer_ptype!(parray.ptype(), |$T| { @@ -56,11 +61,9 @@ fn for_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) let compressed_child = ctx.compress( child.as_ref(), like.map(|l| l.as_any().downcast_ref::().unwrap().child()), - ); + )?; let reference = parray.stats().get(&Stat::Min).unwrap(); - FoRArray::try_new(compressed_child, reference) - .unwrap() - .boxed() + Ok(FoRArray::try_new(compressed_child, reference)?.boxed()) } #[cfg(test)] @@ -90,7 +93,7 @@ mod test { // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); - let compressed = ctx.compress(&array, None); + let compressed = ctx.compress(&array, None).unwrap(); assert_eq!(compressed.encoding().id(), FoREncoding.id()); let fa = compressed.as_any().downcast_ref::().unwrap(); assert_eq!(fa.reference().try_into(), Ok(1_000_000u32)); diff --git a/vortex-ree/src/compress.rs b/vortex-ree/src/compress.rs index 72e28a0f2d..1393115bd4 100644 --- a/vortex-ree/src/compress.rs +++ b/vortex-ree/src/compress.rs @@ -33,27 +33,32 @@ impl EncodingCompression for REEEncoding { } } -fn ree_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn ree_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let ree_like = like.map(|like_arr| like_arr.as_ree()); let primitive_array = array.as_primitive(); let (ends, values) = ree_encode(primitive_array); let compressed_ends = ctx .next_level() - .compress(ends.as_ref(), ree_like.map(|ree| ree.ends())); + .compress(ends.as_ref(), ree_like.map(|ree| ree.ends()))?; let compressed_values = ctx .next_level() - .compress(values.as_ref(), ree_like.map(|ree| ree.values())); + .compress(values.as_ref(), ree_like.map(|ree| ree.values()))?; - REEArray::new( + Ok(REEArray::new( compressed_ends, compressed_values, primitive_array .validity() - .map(|v| ctx.compress(v, ree_like.and_then(|r| r.validity()))), + .map(|v| ctx.compress(v, ree_like.and_then(|r| r.validity()))) + .transpose()?, array.len(), ) - .boxed() + .boxed()) } pub fn ree_encode(array: &PrimitiveArray) -> (PrimitiveArray, PrimitiveArray) { diff --git a/vortex-ree/src/lib.rs b/vortex-ree/src/lib.rs index c6e878a3e1..1999271823 100644 --- a/vortex-ree/src/lib.rs +++ b/vortex-ree/src/lib.rs @@ -8,7 +8,6 @@ mod compute; mod downcast; mod ree; mod serde; -mod stats; #[distributed_slice(ENCODINGS)] static ENCODINGS_REE: EncodingRef = &REEEncoding; diff --git a/vortex-ree/src/ree.rs b/vortex-ree/src/ree.rs index fc3e0324d4..e3764f0c95 100644 --- a/vortex-ree/src/ree.rs +++ b/vortex-ree/src/ree.rs @@ -22,7 +22,7 @@ use vortex::error::{VortexError, VortexResult}; use vortex::formatter::{ArrayDisplay, ArrayFormatter}; use vortex::ptype::NativePType; use vortex::serde::{ArraySerde, EncodingSerde}; -use vortex::stats::{Stat, Stats, StatsSet}; +use vortex::stats::{Stat, Stats, StatsCompute, StatsSet}; use crate::compress::{ree_decode_primitive, ree_encode}; @@ -216,6 +216,8 @@ impl Array for REEArray { } } +impl StatsCompute for REEArray {} + impl<'arr> AsRef<(dyn Array + 'arr)> for REEArray { fn as_ref(&self) -> &(dyn Array + 'arr) { self diff --git a/vortex-ree/src/stats.rs b/vortex-ree/src/stats.rs deleted file mode 100644 index 7b16fda239..0000000000 --- a/vortex-ree/src/stats.rs +++ /dev/null @@ -1,9 +0,0 @@ -use crate::REEArray; -use vortex::error::VortexResult; -use vortex::stats::{Stat, StatsCompute, StatsSet}; - -impl StatsCompute for REEArray { - fn compute(&self, _stat: &Stat) -> VortexResult { - todo!() - } -} diff --git a/vortex-roaring/src/boolean/compress.rs b/vortex-roaring/src/boolean/compress.rs index b45abc0b44..0e9c904970 100644 --- a/vortex-roaring/src/boolean/compress.rs +++ b/vortex-roaring/src/boolean/compress.rs @@ -6,6 +6,7 @@ use vortex::array::{Array, ArrayRef}; use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; use vortex::dtype::DType; use vortex::dtype::Nullability::NonNullable; +use vortex::error::VortexResult; use crate::boolean::{RoaringBoolArray, RoaringBoolEncoding}; @@ -33,8 +34,12 @@ impl EncodingCompression for RoaringBoolEncoding { } } -fn roaring_compressor(array: &dyn Array, _like: Option<&dyn Array>, _ctx: CompressCtx) -> ArrayRef { - roaring_encode(array.as_bool()).boxed() +fn roaring_compressor( + array: &dyn Array, + _like: Option<&dyn Array>, + _ctx: CompressCtx, +) -> VortexResult { + Ok(roaring_encode(array.as_bool()).boxed()) } pub fn roaring_encode(bool_array: &BoolArray) -> RoaringBoolArray { diff --git a/vortex-roaring/src/integer/compress.rs b/vortex-roaring/src/integer/compress.rs index 715e440938..d6985b219c 100644 --- a/vortex-roaring/src/integer/compress.rs +++ b/vortex-roaring/src/integer/compress.rs @@ -9,6 +9,7 @@ use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompress use vortex::dtype::DType; use vortex::dtype::Nullability::NonNullable; use vortex::dtype::Signedness::Unsigned; +use vortex::error::VortexResult; use vortex::ptype::{NativePType, PType}; use vortex::stats::Stat; @@ -55,8 +56,8 @@ fn roaring_int_compressor( array: &dyn Array, _like: Option<&dyn Array>, _ctx: CompressCtx, -) -> ArrayRef { - roaring_encode(array.as_primitive()).boxed() +) -> VortexResult { + Ok(roaring_encode(array.as_primitive()).boxed()) } pub fn roaring_encode(primitive_array: &PrimitiveArray) -> RoaringIntArray { diff --git a/vortex-zigzag/src/compress.rs b/vortex-zigzag/src/compress.rs index 0b611d1ca6..15f737a56f 100644 --- a/vortex-zigzag/src/compress.rs +++ b/vortex-zigzag/src/compress.rs @@ -36,18 +36,22 @@ impl EncodingCompression for ZigZagEncoding { } } -fn zigzag_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { +fn zigzag_compressor( + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, +) -> VortexResult { let zigzag_like = like.map(|like_arr| like_arr.as_zigzag()); let encoded = match ArrayKind::from(array) { ArrayKind::Primitive(p) => zigzag_encode(p), _ => unreachable!("This array kind should have been filtered out"), }; - ZigZagArray::new( + Ok(ZigZagArray::new( ctx.next_level() - .compress(encoded.unwrap().encoded(), zigzag_like.map(|z| z.encoded())), + .compress(encoded.unwrap().encoded(), zigzag_like.map(|z| z.encoded()))?, ) - .boxed() + .boxed()) } pub fn zigzag_encode(parray: &PrimitiveArray) -> VortexResult {