Skip to content

Commit

Permalink
implement FilterFn for ChunkedArray (#794)
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y authored Sep 15, 2024
1 parent 05fe6ce commit 03e03ce
Show file tree
Hide file tree
Showing 10 changed files with 358 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl BitPackedSearch {
offset: array.offset(),
length: array.len(),
bit_width: array.bit_width(),
min_patch_offset: array.patches().map(|p| {
min_patch_offset: array.patches().and_then(|p| {
SparseArray::try_from(p)
.vortex_expect("Only sparse patches are supported")
.min_index()
Expand Down
69 changes: 64 additions & 5 deletions encodings/fastlanes/src/bitpacking/compute/slice.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::cmp::max;

use vortex::array::SparseArray;
use vortex::compute::{slice, SliceFn};
use vortex::{Array, IntoArray};
use vortex_error::VortexResult;
use vortex_error::{VortexExpect, VortexResult};

use crate::BitPackedArray;

Expand All @@ -19,9 +20,19 @@ impl SliceFn for BitPackedArray {
Self::try_new_from_offset(
slice(&self.packed(), encoded_start, encoded_stop)?,
self.validity().slice(start, stop)?,
self.patches().map(|p| slice(&p, start, stop)).transpose()?,
self.patches()
.map(|p| slice(&p, start, stop))
.transpose()?
.filter(|a| {
// If the sliced patch_indices is empty, we should not propagate the patches.
// There may be other logic that depends on Some(patches) indicating non-empty.
!SparseArray::try_from(a)
.vortex_expect("BitPackedArray must encode patches as SparseArray")
.indices()
.is_empty()
}),
self.bit_width(),
offset_stop - offset_start,
stop - start,
offset,
)
.map(|a| a.into_array())
Expand All @@ -30,9 +41,10 @@ impl SliceFn for BitPackedArray {

#[cfg(test)]
mod test {
use vortex::array::PrimitiveArray;
use vortex::compute::slice;
use itertools::Itertools;
use vortex::array::{PrimitiveArray, SparseArray};
use vortex::compute::unary::scalar_at;
use vortex::compute::{slice, take};
use vortex::IntoArray;

use crate::BitPackedArray;
Expand Down Expand Up @@ -140,4 +152,51 @@ mod test {
assert_eq!(doubly_sliced.offset(), 639);
assert_eq!(doubly_sliced.len(), 784);
}

#[test]
fn slice_empty_patches() {
// We create an array that has 1 element that does not fit in the 6-bit range.
let array =
BitPackedArray::encode(PrimitiveArray::from((0u32..=64).collect_vec()).array(), 6)
.unwrap();

assert!(array.patches().is_some());

let patch_indices = SparseArray::try_from(array.patches().unwrap())
.unwrap()
.indices();
assert_eq!(patch_indices.len(), 1);

// Slicing drops the empty patches array.
let sliced = slice(&array.into_array(), 0, 64).unwrap();
let sliced_bp = BitPackedArray::try_from(sliced).unwrap();
assert!(sliced_bp.patches().is_none());
}

#[test]
fn take_after_slice() {
// Check that our take implementation respects the offsets applied after slicing.

let array = BitPackedArray::encode(
PrimitiveArray::from((63u32..).take(3072).collect_vec()).array(),
6,
)
.unwrap();

// Slice the array.
// The resulting array will still have 3 1024-element chunks.
let sliced = slice(array.array(), 922, 2061).unwrap();

// Take one element from each chunk.
// Chunk 1: physical indices 922-1023, logical indices 0-101
// Chunk 2: physical indices 1024-2047, logical indices 102-1125
// Chunk 3: physical indices 2048-2060, logical indices 1126-1138

let taken = take(
&sliced,
PrimitiveArray::from(vec![101i64, 1125i64, 1138i64]).array(),
)
.unwrap();
assert_eq!(taken.len(), 3);
}
}
15 changes: 10 additions & 5 deletions encodings/fastlanes/src/bitpacking/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,16 @@ fn take_primitive<T: NativePType + BitPacking>(

if !prefer_bulk_patch {
if let Some(ref patches) = patches {
let patches_slice = slice(
patches.array(),
chunk * 1024,
min((chunk + 1) * 1024, patches.len()),
)?;
// NOTE: we need to subtract the array offset before slicing into the patches.
// This is because BitPackedArray is rounded to block boundaries, but patches
// is sliced exactly.
let patches_start = if chunk == 0 {
0
} else {
(chunk * 1024) - array.offset()
};
let patches_end = min((chunk + 1) * 1024 - array.offset(), patches.len());
let patches_slice = slice(patches.array(), patches_start, patches_end)?;
let patches_slice = SparseArray::try_from(patches_slice)?;
let offsets = PrimitiveArray::from(offsets);
do_patch_for_take_primitive(&patches_slice, &offsets, &mut output)?;
Expand Down
13 changes: 11 additions & 2 deletions encodings/fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use ::serde::{Deserialize, Serialize};
pub use compress::*;
use vortex::array::{Primitive, PrimitiveArray};
use vortex::array::{Primitive, PrimitiveArray, SparseArray};
use vortex::stats::{ArrayStatisticsCompute, StatsSet};
use vortex::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
use vortex::variants::{ArrayVariants, PrimitiveArrayTrait};
Expand Down Expand Up @@ -80,6 +80,10 @@ impl BitPackedArray {
parray.len()
)
}

if SparseArray::try_from(parray)?.indices().is_empty() {
vortex_bail!("cannot construct BitPackedArray using patches without indices");
}
}

let metadata = BitPackedMetadata {
Expand Down Expand Up @@ -123,9 +127,14 @@ impl BitPackedArray {
self.metadata().bit_width
}

/// Access the patches array.
///
/// If present, patches MUST be a `SparseArray` with equal-length to this array, and whose
/// indices indicate the locations of patches. The indices must have non-zero length.
#[inline]
pub fn patches(&self) -> Option<Array> {
(self.metadata().has_patches)
self.metadata()
.has_patches
.then(|| {
self.array().child(
1,
Expand Down
1 change: 0 additions & 1 deletion encodings/runend/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ impl TakeFn for RunEndArray {
}
self.find_physical_index(idx).map(|loc| loc as u64)
})

.collect::<VortexResult<Vec<_>>>()?
});
let physical_indices_array = PrimitiveArray::from(physical_indices).into_array();
Expand Down
13 changes: 13 additions & 0 deletions encodings/runend/src/runend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ impl RunEndArray {
Self::try_from_parts(dtype, length, metadata, children.into(), StatsSet::new())
}

/// Convert the given logical index to an index into the `values` array
pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
search_sorted(&self.ends(), index + self.offset(), SearchSortedSide::Right)
.map(|s| s.to_ends_index(self.ends().len()))
}

/// Run the array through run-end encoding.
pub fn encode(array: Array) -> VortexResult<Self> {
if array.encoding().id() == Primitive::ID {
let primitive = PrimitiveArray::try_from(array)?;
Expand All @@ -99,18 +101,29 @@ impl RunEndArray {
.to_validity(self.array().child(2, &Validity::DTYPE, self.len()))
}

/// The offset that the `ends` is relative to.
///
/// This is generally zero for a "new" array, and non-zero after a slicing operation.
#[inline]
pub fn offset(&self) -> usize {
self.metadata().offset
}

/// The encoded "ends" of value runs.
///
/// The `i`-th element indicates that there is a run of the same value, beginning
/// at `ends[i]` (inclusive) and terminating at `ends[i+1]` (exclusive).
#[inline]
pub fn ends(&self) -> Array {
self.array()
.child(0, &self.metadata().ends_dtype, self.metadata().num_runs)
.vortex_expect("RunEndArray is missing its run ends")
}

/// The scalar values.
///
/// The `i`-th element is the scalar value for the `i`-th repeated run. The run begins
/// at `ends[i]` (inclusive) and terminates at `ends[i+1]` (exclusive).
#[inline]
pub fn values(&self) -> Array {
self.array()
Expand Down
4 changes: 0 additions & 4 deletions vortex-array/src/array/chunked/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ pub(crate) fn try_canonicalize_chunks(
validity: Validity,
dtype: &DType,
) -> VortexResult<Canonical> {
if chunks.is_empty() {
vortex_bail!(InvalidArgument: "chunks must be non-empty")
}

let mismatched = chunks
.iter()
.filter(|chunk| !chunk.dtype().eq(dtype))
Expand Down
Loading

0 comments on commit 03e03ce

Please sign in to comment.