Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ChunkedCompressor which compresses chunk n+1 like chunk n #996

Merged
merged 36 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
9ce1789
feat: compress chunk n+1 like chunk n
danking Oct 3, 2024
336888f
handle chunked too
danking Oct 3, 2024
f73df06
wip
danking Oct 3, 2024
33ff0aa
revert compression benchmark
danking Oct 3, 2024
fe2db95
wip
danking Oct 3, 2024
3cb83f1
wip
danking Oct 4, 2024
a0af4cd
main function for cargo instruments
danking Oct 4, 2024
75c03dc
add fixme to slow is_strict_sorted check
danking Oct 7, 2024
7734c17
remove is_strict_sorted check
danking Oct 7, 2024
8bcc45f
feat: reduce compress time with smarter use and calculation of stats
danking Oct 7, 2024
4b8266c
remove comment
danking Oct 8, 2024
8c4312b
remove comments
danking Oct 8, 2024
c211061
remove comments
danking Oct 8, 2024
e9b6da1
struct is cost 0
danking Oct 8, 2024
629ea6c
reduce fsst multiple to five
danking Oct 8, 2024
6a40f1c
restore
danking Oct 8, 2024
1a5489b
slightly better From<Vec<T>> for Scalar
danking Oct 9, 2024
8abdca9
fixes
danking Oct 9, 2024
2065033
cleanups
danking Oct 9, 2024
799dd6a
pass along metadata
danking Oct 10, 2024
bcb82d5
14 => 12
danking Oct 10, 2024
25a387c
kill commented code
danking Oct 10, 2024
fb9b804
revert Scalar changes
danking Oct 10, 2024
4326be9
revert varbin IsConstant change and Dict compressor change
danking Oct 10, 2024
dc51115
placate clippy
danking Oct 10, 2024
5a1be70
fix direction on partition
danking Oct 10, 2024
fe3ce9d
no changes to vortex-scalar
danking Oct 10, 2024
8bde547
remove simplelog
danking Oct 10, 2024
9b8859e
deindent like_into_parts
danking Oct 10, 2024
7274215
fix nit and pull 1.2 into documented constant
danking Oct 10, 2024
deb4287
field rather than const
danking Oct 10, 2024
da8d39c
zip => zip_eq
danking Oct 10, 2024
c11620e
compile error: Self -> self
danking Oct 10, 2024
2be867e
do not run fenced code block
danking Oct 10, 2024
240693b
this is text not code
danking Oct 10, 2024
d325968
save an allocation
danking Oct 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vortex-sampling-compressor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ readme = { workspace = true }
[dependencies]
arbitrary = { workspace = true, optional = true }
fsst-rs = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
Expand Down
156 changes: 156 additions & 0 deletions vortex-sampling-compressor/src/compressors/chunked.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;

use log::warn;
use vortex::array::{Chunked, ChunkedArray};
use vortex::encoding::EncodingRef;
use vortex::{Array, ArrayDType, ArrayDef, IntoArray};
use vortex_error::{vortex_bail, VortexResult};

use super::EncoderMetadata;
use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;

/// Compressor for chunked arrays that compresses chunk by chunk, first trying to compress each
/// chunk like the previously compressed one.
#[derive(Debug)]
pub struct ChunkedCompressor {
/// Multiplier bounding how much larger a chunk's ratio (compressed bytes / uncompressed bytes)
/// may be than the previously recorded ratio before the chunk is compressed again without
/// reusing the previous compression tree.
relatively_good_ratio: f32,
}

/// Default [`ChunkedCompressor`]: a chunk's ratio may be up to 1.2 times the previously recorded
/// ratio and still count as "relatively good".
pub const DEFAULT_CHUNKED_COMPRESSOR: ChunkedCompressor = ChunkedCompressor {
relatively_good_ratio: 1.2,
};

