Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FSSTCompressor #664

Merged
merged 16 commits into from
Sep 3, 2024
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fastlanes = "0.1.5"
flatbuffers = "24.3.25"
flexbuffers = "2.0.0"
fs_extra = "1.3.0"
fsst-rs = "0.2.0"
fsst-rs = "0.2.1"
futures = { version = "0.3.30", default-features = false }
futures-executor = "0.3.30"
futures-util = "0.3.30"
Expand Down Expand Up @@ -138,8 +138,9 @@ vortex-dict = { version = "0.7.0", path = "./encodings/dict" }
vortex-dtype = { version = "0.7.0", path = "./vortex-dtype", default-features = false }
vortex-error = { version = "0.7.0", path = "./vortex-error" }
vortex-expr = { version = "0.7.0", path = "./vortex-expr" }
vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" }
vortex-fastlanes = { version = "0.7.0", path = "./encodings/fastlanes" }
vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" }
vortex-fsst = { version = "0.7.0", path = "./encodings/fsst" }
vortex-proto = { version = "0.7.0", path = "./vortex-proto" }
vortex-roaring = { version = "0.7.0", path = "./encodings/roaring" }
vortex-runend = { version = "0.7.0", path = "./encodings/runend" }
Expand Down
94 changes: 94 additions & 0 deletions encodings/fsst/src/compress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Compress a set of values into an Array.

use fsst::{Compressor, Symbol};
use vortex::accessor::ArrayAccessor;
use vortex::array::builder::VarBinBuilder;
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex::validity::Validity;
use vortex::{Array, ArrayDType, IntoArray};
use vortex_dtype::DType;

use crate::FSSTArray;

/// Encode a string array with FSST.
///
/// When `compressor` is `Some`, that compressor is used as-is. When it is `None`, a fresh
/// compressor is trained on the contents of `strings` before the values are compressed.
///
/// # Panics
///
/// If the `strings` array is not encoded as either [`VarBinArray`] or [`VarBinViewArray`], or if
/// `with_iterator` on the underlying array returns an error.
pub fn fsst_compress(strings: Array, compressor: Option<Compressor>) -> FSSTArray {
    let dtype = strings.dtype().clone();
    let len = strings.len();

    // VarBinArray input.
    if let Ok(varbin) = VarBinArray::try_from(&strings) {
        let compressor = match compressor {
            Some(existing) => existing,
            None => varbin
                .with_iterator(|values| fsst_train_compressor(values))
                .unwrap(),
        };
        let compressed =
            varbin.with_iterator(|values| fsst_compress_iter(values, len, dtype, &compressor));
        return compressed.unwrap();
    }

    // VarBinViewArray input.
    if let Ok(varbin_view) = VarBinViewArray::try_from(&strings) {
        let compressor = match compressor {
            Some(existing) => existing,
            None => varbin_view
                .with_iterator(|values| fsst_train_compressor(values))
                .unwrap(),
        };
        let compressed =
            varbin_view.with_iterator(|values| fsst_compress_iter(values, len, dtype, &compressor));
        return compressed.unwrap();
    }

    // Neither supported encoding matched.
    panic!(
        "cannot fsst_compress array with unsupported encoding {:?}",
        strings.encoding().id()
    )
}

/// Train a new [`Compressor`] from the non-null values yielded by `iter`.
///
/// Every `Some` value is appended to a single contiguous byte buffer, which is then handed to
/// [`Compressor::train`]. `None` entries are skipped.
fn fsst_train_compressor<'a, I>(iter: I) -> Compressor
where
    I: Iterator<Item = Option<&'a [u8]>>,
{
    // TODO(aduffy): eliminate the copying.
    let mut corpus: Vec<u8> = Vec::with_capacity(1_024 * 1_024);
    iter.flatten()
        .for_each(|bytes| corpus.extend_from_slice(bytes));

    Compressor::train(&corpus)
}

/// Compress every value yielded by `iter` with `compressor` and assemble an [`FSSTArray`].
///
/// `None` entries are pushed as nulls into the codes array; `Some` entries are replaced by the
/// bytes returned from `compressor.compress`. The compressor's symbol table is copied and stored
/// as a non-nullable `u64` primitive array alongside the codes.
///
/// * `iter` - the values to compress, in order.
/// * `len` - capacity passed to the codes builder.
/// * `dtype` - dtype used both for the codes array and for the resulting [`FSSTArray`].
/// * `compressor` - the trained compressor to apply to each value.
///
/// # Panics
///
/// If [`FSSTArray::try_new`] rejects the assembled parts.
pub fn fsst_compress_iter<'a, I>(
    iter: I,
    len: usize,
    dtype: DType,
    compressor: &Compressor,
) -> FSSTArray
where
    I: Iterator<Item = Option<&'a [u8]>>,
{
    // Reinterpreting the symbol buffer as `u64`s below is only sound when both element types
    // have identical size and alignment, so check that at compile time.
    const _: () = assert!(
        std::mem::size_of::<Symbol>() == std::mem::size_of::<u64>()
            && std::mem::align_of::<Symbol>() == std::mem::align_of::<u64>(),
        "Symbol must have the same size and alignment as u64"
    );

    let mut builder = VarBinBuilder::<i32>::with_capacity(len);
    for string in iter {
        match string {
            None => builder.push_null(),
            Some(s) => builder.push_value(&compressor.compress(s)),
        }
    }
    let codes = builder.finish(dtype.clone());

    // Transmuting `Vec<Symbol>` to `Vec<u64>` would depend on the (unspecified) layout of `Vec`
    // itself being identical for both element types. Rebuild the vector from its raw parts
    // instead, which only depends on the element layout checked above.
    let mut symbols_vec = std::mem::ManuallyDrop::new(compressor.symbol_table().to_vec());
    // SAFETY: the pointer, length and capacity come from a live `Vec<Symbol>` that is wrapped in
    // `ManuallyDrop`, so ownership of the allocation is transferred exactly once. `Symbol` and
    // `u64` have the same size and alignment (asserted above), so the allocation layout is
    // unchanged, and any 8 initialized bytes form a valid `u64`.
    // NOTE(review): assumes every `Symbol` is fully initialized - confirm against fsst-rs.
    let symbols_u64: Vec<u64> = unsafe {
        Vec::from_raw_parts(
            symbols_vec.as_mut_ptr().cast::<u64>(),
            symbols_vec.len(),
            symbols_vec.capacity(),
        )
    };
    let symbols = PrimitiveArray::from_vec(symbols_u64, Validity::NonNullable);

    FSSTArray::try_new(dtype, symbols.into_array(), codes.into_array())
        .expect("building FSSTArray from parts")
}
2 changes: 2 additions & 0 deletions encodings/fsst/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

