Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FSSTCompressor #664

Merged
merged 16 commits into from
Sep 3, 2024
23 changes: 15 additions & 8 deletions vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::any::Any;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;

use fsst::Compressor;
use vortex::array::{VarBin, VarBinView};
Expand All @@ -9,7 +11,7 @@ use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTEncoding, FSST};

use super::{CompressedArray, CompressionTree, EncodingCompressor};
use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor};
use crate::SamplingCompressor;

#[derive(Debug)]
Expand All @@ -18,10 +20,11 @@ pub struct FSSTCompressor;
/// Maximum size in bytes of the FSST symbol table
const FSST_SYMTAB_MAX_SIZE: usize = 8 * 255 + 255;

/// We use a 16KB sample of text from the input.
///
/// This value is derived from the FSST paper section 4.4
// const DEFAULT_SAMPLE_BYTES: usize = 1 << 14;
// Lets a trained FSST `Compressor` be stored as `dyn EncoderMetadata`. Code holding the
// trait object gets the concrete type back via `as_any()` + `downcast_ref::<Compressor>()`.
impl EncoderMetadata for Compressor {
fn as_any(&self) -> &dyn Any {
self
}
}

impl EncodingCompressor for FSSTCompressor {
fn id(&self) -> &str {
Expand Down Expand Up @@ -60,13 +63,17 @@ impl EncodingCompressor for FSSTCompressor {
}

let compressor = like
.and_then(|mut c| unsafe { c.metadata::<Compressor>() })
.unwrap_or_else(|| Box::new(fsst_train_compressor(array)));
.and_then(|mut tree| tree.metadata())
.unwrap_or_else(|| Arc::new(fsst_train_compressor(array)));

let Some(fsst_compressor) = compressor.as_any().downcast_ref::<Compressor>() else {
vortex_bail!("Could not downcast metadata as FSST Compressor")
};

let result_array =
if array.encoding().id() == VarBin::ID || array.encoding().id() == VarBinView::ID {
// For a VarBinArray or VarBinViewArray, compress directly.
fsst_compress(array, compressor.as_ref()).into_array()
fsst_compress(array, fsst_compressor).into_array()
} else {
vortex_bail!(
InvalidArgument: "unsupported encoding for FSSTCompressor {:?}",
Expand Down
67 changes: 25 additions & 42 deletions vortex-sampling-compressor/src/compressors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::any::Any;
use std::collections::HashSet;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use vortex::encoding::EncodingRef;
use vortex::Array;
Expand Down Expand Up @@ -55,11 +57,25 @@ impl Hash for dyn EncodingCompressor + '_ {
}
}

#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct CompressionTree<'a> {
compressor: &'a dyn EncodingCompressor,
children: Vec<Option<CompressionTree<'a>>>,
metadata: Option<*const ()>,
metadata: Option<Arc<dyn EncoderMetadata>>,
}

// Hand-written `Debug`: the output is produced by formatting the tree through its
// `Display` implementation.
impl Debug for CompressionTree<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// `{self}` formats this value using its `Display` impl.
write!(f, "{self}")
}
}

/// Metadata that can optionally be attached to a compression tree.
///
/// This enables codecs to cache trained parameters from the sampling runs to reuse for
/// the large run.
pub trait EncoderMetadata {
/// Exposes the metadata as `&dyn Any` so that a holder of a `dyn EncoderMetadata`
/// can attempt to downcast it to the concrete metadata type.
fn as_any(&self) -> &dyn Any;
}

impl Display for CompressionTree<'_> {
Expand Down Expand Up @@ -88,17 +104,15 @@ impl<'a> CompressionTree<'a> {
///
/// This can be specific encoder parameters that were discovered at sample time
/// that should be reused when compressing the full array.
pub(crate) fn new_with_metadata<T>(
pub(crate) fn new_with_metadata(
compressor: &'a dyn EncodingCompressor,
children: Vec<Option<CompressionTree<'a>>>,
metadata: Box<T>,
metadata: Arc<dyn EncoderMetadata>,
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
) -> Self {
// SAFETY: the memory pointed to will get cleaned up in Drop impl.
let ptr = Box::into_raw(metadata) as *const ();
Self {
compressor,
children,
metadata: Some(ptr),
metadata: Some(metadata),
}
}

Expand Down Expand Up @@ -129,45 +143,14 @@ impl<'a> CompressionTree<'a> {
.map(|c| c.compress(array, Some(self.clone()), ctx.for_compressor(c)))
}

// /// Access the saved opaque metadata by reference.
// ///
// /// # Safety
// ///
// /// It is up to the caller to ensure that the type `T` is the correct type for the stored
// /// metadata.
// ///
// /// The value of `T` will almost always be `EncodingCompressor`-specific.
// pub(crate) unsafe fn metadata_ref<T>(&self) -> Option<&T> {
// unsafe { self.metadata.map(|m| &*(m as *const T)) }
// }

/// Access the saved opaque metadata.
///
/// This will consume the struct's metadata pointer, giving the caller ownership of
/// the memory by returning a `Box<T>`.
///
/// # Safety
///
/// It is up to the caller to ensure that the type `T` is the correct type for the stored
/// metadata.
/// This will consume the owned metadata, giving the caller ownership of
/// the `Arc`.
///
/// The value of `T` will almost always be `EncodingCompressor`-specific.
pub unsafe fn metadata<T>(&mut self) -> Option<Box<T>> {
let metadata = std::mem::take(&mut self.metadata);

metadata.map(|m| {
let ptr = m as *mut T;
unsafe { Box::from_raw(ptr) }
})
}
}

impl Drop for CompressionTree<'_> {
fn drop(&mut self) {
if let Some(ptr) = self.metadata {
// Reconstruct the box from the pointer to do a manual drop.
let _ = unsafe { Box::from_raw(ptr as *mut ()) };
}
pub fn metadata(&mut self) -> Option<Arc<dyn EncoderMetadata>> {
std::mem::take(&mut self.metadata)
}
}

Expand Down
Loading