-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add ChunkedCompressor which compresses chunk n+1 like chunk n #996
Changes from 28 commits
9ce1789
336888f
f73df06
33ff0aa
fe2db95
3cb83f1
a0af4cd
75c03dc
7734c17
8bcc45f
4b8266c
8c4312b
c211061
e9b6da1
629ea6c
6a40f1c
1a5489b
8abdca9
2065033
799dd6a
bcb82d5
25a387c
fb9b804
4326be9
dc51115
5a1be70
fe3ce9d
8bde547
9b8859e
7274215
deb4287
da8d39c
c11620e
2be867e
240693b
d325968
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
use std::any::Any; | ||
use std::collections::HashSet; | ||
use std::sync::Arc; | ||
|
||
use log::warn; | ||
use vortex::array::{Chunked, ChunkedArray}; | ||
use vortex::encoding::EncodingRef; | ||
use vortex::{Array, ArrayDType, ArrayDef, IntoArray}; | ||
use vortex_error::{vortex_bail, VortexResult}; | ||
|
||
use super::EncoderMetadata; | ||
use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; | ||
use crate::SamplingCompressor; | ||
|
||
#[derive(Debug)] | ||
pub struct ChunkedCompressor; | ||
|
||
pub struct ChunkedCompressorMetadata(Option<f32>); | ||
|
||
impl EncoderMetadata for ChunkedCompressorMetadata { | ||
fn as_any(&self) -> &dyn Any { | ||
self | ||
} | ||
} | ||
|
||
impl EncodingCompressor for ChunkedCompressor { | ||
fn id(&self) -> &str { | ||
Chunked::ID.as_ref() | ||
} | ||
|
||
fn cost(&self) -> u8 { | ||
0 | ||
} | ||
|
||
fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> { | ||
ChunkedArray::try_from(array) | ||
.ok() | ||
.map(|_| self as &dyn EncodingCompressor) | ||
} | ||
|
||
fn compress<'a>( | ||
&'a self, | ||
array: &Array, | ||
like: Option<CompressionTree<'a>>, | ||
ctx: SamplingCompressor<'a>, | ||
) -> VortexResult<CompressedArray<'a>> { | ||
let chunked_array = ChunkedArray::try_from(array)?; | ||
let like_and_ratio = like_into_parts(like)?; | ||
self.compress_chunked(&chunked_array, like_and_ratio, ctx) | ||
} | ||
|
||
fn used_encodings(&self) -> HashSet<EncodingRef> { | ||
HashSet::from([]) | ||
} | ||
} | ||
|
||
fn like_into_parts( | ||
tree: Option<CompressionTree<'_>>, | ||
) -> VortexResult<Option<(CompressionTree<'_>, f32)>> { | ||
lwwmanning marked this conversation as resolved.
Show resolved
Hide resolved
|
||
match tree { | ||
None => Ok(None), | ||
Some(tree) => { | ||
let (_, mut children, metadata) = tree.into_parts(); | ||
if let Some(target_ratio) = metadata { | ||
if let Some(ChunkedCompressorMetadata(target_ratio)) = | ||
target_ratio.as_ref().as_any().downcast_ref() | ||
{ | ||
if children.len() == 1 { | ||
match (children.remove(0), target_ratio) { | ||
(Some(child), Some(ratio)) => Ok(Some((child, *ratio))), | ||
(None, None) => Ok(None), | ||
(..) => { | ||
vortex_bail!("chunked array compression tree must have a child iff it has a ratio") | ||
} | ||
} | ||
} else { | ||
vortex_bail!("chunked array compression tree must have one child") | ||
} | ||
} else { | ||
vortex_bail!("chunked array compression tree must ChunkedCompressorMetadata") | ||
} | ||
} else { | ||
vortex_bail!("chunked array compression tree must have metadata") | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl ChunkedCompressor { | ||
fn compress_chunked<'a>( | ||
&'a self, | ||
array: &ChunkedArray, | ||
mut previous: Option<(CompressionTree<'a>, f32)>, | ||
ctx: SamplingCompressor<'a>, | ||
) -> VortexResult<CompressedArray<'a>> { | ||
let less_chunked = array.rechunk( | ||
ctx.options().target_block_bytesize, | ||
ctx.options().target_block_size, | ||
)?; | ||
let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks()); | ||
for (index, chunk) in less_chunked.chunks().enumerate() { | ||
let like = previous.as_ref().map(|(like, _)| like); | ||
let (compressed_chunk, tree) = ctx | ||
.named(&format!("chunk-{}", index)) | ||
.compress(&chunk, like)? | ||
.into_parts(); | ||
|
||
let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32); | ||
let exceeded_target_ratio = previous | ||
.as_ref() | ||
.map(|(_, target_ratio)| ratio > target_ratio * 1.2) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this magic constant 1.2 should be a field of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
.unwrap_or(false); | ||
|
||
if ratio > 1.0 || exceeded_target_ratio { | ||
warn!("unsatisfactory ratio {} {:?}", ratio, previous); | ||
let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts(); | ||
let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32); | ||
previous = tree.map(|tree| (tree, new_ratio)); | ||
compressed_chunks.push(compressed_chunk); | ||
} else { | ||
previous = previous.or_else(|| tree.map(|tree| (tree, ratio))); | ||
compressed_chunks.push(compressed_chunk); | ||
} | ||
} | ||
|
||
let (child, ratio) = match previous { | ||
Some((child, ratio)) => (Some(child), Some(ratio)), | ||
None => (None, None), | ||
}; | ||
|
||
Ok(CompressedArray::new( | ||
ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(), | ||
Some(CompressionTree::new_with_metadata( | ||
self, | ||
vec![child], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if child is None, should this just be empty...? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, this is the diff. I don't find either of these particularly palatable but I think the fix is to sort out how compressors pass information from one invocation of compress to the next. (docs) # g diff
diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index c0f5aebe..9adf4ab4 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -78,16 +78,16 @@ fn like_into_parts(
vortex_bail!("chunked array compression tree must be ChunkedCompressorMetadata")
};
- if children.len() != 1 {
- vortex_bail!("chunked array compression tree must have one child")
+ if (children.len() == 1) != target_ratio.is_some() {
+ vortex_bail!("chunked array compression tree must have a child iff it has a ratio")
}
- let child = children.remove(0);
-
- match (child, target_ratio) {
- (None, None) => Ok(None),
- (Some(child), Some(ratio)) => Ok(Some((child, *ratio))),
- (..) => vortex_bail!("chunked array compression tree must have a child iff it has a ratio"),
+ if children.len() == 0 {
+ return Ok(None);
+ } else if children.len() == 1 {
+ return Ok(Some((children.remove(0).unwrap(), target_ratio.unwrap())));
+ } else {
+ vortex_bail!("chunked array compression tree must have at most one child")
}
}
@@ -141,16 +141,16 @@ impl ChunkedCompressor {
}
}
- let (child, ratio) = match previous {
- Some((child, ratio)) => (Some(child), Some(ratio)),
- None => (None, None),
+ let (children, ratio) = match previous {
+ Some((child, ratio)) => (vec![Some(child)], Some(ratio)),
+ None => (vec![], None),
};
Ok(CompressedArray::new(
ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(),
Some(CompressionTree::new_with_metadata(
self,
- vec![child],
+ children,
Arc::new(ChunkedCompressorMetadata(ratio)),
)),
)) |
||
Arc::new(ChunkedCompressorMetadata(ratio)), | ||
)), | ||
)) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,7 +63,7 @@ impl EncodingCompressor for FSSTCompressor { | |
// between 2-3x depending on the text quality. | ||
// | ||
// It's not worth running a full compression step unless the array is large enough. | ||
if array.nbytes() < 10 * FSST_SYMTAB_MAX_SIZE { | ||
if array.nbytes() < 5 * FSST_SYMTAB_MAX_SIZE { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure why my changes encountered this, but I was getting samples that were hundreds of bytes too small for FSST which triggered this PR to compress poorly. We may want to think more broadly about how to estimate FSST compression ratio on a tiny sample, but for now this seems reasonable unless we're directly calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @a10y any guidance on how you arrived at 10x multiplier or just a guess? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea I had chatted 1:1 with Dan about this, it was fairly arbitrary |
||
return Ok(CompressedArray::uncompressed(array.clone())); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
use std::collections::HashSet; | ||
|
||
use itertools::Itertools; | ||
use vortex::array::{Struct, StructArray}; | ||
use vortex::encoding::EncodingRef; | ||
use vortex::variants::StructArrayTrait; | ||
use vortex::{Array, ArrayDef, IntoArray}; | ||
use vortex_error::VortexResult; | ||
|
||
use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; | ||
use crate::SamplingCompressor; | ||
|
||
#[derive(Debug)] | ||
pub struct StructCompressor; | ||
|
||
impl EncodingCompressor for StructCompressor { | ||
fn id(&self) -> &str { | ||
Struct::ID.as_ref() | ||
} | ||
|
||
fn cost(&self) -> u8 { | ||
0 | ||
} | ||
|
||
fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> { | ||
StructArray::try_from(array) | ||
.ok() | ||
.map(|_| self as &dyn EncodingCompressor) | ||
} | ||
|
||
fn compress<'a>( | ||
&'a self, | ||
array: &Array, | ||
like: Option<CompressionTree<'a>>, | ||
ctx: SamplingCompressor<'a>, | ||
) -> VortexResult<CompressedArray<'a>> { | ||
let array = StructArray::try_from(array)?; | ||
let compressed_validity = ctx.compress_validity(array.validity())?; | ||
|
||
let children_trees = match like { | ||
Some(tree) => tree.children, | ||
None => vec![None; array.nfields()], | ||
}; | ||
|
||
let (arrays, trees) = array | ||
.children() | ||
.zip(children_trees) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I prefer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
.map(|(array, like)| ctx.compress(&array, like.as_ref())) | ||
.process_results(|iter| iter.map(|x| (x.array, x.path)).unzip())?; | ||
|
||
Ok(CompressedArray::new( | ||
StructArray::try_new( | ||
array.names().clone(), | ||
arrays, | ||
array.len(), | ||
compressed_validity, | ||
)? | ||
.into_array(), | ||
Some(CompressionTree::new(self, trees)), | ||
)) | ||
} | ||
|
||
fn used_encodings(&self) -> HashSet<EncodingRef> { | ||
HashSet::from([]) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
array.is_encoding(&Chunked::ID).then_some(self)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.