Dk/recursively compress arrays #960

Closed · wants to merge 11 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
@@ -198,3 +198,9 @@ tests_outside_test_module = "deny"
unwrap_in_result = "deny"
unwrap_used = "deny"
use_debug = "deny"

[profile.release]
debug = true

[profile.bench]
debug = true
65 changes: 32 additions & 33 deletions encodings/fastlanes/src/bitpacking/compress.rs
@@ -1,15 +1,15 @@
use arrow_buffer::ArrowNativeType;
use fastlanes::BitPacking;
use vortex::array::{PrimitiveArray, Sparse, SparseArray};
use vortex::array::PrimitiveArray;
use vortex::stats::ArrayStatistics;
use vortex::validity::{ArrayValidity, Validity};
use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoArrayVariant};
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant, ToArray};
use vortex_buffer::Buffer;
use vortex_dtype::{
match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType,
};
use vortex_error::{vortex_bail, vortex_err, VortexResult, VortexUnwrap};
use vortex_scalar::{Scalar, ScalarValue};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::BitPackedArray;

@@ -115,7 +115,7 @@ pub fn bitpack_patches(
parray: &PrimitiveArray,
bit_width: usize,
num_exceptions_hint: usize,
) -> Option<Array> {
) -> Option<(Array, Array, usize)> {
match_each_integer_ptype!(parray.ptype(), |$T| {
let mut indices: Vec<u64> = Vec::with_capacity(num_exceptions_hint);
let mut values: Vec<$T> = Vec::with_capacity(num_exceptions_hint);
@@ -126,16 +126,15 @@
}
}

(!indices.is_empty()).then(|| {
SparseArray::try_new(
if !indices.is_empty() {
Some((
indices.into_array(),
PrimitiveArray::from_vec(values, Validity::AllValid).into_array(),
parray.len(),
ScalarValue::Null,
)
.vortex_unwrap()
.into_array()
})
PrimitiveArray::from_vec(values, Validity::NonNullable).to_array(),
0
)) // FIXME(DK): we could store validity here or on bitpacked, why not here?
} else {
None
}
})
}

@@ -156,29 +155,29 @@ pub fn unpack(array: BitPackedArray) -> VortexResult<PrimitiveArray> {
unpacked = unpacked.reinterpret_cast(ptype);
}

if let Some(patches) = array.patches() {
patch_unpacked(unpacked, &patches)
if let Some((indices, values, _)) = array._patches() {
patch_unpacked(unpacked, indices, values)
} else {
Ok(unpacked)
}
}

fn patch_unpacked(array: PrimitiveArray, patches: &Array) -> VortexResult<PrimitiveArray> {
match patches.encoding().id() {
Sparse::ID => {
match_each_integer_ptype!(array.ptype(), |$T| {
let typed_patches = SparseArray::try_from(patches).unwrap();
array.patch(
&typed_patches.resolved_indices(),
typed_patches.values().into_primitive()?.maybe_null_slice::<$T>())
})
}
_ => vortex_bail!(
"Can't patch bitpacked array with {}, only {} is supported",
patches,
Sparse::ID
),
}
fn patch_unpacked(
array: PrimitiveArray,
indices: Array,
values: Array,
) -> VortexResult<PrimitiveArray> {
match_each_integer_ptype!(array.ptype(), |$T| {
array.patch(
&(indices
.into_primitive()?
.maybe_null_slice::<u64>()
.iter()
.map(|v| (*v as usize))
.collect::<Vec<_>>()),
values.into_primitive()?.maybe_null_slice::<$T>(),
)
})
}

pub fn unpack_primitive<T: NativePType + BitPacking>(
@@ -351,7 +350,7 @@ mod test {
PrimitiveArray::from_vec((0u32..24).collect::<Vec<_>>(), Validity::from(valid_values));
assert!(values.ptype().is_unsigned_int());
let compressed = BitPackedArray::encode(values.as_ref(), 4).unwrap();
assert!(compressed.patches().is_none());
assert!(compressed._patches().is_none());
assert_eq!(
(0..(1 << 4)).collect::<Vec<_>>(),
compressed
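Taken together, the compress.rs changes replace the SparseArray patches with a bare (indices, values, indices_offset) tuple: bitpack_patches collects the positions whose values overflow the bit width, and patch_unpacked writes those values back after unpacking. A minimal sketch of that round trip, with plain Vecs standing in for vortex arrays (collect_patches and apply_patches are illustrative names, not the crate's API):

// Collect the positions and values that do not fit in `bit_width` bits.
fn collect_patches(values: &[u32], bit_width: u32) -> (Vec<u64>, Vec<u32>) {
    let max_packed = (1u32 << bit_width) - 1;
    let mut indices = Vec::new();
    let mut patch_values = Vec::new();
    for (i, &v) in values.iter().enumerate() {
        if v > max_packed {
            indices.push(i as u64);
            patch_values.push(v);
        }
    }
    (indices, patch_values)
}

// Write the exceptional values back over the (lossy) unpacked values.
fn apply_patches(mut unpacked: Vec<u32>, indices: &[u64], values: &[u32]) -> Vec<u32> {
    for (&i, &v) in indices.iter().zip(values) {
        unpacked[i as usize] = v;
    }
    unpacked
}

fn main() {
    let original = vec![1u32, 2, 3, 999, 4, 1000];
    let (indices, values) = collect_patches(&original, 3);
    assert_eq!(indices, vec![3, 5]);
    assert_eq!(values, vec![999, 1000]);
    // Packing at 3 bits keeps only the low bits; patching restores the originals.
    let unpacked = vec![1u32, 2, 3, 999 % 8, 4, 1000 % 8];
    assert_eq!(apply_patches(unpacked, &indices, &values), original);
}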
31 changes: 15 additions & 16 deletions encodings/fastlanes/src/bitpacking/compute/scalar_at.rs
@@ -1,4 +1,5 @@
use vortex::compute::unary::{scalar_at_unchecked, ScalarAtFn};
use vortex::compute::unary::{scalar_at, ScalarAtFn};
use vortex::compute::{search_sorted, SearchResult, SearchSortedSide};
use vortex::ArrayDType;
use vortex_error::{VortexResult, VortexUnwrap as _};
use vortex_scalar::Scalar;
@@ -7,10 +8,13 @@ use crate::{unpack_single, BitPackedArray};

impl ScalarAtFn for BitPackedArray {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
if let Some(patches) = self.patches() {
if let Some((indices, values, _)) = self._patches() {
// NB: All non-null values are considered patches
if patches.with_dyn(|a| a.is_valid(index)) {
return scalar_at_unchecked(&patches, index).cast(self.dtype());
match search_sorted(&indices, index, SearchSortedSide::Left)? {
SearchResult::Found(patches_index) => {
return scalar_at(&values, patches_index)?.cast(self.dtype());
}
SearchResult::NotFound(_) => {}
}
}

@@ -24,13 +28,13 @@ impl ScalarAtFn for BitPackedArray {

#[cfg(test)]
mod test {
use vortex::array::{PrimitiveArray, SparseArray};
use vortex::array::PrimitiveArray;
use vortex::compute::unary::scalar_at;
use vortex::validity::Validity;
use vortex::IntoArray;
use vortex_buffer::Buffer;
use vortex_dtype::{DType, Nullability, PType};
use vortex_scalar::{Scalar, ScalarValue};
use vortex_scalar::Scalar;

use crate::BitPackedArray;

@@ -40,16 +44,11 @@ mod test {
Buffer::from(vec![0u8; 128]),
PType::U32,
Validity::AllInvalid,
Some(
SparseArray::try_new(
PrimitiveArray::from(vec![1u64]).into_array(),
PrimitiveArray::from_vec(vec![999u32], Validity::AllValid).into_array(),
8,
ScalarValue::Null,
)
.unwrap()
.into_array(),
),
Some((
PrimitiveArray::from(vec![1u64]).into_array(),
PrimitiveArray::from_vec(vec![999u32], Validity::NonNullable).into_array(),
0,
)),
1,
8,
)
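The new scalar_at mirrors how the patches are now stored: a sorted indices array plus a values array, searched with search_sorted before falling back to unpack_single. A self-contained sketch of that lookup order, with plain slices standing in for the vortex arrays (value_at is an illustrative name):

// The patch indices stay sorted, so a binary search decides whether `index`
// was stored as an exception or as a regular bit-packed value.
fn value_at(packed: &[u32], patch_indices: &[u64], patch_values: &[u32], index: usize) -> u32 {
    match patch_indices.binary_search(&(index as u64)) {
        Ok(pos) => patch_values[pos], // exception: read the patched value
        Err(_) => packed[index],      // common case: read the bit-packed value
    }
}

fn main() {
    let packed = vec![1u32, 2, 3, 7, 4, 0]; // low bits only, after truncation
    let (patch_indices, patch_values) = (vec![3u64, 5], vec![999u32, 1000]);
    assert_eq!(value_at(&packed, &patch_indices, &patch_values, 3), 999);
    assert_eq!(value_at(&packed, &patch_indices, &patch_values, 2), 3);
}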
32 changes: 19 additions & 13 deletions encodings/fastlanes/src/bitpacking/compute/search_sorted.rs
@@ -5,14 +5,14 @@ use std::cmp::Ordering::Greater;
use fastlanes::BitPacking;
use itertools::Itertools;
use num_traits::AsPrimitive;
use vortex::array::SparseArray;
use vortex::compute::{
search_sorted_u64, IndexOrd, Len, SearchResult, SearchSorted, SearchSortedFn, SearchSortedSide,
};
use vortex::stats::ArrayStatistics;
use vortex::validity::Validity;
use vortex::ArrayDType;
use vortex_dtype::{match_each_unsigned_integer_ptype, NativePType};
use vortex_error::{VortexError, VortexExpect as _, VortexResult};
use vortex_error::{VortexError, VortexResult};
use vortex_scalar::Scalar;

use crate::{unpack_single_primitive, BitPackedArray};
@@ -105,12 +105,19 @@ fn search_sorted_native<T>(
where
T: NativePType + BitPacking + AsPrimitive<usize> + AsPrimitive<u64>,
{
if let Some(patches_array) = array.patches() {
if let Some((_indices, values, _indices_offset)) = array._patches() {
// If patches exist, they must be the last elements in the array; if the value we're
// looking for is greater than the max packed value, we only need to search the patches.
let usize_value: usize = value.as_();
if usize_value > array.max_packed_value() {
search_sorted_u64(&patches_array, value.as_(), side)
match search_sorted_u64(&values, value.as_(), side)? {
SearchResult::Found(patches_index) => {
Ok(SearchResult::Found(array.packed_len() + patches_index))
}
SearchResult::NotFound(patches_index) => {
Ok(SearchResult::NotFound(array.packed_len() + patches_index))
}
}
} else {
Ok(BitPackedSearch::<'_, T>::new(array).search_sorted(&value, side))
}
@@ -133,14 +140,6 @@ struct BitPackedSearch<'a, T> {

impl<'a, T: BitPacking + NativePType> BitPackedSearch<'a, T> {
pub fn new(array: &'a BitPackedArray) -> Self {
let min_patch_offset = array
.patches()
.and_then(|p| {
SparseArray::try_from(p)
.vortex_expect("Only sparse patches are supported")
.min_index()
})
.unwrap_or_else(|| array.len());
let first_null_idx = match array.validity() {
Validity::NonNullable | Validity::AllValid => array.len(),
Validity::AllInvalid => 0,
@@ -150,7 +149,14 @@ impl<'a, T: BitPacking + NativePType> BitPackedSearch<'a, T> {
}
};

let first_invalid_idx = cmp::min(min_patch_offset, first_null_idx);
let first_invalid_idx = match array._patches() {
None => first_null_idx,
Some((indices, _, indices_offset)) => {
let min_patch_offset =
indices.statistics().compute_min().unwrap_or(array.len()) - indices_offset; // FIXME(DK): indices are always sorted, right? We could just take the zeroth element
cmp::min(min_patch_offset, first_null_idx)
}
};

Self {
packed_maybe_null_slice: array.packed_slice::<T>(),
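search_sorted_native leans on the comment above: for a sorted bit-packed array the patches hold the largest values and form the tail, so a target above max_packed_value() only needs a search over the patch values, with the patch-local position shifted by the length of the packed prefix. A sketch of that shift over plain sorted slices (here the prefix length plays the role of packed_len(); this illustrates the idea, not the crate's API):

// Two-tier search over a sorted array split into a packed prefix and a patch tail.
fn search_with_patches(
    packed_prefix: &[u32],
    patch_values: &[u32],
    max_packed_value: u32,
    target: u32,
) -> Result<usize, usize> {
    if target > max_packed_value {
        // Only the patch tail can contain the target; offset the patch-local
        // position by the length of the packed prefix.
        match patch_values.binary_search(&target) {
            Ok(i) => Ok(packed_prefix.len() + i),
            Err(i) => Err(packed_prefix.len() + i),
        }
    } else {
        packed_prefix.binary_search(&target)
    }
}

fn main() {
    let packed_prefix = vec![1u32, 2, 3, 4];
    let patch_values = vec![999u32, 1000];
    assert_eq!(search_with_patches(&packed_prefix, &patch_values, 7, 1000), Ok(5));
    assert_eq!(search_with_patches(&packed_prefix, &patch_values, 7, 3), Ok(2));
    assert_eq!(search_with_patches(&packed_prefix, &patch_values, 7, 500), Err(4));
}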
49 changes: 29 additions & 20 deletions encodings/fastlanes/src/bitpacking/compute/slice.rs
@@ -1,9 +1,8 @@
use std::cmp::max;

use vortex::array::SparseArray;
use vortex::compute::{slice, SliceFn};
use vortex::{Array, IntoArray};
use vortex_error::{VortexExpect, VortexResult};
use vortex_error::VortexResult;

use crate::BitPackedArray;

@@ -17,22 +16,34 @@ impl SliceFn for BitPackedArray {

let encoded_start = (block_start / 8) * self.bit_width();
let encoded_stop = (block_stop / 8) * self.bit_width();
let new_packed = self.packed().slice(encoded_start..encoded_stop);

let patch_start = offset_start.saturating_sub(self.packed_len());
let patch_stop = offset_stop.saturating_sub(self.packed_len());
let new_patches = if patch_start == patch_stop {
None
} else {
self._patches()
.map(|(indices, values, indices_offset)| -> VortexResult<_> {
Ok((
slice(&indices, patch_start, patch_stop)?,
slice(&values, patch_start, patch_stop)?,
indices_offset + start,
))
})
.transpose()?
.filter(|(indices, ..)| {
// No need to keep an empty set of patches
!indices.is_empty()
})
}; // FIXME(DK): Do I need to apply the offset to the indices eagerly?

// slice the buffer using the encoded start/stop values
Self::try_new_from_offset(
self.packed().slice(encoded_start..encoded_stop),
new_packed,
self.ptype(),
self.validity().slice(start, stop)?,
self.patches()
.map(|p| slice(&p, start, stop))
.transpose()?
.filter(|a| {
// If the sliced patch_indices is empty, we should not propagate the patches.
// There may be other logic that depends on Some(patches) indicating non-empty.
!SparseArray::try_from(a)
.vortex_expect("BitPackedArray must encode patches as SparseArray")
.indices()
.is_empty()
}),
new_patches,
self.bit_width(),
stop - start,
offset,
@@ -44,7 +55,7 @@
#[cfg(test)]
mod test {
use itertools::Itertools;
use vortex::array::{PrimitiveArray, SparseArray};
use vortex::array::PrimitiveArray;
use vortex::compute::unary::scalar_at;
use vortex::compute::{slice, take};
use vortex::IntoArray;
@@ -165,17 +176,15 @@ mod test {
BitPackedArray::encode(PrimitiveArray::from((0u32..=64).collect_vec()).as_ref(), 6)
.unwrap();

assert!(array.patches().is_some());
assert!(array._patches().is_some());

let patch_indices = SparseArray::try_from(array.patches().unwrap())
.unwrap()
.indices();
let patch_indices = array._patches().unwrap().0;
assert_eq!(patch_indices.len(), 1);

// Slicing drops the empty patches array.
let sliced = slice(array, 0, 64).unwrap();
let sliced_bp = BitPackedArray::try_from(sliced).unwrap();
assert!(sliced_bp.patches().is_none());
assert!(sliced_bp._patches().is_none());
}

#[test]
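The new slice logic applies the same tail assumption when carrying patches through a slice: the patch-local range is the requested range minus packed_len(), clamped at zero, and an empty range means the sliced array should report no patches at all. A small sketch of that arithmetic (illustrative only; the real code works on offset_start/offset_stop and slices vortex arrays):

// Map an element range [start, stop) onto the patch arrays, assuming all
// patches occupy positions >= packed_len.
fn patch_slice_bounds(start: usize, stop: usize, packed_len: usize) -> Option<(usize, usize)> {
    let patch_start = start.saturating_sub(packed_len);
    let patch_stop = stop.saturating_sub(packed_len);
    // An empty range means the slice stays inside the packed prefix, so the
    // sliced array should carry no patches.
    (patch_start != patch_stop).then_some((patch_start, patch_stop))
}

fn main() {
    // Suppose 64 representable elements are followed by a single patch, roughly
    // the 0..=64 test above: slicing [0, 64) drops the patches entirely.
    assert_eq!(patch_slice_bounds(0, 64, 64), None);
    // Slicing [60, 65) keeps patch positions [0, 1) of the patch arrays.
    assert_eq!(patch_slice_bounds(60, 65, 64), Some((0, 1)));
}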