From 2e8183502a8ef1c5a3a382c37d69cf133d50edc7 Mon Sep 17 00:00:00 2001 From: Dan King Date: Wed, 11 Sep 2024 12:27:15 -0400 Subject: [PATCH] greedily combine chunks before compressing (#783) feat: before compressing, collapse chunks of a chunked array targeting chunks of 16 MiB or 64Ki rows. --- encodings/roaring/src/boolean/mod.rs | 11 +- vortex-array/src/array/assertions.rs | 19 +++ vortex-array/src/array/chunked/mod.rs | 114 +++++++++++++++++- vortex-array/src/array/mod.rs | 1 + .../src/compressors/roaring_bool.rs | 3 +- vortex-sampling-compressor/src/lib.rs | 13 +- vortex-sampling-compressor/tests/smoketest.rs | 104 ++++++++++++++++ 7 files changed, 256 insertions(+), 9 deletions(-) create mode 100644 vortex-array/src/array/assertions.rs diff --git a/encodings/roaring/src/boolean/mod.rs b/encodings/roaring/src/boolean/mod.rs index 3b7a7883b3..0424877ca0 100644 --- a/encodings/roaring/src/boolean/mod.rs +++ b/encodings/roaring/src/boolean/mod.rs @@ -60,7 +60,7 @@ impl RoaringBoolArray { if array.encoding().id() == Bool::ID { roaring_bool_encode(BoolArray::try_from(array)?).map(|a| a.into_array()) } else { - vortex_bail!("RoaringInt can only encode boolean arrays") + vortex_bail!("RoaringBool can only encode boolean arrays") } } } @@ -84,12 +84,17 @@ impl BoolArrayTrait for RoaringBoolArray { } impl AcceptArrayVisitor for RoaringBoolArray { - fn accept(&self, _visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { + fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { // TODO(ngates): should we store a buffer in memory? Or delay serialization? // Or serialize into metadata? The only reason we support buffers is so we can write to // the wire without copying into FlatBuffers. But if we need to allocate to serialize // the bitmap anyway, then may as well shove it into metadata. 
-        todo!()
+        visitor.visit_buffer(
+            self.array()
+                .buffer()
+                .ok_or(vortex_err!("roaring bool should have a buffer"))?,
+        )?;
+        Ok(())
     }
 }
 
diff --git a/vortex-array/src/array/assertions.rs b/vortex-array/src/array/assertions.rs
new file mode 100644
index 0000000000..083b59d63e
--- /dev/null
+++ b/vortex-array/src/array/assertions.rs
@@ -0,0 +1,19 @@
+#[macro_export]
+macro_rules! assert_arrays_eq {
+    ($expected:expr, $actual:expr) => {
+        let expected: Array = $expected.into();
+        let actual: Array = $actual.into();
+        assert_eq!(expected.dtype(), actual.dtype());
+
+        let expected_contents = (0..expected.len())
+            .map(|idx| scalar_at(&expected, idx).map(|x| x.into_value()))
+            .collect::<VortexResult<Vec<_>>>()
+            .unwrap();
+        let actual_contents = (0..actual.len())
+            .map(|idx| scalar_at(&actual, idx).map(|x| x.into_value()))
+            .collect::<VortexResult<Vec<_>>>()
+            .unwrap();
+
+        assert_eq!(expected_contents, actual_contents);
+    };
+}
diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs
index a687d8c784..1dc993b23a 100644
--- a/vortex-array/src/array/chunked/mod.rs
+++ b/vortex-array/src/array/chunked/mod.rs
@@ -18,7 +18,7 @@ use crate::stream::{ArrayStream, ArrayStreamAdapter};
 use crate::validity::Validity::NonNullable;
 use crate::validity::{ArrayValidity, LogicalValidity, Validity};
 use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
-use crate::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray};
+use crate::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, IntoCanonical};
 
 mod canonical;
 mod compute;
