Skip to content

Commit

Permalink
be slightly safer
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y committed Sep 3, 2024
1 parent 1d1d56a commit 79197e9
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 50 deletions.
23 changes: 15 additions & 8 deletions vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::any::Any;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;

use fsst::Compressor;
use vortex::array::{VarBin, VarBinView};
Expand All @@ -9,7 +11,7 @@ use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTEncoding, FSST};

use super::{CompressedArray, CompressionTree, EncodingCompressor};
use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor};
use crate::SamplingCompressor;

#[derive(Debug)]
Expand All @@ -18,10 +20,11 @@ pub struct FSSTCompressor;
/// Maximum size in bytes of the FSST symbol table
const FSST_SYMTAB_MAX_SIZE: usize = 8 * 255 + 255;

/// We use a 16KB sample of text from the input.
///
/// This value is derived from the FSST paper section 4.4
// const DEFAULT_SAMPLE_BYTES: usize = 1 << 14;
impl EncoderMetadata for Compressor {
fn as_any(&self) -> &dyn Any {
self
}
}

impl EncodingCompressor for FSSTCompressor {
fn id(&self) -> &str {
Expand Down Expand Up @@ -60,13 +63,17 @@ impl EncodingCompressor for FSSTCompressor {
}

let compressor = like
.and_then(|mut c| unsafe { c.metadata::<Compressor>() })
.unwrap_or_else(|| Box::new(fsst_train_compressor(array)));
.and_then(|mut tree| tree.metadata())
.unwrap_or_else(|| Arc::new(fsst_train_compressor(array)));

let Some(fsst_compressor) = compressor.as_any().downcast_ref::<Compressor>() else {
vortex_bail!("Could not downcast metadata as FSST Compressor")
};

let result_array =
if array.encoding().id() == VarBin::ID || array.encoding().id() == VarBinView::ID {
// For a VarBinArray or VarBinViewArray, compress directly.
fsst_compress(array, compressor.as_ref()).into_array()
fsst_compress(array, fsst_compressor).into_array()
} else {
vortex_bail!(
InvalidArgument: "unsupported encoding for FSSTCompressor {:?}",
Expand Down
67 changes: 25 additions & 42 deletions vortex-sampling-compressor/src/compressors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::any::Any;
use std::collections::HashSet;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use vortex::encoding::EncodingRef;
use vortex::Array;
Expand Down Expand Up @@ -55,11 +57,25 @@ impl Hash for dyn EncodingCompressor + '_ {
}
}

#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct CompressionTree<'a> {
compressor: &'a dyn EncodingCompressor,
children: Vec<Option<CompressionTree<'a>>>,
metadata: Option<*const ()>,
metadata: Option<Arc<dyn EncoderMetadata>>,
}

impl Debug for CompressionTree<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{self}")
}
}

/// Metadata that can optionally be attached to a compression tree.
///
/// This enables codecs to cache trained parameters from the sampling runs to reuse for
/// the large run.
pub trait EncoderMetadata {
fn as_any(&self) -> &dyn Any;
}

impl Display for CompressionTree<'_> {
Expand Down Expand Up @@ -88,17 +104,15 @@ impl<'a> CompressionTree<'a> {
///
/// This can be specific encoder parameters that were discovered at sample time
/// that should be reused when compressing the full array.
pub(crate) fn new_with_metadata<T>(
pub(crate) fn new_with_metadata(
compressor: &'a dyn EncodingCompressor,
children: Vec<Option<CompressionTree<'a>>>,
metadata: Box<T>,
metadata: Arc<dyn EncoderMetadata>,
) -> Self {
// SAFETY: the memory pointed to will get cleaned up in Drop impl.
let ptr = Box::into_raw(metadata) as *const ();
Self {
compressor,
children,
metadata: Some(ptr),
metadata: Some(metadata),
}
}

Expand Down Expand Up @@ -129,45 +143,14 @@ impl<'a> CompressionTree<'a> {
.map(|c| c.compress(array, Some(self.clone()), ctx.for_compressor(c)))
}

// /// Access the saved opaque metadata by reference.
// ///
// /// # Safety
// ///
// /// It is up to the caller to ensure that the type `T` is the correct type for the stored
// /// metadata.
// ///
// /// The value of `T` will almost always be `EncodingCompressor`-specific.
// pub(crate) unsafe fn metadata_ref<T>(&self) -> Option<&T> {
// unsafe { self.metadata.map(|m| &*(m as *const T)) }
// }

/// Access the saved opaque metadata.
///
/// This will consume the struct's metadata pointer, giving the caller ownership of
/// the memory by returning a `Box<T>`.
///
/// # Safety
///
/// It is up to the caller to ensure that the type `T` is the correct type for the stored
/// metadata.
/// This will consume the owned metadata, giving the caller ownership of
/// the Box.
///
/// The value of `T` will almost always be `EncodingCompressor`-specific.
pub unsafe fn metadata<T>(&mut self) -> Option<Box<T>> {
let metadata = std::mem::take(&mut self.metadata);

metadata.map(|m| {
let ptr = m as *mut T;
unsafe { Box::from_raw(ptr) }
})
}
}

impl Drop for CompressionTree<'_> {
fn drop(&mut self) {
if let Some(ptr) = self.metadata {
// Recnostruct the box from the pointer to do a manual drop.
let _ = unsafe { Box::from_raw(ptr as *mut ()) };
}
pub fn metadata(&mut self) -> Option<Arc<dyn EncoderMetadata>> {
std::mem::take(&mut self.metadata)
}
}

Expand Down

0 comments on commit 79197e9

Please sign in to comment.