mod array;
mod canonical;
mod compress;
mod compute;

pub use array::*;
pub use compress::*;
4 changes: 0 additions & 4 deletions encodings/fsst/tests/fsst_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ fn fsst_array() -> Array {
}

#[rstest]
#[cfg_attr(miri, ignore)]
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
fn test_scalar_at(fsst_array: Array) {
assert_nth_scalar!(
fsst_array,
Expand All @@ -91,7 +90,6 @@ fn test_scalar_at(fsst_array: Array) {
}

#[rstest]
#[cfg_attr(miri, ignore)]
fn test_slice(fsst_array: Array) {
let fsst_sliced = slice(&fsst_array, 1, 3).unwrap();
assert_eq!(fsst_sliced.encoding().id(), FSST::ENCODING.id());
Expand All @@ -109,7 +107,6 @@ fn test_slice(fsst_array: Array) {
}

#[rstest]
#[cfg_attr(miri, ignore)]
fn test_take(fsst_array: Array) {
let indices = PrimitiveArray::from_vec(vec![0, 2], Validity::NonNullable).into_array();
let fsst_taken = take(&fsst_array, &indices).unwrap();
Expand All @@ -127,7 +124,6 @@ fn test_take(fsst_array: Array) {
}

#[rstest]
#[cfg_attr(miri, ignore)]
fn test_filter(fsst_array: Array) {
let predicate =
BoolArray::from_vec(vec![false, true, false], Validity::NonNullable).into_array();
Expand Down
2 changes: 2 additions & 0 deletions vortex-sampling-compressor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ edition = { workspace = true }
rust-version = { workspace = true }

[dependencies]
fsst-rs = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
vortex-alp = { workspace = true }
Expand All @@ -22,6 +23,7 @@ vortex-dict = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-fastlanes = { workspace = true }
vortex-fsst = { workspace = true }
vortex-roaring = { workspace = true }
vortex-runend = { workspace = true }
vortex-zigzag = { workspace = true }
Expand Down
65 changes: 65 additions & 0 deletions vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::collections::HashSet;

use vortex::array::{VarBinArray, VarBinViewArray};
use vortex::encoding::EncodingRef;
use vortex::{ArrayDType, ArrayDef, IntoArray};
use vortex_dict::DictArray;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_fsst::{fsst_compress, FSSTEncoding, FSST};

use super::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;

/// [`EncodingCompressor`] implementation that encodes UTF-8 string arrays with FSST.
#[derive(Debug)]
pub struct FSSTCompressor;

impl EncodingCompressor for FSSTCompressor {
fn id(&self) -> &str {
FSST::ID.as_ref()
}

fn can_compress(&self, array: &vortex::Array) -> Option<&dyn EncodingCompressor> {
// FSST arrays must have DType::Utf8.
//
// Note that while it can accept binary data, it is unlikely to perform well.
if !matches!(array.dtype(), &DType::Utf8(_)) {
return None;
}

// FSST cannot be applied recursively.
if array.encoding().id() == FSST::ID {
return None;
}

Some(self)
}

fn compress<'a>(
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
&'a self,
array: &vortex::Array,
_like: Option<CompressionTree<'a>>,
_ctx: SamplingCompressor<'a>,
) -> VortexResult<super::CompressedArray<'a>> {
// TODO(aduffy): use like array to clone the existing symbol table
let fsst_array =
if VarBinArray::try_from(array).is_ok() || VarBinViewArray::try_from(array).is_ok() {
a10y marked this conversation as resolved.
Show resolved Hide resolved
// For a VarBinArray or VarBinViewArray, compress directly.
fsst_compress(array.clone(), None)
} else if let Ok(dict) = DictArray::try_from(array) {
// For a dict array, just compress the values
fsst_compress(dict.values(), None)
a10y marked this conversation as resolved.
Show resolved Hide resolved
} else {
vortex_bail!(
InvalidArgument: "unsupported encoding for FSSTCompressor {:?}",
array.encoding().id()
)
};

Ok(CompressedArray::new(fsst_array.into_array(), None))
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
HashSet::from([&FSSTEncoding as EncodingRef])
}
}
1 change: 1 addition & 0 deletions vortex-sampling-compressor/src/compressors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod date_time_parts;
pub mod delta;
pub mod dict;
pub mod r#for;
pub mod fsst;
pub mod roaring_bool;
pub mod roaring_int;
pub mod runend;
Expand Down
Loading