diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 541e2d8609..04f8b364db 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -16,15 +16,9 @@ use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; use vortex::array::ChunkedArray; use vortex::arrow::FromArrowType; use vortex::compress::CompressionStrategy; -use vortex::encoding::EncodingRef; use vortex::{Array, Context, IntoArray}; -use vortex_alp::ALPEncoding; -use vortex_datetime_parts::DateTimePartsEncoding; -use vortex_dict::DictEncoding; use vortex_dtype::DType; -use vortex_fastlanes::{BitPackedEncoding, FoREncoding}; -use vortex_roaring::RoaringBoolEncoding; -use vortex_runend::RunEndEncoding; +use vortex_fastlanes::DeltaEncoding; use vortex_sampling_compressor::compressors::alp::ALPCompressor; use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor; use vortex_sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor; @@ -50,19 +44,11 @@ pub mod tpch; pub mod vortex_utils; lazy_static! { - pub static ref CTX: Arc = Arc::new(Context::default().with_encodings([ - &ALPEncoding as EncodingRef, - &DictEncoding, - &BitPackedEncoding, - &FoREncoding, - &DateTimePartsEncoding, - // &DeltaEncoding, Blows up the search space too much. - &RunEndEncoding, - &RoaringBoolEncoding, - // &RoaringIntEncoding, - // Doesn't offer anything more than FoR really - // &ZigZagEncoding, - ])); + pub static ref CTX: Arc = Arc::new( + Context::default() + .with_encodings(SamplingCompressor::default().used_encodings()) + .with_encoding(&DeltaEncoding) + ); } lazy_static! { diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index 8c2e2ecd58..8c979ea3c0 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -14,9 +14,8 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext}; use tokio::fs::OpenOptions; use vortex::array::{ChunkedArray, StructArray}; use vortex::arrow::FromArrowArray; -use vortex::compress::CompressionStrategy; use vortex::variants::StructArrayTrait; -use vortex::{Array, ArrayDType, Context, IntoArray, IntoArrayVariant}; +use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; use vortex_datafusion::memory::VortexMemTableOptions; use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions}; use vortex_datafusion::SessionContextExt; @@ -24,7 +23,7 @@ use vortex_dtype::DType; use vortex_sampling_compressor::SamplingCompressor; use vortex_serde::layouts::LayoutWriter; -use crate::idempotent_async; +use crate::{idempotent_async, CTX}; pub mod dbgen; mod execute; @@ -306,12 +305,6 @@ async fn register_vortex_file( }) .await?; - let ctx = if enable_compression { - Arc::new(Context::default().with_encodings(SamplingCompressor::default().used_encodings())) - } else { - Arc::new(Context::default()) - }; - let f = OpenOptions::new() .read(true) .write(true) @@ -330,7 +323,7 @@ async fn register_vortex_file( vtx_file.to_str().unwrap().to_string(), file_size, )], - ctx, + CTX.clone(), ), )?; diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index 23f004a787..6130200ac1 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -163,7 +163,9 @@ impl FSSTArray { impl AcceptArrayVisitor for FSSTArray { fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> { visitor.visit_child("symbols", &self.symbols())?; - visitor.visit_child("codes", &self.codes()) + visitor.visit_child("symbol_lengths", &self.symbol_lengths())?; + visitor.visit_child("codes", &self.codes())?; + visitor.visit_child("uncompressed_lengths", &self.uncompressed_lengths()) } } diff --git a/vortex-sampling-compressor/src/compressors/delta.rs b/vortex-sampling-compressor/src/compressors/delta.rs index 2304444200..8a6f78bfd9 100644 --- a/vortex-sampling-compressor/src/compressors/delta.rs +++ b/vortex-sampling-compressor/src/compressors/delta.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; use vortex::array::PrimitiveArray; use vortex::encoding::EncodingRef; +use vortex::stats::ArrayStatistics; use vortex::{Array, ArrayDef, IntoArray}; use vortex_error::VortexResult; use vortex_fastlanes::{delta_compress, Delta, DeltaArray, DeltaEncoding}; @@ -12,6 +13,10 @@ use crate::SamplingCompressor; #[derive(Debug)] pub struct DeltaCompressor; +fn possibly_negative(v: Option) -> bool { + v.map(|x| x < 0).unwrap_or(true) +} + impl EncodingCompressor for DeltaCompressor { fn id(&self) -> &str { Delta::ID.as_ref() @@ -21,11 +26,17 @@ impl EncodingCompressor for DeltaCompressor { // Only support primitive arrays let parray = PrimitiveArray::try_from(array).ok()?; - // Only supports ints - if !parray.ptype().is_unsigned_int() { + if !parray.ptype().is_int() { return None; } + if parray.ptype().is_signed_int() { + let min = parray.statistics().compute_min::(); + if possibly_negative(min) { + return None; + } + } + Some(self) } diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs index 09ca22f9e6..c72fc0a086 100644 --- a/vortex-sampling-compressor/src/compressors/fsst.rs +++ b/vortex-sampling-compressor/src/compressors/fsst.rs @@ -4,13 +4,14 @@ use std::fmt::Debug; use std::sync::Arc; use fsst::Compressor; -use vortex::array::{VarBin, VarBinView}; +use vortex::array::{VarBin, VarBinArray, VarBinView}; use vortex::encoding::EncodingRef; use vortex::{ArrayDType, ArrayDef, IntoArray}; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTArray, FSSTEncoding, FSST}; +use super::delta::DeltaCompressor; use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor}; use crate::SamplingCompressor; @@ -91,18 +92,45 @@ impl EncodingCompressor for FSSTCompressor { like.as_ref().and_then(|l| l.child(0)), )?; + let codes_compressor = ctx.auxiliary("codes"); + let codes_varbin = VarBinArray::try_from(fsst_array.codes())?; + let codes_varbin_dtype = codes_varbin.dtype().clone(); + + let codes_offsets_delta = DeltaCompressor.compress( + &codes_varbin.offsets(), + like.as_ref().and_then(|l| l.child(1).cloned()), + ctx.auxiliary("offsets"), + )?; + + let codes_offsets_compressed = codes_compressor.auxiliary("delta_offsets").compress( + &codes_offsets_delta.array, + like.as_ref().and_then(|l| l.child(2)), + )?; + + let codes = VarBinArray::try_new( + codes_offsets_compressed.array, + codes_varbin.bytes(), + codes_varbin_dtype, + codes_varbin.validity(), + )? + .into_array(); + Ok(CompressedArray::new( FSSTArray::try_new( fsst_array.dtype().clone(), fsst_array.symbols(), fsst_array.symbol_lengths(), - fsst_array.codes(), + codes, uncompressed_lengths.array, )? .into_array(), Some(CompressionTree::new_with_metadata( self, - vec![uncompressed_lengths.path], + vec![ + uncompressed_lengths.path, + codes_offsets_delta.path, + codes_offsets_compressed.path, + ], compressor, )), ))