Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix binary stats for arrays containing null bytes and match stats behaviour between varbin and primitive arrays #233

Merged
merged 5 commits into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ impl PrimitiveArray {
&self.buffer
}

#[inline]
pub fn into_buffer(self) -> Buffer {
self.buffer
}

pub fn scalar_buffer<T: NativePType>(&self) -> ScalarBuffer<T> {
ScalarBuffer::from(self.buffer().clone())
}
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/array/primitive/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl StatsCompute for PrimitiveArray {
match_each_native_ptype!(self.ptype(), |$P| {
match self.logical_validity() {
Validity::Valid(_) => self.typed_data::<$P>().compute(stat),
Validity::Invalid(_) => all_null_stats::<$P>(),
Validity::Invalid(v) => all_null_stats::<$P>(v),
Validity::Array(a) => {
NullableValues(self.typed_data::<$P>(), flatten_bool(&a)?.buffer()).compute(stat)
}
Expand All @@ -38,15 +38,15 @@ impl<T: NativePType> StatsCompute for &[T] {
}
}

fn all_null_stats<T: NativePType>() -> VortexResult<StatsSet> {
fn all_null_stats<T: NativePType>(len: usize) -> VortexResult<StatsSet> {
Ok(StatsSet::from(HashMap::from([
(Stat::Min, Option::<T>::None.into()),
(Stat::Max, Option::<T>::None.into()),
(Stat::IsConstant, true.into()),
(Stat::IsSorted, true.into()),
(Stat::IsStrictSorted, true.into()),
(Stat::IsStrictSorted, (len == 1).into()),
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
(Stat::RunCount, 1.into()),
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
(Stat::NullCount, 1.into()),
(Stat::NullCount, len.into()),
(
Stat::BitWidthFreq,
ListScalarVec(vec![0; size_of::<T>() * 8 + 1]).into(),
Expand Down
6 changes: 5 additions & 1 deletion vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::{Arc, RwLock};

use linkme::distributed_slice;
use num_traits::AsPrimitive;
pub use stats::compute_stats;
pub use stats::VarBinAccumulator;
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_schema::{DType, IntWidth, Nullability, Signedness};
Expand Down Expand Up @@ -173,7 +174,10 @@ impl VarBinArray {
let start = self.offset_at(index);
let end = self.offset_at(index + 1);
let sliced = slice(self.bytes(), start, end)?;
Ok(flatten_primitive(sliced.as_ref())?.buffer().to_vec())
Ok(flatten_primitive(sliced.as_ref())?
.into_buffer()
.into_vec()
.unwrap_or_else(|buf| buf.to_vec()))
}
}

Expand Down
93 changes: 66 additions & 27 deletions vortex-array/src/array/varbin/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vortex_schema::DType;

use crate::array::varbin::{varbin_scalar, VarBinArray};
use crate::array::Array;
use crate::scalar::Scalar;
use crate::stats::{Stat, StatsCompute, StatsSet};

impl StatsCompute for VarBinArray {
Expand All @@ -15,22 +16,52 @@ impl StatsCompute for VarBinArray {
return Ok(StatsSet::new());
}

let mut acc = VarBinAccumulator::new();
self.iter_primitive()
.map(|prim_iter| {
for next_val in prim_iter {
acc.nullable_next(next_val.map(Cow::from));
}
})
Ok(self
.iter_primitive()
.map(|prim_iter| compute_stats(&mut prim_iter.map(|s| s.map(Cow::from)), self.dtype()))
.unwrap_or_else(|_| {
for next_val in self.iter() {
acc.nullable_next(next_val.map(Cow::from));
}
});
Ok(acc.finish(self.len(), self.dtype()))
compute_stats(&mut self.iter().map(|s| s.map(Cow::from)), self.dtype())
}))
}
}

pub fn compute_stats(
iter: &mut dyn Iterator<Item = Option<Cow<'_, [u8]>>>,
dtype: &DType,
) -> StatsSet {
let mut leading_nulls: usize = 0;
let mut first_value: Option<Cow<'_, [u8]>> = None;
for v in &mut *iter {
if v.is_none() {
leading_nulls += 1;
} else {
first_value = v;
break;
}
}

if let Some(first_non_null) = first_value {
let mut acc = VarBinAccumulator::new(first_non_null);
iter.for_each(|n| acc.nullable_next(n));
acc.n_nulls(leading_nulls);
acc.finish(dtype)
} else {
all_null_stats(leading_nulls, dtype)
}
}

fn all_null_stats(len: usize, dtype: &DType) -> StatsSet {
StatsSet::from(HashMap::from([
(Stat::Min, Scalar::null(dtype)),
(Stat::Max, Scalar::null(dtype)),
(Stat::IsConstant, true.into()),
(Stat::IsSorted, true.into()),
(Stat::IsStrictSorted, (len == 1).into()),
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
(Stat::RunCount, 1.into()),
(Stat::NullCount, len.into()),
]))
}

#[derive(Debug, Default)]
pub struct VarBinAccumulator<'a> {
min: Cow<'a, [u8]>,
Expand All @@ -44,15 +75,15 @@ pub struct VarBinAccumulator<'a> {
}

impl<'a> VarBinAccumulator<'a> {
pub fn new() -> Self {
pub fn new(value: Cow<'a, [u8]>) -> Self {
Self {
min: Cow::from(&[0xFF]),
max: Cow::from(&[0x00]),
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
min: value.clone(),
max: value.clone(),
is_constant: true,
is_sorted: true,
is_strict_sorted: true,
last_value: Cow::from(&[0x00]),
runs: 0,
last_value: value,
runs: 1,
null_count: 0,
}
}
Expand All @@ -64,6 +95,10 @@ impl<'a> VarBinAccumulator<'a> {
}
}

pub fn n_nulls(&mut self, null_count: usize) {
self.null_count += null_count;
}

pub fn next(&mut self, val: Cow<'a, [u8]>) {
if val < self.min {
self.min.clone_from(&val);
Expand All @@ -84,19 +119,16 @@ impl<'a> VarBinAccumulator<'a> {
self.runs += 1;
}

pub fn finish(&self, len: usize, dtype: &DType) -> StatsSet {
let mut stats = StatsSet::from(HashMap::from([
pub fn finish(&self, dtype: &DType) -> StatsSet {
StatsSet::from(HashMap::from([
(Stat::Min, varbin_scalar(self.min.to_vec(), dtype)),
(Stat::Max, varbin_scalar(self.max.to_vec(), dtype)),
(Stat::RunCount, self.runs.into()),
(Stat::IsSorted, self.is_sorted.into()),
(Stat::IsStrictSorted, self.is_strict_sorted.into()),
(Stat::IsConstant, self.is_constant.into()),
(Stat::NullCount, self.null_count.into()),
]));
if self.null_count < len {
stats.set(Stat::Min, varbin_scalar(self.min.to_vec(), dtype));
stats.set(Stat::Max, varbin_scalar(self.max.to_vec(), dtype));
}
stats
]))
}
}

Expand All @@ -106,6 +138,7 @@ mod test {

use crate::array::varbin::VarBinArray;
use crate::array::Array;
use crate::scalar::Utf8Scalar;
use crate::stats::Stat;

fn array(dtype: DType) -> VarBinArray {
Expand Down Expand Up @@ -206,7 +239,13 @@ mod test {
vec![Option::<&str>::None, None, None],
DType::Utf8(Nullability::Nullable),
);
assert!(array.stats().get_or_compute(&Stat::Min).is_none());
assert!(array.stats().get_or_compute(&Stat::Max).is_none());
assert_eq!(
array.stats().get_or_compute(&Stat::Min).unwrap(),
Utf8Scalar::none().into()
);
assert_eq!(
array.stats().get_or_compute(&Stat::Max).unwrap(),
Utf8Scalar::none().into()
);
}
}
8 changes: 5 additions & 3 deletions vortex-array/src/array/varbinview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,15 @@ impl VarBinViewArray {
let view = self.view_at(index);
unsafe {
if view.inlined.size > 12 {
let arrow_data_buffer = flatten_primitive(&slice(
let data_buf = flatten_primitive(&slice(
self.data.get(view._ref.buffer_index as usize).unwrap(),
view._ref.offset as usize,
(view._ref.size + view._ref.offset) as usize,
)?)?;
// TODO(ngates): can we avoid returning a copy?
Ok(arrow_data_buffer.typed_data::<u8>().to_vec())
Ok(data_buf
.into_buffer()
.into_vec()
.unwrap_or_else(|buf| buf.to_vec()))
} else {
Ok(view.inlined.data[..view.inlined.size as usize].to_vec())
}
Expand Down
19 changes: 6 additions & 13 deletions vortex-array/src/array/varbinview/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::borrow::Cow;

use vortex_error::VortexResult;

use crate::array::varbin::VarBinAccumulator;
use crate::array::varbin::compute_stats;
use crate::array::varbinview::VarBinViewArray;
use crate::array::Array;
use crate::stats::{Stat, StatsCompute, StatsSet};
Expand All @@ -13,18 +13,11 @@ impl StatsCompute for VarBinViewArray {
return Ok(StatsSet::new());
}

let mut acc = VarBinAccumulator::new();
self.iter_primitive()
.map(|prim_iter| {
for next_val in prim_iter {
acc.nullable_next(next_val.map(Cow::from));
}
})
Ok(self
.iter_primitive()
.map(|prim_iter| compute_stats(&mut prim_iter.map(|s| s.map(Cow::from)), self.dtype()))
.unwrap_or_else(|_| {
for next_val in self.iter() {
acc.nullable_next(next_val.map(Cow::from));
}
});
Ok(acc.finish(self.len(), self.dtype()))
compute_stats(&mut self.iter().map(|s| s.map(Cow::from)), self.dtype())
}))
}
}