/// Metadata attached to a [`ChunkedCompressor`] compression tree: the ratio (compressed bytes /
/// uncompressed bytes) recorded together with the child tree, or `None` when no child tree was
/// recorded.
pub struct ChunkedCompressorMetadata(Option<f32>);

impl EncoderMetadata for ChunkedCompressorMetadata {
/// Exposes the metadata as [`Any`] so that it can be downcast back to
/// `ChunkedCompressorMetadata` when a compression tree is taken apart again.
fn as_any(&self) -> &dyn Any {
self
}
}

impl EncodingCompressor for ChunkedCompressor {
    /// Identifier of the chunked encoding.
    fn id(&self) -> &str {
        Chunked::ID.as_ref()
    }

    /// Reported cost of this compressor; always zero.
    fn cost(&self) -> u8 {
        0
    }

    /// Returns this compressor when `array` uses the chunked encoding, `None` otherwise.
    fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
        if array.is_encoding(Chunked::ID) {
            Some(self as &dyn EncodingCompressor)
        } else {
            None
        }
    }

    /// Compresses a chunked array.
    ///
    /// `like`, when present, must be a tree produced by this compressor; it is split into the
    /// child tree and recorded ratio that seed the per-chunk compression.
    ///
    /// # Errors
    ///
    /// Fails if `array` cannot be viewed as a `ChunkedArray`, if `like` is malformed, or if
    /// compressing the chunks fails.
    fn compress<'a>(
        &'a self,
        array: &Array,
        like: Option<CompressionTree<'a>>,
        ctx: SamplingCompressor<'a>,
    ) -> VortexResult<CompressedArray<'a>> {
        let chunks = ChunkedArray::try_from(array)?;
        let previous = like_into_parts(like)?;
        self.compress_chunked(&chunks, previous, ctx)
    }

    /// This compressor reports no encodings of its own.
    fn used_encodings(&self) -> HashSet<EncodingRef> {
        HashSet::new()
    }
}

fn like_into_parts(
tree: Option<CompressionTree<'_>>,
) -> VortexResult<Option<(CompressionTree<'_>, f32)>> {
lwwmanning marked this conversation as resolved.
Show resolved Hide resolved
let (_, mut children, metadata) = match tree {
None => return Ok(None),
Some(tree) => tree.into_parts(),
};

let Some(target_ratio) = metadata else {
vortex_bail!("chunked array compression tree must have metadata")
};

let Some(ChunkedCompressorMetadata(target_ratio)) =
target_ratio.as_ref().as_any().downcast_ref()
else {
vortex_bail!("chunked array compression tree must be ChunkedCompressorMetadata")
};

if children.len() != 1 {
vortex_bail!("chunked array compression tree must have one child")
}

let child = children.remove(0);

match (child, target_ratio) {
(None, None) => Ok(None),
(Some(child), Some(ratio)) => Ok(Some((child, *ratio))),
(..) => vortex_bail!("chunked array compression tree must have a child iff it has a ratio"),
}
}

impl ChunkedCompressor {
/// How far the compression ratio is allowed to grow from one chunk to another chunk.
///
/// The ratio is compressed size divided by uncompressed size, so a smaller ratio is better.
///
/// As long as a compressor compresses subsequent chunks "reasonably well" we should continue to
/// use it, which saves us the cost of searching for a good compressor. This field quantifies
/// "reasonably well" as
///
/// ```text
/// new_ratio <= old_ratio * self.relatively_good_ratio
/// ```
fn relatively_good_ratio(&self) -> f32 {
self.relatively_good_ratio
}

fn compress_chunked<'a>(
&'a self,
array: &ChunkedArray,
mut previous: Option<(CompressionTree<'a>, f32)>,
ctx: SamplingCompressor<'a>,
) -> VortexResult<CompressedArray<'a>> {
let less_chunked = array.rechunk(
ctx.options().target_block_bytesize,
ctx.options().target_block_size,
)?;
let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks());
for (index, chunk) in less_chunked.chunks().enumerate() {
let like = previous.as_ref().map(|(like, _)| like);
let (compressed_chunk, tree) = ctx
.named(&format!("chunk-{}", index))
.compress(&chunk, like)?
.into_parts();

let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
let exceeded_target_ratio = previous
.as_ref()
.map(|(_, target_ratio)| ratio > target_ratio * self.relatively_good_ratio())
.unwrap_or(false);

if ratio > 1.0 || exceeded_target_ratio {
warn!("unsatisfactory ratio {} {:?}", ratio, previous);
let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts();
let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
previous = tree.map(|tree| (tree, new_ratio));
compressed_chunks.push(compressed_chunk);
} else {
previous = previous.or_else(|| tree.map(|tree| (tree, ratio)));
compressed_chunks.push(compressed_chunk);
}
}

