Skip to content

Commit

Permalink
feature: split computations of stats for VarBin & VarBinView (#1457)
Browse files Browse the repository at this point in the history
... instead of computing all stats always (which is very expensive for
strings)
  • Loading branch information
lwwmanning authored Nov 23, 2024
1 parent 96a0604 commit 1bf69f7
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 118 deletions.
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::{Debug, Display};

use num_traits::AsPrimitive;
use serde::{Deserialize, Serialize};
pub use stats::compute_stats;
pub use stats::compute_varbin_statistics;
use vortex_buffer::Buffer;
use vortex_dtype::{match_each_native_ptype, DType, NativePType, Nullability, PType};
use vortex_error::{
Expand Down
215 changes: 116 additions & 99 deletions vortex-array/src/array/varbin/stats.rs
Original file line number Diff line number Diff line change
@@ -1,129 +1,148 @@
use std::cmp::Ordering;

use itertools::{Itertools, MinMaxResult};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_error::{vortex_panic, VortexResult};

use super::varbin_scalar;
use crate::accessor::ArrayAccessor;
use crate::array::varbin::{varbin_scalar, VarBinArray};
use crate::array::varbin::VarBinArray;
use crate::array::VarBinEncoding;
use crate::nbytes::ArrayNBytes;
use crate::compute::unary::scalar_at;
use crate::stats::{Stat, StatisticsVTable, StatsSet};
use crate::{ArrayDType, ArrayLen};
use crate::ArrayTrait;

impl StatisticsVTable<VarBinArray> for VarBinEncoding {
fn compute_statistics(&self, array: &VarBinArray, stat: Stat) -> VortexResult<StatsSet> {
if stat == Stat::UncompressedSizeInBytes {
return Ok(StatsSet::of(stat, array.nbytes()));
}

if array.is_empty() {
return Ok(StatsSet::default());
}
array.with_iterator(|iter| compute_stats(iter, array.dtype()))
compute_varbin_statistics(array, stat)
}
}

pub fn compute_stats(iter: &mut dyn Iterator<Item = Option<&[u8]>>, dtype: &DType) -> StatsSet {
let mut leading_nulls: usize = 0;
let mut first_value: Option<&[u8]> = None;
for v in &mut *iter {
if v.is_none() {
leading_nulls += 1;
} else {
first_value = v;
break;
}
pub fn compute_varbin_statistics<T: ArrayTrait + ArrayAccessor<[u8]>>(
array: &T,
stat: Stat,
) -> VortexResult<StatsSet> {
if stat == Stat::UncompressedSizeInBytes {
return Ok(StatsSet::of(stat, array.nbytes()));
}

if let Some(first_non_null) = first_value {
let mut acc = VarBinAccumulator::new(first_non_null);
acc.n_nulls(leading_nulls);
iter.for_each(|n| acc.nullable_next(n));
acc.finish(dtype)
} else {
StatsSet::nulls(leading_nulls, dtype)
if array.is_empty()
|| stat == Stat::TrueCount
|| stat == Stat::RunCount
|| stat == Stat::BitWidthFreq
|| stat == Stat::TrailingZeroFreq
{
return Ok(StatsSet::default());
}
}

pub struct VarBinAccumulator<'a> {
min: &'a [u8],
max: &'a [u8],
is_sorted: bool,
is_strict_sorted: bool,
last_value: &'a [u8],
null_count: usize,
runs: usize,
len: usize,
}
Ok(match stat {
Stat::NullCount => {
let null_count = array.logical_validity().null_count(array.len())?;
if null_count == array.len() {
return Ok(StatsSet::nulls(array.len(), array.dtype()));
}

impl<'a> VarBinAccumulator<'a> {
pub fn new(value: &'a [u8]) -> Self {
Self {
min: value,
max: value,
is_sorted: true,
is_strict_sorted: true,
last_value: value,
runs: 1,
null_count: 0,
len: 1,
let mut stats = StatsSet::of(Stat::NullCount, null_count);
if null_count > 0 {
// we know that there is at least one null, but not all nulls, so it's not constant
stats.set(Stat::IsConstant, false);
}
stats
}
}

pub fn nullable_next(&mut self, val: Option<&'a [u8]>) {
match val {
None => {
self.null_count += 1;
self.len += 1;
Stat::IsConstant => {
let is_constant = array.with_iterator(compute_is_constant)?;
if is_constant {
// we know that the array is not empty
StatsSet::constant(scalar_at(array, 0)?, array.len())
} else {
StatsSet::of(Stat::IsConstant, is_constant)
}
Some(v) => self.next(v),
}
}
Stat::Min | Stat::Max => compute_min_max(array)?,
Stat::IsSorted => {
let is_sorted = array.with_iterator(|iter| iter.flatten().is_sorted())?;
let mut stats = StatsSet::of(Stat::IsSorted, is_sorted);
if !is_sorted {
stats.set(Stat::IsStrictSorted, false);
}
stats
}
Stat::IsStrictSorted => {
let is_strict_sorted = array.with_iterator(|iter| {
iter.flatten()
.is_sorted_by(|a, b| matches!(a.cmp(b), Ordering::Less))
})?;
let mut stats = StatsSet::of(Stat::IsStrictSorted, is_strict_sorted);
if is_strict_sorted {
stats.set(Stat::IsSorted, true);
}
stats
}
Stat::UncompressedSizeInBytes
| Stat::TrueCount
| Stat::RunCount
| Stat::BitWidthFreq
| Stat::TrailingZeroFreq => {
vortex_panic!(
"Unreachable, stat {} should have already been handled",
stat
)
}
})
}

pub fn n_nulls(&mut self, null_count: usize) {
self.len += null_count;
self.null_count += null_count;
fn compute_is_constant(iter: &mut dyn Iterator<Item = Option<&[u8]>>) -> bool {
let Some(first_value) = iter.next() else {
return true; // empty array is constant
};
for v in iter {
if v != first_value {
return false;
}
}
true
}

pub fn next(&mut self, val: &'a [u8]) {
self.len += 1;
fn compute_min_max<T: ArrayTrait + ArrayAccessor<[u8]>>(array: &T) -> VortexResult<StatsSet> {
let mut stats = StatsSet::default();
if array.is_empty() {
return Ok(stats);
}

if val < self.min {
self.min.clone_from(&val);
} else if val > self.max {
self.max.clone_from(&val);
let minmax = array.with_iterator(|iter| match iter.flatten().minmax() {
MinMaxResult::NoElements => None,
MinMaxResult::OneElement(value) => {
let scalar = varbin_scalar(Buffer::from(value), array.dtype());
Some((scalar.clone(), scalar))
}

match val.cmp(self.last_value) {
Ordering::Less => {
self.is_sorted = false;
self.is_strict_sorted = false;
}
Ordering::Equal => {
self.is_strict_sorted = false;
return;
}
Ordering::Greater => {}
MinMaxResult::MinMax(min, max) => Some((
varbin_scalar(Buffer::from(min), array.dtype()),
varbin_scalar(Buffer::from(max), array.dtype()),
)),
})?;
let Some((min, max)) = minmax else {
// we know that the array is not empty, so it must be all nulls
return Ok(StatsSet::nulls(array.len(), array.dtype()));
};

if min == max {
// get (don't compute) null count if `min == max` to determine if it's constant
if array
.statistics()
.get_as::<u64>(Stat::NullCount)
.map_or(false, |null_count| null_count == 0)
{
// if there are no nulls, then the array is constant
return Ok(StatsSet::constant(min, array.len()));
}
self.last_value = val;
self.runs += 1;
} else {
stats.set(Stat::IsConstant, false);
}

pub fn finish(&self, dtype: &DType) -> StatsSet {
let is_constant =
(self.min == self.max && self.null_count == 0) || self.null_count == self.len;

StatsSet::from_iter([
(Stat::Min, varbin_scalar(Buffer::from(self.min), dtype)),
(Stat::Max, varbin_scalar(Buffer::from(self.max), dtype)),
(Stat::RunCount, self.runs.into()),
(Stat::IsSorted, self.is_sorted.into()),
(Stat::IsStrictSorted, self.is_strict_sorted.into()),
(Stat::IsConstant, is_constant.into()),
(Stat::NullCount, self.null_count.into()),
])
}
stats.set(Stat::Min, min);
stats.set(Stat::Max, max);

Ok(stats)
}

#[cfg(test)]
Expand Down Expand Up @@ -154,7 +173,6 @@ mod test {
arr.statistics().compute_max::<BufferString>().unwrap(),
BufferString::from("hello world this is a long string".to_string())
);
assert_eq!(arr.statistics().compute_run_count().unwrap(), 2);
assert!(!arr.statistics().compute_is_constant().unwrap());
assert!(arr.statistics().compute_is_sorted().unwrap());
}
Expand All @@ -170,7 +188,6 @@ mod test {
arr.statistics().compute_max::<Buffer>().unwrap().deref(),
"hello world this is a long string".as_bytes()
);
assert_eq!(arr.statistics().compute_run_count().unwrap(), 2);
assert!(!arr.statistics().compute_is_constant().unwrap());
assert!(arr.statistics().compute_is_sorted().unwrap());
}
Expand Down
15 changes: 2 additions & 13 deletions vortex-array/src/array/varbinview/stats.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
use vortex_error::VortexResult;

use crate::accessor::ArrayAccessor;
use crate::array::varbin::compute_stats;
use crate::array::varbin::compute_varbin_statistics;
use crate::array::varbinview::VarBinViewArray;
use crate::array::VarBinViewEncoding;
use crate::nbytes::ArrayNBytes;
use crate::stats::{Stat, StatisticsVTable, StatsSet};
use crate::{ArrayDType, ArrayLen};

impl StatisticsVTable<VarBinViewArray> for VarBinViewEncoding {
fn compute_statistics(&self, array: &VarBinViewArray, stat: Stat) -> VortexResult<StatsSet> {
if stat == Stat::UncompressedSizeInBytes {
return Ok(StatsSet::of(stat, array.nbytes()));
}

if array.is_empty() {
return Ok(StatsSet::default());
}

array.with_iterator(|iter| compute_stats(iter, array.dtype()))
compute_varbin_statistics(array, stat)
}
}
4 changes: 2 additions & 2 deletions vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ pub enum Stat {
/// Whether all values are the same (nulls are not equal to other non-null values,
/// so this is true iff all values are null or all values are the same non-null value)
IsConstant,
/// Whether the array is sorted
/// Whether the non-null values in the array are sorted (i.e., we skip nulls)
IsSorted,
/// Whether the array is strictly sorted (i.e., sorted with no duplicates)
/// Whether the non-null values in the array are strictly sorted (i.e., sorted with no duplicates)
IsStrictSorted,
/// The maximum value in the array (ignoring nulls, unless all values are null)
Max,
Expand Down
3 changes: 1 addition & 2 deletions vortex-array/src/stats/statsset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,7 @@ impl FromIterator<(Stat, Scalar)> for StatsSet {
impl Extend<(Stat, Scalar)> for StatsSet {
#[inline]
fn extend<T: IntoIterator<Item = (Stat, Scalar)>>(&mut self, iter: T) {
let stats = iter.into_iter().collect_vec();
stats.into_iter().for_each(|(stat, scalar)| {
iter.into_iter().for_each(|(stat, scalar)| {
self.set(stat, scalar);
});
}
Expand Down
21 changes: 21 additions & 0 deletions vortex-array/src/validity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,27 @@ impl LogicalValidity {
Self::Array(a) => Validity::Array(a),
}
}

pub fn null_count(&self, length: usize) -> VortexResult<usize> {
match self {
Self::AllValid(_) => Ok(0),
Self::AllInvalid(_) => Ok(length),
Self::Array(a) => {
let validity_len = a.len();
if validity_len != length {
vortex_bail!(
"Validity array length {} doesn't match array length {}",
validity_len,
length
)
}
let true_count = a.statistics().compute_true_count().ok_or_else(|| {
vortex_err!("Failed to compute true count from validity array")
})?;
Ok(length - true_count)
}
}
}
}

impl TryFrom<ArrayData> for LogicalValidity {
Expand Down
2 changes: 1 addition & 1 deletion vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl EncodingCompressor for FSSTCompressor {

let codes = fsst_array.codes();
let compressed_codes = ctx
.named("fsst_codes")
.auxiliary("fsst_codes")
.excluding(self)
.including_only(&[
&VarBinCompressor,
Expand Down

0 comments on commit 1bf69f7

Please sign in to comment.