diff --git a/Cargo.lock b/Cargo.lock index f22e5c2493..5a16dcc172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4597,6 +4597,7 @@ dependencies = [ "arbitrary", "chrono", "fsst-rs", + "itertools 0.13.0", "lazy_static", "log", "rand", diff --git a/vortex-sampling-compressor/Cargo.toml b/vortex-sampling-compressor/Cargo.toml index 6e097f1cd6..e616de8b58 100644 --- a/vortex-sampling-compressor/Cargo.toml +++ b/vortex-sampling-compressor/Cargo.toml @@ -16,6 +16,7 @@ readme = { workspace = true } [dependencies] arbitrary = { workspace = true, optional = true } fsst-rs = { workspace = true } +itertools = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } rand = { workspace = true } diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs new file mode 100644 index 0000000000..23ff730d17 --- /dev/null +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ -0,0 +1,156 @@ +use std::any::Any; +use std::collections::HashSet; +use std::sync::Arc; + +use log::warn; +use vortex::array::{Chunked, ChunkedArray}; +use vortex::encoding::EncodingRef; +use vortex::{Array, ArrayDType, ArrayDef, IntoArray}; +use vortex_error::{vortex_bail, VortexResult}; + +use super::EncoderMetadata; +use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; +use crate::SamplingCompressor; + +#[derive(Debug)] +pub struct ChunkedCompressor { + relatively_good_ratio: f32, +} + +pub const DEFAULT_CHUNKED_COMPRESSOR: ChunkedCompressor = ChunkedCompressor { + relatively_good_ratio: 1.2, +}; + +pub struct ChunkedCompressorMetadata(Option); + +impl EncoderMetadata for ChunkedCompressorMetadata { + fn as_any(&self) -> &dyn Any { + self + } +} + +impl EncodingCompressor for ChunkedCompressor { + fn id(&self) -> &str { + Chunked::ID.as_ref() + } + + fn cost(&self) -> u8 { + 0 + } + + fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> { + array.is_encoding(Chunked::ID).then_some(self) + } + + fn compress<'a>( + &'a self, + array: &Array, + like: Option>, + ctx: SamplingCompressor<'a>, + ) -> VortexResult> { + let chunked_array = ChunkedArray::try_from(array)?; + let like_and_ratio = like_into_parts(like)?; + self.compress_chunked(&chunked_array, like_and_ratio, ctx) + } + + fn used_encodings(&self) -> HashSet { + HashSet::from([]) + } +} + +fn like_into_parts( + tree: Option>, +) -> VortexResult, f32)>> { + let (_, mut children, metadata) = match tree { + None => return Ok(None), + Some(tree) => tree.into_parts(), + }; + + let Some(target_ratio) = metadata else { + vortex_bail!("chunked array compression tree must have metadata") + }; + + let Some(ChunkedCompressorMetadata(target_ratio)) = + target_ratio.as_ref().as_any().downcast_ref() + else { + vortex_bail!("chunked array compression tree must be ChunkedCompressorMetadata") + }; + + if children.len() != 1 { + vortex_bail!("chunked array compression tree must have one child") + } + + let child = children.remove(0); + + match (child, target_ratio) { + (None, None) => Ok(None), + (Some(child), Some(ratio)) => Ok(Some((child, *ratio))), + (..) => vortex_bail!("chunked array compression tree must have a child iff it has a ratio"), + } +} + +impl ChunkedCompressor { + /// How far the compression ratio is allowed to grow from one chunk to another chunk. + /// + /// As long as a compressor compresses subsequent chunks "reasonably well" we should continue to + /// use it, which saves us the cost of searching for a good compressor. This constant quantifies + /// "reasonably well" as + /// + /// ```text + /// new_ratio <= old_ratio * self.relatively_good_ratio + /// ``` + fn relatively_good_ratio(&self) -> f32 { + self.relatively_good_ratio + } + + fn compress_chunked<'a>( + &'a self, + array: &ChunkedArray, + mut previous: Option<(CompressionTree<'a>, f32)>, + ctx: SamplingCompressor<'a>, + ) -> VortexResult> { + let less_chunked = array.rechunk( + ctx.options().target_block_bytesize, + ctx.options().target_block_size, + )?; + let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks()); + for (index, chunk) in less_chunked.chunks().enumerate() { + let like = previous.as_ref().map(|(like, _)| like); + let (compressed_chunk, tree) = ctx + .named(&format!("chunk-{}", index)) + .compress(&chunk, like)? + .into_parts(); + + let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32); + let exceeded_target_ratio = previous + .as_ref() + .map(|(_, target_ratio)| ratio > target_ratio * self.relatively_good_ratio()) + .unwrap_or(false); + + if ratio > 1.0 || exceeded_target_ratio { + warn!("unsatisfactory ratio {} {:?}", ratio, previous); + let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts(); + let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32); + previous = tree.map(|tree| (tree, new_ratio)); + compressed_chunks.push(compressed_chunk); + } else { + previous = previous.or_else(|| tree.map(|tree| (tree, ratio))); + compressed_chunks.push(compressed_chunk); + } + } + + let (child, ratio) = match previous { + Some((child, ratio)) => (Some(child), Some(ratio)), + None => (None, None), + }; + + Ok(CompressedArray::new( + ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(), + Some(CompressionTree::new_with_metadata( + self, + vec![child], + Arc::new(ChunkedCompressorMetadata(ratio)), + )), + )) + } +} diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs index 285332672c..69e27ac3e8 100644 --- a/vortex-sampling-compressor/src/compressors/fsst.rs +++ b/vortex-sampling-compressor/src/compressors/fsst.rs @@ -63,7 +63,7 @@ impl EncodingCompressor for FSSTCompressor { // between 2-3x depending on the text quality. // // It's not worth running a full compression step unless the array is large enough. - if array.nbytes() < 10 * FSST_SYMTAB_MAX_SIZE { + if array.nbytes() < 5 * FSST_SYMTAB_MAX_SIZE { return Ok(CompressedArray::uncompressed(array.clone())); } diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs index 3b087df2dd..132f0cbbac 100644 --- a/vortex-sampling-compressor/src/compressors/mod.rs +++ b/vortex-sampling-compressor/src/compressors/mod.rs @@ -13,6 +13,7 @@ use crate::SamplingCompressor; pub mod alp; pub mod alp_rd; pub mod bitpacked; +pub mod chunked; pub mod constant; pub mod date_time_parts; pub mod delta; @@ -23,6 +24,7 @@ pub mod roaring_bool; pub mod roaring_int; pub mod runend; pub mod sparse; +pub mod struct_; pub mod zigzag; pub trait EncodingCompressor: Sync + Send + Debug { @@ -162,6 +164,17 @@ impl<'a> CompressionTree<'a> { .filter_map(|child| child.as_ref().map(|c| c.num_descendants() + 1)) .sum::() } + + #[allow(clippy::type_complexity)] + pub fn into_parts( + self, + ) -> ( + &'a dyn EncodingCompressor, + Vec>>, + Option>, + ) { + (self.compressor, self.children, self.metadata) + } } #[derive(Debug, Clone)] @@ -199,6 +212,7 @@ impl<'a> CompressedArray<'a> { self.path } + #[inline] pub fn into_parts(self) -> (Array, Option>) { (self.array, self.path) } diff --git a/vortex-sampling-compressor/src/compressors/struct_.rs b/vortex-sampling-compressor/src/compressors/struct_.rs new file mode 100644 index 0000000000..9225f68540 --- /dev/null +++ b/vortex-sampling-compressor/src/compressors/struct_.rs @@ -0,0 +1,66 @@ +use std::collections::HashSet; + +use itertools::Itertools; +use vortex::array::{Struct, StructArray}; +use vortex::encoding::EncodingRef; +use vortex::variants::StructArrayTrait; +use vortex::{Array, ArrayDef, IntoArray}; +use vortex_error::VortexResult; + +use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; +use crate::SamplingCompressor; + +#[derive(Debug)] +pub struct StructCompressor; + +impl EncodingCompressor for StructCompressor { + fn id(&self) -> &str { + Struct::ID.as_ref() + } + + fn cost(&self) -> u8 { + 0 + } + + fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> { + StructArray::try_from(array) + .ok() + .map(|_| self as &dyn EncodingCompressor) + } + + fn compress<'a>( + &'a self, + array: &Array, + like: Option>, + ctx: SamplingCompressor<'a>, + ) -> VortexResult> { + let array = StructArray::try_from(array)?; + let compressed_validity = ctx.compress_validity(array.validity())?; + + let children_trees = match like { + Some(tree) => tree.children, + None => vec![None; array.nfields()], + }; + + let (arrays, trees) = array + .children() + .zip_eq(children_trees) + .map(|(array, like)| ctx.compress(&array, like.as_ref())) + .process_results(|iter| iter.map(|x| (x.array, x.path)).unzip())?; + + Ok(CompressedArray::new( + StructArray::try_new( + array.names().clone(), + arrays, + array.len(), + compressed_validity, + )? + .into_array(), + Some(CompressionTree::new(self, trees)), + )) + } + + fn used_encodings(&self) -> HashSet { + HashSet::from([]) + } +} diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index 1ad5083ef5..9714689e49 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -2,18 +2,19 @@ use std::collections::HashSet; use std::fmt::{Debug, Display, Formatter}; use compressors::bitpacked::BITPACK_WITH_PATCHES; +use compressors::chunked::DEFAULT_CHUNKED_COMPRESSOR; use compressors::fsst::FSSTCompressor; +use compressors::struct_::StructCompressor; use lazy_static::lazy_static; use log::{debug, info, warn}; use rand::rngs::StdRng; use rand::SeedableRng; -use vortex::array::{Chunked, ChunkedArray, Constant, Struct, StructArray}; +use vortex::array::{ChunkedArray, Constant}; use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy}; use vortex::compute::slice; use vortex::encoding::EncodingRef; use vortex::validity::Validity; -use vortex::variants::StructArrayTrait; -use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoCanonical}; +use vortex::{Array, ArrayDType, ArrayDef, IntoCanonical}; use vortex_error::{VortexExpect as _, VortexResult}; use crate::compressors::alp::ALPCompressor; @@ -241,144 +242,104 @@ impl<'a> SamplingCompressor<'a> { } } - fn compress_array(&self, arr: &Array) -> VortexResult> { - match arr.encoding().id() { - Chunked::ID => { - let chunked = ChunkedArray::try_from(arr)?; - let less_chunked = chunked.rechunk( - self.options().target_block_bytesize, - self.options().target_block_size, - )?; - let compressed_chunks = less_chunked - .chunks() - .map(|chunk| { - self.compress_array(&chunk) - .map(compressors::CompressedArray::into_array) - }) - .collect::>>()?; - Ok(CompressedArray::uncompressed( - ChunkedArray::try_new(compressed_chunks, chunked.dtype().clone())?.into_array(), - )) - } - Constant::ID => { - // Not much better we can do than constant! - Ok(CompressedArray::uncompressed(arr.clone())) - } - Struct::ID => { - // For struct arrays, we compress each field individually - let strct = StructArray::try_from(arr)?; - let compressed_fields = strct - .children() - .map(|field| { - self.compress_array(&field) - .map(compressors::CompressedArray::into_array) - }) - .collect::>>()?; - let validity = self.compress_validity(strct.validity())?; - Ok(CompressedArray::uncompressed( - StructArray::try_new( - strct.names().clone(), - compressed_fields, - strct.len(), - validity, - )? - .into_array(), - )) - } - _ => { - // Otherwise, we run sampled compression over pluggable encodings - let mut rng = StdRng::seed_from_u64(self.options.rng_seed); - let sampled = sampled_compression(arr, self, &mut rng)?; - Ok(sampled.unwrap_or_else(|| CompressedArray::uncompressed(arr.clone()))) - } + fn compress_array(&self, array: &Array) -> VortexResult> { + let mut rng = StdRng::seed_from_u64(self.options.rng_seed); + + if array.encoding().id() == Constant::ID { + // Not much better we can do than constant! + return Ok(CompressedArray::uncompressed(array.clone())); } - } -} -fn sampled_compression<'a>( - array: &Array, - compressor: &SamplingCompressor<'a>, - rng: &mut StdRng, -) -> VortexResult>> { - // First, we try constant compression and shortcut any sampling. - if let Some(cc) = ConstantCompressor.can_compress(array) { - return cc.compress(array, None, compressor.clone()).map(Some); - } + if let Some(cc) = DEFAULT_CHUNKED_COMPRESSOR.can_compress(array) { + return cc.compress(array, None, self.clone()); + } - let mut candidates: Vec<&dyn EncodingCompressor> = compressor - .compressors - .iter() - .filter(|&encoding| !compressor.disabled_compressors.contains(encoding)) - .filter(|compression| { - if compression.can_compress(array).is_some() { - if compressor.depth + compression.cost() > compressor.options.max_cost { - debug!( - "{} skipping encoding {} due to depth", - compressor, - compression.id() - ); - return false; - } - true - } else { - false - } - }) - .copied() - .collect(); - debug!("{} candidates for {}: {:?}", compressor, array, candidates); + if let Some(cc) = StructCompressor.can_compress(array) { + return cc.compress(array, None, self.clone()); + } - if candidates.is_empty() { - debug!( - "{} no compressors for array with dtype: {} and encoding: {}", - compressor, - array.dtype(), - array.encoding().id(), - ); - return Ok(None); - } + if let Some(cc) = ConstantCompressor.can_compress(array) { + return cc.compress(array, None, self.clone()); + } - // We prefer all other candidates to the array's own encoding. - // This is because we assume that the array's own encoding is the least efficient, but useful - // to destructure an array in the final stages of compression. e.g. VarBin would be DictEncoded - // but then the dictionary itself remains a VarBin array. DictEncoding excludes itself from the - // dictionary, but we still have a large offsets array that should be compressed. - // TODO(ngates): we actually probably want some way to prefer dict encoding over other varbin - // encodings, e.g. FSST. - if candidates.len() > 1 { - candidates.retain(|&compression| compression.id() != array.encoding().id().as_ref()); - } + let (mut candidates, too_deep) = self + .compressors + .iter() + .filter(|&encoding| !self.disabled_compressors.contains(encoding)) + .filter(|&encoding| encoding.can_compress(array).is_some()) + .partition::, _>(|&encoding| { + self.depth + encoding.cost() <= self.options.max_cost + }); + + if !too_deep.is_empty() { + debug!( + "{} skipping encodings due to depth/cost: {}", + self, + too_deep + .iter() + .map(|x| x.id()) + .collect::>() + .join(", ") + ); + } - if array.len() - <= (compressor.options.sample_size as usize * compressor.options.sample_count as usize) - { - // We're either already within a sample, or we're operating over a sufficiently small array. - return find_best_compression(candidates, array, compressor).map(Some); - } + info!("{} candidates for {}: {:?}", self, array, candidates); + + if candidates.is_empty() { + debug!( + "{} no compressors for array with dtype: {} and encoding: {}", + self, + array.dtype(), + array.encoding().id(), + ); + return Ok(CompressedArray::uncompressed(array.clone())); + } - // Take a sample of the array, then ask codecs for their best compression estimate. - let sample = ChunkedArray::try_new( - stratified_slices( - array.len(), - compressor.options.sample_size, - compressor.options.sample_count, - rng, - ) - .into_iter() - .map(|(start, stop)| slice(array, start, stop)) - .collect::>>()?, - array.dtype().clone(), - )? - .into_canonical()? - .into(); - - find_best_compression(candidates, &sample, compressor)? - .into_path() - .map(|best_compressor| { - info!("Compressing array {} with {}", array, best_compressor); - best_compressor.compress_unchecked(array, compressor) - }) - .transpose() + // We prefer all other candidates to the array's own encoding. + // This is because we assume that the array's own encoding is the least efficient, but useful + // to destructure an array in the final stages of compression. e.g. VarBin would be DictEncoded + // but then the dictionary itself remains a VarBin array. DictEncoding excludes itself from the + // dictionary, but we still have a large offsets array that should be compressed. + // TODO(ngates): we actually probably want some way to prefer dict encoding over other varbin + // encodings, e.g. FSST. + if candidates.len() > 1 { + candidates.retain(|&compression| compression.id() != array.encoding().id().as_ref()); + } + + if array.len() <= (self.options.sample_size as usize * self.options.sample_count as usize) { + // We're either already within a sample, or we're operating over a sufficiently small array. + return find_best_compression(candidates, array, self); + } + + // Take a sample of the array, then ask codecs for their best compression estimate. + let sample = ChunkedArray::try_new( + stratified_slices( + array.len(), + self.options.sample_size, + self.options.sample_count, + &mut rng, + ) + .into_iter() + .map(|(start, stop)| slice(array, start, stop)) + .collect::>>()?, + array.dtype().clone(), + )? + .into_canonical()? + .into(); + + let best = find_best_compression(candidates, &sample, self)? + .into_path() + .map(|best_compressor| { + info!( + "{} Compressing array {} with {}", + self, array, best_compressor + ); + best_compressor.compress_unchecked(array, self) + }) + .transpose()?; + + Ok(best.unwrap_or_else(|| CompressedArray::uncompressed(array.clone()))) + } } fn find_best_compression<'a>(