let (child, ratio) = match previous {
Some((child, ratio)) => (Some(child), Some(ratio)),
None => (None, None),
};

Ok(CompressedArray::new(
ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(),
Some(CompressionTree::new_with_metadata(
self,
vec![child],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if child is None, should this just be empty...?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, this is the diff. I don't find either of these particularly palatable but I think the fix is to sort out how compressors pass information from one invocation of compress to the next.

(docs) # g diff
diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index c0f5aebe..9adf4ab4 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -78,16 +78,16 @@ fn like_into_parts(
         vortex_bail!("chunked array compression tree must be ChunkedCompressorMetadata")
     };
 
-    if children.len() != 1 {
-        vortex_bail!("chunked array compression tree must have one child")
+    if (children.len() == 1) != target_ratio.is_some() {
+        vortex_bail!("chunked array compression tree must have a child iff it has a ratio")
     }
 
-    let child = children.remove(0);
-
-    match (child, target_ratio) {
-        (None, None) => Ok(None),
-        (Some(child), Some(ratio)) => Ok(Some((child, *ratio))),
-        (..) => vortex_bail!("chunked array compression tree must have a child iff it has a ratio"),
+    if children.len() == 0 {
+        return Ok(None);
+    } else if children.len() == 1 {
+        return Ok(Some((children.remove(0).unwrap(), target_ratio.unwrap())));
+    } else {
+        vortex_bail!("chunked array compression tree must have at most one child")
     }
 }
 
@@ -141,16 +141,16 @@ impl ChunkedCompressor {
             }
         }
 
-        let (child, ratio) = match previous {
-            Some((child, ratio)) => (Some(child), Some(ratio)),
-            None => (None, None),
+        let (children, ratio) = match previous {
+            Some((child, ratio)) => (vec![Some(child)], Some(ratio)),
+            None => (vec![], None),
         };
 
         Ok(CompressedArray::new(
             ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(),
             Some(CompressionTree::new_with_metadata(
                 self,
-                vec![child],
+                children,
                 Arc::new(ChunkedCompressorMetadata(ratio)),
             )),
         ))

Arc::new(ChunkedCompressorMetadata(ratio)),
)),
))
}
}
2 changes: 1 addition & 1 deletion vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl EncodingCompressor for FSSTCompressor {
// between 2-3x depending on the text quality.
//
// It's not worth running a full compression step unless the array is large enough.
if array.nbytes() < 10 * FSST_SYMTAB_MAX_SIZE {
if array.nbytes() < 5 * FSST_SYMTAB_MAX_SIZE {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why my changes encountered this, but I was getting samples that were hundreds of bytes too small for FSST which triggered this PR to compress poorly.

We may want to think more broadly about how to estimate FSST compression ratio on a tiny sample, but for now this seems reasonable unless we're directly calling compress on a small array (rather than a sample thereof).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@a10y any guidance on how you arrived at 10x multiplier or just a guess?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I had chatted 1:1 with Dan about this, it was fairly arbitrary

return Ok(CompressedArray::uncompressed(array.clone()));
}

Expand Down
14 changes: 14 additions & 0 deletions vortex-sampling-compressor/src/compressors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::SamplingCompressor;
pub mod alp;
pub mod alp_rd;
pub mod bitpacked;
pub mod chunked;
pub mod constant;
pub mod date_time_parts;
pub mod delta;
Expand All @@ -23,6 +24,7 @@ pub mod roaring_bool;
pub mod roaring_int;
pub mod runend;
pub mod sparse;
pub mod struct_;
pub mod zigzag;

pub trait EncodingCompressor: Sync + Send + Debug {
Expand Down Expand Up @@ -162,6 +164,17 @@ impl<'a> CompressionTree<'a> {
.filter_map(|child| child.as_ref().map(|c| c.num_descendants() + 1))
.sum::<usize>()
}

#[allow(clippy::type_complexity)]
pub fn into_parts(
self,
) -> (
&'a dyn EncodingCompressor,
Vec<Option<CompressionTree<'a>>>,
Option<Arc<dyn EncoderMetadata>>,
) {
(self.compressor, self.children, self.metadata)
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -199,6 +212,7 @@ impl<'a> CompressedArray<'a> {
self.path
}

#[inline]
pub fn into_parts(self) -> (Array, Option<CompressionTree<'a>>) {
(self.array, self.path)
}
Expand Down
66 changes: 66 additions & 0 deletions vortex-sampling-compressor/src/compressors/struct_.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::collections::HashSet;

use itertools::Itertools;
use vortex::array::{Struct, StructArray};
use vortex::encoding::EncodingRef;
use vortex::variants::StructArrayTrait;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_error::VortexResult;

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;

/// Compressor for struct arrays: compresses the validity and each field separately and
/// reassembles them into a struct array.
#[derive(Debug)]
pub struct StructCompressor;

impl EncodingCompressor for StructCompressor {
    /// Identifier of the struct encoding.
    fn id(&self) -> &str {
        Struct::ID.as_ref()
    }

    /// Reported cost of this compressor; always zero.
    fn cost(&self) -> u8 {
        0
    }

    /// Returns this compressor when `array` can be viewed as a `StructArray`, `None` otherwise.
    fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
        match StructArray::try_from(array) {
            Ok(_) => Some(self as &dyn EncodingCompressor),
            Err(_) => None,
        }
    }

    /// Compresses the validity and every field of a struct array.
    ///
    /// When `like` is given, its children are used field by field as the trees to imitate;
    /// otherwise every field is compressed without a tree.
    ///
    /// # Errors
    ///
    /// Fails if `array` is not a struct array, or if compressing the validity or any field
    /// fails, or if the compressed fields cannot be reassembled.
    ///
    /// # Panics
    ///
    /// Panics (via `zip_eq`) if `like` does not have exactly one child per field.
    fn compress<'a>(
        &'a self,
        array: &Array,
        like: Option<CompressionTree<'a>>,
        ctx: SamplingCompressor<'a>,
    ) -> VortexResult<CompressedArray<'a>> {
        let struct_array = StructArray::try_from(array)?;
        let compressed_validity = ctx.compress_validity(struct_array.validity())?;

        let field_likes =
            like.map_or_else(|| vec![None; struct_array.nfields()], |tree| tree.children);

        let mut arrays = Vec::new();
        let mut trees = Vec::new();
        for (field, field_like) in struct_array.children().zip_eq(field_likes) {
            // Stop at the first field that fails to compress.
            let (compressed_field, field_tree) =
                ctx.compress(&field, field_like.as_ref())?.into_parts();
            arrays.push(compressed_field);
            trees.push(field_tree);
        }

        let compressed = StructArray::try_new(
            struct_array.names().clone(),
            arrays,
            struct_array.len(),
            compressed_validity,
        )?
        .into_array();

        Ok(CompressedArray::new(
            compressed,
            Some(CompressionTree::new(self, trees)),
        ))
    }

    /// This compressor reports no encodings of its own.
    fn used_encodings(&self) -> HashSet<EncodingRef> {
        HashSet::new()
    }
}
Loading
Loading