feat: add ChunkedCompressor which compresses chunk n+1 like chunk n (#996)

The primary idea is that chunk n+1 more often than not has a
distribution of values similar to chunk n's. We ought to reuse chunk n's
compression scheme, as long as the resulting ratio is "good", rather
than attempting a full sampling pass on every chunk. This has the
potential both to increase throughput and to let us invest in a more
extensive search on the first chunk.
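
Concretely, the per-chunk decision is roughly the sketch below. The helper name is made up for illustration; it mirrors the check in `compress_chunked` further down rather than quoting it.

```rust
/// Sketch only: after compressing a chunk with the previous chunk's tree, decide whether
/// that result is acceptable or whether a full sampling search should be run instead.
fn like_result_is_acceptable(
    chunk_nbytes: u64,
    compressed_nbytes: u64,
    previous_ratio: Option<f32>,
    relatively_good_ratio: f32, // 1.2 by default in this PR
) -> bool {
    let ratio = compressed_nbytes as f32 / chunk_nbytes as f32;
    // Reject if the chunk actually grew, or if the ratio degraded beyond
    // previous_ratio * relatively_good_ratio.
    ratio <= 1.0
        && previous_ratio
            .map(|prev| ratio <= prev * relatively_good_ratio)
            .unwrap_or(true)
}
```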

This PR introduces `ChunkedCompressor` and `StructCompressor`. Their
existence means that compression trees now fully represent an array. For
example, if I have a `Chunked(Struct(foo=Chunked(U64), ...))`, the
`ChunkedCompressor` will attempt to compress all the U64 chunks
similarly and then pass the ratio and encoding tree of the last chunk
up to the `StructCompressor`. Eventually, the outer `ChunkedCompressor`
can attempt to reuse, on the second outer chunk, all the encodings from
all the fields of the first outer chunk.
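
For that example, the reused state looks roughly like the following tree (an illustrative sketch of the shape, not output produced by the code):

```text
ChunkedCompressor                  metadata: ratio of the last outer chunk
└── StructCompressor
    ├── foo: ChunkedCompressor     metadata: ratio of foo's last inner chunk
    │   └── <encoding tree chosen for foo's last U64 chunk>
    └── ... one child per remaining field ...
```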

This PR looks best with whitespace ignored.

The `CompressionTree` (particularly the metadata) is not so ergonomic,
but I focused on throughput improvement rather than a refactor.

### benchmarks

Any ratio outside (0.8, 1.2) is bolded.

|Benchmark suite|Current: 68faec3|Previous: 4aa30c0|Unit|Ratio|
|-|-|-|-|-|
|taxi: compress time|1.4951|2.9452|s|**0.51**|
|taxi: compress throughput|300.31|152.45|MiB/s|**1.97**|
|taxi: vortex:parquetzstd size|0.94933|0.95669||0.99|
|taxi: compress ratio|0.10633|0.10605||1.00|
|taxi: compressed size|47.744|47.615|MiB|1.00|
|AirlineSentiment: compress time|0.00038491|0.00036394|s|1.06|
|AirlineSentiment: compress throughput|5.0049|5.2933|MiB/s|0.95|
|AirlineSentiment: vortex:parquetzstd size|6.3744|6.3744||1.00|
|AirlineSentiment: compress ratio|0.62079|0.62079||1.00|
|AirlineSentiment: compressed size|0.0011959|0.0011959|MiB|1.00|
|Arade: compress time|2.294|3.9502|s|**0.58**|
|Arade: compress throughput|327.19|190.01|MiB/s|**1.72**|
|Arade: vortex:parquetzstd size|0.47662|0.47901||1.00|
|Arade: compress ratio|0.17756|0.17816||1.00|
|Arade: compressed size|133.27|133.72|MiB|1.00|
|Bimbo: compress time|12.753|25.983|s|**0.49**|
|Bimbo: compress throughput|532.55|261.38|MiB/s|**2.04**|
|Bimbo: vortex:parquetzstd size|1.2573|1.1858||1.06|
|Bimbo: compress ratio|0.061503|0.057562||1.07|
|Bimbo: compressed size|417.69|390.93|MiB|1.07|
|CMSprovider: compress time|11.892|16.619|s|**0.72**|
|CMSprovider: compress throughput|412.91|295.48|MiB/s|**1.40**|
|CMSprovider: vortex:parquetzstd size|1.0742|1.0992||0.98|
|CMSprovider: compress ratio|0.15301|0.1575||0.97|
|CMSprovider: compressed size|751.38|773.42|MiB|0.97|
|Euro2016: compress time|1.7194|2.0275|s|0.85|
|Euro2016: compress throughput|218.12|184.97|MiB/s|1.18|
|Euro2016: vortex:parquetzstd size|1.3998|1.3737||1.02|
|Euro2016: compress ratio|0.4182|0.41015||1.02|
|Euro2016: compressed size|156.84|153.82|MiB|1.02|
|Food: compress time|1.0851|1.3049|s|0.83|
|Food: compress throughput|292.41|243.16|MiB/s|1.20|
|Food: vortex:parquetzstd size|1.2213|1.2548||0.97|
|Food: compress ratio|0.12602|0.13044||0.97|
|Food: compressed size|39.986|41.39|MiB|0.97|
|HashTags: compress time|2.3817|3.1473|s|**0.76**|
|HashTags: compress throughput|322.14|243.77|MiB/s|**1.32**|
|HashTags: vortex:parquetzstd size|1.5056|1.5142||0.99|
|HashTags: compress ratio|0.24665|0.2483||0.99|
|HashTags: compressed size|189.23|190.51|MiB|0.99|
|TPC-H l_comment: compress time|0.8073|1.2042|s|**0.67**|
|TPC-H l_comment: compress throughput|216.19|144.93|MiB/s|**1.49**|
|TPC-H l_comment: vortex:parquetzstd size|1.1701|1.1648||1.00|
|TPC-H l_comment: compress ratio|0.36161|0.35995||1.00|
|TPC-H l_comment: compressed size|63.113|62.822|MiB|1.00|
danking authored Oct 10, 2024
1 parent 5ed38e3 commit 3d6dd50
Showing 7 changed files with 334 additions and 135 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions vortex-sampling-compressor/Cargo.toml
@@ -16,6 +16,7 @@ readme = { workspace = true }
[dependencies]
arbitrary = { workspace = true, optional = true }
fsst-rs = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
156 changes: 156 additions & 0 deletions vortex-sampling-compressor/src/compressors/chunked.rs
@@ -0,0 +1,156 @@
use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;

use log::warn;
use vortex::array::{Chunked, ChunkedArray};
use vortex::encoding::EncodingRef;
use vortex::{Array, ArrayDType, ArrayDef, IntoArray};
use vortex_error::{vortex_bail, VortexResult};

use super::EncoderMetadata;
use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;

#[derive(Debug)]
pub struct ChunkedCompressor {
    relatively_good_ratio: f32,
}

pub const DEFAULT_CHUNKED_COMPRESSOR: ChunkedCompressor = ChunkedCompressor {
    relatively_good_ratio: 1.2,
};

pub struct ChunkedCompressorMetadata(Option<f32>);

impl EncoderMetadata for ChunkedCompressorMetadata {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl EncodingCompressor for ChunkedCompressor {
    fn id(&self) -> &str {
        Chunked::ID.as_ref()
    }

    fn cost(&self) -> u8 {
        0
    }

    fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
        array.is_encoding(Chunked::ID).then_some(self)
    }

    fn compress<'a>(
        &'a self,
        array: &Array,
        like: Option<CompressionTree<'a>>,
        ctx: SamplingCompressor<'a>,
    ) -> VortexResult<CompressedArray<'a>> {
        let chunked_array = ChunkedArray::try_from(array)?;
        let like_and_ratio = like_into_parts(like)?;
        self.compress_chunked(&chunked_array, like_and_ratio, ctx)
    }

    fn used_encodings(&self) -> HashSet<EncodingRef> {
        HashSet::from([])
    }
}

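/// Splits a `like` tree produced by [`ChunkedCompressor`] into its single child tree and the
/// target compression ratio stored in its metadata.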
fn like_into_parts(
    tree: Option<CompressionTree<'_>>,
) -> VortexResult<Option<(CompressionTree<'_>, f32)>> {
    let (_, mut children, metadata) = match tree {
        None => return Ok(None),
        Some(tree) => tree.into_parts(),
    };

    let Some(target_ratio) = metadata else {
        vortex_bail!("chunked array compression tree must have metadata")
    };

    let Some(ChunkedCompressorMetadata(target_ratio)) =
        target_ratio.as_ref().as_any().downcast_ref()
    else {
        vortex_bail!("chunked array compression tree must be ChunkedCompressorMetadata")
    };

    if children.len() != 1 {
        vortex_bail!("chunked array compression tree must have one child")
    }

    let child = children.remove(0);

    match (child, target_ratio) {
        (None, None) => Ok(None),
        (Some(child), Some(ratio)) => Ok(Some((child, *ratio))),
        (..) => vortex_bail!("chunked array compression tree must have a child iff it has a ratio"),
    }
}

impl ChunkedCompressor {
    /// How far the compression ratio is allowed to grow from one chunk to another chunk.
    ///
    /// As long as a compressor compresses subsequent chunks "reasonably well" we should continue to
    /// use it, which saves us the cost of searching for a good compressor. This constant quantifies
    /// "reasonably well" as
    ///
    /// ```text
    /// new_ratio <= old_ratio * self.relatively_good_ratio
    /// ```
    fn relatively_good_ratio(&self) -> f32 {
        self.relatively_good_ratio
    }

    fn compress_chunked<'a>(
        &'a self,
        array: &ChunkedArray,
        mut previous: Option<(CompressionTree<'a>, f32)>,
        ctx: SamplingCompressor<'a>,
    ) -> VortexResult<CompressedArray<'a>> {
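        // Coalesce the input into fewer, larger chunks that respect the configured target block
        // size (in bytes and in rows) before compressing chunk by chunk.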
        let less_chunked = array.rechunk(
            ctx.options().target_block_bytesize,
            ctx.options().target_block_size,
        )?;
        let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks());
        for (index, chunk) in less_chunked.chunks().enumerate() {
            let like = previous.as_ref().map(|(like, _)| like);
            let (compressed_chunk, tree) = ctx
                .named(&format!("chunk-{}", index))
                .compress(&chunk, like)?
                .into_parts();

            let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
            let exceeded_target_ratio = previous
                .as_ref()
                .map(|(_, target_ratio)| ratio > target_ratio * self.relatively_good_ratio())
                .unwrap_or(false);

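            // The previous chunk's tree compressed this chunk poorly (or the chunk grew), so fall
            // back to a full sampling search and adopt its tree and ratio for subsequent chunks.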
            if ratio > 1.0 || exceeded_target_ratio {
                warn!("unsatisfactory ratio {} {:?}", ratio, previous);
                let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts();
                let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
                previous = tree.map(|tree| (tree, new_ratio));
                compressed_chunks.push(compressed_chunk);
            } else {
                previous = previous.or_else(|| tree.map(|tree| (tree, ratio)));
                compressed_chunks.push(compressed_chunk);
            }
        }

        let (child, ratio) = match previous {
            Some((child, ratio)) => (Some(child), Some(ratio)),
            None => (None, None),
        };

        Ok(CompressedArray::new(
            ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(),
            Some(CompressionTree::new_with_metadata(
                self,
                vec![child],
                Arc::new(ChunkedCompressorMetadata(ratio)),
            )),
        ))
    }
}
2 changes: 1 addition & 1 deletion vortex-sampling-compressor/src/compressors/fsst.rs
@@ -63,7 +63,7 @@ impl EncodingCompressor for FSSTCompressor {
        // between 2-3x depending on the text quality.
        //
        // It's not worth running a full compression step unless the array is large enough.
-       if array.nbytes() < 10 * FSST_SYMTAB_MAX_SIZE {
+       if array.nbytes() < 5 * FSST_SYMTAB_MAX_SIZE {
            return Ok(CompressedArray::uncompressed(array.clone()));
        }

14 changes: 14 additions & 0 deletions vortex-sampling-compressor/src/compressors/mod.rs
@@ -13,6 +13,7 @@ use crate::SamplingCompressor;
pub mod alp;
pub mod alp_rd;
pub mod bitpacked;
pub mod chunked;
pub mod constant;
pub mod date_time_parts;
pub mod delta;
@@ -23,6 +24,7 @@ pub mod roaring_bool;
pub mod roaring_int;
pub mod runend;
pub mod sparse;
pub mod struct_;
pub mod zigzag;

pub trait EncodingCompressor: Sync + Send + Debug {
@@ -162,6 +164,17 @@ impl<'a> CompressionTree<'a> {
            .filter_map(|child| child.as_ref().map(|c| c.num_descendants() + 1))
            .sum::<usize>()
    }

    #[allow(clippy::type_complexity)]
    pub fn into_parts(
        self,
    ) -> (
        &'a dyn EncodingCompressor,
        Vec<Option<CompressionTree<'a>>>,
        Option<Arc<dyn EncoderMetadata>>,
    ) {
        (self.compressor, self.children, self.metadata)
    }
}

#[derive(Debug, Clone)]
@@ -199,6 +212,7 @@ impl<'a> CompressedArray<'a> {
        self.path
    }

    #[inline]
    pub fn into_parts(self) -> (Array, Option<CompressionTree<'a>>) {
        (self.array, self.path)
    }
66 changes: 66 additions & 0 deletions vortex-sampling-compressor/src/compressors/struct_.rs
@@ -0,0 +1,66 @@
use std::collections::HashSet;

use itertools::Itertools;
use vortex::array::{Struct, StructArray};
use vortex::encoding::EncodingRef;
use vortex::variants::StructArrayTrait;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_error::VortexResult;

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;

#[derive(Debug)]
pub struct StructCompressor;

impl EncodingCompressor for StructCompressor {
    fn id(&self) -> &str {
        Struct::ID.as_ref()
    }

    fn cost(&self) -> u8 {
        0
    }

    fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
        StructArray::try_from(array)
            .ok()
            .map(|_| self as &dyn EncodingCompressor)
    }

    fn compress<'a>(
        &'a self,
        array: &Array,
        like: Option<CompressionTree<'a>>,
        ctx: SamplingCompressor<'a>,
    ) -> VortexResult<CompressedArray<'a>> {
        let array = StructArray::try_from(array)?;
        let compressed_validity = ctx.compress_validity(array.validity())?;

        let children_trees = match like {
            Some(tree) => tree.children,
            None => vec![None; array.nfields()],
        };

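        // Compress each field against the corresponding child of the `like` tree, if one exists.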
        let (arrays, trees) = array
            .children()
            .zip_eq(children_trees)
            .map(|(array, like)| ctx.compress(&array, like.as_ref()))
            .process_results(|iter| iter.map(|x| (x.array, x.path)).unzip())?;

        Ok(CompressedArray::new(
            StructArray::try_new(
                array.names().clone(),
                arrays,
                array.len(),
                compressed_validity,
            )?
            .into_array(),
            Some(CompressionTree::new(self, trees)),
        ))
    }

    fn used_encodings(&self) -> HashSet<EncodingRef> {
        HashSet::from([])
    }
}
