Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FSSTCompressor #664

Merged
merged 16 commits into from
Sep 3, 2024
295 changes: 163 additions & 132 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fastlanes = "0.1.5"
flatbuffers = "24.3.25"
flexbuffers = "2.0.0"
fs_extra = "1.3.0"
fsst-rs = "0.2.0"
fsst-rs = "0.3.0"
futures = { version = "0.3.30", default-features = false }
futures-executor = "0.3.30"
futures-util = "0.3.30"
Expand Down Expand Up @@ -139,8 +139,9 @@ vortex-dict = { version = "0.7.0", path = "./encodings/dict" }
vortex-dtype = { version = "0.7.0", path = "./vortex-dtype", default-features = false }
vortex-error = { version = "0.7.0", path = "./vortex-error" }
vortex-expr = { version = "0.7.0", path = "./vortex-expr" }
vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" }
vortex-fastlanes = { version = "0.7.0", path = "./encodings/fastlanes" }
vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" }
vortex-fsst = { version = "0.7.0", path = "./encodings/fsst" }
vortex-proto = { version = "0.7.0", path = "./vortex-proto" }
vortex-roaring = { version = "0.7.0", path = "./encodings/roaring" }
vortex-runend = { version = "0.7.0", path = "./encodings/runend" }
Expand Down Expand Up @@ -169,5 +170,4 @@ panic_in_result_fn = { level = "deny" }
same_name_method = { level = "deny" }
tests_outside_test_module = { level = "deny" }
unwrap_in_result = { level = "deny" }
#unwrap_used = { level = "deny" }
use_debug = { level = "deny" }
4 changes: 4 additions & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,19 @@ bench = false
[[bench]]
name = "compress_benchmark"
harness = false
test = false

[[bench]]
name = "random_access"
test = false
harness = false

[[bench]]
name = "datafusion_benchmark"
test = false
harness = false

[[bench]]
name = "tpch_benchmark"
test = false
harness = false
66 changes: 64 additions & 2 deletions bench-vortex/benches/compress_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use bench_vortex::compress_taxi_data;
use bench_vortex::data_downloads::BenchmarkDataset;
use bench_vortex::public_bi_data::BenchmarkDatasets;
use bench_vortex::public_bi_data::PBIDataset::Medicare1;
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::{compress_taxi_data, tpch};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use vortex::{IntoArray, IntoCanonical};
use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex_sampling_compressor::SamplingCompressor;

fn vortex_compress_taxi(c: &mut Criterion) {
taxi_data_parquet();
Expand All @@ -24,5 +28,63 @@ fn vortex_compress_medicare1(c: &mut Criterion) {
group.finish()
}

criterion_group!(benches, vortex_compress_taxi, vortex_compress_medicare1);
fn vortex_compress_tpch(c: &mut Criterion) {
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let lineitem_vortex = rt.block_on(tpch::load_table(
data_dir,
"lineitem",
&tpch::schema::LINEITEM,
));

let compressor = SamplingCompressor::default().excluding(&FSSTCompressor);

let compressor_fsst = SamplingCompressor::default();

// l_comment column only
let mut group = c.benchmark_group("l_comment");
let comments = lineitem_vortex.with_dyn(|a| {
a.as_struct_array_unchecked()
.field_by_name("l_comment")
.unwrap()
});

group.sample_size(10);
group.bench_function("compress-default", |b| {
b.iter_with_large_drop(|| {
std::hint::black_box(compressor.compress(&comments, None)).unwrap()
});
});

group.bench_function("compress-fsst-chunked", |b| {
b.iter_with_large_drop(|| {
std::hint::black_box(compressor_fsst.compress(&comments, None)).unwrap()
});
});

// Compare canonicalizing
let comments_canonical = comments
.into_canonical()
.unwrap()
.into_varbin()
.unwrap()
.into_array();
group.bench_function("compress-fsst-canonicalized", |b| {
b.iter_with_large_drop(|| {
std::hint::black_box(compressor_fsst.compress(&comments_canonical, None)).unwrap()
});
});

group.finish();
}

criterion_group!(
benches,
vortex_compress_taxi,
vortex_compress_medicare1,
vortex_compress_tpch
);
criterion_main!(benches);
21 changes: 12 additions & 9 deletions bench-vortex/benches/random_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ fn random_access_vortex(c: &mut Criterion) {
})
});

