From c1cee33288ee3dd2ad2d57209185a8eefc10c803 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Thu, 7 Nov 2024 17:42:13 -0500 Subject: [PATCH] feat: propagate statistics through compression (#1236) fixes #1174 --- Cargo.toml | 2 +- encodings/fastlanes/src/for/compress.rs | 36 ++++ vortex-array/src/array/bool/stats.rs | 11 +- vortex-array/src/array/chunked/stats.rs | 3 +- vortex-array/src/array/constant/mod.rs | 22 +-- vortex-array/src/array/constant/stats.rs | 33 ---- vortex-array/src/array/extension/mod.rs | 69 +++++++- vortex-array/src/array/sparse/mod.rs | 25 ++- vortex-array/src/compress.rs | 27 +++ vortex-array/src/stats/mod.rs | 61 +++++++ vortex-array/src/stats/statsset.rs | 160 +++++++++++++++--- .../src/compressors/alp.rs | 4 +- .../src/compressors/alp_rd.rs | 4 +- .../src/compressors/bitpacked.rs | 3 +- .../src/compressors/chunked.rs | 4 +- .../src/compressors/constant.rs | 3 +- .../src/compressors/date_time_parts.rs | 4 +- .../src/compressors/delta.rs | 4 +- .../src/compressors/dict.rs | 3 +- .../src/compressors/for.rs | 53 +++--- .../src/compressors/fsst.rs | 4 +- .../src/compressors/mod.rs | 14 +- .../src/compressors/roaring_bool.rs | 4 +- .../src/compressors/roaring_int.rs | 3 +- .../src/compressors/runend.rs | 3 +- .../src/compressors/sparse.rs | 4 +- .../src/compressors/struct_.rs | 4 +- .../src/compressors/zigzag.rs | 3 +- vortex-sampling-compressor/src/lib.rs | 5 +- vortex-scalar/src/lib.rs | 14 +- vortex-scalar/src/pvalue.rs | 16 +- vortex-scalar/src/value.rs | 3 +- 32 files changed, 477 insertions(+), 131 deletions(-) delete mode 100644 vortex-array/src/array/constant/stats.rs diff --git a/Cargo.toml b/Cargo.toml index c8b9f69f20..5cb0fa3e4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ resolver = "2" version = "0.14.0" homepage = "https://github.com/spiraldb/vortex" repository = "https://github.com/spiraldb/vortex" -authors = ["Vortex Authors "] +authors = ["Vortex Authors "] license = "Apache-2.0" keywords = ["vortex"] include = [ diff --git a/encodings/fastlanes/src/for/compress.rs b/encodings/fastlanes/src/for/compress.rs index 1bfe165495..1d9a5d95ff 100644 --- a/encodings/fastlanes/src/for/compress.rs +++ b/encodings/fastlanes/src/for/compress.rs @@ -121,6 +121,7 @@ fn decompress_primitive( mod test { use vortex_array::compute::unary::ScalarAtFn; use vortex_array::IntoArrayVariant; + use vortex_dtype::Nullability; use super::*; @@ -133,6 +134,41 @@ mod test { assert_eq!(u32::try_from(compressed.reference()).unwrap(), 1_000_000u32); } + #[test] + fn test_zeros() { + let array = PrimitiveArray::from(vec![0i32; 10_000]); + assert!(array.statistics().to_set().into_iter().next().is_none()); + + let compressed = for_compress(&array).unwrap(); + let constant = ConstantArray::try_from(compressed).unwrap(); + assert_eq!(constant.scalar_value(), &ScalarValue::from(0i32)); + } + + #[test] + fn test_nullable_zeros() { + let array = PrimitiveArray::from_nullable_vec( + vec![Some(0i32), None] + .into_iter() + .cycle() + .take(10_000) + .collect_vec(), + ); + assert!(array.statistics().to_set().into_iter().next().is_none()); + + let compressed = for_compress(&array).unwrap(); + let sparse = SparseArray::try_from(compressed).unwrap(); + assert!(sparse.statistics().to_set().into_iter().next().is_none()); + assert_eq!(sparse.fill_value(), &ScalarValue::Null); + assert_eq!( + sparse.scalar_at(0).unwrap(), + Scalar::primitive(0i32, Nullability::Nullable) + ); + assert_eq!( + sparse.scalar_at(1).unwrap(), + Scalar::null(sparse.dtype().clone()) + ); + } + #[test] fn test_decompress() { // Create a range offset by a million diff --git a/vortex-array/src/array/bool/stats.rs b/vortex-array/src/array/bool/stats.rs index 5e8a09338b..20a7bf0036 100644 --- a/vortex-array/src/array/bool/stats.rs +++ b/vortex-array/src/array/bool/stats.rs @@ -1,4 +1,5 @@ use arrow_buffer::BooleanBuffer; +use itertools::Itertools; use vortex_dtype::{DType, Nullability}; use vortex_error::VortexResult; @@ -43,7 +44,7 @@ impl ArrayStatisticsCompute for NullableBools<'_> { acc.n_nulls(first_non_null); self.0 .iter() - .zip(self.1.iter()) + .zip_eq(self.1.iter()) .skip(first_non_null + 1) .map(|(next, valid)| valid.then_some(next)) .for_each(|next| acc.nullable_next(next)); @@ -59,6 +60,10 @@ impl ArrayStatisticsCompute for NullableBools<'_> { impl ArrayStatisticsCompute for BooleanBuffer { fn compute_statistics(&self, _stat: Stat) -> VortexResult { + if self.is_empty() { + return Ok(StatsSet::new()); + } + let mut stats = BoolStatsAccumulator::new(self.value(0)); self.iter().skip(1).for_each(|next| stats.next(next)); Ok(stats.finish()) @@ -75,7 +80,7 @@ struct BoolStatsAccumulator { } impl BoolStatsAccumulator { - fn new(first_value: bool) -> Self { + pub fn new(first_value: bool) -> Self { Self { prev: first_value, is_sorted: true, @@ -86,7 +91,7 @@ impl BoolStatsAccumulator { } } - fn n_nulls(&mut self, n_nulls: usize) { + pub fn n_nulls(&mut self, n_nulls: usize) { self.null_count += n_nulls; self.len += n_nulls; } diff --git a/vortex-array/src/array/chunked/stats.rs b/vortex-array/src/array/chunked/stats.rs index a93df8e6d1..02c659ff73 100644 --- a/vortex-array/src/array/chunked/stats.rs +++ b/vortex-array/src/array/chunked/stats.rs @@ -9,12 +9,11 @@ impl ArrayStatisticsCompute for ChunkedArray { .chunks() .map(|c| { let s = c.statistics(); - // HACK(robert): This will compute all stats, but we could just compute one s.compute(stat); s.to_set() }) .reduce(|mut acc, x| { - acc.merge(&x); + acc.merge_ordered(&x); acc }) .unwrap_or_default()) diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs index f267bd9aff..d7d54b05c2 100644 --- a/vortex-array/src/array/constant/mod.rs +++ b/vortex-array/src/array/constant/mod.rs @@ -4,16 +4,14 @@ use serde::{Deserialize, Serialize}; use vortex_error::{vortex_panic, VortexResult}; use vortex_scalar::{Scalar, ScalarValue}; -use crate::aliases::hash_map::HashMap; use crate::array::visitor::{AcceptArrayVisitor, ArrayVisitor}; use crate::encoding::ids; -use crate::stats::{Stat, StatsSet}; +use crate::stats::{ArrayStatisticsCompute, Stat, StatsSet}; use crate::validity::{ArrayValidity, LogicalValidity}; use crate::{impl_encoding, ArrayDType, ArrayTrait}; mod canonical; mod compute; -mod stats; mod variants; impl_encoding!("vortex.constant", ids::CONSTANT, Constant); @@ -39,16 +37,6 @@ impl ConstantArray { S: Into, { let scalar = scalar.into(); - // TODO(aduffy): add stats for bools, ideally there should be a - // StatsSet::constant(Scalar) constructor that does this for us, like StatsSet::nulls. - let stats = StatsSet::from(HashMap::from([ - (Stat::Max, scalar.clone()), - (Stat::Min, scalar.clone()), - (Stat::IsConstant, true.into()), - (Stat::IsSorted, true.into()), - (Stat::RunCount, 1.into()), - ])); - Self::try_from_parts( scalar.dtype().clone(), length, @@ -56,7 +44,7 @@ impl ConstantArray { scalar_value: scalar.value().clone(), }, [].into(), - stats, + StatsSet::constant(scalar.clone(), length), ) .unwrap_or_else(|err| { vortex_panic!( @@ -93,6 +81,12 @@ impl ArrayValidity for ConstantArray { } } +impl ArrayStatisticsCompute for ConstantArray { + fn compute_statistics(&self, _stat: Stat) -> VortexResult { + Ok(StatsSet::constant(self.owned_scalar(), self.len())) + } +} + impl AcceptArrayVisitor for ConstantArray { fn accept(&self, _visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { Ok(()) diff --git a/vortex-array/src/array/constant/stats.rs b/vortex-array/src/array/constant/stats.rs deleted file mode 100644 index 8006124409..0000000000 --- a/vortex-array/src/array/constant/stats.rs +++ /dev/null @@ -1,33 +0,0 @@ -use vortex_error::VortexResult; -use vortex_scalar::ScalarValue; - -use crate::aliases::hash_map::HashMap; -use crate::array::constant::ConstantArray; -use crate::stats::{ArrayStatisticsCompute, Stat, StatsSet}; - -impl ArrayStatisticsCompute for ConstantArray { - fn compute_statistics(&self, _stat: Stat) -> VortexResult { - let mut stats_map = HashMap::from([ - (Stat::IsConstant, true.into()), - (Stat::IsSorted, true.into()), - (Stat::IsStrictSorted, (self.len() <= 1).into()), - ]); - - if let ScalarValue::Bool(b) = self.scalar_value() { - let true_count = if *b { self.len() as u64 } else { 0 }; - - stats_map.insert(Stat::TrueCount, true_count.into()); - } - - stats_map.insert( - Stat::NullCount, - self.scalar_value() - .is_null() - .then_some(self.len() as u64) - .unwrap_or_default() - .into(), - ); - - Ok(StatsSet::from(stats_map)) - } -} diff --git a/vortex-array/src/array/extension/mod.rs b/vortex-array/src/array/extension/mod.rs index 9ce1deebcc..afc68879c7 100644 --- a/vortex-array/src/array/extension/mod.rs +++ b/vortex-array/src/array/extension/mod.rs @@ -1,13 +1,14 @@ use std::fmt::{Debug, Display}; use std::sync::Arc; +use enum_iterator::all; use serde::{Deserialize, Serialize}; use vortex_dtype::{DType, ExtDType, ExtID}; use vortex_error::{VortexExpect as _, VortexResult}; use crate::array::visitor::{AcceptArrayVisitor, ArrayVisitor}; use crate::encoding::ids; -use crate::stats::ArrayStatisticsCompute; +use crate::stats::{ArrayStatistics as _, ArrayStatisticsCompute, Stat, StatsSet}; use crate::validity::{ArrayValidity, LogicalValidity}; use crate::variants::{ArrayVariants, ExtensionArrayTrait}; use crate::{impl_encoding, Array, ArrayDType, ArrayTrait, Canonical, IntoCanonical}; @@ -93,5 +94,69 @@ impl AcceptArrayVisitor for ExtensionArray { } impl ArrayStatisticsCompute for ExtensionArray { - // TODO(ngates): pass through stats to the underlying and cast the scalars. + fn compute_statistics(&self, stat: Stat) -> VortexResult { + let mut stats = self.storage().statistics().compute_all(&[stat])?; + + // for e.g., min/max, we want to cast to the extension array's dtype + // for other stats, we don't need to change anything + for stat in all::().filter(|s| s.has_same_dtype_as_array()) { + if let Some(value) = stats.get(stat) { + stats.set(stat, value.cast(self.dtype())?); + } + } + + Ok(stats) + } +} + +#[cfg(test)] +mod tests { + use itertools::Itertools; + use vortex_dtype::PType; + use vortex_scalar::{PValue, Scalar, ScalarValue}; + + use super::*; + use crate::array::PrimitiveArray; + use crate::validity::Validity; + use crate::IntoArray as _; + + #[test] + fn compute_statistics() { + let ext_dtype = Arc::new(ExtDType::new( + ExtID::new("timestamp".into()), + DType::from(PType::I64).into(), + None, + )); + let array = ExtensionArray::new( + ext_dtype.clone(), + PrimitiveArray::from_vec(vec![1i64, 2, 3, 4, 5], Validity::NonNullable).into_array(), + ); + + let stats = array + .statistics() + .compute_all(&[Stat::Min, Stat::Max, Stat::NullCount]) + .unwrap(); + let num_stats = stats.clone().into_iter().try_len().unwrap(); + assert!( + num_stats >= 3, + "Expected at least 3 stats, got {}", + num_stats + ); + + assert_eq!( + stats.get(Stat::Min), + Some(&Scalar::extension( + ext_dtype.clone(), + ScalarValue::Primitive(PValue::I64(1)) + )) + ); + assert_eq!( + stats.get(Stat::Max), + Some(&Scalar::extension( + ext_dtype.clone(), + ScalarValue::Primitive(PValue::I64(5)) + )) + ); + assert_eq!(stats.get(Stat::NullCount), Some(&0u64.into())); + } } diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs index 1fb699b995..874e8ac987 100644 --- a/vortex-array/src/array/sparse/mod.rs +++ b/vortex-array/src/array/sparse/mod.rs @@ -10,7 +10,7 @@ use crate::array::visitor::{AcceptArrayVisitor, ArrayVisitor}; use crate::compute::unary::scalar_at; use crate::compute::{search_sorted, SearchResult, SearchSortedSide}; use crate::encoding::ids; -use crate::stats::{ArrayStatisticsCompute, StatsSet}; +use crate::stats::{ArrayStatistics, ArrayStatisticsCompute, Stat, StatsSet}; use crate::validity::{ArrayValidity, LogicalValidity}; use crate::variants::PrimitiveArrayTrait; use crate::{impl_encoding, Array, ArrayDType, ArrayTrait, IntoArray, IntoArrayVariant}; @@ -180,7 +180,28 @@ impl AcceptArrayVisitor for SparseArray { } } -impl ArrayStatisticsCompute for SparseArray {} +impl ArrayStatisticsCompute for SparseArray { + fn compute_statistics(&self, stat: Stat) -> VortexResult { + let mut stats = self.values().statistics().compute_all(&[stat])?; + if self.len() == self.values().len() { + return Ok(stats); + } + + let fill_len = self.len() - self.values().len(); + let fill_stats = if self.fill_value().is_null() { + StatsSet::nulls(fill_len, self.dtype()) + } else { + StatsSet::constant(self.fill_scalar(), fill_len) + }; + + if self.values().is_empty() { + return Ok(fill_stats); + } + + stats.merge_unordered(&fill_stats); + Ok(stats) + } +} impl ArrayValidity for SparseArray { fn is_valid(&self, index: usize) -> bool { diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index de1f677d61..8ca3bba9f6 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -2,6 +2,7 @@ use vortex_error::VortexResult; use crate::aliases::hash_set::HashSet; use crate::encoding::EncodingRef; +use crate::stats::{ArrayStatistics as _, PRUNING_STATS}; use crate::Array; pub trait CompressionStrategy { @@ -45,3 +46,29 @@ pub fn check_dtype_unchanged(arr: &Array, compressed: &Array) { ); } } + +// Check that compression preserved the statistics. +pub fn check_statistics_unchanged(arr: &Array, compressed: &Array) { + let _ = arr; + let _ = compressed; + #[cfg(debug_assertions)] + { + for (stat, value) in arr.statistics().to_set().into_iter() { + debug_assert_eq!( + compressed.statistics().get(stat), + Some(value.clone()), + "Compression changed {stat} from {value} to {}", + compressed + .statistics() + .get(stat) + .map(|s| s.to_string()) + .unwrap_or_else(|| "null".to_string()) + ); + } + } +} + +/// Compute pruning stats for an array. +pub fn compute_pruning_stats(arr: &Array) -> VortexResult<()> { + arr.statistics().compute_all(PRUNING_STATS).map(|_| ()) +} diff --git a/vortex-array/src/stats/mod.rs b/vortex-array/src/stats/mod.rs index 2a02d10324..33dd9fd9b4 100644 --- a/vortex-array/src/stats/mod.rs +++ b/vortex-array/src/stats/mod.rs @@ -16,7 +16,11 @@ use crate::Array; pub mod flatbuffers; mod statsset; +/// Statistics that are used for pruning files (i.e., we want to ensure they are computed when compressing/writing). +pub(crate) const PRUNING_STATS: &[Stat] = &[Stat::Min, Stat::Max, Stat::TrueCount, Stat::NullCount]; + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Sequence)] +#[non_exhaustive] pub enum Stat { BitWidthFreq, TrailingZeroFreq, @@ -30,6 +34,28 @@ pub enum Stat { NullCount, } +impl Stat { + /// Whether the statistic is commutative (i.e., whether merging can be done independently of ordering) + /// e.g., min/max are commutative, but is_sorted is not + pub fn is_commutative(&self) -> bool { + matches!( + self, + Stat::BitWidthFreq + | Stat::TrailingZeroFreq + | Stat::IsConstant + | Stat::Max + | Stat::Min + | Stat::TrueCount + | Stat::NullCount + ) + } + + /// Whether the statistic has the same dtype as the array it's computed on + pub fn has_same_dtype_as_array(&self) -> bool { + matches!(self, Stat::Min | Stat::Max) + } +} + impl Display for Stat { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { @@ -58,6 +84,15 @@ pub trait Statistics { /// Computes the value of the stat if it's not present fn compute(&self, stat: Stat) -> Option; + + /// Compute all of the requested statistics (if not already present) + /// Returns a StatsSet with the requested stats and any additional available stats + fn compute_all(&self, stats: &[Stat]) -> VortexResult { + for stat in stats { + let _ = self.compute(*stat); + } + Ok(self.to_set()) + } } pub trait ArrayStatistics { @@ -190,6 +225,8 @@ pub fn trailing_zeros(array: &Array) -> u8 { #[cfg(test)] mod test { + use enum_iterator::all; + use crate::array::PrimitiveArray; use crate::stats::{ArrayStatistics, Stat}; @@ -201,4 +238,28 @@ mod test { assert_eq!(min, None); } + + #[test] + fn commutativity() { + assert!(Stat::BitWidthFreq.is_commutative()); + assert!(Stat::TrailingZeroFreq.is_commutative()); + assert!(Stat::IsConstant.is_commutative()); + assert!(Stat::Min.is_commutative()); + assert!(Stat::Max.is_commutative()); + assert!(Stat::TrueCount.is_commutative()); + assert!(Stat::NullCount.is_commutative()); + + assert!(!Stat::IsStrictSorted.is_commutative()); + assert!(!Stat::IsSorted.is_commutative()); + assert!(!Stat::RunCount.is_commutative()); + } + + #[test] + fn has_same_dtype_as_array() { + assert!(Stat::Min.has_same_dtype_as_array()); + assert!(Stat::Max.has_same_dtype_as_array()); + for stat in all::().filter(|s| !matches!(s, Stat::Min | Stat::Max)) { + assert!(!stat.has_same_dtype_as_array()); + } + } } diff --git a/vortex-array/src/stats/statsset.rs b/vortex-array/src/stats/statsset.rs index 6e3dadfc35..7ec083895e 100644 --- a/vortex-array/src/stats/statsset.rs +++ b/vortex-array/src/stats/statsset.rs @@ -2,7 +2,7 @@ use enum_iterator::all; use itertools::Itertools; use vortex_dtype::DType; use vortex_error::{vortex_panic, VortexError, VortexExpect}; -use vortex_scalar::Scalar; +use vortex_scalar::{Scalar, ScalarValue}; use crate::aliases::hash_map::{Entry, HashMap, IntoIter}; use crate::stats::Stat; @@ -60,6 +60,33 @@ impl StatsSet { Self::from(stats) } + pub fn constant(scalar: Scalar, length: usize) -> Self { + let mut stats = Self::new(); + stats.set(Stat::IsConstant, true.into()); + stats.set(Stat::IsSorted, true.into()); + stats.set(Stat::IsStrictSorted, (length <= 1).into()); + + let run_count = if length == 0 { 0 } else { 1 }; + stats.set(Stat::RunCount, run_count.into()); + + let null_count = if scalar.value().is_null() { + length as u64 + } else { + 0 + }; + stats.set(Stat::NullCount, null_count.into()); + + if let ScalarValue::Bool(b) = scalar.value() { + let true_count = if *b { length as u64 } else { 0 }; + stats.set(Stat::TrueCount, true_count.into()); + } + + stats.set(Stat::Min, scalar.clone()); + stats.set(Stat::Max, scalar); + + stats + } + pub fn of(stat: Stat, value: Scalar) -> Self { Self::from(HashMap::from([(stat, value)])) } @@ -85,7 +112,9 @@ impl StatsSet { self.values.insert(stat, value); } - pub fn merge(&mut self, other: &Self) -> &Self { + /// Merge stats set `other` into `self`, with the semantic assumption that `other` + /// contains stats from an array that is *appended* to the array represented by `self`. + pub fn merge_ordered(&mut self, other: &Self) -> &Self { for s in all::() { match s { Stat::BitWidthFreq => self.merge_bit_width_freq(other), @@ -104,19 +133,43 @@ impl StatsSet { self } + /// Merge stats set `other` into `self`, with no assumption on ordering. + /// Stats that are not commutative (e.g., is_sorted) are dropped from the result. + pub fn merge_unordered(&mut self, other: &Self) -> &Self { + for s in all::() { + if !s.is_commutative() { + self.values.remove(&s); + continue; + } + + match s { + Stat::BitWidthFreq => self.merge_bit_width_freq(other), + Stat::TrailingZeroFreq => self.merge_trailing_zero_freq(other), + Stat::IsConstant => self.merge_is_constant(other), + Stat::Max => self.merge_max(other), + Stat::Min => self.merge_min(other), + Stat::TrueCount => self.merge_true_count(other), + Stat::NullCount => self.merge_null_count(other), + _ => vortex_panic!("Unrecognized commutative stat {}", s), + } + } + + self + } + fn merge_min(&mut self, other: &Self) { - self.merge_ordered(Stat::Min, other, |other, own| other < own); + self.merge_scalars(Stat::Min, other, |other, own| other < own); } fn merge_max(&mut self, other: &Self) { - self.merge_ordered(Stat::Max, other, |other, own| other > own); + self.merge_scalars(Stat::Max, other, |other, own| other > own); } /// Merges stats if both are present, if either stat is not present, drops the stat from the /// result set. For example, if we know the minimums of two arrays, the minimum of their union /// is the minimum-of-minimums, but if we only know the minimum of one of the two arrays, we /// do not know the minimum of their union. - fn merge_ordered bool>(&mut self, stat: Stat, other: &Self, cmp: F) { + fn merge_scalars bool>(&mut self, stat: Stat, other: &Self, cmp: F) { if let Entry::Occupied(mut e) = self.values.entry(stat) { if let Some(ov) = other.get(stat) { if cmp(ov, e.get()) { @@ -255,70 +308,73 @@ impl IntoIterator for StatsSet { #[cfg(test)] mod test { + use enum_iterator::all; use itertools::Itertools; - use crate::stats::{Stat, StatsSet}; + use crate::array::PrimitiveArray; + use crate::stats::{ArrayStatistics as _, Stat, StatsSet}; + use crate::IntoArray as _; #[test] fn merge_into_min() { let mut first = StatsSet::of(Stat::Min, 42.into()); - first.merge(&StatsSet::new()); + first.merge_ordered(&StatsSet::new()); assert_eq!(first.get(Stat::Min), None); } #[test] fn merge_from_min() { let mut first = StatsSet::new(); - first.merge(&StatsSet::of(Stat::Min, 42.into())); + first.merge_ordered(&StatsSet::of(Stat::Min, 42.into())); assert_eq!(first.get(Stat::Min), None); } #[test] fn merge_mins() { let mut first = StatsSet::of(Stat::Min, 37.into()); - first.merge(&StatsSet::of(Stat::Min, 42.into())); + first.merge_ordered(&StatsSet::of(Stat::Min, 42.into())); assert_eq!(first.get(Stat::Min).cloned(), Some(37.into())); } #[test] fn merge_into_max() { let mut first = StatsSet::of(Stat::Max, 42.into()); - first.merge(&StatsSet::new()); + first.merge_ordered(&StatsSet::new()); assert_eq!(first.get(Stat::Max), None); } #[test] fn merge_from_max() { let mut first = StatsSet::new(); - first.merge(&StatsSet::of(Stat::Max, 42.into())); + first.merge_ordered(&StatsSet::of(Stat::Max, 42.into())); assert_eq!(first.get(Stat::Max), None); } #[test] fn merge_maxes() { let mut first = StatsSet::of(Stat::Max, 37.into()); - first.merge(&StatsSet::of(Stat::Max, 42.into())); + first.merge_ordered(&StatsSet::of(Stat::Max, 42.into())); assert_eq!(first.get(Stat::Max).cloned(), Some(42.into())); } #[test] fn merge_into_scalar() { let mut first = StatsSet::of(Stat::TrueCount, 42.into()); - first.merge(&StatsSet::new()); + first.merge_ordered(&StatsSet::new()); assert_eq!(first.get(Stat::TrueCount), None); } #[test] fn merge_from_scalar() { let mut first = StatsSet::new(); - first.merge(&StatsSet::of(Stat::TrueCount, 42.into())); + first.merge_ordered(&StatsSet::of(Stat::TrueCount, 42.into())); assert_eq!(first.get(Stat::TrueCount), None); } #[test] fn merge_scalars() { let mut first = StatsSet::of(Stat::TrueCount, 37.into()); - first.merge(&StatsSet::of(Stat::TrueCount, 42.into())); + first.merge_ordered(&StatsSet::of(Stat::TrueCount, 42.into())); assert_eq!(first.get(Stat::TrueCount).cloned(), Some(79u64.into())); } @@ -326,7 +382,7 @@ mod test { fn merge_into_freq() { let vec = (0..255).collect_vec(); let mut first = StatsSet::of(Stat::BitWidthFreq, vec.into()); - first.merge(&StatsSet::new()); + first.merge_ordered(&StatsSet::new()); assert_eq!(first.get(Stat::BitWidthFreq), None); } @@ -334,7 +390,7 @@ mod test { fn merge_from_freq() { let vec = (0..255).collect_vec(); let mut first = StatsSet::new(); - first.merge(&StatsSet::of(Stat::BitWidthFreq, vec.into())); + first.merge_ordered(&StatsSet::of(Stat::BitWidthFreq, vec.into())); assert_eq!(first.get(Stat::BitWidthFreq), None); } @@ -343,21 +399,21 @@ mod test { let vec_in = vec![5u64; 256]; let vec_out = vec![10u64; 256]; let mut first = StatsSet::of(Stat::BitWidthFreq, vec_in.clone().into()); - first.merge(&StatsSet::of(Stat::BitWidthFreq, vec_in.into())); + first.merge_ordered(&StatsSet::of(Stat::BitWidthFreq, vec_in.into())); assert_eq!(first.get(Stat::BitWidthFreq).cloned(), Some(vec_out.into())); } #[test] fn merge_into_sortedness() { let mut first = StatsSet::of(Stat::IsStrictSorted, true.into()); - first.merge(&StatsSet::new()); + first.merge_ordered(&StatsSet::new()); assert_eq!(first.get(Stat::IsStrictSorted), None); } #[test] fn merge_from_sortedness() { let mut first = StatsSet::new(); - first.merge(&StatsSet::of(Stat::IsStrictSorted, true.into())); + first.merge_ordered(&StatsSet::of(Stat::IsStrictSorted, true.into())); assert_eq!(first.get(Stat::IsStrictSorted), None); } @@ -367,7 +423,7 @@ mod test { first.set(Stat::Max, 1.into()); let mut second = StatsSet::of(Stat::IsStrictSorted, true.into()); second.set(Stat::Min, 2.into()); - first.merge(&second); + first.merge_ordered(&second); assert_eq!(first.get(Stat::IsStrictSorted).cloned(), Some(true.into())); } @@ -377,7 +433,7 @@ mod test { first.set(Stat::Min, 1.into()); let mut second = StatsSet::of(Stat::IsStrictSorted, true.into()); second.set(Stat::Max, 2.into()); - second.merge(&first); + second.merge_ordered(&first); assert_eq!( second.get(Stat::IsStrictSorted).cloned(), Some(false.into()) @@ -390,7 +446,7 @@ mod test { first.set(Stat::Max, 1.into()); let mut second = StatsSet::of(Stat::IsStrictSorted, false.into()); second.set(Stat::Min, 2.into()); - first.merge(&second); + first.merge_ordered(&second); assert_eq!( second.get(Stat::IsStrictSorted).cloned(), Some(false.into()) @@ -402,7 +458,63 @@ mod test { let mut first = StatsSet::of(Stat::IsStrictSorted, true.into()); first.set(Stat::Max, 1.into()); let second = StatsSet::of(Stat::IsStrictSorted, true.into()); - first.merge(&second); + first.merge_ordered(&second); assert_eq!(first.get(Stat::IsStrictSorted).cloned(), None); } + + #[test] + fn merge_unordered() { + let array = PrimitiveArray::from_nullable_vec(vec![ + Some(1), + None, + Some(2), + Some(42), + Some(10000), + None, + ]) + .into_array(); + let all_stats = all::() + .filter(|s| !matches!(s, Stat::TrueCount)) + .collect_vec(); + array.statistics().compute_all(&all_stats).unwrap(); + + let stats = array.statistics().to_set(); + for stat in &all_stats { + assert!(stats.get(*stat).is_some(), "Stat {} is missing", stat); + } + + let mut merged = stats.clone(); + merged.merge_unordered(&stats); + for stat in &all_stats { + assert_eq!( + merged.get(*stat).is_some(), + stat.is_commutative(), + "Stat {} remains after merge_unordered despite not being commutative, or was removed despite being commutative", + stat + ) + } + + assert_eq!(merged.get(Stat::Min), stats.get(Stat::Min)); + assert_eq!(merged.get(Stat::Max), stats.get(Stat::Max)); + assert_eq!( + merged + .get(Stat::NullCount) + .unwrap() + .value() + .as_pvalue() + .unwrap() + .unwrap() + .as_u64() + .unwrap(), + 2 * stats + .get(Stat::NullCount) + .unwrap() + .value() + .as_pvalue() + .unwrap() + .unwrap() + .as_u64() + .unwrap() + ); + } } diff --git a/vortex-sampling-compressor/src/compressors/alp.rs b/vortex-sampling-compressor/src/compressors/alp.rs index d92c0edad4..231dea0eb1 100644 --- a/vortex-sampling-compressor/src/compressors/alp.rs +++ b/vortex-sampling-compressor/src/compressors/alp.rs @@ -4,6 +4,7 @@ use vortex_alp::{ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::PrimitiveArray; use vortex_array::encoding::EncodingRef; +use vortex_array::stats::ArrayStatistics as _; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{Array, ArrayDef, IntoArray}; use vortex_dtype::PType; @@ -65,7 +66,7 @@ impl EncodingCompressor for ALPCompressor { }) .transpose()?; - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( ALPArray::try_new( compressed_encoded.array, exponents, @@ -79,6 +80,7 @@ impl EncodingCompressor for ALPCompressor { compressed_patches.and_then(|p| p.path), ], )), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/alp_rd.rs b/vortex-sampling-compressor/src/compressors/alp_rd.rs index 58e8fd728d..78c421ed93 100644 --- a/vortex-sampling-compressor/src/compressors/alp_rd.rs +++ b/vortex-sampling-compressor/src/compressors/alp_rd.rs @@ -5,6 +5,7 @@ use vortex_alp::{match_each_alp_float_ptype, ALPRDEncoding, RDEncoder as ALPRDEn use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::PrimitiveArray; use vortex_array::encoding::EncodingRef; +use vortex_array::stats::ArrayStatistics as _; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{Array, ArrayDef, IntoArray, IntoArrayVariant}; use vortex_dtype::PType; @@ -64,9 +65,10 @@ impl EncodingCompressor for ALPRDCompressor { }; let encoded = alp_rd_encoder.encode(&primitive).into_array(); - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( encoded, Some(CompressionTree::new_with_metadata(self, vec![], encoder)), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/bitpacked.rs b/vortex-sampling-compressor/src/compressors/bitpacked.rs index e746c62d36..7d26bfd06c 100644 --- a/vortex-sampling-compressor/src/compressors/bitpacked.rs +++ b/vortex-sampling-compressor/src/compressors/bitpacked.rs @@ -112,7 +112,7 @@ impl EncodingCompressor for BitPackedCompressor { .flatten() .transpose()?; - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( BitPackedArray::try_new( packed, parray.ptype(), @@ -126,6 +126,7 @@ impl EncodingCompressor for BitPackedCompressor { self, vec![patches.and_then(|p| p.path)], )), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs index 9d6b81291a..aabfbd47b2 100644 --- a/vortex-sampling-compressor/src/compressors/chunked.rs +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ -5,6 +5,7 @@ use log::warn; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{Chunked, ChunkedArray}; use vortex_array::encoding::EncodingRef; +use vortex_array::stats::ArrayStatistics as _; use vortex_array::{Array, ArrayDType, ArrayDef, IntoArray}; use vortex_error::{vortex_bail, VortexResult}; @@ -144,13 +145,14 @@ impl ChunkedCompressor { None => (None, None), }; - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(), Some(CompressionTree::new_with_metadata( self, vec![child], Arc::new(ChunkedCompressorMetadata(ratio)), )), + Some(array.statistics()), )) } } diff --git a/vortex-sampling-compressor/src/compressors/constant.rs b/vortex-sampling-compressor/src/compressors/constant.rs index 9acd3b003d..4b34b53b22 100644 --- a/vortex-sampling-compressor/src/compressors/constant.rs +++ b/vortex-sampling-compressor/src/compressors/constant.rs @@ -32,9 +32,10 @@ impl EncodingCompressor for ConstantCompressor { _like: Option>, _ctx: SamplingCompressor<'a>, ) -> VortexResult> { - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( ConstantArray::new(scalar_at(array, 0)?, array.len()).into_array(), Some(CompressionTree::flat(self)), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/date_time_parts.rs b/vortex-sampling-compressor/src/compressors/date_time_parts.rs index 22db8e273e..25dbf02ce2 100644 --- a/vortex-sampling-compressor/src/compressors/date_time_parts.rs +++ b/vortex-sampling-compressor/src/compressors/date_time_parts.rs @@ -1,6 +1,7 @@ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::TemporalArray; use vortex_array::encoding::EncodingRef; +use vortex_array::stats::ArrayStatistics as _; use vortex_array::{Array, ArrayDType, ArrayDef, IntoArray}; use vortex_datetime_dtype::TemporalMetadata; use vortex_datetime_parts::{ @@ -56,7 +57,7 @@ impl EncodingCompressor for DateTimePartsCompressor { let subsecond = ctx .named("subsecond") .compress(&subseconds, like.as_ref().and_then(|l| l.child(2)))?; - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( DateTimePartsArray::try_new( array.dtype().clone(), days.array, @@ -68,6 +69,7 @@ impl EncodingCompressor for DateTimePartsCompressor { self, vec![days.path, seconds.path, subsecond.path], )), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/delta.rs b/vortex-sampling-compressor/src/compressors/delta.rs index 730409382a..ec474f4d20 100644 --- a/vortex-sampling-compressor/src/compressors/delta.rs +++ b/vortex-sampling-compressor/src/compressors/delta.rs @@ -1,6 +1,7 @@ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::PrimitiveArray; use vortex_array::encoding::EncodingRef; +use vortex_array::stats::ArrayStatistics as _; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{Array, ArrayDef, IntoArray}; use vortex_error::VortexResult; @@ -53,10 +54,11 @@ impl EncodingCompressor for DeltaCompressor { .named("deltas") .compress(deltas.as_ref(), like.as_ref().and_then(|l| l.child(1)))?; - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( DeltaArray::try_from_delta_compress_parts(bases.array, deltas.array, validity)? .into_array(), Some(CompressionTree::new(self, vec![bases.path, deltas.path])), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs index b51d68c5e8..56503e8156 100644 --- a/vortex-sampling-compressor/src/compressors/dict.rs +++ b/vortex-sampling-compressor/src/compressors/dict.rs @@ -82,9 +82,10 @@ impl EncodingCompressor for DictCompressor { .compress(&values, like.as_ref().and_then(|l| l.child(1)))?, ); - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( DictArray::try_new(codes.array, values.array)?.into_array(), Some(CompressionTree::new(self, vec![codes.path, values.path])), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/for.rs b/vortex-sampling-compressor/src/compressors/for.rs index 2c0f17d782..579fb16c87 100644 --- a/vortex-sampling-compressor/src/compressors/for.rs +++ b/vortex-sampling-compressor/src/compressors/for.rs @@ -56,34 +56,35 @@ impl EncodingCompressor for FoRCompressor { like: Option>, ctx: SamplingCompressor<'a>, ) -> VortexResult> { - let for_compressed = for_compress(&array.clone().into_primitive()?)?; + let compressed = for_compress(&array.clone().into_primitive()?)?; - match FoRArray::try_from(for_compressed.clone()) { - Ok(for_array) => { - let compressed_child = ctx - .named("for") - .excluding(self) - .compress(&for_array.encoded(), like.as_ref().and_then(|l| l.child(0)))?; - Ok(CompressedArray::new( - FoRArray::try_new( - compressed_child.array, - for_array.owned_reference_scalar(), - for_array.shift(), - ) - .map(|a| a.into_array())?, - Some(CompressionTree::new(self, vec![compressed_child.path])), - )) - } - Err(_) => { - let compressed_child = ctx - .named("for") - .excluding(self) - .compress(&for_compressed, like.as_ref())?; - Ok(CompressedArray::new( + if let Ok(for_array) = FoRArray::try_from(compressed.clone()) { + let compressed_child = ctx + .named("for") + .excluding(self) + .compress(&for_array.encoded(), like.as_ref().and_then(|l| l.child(0)))?; + Ok(CompressedArray::compressed( + FoRArray::try_new( compressed_child.array, - Some(CompressionTree::new(self, vec![compressed_child.path])), - )) - } + for_array.owned_reference_scalar(), + for_array.shift(), + ) + .map(|a| a.into_array())?, + Some(CompressionTree::new(self, vec![compressed_child.path])), + Some(array.statistics()), + )) + } else { + // otherwise, we chose a different encoding (e.g., constant or sparse), try compressing that + // (will no-op for constant, may compress indices/values for sparse) + let compressed_child = ctx + .named("for") + .excluding(self) + .compress(&compressed, like.as_ref().and_then(|l| l.child(0)))?; + Ok(CompressedArray::compressed( + compressed_child.array, + Some(CompressionTree::new(self, vec![compressed_child.path])), + Some(array.statistics()), + )) } } diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs index ce7e2db55f..f1f3baf7f3 100644 --- a/vortex-sampling-compressor/src/compressors/fsst.rs +++ b/vortex-sampling-compressor/src/compressors/fsst.rs @@ -6,6 +6,7 @@ use fsst::Compressor; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{VarBin, VarBinArray, VarBinView}; use vortex_array::encoding::EncodingRef; +use vortex_array::stats::ArrayStatistics as _; use vortex_array::{ArrayDType, ArrayDef, IntoArray}; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; @@ -116,7 +117,7 @@ impl EncodingCompressor for FSSTCompressor { )? .into_array(); - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( FSSTArray::try_new( fsst_array.dtype().clone(), fsst_array.symbols(), @@ -130,6 +131,7 @@ impl EncodingCompressor for FSSTCompressor { vec![uncompressed_lengths.path, codes_offsets_compressed.path], compressor, )), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs index 9a52397473..c8fc50071f 100644 --- a/vortex-sampling-compressor/src/compressors/mod.rs +++ b/vortex-sampling-compressor/src/compressors/mod.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use vortex_array::aliases::hash_set::HashSet; use vortex_array::encoding::EncodingRef; +use vortex_array::stats::{ArrayStatistics, Statistics}; use vortex_array::Array; use vortex_error::VortexResult; @@ -185,10 +186,19 @@ pub struct CompressedArray<'a> { impl<'a> CompressedArray<'a> { pub fn uncompressed(array: Array) -> Self { - Self::new(array, None) + Self { array, path: None } } - pub fn new(array: Array, path: Option>) -> Self { + pub fn compressed( + array: Array, + path: Option>, + inherited_stats: Option<&dyn Statistics>, + ) -> Self { + if let Some(stats) = inherited_stats { + for (stat, value) in stats.to_set().into_iter() { + array.statistics().set(stat, value); + } + } Self { array, path } } diff --git a/vortex-sampling-compressor/src/compressors/roaring_bool.rs b/vortex-sampling-compressor/src/compressors/roaring_bool.rs index 64413dee84..36e24df75e 100644 --- a/vortex-sampling-compressor/src/compressors/roaring_bool.rs +++ b/vortex-sampling-compressor/src/compressors/roaring_bool.rs @@ -1,6 +1,7 @@ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::Bool; use vortex_array::encoding::EncodingRef; +use vortex_array::stats::ArrayStatistics as _; use vortex_array::{Array, ArrayDType, ArrayDef, IntoArray, IntoArrayVariant}; use vortex_dtype::DType; use vortex_dtype::Nullability::NonNullable; @@ -46,9 +47,10 @@ impl EncodingCompressor for RoaringBoolCompressor { _like: Option>, _ctx: SamplingCompressor<'a>, ) -> VortexResult> { - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( roaring_bool_encode(array.clone().into_bool()?)?.into_array(), Some(CompressionTree::flat(self)), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/roaring_int.rs b/vortex-sampling-compressor/src/compressors/roaring_int.rs index ab7517adfe..6de3bba74c 100644 --- a/vortex-sampling-compressor/src/compressors/roaring_int.rs +++ b/vortex-sampling-compressor/src/compressors/roaring_int.rs @@ -48,9 +48,10 @@ impl EncodingCompressor for RoaringIntCompressor { _like: Option>, _ctx: SamplingCompressor<'a>, ) -> VortexResult> { - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( roaring_int_encode(array.clone().into_primitive()?)?.into_array(), Some(CompressionTree::flat(self)), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/runend.rs b/vortex-sampling-compressor/src/compressors/runend.rs index 9a1a1b78ea..8b1369f722 100644 --- a/vortex-sampling-compressor/src/compressors/runend.rs +++ b/vortex-sampling-compressor/src/compressors/runend.rs @@ -60,7 +60,7 @@ impl EncodingCompressor for RunEndCompressor { .excluding(self) .compress(&values.into_array(), like.as_ref().and_then(|l| l.child(1)))?; - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( RunEndArray::try_new( compressed_ends.array, compressed_values.array, @@ -71,6 +71,7 @@ impl EncodingCompressor for RunEndCompressor { self, vec![compressed_ends.path, compressed_values.path], )), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/sparse.rs b/vortex-sampling-compressor/src/compressors/sparse.rs index 4f1ebdec71..748ff72b4b 100644 --- a/vortex-sampling-compressor/src/compressors/sparse.rs +++ b/vortex-sampling-compressor/src/compressors/sparse.rs @@ -1,6 +1,7 @@ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{Sparse, SparseArray, SparseEncoding}; use vortex_array::encoding::EncodingRef; +use vortex_array::stats::ArrayStatistics as _; use vortex_array::{Array, ArrayDef, IntoArray}; use vortex_error::VortexResult; @@ -38,7 +39,7 @@ impl EncodingCompressor for SparseCompressor { &sparse_array.values(), like.as_ref().and_then(|l| l.child(0)), )?; - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( SparseArray::try_new( indices.array, values.array, @@ -47,6 +48,7 @@ impl EncodingCompressor for SparseCompressor { )? .into_array(), Some(CompressionTree::new(self, vec![indices.path, values.path])), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/struct_.rs b/vortex-sampling-compressor/src/compressors/struct_.rs index 114d0b8b26..ba4e0bd073 100644 --- a/vortex-sampling-compressor/src/compressors/struct_.rs +++ b/vortex-sampling-compressor/src/compressors/struct_.rs @@ -2,6 +2,7 @@ use itertools::Itertools; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{Struct, StructArray}; use vortex_array::encoding::EncodingRef; +use vortex_array::stats::ArrayStatistics as _; use vortex_array::variants::StructArrayTrait; use vortex_array::{Array, ArrayDef, IntoArray}; use vortex_error::VortexResult; @@ -47,7 +48,7 @@ impl EncodingCompressor for StructCompressor { .map(|(array, like)| ctx.compress(&array, like.as_ref())) .process_results(|iter| iter.map(|x| (x.array, x.path)).unzip())?; - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( StructArray::try_new( array.names().clone(), arrays, @@ -56,6 +57,7 @@ impl EncodingCompressor for StructCompressor { )? .into_array(), Some(CompressionTree::new(self, trees)), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/compressors/zigzag.rs b/vortex-sampling-compressor/src/compressors/zigzag.rs index 54b1b11607..91e6da53a1 100644 --- a/vortex-sampling-compressor/src/compressors/zigzag.rs +++ b/vortex-sampling-compressor/src/compressors/zigzag.rs @@ -49,9 +49,10 @@ impl EncodingCompressor for ZigZagCompressor { let encoded = zigzag_encode(PrimitiveArray::try_from(array)?)?; let compressed = ctx.compress(&encoded.encoded(), like.as_ref().and_then(|l| l.child(0)))?; - Ok(CompressedArray::new( + Ok(CompressedArray::compressed( ZigZagArray::try_new(compressed.array)?.into_array(), Some(CompressionTree::new(self, vec![compressed.path])), + Some(array.statistics()), )) } diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index 06c3774655..2585251acb 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -12,7 +12,8 @@ use vortex_alp::{ALPEncoding, ALPRDEncoding}; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{ChunkedArray, Constant}; use vortex_array::compress::{ - check_dtype_unchanged, check_validity_unchanged, CompressionStrategy, + check_dtype_unchanged, check_statistics_unchanged, check_validity_unchanged, + CompressionStrategy, }; use vortex_array::compute::slice; use vortex_array::encoding::EncodingRef; @@ -249,6 +250,7 @@ impl<'a> SamplingCompressor<'a> { check_validity_unchanged(arr, compressed.as_ref()); check_dtype_unchanged(arr, compressed.as_ref()); + check_statistics_unchanged(arr, compressed.as_ref()); return Ok(compressed); } else { warn!( @@ -263,6 +265,7 @@ impl<'a> SamplingCompressor<'a> { check_validity_unchanged(arr, compressed.as_ref()); check_dtype_unchanged(arr, compressed.as_ref()); + check_statistics_unchanged(arr, compressed.as_ref()); Ok(compressed) } diff --git a/vortex-scalar/src/lib.rs b/vortex-scalar/src/lib.rs index 52031495bb..af10a9bc99 100644 --- a/vortex-scalar/src/lib.rs +++ b/vortex-scalar/src/lib.rs @@ -102,7 +102,19 @@ impl Scalar { DType::Binary(_) => BinaryScalar::try_from(self).and_then(|s| s.cast(dtype)), DType::Struct(..) => StructScalar::try_from(self).and_then(|s| s.cast(dtype)), DType::List(..) => ListScalar::try_from(self).and_then(|s| s.cast(dtype)), - DType::Extension(..) => ExtScalar::try_from(self).and_then(|s| s.cast(dtype)), + DType::Extension(ext_dtype) => { + if !self.value().is_instance_of(ext_dtype.storage_dtype()) { + vortex_bail!( + "Failed to cast scalar to extension dtype with storage type {:?}, found {:?}", + ext_dtype.storage_dtype(), + self.dtype() + ); + } + Ok(Scalar::extension( + ext_dtype.clone(), + self.cast(ext_dtype.storage_dtype())?.value, + )) + } } } } diff --git a/vortex-scalar/src/pvalue.rs b/vortex-scalar/src/pvalue.rs index 997d1c5451..35687d4e13 100644 --- a/vortex-scalar/src/pvalue.rs +++ b/vortex-scalar/src/pvalue.rs @@ -47,10 +47,18 @@ macro_rules! as_primitive { paste! { #[doc = "Access PValue as `" $T "`, returning `None` if conversion is unsuccessful"] pub fn [](self) -> Option<$T> { - if let PValue::$PT(v) = self { - Some(v) - } else { - None + match self { + PValue::U8(v) => <$T as NumCast>::from(v), + PValue::U16(v) => <$T as NumCast>::from(v), + PValue::U32(v) => <$T as NumCast>::from(v), + PValue::U64(v) => <$T as NumCast>::from(v), + PValue::I8(v) => <$T as NumCast>::from(v), + PValue::I16(v) => <$T as NumCast>::from(v), + PValue::I32(v) => <$T as NumCast>::from(v), + PValue::I64(v) => <$T as NumCast>::from(v), + PValue::F16(v) => <$T as NumCast>::from(v), + PValue::F32(v) => <$T as NumCast>::from(v), + PValue::F64(v) => <$T as NumCast>::from(v), } } } diff --git a/vortex-scalar/src/value.rs b/vortex-scalar/src/value.rs index 98ef02cda6..bb149eafe0 100644 --- a/vortex-scalar/src/value.rs +++ b/vortex-scalar/src/value.rs @@ -90,7 +90,8 @@ impl ScalarValue { .zip(structdt.dtypes().to_vec()) .all(|(v, dt)| v.is_instance_of(&dt)), (ScalarValue::Null, dtype) => dtype.is_nullable(), - (..) => false, + (_, DType::Extension(ext_dtype)) => self.is_instance_of(ext_dtype.storage_dtype()), + _ => false, } }