greedily combine chunks before compressing (#783)
feat: before compressing, collapse the chunks of a chunked array, targeting chunks of 16 MiB or 64Ki rows.
danking authored Sep 11, 2024
1 parent d54d83c commit 2e81835
Showing 7 changed files with 256 additions and 9 deletions.
11 changes: 8 additions & 3 deletions encodings/roaring/src/boolean/mod.rs
@@ -60,7 +60,7 @@ impl RoaringBoolArray {
if array.encoding().id() == Bool::ID {
roaring_bool_encode(BoolArray::try_from(array)?).map(|a| a.into_array())
} else {
vortex_bail!("RoaringInt can only encode boolean arrays")
vortex_bail!("RoaringBool can only encode boolean arrays")
}
}
}
@@ -84,12 +84,17 @@ impl BoolArrayTrait for RoaringBoolArray {
}

impl AcceptArrayVisitor for RoaringBoolArray {
fn accept(&self, _visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
// TODO(ngates): should we store a buffer in memory? Or delay serialization?
// Or serialize into metadata? The only reason we support buffers is so we can write to
// the wire without copying into FlatBuffers. But if we need to allocate to serialize
// the bitmap anyway, then may as well shove it into metadata.
todo!()
visitor.visit_buffer(
self.array()
.buffer()
.ok_or(vortex_err!("roaring bool should have a buffer"))?,
)?;
Ok(())
}
}

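A side note on the rewritten accept: Option::ok_or constructs its error value eagerly, even when the buffer is present. A lazily-evaluated equivalent (a sketch only; behavior here is the same) would be:

visitor.visit_buffer(
    self.array()
        .buffer()
        .ok_or_else(|| vortex_err!("roaring bool should have a buffer"))?,
)?;
Ok(())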
19 changes: 19 additions & 0 deletions vortex-array/src/array/assertions.rs
@@ -0,0 +1,19 @@
#[macro_export]
macro_rules! assert_arrays_eq {
($expected:expr, $actual:expr) => {
let expected: Array = $expected.into();
let actual: Array = $actual.into();
assert_eq!(expected.dtype(), actual.dtype());

let expected_contents = (0..expected.len())
.map(|idx| scalar_at(&expected, idx).map(|x| x.into_value()))
.collect::<VortexResult<Vec<_>>>()
.unwrap();
let actual_contents = (0..actual.len())
.map(|idx| scalar_at(&actual, idx).map(|x| x.into_value()))
.collect::<VortexResult<Vec<_>>>()
.unwrap();

assert_eq!(expected_contents, actual_contents);
};
}
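
A minimal usage sketch of the new macro. It assumes the caller sits inside vortex-array (so the crate:: paths resolve) and brings Array, scalar_at, and VortexResult into scope, since the macro body references all three unqualified, exactly as the updated test imports in chunked/mod.rs below do:

use vortex_error::VortexResult;

use crate::compute::unary::scalar_at;
use crate::{assert_arrays_eq, Array, ArrayDType, IntoArray};

#[test]
fn example() {
    let a = vec![1u64, 2, 3].into_array();
    let b = vec![1u64, 2, 3].into_array();
    // Passes: same dtype, and identical scalars at every index.
    assert_arrays_eq!(a, b);
}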
114 changes: 111 additions & 3 deletions vortex-array/src/array/chunked/mod.rs
@@ -18,7 +18,7 @@ use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::validity::Validity::NonNullable;
use crate::validity::{ArrayValidity, LogicalValidity, Validity};
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
use crate::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray};
use crate::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, IntoCanonical};

mod canonical;
mod compute;
@@ -123,6 +123,50 @@ impl ChunkedArray {
pub fn array_stream(&self) -> impl ArrayStream + '_ {
ArrayStreamAdapter::new(self.dtype().clone(), stream::iter(self.chunks().map(Ok)))
}

pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> {
let mut new_chunks = Vec::new();
let mut chunks_to_combine = Vec::new();
let mut new_chunk_n_bytes = 0;
let mut new_chunk_n_elements = 0;
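// Greedy pass: accumulate chunks until adding the next one would cross
// either target, flush the accumulated run as a single canonical chunk,
// and pass chunks that individually exceed a target through unmerged.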
for chunk in self.chunks() {
let n_bytes = chunk.nbytes();
let n_elements = chunk.len();

if (new_chunk_n_bytes + n_bytes > target_bytesize
|| new_chunk_n_elements + n_elements > target_rowsize)
&& !chunks_to_combine.is_empty()
{
new_chunks.push(
ChunkedArray::try_new(chunks_to_combine, self.dtype().clone())?
.into_canonical()?
.into(),
);

new_chunk_n_bytes = 0;
new_chunk_n_elements = 0;
chunks_to_combine = Vec::new();
}

if n_bytes > target_bytesize || n_elements > target_rowsize {
new_chunks.push(chunk);
} else {
new_chunk_n_bytes += n_bytes;
new_chunk_n_elements += n_elements;
chunks_to_combine.push(chunk);
}
}

if !chunks_to_combine.is_empty() {
new_chunks.push(
ChunkedArray::try_new(chunks_to_combine, self.dtype().clone())?
.into_canonical()?
.into(),
);
}

Self::try_new(new_chunks, self.dtype().clone())
}
}

impl ArrayTrait for ChunkedArray {}
@@ -178,11 +222,12 @@ impl SubtractScalarFn for ChunkedArray {
#[cfg(test)]
mod test {
use vortex_dtype::{DType, NativePType, Nullability, PType};
use vortex_error::VortexResult;

use crate::array::chunked::ChunkedArray;
use crate::compute::slice;
use crate::compute::unary::subtract_scalar;
use crate::{Array, IntoArray, IntoArrayVariant, ToArray};
use crate::compute::unary::{scalar_at, subtract_scalar};
use crate::{assert_arrays_eq, Array, ArrayDType, IntoArray, IntoArrayVariant, ToArray};

fn chunked_array() -> ChunkedArray {
ChunkedArray::try_new(
@@ -273,4 +318,67 @@ mod test {
.to_vec();
assert_eq!(results, &[6u64, 7, 8]);
}

#[test]
fn test_rechunk_one_chunk() {
let chunked = ChunkedArray::try_new(
vec![vec![0u64].into_array()],
DType::Primitive(PType::U64, Nullability::NonNullable),
)
.unwrap();

let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

assert_arrays_eq!(chunked, rechunked);
}

#[test]
fn test_rechunk_two_chunks() {
let chunked = ChunkedArray::try_new(
vec![vec![0u64].into_array(), vec![5u64].into_array()],
DType::Primitive(PType::U64, Nullability::NonNullable),
)
.unwrap();

let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

assert_eq!(rechunked.nchunks(), 1);
assert_arrays_eq!(chunked, rechunked);
}

#[test]
fn test_rechunk_tiny_target_chunks() {
let chunked = ChunkedArray::try_new(
vec![vec![0u64, 1, 2, 3].into_array(), vec![4u64, 5].into_array()],
DType::Primitive(PType::U64, Nullability::NonNullable),
)
.unwrap();

let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

assert_eq!(rechunked.nchunks(), 2);
assert!(rechunked.chunks().all(|c| c.len() < 5));
assert_arrays_eq!(chunked, rechunked);
}

#[test]
fn test_rechunk_with_too_big_chunk() {
let chunked = ChunkedArray::try_new(
vec![
vec![0u64, 1, 2].into_array(),
vec![42_u64; 6].into_array(),
vec![4u64, 5].into_array(),
vec![6u64, 7].into_array(),
vec![8u64, 9].into_array(),
],
DType::Primitive(PType::U64, Nullability::NonNullable),
)
.unwrap();

let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
// greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

assert_eq!(rechunked.nchunks(), 4);
assert_arrays_eq!(chunked, rechunked);
}
}
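
To see the greedy policy in isolation, here is a self-contained sketch over chunk lengths only (illustrative; greedy_groups is not part of this diff, and the byte-size check is dropped for brevity):

/// Greedily group chunk lengths so each group stays within `target_rowsize`;
/// lengths that individually exceed the target become singleton groups.
fn greedy_groups(lens: &[usize], target_rowsize: usize) -> Vec<Vec<usize>> {
    let mut groups = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_len = 0;
    for &len in lens {
        // Flush before overflowing the target, but never flush an empty run.
        if current_len + len > target_rowsize && !current.is_empty() {
            groups.push(std::mem::take(&mut current));
            current_len = 0;
        }
        if len > target_rowsize {
            // Too big to combine with anything: emit as its own group.
            groups.push(vec![len]);
        } else {
            current_len += len;
            current.push(len);
        }
    }
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

With lens = [3, 6, 2, 2, 2] and target_rowsize = 5 this yields [[3], [6], [2, 2], [2]], matching the grouping asserted in test_rechunk_with_too_big_chunk above.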
1 change: 1 addition & 0 deletions vortex-array/src/array/mod.rs
@@ -1,3 +1,4 @@
mod assertions;
mod bool;
mod chunked;
mod constant;
3 changes: 2 additions & 1 deletion vortex-sampling-compressor/src/compressors/roaring_bool.rs
@@ -1,5 +1,6 @@
use std::collections::HashSet;

use vortex::array::Bool;
use vortex::encoding::EncodingRef;
use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoArrayVariant};
use vortex_dtype::DType;
@@ -20,7 +21,7 @@ impl EncodingCompressor for RoaringBoolCompressor {

fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
// Only support bool enc arrays
if array.encoding().id() != RoaringBool::ID {
if array.encoding().id() != Bool::ID {
return None;
}

13 changes: 11 additions & 2 deletions vortex-sampling-compressor/src/lib.rs
@@ -55,15 +55,21 @@ pub struct CompressConfig {
sample_size: u16,
sample_count: u16,
max_depth: u8,
target_block_bytesize: usize,
target_block_size: usize,
}

impl Default for CompressConfig {
fn default() -> Self {
let kib = 1 << 10;
let mib = 1 << 20;
Self {
// Sample length should always be a multiple of 1024
sample_size: 128,
sample_count: 8,
max_depth: 3,
target_block_bytesize: 16 * mib,
target_block_size: 64 * kib,
}
}
}
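
In concrete numbers, those defaults come out to (illustrative constants, not part of the diff):

// 16 MiB and 64 Ki rows, matching the commit message.
const TARGET_BLOCK_BYTESIZE: usize = 16 * (1 << 20); // 16_777_216 bytes
const TARGET_BLOCK_SIZE: usize = 64 * (1 << 10); // 65_536 rows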
@@ -199,9 +205,12 @@ impl<'a> SamplingCompressor<'a> {
fn compress_array(&self, arr: &Array) -> VortexResult<CompressedArray<'a>> {
match arr.encoding().id() {
Chunked::ID => {
// For chunked arrays, we compress each chunk individually
let chunked = ChunkedArray::try_from(arr)?;
let compressed_chunks = chunked
let less_chunked = chunked.rechunk(
self.options().target_block_bytesize,
self.options().target_block_size,
)?;
let compressed_chunks = less_chunked
.chunks()
.map(|chunk| {
self.compress_array(&chunk)
104 changes: 104 additions & 0 deletions vortex-sampling-compressor/tests/smoketest.rs
@@ -22,7 +22,13 @@ use vortex_sampling_compressor::{CompressConfig, SamplingCompressor};

#[cfg(test)]
mod tests {
use vortex::array::{Bool, ChunkedArray, VarBin};
use vortex::variants::{ArrayVariants, StructArrayTrait};
use vortex::ArrayDef;
use vortex_datetime_dtype::TimeUnit;
use vortex_datetime_parts::DateTimeParts;
use vortex_dict::Dict;
use vortex_fastlanes::FoR;
use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;

use super::*;
@@ -79,6 +85,104 @@ mod tests {
assert_eq!(compressed.dtype(), to_compress.dtype());
}

#[test]
#[cfg_attr(miri, ignore)] // roaring bitmaps use FFI, which miri does not support
pub fn smoketest_compressor_on_chunked_array() {
let compressor = SamplingCompressor::default();

let chunk_size = 1 << 14;

let ints: Vec<Array> = (0..4).map(|_| make_primitive_column(chunk_size)).collect();
let bools: Vec<Array> = (0..4).map(|_| make_bool_column(chunk_size)).collect();
let varbins: Vec<Array> = (0..4).map(|_| make_string_column(chunk_size)).collect();
let binaries: Vec<Array> = (0..4).map(|_| make_binary_column(chunk_size)).collect();
let timestamps: Vec<Array> = (0..4).map(|_| make_timestamp_column(chunk_size)).collect();

fn chunked(arrays: Vec<Array>) -> Array {
let dtype = arrays[0].dtype().clone();
ChunkedArray::try_new(arrays, dtype).unwrap().into()
}

let to_compress = StructArray::try_new(
vec![
"prim_col".into(),
"bool_col".into(),
"varbin_col".into(),
"binary_col".into(),
"timestamp_col".into(),
]
.into(),
vec![
chunked(ints),
chunked(bools),
chunked(varbins),
chunked(binaries),
chunked(timestamps),
],
chunk_size * 4,
Validity::NonNullable,
)
.unwrap()
.into_array();

println!("uncompressed: {}", to_compress.tree_display());
let compressed = compressor
.compress(&to_compress, None)
.unwrap()
.into_array();

println!("compressed: {}", compressed.tree_display());
assert_eq!(compressed.dtype(), to_compress.dtype());

let struct_array: StructArray = compressed.try_into().unwrap();
let struct_array: &dyn StructArrayTrait = struct_array.as_struct_array().unwrap();

let prim_col: ChunkedArray = struct_array
.field_by_name("prim_col")
.unwrap()
.try_into()
.unwrap();
for chunk in prim_col.chunks() {
assert_eq!(chunk.encoding().id(), FoR::ID);
}

let bool_col: ChunkedArray = struct_array
.field_by_name("bool_col")
.unwrap()
.try_into()
.unwrap();
for chunk in bool_col.chunks() {
assert_eq!(chunk.encoding().id(), Bool::ID);
}

let varbin_col: ChunkedArray = struct_array
.field_by_name("varbin_col")
.unwrap()
.try_into()
.unwrap();
for chunk in varbin_col.chunks() {
assert_eq!(chunk.encoding().id(), Dict::ID);
}

let binary_col: ChunkedArray = struct_array
.field_by_name("binary_col")
.unwrap()
.try_into()
.unwrap();
for chunk in binary_col.chunks() {
assert_eq!(chunk.encoding().id(), VarBin::ID);
}

let timestamp_col: ChunkedArray = struct_array
.field_by_name("timestamp_col")
.unwrap()
.try_into()
.unwrap();
for chunk in timestamp_col.chunks() {
assert_eq!(chunk.encoding().id(), DateTimeParts::ID);
}
}

fn make_primitive_column(count: usize) -> Array {
PrimitiveArray::from_vec(
(0..count).map(|i| i as i64).collect::<Vec<i64>>(),
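To run just this smoke test locally (assuming the package name matches its directory, as is conventional):

cargo test -p vortex-sampling-compressor --test smoketest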