let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
let r2_path =
object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap())
.unwrap();
group.sample_size(10).bench_function("R2", |b| {
let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
let r2_path = object_store::path::Path::from_url_path(
taxi_vortex.file_name().unwrap().to_str().unwrap(),
)
.unwrap();
a10y marked this conversation as resolved.
Show resolved Hide resolved

b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(
take_vortex_object_store(&r2_fs, &r2_path, &INDICES)
Expand All @@ -63,18 +65,19 @@ fn random_access_parquet(c: &mut Criterion) {
let mut group = c.benchmark_group("parquet");
group.sample_size(10);

let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap());
let taxi_parquet = taxi_data_parquet();
group.bench_function("tokio local disk", |b| {
b.to_async(Runtime::new().unwrap())
.iter(|| async { black_box(take_parquet(&taxi_parquet, &INDICES).await.unwrap()) })
});

let r2_parquet_path = object_store::path::Path::from_url_path(
taxi_parquet.file_name().unwrap().to_str().unwrap(),
)
.unwrap();
group.bench_function("R2", |b| {
let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap());
let r2_parquet_path = object_store::path::Path::from_url_path(
taxi_parquet.file_name().unwrap().to_str().unwrap(),
)
.unwrap();

b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(
take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, &INDICES)
Expand Down
2 changes: 2 additions & 0 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use vortex_sampling_compressor::compressors::alp::ALPCompressor;
use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
use vortex_sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor;
use vortex_sampling_compressor::compressors::dict::DictCompressor;
use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex_sampling_compressor::compressors::r#for::FoRCompressor;
use vortex_sampling_compressor::compressors::roaring_bool::RoaringBoolCompressor;
use vortex_sampling_compressor::compressors::runend::DEFAULT_RUN_END_COMPRESSOR;
Expand Down Expand Up @@ -72,6 +73,7 @@ lazy_static! {
&DictCompressor,
&BitPackedCompressor,
&FoRCompressor,
&FSSTCompressor,
&DateTimePartsCompressor,
&DEFAULT_RUN_END_COMPRESSOR,
&RoaringBoolCompressor,
Expand Down
37 changes: 37 additions & 0 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,43 @@ async fn register_vortex(
Ok(())
}

/// Load a table as an uncompressed Vortex array.
pub async fn load_table(data_dir: impl AsRef<Path>, name: &str, schema: &Schema) -> Array {
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
// Create a local session to load the CSV file from the path.
let path = data_dir
.as_ref()
.to_owned()
.join(format!("{name}.tbl"))
.to_str()
.unwrap()
.to_string();
let record_batches = SessionContext::new()
.read_csv(
&path,
CsvReadOptions::default()
.delimiter(b'|')
.has_header(false)
.file_extension("tbl")
.schema(schema),
)
.await
.unwrap()
.collect()
.await
.unwrap();

let chunks: Vec<Array> = record_batches
.iter()
.cloned()
.map(ArrowStructArray::from)
.map(|struct_array| Array::from_arrow(&struct_array, false))
.collect();

let dtype = chunks[0].dtype().clone();

ChunkedArray::try_new(chunks, dtype).unwrap().into_array()
}

pub fn tpch_queries() -> impl Iterator<Item = (usize, Vec<String>)> {
(1..=22).map(|q| (q, tpch_query(q)))
}
Expand Down
62 changes: 48 additions & 14 deletions encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use fsst::{Decompressor, Symbol, MAX_CODE};
use fsst::{Decompressor, Symbol};
use serde::{Deserialize, Serialize};
use vortex::stats::{ArrayStatisticsCompute, StatsSet};
use vortex::validity::{ArrayValidity, LogicalValidity};
Expand All @@ -13,6 +13,7 @@ use vortex_error::{vortex_bail, VortexResult};
impl_encoding!("vortex.fsst", 24u16, FSST);

static SYMBOLS_DTYPE: DType = DType::Primitive(PType::U64, Nullability::NonNullable);
static SYMBOL_LENS_DTYPE: DType = DType::Primitive(PType::U8, Nullability::NonNullable);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FSSTMetadata {
Expand All @@ -29,26 +30,39 @@ impl FSSTArray {
/// The `codes` array is a Binary array where each binary datum is a sequence of 8-bit codes.
/// Each code corresponds either to a symbol, or to the "escape code",
/// which tells the decoder to emit the following byte without doing a table lookup.
pub fn try_new(dtype: DType, symbols: Array, codes: Array) -> VortexResult<Self> {
pub fn try_new(
dtype: DType,
symbols: Array,
symbol_lengths: Array,
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
codes: Array,
) -> VortexResult<Self> {
// Check: symbols must be a u64 array
if symbols.dtype() != &DType::Primitive(PType::U64, Nullability::NonNullable) {
if symbols.dtype() != &SYMBOLS_DTYPE {
vortex_bail!(InvalidArgument: "symbols array must be of type u64")
}

if symbol_lengths.dtype() != &SYMBOL_LENS_DTYPE {
vortex_bail!(InvalidArgument: "symbol_lengths array must be of type u8")
}

// Check: symbols must not have length > MAX_CODE
if symbols.len() > MAX_CODE as usize {
vortex_bail!(InvalidArgument: "symbols array must have length <= 255")
if symbols.len() > 255 {
vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
}

if symbols.len() != symbol_lengths.len() {
vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
}

// Check: strings must be a Binary array.
if !matches!(codes.dtype(), DType::Binary(_)) {
vortex_bail!(InvalidArgument: "strings array must be DType::Binary type");
vortex_bail!(InvalidArgument: "codes array must be DType::Binary type");
}

let symbols_len = symbols.len();
let len = codes.len();
let strings_dtype = codes.dtype().clone();
let children = Arc::new([symbols, codes]);
let children = Arc::new([symbols, symbol_lengths, codes]);

Self::try_from_parts(
dtype,
Expand All @@ -69,18 +83,28 @@ impl FSSTArray {
.expect("FSSTArray must have a symbols child array")
}

/// Access the symbol table array
pub fn symbol_lengths(&self) -> Array {
self.array()
.child(1, &SYMBOL_LENS_DTYPE, self.metadata().symbols_len)
.expect("FSSTArray must have a symbols child array")
}

/// Access the codes array
pub fn codes(&self) -> Array {
self.array()
.child(1, &self.metadata().codes_dtype, self.len())
.child(2, &self.metadata().codes_dtype, self.len())
.expect("FSSTArray must have a codes child array")
}

/// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
/// this array.
/// this array, and pass it to the given function.
///
/// This is private to the crate to avoid leaking `fsst` as part of the public API.
pub(crate) fn decompressor(&self) -> Decompressor {
/// This is private to the crate to avoid leaking `fsst-rs` types as part of the public API.
pub(crate) fn with_decompressor<F, R>(&self, apply: F) -> R
where
F: FnOnce(Decompressor) -> R,
{
// canonicalize the symbols child array, so we can view it contiguously
let symbols_array = self
.symbols()
Expand All @@ -90,18 +114,28 @@ impl FSSTArray {
.expect("Symbols must be a Primitive Array");
let symbols = symbols_array.maybe_null_slice::<u64>();

let symbol_lengths_array = self
.symbol_lengths()
.into_canonical()
.unwrap()
.into_primitive()
.unwrap();
let symbol_lengths = symbol_lengths_array.maybe_null_slice::<u8>();

// Transmute the 64-bit symbol values into fsst `Symbol`s.
// SAFETY: Symbol is guaranteed to be 8 bytes, guaranteed by the compiler.
let symbols = unsafe { std::mem::transmute::<&[u64], &[Symbol]>(symbols) };

// Build a new decompressor that uses these symbols.
Decompressor::new(symbols)
let decompressor = Decompressor::new(symbols, symbol_lengths);
apply(decompressor)
}
}

impl AcceptArrayVisitor for FSSTArray {
fn accept(&self, _visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
todo!("implement this")
fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("symbols", &self.symbols())?;
visitor.visit_child("codes", &self.codes())
}
}

Expand Down
Loading
Loading