FSSTCompressor #664

Merged · 16 commits · Sep 3, 2024
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
@@ -81,7 +81,7 @@ fastlanes = "0.1.5"
flatbuffers = "24.3.25"
flexbuffers = "2.0.0"
fs_extra = "1.3.0"
fsst-rs = "0.2.0"
fsst-rs = "0.2.2"
futures = { version = "0.3.30", default-features = false }
futures-executor = "0.3.30"
futures-util = "0.3.30"
@@ -138,8 +138,9 @@ vortex-dict = { version = "0.7.0", path = "./encodings/dict" }
vortex-dtype = { version = "0.7.0", path = "./vortex-dtype", default-features = false }
vortex-error = { version = "0.7.0", path = "./vortex-error" }
vortex-expr = { version = "0.7.0", path = "./vortex-expr" }
-vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" }
vortex-fastlanes = { version = "0.7.0", path = "./encodings/fastlanes" }
+vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" }
+vortex-fsst = { version = "0.7.0", path = "./encodings/fsst" }
vortex-proto = { version = "0.7.0", path = "./vortex-proto" }
vortex-roaring = { version = "0.7.0", path = "./encodings/roaring" }
vortex-runend = { version = "0.7.0", path = "./encodings/runend" }
4 changes: 4 additions & 0 deletions bench-vortex/Cargo.toml
@@ -77,15 +77,19 @@ bench = false
[[bench]]
name = "compress_benchmark"
harness = false
+test = false

[[bench]]
name = "random_access"
+test = false
harness = false

[[bench]]
name = "datafusion_benchmark"
+test = false
harness = false

[[bench]]
name = "tpch_benchmark"
+test = false
harness = false
21 changes: 12 additions & 9 deletions bench-vortex/benches/random_access.rs
@@ -44,11 +44,13 @@ fn random_access_vortex(c: &mut Criterion) {
})
});

-let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
-let r2_path =
-object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap())
-.unwrap();
group.sample_size(10).bench_function("R2", |b| {
+let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
+let r2_path = object_store::path::Path::from_url_path(
+taxi_vortex.file_name().unwrap().to_str().unwrap(),
+)
+.unwrap();
+
b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(
take_vortex_object_store(&r2_fs, &r2_path, &INDICES)
@@ -63,18 +65,19 @@ fn random_access_parquet(c: &mut Criterion) {
let mut group = c.benchmark_group("parquet");
group.sample_size(10);

-let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap());
let taxi_parquet = taxi_data_parquet();
group.bench_function("tokio local disk", |b| {
b.to_async(Runtime::new().unwrap())
.iter(|| async { black_box(take_parquet(&taxi_parquet, &INDICES).await.unwrap()) })
});

-let r2_parquet_path = object_store::path::Path::from_url_path(
-taxi_parquet.file_name().unwrap().to_str().unwrap(),
-)
-.unwrap();
group.bench_function("R2", |b| {
+let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap());
+let r2_parquet_path = object_store::path::Path::from_url_path(
+taxi_parquet.file_name().unwrap().to_str().unwrap(),
+)
+.unwrap();
+
b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(
take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, &INDICES)
2 changes: 2 additions & 0 deletions bench-vortex/src/lib.rs
@@ -31,6 +31,7 @@ use vortex_sampling_compressor::compressors::alp::ALPCompressor;
use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
use vortex_sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor;
use vortex_sampling_compressor::compressors::dict::DictCompressor;
+use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex_sampling_compressor::compressors::r#for::FoRCompressor;
use vortex_sampling_compressor::compressors::roaring_bool::RoaringBoolCompressor;
use vortex_sampling_compressor::compressors::runend::DEFAULT_RUN_END_COMPRESSOR;
@@ -72,6 +73,7 @@ lazy_static! {
&DictCompressor,
&BitPackedCompressor,
&FoRCompressor,
+&FSSTCompressor,
&DateTimePartsCompressor,
&DEFAULT_RUN_END_COMPRESSOR,
&RoaringBoolCompressor,
7 changes: 4 additions & 3 deletions encodings/fsst/src/array.rs
@@ -42,7 +42,7 @@ impl FSSTArray {

// Check: codes must be a Binary array.
if !matches!(codes.dtype(), DType::Binary(_)) {
vortex_bail!(InvalidArgument: "strings array must be DType::Binary type");
vortex_bail!(InvalidArgument: "codes array must be DType::Binary type");
}

let symbols_len = symbols.len();
@@ -95,8 +95,9 @@ impl FSSTArray {
}

impl AcceptArrayVisitor for FSSTArray {
-fn accept(&self, _visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
-todo!("implement this")
+fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
+visitor.visit_child("symbols", &self.symbols())?;
+visitor.visit_child("codes", &self.codes())
}
}

94 changes: 94 additions & 0 deletions encodings/fsst/src/compress.rs
@@ -0,0 +1,94 @@
// Compress a set of values into an Array.

use fsst::{Compressor, Symbol};
use vortex::accessor::ArrayAccessor;
use vortex::array::builder::VarBinBuilder;
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex::validity::Validity;
use vortex::{Array, ArrayDType, IntoArray};
use vortex_dtype::DType;

use crate::FSSTArray;

/// Compress an array using FSST. If a compressor is provided, it is reused as-is; otherwise a new
/// compressor is trained directly from `strings`.
///
/// # Panics
///
/// If the `strings` array is not encoded as either [`VarBinArray`] or [`VarBinViewArray`].
pub fn fsst_compress(strings: Array, compressor: Option<Compressor>) -> FSSTArray {
let len = strings.len();
let dtype = strings.dtype().clone();

// Compress VarBinArray
if let Ok(varbin) = VarBinArray::try_from(&strings) {
let compressor = compressor.unwrap_or_else(|| {
varbin
.with_iterator(|iter| fsst_train_compressor(iter))
.unwrap()
});
return varbin
.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, &compressor))
.unwrap();
}

// Compress VarBinViewArray
if let Ok(varbin_view) = VarBinViewArray::try_from(&strings) {
let compressor = compressor.unwrap_or_else(|| {
varbin_view
.with_iterator(|iter| fsst_train_compressor(iter))
.unwrap()
});
return varbin_view
.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, &compressor))
.unwrap();
}

panic!(
"cannot fsst_compress array with unsupported encoding {:?}",
strings.encoding().id()
)
}

fn fsst_train_compressor<'a, I>(iter: I) -> Compressor
where
I: Iterator<Item = Option<&'a [u8]>>,
{
// TODO(aduffy): eliminate the copying.
let mut sample = Vec::with_capacity(1_024 * 1_024);
for string in iter {
match string {
None => {}
Some(b) => sample.extend_from_slice(b),
}
}

Compressor::train(&sample)
}

pub fn fsst_compress_iter<'a, I>(
iter: I,
len: usize,
dtype: DType,
compressor: &Compressor,
) -> FSSTArray
where
I: Iterator<Item = Option<&'a [u8]>>,
{
let mut builder = VarBinBuilder::<i32>::with_capacity(len);
for string in iter {
match string {
None => builder.push_null(),
Some(s) => builder.push_value(&compressor.compress(s)),
}
}

let codes = builder.finish(DType::Binary(dtype.nullability()));
let symbols_vec: Vec<Symbol> = compressor.symbol_table().to_vec();
// SAFETY: Symbol and u64 are the same size
let symbols_u64: Vec<u64> = unsafe { std::mem::transmute(symbols_vec) };
let symbols = PrimitiveArray::from_vec(symbols_u64, Validity::NonNullable);

FSSTArray::try_new(dtype, symbols.into_array(), codes.into_array())
.expect("building FSSTArray from parts")
}
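
For orientation, a minimal usage sketch of the new `fsst_compress` entry point (illustrative, not part of the diff). `fsst_compress` and `into_array` come from the code above; the `VarBinArray::from_vec` constructor and the `Nullability` import are assumptions about the surrounding vortex-array API.

```rust
// Hedged sketch: compress a small Utf8 VarBinArray with a freshly trained symbol table.
use vortex::array::VarBinArray;
use vortex::{Array, IntoArray};
use vortex_dtype::{DType, Nullability};
use vortex_fsst::fsst_compress;

fn compress_strings_example() -> Array {
    // Assumed constructor: build a Utf8-typed VarBinArray from owned strings.
    let strings = VarBinArray::from_vec(
        vec!["hello compression", "hello fsst", "hello vortex"],
        DType::Utf8(Nullability::NonNullable),
    );

    // Passing `None` trains a new FSST symbol table directly from `strings`.
    let fsst = fsst_compress(strings.into_array(), None);
    fsst.into_array()
}
```
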
2 changes: 2 additions & 0 deletions encodings/fsst/src/lib.rs
@@ -10,6 +10,8 @@

mod array;
mod canonical;
+mod compress;
mod compute;

pub use array::*;
+pub use compress::*;
4 changes: 0 additions & 4 deletions encodings/fsst/tests/fsst_tests.rs
@@ -71,7 +71,6 @@ fn fsst_array() -> Array {
}

#[rstest]
-#[cfg_attr(miri, ignore)]
fn test_scalar_at(fsst_array: Array) {
assert_nth_scalar!(
fsst_array,
@@ -91,7 +90,6 @@ fn test_scalar_at(fsst_array: Array) {
}

#[rstest]
-#[cfg_attr(miri, ignore)]
fn test_slice(fsst_array: Array) {
let fsst_sliced = slice(&fsst_array, 1, 3).unwrap();
assert_eq!(fsst_sliced.encoding().id(), FSST::ENCODING.id());
@@ -109,7 +107,6 @@ fn test_slice(fsst_array: Array) {
}

#[rstest]
-#[cfg_attr(miri, ignore)]
fn test_take(fsst_array: Array) {
let indices = PrimitiveArray::from_vec(vec![0, 2], Validity::NonNullable).into_array();
let fsst_taken = take(&fsst_array, &indices).unwrap();
@@ -127,7 +124,6 @@ fn test_take(fsst_array: Array) {
}

#[rstest]
-#[cfg_attr(miri, ignore)]
fn test_filter(fsst_array: Array) {
let predicate =
BoolArray::from_vec(vec![false, true, false], Validity::NonNullable).into_array();
2 changes: 2 additions & 0 deletions vortex-sampling-compressor/Cargo.toml
@@ -12,6 +12,7 @@ edition = { workspace = true }
rust-version = { workspace = true }

[dependencies]
+fsst-rs = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
vortex-alp = { workspace = true }
@@ -22,6 +23,7 @@ vortex-dict = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-fastlanes = { workspace = true }
+vortex-fsst = { workspace = true }
vortex-roaring = { workspace = true }
vortex-runend = { workspace = true }
vortex-zigzag = { workspace = true }
67 changes: 67 additions & 0 deletions vortex-sampling-compressor/src/compressors/fsst.rs
@@ -0,0 +1,67 @@
use std::collections::HashSet;

use vortex::array::{VarBin, VarBinView};
use vortex::encoding::EncodingRef;
use vortex::{ArrayDType, ArrayDef, IntoArray};
use vortex_dict::DictArray;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_fsst::{fsst_compress, FSSTEncoding, FSST};

use super::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;

#[derive(Debug)]
pub struct FSSTCompressor;

impl EncodingCompressor for FSSTCompressor {
fn id(&self) -> &str {
FSST::ID.as_ref()
}

fn can_compress(&self, array: &vortex::Array) -> Option<&dyn EncodingCompressor> {
// FSST arrays must have DType::Utf8.
//
// Note that while it can accept binary data, it is unlikely to perform well.
if !matches!(array.dtype(), &DType::Utf8(_)) {
return None;
}

// FSST cannot be applied recursively.
if array.encoding().id() == FSST::ID {
return None;
}

Some(self)
}

fn compress<'a>(
&'a self,
array: &vortex::Array,
_like: Option<CompressionTree<'a>>,
_ctx: SamplingCompressor<'a>,
) -> VortexResult<super::CompressedArray<'a>> {
// TODO(aduffy): use like array to clone the existing symbol table
let result_array =
if array.encoding().id() == VarBin::ID || array.encoding().id() == VarBinView::ID {
// For a VarBinArray or VarBinViewArray, compress directly.
fsst_compress(array.clone(), None).into_array()
} else if let Ok(dict) = DictArray::try_from(array) {
// For a dict array, just compress the values
let values = fsst_compress(dict.values(), None).into_array();
let codes = dict.codes();
DictArray::try_new(codes, values)?.into_array()
} else {
vortex_bail!(
InvalidArgument: "unsupported encoding for FSSTCompressor {:?}",
array.encoding().id()
)
};

Ok(CompressedArray::new(result_array, None))
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
HashSet::from([&FSSTEncoding as EncodingRef])
}
}
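
A hedged sketch of driving `FSSTCompressor` directly through the `EncodingCompressor` trait (illustrative, not part of the diff). The method signatures mirror the file above; the re-export paths for `CompressedArray` and `EncodingCompressor` are assumptions.

```rust
use vortex::Array;
use vortex_error::VortexResult;
use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex_sampling_compressor::compressors::{CompressedArray, EncodingCompressor};
use vortex_sampling_compressor::SamplingCompressor;

fn try_fsst<'a>(
    compressor: &'a FSSTCompressor,
    array: &Array,
) -> VortexResult<Option<CompressedArray<'a>>> {
    // `can_compress` returns Some only for Utf8 arrays that are not already FSST-encoded.
    if compressor.can_compress(array).is_none() {
        return Ok(None);
    }
    // No `like` tree is passed, so a fresh symbol table is trained during compression.
    compressor
        .compress(array, None, SamplingCompressor::default())
        .map(Some)
}
```
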
1 change: 1 addition & 0 deletions vortex-sampling-compressor/src/compressors/mod.rs
@@ -15,6 +15,7 @@ pub mod date_time_parts;
pub mod delta;
pub mod dict;
pub mod r#for;
+pub mod fsst;
pub mod roaring_bool;
pub mod roaring_int;
pub mod runend;
2 changes: 2 additions & 0 deletions vortex-sampling-compressor/src/lib.rs
@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::fmt::{Debug, Display, Formatter};

+use compressors::fsst::FSSTCompressor;
use log::{debug, info, warn};
use vortex::array::{Chunked, ChunkedArray, Constant, Struct, StructArray};
use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy};
@@ -89,6 +90,7 @@ impl Default for SamplingCompressor<'_> {
// TODO(robert): Implement minimal compute for DeltaArrays - scalar_at and slice
// &DeltaCompressor,
&DictCompressor,
+&FSSTCompressor,
&FoRCompressor,
&DateTimePartsCompressor,
&RoaringBoolCompressor,
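
Finally, a hedged end-to-end sketch of what the registration above enables: the default sampling strategy now considers FSST for string columns (illustrative, not part of the diff; the exact `CompressionStrategy::compress` signature is an assumption based on the trait imported in this file).

```rust
use vortex::compress::CompressionStrategy;
use vortex::Array;
use vortex_error::VortexResult;
use vortex_sampling_compressor::SamplingCompressor;

fn compress_with_defaults(array: &Array) -> VortexResult<Array> {
    let strategy = SamplingCompressor::default();
    // Utf8 columns may now come back FSST-encoded when sampling finds it beneficial.
    CompressionStrategy::compress(&strategy, array)
}
```
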