@@ -123,6 +123,50 @@ impl ChunkedArray {
     pub fn array_stream(&self) -> impl ArrayStream + '_ {
         ArrayStreamAdapter::new(self.dtype().clone(), stream::iter(self.chunks().map(Ok)))
     }
+
+    pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> {
+        let mut new_chunks = Vec::new();
+        let mut chunks_to_combine = Vec::new();
+        let mut new_chunk_n_bytes = 0;
+        let mut new_chunk_n_elements = 0;
+        for chunk in
self.chunks() { + let n_bytes = chunk.nbytes(); + let n_elements = chunk.len(); + + if (new_chunk_n_bytes + n_bytes > target_bytesize + || new_chunk_n_elements + n_elements > target_rowsize) + && !chunks_to_combine.is_empty() + { + new_chunks.push( + ChunkedArray::try_new(chunks_to_combine, self.dtype().clone())? + .into_canonical()? + .into(), + ); + + new_chunk_n_bytes = 0; + new_chunk_n_elements = 0; + chunks_to_combine = Vec::new(); + } + + if n_bytes > target_bytesize || n_elements > target_rowsize { + new_chunks.push(chunk); + } else { + new_chunk_n_bytes += n_bytes; + new_chunk_n_elements += n_elements; + chunks_to_combine.push(chunk); + } + } + + if !chunks_to_combine.is_empty() { + new_chunks.push( + ChunkedArray::try_new(chunks_to_combine, self.dtype().clone())? + .into_canonical()? + .into(), + ); + } + + Self::try_new(new_chunks, self.dtype().clone()) + } } impl ArrayTrait for ChunkedArray {} @@ -178,11 +222,12 @@ impl SubtractScalarFn for ChunkedArray { #[cfg(test)] mod test { use vortex_dtype::{DType, NativePType, Nullability, PType}; + use vortex_error::VortexResult; use crate::array::chunked::ChunkedArray; use crate::compute::slice; - use crate::compute::unary::subtract_scalar; - use crate::{Array, IntoArray, IntoArrayVariant, ToArray}; + use crate::compute::unary::{scalar_at, subtract_scalar}; + use crate::{assert_arrays_eq, Array, ArrayDType, IntoArray, IntoArrayVariant, ToArray}; fn chunked_array() -> ChunkedArray { ChunkedArray::try_new( @@ -273,4 +318,67 @@ mod test { .to_vec(); assert_eq!(results, &[6u64, 7, 8]); } + + #[test] + fn test_rechunk_one_chunk() { + let chunked = ChunkedArray::try_new( + vec![vec![0u64].into_array()], + DType::Primitive(PType::U64, Nullability::NonNullable), + ) + .unwrap(); + + let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap(); + + assert_arrays_eq!(chunked, rechunked); + } + + #[test] + fn test_rechunk_two_chunks() { + let chunked = ChunkedArray::try_new( + vec![vec![0u64].into_array(), 
vec![5u64].into_array()], + DType::Primitive(PType::U64, Nullability::NonNullable), + ) + .unwrap(); + + let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap(); + + assert_eq!(rechunked.nchunks(), 1); + assert_arrays_eq!(chunked, rechunked); + } + + #[test] + fn test_rechunk_tiny_target_chunks() { + let chunked = ChunkedArray::try_new( + vec![vec![0u64, 1, 2, 3].into_array(), vec![4u64, 5].into_array()], + DType::Primitive(PType::U64, Nullability::NonNullable), + ) + .unwrap(); + + let rechunked = chunked.rechunk(1 << 16, 5).unwrap(); + + assert_eq!(rechunked.nchunks(), 2); + assert!(rechunked.chunks().all(|c| c.len() < 5)); + assert_arrays_eq!(chunked, rechunked); + } + + #[test] + fn test_rechunk_with_too_big_chunk() { + let chunked = ChunkedArray::try_new( + vec![ + vec![0u64, 1, 2].into_array(), + vec![42_u64; 6].into_array(), + vec![4u64, 5].into_array(), + vec![6u64, 7].into_array(), + vec![8u64, 9].into_array(), + ], + DType::Primitive(PType::U64, Nullability::NonNullable), + ) + .unwrap(); + + let rechunked = chunked.rechunk(1 << 16, 5).unwrap(); + // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9] + + assert_eq!(rechunked.nchunks(), 4); + assert_arrays_eq!(chunked, rechunked); + } } diff --git a/vortex-array/src/array/mod.rs b/vortex-array/src/array/mod.rs index dfb0335286..12b50b72f4 100644 --- a/vortex-array/src/array/mod.rs +++ b/vortex-array/src/array/mod.rs @@ -1,3 +1,4 @@ +mod assertions; mod bool; mod chunked; mod constant; diff --git a/vortex-sampling-compressor/src/compressors/roaring_bool.rs b/vortex-sampling-compressor/src/compressors/roaring_bool.rs index 2cf7055f72..24de3868bc 100644 --- a/vortex-sampling-compressor/src/compressors/roaring_bool.rs +++ b/vortex-sampling-compressor/src/compressors/roaring_bool.rs @@ -1,5 +1,6 @@ use std::collections::HashSet; +use vortex::array::Bool; use vortex::encoding::EncodingRef; use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoArrayVariant}; use vortex_dtype::DType; 
@@ -20,7 +21,7 @@ impl EncodingCompressor for RoaringBoolCompressor {
 
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
         // Only support bool enc arrays
-        if array.encoding().id() != RoaringBool::ID {
+        if array.encoding().id() != Bool::ID {
             return None;
         }
 
diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs
index 4738875418..92b9fb61aa 100644
--- a/vortex-sampling-compressor/src/lib.rs
+++ b/vortex-sampling-compressor/src/lib.rs
@@ -55,15 +55,21 @@ pub struct CompressConfig {
     sample_size: u16,
     sample_count: u16,
     max_depth: u8,
+    target_block_bytesize: usize,
+    target_block_size: usize,
 }
 
 impl Default for CompressConfig {
     fn default() -> Self {
+        let kib = 1 << 10;
+        let mib = 1 << 20;
         Self {
             // Sample length should always be multiple of 1024
             sample_size: 128,
             sample_count: 8,
             max_depth: 3,
+            target_block_bytesize: 16 * mib,
+            target_block_size: 64 * kib,
         }
     }
 }
@@ -199,9 +205,12 @@ impl<'a> SamplingCompressor<'a> {
     fn compress_array(&self, arr: &Array) -> VortexResult<CompressedArray<'a>> {
         match arr.encoding().id() {
             Chunked::ID => {
-                // For chunked arrays, we compress each chunk individually
                 let chunked = ChunkedArray::try_from(arr)?;
-                let compressed_chunks = chunked
+                let less_chunked = chunked.rechunk(
+                    self.options().target_block_bytesize,
+                    self.options().target_block_size,
+                )?;
+                let compressed_chunks = less_chunked
                     .chunks()
                     .map(|chunk| {
                         self.compress_array(&chunk)
diff --git a/vortex-sampling-compressor/tests/smoketest.rs b/vortex-sampling-compressor/tests/smoketest.rs
index 747dc959cb..6477810446 100644
--- a/vortex-sampling-compressor/tests/smoketest.rs
+++ b/vortex-sampling-compressor/tests/smoketest.rs
@@ -22,7 +22,13 @@ use vortex_sampling_compressor::{CompressConfig, SamplingCompressor};
 
 #[cfg(test)]
 mod tests {
+    use vortex::array::{Bool, ChunkedArray, VarBin};
+    use vortex::variants::{ArrayVariants, StructArrayTrait};
+    use vortex::ArrayDef;
     use vortex_datetime_dtype::TimeUnit;
+    use
vortex_datetime_parts::DateTimeParts;
+    use vortex_dict::Dict;
+    use vortex_fastlanes::FoR;
     use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
 
     use super::*;
@@ -79,6 +85,104 @@ mod tests {
         assert_eq!(compressed.dtype(), to_compress.dtype());
     }
 
+    #[test]
+    #[cfg_attr(miri, ignore)] // roaring bit maps uses an unsupported FFI
+    pub fn smoketest_compressor_on_chunked_array() {
+        let compressor = SamplingCompressor::default();
+
+        let chunk_size = 1 << 14;
+
+        let ints: Vec<Array> = (0..4).map(|_| make_primitive_column(chunk_size)).collect();
+        let bools: Vec<Array> = (0..4).map(|_| make_bool_column(chunk_size)).collect();
+        let varbins: Vec<Array> = (0..4).map(|_| make_string_column(chunk_size)).collect();
+        let binaries: Vec<Array> = (0..4).map(|_| make_binary_column(chunk_size)).collect();
+        let timestamps: Vec<Array> = (0..4).map(|_| make_timestamp_column(chunk_size)).collect();
+
+        fn chunked(arrays: Vec<Array>) -> Array {
+            let dtype = arrays[0].dtype().clone();
+            ChunkedArray::try_new(arrays, dtype).unwrap().into()
+        }
+
+        let to_compress = StructArray::try_new(
+            vec![
+                "prim_col".into(),
+                "bool_col".into(),
+                "varbin_col".into(),
+                "binary_col".into(),
+                "timestamp_col".into(),
+            ]
+            .into(),
+            vec![
+                chunked(ints),
+                chunked(bools),
+                chunked(varbins),
+                chunked(binaries),
+                chunked(timestamps),
+            ],
+            chunk_size * 4,
+            Validity::NonNullable,
+        )
+        .unwrap()
+        .into_array();
+
+        println!("uncompressed: {}", to_compress.tree_display());
+        let compressed = compressor
+            .compress(&to_compress, None)
+            .unwrap()
+            .into_array();
+
+        println!("compressed: {}", compressed.tree_display());
+        assert_eq!(compressed.dtype(), to_compress.dtype());
+
+        let struct_array: StructArray = compressed.try_into().unwrap();
+        let struct_array: &dyn StructArrayTrait = struct_array.as_struct_array().unwrap();
+
+        let prim_col: ChunkedArray = struct_array
+            .field_by_name("prim_col")
+            .unwrap()
+            .try_into()
+            .unwrap();
+        for chunk in prim_col.chunks() {
+            assert_eq!(chunk.encoding().id(), FoR::ID);
+        }
+
+        let bool_col: ChunkedArray = struct_array
+            .field_by_name("bool_col")
+            .unwrap()
+            .try_into()
+            .unwrap();
+        for chunk in bool_col.chunks() {
+            assert_eq!(chunk.encoding().id(), Bool::ID);
+        }
+
+        let varbin_col: ChunkedArray = struct_array
+            .field_by_name("varbin_col")
+            .unwrap()
+            .try_into()
+            .unwrap();
+        for chunk in varbin_col.chunks() {
+            assert_eq!(chunk.encoding().id(), Dict::ID);
+        }
+
+        let binary_col: ChunkedArray = struct_array
+            .field_by_name("binary_col")
+            .unwrap()
+            .try_into()
+            .unwrap();
+        for chunk in binary_col.chunks() {
+            assert_eq!(chunk.encoding().id(), VarBin::ID);
+        }
+
+        let timestamp_col: ChunkedArray = struct_array
+            .field_by_name("timestamp_col")
+            .unwrap()
+            .try_into()
+            .unwrap();
+        for chunk in timestamp_col.chunks() {
+            assert_eq!(chunk.encoding().id(), DateTimeParts::ID);
+        }
+    }
+
     fn make_primitive_column(count: usize) -> Array {
         PrimitiveArray::from_vec(
             (0..count).map(|i| i as i64).collect::<Vec<_>>(),