From 9ce178929e6767c9acc187349da240009490484a Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 3 Oct 2024 11:06:42 -0400 Subject: [PATCH 01/36] feat: compress chunk n+1 like chunk n --- Cargo.lock | 1 + vortex-sampling-compressor/Cargo.toml | 1 + .../src/compressors/mod.rs | 2 + .../src/compressors/struct_.rs | 62 +++++++++++++++++++ vortex-sampling-compressor/src/lib.rs | 51 ++++++--------- 5 files changed, 86 insertions(+), 31 deletions(-) create mode 100644 vortex-sampling-compressor/src/compressors/struct_.rs diff --git a/Cargo.lock b/Cargo.lock index f22e5c2493..5a16dcc172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4597,6 +4597,7 @@ dependencies = [ "arbitrary", "chrono", "fsst-rs", + "itertools 0.13.0", "lazy_static", "log", "rand", diff --git a/vortex-sampling-compressor/Cargo.toml b/vortex-sampling-compressor/Cargo.toml index 6e097f1cd6..e616de8b58 100644 --- a/vortex-sampling-compressor/Cargo.toml +++ b/vortex-sampling-compressor/Cargo.toml @@ -16,6 +16,7 @@ readme = { workspace = true } [dependencies] arbitrary = { workspace = true, optional = true } fsst-rs = { workspace = true } +itertools = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } rand = { workspace = true } diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs index 3b087df2dd..0b4e6aaec7 100644 --- a/vortex-sampling-compressor/src/compressors/mod.rs +++ b/vortex-sampling-compressor/src/compressors/mod.rs @@ -23,6 +23,7 @@ pub mod roaring_bool; pub mod roaring_int; pub mod runend; pub mod sparse; +pub mod struct_; pub mod zigzag; pub trait EncodingCompressor: Sync + Send + Debug { @@ -199,6 +200,7 @@ impl<'a> CompressedArray<'a> { self.path } + #[inline] pub fn into_parts(self) -> (Array, Option>) { (self.array, self.path) } diff --git a/vortex-sampling-compressor/src/compressors/struct_.rs b/vortex-sampling-compressor/src/compressors/struct_.rs new file mode 100644 index 0000000000..22b3ed0644 --- /dev/null +++ b/vortex-sampling-compressor/src/compressors/struct_.rs @@ -0,0 +1,62 @@ +use std::collections::HashSet; + +use itertools::Itertools; +use vortex::array::{Struct, StructArray}; +use vortex::encoding::EncodingRef; +use vortex::variants::StructArrayTrait; +use vortex::{Array, ArrayDef, IntoArray}; +use vortex_error::VortexResult; + +use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; +use crate::SamplingCompressor; + +#[derive(Debug)] +pub struct StructCompressor; + +impl EncodingCompressor for StructCompressor { + fn id(&self) -> &str { + Struct::ID.as_ref() + } + + fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> { + StructArray::try_from(array) + .ok() + .map(|_| self as &dyn EncodingCompressor) + } + + fn compress<'a>( + &'a self, + array: &Array, + like: Option>, + ctx: SamplingCompressor<'a>, + ) -> VortexResult> { + let array = StructArray::try_from(array)?; + let compressed_validity = ctx.compress_validity(array.validity())?; + + let children_trees = match like { + Some(tree) => tree.children, + None => vec![None; array.nfields()], + }; + + let (arrays, trees) = array + .children() + .zip(children_trees) + .map(|(array, like)| ctx.compress(&array, like.as_ref())) + .process_results(|iter| iter.map(|x| (x.array, x.path)).unzip())?; + + Ok(CompressedArray::new( + StructArray::try_new( + array.names().clone(), + arrays, + array.len(), + compressed_validity, + )? 
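+            // Reassemble the struct from its independently compressed fields; the
+            // tree returned below keeps one child per field, which is what lets a
+            // later chunk be compressed "like" this one, field by field.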
+ .into_array(), + Some(CompressionTree::new(self, trees)), + )) + } + + fn used_encodings(&self) -> HashSet { + HashSet::from([]) + } +} diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index 1ad5083ef5..1bed2ac7d8 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -3,16 +3,16 @@ use std::fmt::{Debug, Display, Formatter}; use compressors::bitpacked::BITPACK_WITH_PATCHES; use compressors::fsst::FSSTCompressor; +use compressors::struct_::StructCompressor; use lazy_static::lazy_static; use log::{debug, info, warn}; use rand::rngs::StdRng; use rand::SeedableRng; -use vortex::array::{Chunked, ChunkedArray, Constant, Struct, StructArray}; +use vortex::array::{Chunked, ChunkedArray, Constant}; use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy}; use vortex::compute::slice; use vortex::encoding::EncodingRef; use vortex::validity::Validity; -use vortex::variants::StructArrayTrait; use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoCanonical}; use vortex_error::{VortexExpect as _, VortexResult}; @@ -36,7 +36,7 @@ mod constants; mod sampling; lazy_static! { - pub static ref DEFAULT_COMPRESSORS: [CompressorRef<'static>; 11] = [ + pub static ref DEFAULT_COMPRESSORS: [CompressorRef<'static>; 13] = [ &ALPCompressor as CompressorRef, &BITPACK_WITH_PATCHES, &DateTimePartsCompressor, @@ -49,6 +49,7 @@ lazy_static! { &RoaringIntCompressor, &SparseCompressor, &ZigZagCompressor, + &StructCompressor, ]; pub static ref FASTEST_COMPRESSORS: [CompressorRef<'static>; 7] = [ @@ -249,13 +250,22 @@ impl<'a> SamplingCompressor<'a> { self.options().target_block_bytesize, self.options().target_block_size, )?; - let compressed_chunks = less_chunked - .chunks() - .map(|chunk| { - self.compress_array(&chunk) - .map(compressors::CompressedArray::into_array) - }) - .collect::>>()?; + let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks()); + let mut previous: Option = None; + for (index, chunk) in less_chunked.chunks().enumerate() { + if let Some(previous) = &previous { + debug!( + "using previous compression to save time: {} {}", + previous, chunk + ); + } + let (compressed_chunk, tree) = self + .named(&format!("chunk-{}", index)) + .compress(&chunk, previous.as_ref())? + .into_parts(); + previous = tree; + compressed_chunks.push(compressed_chunk); + } Ok(CompressedArray::uncompressed( ChunkedArray::try_new(compressed_chunks, chunked.dtype().clone())?.into_array(), )) @@ -264,27 +274,6 @@ impl<'a> SamplingCompressor<'a> { // Not much better we can do than constant! Ok(CompressedArray::uncompressed(arr.clone())) } - Struct::ID => { - // For struct arrays, we compress each field individually - let strct = StructArray::try_from(arr)?; - let compressed_fields = strct - .children() - .map(|field| { - self.compress_array(&field) - .map(compressors::CompressedArray::into_array) - }) - .collect::>>()?; - let validity = self.compress_validity(strct.validity())?; - Ok(CompressedArray::uncompressed( - StructArray::try_new( - strct.names().clone(), - compressed_fields, - strct.len(), - validity, - )? 
- .into_array(), - )) - } _ => { // Otherwise, we run sampled compression over pluggable encodings let mut rng = StdRng::seed_from_u64(self.options.rng_seed); From 336888f058c518940dc34cc51b72350a93ea2979 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 3 Oct 2024 12:19:57 -0400 Subject: [PATCH 02/36] handle chunked too --- .../src/compressors/chunked.rs | 74 +++++++++++++++++++ .../src/compressors/mod.rs | 1 + vortex-sampling-compressor/src/lib.rs | 32 +------- 3 files changed, 79 insertions(+), 28 deletions(-) create mode 100644 vortex-sampling-compressor/src/compressors/chunked.rs diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs new file mode 100644 index 0000000000..edc008a7e5 --- /dev/null +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ -0,0 +1,74 @@ +use std::collections::HashSet; + +use vortex::array::{Chunked, ChunkedArray}; +use vortex::encoding::EncodingRef; +use vortex::{Array, ArrayDType, ArrayDef, IntoArray}; +use vortex_error::{vortex_bail, VortexResult}; + +use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; +use crate::SamplingCompressor; + +#[derive(Debug)] +pub struct ChunkedCompressor; + +impl EncodingCompressor for ChunkedCompressor { + fn id(&self) -> &str { + Chunked::ID.as_ref() + } + + fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> { + ChunkedArray::try_from(array) + .ok() + .map(|_| self as &dyn EncodingCompressor) + } + + fn compress<'a>( + &'a self, + array: &Array, + like: Option>, + ctx: SamplingCompressor<'a>, + ) -> VortexResult> { + let array = ChunkedArray::try_from(array)?; + + let mut previous = match like { + None => None, + Some(tree) => { + if tree.children.len() != 1 { + vortex_bail!("chunked array compression tree should have exactly one child"); + } + tree.children[0].clone() + } + }; + let mut target_ratio: Option = None; + + let less_chunked = array.rechunk( + ctx.options().target_block_bytesize, + ctx.options().target_block_size, + )?; + let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks()); + for (index, chunk) in less_chunked.chunks().enumerate() { + let compressed_chunk = ctx + .named(&format!("chunk-{}", index)) + .compress(&chunk, previous.as_ref())? 
+ .into_array(); + + let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32); + if ratio > 1.0 || target_ratio.map(|r| ratio > r * 1.2).unwrap_or(false) { + let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts(); + previous = tree; + target_ratio = Some((compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32)); + compressed_chunks.push(compressed_chunk); + } else { + compressed_chunks.push(compressed_chunk); + } + } + Ok(CompressedArray::new( + ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(), + previous, + )) + } + + fn used_encodings(&self) -> HashSet { + HashSet::from([]) + } +} diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs index 0b4e6aaec7..5349587ce6 100644 --- a/vortex-sampling-compressor/src/compressors/mod.rs +++ b/vortex-sampling-compressor/src/compressors/mod.rs @@ -13,6 +13,7 @@ use crate::SamplingCompressor; pub mod alp; pub mod alp_rd; pub mod bitpacked; +pub mod chunked; pub mod constant; pub mod date_time_parts; pub mod delta; diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index 1bed2ac7d8..aa4c37b47a 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -2,18 +2,19 @@ use std::collections::HashSet; use std::fmt::{Debug, Display, Formatter}; use compressors::bitpacked::BITPACK_WITH_PATCHES; +use compressors::chunked::ChunkedCompressor; use compressors::fsst::FSSTCompressor; use compressors::struct_::StructCompressor; use lazy_static::lazy_static; use log::{debug, info, warn}; use rand::rngs::StdRng; use rand::SeedableRng; -use vortex::array::{Chunked, ChunkedArray, Constant}; +use vortex::array::{ChunkedArray, Constant}; use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy}; use vortex::compute::slice; use vortex::encoding::EncodingRef; use vortex::validity::Validity; -use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoCanonical}; +use vortex::{Array, ArrayDType, ArrayDef, IntoCanonical}; use vortex_error::{VortexExpect as _, VortexResult}; use crate::compressors::alp::ALPCompressor; @@ -50,6 +51,7 @@ lazy_static! { &SparseCompressor, &ZigZagCompressor, &StructCompressor, + &ChunkedCompressor, ]; pub static ref FASTEST_COMPRESSORS: [CompressorRef<'static>; 7] = [ @@ -244,32 +246,6 @@ impl<'a> SamplingCompressor<'a> { fn compress_array(&self, arr: &Array) -> VortexResult> { match arr.encoding().id() { - Chunked::ID => { - let chunked = ChunkedArray::try_from(arr)?; - let less_chunked = chunked.rechunk( - self.options().target_block_bytesize, - self.options().target_block_size, - )?; - let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks()); - let mut previous: Option = None; - for (index, chunk) in less_chunked.chunks().enumerate() { - if let Some(previous) = &previous { - debug!( - "using previous compression to save time: {} {}", - previous, chunk - ); - } - let (compressed_chunk, tree) = self - .named(&format!("chunk-{}", index)) - .compress(&chunk, previous.as_ref())? - .into_parts(); - previous = tree; - compressed_chunks.push(compressed_chunk); - } - Ok(CompressedArray::uncompressed( - ChunkedArray::try_new(compressed_chunks, chunked.dtype().clone())?.into_array(), - )) - } Constant::ID => { // Not much better we can do than constant! 
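                // (a constant array already collapses to a single scalar plus a length)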
Ok(CompressedArray::uncompressed(arr.clone())) From f73df0614cda4de4c929d4f78868369889b4a154 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 3 Oct 2024 13:11:32 -0400 Subject: [PATCH 03/36] wip --- bench-vortex/benches/compress_noci.rs | 30 +++++--- .../src/compressors/chunked.rs | 76 +++++++++++++------ vortex-sampling-compressor/src/lib.rs | 8 +- 3 files changed, 78 insertions(+), 36 deletions(-) diff --git a/bench-vortex/benches/compress_noci.rs b/bench-vortex/benches/compress_noci.rs index 85c557f03a..ac52e5932d 100644 --- a/bench-vortex/benches/compress_noci.rs +++ b/bench-vortex/benches/compress_noci.rs @@ -13,10 +13,12 @@ use bench_vortex::{fetch_taxi_data, tpch}; use criterion::{ black_box, criterion_group, criterion_main, BatchSize, BenchmarkGroup, Criterion, Throughput, }; +use log::LevelFilter; use parquet::arrow::ArrowWriter; use parquet::basic::{Compression, ZstdLevel}; use parquet::file::properties::WriterProperties; use regex::Regex; +use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; use vortex::array::{ChunkedArray, StructArray}; use vortex::{Array, ArrayDType, IntoArray, IntoCanonical}; use vortex_dtype::field::Field; @@ -196,7 +198,7 @@ fn benchmark_compress( } } -fn yellow_taxi_trip_data(c: &mut Criterion) { +fn _yellow_taxi_trip_data(c: &mut Criterion) { taxi_data_parquet(); let group_name = "Yellow Taxi Trip Data"; let mut group = c.benchmark_group(format!("{} Compression Time", group_name)); @@ -212,21 +214,29 @@ fn yellow_taxi_trip_data(c: &mut Criterion) { } fn public_bi_benchmark(c: &mut Criterion) { + TermLogger::init( + LevelFilter::Error, + Config::default(), + TerminalMode::Mixed, + ColorChoice::Auto, + ) + .unwrap(); + let group_name = "Public BI"; let mut group = c.benchmark_group(format!("{} Compression Time", group_name)); group.sample_size(10); // group.measurement_time(Duration::new(10, 0)); for dataset_handle in [ - AirlineSentiment, - Arade, - Bimbo, + // AirlineSentiment, + // Arade, + // Bimbo, CMSprovider, // Corporations, // duckdb thinks ' is a quote character but its used as an apostrophe // CityMaxCapita, // 11th column has F, M, and U but is inferred as boolean - Euro2016, - Food, - HashTags, + // Euro2016, + // Food, + // HashTags, // Hatred, // panic in fsst_compress_iter // TableroSistemaPenal, // thread 'main' panicked at bench-vortex/benches/compress_benchmark.rs:224:42: called `Result::unwrap()` on an `Err` value: expected type: {column00=utf8?, column01=i64?, column02=utf8?, column03=f64?, column04=i64?, column05=utf8?, column06=utf8?, column07=utf8?, column08=utf8?, column09=utf8?, column10=i64?, column11=i64?, column12=utf8?, column13=utf8?, column14=i64?, column15=i64?, column16=utf8?, column17=utf8?, column18=utf8?, column19=utf8?, column20=i64?, column21=utf8?, column22=utf8?, column23=utf8?, column24=utf8?, column25=i64?, column26=utf8?} but instead got {column00=utf8?, column01=i64?, column02=i64?, column03=i64?, column04=i64?, column05=utf8?, column06=i64?, column07=i64?, column08=i64?, column09=utf8?, column10=ext(vortex.date, ExtMetadata([4]))?, column11=ext(vortex.date, ExtMetadata([4]))?, column12=utf8?, column13=utf8?, column14=utf8?, column15=i64?, column16=i64?, column17=utf8?, column18=utf8?, column19=utf8?, column20=utf8?, column21=utf8?} // YaleLanguages, // 4th column looks like integer but also contains Y @@ -244,7 +254,7 @@ fn public_bi_benchmark(c: &mut Criterion) { group.finish() } -fn tpc_h_l_comment(c: &mut Criterion) { +fn _tpc_h_l_comment(c: &mut Criterion) { let data_dir = 
DBGen::new(DBGenOptions::default()).generate().unwrap(); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -319,8 +329,8 @@ fn tpc_h_l_comment(c: &mut Criterion) { criterion_group!( benches, - yellow_taxi_trip_data, + // yellow_taxi_trip_data, public_bi_benchmark, - tpc_h_l_comment, + // tpc_h_l_comment, ); criterion_main!(benches); diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs index edc008a7e5..eddd85c97c 100644 --- a/vortex-sampling-compressor/src/compressors/chunked.rs +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ -11,34 +11,13 @@ use crate::SamplingCompressor; #[derive(Debug)] pub struct ChunkedCompressor; -impl EncodingCompressor for ChunkedCompressor { - fn id(&self) -> &str { - Chunked::ID.as_ref() - } - - fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> { - ChunkedArray::try_from(array) - .ok() - .map(|_| self as &dyn EncodingCompressor) - } - - fn compress<'a>( +impl ChunkedCompressor { + fn compress_chunked<'a>( &'a self, - array: &Array, - like: Option>, + array: &ChunkedArray, + compress_child_like: Option>, ctx: SamplingCompressor<'a>, ) -> VortexResult> { - let array = ChunkedArray::try_from(array)?; - - let mut previous = match like { - None => None, - Some(tree) => { - if tree.children.len() != 1 { - vortex_bail!("chunked array compression tree should have exactly one child"); - } - tree.children[0].clone() - } - }; let mut target_ratio: Option = None; let less_chunked = array.rechunk( @@ -46,6 +25,7 @@ impl EncodingCompressor for ChunkedCompressor { ctx.options().target_block_size, )?; let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks()); + let mut previous = compress_child_like; for (index, chunk) in less_chunked.chunks().enumerate() { let compressed_chunk = ctx .named(&format!("chunk-{}", index)) @@ -67,6 +47,52 @@ impl EncodingCompressor for ChunkedCompressor { previous, )) } +} + +impl EncodingCompressor for ChunkedCompressor { + fn id(&self) -> &str { + Chunked::ID.as_ref() + } + + fn cost(&self) -> u8 { + 0 + } + + fn can_compress(&self, _array: &Array) -> Option<&dyn EncodingCompressor> { + Some(self) + // ChunkedArray::try_from(array) + // .ok() + // .map(|_| self as &dyn EncodingCompressor) + } + + fn compress<'a>( + &'a self, + array: &Array, + like: Option>, + ctx: SamplingCompressor<'a>, + ) -> VortexResult> { + let compress_child_like = match like { + None => None, + Some(tree) => { + if tree.children.len() != 1 { + vortex_bail!("chunked array compression tree should have exactly one child"); + } + tree.children[0].clone() + } + }; + + if let Ok(chunked_array) = ChunkedArray::try_from(array) { + self.compress_chunked(&chunked_array, compress_child_like, ctx) + } else { + let (array, like) = ctx + .compress(array, compress_child_like.as_ref())? 
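+            // Not actually a chunked array: compress it directly, then wrap the
+            // result in a one-child tree so the caller still sees this compressor
+            // at the root of the path.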
+ .into_parts(); + Ok(CompressedArray::new( + array, + Some(CompressionTree::new(self, vec![like])), + )) + } + } fn used_encodings(&self) -> HashSet { HashSet::from([]) diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index aa4c37b47a..b12b04f745 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -9,7 +9,7 @@ use lazy_static::lazy_static; use log::{debug, info, warn}; use rand::rngs::StdRng; use rand::SeedableRng; -use vortex::array::{ChunkedArray, Constant}; +use vortex::array::{Chunked, ChunkedArray, Constant}; use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy}; use vortex::compute::slice; use vortex::encoding::EncodingRef; @@ -291,6 +291,11 @@ fn sampled_compression<'a>( }) .copied() .collect(); + + if ChunkedArray::try_from(array).is_err() { + candidates.retain(|&compression| compression.id() != Chunked::ID.as_ref()) + } + debug!("{} candidates for {}: {:?}", compressor, array, candidates); if candidates.is_empty() { @@ -366,6 +371,7 @@ fn find_best_compression<'a>( sample ); if compression.can_compress(sample).is_none() { + debug!("ignoring {} for {}", compression.id(), sample); continue; } let compressed_sample = From 33ff0aa5ef2ebb88f0f4288b36263ae8a1d8b111 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 3 Oct 2024 15:48:02 -0400 Subject: [PATCH 04/36] revert compression benchmark --- bench-vortex/benches/compress_noci.rs | 30 +++++++++------------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/bench-vortex/benches/compress_noci.rs b/bench-vortex/benches/compress_noci.rs index ac52e5932d..85c557f03a 100644 --- a/bench-vortex/benches/compress_noci.rs +++ b/bench-vortex/benches/compress_noci.rs @@ -13,12 +13,10 @@ use bench_vortex::{fetch_taxi_data, tpch}; use criterion::{ black_box, criterion_group, criterion_main, BatchSize, BenchmarkGroup, Criterion, Throughput, }; -use log::LevelFilter; use parquet::arrow::ArrowWriter; use parquet::basic::{Compression, ZstdLevel}; use parquet::file::properties::WriterProperties; use regex::Regex; -use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; use vortex::array::{ChunkedArray, StructArray}; use vortex::{Array, ArrayDType, IntoArray, IntoCanonical}; use vortex_dtype::field::Field; @@ -198,7 +196,7 @@ fn benchmark_compress( } } -fn _yellow_taxi_trip_data(c: &mut Criterion) { +fn yellow_taxi_trip_data(c: &mut Criterion) { taxi_data_parquet(); let group_name = "Yellow Taxi Trip Data"; let mut group = c.benchmark_group(format!("{} Compression Time", group_name)); @@ -214,29 +212,21 @@ fn _yellow_taxi_trip_data(c: &mut Criterion) { } fn public_bi_benchmark(c: &mut Criterion) { - TermLogger::init( - LevelFilter::Error, - Config::default(), - TerminalMode::Mixed, - ColorChoice::Auto, - ) - .unwrap(); - let group_name = "Public BI"; let mut group = c.benchmark_group(format!("{} Compression Time", group_name)); group.sample_size(10); // group.measurement_time(Duration::new(10, 0)); for dataset_handle in [ - // AirlineSentiment, - // Arade, - // Bimbo, + AirlineSentiment, + Arade, + Bimbo, CMSprovider, // Corporations, // duckdb thinks ' is a quote character but its used as an apostrophe // CityMaxCapita, // 11th column has F, M, and U but is inferred as boolean - // Euro2016, - // Food, - // HashTags, + Euro2016, + Food, + HashTags, // Hatred, // panic in fsst_compress_iter // TableroSistemaPenal, // thread 'main' panicked at bench-vortex/benches/compress_benchmark.rs:224:42: called 
`Result::unwrap()` on an `Err` value: expected type: {column00=utf8?, column01=i64?, column02=utf8?, column03=f64?, column04=i64?, column05=utf8?, column06=utf8?, column07=utf8?, column08=utf8?, column09=utf8?, column10=i64?, column11=i64?, column12=utf8?, column13=utf8?, column14=i64?, column15=i64?, column16=utf8?, column17=utf8?, column18=utf8?, column19=utf8?, column20=i64?, column21=utf8?, column22=utf8?, column23=utf8?, column24=utf8?, column25=i64?, column26=utf8?} but instead got {column00=utf8?, column01=i64?, column02=i64?, column03=i64?, column04=i64?, column05=utf8?, column06=i64?, column07=i64?, column08=i64?, column09=utf8?, column10=ext(vortex.date, ExtMetadata([4]))?, column11=ext(vortex.date, ExtMetadata([4]))?, column12=utf8?, column13=utf8?, column14=utf8?, column15=i64?, column16=i64?, column17=utf8?, column18=utf8?, column19=utf8?, column20=utf8?, column21=utf8?} // YaleLanguages, // 4th column looks like integer but also contains Y @@ -254,7 +244,7 @@ fn public_bi_benchmark(c: &mut Criterion) { group.finish() } -fn _tpc_h_l_comment(c: &mut Criterion) { +fn tpc_h_l_comment(c: &mut Criterion) { let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap(); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -329,8 +319,8 @@ fn _tpc_h_l_comment(c: &mut Criterion) { criterion_group!( benches, - // yellow_taxi_trip_data, + yellow_taxi_trip_data, public_bi_benchmark, - // tpc_h_l_comment, + tpc_h_l_comment, ); criterion_main!(benches); From fe2db9590eede7c06b49e7550eef527140d8ca68 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 3 Oct 2024 15:49:51 -0400 Subject: [PATCH 05/36] wip --- vortex-sampling-compressor/src/compressors/chunked.rs | 3 --- vortex-sampling-compressor/src/lib.rs | 1 - 2 files changed, 4 deletions(-) diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs index eddd85c97c..da2cd1479a 100644 --- a/vortex-sampling-compressor/src/compressors/chunked.rs +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ -60,9 +60,6 @@ impl EncodingCompressor for ChunkedCompressor { fn can_compress(&self, _array: &Array) -> Option<&dyn EncodingCompressor> { Some(self) - // ChunkedArray::try_from(array) - // .ok() - // .map(|_| self as &dyn EncodingCompressor) } fn compress<'a>( diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index b12b04f745..5f51fa5976 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -371,7 +371,6 @@ fn find_best_compression<'a>( sample ); if compression.can_compress(sample).is_none() { - debug!("ignoring {} for {}", compression.id(), sample); continue; } let compressed_sample = From 3cb83f13081c10889ad4e2b35f3db46ddbe1a458 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Fri, 4 Oct 2024 11:14:51 -0400 Subject: [PATCH 06/36] wip --- .../src/compressors/bitpacked.rs | 4 +++ .../src/compressors/chunked.rs | 35 +++++++++++-------- .../src/compressors/for.rs | 5 +++ .../src/compressors/mod.rs | 15 ++++++-- vortex-sampling-compressor/src/lib.rs | 16 ++++++--- 5 files changed, 53 insertions(+), 22 deletions(-) diff --git a/vortex-sampling-compressor/src/compressors/bitpacked.rs b/vortex-sampling-compressor/src/compressors/bitpacked.rs index 1cdaaa608c..703254633d 100644 --- a/vortex-sampling-compressor/src/compressors/bitpacked.rs +++ b/vortex-sampling-compressor/src/compressors/bitpacked.rs @@ -1,5 +1,7 @@ use std::collections::HashSet; +#[allow(unused_imports)] 
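+// `warn` is only referenced from the commented-out trace calls added below.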
+use log::warn; use vortex::array::PrimitiveArray; use vortex::encoding::EncodingRef; use vortex::stats::ArrayStatistics; @@ -58,6 +60,7 @@ impl EncodingCompressor for BitPackedCompressor { // Only supports unsigned ints if !parray.ptype().is_unsigned_int() { + // warn!("not unsigned {}", parray.ptype()); return None; } @@ -65,6 +68,7 @@ impl EncodingCompressor for BitPackedCompressor { // Check that the bit width is less than the type's bit width if bit_width == parray.ptype().bit_width() { + // warn!("bit width too big {} {}", bit_width, parray.ptype()); return None; } diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs index da2cd1479a..80e02b66fb 100644 --- a/vortex-sampling-compressor/src/compressors/chunked.rs +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ -1,5 +1,6 @@ use std::collections::HashSet; +use log::warn; use vortex::array::{Chunked, ChunkedArray}; use vortex::encoding::EncodingRef; use vortex::{Array, ArrayDType, ArrayDef, IntoArray}; @@ -27,18 +28,29 @@ impl ChunkedCompressor { let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks()); let mut previous = compress_child_like; for (index, chunk) in less_chunked.chunks().enumerate() { - let compressed_chunk = ctx + let (compressed_chunk, tree) = ctx .named(&format!("chunk-{}", index)) .compress(&chunk, previous.as_ref())? - .into_array(); + .into_parts(); let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32); if ratio > 1.0 || target_ratio.map(|r| ratio > r * 1.2).unwrap_or(false) { + warn!( + "unsatisfactory ratio {} {:?} {:?}", + ratio, target_ratio, previous + ); let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts(); previous = tree; target_ratio = Some((compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32)); compressed_chunks.push(compressed_chunk); } else { + // warn!( + // "satisfactory ratio {} {} {:?} {:?} {:?}", + // compressed_chunk, ratio, target_ratio, previous, tree + // ); + if previous.is_none() { + previous = tree; + } compressed_chunks.push(compressed_chunk); } } @@ -58,8 +70,10 @@ impl EncodingCompressor for ChunkedCompressor { 0 } - fn can_compress(&self, _array: &Array) -> Option<&dyn EncodingCompressor> { - Some(self) + fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> { + ChunkedArray::try_from(array) + .ok() + .map(|_| self as &dyn EncodingCompressor) } fn compress<'a>( @@ -78,17 +92,8 @@ impl EncodingCompressor for ChunkedCompressor { } }; - if let Ok(chunked_array) = ChunkedArray::try_from(array) { - self.compress_chunked(&chunked_array, compress_child_like, ctx) - } else { - let (array, like) = ctx - .compress(array, compress_child_like.as_ref())? 
- .into_parts(); - Ok(CompressedArray::new( - array, - Some(CompressionTree::new(self, vec![like])), - )) - } + let chunked_array = ChunkedArray::try_from(array)?; + self.compress_chunked(&chunked_array, compress_child_like, ctx) } fn used_encodings(&self) -> HashSet { diff --git a/vortex-sampling-compressor/src/compressors/for.rs b/vortex-sampling-compressor/src/compressors/for.rs index 6d4ccae8a8..5e07d80713 100644 --- a/vortex-sampling-compressor/src/compressors/for.rs +++ b/vortex-sampling-compressor/src/compressors/for.rs @@ -1,5 +1,7 @@ use std::collections::HashSet; +#[allow(unused_imports)] +use log::warn; use vortex::array::PrimitiveArray; use vortex::encoding::EncodingRef; use vortex::stats::{trailing_zeros, ArrayStatistics}; @@ -30,11 +32,13 @@ impl EncodingCompressor for FoRCompressor { // Only supports integers if !parray.ptype().is_int() { + // warn!("not int {}", parray.ptype()); return None; } // For all-null, cannot encode. if parray.logical_validity().all_invalid() { + // warn!("all invalid"); return None; } @@ -43,6 +47,7 @@ impl EncodingCompressor for FoRCompressor { match_each_integer_ptype!(parray.ptype(), |$P| { let min: $P = parray.statistics().compute_min()?; if min == 0 && shift == 0 && parray.ptype().is_unsigned_int() { + // warn!("min {} or shift {}", min, shift); return None; } }); diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs index 5349587ce6..592f5ebb38 100644 --- a/vortex-sampling-compressor/src/compressors/mod.rs +++ b/vortex-sampling-compressor/src/compressors/mod.rs @@ -1,6 +1,6 @@ use std::any::Any; use std::collections::HashSet; -use std::fmt::{Debug, Display, Formatter}; +use std::fmt::{Debug, Display, Formatter, Write}; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -81,7 +81,18 @@ pub trait EncoderMetadata { impl Display for CompressionTree<'_> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - Display::fmt(self.compressor.id(), f) + let mut children = String::new(); + for (index, child) in self.children.iter().enumerate() { + if index != 0 { + write!(children, ", ")?; + } + match child { + None => write!(children, "x")?, + Some(child) => write!(children, "{}", child)?, + } + } + write!(f, "{}({})", self.compressor.id(), children) + // Display::fmt(self.compressor.id(), f) } } diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index 5f51fa5976..2860b5d1a3 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -9,7 +9,7 @@ use lazy_static::lazy_static; use log::{debug, info, warn}; use rand::rngs::StdRng; use rand::SeedableRng; -use vortex::array::{Chunked, ChunkedArray, Constant}; +use vortex::array::{ChunkedArray, Constant}; use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy}; use vortex::compute::slice; use vortex::encoding::EncodingRef; @@ -209,6 +209,9 @@ impl<'a> SamplingCompressor<'a> { arr: &Array, like: Option<&CompressionTree<'a>>, ) -> VortexResult> { + // if arr.dtype().is_struct() { + // warn!("like is {:?} {}", like, arr); + // } if arr.is_empty() { return Ok(CompressedArray::uncompressed(arr.clone())); } @@ -234,6 +237,9 @@ impl<'a> SamplingCompressor<'a> { check_validity_unchanged(arr, compressed.as_ref()); check_dtype_unchanged(arr, compressed.as_ref()); + // if arr.dtype().is_struct() { + // warn!("found compression {:?} {}", compressed.path(), arr); + // } Ok(compressed) } @@ -265,6 +271,10 @@ fn sampled_compression<'a>( 
compressor: &SamplingCompressor<'a>, rng: &mut StdRng, ) -> VortexResult>> { + if let Some(cc) = ChunkedCompressor.can_compress(array) { + return cc.compress(array, None, compressor.clone()).map(Some); + } + // First, we try constant compression and shortcut any sampling. if let Some(cc) = ConstantCompressor.can_compress(array) { return cc.compress(array, None, compressor.clone()).map(Some); @@ -292,10 +302,6 @@ fn sampled_compression<'a>( .copied() .collect(); - if ChunkedArray::try_from(array).is_err() { - candidates.retain(|&compression| compression.id() != Chunked::ID.as_ref()) - } - debug!("{} candidates for {}: {:?}", compressor, array, candidates); if candidates.is_empty() { From a0af4cdf404d03ba2701d93381d7fe77b3548609 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Fri, 4 Oct 2024 16:07:40 -0400 Subject: [PATCH 07/36] main function for cargo instruments --- bench-vortex/src/main.rs | 52 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 bench-vortex/src/main.rs diff --git a/bench-vortex/src/main.rs b/bench-vortex/src/main.rs new file mode 100644 index 0000000000..69ac85a9e3 --- /dev/null +++ b/bench-vortex/src/main.rs @@ -0,0 +1,52 @@ +use std::env; +use std::time::Instant; + +use bench_vortex::data_downloads::BenchmarkDataset; +use bench_vortex::public_bi_data::BenchmarkDatasets; +use bench_vortex::public_bi_data::PBIDataset::{ + AirlineSentiment, Arade, Bimbo, CMSprovider, Euro2016, Food, HashTags, +}; +use log::{warn, LevelFilter}; +use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; +use vortex_sampling_compressor::SamplingCompressor; + +fn main() { + let mut args = env::args().skip(1); + + if args.len() > 1 { + panic!("too many arguments"); + } else if args.len() < 1 { + panic!("too few arguments"); + } + + let dataset: &str = &args.nth(0).unwrap(); + + TermLogger::init( + LevelFilter::Warn, + Config::default(), + TerminalMode::Mixed, + ColorChoice::Auto, + ) + .unwrap(); + + let compressor = SamplingCompressor::default(); + let dataset = match dataset { + "AirlineSentiment" => BenchmarkDatasets::PBI(AirlineSentiment), + "Arade" => BenchmarkDatasets::PBI(Arade), + "CMSprovider" => BenchmarkDatasets::PBI(CMSprovider), + "HashTags" => BenchmarkDatasets::PBI(HashTags), + "Bimbo" => BenchmarkDatasets::PBI(Bimbo), + "Euro2016" => BenchmarkDatasets::PBI(Euro2016), + "Food" => BenchmarkDatasets::PBI(Food), + _ => panic!("invalid dataset"), + }; + let start = Instant::now(); + let uncompressed = dataset.to_vortex_array().unwrap(); + let compressed = compressor.compress(&uncompressed, None).unwrap(); + let duration = start.elapsed(); + + warn!("Time elapsed: {:?}", duration); + warn!("Uncompressed size: {}", uncompressed.nbytes()); + warn!("Compressed size: {}", compressed.nbytes()); + warn!("Encoding: {}", compressed.into_array().tree_display()); +} From 75c03dc650ab0a1f20b37d17c9733a728b1ed55a Mon Sep 17 00:00:00 2001 From: Daniel King Date: Mon, 7 Oct 2024 09:18:46 -0400 Subject: [PATCH 08/36] add fixme to slow is_strict_sorted check --- vortex-sampling-compressor/src/compressors/dict.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs index 67aa92b955..10a699a645 100644 --- a/vortex-sampling-compressor/src/compressors/dict.rs +++ b/vortex-sampling-compressor/src/compressors/dict.rs @@ -35,6 +35,7 @@ impl EncodingCompressor for DictCompressor { .compute_is_strict_sorted() .unwrap_or(false) { + // FIXME(DK): this check is 
fairly expensive... return None; } From 7734c17cb2ff97ac29329dc86ac4af173f8bb7f7 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Mon, 7 Oct 2024 11:39:52 -0400 Subject: [PATCH 09/36] remove is_strict_sorted check --- .../src/compressors/dict.rs | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs index 10a699a645..2710fd9f64 100644 --- a/vortex-sampling-compressor/src/compressors/dict.rs +++ b/vortex-sampling-compressor/src/compressors/dict.rs @@ -2,7 +2,7 @@ use std::collections::HashSet; use vortex::array::{Primitive, PrimitiveArray, VarBin, VarBinArray}; use vortex::encoding::EncodingRef; -use vortex::stats::ArrayStatistics; +// use vortex::stats::ArrayStatistics; use vortex::{Array, ArrayDef, IntoArray}; use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray, DictEncoding}; use vortex_error::VortexResult; @@ -28,16 +28,16 @@ impl EncodingCompressor for DictCompressor { return None; }; - // No point dictionary coding if the array is unique. - // We don't have a unique stat yet, but strict-sorted implies unique. - if array - .statistics() - .compute_is_strict_sorted() - .unwrap_or(false) - { - // FIXME(DK): this check is fairly expensive... - return None; - } + // // No point dictionary coding if the array is unique. + // // We don't have a unique stat yet, but strict-sorted implies unique. + // if array + // .statistics() + // .compute_is_strict_sorted() + // .unwrap_or(false) + // { + // // FIXME(DK): this check is fairly expensive... + // return None; + // } Some(self) } From 8bcc45f4f0a2d30c585549a97bd94fd2919857bd Mon Sep 17 00:00:00 2001 From: Daniel King Date: Mon, 7 Oct 2024 11:39:52 -0400 Subject: [PATCH 10/36] feat: reduce compress time with smarter use and calculation of stats 1. In VarBinArray (the most expensive kind of array on which to compute states due to copies for min/max), implement a short-circuiting is_constant algorithm. 2. In DictCompressor, let the sampling compressor learn that an array is all unique values by observing poor compression ratios. 
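
A minimal sketch of the short-circuit in isolation (illustrative only: the
generic helper here is not the real API; the actual change returns a
StatsSet and iterates Option<&[u8]> values, as in the diff that follows):

    /// True iff every element equals the first; bails at the first mismatch,
    /// and never materializes or copies the underlying values.
    fn is_constant<T: PartialEq>(mut iter: impl Iterator<Item = T>) -> bool {
        match iter.next() {
            None => true, // an empty sequence is trivially constant
            Some(first) => iter.all(|v| v == first),
        }
    }

Min/max must visit (and, for strings, copy) every value, while IsConstant can
stop as soon as two values differ — the common case for non-constant data.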
--- vortex-array/src/array/varbin/stats.rs | 22 +++++++++++++++++++ .../src/compressors/dict.rs | 12 ---------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs index 9cc892cf96..545cb555b0 100644 --- a/vortex-array/src/array/varbin/stats.rs +++ b/vortex-array/src/array/varbin/stats.rs @@ -15,10 +15,32 @@ impl ArrayStatisticsCompute for VarBinArray { if self.is_empty() { return Ok(StatsSet::new()); } + if _stat == Stat::IsConstant { + return self.with_iterator(|iter| compute_is_constant(iter)); + } self.with_iterator(|iter| compute_stats(iter, self.dtype())) } } +pub fn compute_is_constant(iter: &mut dyn Iterator>) -> StatsSet { + fn is_constant_stats_set(is_constant: bool) -> StatsSet { + StatsSet::from(HashMap::from([(Stat::IsConstant, is_constant.into())])) + } + + match iter.next() { + None => {} + Some(first) => { + for v in iter { + if v != first { + return is_constant_stats_set(false); + } + } + } + }; + + is_constant_stats_set(true) +} + pub fn compute_stats(iter: &mut dyn Iterator>, dtype: &DType) -> StatsSet { let mut leading_nulls: usize = 0; let mut first_value: Option<&[u8]> = None; diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs index 2710fd9f64..e48c344250 100644 --- a/vortex-sampling-compressor/src/compressors/dict.rs +++ b/vortex-sampling-compressor/src/compressors/dict.rs @@ -2,7 +2,6 @@ use std::collections::HashSet; use vortex::array::{Primitive, PrimitiveArray, VarBin, VarBinArray}; use vortex::encoding::EncodingRef; -// use vortex::stats::ArrayStatistics; use vortex::{Array, ArrayDef, IntoArray}; use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray, DictEncoding}; use vortex_error::VortexResult; @@ -28,17 +27,6 @@ impl EncodingCompressor for DictCompressor { return None; }; - // // No point dictionary coding if the array is unique. - // // We don't have a unique stat yet, but strict-sorted implies unique. - // if array - // .statistics() - // .compute_is_strict_sorted() - // .unwrap_or(false) - // { - // // FIXME(DK): this check is fairly expensive... 
- // return None; - // } - Some(self) } From 4b8266cb086585bcf9a7393ecafd046ade35b7fe Mon Sep 17 00:00:00 2001 From: Daniel King Date: Tue, 8 Oct 2024 10:39:56 -0400 Subject: [PATCH 11/36] remove comment --- vortex-sampling-compressor/src/compressors/chunked.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs index 80e02b66fb..eeb8826303 100644 --- a/vortex-sampling-compressor/src/compressors/chunked.rs +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ -44,10 +44,6 @@ impl ChunkedCompressor { target_ratio = Some((compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32)); compressed_chunks.push(compressed_chunk); } else { - // warn!( - // "satisfactory ratio {} {} {:?} {:?} {:?}", - // compressed_chunk, ratio, target_ratio, previous, tree - // ); if previous.is_none() { previous = tree; } From 8c4312bd38d9515ed7035685d16c4fa4a6d78eb8 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Tue, 8 Oct 2024 10:40:52 -0400 Subject: [PATCH 12/36] remove comments --- vortex-sampling-compressor/src/compressors/bitpacked.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/vortex-sampling-compressor/src/compressors/bitpacked.rs b/vortex-sampling-compressor/src/compressors/bitpacked.rs index 703254633d..1cdaaa608c 100644 --- a/vortex-sampling-compressor/src/compressors/bitpacked.rs +++ b/vortex-sampling-compressor/src/compressors/bitpacked.rs @@ -1,7 +1,5 @@ use std::collections::HashSet; -#[allow(unused_imports)] -use log::warn; use vortex::array::PrimitiveArray; use vortex::encoding::EncodingRef; use vortex::stats::ArrayStatistics; @@ -60,7 +58,6 @@ impl EncodingCompressor for BitPackedCompressor { // Only supports unsigned ints if !parray.ptype().is_unsigned_int() { - // warn!("not unsigned {}", parray.ptype()); return None; } @@ -68,7 +65,6 @@ impl EncodingCompressor for BitPackedCompressor { // Check that the bit width is less than the type's bit width if bit_width == parray.ptype().bit_width() { - // warn!("bit width too big {} {}", bit_width, parray.ptype()); return None; } From c211061f18fdf90c0343f5a2ec8dddd2c3ed0bfc Mon Sep 17 00:00:00 2001 From: Daniel King Date: Tue, 8 Oct 2024 10:42:05 -0400 Subject: [PATCH 13/36] remove comments --- vortex-sampling-compressor/src/compressors/for.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/vortex-sampling-compressor/src/compressors/for.rs b/vortex-sampling-compressor/src/compressors/for.rs index 5e07d80713..6d4ccae8a8 100644 --- a/vortex-sampling-compressor/src/compressors/for.rs +++ b/vortex-sampling-compressor/src/compressors/for.rs @@ -1,7 +1,5 @@ use std::collections::HashSet; -#[allow(unused_imports)] -use log::warn; use vortex::array::PrimitiveArray; use vortex::encoding::EncodingRef; use vortex::stats::{trailing_zeros, ArrayStatistics}; @@ -32,13 +30,11 @@ impl EncodingCompressor for FoRCompressor { // Only supports integers if !parray.ptype().is_int() { - // warn!("not int {}", parray.ptype()); return None; } // For all-null, cannot encode. 
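        // (FoR needs a reference minimum to subtract, and an all-null array
        // has no defined minimum)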
if parray.logical_validity().all_invalid() { - // warn!("all invalid"); return None; } @@ -47,7 +43,6 @@ impl EncodingCompressor for FoRCompressor { match_each_integer_ptype!(parray.ptype(), |$P| { let min: $P = parray.statistics().compute_min()?; if min == 0 && shift == 0 && parray.ptype().is_unsigned_int() { - // warn!("min {} or shift {}", min, shift); return None; } }); From e9b6da11f81d411a6b00932042bf792a112aafc6 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Tue, 8 Oct 2024 11:55:54 -0400 Subject: [PATCH 14/36] struct is cost 0 --- vortex-sampling-compressor/src/compressors/struct_.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vortex-sampling-compressor/src/compressors/struct_.rs b/vortex-sampling-compressor/src/compressors/struct_.rs index 22b3ed0644..7f69f93ae9 100644 --- a/vortex-sampling-compressor/src/compressors/struct_.rs +++ b/vortex-sampling-compressor/src/compressors/struct_.rs @@ -18,6 +18,10 @@ impl EncodingCompressor for StructCompressor { Struct::ID.as_ref() } + fn cost(&self) -> u8 { + 0 + } + fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> { StructArray::try_from(array) .ok() From 629ea6c6b663ea06595b6995aba880d5d2ee08b7 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Tue, 8 Oct 2024 13:23:16 -0400 Subject: [PATCH 15/36] reduce fsst multiple to five --- vortex-sampling-compressor/src/compressors/fsst.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs index 285332672c..69e27ac3e8 100644 --- a/vortex-sampling-compressor/src/compressors/fsst.rs +++ b/vortex-sampling-compressor/src/compressors/fsst.rs @@ -63,7 +63,7 @@ impl EncodingCompressor for FSSTCompressor { // between 2-3x depending on the text quality. // // It's not worth running a full compression step unless the array is large enough. 
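        // (below that size, the fixed cost of the symbol table can outweigh
        // any savings on the encoded text)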
- if array.nbytes() < 10 * FSST_SYMTAB_MAX_SIZE { + if array.nbytes() < 5 * FSST_SYMTAB_MAX_SIZE { return Ok(CompressedArray::uncompressed(array.clone())); } From 6a40f1c8b6f6c8c464e770ddd4e050d2f9fb50be Mon Sep 17 00:00:00 2001 From: Daniel King Date: Tue, 8 Oct 2024 13:24:17 -0400 Subject: [PATCH 16/36] restore --- vortex-sampling-compressor/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index 2860b5d1a3..ca90872046 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -6,7 +6,7 @@ use compressors::chunked::ChunkedCompressor; use compressors::fsst::FSSTCompressor; use compressors::struct_::StructCompressor; use lazy_static::lazy_static; -use log::{debug, info, warn}; +use log::{debug, info}; use rand::rngs::StdRng; use rand::SeedableRng; use vortex::array::{ChunkedArray, Constant}; @@ -225,7 +225,7 @@ impl<'a> SamplingCompressor<'a> { check_dtype_unchanged(arr, compressed.as_ref()); return Ok(compressed); } else { - warn!( + debug!( "{} cannot find compressor to compress {} like {}", self, arr, l ); From 1a5489b6aad6d4200776042a3cc34c92d9f3d4e4 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Wed, 9 Oct 2024 10:24:46 -0400 Subject: [PATCH 17/36] slightly better From> for Scalar --- vortex-scalar/Cargo.toml | 3 +-- vortex-scalar/src/list.rs | 55 +++++++++++++++++++++++++++------------ 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/vortex-scalar/Cargo.toml b/vortex-scalar/Cargo.toml index 669bb2745b..2edbcb957f 100644 --- a/vortex-scalar/Cargo.toml +++ b/vortex-scalar/Cargo.toml @@ -19,7 +19,7 @@ arrow-array = { workspace = true } datafusion-common = { workspace = true, optional = true } flatbuffers = { workspace = true, optional = true } flexbuffers = { workspace = true, optional = true } -half = { workspace = true, optional = true } +half = { workspace = true } itertools = { workspace = true } jiff = { workspace = true } num-traits = { workspace = true } @@ -55,7 +55,6 @@ flatbuffers = [ proto = [ "dep:prost", "dep:prost-types", - "dep:half", "vortex-dtype/proto", "vortex-proto/scalar", ] diff --git a/vortex-scalar/src/list.rs b/vortex-scalar/src/list.rs index 2ecac732ab..f742633d2c 100644 --- a/vortex-scalar/src/list.rs +++ b/vortex-scalar/src/list.rs @@ -1,10 +1,10 @@ use std::ops::Deref; use std::sync::Arc; -use itertools::Itertools; -use vortex_dtype::DType; +use half::f16; use vortex_dtype::Nullability::NonNullable; -use vortex_error::{vortex_bail, VortexError, VortexExpect as _, VortexResult}; +use vortex_dtype::{DType, NativePType, PType}; +use vortex_error::{vortex_bail, VortexError, VortexResult}; use crate::value::ScalarValue; use crate::Scalar; @@ -104,21 +104,42 @@ impl<'a, T: for<'b> TryFrom<&'b Scalar, Error = VortexError>> TryFrom<&'a Scalar } } -impl From> for Scalar -where - Self: From, -{ - fn from(value: Vec) -> Self { - let scalars = value.into_iter().map(|v| Self::from(v)).collect_vec(); - let element_dtype = scalars - .first() - .vortex_expect("Empty list, could not determine element dtype") - .dtype() - .clone(); - let dtype = DType::List(Arc::new(element_dtype), NonNullable); +macro_rules! 
from_native_ptype_for_scalar { + ($T:ty) => { + impl From> for Scalar { + fn from(value: Vec<$T>) -> Self { + Self { + dtype: DType::List( + DType::Primitive(<$T>::PTYPE, NonNullable).into(), + NonNullable, + ), + value: ScalarValue::List(value.into_iter().map(ScalarValue::from).collect()), + } + } + } + }; +} + +from_native_ptype_for_scalar!(u8); +from_native_ptype_for_scalar!(u16); +from_native_ptype_for_scalar!(u32); +from_native_ptype_for_scalar!(u64); +from_native_ptype_for_scalar!(i8); +from_native_ptype_for_scalar!(i16); +from_native_ptype_for_scalar!(i32); +from_native_ptype_for_scalar!(i64); +from_native_ptype_for_scalar!(f16); +from_native_ptype_for_scalar!(f32); +from_native_ptype_for_scalar!(f64); + +impl From> for Scalar { + fn from(value: Vec) -> Self { Self { - dtype, - value: ScalarValue::List(scalars.into_iter().map(|s| s.value).collect_vec().into()), + dtype: DType::List( + DType::Primitive(PType::U64, NonNullable).into(), + NonNullable, + ), + value: ScalarValue::List(value.into_iter().map(ScalarValue::from).collect()), } } } From 8abdca9ac0af74e70d271165bb17ee0e6734a4f7 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Wed, 9 Oct 2024 16:18:29 -0400 Subject: [PATCH 18/36] fixes --- bench-vortex/src/main.rs | 2 +- vortex-sampling-compressor/src/compressors/chunked.rs | 2 +- vortex-sampling-compressor/src/lib.rs | 9 ++++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/bench-vortex/src/main.rs b/bench-vortex/src/main.rs index 69ac85a9e3..7167bd41e1 100644 --- a/bench-vortex/src/main.rs +++ b/bench-vortex/src/main.rs @@ -22,7 +22,7 @@ fn main() { let dataset: &str = &args.nth(0).unwrap(); TermLogger::init( - LevelFilter::Warn, + LevelFilter::Info, Config::default(), TerminalMode::Mixed, ColorChoice::Auto, diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs index eeb8826303..1905f36179 100644 --- a/vortex-sampling-compressor/src/compressors/chunked.rs +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ -52,7 +52,7 @@ impl ChunkedCompressor { } Ok(CompressedArray::new( ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(), - previous, + Some(CompressionTree::new(self, vec![previous])), )) } } diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index ca90872046..78e1d02fe8 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -225,7 +225,7 @@ impl<'a> SamplingCompressor<'a> { check_dtype_unchanged(arr, compressed.as_ref()); return Ok(compressed); } else { - debug!( + info!( "{} cannot find compressor to compress {} like {}", self, arr, l ); @@ -302,7 +302,7 @@ fn sampled_compression<'a>( .copied() .collect(); - debug!("{} candidates for {}: {:?}", compressor, array, candidates); + info!("{} candidates for {}: {:?}", compressor, array, candidates); if candidates.is_empty() { debug!( @@ -351,7 +351,10 @@ fn sampled_compression<'a>( find_best_compression(candidates, &sample, compressor)? 
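        // Take the tree that won on the sample and re-apply it to the full array.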
.into_path() .map(|best_compressor| { - info!("Compressing array {} with {}", array, best_compressor); + info!( + "{} Compressing array {} with {}", + compressor, array, best_compressor + ); best_compressor.compress_unchecked(array, compressor) }) .transpose() From 2065033cf80d59a5171166fe546c611dc1e21ffb Mon Sep 17 00:00:00 2001 From: Daniel King Date: Wed, 9 Oct 2024 18:08:33 -0400 Subject: [PATCH 19/36] cleanups --- bench-vortex/src/main.rs | 52 ----- .../src/compressors/mod.rs | 15 +- vortex-sampling-compressor/src/lib.rs | 202 ++++++++---------- 3 files changed, 95 insertions(+), 174 deletions(-) delete mode 100644 bench-vortex/src/main.rs diff --git a/bench-vortex/src/main.rs b/bench-vortex/src/main.rs deleted file mode 100644 index 7167bd41e1..0000000000 --- a/bench-vortex/src/main.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::env; -use std::time::Instant; - -use bench_vortex::data_downloads::BenchmarkDataset; -use bench_vortex::public_bi_data::BenchmarkDatasets; -use bench_vortex::public_bi_data::PBIDataset::{ - AirlineSentiment, Arade, Bimbo, CMSprovider, Euro2016, Food, HashTags, -}; -use log::{warn, LevelFilter}; -use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; -use vortex_sampling_compressor::SamplingCompressor; - -fn main() { - let mut args = env::args().skip(1); - - if args.len() > 1 { - panic!("too many arguments"); - } else if args.len() < 1 { - panic!("too few arguments"); - } - - let dataset: &str = &args.nth(0).unwrap(); - - TermLogger::init( - LevelFilter::Info, - Config::default(), - TerminalMode::Mixed, - ColorChoice::Auto, - ) - .unwrap(); - - let compressor = SamplingCompressor::default(); - let dataset = match dataset { - "AirlineSentiment" => BenchmarkDatasets::PBI(AirlineSentiment), - "Arade" => BenchmarkDatasets::PBI(Arade), - "CMSprovider" => BenchmarkDatasets::PBI(CMSprovider), - "HashTags" => BenchmarkDatasets::PBI(HashTags), - "Bimbo" => BenchmarkDatasets::PBI(Bimbo), - "Euro2016" => BenchmarkDatasets::PBI(Euro2016), - "Food" => BenchmarkDatasets::PBI(Food), - _ => panic!("invalid dataset"), - }; - let start = Instant::now(); - let uncompressed = dataset.to_vortex_array().unwrap(); - let compressed = compressor.compress(&uncompressed, None).unwrap(); - let duration = start.elapsed(); - - warn!("Time elapsed: {:?}", duration); - warn!("Uncompressed size: {}", uncompressed.nbytes()); - warn!("Compressed size: {}", compressed.nbytes()); - warn!("Encoding: {}", compressed.into_array().tree_display()); -} diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs index 592f5ebb38..5349587ce6 100644 --- a/vortex-sampling-compressor/src/compressors/mod.rs +++ b/vortex-sampling-compressor/src/compressors/mod.rs @@ -1,6 +1,6 @@ use std::any::Any; use std::collections::HashSet; -use std::fmt::{Debug, Display, Formatter, Write}; +use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -81,18 +81,7 @@ pub trait EncoderMetadata { impl Display for CompressionTree<'_> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut children = String::new(); - for (index, child) in self.children.iter().enumerate() { - if index != 0 { - write!(children, ", ")?; - } - match child { - None => write!(children, "x")?, - Some(child) => write!(children, "{}", child)?, - } - } - write!(f, "{}({})", self.compressor.id(), children) - // Display::fmt(self.compressor.id(), f) + Display::fmt(self.compressor.id(), f) } } diff --git a/vortex-sampling-compressor/src/lib.rs 
b/vortex-sampling-compressor/src/lib.rs index 78e1d02fe8..52502af80d 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -6,7 +6,7 @@ use compressors::chunked::ChunkedCompressor; use compressors::fsst::FSSTCompressor; use compressors::struct_::StructCompressor; use lazy_static::lazy_static; -use log::{debug, info}; +use log::{debug, info, warn}; use rand::rngs::StdRng; use rand::SeedableRng; use vortex::array::{ChunkedArray, Constant}; @@ -50,8 +50,6 @@ lazy_static! { &RoaringIntCompressor, &SparseCompressor, &ZigZagCompressor, - &StructCompressor, - &ChunkedCompressor, ]; pub static ref FASTEST_COMPRESSORS: [CompressorRef<'static>; 7] = [ @@ -209,9 +207,6 @@ impl<'a> SamplingCompressor<'a> { arr: &Array, like: Option<&CompressionTree<'a>>, ) -> VortexResult> { - // if arr.dtype().is_struct() { - // warn!("like is {:?} {}", like, arr); - // } if arr.is_empty() { return Ok(CompressedArray::uncompressed(arr.clone())); } @@ -225,7 +220,7 @@ impl<'a> SamplingCompressor<'a> { check_dtype_unchanged(arr, compressed.as_ref()); return Ok(compressed); } else { - info!( + warn!( "{} cannot find compressor to compress {} like {}", self, arr, l ); @@ -237,9 +232,6 @@ impl<'a> SamplingCompressor<'a> { check_validity_unchanged(arr, compressed.as_ref()); check_dtype_unchanged(arr, compressed.as_ref()); - // if arr.dtype().is_struct() { - // warn!("found compression {:?} {}", compressed.path(), arr); - // } Ok(compressed) } @@ -250,114 +242,106 @@ impl<'a> SamplingCompressor<'a> { } } - fn compress_array(&self, arr: &Array) -> VortexResult> { - match arr.encoding().id() { - Constant::ID => { - // Not much better we can do than constant! - Ok(CompressedArray::uncompressed(arr.clone())) - } - _ => { - // Otherwise, we run sampled compression over pluggable encodings - let mut rng = StdRng::seed_from_u64(self.options.rng_seed); - let sampled = sampled_compression(arr, self, &mut rng)?; - Ok(sampled.unwrap_or_else(|| CompressedArray::uncompressed(arr.clone()))) - } + fn compress_array(&self, array: &Array) -> VortexResult> { + let mut rng = StdRng::seed_from_u64(self.options.rng_seed); + // let sampled = sampled_compression(arr, self, &mut rng)?; + // Ok(sampled.unwrap_or_else(|| CompressedArray::uncompressed(arr.clone()))) + + if array.encoding().id() == Constant::ID { + // Not much better we can do than constant! + return Ok(CompressedArray::uncompressed(array.clone())); } - } -} -fn sampled_compression<'a>( - array: &Array, - compressor: &SamplingCompressor<'a>, - rng: &mut StdRng, -) -> VortexResult>> { - if let Some(cc) = ChunkedCompressor.can_compress(array) { - return cc.compress(array, None, compressor.clone()).map(Some); - } + if let Some(cc) = ChunkedCompressor.can_compress(array) { + return cc.compress(array, None, self.clone()); + } - // First, we try constant compression and shortcut any sampling. 
-    if let Some(cc) = ConstantCompressor.can_compress(array) {
-        return cc.compress(array, None, compressor.clone()).map(Some);
-    }
+        if let Some(cc) = StructCompressor.can_compress(array) {
+            return cc.compress(array, None, self.clone());
+        }
 
-    let mut candidates: Vec<&dyn EncodingCompressor> = compressor
-        .compressors
-        .iter()
-        .filter(|&encoding| !compressor.disabled_compressors.contains(encoding))
-        .filter(|compression| {
-            if compression.can_compress(array).is_some() {
-                if compressor.depth + compression.cost() > compressor.options.max_cost {
-                    debug!(
-                        "{} skipping encoding {} due to depth",
-                        compressor,
-                        compression.id()
-                    );
-                    return false;
-                }
-                true
-            } else {
-                false
-            }
-        })
-        .copied()
-        .collect();
+        if let Some(cc) = ConstantCompressor.can_compress(array) {
+            return cc.compress(array, None, self.clone());
+        }
 
-    info!("{} candidates for {}: {:?}", compressor, array, candidates);
+        let (mut candidates, too_deep) = self
+            .compressors
+            .iter()
+            .filter(|&encoding| !self.disabled_compressors.contains(encoding))
+            .filter(|&encoding| encoding.can_compress(array).is_some())
+            .partition::<Vec<&dyn EncodingCompressor>, _>(|&encoding| {
+                self.depth + encoding.cost() > self.options.max_cost
+            });
+
+        if !too_deep.is_empty() {
+            debug!(
+                "{} skipping encodings due to depth/cost: {}",
+                self,
+                too_deep
+                    .iter()
+                    .map(|x| x.id())
+                    .collect::<Vec<_>>()
+                    .join(", ")
+            );
+        }
 
-    if candidates.is_empty() {
-        debug!(
-            "{} no compressors for array with dtype: {} and encoding: {}",
-            compressor,
-            array.dtype(),
-            array.encoding().id(),
-        );
-        return Ok(None);
-    }
+        info!("{} candidates for {}: {:?}", self, array, candidates);
 
-    // We prefer all other candidates to the array's own encoding.
-    // This is because we assume that the array's own encoding is the least efficient, but useful
-    // to destructure an array in the final stages of compression. e.g. VarBin would be DictEncoded
-    // but then the dictionary itself remains a VarBin array. DictEncoding excludes itself from the
-    // dictionary, but we still have a large offsets array that should be compressed.
-    // TODO(ngates): we actually probably want some way to prefer dict encoding over other varbin
-    // encodings, e.g. FSST.
-    if candidates.len() > 1 {
-        candidates.retain(|&compression| compression.id() != array.encoding().id().as_ref());
-    }
+        if candidates.is_empty() {
+            debug!(
+                "{} no compressors for array with dtype: {} and encoding: {}",
+                self,
+                array.dtype(),
+                array.encoding().id(),
+            );
+            return Ok(CompressedArray::uncompressed(array.clone()));
+        }
 
-    if array.len()
-        <= (compressor.options.sample_size as usize * compressor.options.sample_count as usize)
-    {
-        // We're either already within a sample, or we're operating over a sufficiently small array.
-        return find_best_compression(candidates, array, compressor).map(Some);
-    }
+        // We prefer all other candidates to the array's own encoding.
+        // This is because we assume that the array's own encoding is the least efficient, but useful
+        // to destructure an array in the final stages of compression. e.g. VarBin would be DictEncoded
+        // but then the dictionary itself remains a VarBin array. DictEncoding excludes itself from the
+        // dictionary, but we still have a large offsets array that should be compressed.
+        // TODO(ngates): we actually probably want some way to prefer dict encoding over other varbin
+        // encodings, e.g. FSST.
+        if candidates.len() > 1 {
+            candidates.retain(|&compression| compression.id() != array.encoding().id().as_ref());
+        }
 
-    // Take a sample of the array, then ask codecs for their best compression estimate.
-    let sample = ChunkedArray::try_new(
-        stratified_slices(
-            array.len(),
-            compressor.options.sample_size,
-            compressor.options.sample_count,
-            rng,
-        )
-        .into_iter()
-        .map(|(start, stop)| slice(array, start, stop))
-        .collect::<VortexResult<Vec<_>>>()?,
-        array.dtype().clone(),
-    )?
-    .into_canonical()?
-    .into();
-
-    find_best_compression(candidates, &sample, compressor)?
-        .into_path()
-        .map(|best_compressor| {
-            info!(
-                "{} Compressing array {} with {}",
-                compressor, array, best_compressor
-            );
-            best_compressor.compress_unchecked(array, compressor)
-        })
-        .transpose()
+        if array.len() <= (self.options.sample_size as usize * self.options.sample_count as usize) {
+            // We're either already within a sample, or we're operating over a sufficiently small array.
+            return find_best_compression(candidates, array, self);
+        }
+
+        // Take a sample of the array, then ask codecs for their best compression estimate.
+        let sample = ChunkedArray::try_new(
+            stratified_slices(
+                array.len(),
+                self.options.sample_size,
+                self.options.sample_count,
+                &mut rng,
+            )
+            .into_iter()
+            .map(|(start, stop)| slice(array, start, stop))
+            .collect::<VortexResult<Vec<_>>>()?,
+            array.dtype().clone(),
+        )?
+        .into_canonical()?
+        .into();
+
+        let best = find_best_compression(candidates, &sample, self)?
+            .into_path()
+            .map(|best_compressor| {
+                info!(
+                    "{} Compressing array {} with {}",
+                    self, array, best_compressor
+                );
+                best_compressor.compress_unchecked(array, self)
+            })
+            .transpose()?;
+
+        Ok(best.unwrap_or_else(|| CompressedArray::uncompressed(array.clone())))
+    }
 }
 
 fn find_best_compression<'a>(
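A note on the `partition` introduced in this patch: `Iterator::partition` collects the elements for which the predicate is *true* into the first returned collection. With the predicate `self.depth + encoding.cost() > self.options.max_cost`, it is therefore the too-costly encodings that land in `candidates`; patch 26 below flips the comparison to `<=`. A standalone sketch of the ordering, with toy values that are not from the patch:

```rust
fn main() {
    // partition puts predicate-true elements in the FIRST collection.
    let (matching, rest): (Vec<i32>, Vec<i32>) = (1..=6).partition(|n| n % 2 == 0);
    assert_eq!(matching, vec![2, 4, 6]);
    assert_eq!(rest, vec![1, 3, 5]);
}
```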
From 799dd6a7941170095cebfe027eff822c225a4ad0 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 10:33:03 -0400
Subject: [PATCH 20/36] pass along metadata

---
 .../src/compressors/chunked.rs | 146 +++++++++++-------
 .../src/compressors/mod.rs     |  10 ++
 2 files changed, 104 insertions(+), 52 deletions(-)

diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index 1905f36179..d0886b0d20 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -1,4 +1,6 @@
+use std::any::Any;
 use std::collections::HashSet;
+use std::sync::Arc;
 
 use log::warn;
 use vortex::array::{Chunked, ChunkedArray};
@@ -6,54 +8,18 @@ use vortex::encoding::EncodingRef;
 use vortex::{Array, ArrayDType, ArrayDef, IntoArray};
 use vortex_error::{vortex_bail, VortexResult};
 
+use super::EncoderMetadata;
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
 use crate::SamplingCompressor;
 
 #[derive(Debug)]
 pub struct ChunkedCompressor;
 
-impl ChunkedCompressor {
-    fn compress_chunked<'a>(
-        &'a self,
-        array: &ChunkedArray,
-        compress_child_like: Option<CompressionTree<'a>>,
-        ctx: SamplingCompressor<'a>,
-    ) -> VortexResult<CompressedArray<'a>> {
-        let mut target_ratio: Option<f32> = None;
-
-        let less_chunked = array.rechunk(
-            ctx.options().target_block_bytesize,
-            ctx.options().target_block_size,
-        )?;
-        let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks());
-        let mut previous = compress_child_like;
-        for (index, chunk) in less_chunked.chunks().enumerate() {
-            let (compressed_chunk, tree) = ctx
-                .named(&format!("chunk-{}", index))
-                .compress(&chunk, previous.as_ref())?
-                .into_parts();
+pub struct ChunkedCompressorMetadata(Option<f32>);
 
-            let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
-            if ratio > 1.0 || target_ratio.map(|r| ratio > r * 1.2).unwrap_or(false) {
-                warn!(
-                    "unsatisfactory ratio {} {:?} {:?}",
-                    ratio, target_ratio, previous
-                );
-                let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts();
-                previous = tree;
-                target_ratio = Some((compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32));
-                compressed_chunks.push(compressed_chunk);
-            } else {
-                if previous.is_none() {
-                    previous = tree;
-                }
-                compressed_chunks.push(compressed_chunk);
-            }
-        }
-
-        Ok(CompressedArray::new(
-            ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(),
-            Some(CompressionTree::new(self, vec![previous])),
-        ))
+impl EncoderMetadata for ChunkedCompressorMetadata {
+    fn as_any(&self) -> &dyn Any {
+        self
     }
 }
 
@@ -78,21 +44,97 @@ impl EncodingCompressor for ChunkedCompressor {
         like: Option<CompressionTree<'a>>,
         ctx: SamplingCompressor<'a>,
     ) -> VortexResult<CompressedArray<'a>> {
-        let compress_child_like = match like {
-            None => None,
-            Some(tree) => {
-                if tree.children.len() != 1 {
-                    vortex_bail!("chunked array compression tree should have exactly one child");
-                }
-                tree.children[0].clone()
-            }
-        };
-
         let chunked_array = ChunkedArray::try_from(array)?;
-        self.compress_chunked(&chunked_array, compress_child_like, ctx)
+        let like_and_ratio = like_into_parts(like)?;
+        self.compress_chunked(&chunked_array, like_and_ratio, ctx)
     }
 
     fn used_encodings(&self) -> HashSet<EncodingRef> {
         HashSet::from([])
     }
 }
+
+fn like_into_parts<'a>(
+    tree: Option<CompressionTree<'a>>,
+) -> VortexResult<Option<(CompressionTree<'a>, f32)>> {
+    match tree {
+        None => Ok(None),
+        Some(tree) => {
+            let (_, mut children, metadata) = tree.into_parts();
+            if let Some(target_ratio) = metadata {
+                if let Some(ChunkedCompressorMetadata(target_ratio)) =
+                    target_ratio.as_ref().as_any().downcast_ref()
+                {
+                    if children.len() == 1 {
+                        match (children.remove(0), target_ratio) {
+                            (Some(child), Some(ratio)) => Ok(Some((child, *ratio))),
+                            (None, None) => Ok(None),
+                            (..) => {
+                                vortex_bail!("chunked array compression tree must have a child iff it has a ratio")
+                            }
+                        }
+                    } else {
+                        vortex_bail!("chunked array compression tree must have one child")
+                    }
+                } else {
+                    vortex_bail!("chunked array compression tree must ChunkedCompressorMetadata")
+                }
+            } else {
+                vortex_bail!("chunked array compression tree must have metadata")
+            }
+        }
+    }
+}
+
+impl ChunkedCompressor {
+    fn compress_chunked<'a>(
+        &'a self,
+        array: &ChunkedArray,
+        mut previous: Option<(CompressionTree<'a>, f32)>,
+        ctx: SamplingCompressor<'a>,
+    ) -> VortexResult<CompressedArray<'a>> {
+        let less_chunked = array.rechunk(
+            ctx.options().target_block_bytesize,
+            ctx.options().target_block_size,
+        )?;
+        let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks());
+        for (index, chunk) in less_chunked.chunks().enumerate() {
+            let like = previous.as_ref().map(|(like, _)| like);
+            let (compressed_chunk, tree) = ctx
+                .named(&format!("chunk-{}", index))
+                .compress(&chunk, like)?
+                .into_parts();
+
+            let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
+            let exceeded_target_ratio = previous
+                .as_ref()
+                .map(|(_, target_ratio)| ratio > target_ratio * 1.2)
+                .unwrap_or(false);
+
+            if ratio > 1.0 || exceeded_target_ratio {
+                warn!("unsatisfactory ratio {} {:?}", ratio, previous);
+                let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts();
+                let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
+                previous = tree.map(|tree| (tree, new_ratio));
+                compressed_chunks.push(compressed_chunk);
+            } else {
+                previous = previous.or_else(|| tree.map(|tree| (tree, ratio)));
+                compressed_chunks.push(compressed_chunk);
+            }
+        }
+
+        let (child, ratio) = match previous {
+            Some((child, ratio)) => (Some(child), Some(ratio)),
+            None => (None, None),
+        };
+
+        Ok(CompressedArray::new(
+            ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(),
+            Some(CompressionTree::new_with_metadata(
+                self,
+                vec![child],
+                Arc::new(ChunkedCompressorMetadata(ratio)),
+            )),
+        ))
+    }
+}
diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs
index 5349587ce6..d7549b80f4 100644
--- a/vortex-sampling-compressor/src/compressors/mod.rs
+++ b/vortex-sampling-compressor/src/compressors/mod.rs
@@ -164,6 +164,16 @@ impl<'a> CompressionTree<'a> {
             .filter_map(|child| child.as_ref().map(|c| c.num_descendants() + 1))
             .sum::<usize>()
     }
+
+    pub fn into_parts(
+        self,
+    ) -> (
+        &'a dyn EncodingCompressor,
+        Vec<Option<CompressionTree<'a>>>,
+        Option<Arc<dyn EncoderMetadata>>,
+    ) {
+        (self.compressor, self.children, self.metadata)
+    }
 }
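Patch 20 threads per-compressor state through the tree as a type-erased `Arc<dyn EncoderMetadata>` and recovers the concrete type in `like_into_parts` via the `as_any`/`downcast_ref` idiom. A minimal self-contained sketch of that idiom, with toy types rather than the vortex ones:

```rust
use std::any::Any;
use std::sync::Arc;

// Trait objects cannot be downcast directly, so the trait exposes `as_any`.
trait Metadata {
    fn as_any(&self) -> &dyn Any;
}

struct RatioMetadata(f32);

impl Metadata for RatioMetadata {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Succeeds only if the erased value is actually a RatioMetadata.
fn target_ratio(meta: &Arc<dyn Metadata>) -> Option<f32> {
    meta.as_ref().as_any().downcast_ref::<RatioMetadata>().map(|m| m.0)
}

fn main() {
    let meta: Arc<dyn Metadata> = Arc::new(RatioMetadata(0.4));
    assert_eq!(target_ratio(&meta), Some(0.4));
}
```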
From bcb82d552c759e936d36dac0a673d51eb7a0f561 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 10:35:40 -0400
Subject: [PATCH 21/36] 14 => 12

---
 vortex-sampling-compressor/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs
index 52502af80d..aaaa3bb4c6 100644
--- a/vortex-sampling-compressor/src/lib.rs
+++ b/vortex-sampling-compressor/src/lib.rs
@@ -37,7 +37,7 @@ mod constants;
 mod sampling;
 
 lazy_static! {
-    pub static ref DEFAULT_COMPRESSORS: [CompressorRef<'static>; 13] = [
+    pub static ref DEFAULT_COMPRESSORS: [CompressorRef<'static>; 11] = [
         &ALPCompressor as CompressorRef,
         &BITPACK_WITH_PATCHES,
         &DateTimePartsCompressor,

From 25a387c2ab42b537b0309dfa6e0db9a3107f5423 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 10:36:21 -0400
Subject: [PATCH 22/36] kill commented code

---
 vortex-sampling-compressor/src/lib.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs
index aaaa3bb4c6..ccb80471b3 100644
--- a/vortex-sampling-compressor/src/lib.rs
+++ b/vortex-sampling-compressor/src/lib.rs
@@ -244,8 +244,6 @@ impl<'a> SamplingCompressor<'a> {
 
     fn compress_array(&self, array: &Array) -> VortexResult<CompressedArray<'a>> {
         let mut rng = StdRng::seed_from_u64(self.options.rng_seed);
-        // let sampled = sampled_compression(arr, self, &mut rng)?;
-        // Ok(sampled.unwrap_or_else(|| CompressedArray::uncompressed(arr.clone())))
 
         if array.encoding().id() == Constant::ID {
             // Not much better we can do than constant!
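Worked numbers for the chunk-reuse heuristic that patch 20 put into `compress_chunked`: each chunk's ratio is compressed bytes over raw bytes, the previous chunk's tree is reused while the new ratio stays within 1.2x of the recorded target, and otherwise a full search runs and the target is re-recorded. A sketch of just that decision, with assumed sizes:

```rust
// Mirrors the test in compress_chunked: recompress from scratch when the
// ratio regresses past 1.0 or past 1.2x the previously recorded target.
fn keep_previous(ratio: f32, target: Option<f32>) -> bool {
    let exceeded = target.map(|t| ratio > t * 1.2).unwrap_or(false);
    !(ratio > 1.0 || exceeded)
}

fn main() {
    // Chunk 0 compressed 1000 -> 400 bytes: ratio 0.4 becomes the target.
    assert!(keep_previous(0.4, None));
    // Chunk 1 at ratio 0.45 is within 0.4 * 1.2 = 0.48, so reuse the tree.
    assert!(keep_previous(0.45, Some(0.4)));
    // Chunk 2 at ratio 0.55 exceeds 0.48, so search from scratch again.
    assert!(!keep_previous(0.55, Some(0.4)));
}
```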
From fb9b80421a9f1188243de1f0dca56f182f887ab5 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 10 Oct 2024 10:41:48 -0400 Subject: [PATCH 23/36] revert Scalar changes --- vortex-scalar/src/list.rs | 55 ++++++++++++--------------------------- 1 file changed, 17 insertions(+), 38 deletions(-) diff --git a/vortex-scalar/src/list.rs b/vortex-scalar/src/list.rs index f742633d2c..2ecac732ab 100644 --- a/vortex-scalar/src/list.rs +++ b/vortex-scalar/src/list.rs @@ -1,10 +1,10 @@ use std::ops::Deref; use std::sync::Arc; -use half::f16; +use itertools::Itertools; +use vortex_dtype::DType; use vortex_dtype::Nullability::NonNullable; -use vortex_dtype::{DType, NativePType, PType}; -use vortex_error::{vortex_bail, VortexError, VortexResult}; +use vortex_error::{vortex_bail, VortexError, VortexExpect as _, VortexResult}; use crate::value::ScalarValue; use crate::Scalar; @@ -104,42 +104,21 @@ impl<'a, T: for<'b> TryFrom<&'b Scalar, Error = VortexError>> TryFrom<&'a Scalar } } -macro_rules! from_native_ptype_for_scalar { - ($T:ty) => { - impl From> for Scalar { - fn from(value: Vec<$T>) -> Self { - Self { - dtype: DType::List( - DType::Primitive(<$T>::PTYPE, NonNullable).into(), - NonNullable, - ), - value: ScalarValue::List(value.into_iter().map(ScalarValue::from).collect()), - } - } - } - }; -} - -from_native_ptype_for_scalar!(u8); -from_native_ptype_for_scalar!(u16); -from_native_ptype_for_scalar!(u32); -from_native_ptype_for_scalar!(u64); -from_native_ptype_for_scalar!(i8); -from_native_ptype_for_scalar!(i16); -from_native_ptype_for_scalar!(i32); -from_native_ptype_for_scalar!(i64); -from_native_ptype_for_scalar!(f16); -from_native_ptype_for_scalar!(f32); -from_native_ptype_for_scalar!(f64); - -impl From> for Scalar { - fn from(value: Vec) -> Self { +impl From> for Scalar +where + Self: From, +{ + fn from(value: Vec) -> Self { + let scalars = value.into_iter().map(|v| Self::from(v)).collect_vec(); + let element_dtype = scalars + .first() + .vortex_expect("Empty list, could not determine element dtype") + .dtype() + .clone(); + let dtype = DType::List(Arc::new(element_dtype), NonNullable); Self { - dtype: DType::List( - DType::Primitive(PType::U64, NonNullable).into(), - NonNullable, - ), - value: ScalarValue::List(value.into_iter().map(ScalarValue::from).collect()), + dtype, + value: ScalarValue::List(scalars.into_iter().map(|s| s.value).collect_vec().into()), } } } From 4326be904d9d5d07825309ddb16a8c51521ce4a1 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 10 Oct 2024 10:42:56 -0400 Subject: [PATCH 24/36] revert varbin IsConstant change and Dict compressor change --- vortex-array/src/array/varbin/stats.rs | 22 ------------------- .../src/compressors/dict.rs | 11 ++++++++++ 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs index 545cb555b0..9cc892cf96 100644 --- a/vortex-array/src/array/varbin/stats.rs +++ b/vortex-array/src/array/varbin/stats.rs @@ -15,32 +15,10 @@ impl ArrayStatisticsCompute for VarBinArray { if self.is_empty() { return Ok(StatsSet::new()); } - if _stat == Stat::IsConstant { - return self.with_iterator(|iter| compute_is_constant(iter)); - } self.with_iterator(|iter| compute_stats(iter, self.dtype())) } } -pub fn compute_is_constant(iter: &mut dyn Iterator>) -> StatsSet { - fn is_constant_stats_set(is_constant: bool) -> StatsSet { - StatsSet::from(HashMap::from([(Stat::IsConstant, is_constant.into())])) - } - - match iter.next() { - None => {} - Some(first) => { - for v in iter 
From 4326be904d9d5d07825309ddb16a8c51521ce4a1 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 10:42:56 -0400
Subject: [PATCH 24/36] revert varbin IsConstant change and Dict compressor
 change

---
 vortex-array/src/array/varbin/stats.rs | 22 -------------------
 .../src/compressors/dict.rs            | 11 ++++++++++
 2 files changed, 11 insertions(+), 22 deletions(-)

diff --git a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs
index 545cb555b0..9cc892cf96 100644
--- a/vortex-array/src/array/varbin/stats.rs
+++ b/vortex-array/src/array/varbin/stats.rs
@@ -15,32 +15,10 @@ impl ArrayStatisticsCompute for VarBinArray {
         if self.is_empty() {
             return Ok(StatsSet::new());
         }
-        if _stat == Stat::IsConstant {
-            return self.with_iterator(|iter| compute_is_constant(iter));
-        }
         self.with_iterator(|iter| compute_stats(iter, self.dtype()))
     }
 }
 
-pub fn compute_is_constant(iter: &mut dyn Iterator<Item = Option<&[u8]>>) -> StatsSet {
-    fn is_constant_stats_set(is_constant: bool) -> StatsSet {
-        StatsSet::from(HashMap::from([(Stat::IsConstant, is_constant.into())]))
-    }
-
-    match iter.next() {
-        None => {}
-        Some(first) => {
-            for v in iter {
-                if v != first {
-                    return is_constant_stats_set(false);
-                }
-            }
-        }
-    };
-
-    is_constant_stats_set(true)
-}
-
 pub fn compute_stats(iter: &mut dyn Iterator<Item = Option<&[u8]>>, dtype: &DType) -> StatsSet {
     let mut leading_nulls: usize = 0;
     let mut first_value: Option<&[u8]> = None;
diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs
index e48c344250..67aa92b955 100644
--- a/vortex-sampling-compressor/src/compressors/dict.rs
+++ b/vortex-sampling-compressor/src/compressors/dict.rs
@@ -2,6 +2,7 @@ use std::collections::HashSet;
 
 use vortex::array::{Primitive, PrimitiveArray, VarBin, VarBinArray};
 use vortex::encoding::EncodingRef;
+use vortex::stats::ArrayStatistics;
 use vortex::{Array, ArrayDef, IntoArray};
 use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray, DictEncoding};
 use vortex_error::VortexResult;
@@ -27,6 +28,16 @@ impl EncodingCompressor for DictCompressor {
             return None;
         };
 
+        // No point dictionary coding if the array is unique.
+        // We don't have a unique stat yet, but strict-sorted implies unique.
+        if array
+            .statistics()
+            .compute_is_strict_sorted()
+            .unwrap_or(false)
+        {
+            return None;
+        }
+
         Some(self)
     }
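The comment restored to `dict.rs` carries the real argument: strict-sorted implies every value is distinct, and a dictionary over all-distinct values stores each value once plus one code per row, so it can only add overhead. Back-of-envelope arithmetic under assumed widths:

```rust
// Assumed layout: the dictionary holds each distinct value once, and a
// codes array with one code per row indexes into it.
fn dict_size(rows: usize, distinct: usize, value_bytes: usize, code_bytes: usize) -> usize {
    distinct * value_bytes + rows * code_bytes
}

fn main() {
    let plain = 1000 * 8; // 1000 rows of 8-byte values
    // All-distinct (e.g. strict-sorted): the dictionary repeats the column.
    assert!(dict_size(1000, 1000, 8, 4) > plain);
    // Low cardinality: 16 distinct values with 1-byte codes wins easily.
    assert!(dict_size(1000, 16, 8, 1) < plain);
}
```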
From 5a1be70312d5fa72ff363b1069d4c139ad26e0b5 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 10:48:02 -0400
Subject: [PATCH 25/36] placate clippy

---
 vortex-sampling-compressor/src/compressors/chunked.rs | 6 +++---
 vortex-sampling-compressor/src/compressors/mod.rs     | 1 +
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index d0886b0d20..8b2f11df63 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -54,9 +54,9 @@ impl EncodingCompressor for ChunkedCompressor {
     }
 }
 
-fn like_into_parts<'a>(
-    tree: Option<CompressionTree<'a>>,
-) -> VortexResult<Option<(CompressionTree<'a>, f32)>> {
+fn like_into_parts(
+    tree: Option<CompressionTree>,
+) -> VortexResult<Option<(CompressionTree, f32)>> {
     match tree {
         None => Ok(None),
         Some(tree) => {
diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs
index d7549b80f4..132f0cbbac 100644
--- a/vortex-sampling-compressor/src/compressors/mod.rs
+++ b/vortex-sampling-compressor/src/compressors/mod.rs
@@ -165,6 +165,7 @@ impl<'a> CompressionTree<'a> {
             .sum::<usize>()
     }
 
+    #[allow(clippy::type_complexity)]
     pub fn into_parts(
         self,
     ) -> (

From dc5111523213d5a98415112d455c8dfe3222e478 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 10:58:09 -0400
Subject: [PATCH 26/36] fix direction on partition

---
 Cargo.lock                            | 1 +
 vortex-sampling-compressor/Cargo.toml | 1 +
 vortex-sampling-compressor/src/lib.rs | 2 +-
 3 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5a16dcc172..1a7abe36ae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4601,6 +4601,7 @@ dependencies = [
  "lazy_static",
  "log",
  "rand",
+ "simplelog",
  "vortex-alp",
  "vortex-array",
  "vortex-datetime-dtype",
diff --git a/vortex-sampling-compressor/Cargo.toml b/vortex-sampling-compressor/Cargo.toml
index e616de8b58..a46aad6d29 100644
--- a/vortex-sampling-compressor/Cargo.toml
+++ b/vortex-sampling-compressor/Cargo.toml
@@ -20,6 +20,7 @@ itertools = { workspace = true }
 lazy_static = { workspace = true }
 log = { workspace = true }
 rand = { workspace = true }
+simplelog = { workspace = true }
 vortex-alp = { workspace = true }
 vortex-array = { workspace = true }
 vortex-datetime-dtype = { workspace = true }
diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs
index ccb80471b3..c8a81850e7 100644
--- a/vortex-sampling-compressor/src/lib.rs
+++ b/vortex-sampling-compressor/src/lib.rs
@@ -268,7 +268,7 @@ impl<'a> SamplingCompressor<'a> {
             .filter(|&encoding| !self.disabled_compressors.contains(encoding))
             .filter(|&encoding| encoding.can_compress(array).is_some())
             .partition::<Vec<&dyn EncodingCompressor>, _>(|&encoding| {
-                self.depth + encoding.cost() > self.options.max_cost
+                self.depth + encoding.cost() <= self.options.max_cost
             });

From fe3ce9d98cfe097f3d0fa365c2d08db7bac36736 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 11:06:36 -0400
Subject: [PATCH 27/36] no changes to vortex-scalar

---
 vortex-scalar/Cargo.toml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/vortex-scalar/Cargo.toml b/vortex-scalar/Cargo.toml
index 2edbcb957f..669bb2745b 100644
--- a/vortex-scalar/Cargo.toml
+++ b/vortex-scalar/Cargo.toml
@@ -19,7 +19,7 @@ arrow-array = { workspace = true }
 datafusion-common = { workspace = true, optional = true }
 flatbuffers = { workspace = true, optional = true }
 flexbuffers = { workspace = true, optional = true }
-half = { workspace = true }
+half = { workspace = true, optional = true }
 itertools = { workspace = true }
 jiff = { workspace = true }
 num-traits = { workspace = true }
@@ -55,6 +55,7 @@ flatbuffers = [
 proto = [
   "dep:prost",
   "dep:prost-types",
+  "dep:half",
   "vortex-dtype/proto",
   "vortex-proto/scalar",
 ]

From 8bde547013e9848180258b9d15537fd5a0fa91f8 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 11:07:02 -0400
Subject: [PATCH 28/36] remove simplelog

---
 Cargo.lock                            | 1 -
 vortex-sampling-compressor/Cargo.toml | 1 -
 2 files changed, 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1a7abe36ae..5a16dcc172 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4601,7 +4601,6 @@ dependencies = [
  "lazy_static",
  "log",
  "rand",
- "simplelog",
  "vortex-alp",
  "vortex-array",
  "vortex-datetime-dtype",
diff --git a/vortex-sampling-compressor/Cargo.toml b/vortex-sampling-compressor/Cargo.toml
index a46aad6d29..e616de8b58 100644
--- a/vortex-sampling-compressor/Cargo.toml
+++ b/vortex-sampling-compressor/Cargo.toml
@@ -20,7 +20,6 @@ itertools = { workspace = true }
 lazy_static = { workspace = true }
 log = { workspace = true }
 rand = { workspace = true }
-simplelog = { workspace = true }
 vortex-alp = { workspace = true }
 vortex-array = { workspace = true }
 vortex-datetime-dtype = { workspace = true }
From 9b8859e1199becedc4b7f738d451deab0cff5ba1 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 14:52:13 -0400
Subject: [PATCH 29/36] deindent like_into_parts

---
 .../src/compressors/chunked.rs | 54 ++++++++++++----------
 1 file changed, 28 insertions(+), 26 deletions(-)

diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index 8b2f11df63..40abab20fe 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -57,32 +57,34 @@ impl EncodingCompressor for ChunkedCompressor {
 fn like_into_parts(
     tree: Option<CompressionTree>,
 ) -> VortexResult<Option<(CompressionTree, f32)>> {
-    match tree {
-        None => Ok(None),
-        Some(tree) => {
-            let (_, mut children, metadata) = tree.into_parts();
-            if let Some(target_ratio) = metadata {
-                if let Some(ChunkedCompressorMetadata(target_ratio)) =
-                    target_ratio.as_ref().as_any().downcast_ref()
-                {
-                    if children.len() == 1 {
-                        match (children.remove(0), target_ratio) {
-                            (Some(child), Some(ratio)) => Ok(Some((child, *ratio))),
-                            (None, None) => Ok(None),
-                            (..) => {
-                                vortex_bail!("chunked array compression tree must have a child iff it has a ratio")
-                            }
-                        }
-                    } else {
-                        vortex_bail!("chunked array compression tree must have one child")
-                    }
-                } else {
-                    vortex_bail!("chunked array compression tree must ChunkedCompressorMetadata")
-                }
-            } else {
-                vortex_bail!("chunked array compression tree must have metadata")
-            }
-        }
-    }
+    let (_, mut children, metadata) = match tree {
+        None => return Ok(None),
+        Some(tree) => tree.into_parts(),
+    };
+
+    let Some(target_ratio) = metadata else {
+        vortex_bail!("chunked array compression tree must have metadata")
+    };
+
+    let Some(ChunkedCompressorMetadata(target_ratio)) =
+        target_ratio.as_ref().as_any().downcast_ref()
+    else {
+        vortex_bail!(
+            "chunked array compression
+            tree must ChunkedCompressorMetadata"
+        )
+    };
+
+    if children.len() != 1 {
+        vortex_bail!("chunked array compression tree must have one child")
+    }
+
+    let child = children.remove(0);
+
+    match (child, target_ratio) {
+        (None, None) => Ok(None),
+        (Some(child), Some(ratio)) => Ok(Some((child, *ratio))),
+        (..) => vortex_bail!("chunked array compression tree must have a child iff it has a ratio"),
+    }
 }
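Patch 29 is a mechanical rewrite from nested `if let`/`else` arms into `let ... else` guards that bail early, keeping the happy path at one indentation level. The shape of the transformation on a toy function:

```rust
// Each precondition either binds a value or returns, so no arm nests
// inside another; this is the pattern like_into_parts now follows.
fn first_even(values: Option<Vec<i32>>) -> Result<i32, String> {
    let Some(values) = values else {
        return Err("must have values".to_string());
    };
    let Some(&found) = values.iter().find(|v| *v % 2 == 0) else {
        return Err("must have an even value".to_string());
    };
    Ok(found)
}

fn main() {
    assert_eq!(first_even(Some(vec![1, 4, 5])), Ok(4));
    assert!(first_even(None).is_err());
}
```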
From 7274215a0f58b6de11796094098df3663ebeb553 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 15:17:25 -0400
Subject: [PATCH 30/36] fix nit and pull 1.2 into documented constant

---
 .../src/compressors/chunked.rs | 18 +++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index 40abab20fe..eb7b0514de 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -69,10 +69,7 @@ fn like_into_parts(
     let Some(ChunkedCompressorMetadata(target_ratio)) =
         target_ratio.as_ref().as_any().downcast_ref()
     else {
-        vortex_bail!(
-            "chunked array compression
-            tree must ChunkedCompressorMetadata"
-        )
+        vortex_bail!("chunked array compression tree must be ChunkedCompressorMetadata")
     };
 
     if children.len() != 1 {
@@ -89,6 +86,17 @@ fn like_into_parts(
 }
 
 impl ChunkedCompressor {
+    /// How far the compression ratio is allowed to grow from one chunk to another chunk.
+    ///
+    /// As long as a compressor compresses subsequent chunks "reasonably well" we should continue to
+    /// use it, which saves us the cost of searching for a good compressor. This constant quantifies
+    /// "reasonably well" as
+    ///
+    /// ```
+    /// new_ratio <= old_ratio * ChunkedCompressor::RELATIVELY_GOOD_RATIO
+    /// ```
+    const RELATIVELY_GOOD_RATIO: f32 = 1.2;
+
     fn compress_chunked<'a>(
         &'a self,
         array: &ChunkedArray,
@@ -110,7 +118,7 @@ impl ChunkedCompressor {
             let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
             let exceeded_target_ratio = previous
                 .as_ref()
-                .map(|(_, target_ratio)| ratio > target_ratio * 1.2)
+                .map(|(_, target_ratio)| ratio > target_ratio * Self::RELATIVELY_GOOD_RATIO)
                 .unwrap_or(false);

From deb4287070c3a2449a34a10088fcd946b64cdd0c Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 15:22:08 -0400
Subject: [PATCH 31/36] field rather than const

---
 .../src/compressors/chunked.rs        | 14 +++++++++++---
 vortex-sampling-compressor/src/lib.rs |  4 ++--
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index eb7b0514de..c0f5aebe1c 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -13,7 +13,13 @@ use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
 use crate::SamplingCompressor;
 
 #[derive(Debug)]
-pub struct ChunkedCompressor;
+pub struct ChunkedCompressor {
+    relatively_good_ratio: f32,
+}
+
+pub const DEFAULT_CHUNKED_COMPRESSOR: ChunkedCompressor = ChunkedCompressor {
+    relatively_good_ratio: 1.2,
+};
 
 pub struct ChunkedCompressorMetadata(Option<f32>);
 
@@ -95,7 +101,9 @@ impl ChunkedCompressor {
     /// ```
     /// new_ratio <= old_ratio * ChunkedCompressor::RELATIVELY_GOOD_RATIO
     /// ```
-    const RELATIVELY_GOOD_RATIO: f32 = 1.2;
+    fn relatively_good_ratio(&self) -> f32 {
+        self.relatively_good_ratio
+    }
 
     fn compress_chunked<'a>(
         &'a self,
@@ -118,7 +126,7 @@ impl ChunkedCompressor {
             let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
             let exceeded_target_ratio = previous
                 .as_ref()
-                .map(|(_, target_ratio)| ratio > target_ratio * Self::RELATIVELY_GOOD_RATIO)
+                .map(|(_, target_ratio)| ratio > target_ratio * Self.relatively_good_ratio())
                 .unwrap_or(false);
diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs
index c8a81850e7..9714689e49 100644
--- a/vortex-sampling-compressor/src/lib.rs
+++ b/vortex-sampling-compressor/src/lib.rs
@@ -2,7 +2,7 @@ use std::collections::HashSet;
 use std::fmt::{Debug, Display, Formatter};
 
 use compressors::bitpacked::BITPACK_WITH_PATCHES;
-use compressors::chunked::ChunkedCompressor;
+use compressors::chunked::DEFAULT_CHUNKED_COMPRESSOR;
 use compressors::fsst::FSSTCompressor;
 use compressors::struct_::StructCompressor;
 use lazy_static::lazy_static;
@@ -250,7 +250,7 @@ impl<'a> SamplingCompressor<'a> {
             return Ok(CompressedArray::uncompressed(array.clone()));
         }
 
-        if let Some(cc) = ChunkedCompressor.can_compress(array) {
+        if let Some(cc) = DEFAULT_CHUNKED_COMPRESSOR.can_compress(array) {
             return cc.compress(array, None, self.clone());
         }

From da8d39c63214bbedbfd0d787a2f2f8081dc4405f Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 15:28:58 -0400
Subject: [PATCH 32/36] zip => zip_eq

---
 vortex-sampling-compressor/src/compressors/struct_.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vortex-sampling-compressor/src/compressors/struct_.rs b/vortex-sampling-compressor/src/compressors/struct_.rs
index 7f69f93ae9..9225f68540 100644
--- a/vortex-sampling-compressor/src/compressors/struct_.rs
+++ b/vortex-sampling-compressor/src/compressors/struct_.rs
@@ -44,7 +44,7 @@ impl EncodingCompressor for StructCompressor {
 
         let (arrays, trees) = array
             .children()
-            .zip(children_trees)
+            .zip_eq(children_trees)
            .map(|(array, like)| ctx.compress(&array, like.as_ref()))
             .process_results(|iter| iter.map(|x| (x.array, x.path)).unzip())?;
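The `zip => zip_eq` change is about failure mode rather than behavior on well-formed input: `Iterator::zip` silently stops at the shorter side, so a `like` tree with too few children would quietly drop the remaining fields' trees, while `itertools::zip_eq` panics on the mismatch. A small demonstration (the compressor crate already depends on `itertools`):

```rust
use itertools::Itertools;

fn main() {
    let fields = ["a", "b", "c"];
    let trees = [Some(1), None]; // one entry short

    // std zip: truncates to two pairs without complaint.
    assert_eq!(fields.iter().zip(trees.iter()).count(), 2);

    // zip_eq: panics as soon as one side runs out early.
    let result = std::panic::catch_unwind(|| fields.iter().zip_eq(trees.iter()).count());
    assert!(result.is_err());
}
```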
From c11620e81a2125f92c8e59669eb46f2869ca8164 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 15:30:05 -0400
Subject: [PATCH 33/36] compile error: Self -> self

---
 vortex-sampling-compressor/src/compressors/chunked.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index c0f5aebe1c..3f3dfc7a07 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -126,7 +126,7 @@ impl ChunkedCompressor {
             let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
             let exceeded_target_ratio = previous
                 .as_ref()
-                .map(|(_, target_ratio)| ratio > target_ratio * Self.relatively_good_ratio())
+                .map(|(_, target_ratio)| ratio > target_ratio * self.relatively_good_ratio())
                 .unwrap_or(false);

From 2be867e441724ab3cd891af2069e6ee931e901b1 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 15:35:00 -0400
Subject: [PATCH 34/36] do not run fenced code block

---
 vortex-sampling-compressor/src/compressors/chunked.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index 3f3dfc7a07..662dab84e3 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -98,8 +98,8 @@ impl ChunkedCompressor {
     /// use it, which saves us the cost of searching for a good compressor. This constant quantifies
     /// "reasonably well" as
     ///
-    /// ```
-    /// new_ratio <= old_ratio * ChunkedCompressor::RELATIVELY_GOOD_RATIO
+    /// ```no_run
+    /// new_ratio <= old_ratio * self.relatively_good_ratio
     /// ```
     fn relatively_good_ratio(&self) -> f32 {
         self.relatively_good_ratio

From 240693b24d8b08026dc3878cadfa0320ac9db135 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 16:32:09 -0400
Subject: [PATCH 35/36] this is text not code

---
 vortex-sampling-compressor/src/compressors/chunked.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index 662dab84e3..f752b7ef2d 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -98,7 +98,7 @@ impl ChunkedCompressor {
     /// use it, which saves us the cost of searching for a good compressor. This constant quantifies
     /// "reasonably well" as
     ///
-    /// ```no_run
+    /// ```text
    /// new_ratio <= old_ratio * self.relatively_good_ratio
     /// ```
     fn relatively_good_ratio(&self) -> f32 {
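Patches 34 and 35 are both about how rustdoc treats fenced blocks in doc comments: an unannotated fence is compiled and executed as a doctest, `no_run` still compiles it (which still fails for pseudocode that is not valid Rust), and `text` opts out of Rust entirely, the right choice for pseudocode like the ratio rule. In miniature:

```rust
/// Quantifies "close enough" between two compression ratios.
///
/// The fence below is documentation-only pseudocode, marked `text` so that
/// rustdoc neither compiles nor runs it:
///
/// ```text
/// new_ratio <= old_ratio * relatively_good_ratio
/// ```
pub fn relatively_good(new_ratio: f32, old_ratio: f32, relatively_good_ratio: f32) -> bool {
    new_ratio <= old_ratio * relatively_good_ratio
}

fn main() {
    assert!(relatively_good(0.45, 0.4, 1.2));
}
```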
From d325968f73732e95181b60c997fdef1b2713a67b Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 10 Oct 2024 16:33:03 -0400
Subject: [PATCH 36/36] save an allocation

---
 vortex-sampling-compressor/src/compressors/chunked.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index f752b7ef2d..23ff730d17 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -39,9 +39,7 @@ impl EncodingCompressor for ChunkedCompressor {
     }
 
     fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
-        ChunkedArray::try_from(array)
-            .ok()
-            .map(|_| self as &dyn EncodingCompressor)
+        array.is_encoding(Chunked::ID).then_some(self)
    }
 
     fn compress<'a>(
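Patch 36's saving comes from checking the encoding id directly instead of materializing a `ChunkedArray` only to discard it; `bool::then_some` turns the comparison into the `Option` the trait wants. The idiom in isolation, with an illustrative id string rather than the real `Chunked::ID` value:

```rust
// then_some maps true -> Some(value) and false -> None, evaluating the
// value eagerly (fine here, since it is just a reference/str).
fn can_compress(id: &str) -> Option<&'static str> {
    (id == "vortex.chunked").then_some("chunked-compressor")
}

fn main() {
    assert_eq!(can_compress("vortex.chunked"), Some("chunked-compressor"));
    assert_eq!(can_compress("vortex.primitive"), None);
}
```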