Skip to content

Commit

Permalink
feat: teach FSSTArray to compress the offsets of its codes (#952)
Browse files Browse the repository at this point in the history
The codes of an FSSTArray are a vector of binary strings, each made up of
one-byte codes or an escape code followed by a data byte. The offsets of
these strings, unexpectedly, grow quite large, increasing the file size (for
example, with this PR the TPC-H l_comment column is 78% of its byte size on
`develop`). Delta encoding notably decreases the size but also inflates
the compression time, seemingly in proportion to the space savings (TPC-H
l_comment compresses in 111% of the time it takes on `develop`).

---------

Co-authored-by: Robert Kruszewski <[email protected]>
  • Loading branch information
danking and robert3005 authored Oct 1, 2024
1 parent 02b14c9 commit c95f037
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 34 deletions.
26 changes: 6 additions & 20 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,9 @@ use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::compress::CompressionStrategy;
use vortex::encoding::EncodingRef;
use vortex::{Array, Context, IntoArray};
use vortex_alp::ALPEncoding;
use vortex_datetime_parts::DateTimePartsEncoding;
use vortex_dict::DictEncoding;
use vortex_dtype::DType;
use vortex_fastlanes::{BitPackedEncoding, FoREncoding};
use vortex_roaring::RoaringBoolEncoding;
use vortex_runend::RunEndEncoding;
use vortex_fastlanes::DeltaEncoding;
use vortex_sampling_compressor::compressors::alp::ALPCompressor;
use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
use vortex_sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor;
Expand All @@ -50,19 +44,11 @@ pub mod tpch;
pub mod vortex_utils;

lazy_static! {
pub static ref CTX: Arc<Context> = Arc::new(Context::default().with_encodings([
&ALPEncoding as EncodingRef,
&DictEncoding,
&BitPackedEncoding,
&FoREncoding,
&DateTimePartsEncoding,
// &DeltaEncoding, Blows up the search space too much.
&RunEndEncoding,
&RoaringBoolEncoding,
// &RoaringIntEncoding,
// Doesn't offer anything more than FoR really
// &ZigZagEncoding,
]));
pub static ref CTX: Arc<Context> = Arc::new(
Context::default()
.with_encodings(SamplingCompressor::default().used_encodings())
.with_encoding(&DeltaEncoding)
);
}

lazy_static! {
Expand Down
13 changes: 3 additions & 10 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use tokio::fs::OpenOptions;
use vortex::array::{ChunkedArray, StructArray};
use vortex::arrow::FromArrowArray;
use vortex::compress::CompressionStrategy;
use vortex::variants::StructArrayTrait;
use vortex::{Array, ArrayDType, Context, IntoArray, IntoArrayVariant};
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex_datafusion::memory::VortexMemTableOptions;
use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
use vortex_datafusion::SessionContextExt;
use vortex_dtype::DType;
use vortex_sampling_compressor::SamplingCompressor;
use vortex_serde::layouts::LayoutWriter;

use crate::idempotent_async;
use crate::{idempotent_async, CTX};

pub mod dbgen;
mod execute;
Expand Down Expand Up @@ -312,12 +311,6 @@ async fn register_vortex_file(
})
.await?;

let ctx = if enable_compression {
Arc::new(Context::default().with_encodings(SamplingCompressor::default().used_encodings()))
} else {
Arc::new(Context::default())
};

let f = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -336,7 +329,7 @@ async fn register_vortex_file(
vtx_file.to_str().unwrap().to_string(),
file_size,
)],
ctx,
CTX.clone(),
),
)?;

Expand Down
4 changes: 3 additions & 1 deletion encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ impl FSSTArray {
impl AcceptArrayVisitor for FSSTArray {
fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("symbols", &self.symbols())?;
visitor.visit_child("codes", &self.codes())
visitor.visit_child("symbol_lengths", &self.symbol_lengths())?;
visitor.visit_child("codes", &self.codes())?;
visitor.visit_child("uncompressed_lengths", &self.uncompressed_lengths())
}
}

Expand Down
27 changes: 24 additions & 3 deletions vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use std::fmt::Debug;
use std::sync::Arc;

use fsst::Compressor;
use vortex::array::{VarBin, VarBinView};
use vortex::array::{VarBin, VarBinArray, VarBinView};
use vortex::encoding::EncodingRef;
use vortex::{ArrayDType, ArrayDef, IntoArray};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTArray, FSSTEncoding, FSST};

use super::delta::DeltaCompressor;
use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor};
use crate::SamplingCompressor;

Expand Down Expand Up @@ -91,18 +92,38 @@ impl EncodingCompressor for FSSTCompressor {
like.as_ref().and_then(|l| l.child(0)),
)?;

let codes_varbin = VarBinArray::try_from(fsst_array.codes())?;
let codes_varbin_dtype = codes_varbin.dtype().clone();

let codes_offsets_compressed = ctx
.named("fsst_codes_offsets")
.excluding(self)
.including(&DeltaCompressor)
.compress(
&codes_varbin.offsets(),
like.as_ref().and_then(|l| l.child(1)),
)?;

let codes = VarBinArray::try_new(
codes_offsets_compressed.array,
codes_varbin.bytes(),
codes_varbin_dtype,
codes_varbin.validity(),
)?
.into_array();

Ok(CompressedArray::new(
FSSTArray::try_new(
fsst_array.dtype().clone(),
fsst_array.symbols(),
fsst_array.symbol_lengths(),
fsst_array.codes(),
codes,
uncompressed_lengths.array,
)?
.into_array(),
Some(CompressionTree::new_with_metadata(
self,
vec![uncompressed_lengths.path],
vec![uncompressed_lengths.path, codes_offsets_compressed.path],
compressor,
)),
))
Expand Down
6 changes: 6 additions & 0 deletions vortex-sampling-compressor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ impl<'a> SamplingCompressor<'a> {
cloned
}

/// Returns a clone of this compressor with `compressor` inserted into the
/// clone's `compressors` collection; `self` itself is not modified.
pub fn including(&self, compressor: CompressorRef<'a>) -> Self {
let mut cloned = self.clone();
cloned.compressors.insert(compressor);
cloned
}

#[allow(clippy::same_name_method)]
pub fn compress(
&self,
Expand Down

0 comments on commit c95f037

Please sign in to comment.