diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 541e2d8609..04f8b364db 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -16,15 +16,9 @@ use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; use vortex::array::ChunkedArray; use vortex::arrow::FromArrowType; use vortex::compress::CompressionStrategy; -use vortex::encoding::EncodingRef; use vortex::{Array, Context, IntoArray}; -use vortex_alp::ALPEncoding; -use vortex_datetime_parts::DateTimePartsEncoding; -use vortex_dict::DictEncoding; use vortex_dtype::DType; -use vortex_fastlanes::{BitPackedEncoding, FoREncoding}; -use vortex_roaring::RoaringBoolEncoding; -use vortex_runend::RunEndEncoding; +use vortex_fastlanes::DeltaEncoding; use vortex_sampling_compressor::compressors::alp::ALPCompressor; use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor; use vortex_sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor; @@ -50,19 +44,11 @@ pub mod tpch; pub mod vortex_utils; lazy_static! { - pub static ref CTX: Arc = Arc::new(Context::default().with_encodings([ - &ALPEncoding as EncodingRef, - &DictEncoding, - &BitPackedEncoding, - &FoREncoding, - &DateTimePartsEncoding, - // &DeltaEncoding, Blows up the search space too much. - &RunEndEncoding, - &RoaringBoolEncoding, - // &RoaringIntEncoding, - // Doesn't offer anything more than FoR really - // &ZigZagEncoding, - ])); + pub static ref CTX: Arc = Arc::new( + Context::default() + .with_encodings(SamplingCompressor::default().used_encodings()) + .with_encoding(&DeltaEncoding) + ); } lazy_static! { diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index bcceeace78..c4dae19193 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -14,9 +14,8 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext}; use tokio::fs::OpenOptions; use vortex::array::{ChunkedArray, StructArray}; use vortex::arrow::FromArrowArray; -use vortex::compress::CompressionStrategy; use vortex::variants::StructArrayTrait; -use vortex::{Array, ArrayDType, Context, IntoArray, IntoArrayVariant}; +use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; use vortex_datafusion::memory::VortexMemTableOptions; use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions}; use vortex_datafusion::SessionContextExt; @@ -24,7 +23,7 @@ use vortex_dtype::DType; use vortex_sampling_compressor::SamplingCompressor; use vortex_serde::layouts::LayoutWriter; -use crate::idempotent_async; +use crate::{idempotent_async, CTX}; pub mod dbgen; mod execute; @@ -312,12 +311,6 @@ async fn register_vortex_file( }) .await?; - let ctx = if enable_compression { - Arc::new(Context::default().with_encodings(SamplingCompressor::default().used_encodings())) - } else { - Arc::new(Context::default()) - }; - let f = OpenOptions::new() .read(true) .write(true) @@ -336,7 +329,7 @@ async fn register_vortex_file( vtx_file.to_str().unwrap().to_string(), file_size, )], - ctx, + CTX.clone(), ), )?; diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index 23f004a787..6130200ac1 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -163,7 +163,9 @@ impl FSSTArray { impl AcceptArrayVisitor for FSSTArray { fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> { visitor.visit_child("symbols", &self.symbols())?; - visitor.visit_child("codes", &self.codes()) + visitor.visit_child("symbol_lengths", &self.symbol_lengths())?; + visitor.visit_child("codes", &self.codes())?; + visitor.visit_child("uncompressed_lengths", &self.uncompressed_lengths()) } } diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs index 09ca22f9e6..1cd1365e28 100644 --- a/vortex-sampling-compressor/src/compressors/fsst.rs +++ b/vortex-sampling-compressor/src/compressors/fsst.rs @@ -4,13 +4,14 @@ use std::fmt::Debug; use std::sync::Arc; use fsst::Compressor; -use vortex::array::{VarBin, VarBinView}; +use vortex::array::{VarBin, VarBinArray, VarBinView}; use vortex::encoding::EncodingRef; use vortex::{ArrayDType, ArrayDef, IntoArray}; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTArray, FSSTEncoding, FSST}; +use super::delta::DeltaCompressor; use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor}; use crate::SamplingCompressor; @@ -91,18 +92,38 @@ impl EncodingCompressor for FSSTCompressor { like.as_ref().and_then(|l| l.child(0)), )?; + let codes_varbin = VarBinArray::try_from(fsst_array.codes())?; + let codes_varbin_dtype = codes_varbin.dtype().clone(); + + let codes_offsets_compressed = ctx + .named("fsst_codes_offsets") + .excluding(self) + .including(&DeltaCompressor) + .compress( + &codes_varbin.offsets(), + like.as_ref().and_then(|l| l.child(1)), + )?; + + let codes = VarBinArray::try_new( + codes_offsets_compressed.array, + codes_varbin.bytes(), + codes_varbin_dtype, + codes_varbin.validity(), + )? + .into_array(); + Ok(CompressedArray::new( FSSTArray::try_new( fsst_array.dtype().clone(), fsst_array.symbols(), fsst_array.symbol_lengths(), - fsst_array.codes(), + codes, uncompressed_lengths.array, )? .into_array(), Some(CompressionTree::new_with_metadata( self, - vec![uncompressed_lengths.path], + vec![uncompressed_lengths.path, codes_offsets_compressed.path], compressor, )), )) diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index 50edc93115..f6a7790eb7 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -165,6 +165,12 @@ impl<'a> SamplingCompressor<'a> { cloned } + pub fn including(&self, compressor: CompressorRef<'a>) -> Self { + let mut cloned = self.clone(); + cloned.compressors.insert(compressor); + cloned + } + #[allow(clippy::same_name_method)] pub fn compress( &self,