feat: faster take for BitPackedArray and SparseArray (#1133)
fixes #1039
lwwmanning authored Oct 24, 2024
1 parent c82bb97 commit e2a681a
Showing 8 changed files with 169 additions and 76 deletions.
204 changes: 135 additions & 69 deletions encodings/fastlanes/src/bitpacking/compute/take.rs
@@ -8,7 +8,7 @@ use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant, IntoCanonical};
use vortex_dtype::{
match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType,
};
use vortex_error::VortexResult;
use vortex_error::{VortexExpect as _, VortexResult};

use crate::{unpack_single_primitive, BitPackedArray};

@@ -35,111 +35,177 @@ impl TakeFn for BitPackedArray {

let indices = indices.clone().into_primitive()?;
let taken = match_each_unsigned_integer_ptype!(ptype, |$T| {
PrimitiveArray::from_vec(take_primitive::<$T>(self, &indices)?, taken_validity)
match_each_integer_ptype!(indices.ptype(), |$I| {
PrimitiveArray::from_vec(take_primitive::<$T, $I>(self, &indices)?, taken_validity)
})
});
Ok(taken.reinterpret_cast(ptype).into_array())
}
}

fn take_primitive<T: NativePType + BitPacking>(
// array_chunks must use while let so that we can get the remainder
#[allow(clippy::while_let_on_iterator)]
fn take_primitive<T: NativePType + BitPacking, I: NativePType>(
array: &BitPackedArray,
indices: &PrimitiveArray,
) -> VortexResult<Vec<T>> {
let offset = array.offset() as usize;

// Group indices into 1024-element chunks and relativise them to the beginning of each chunk
let relative_indices: Vec<(usize, Vec<u16>)> = match_each_integer_ptype!(indices.ptype(), |$P| {
indices
.maybe_null_slice::<$P>()
.iter()
.copied()
.map(|i| i as usize + offset)
.chunk_by(|idx| idx / 1024)
.into_iter()
.map(|(k, g)| (k, g.map(|idx| (idx % 1024) as u16).collect()))
.collect()
});
if indices.is_empty() {
return Ok(vec![]);
}

let offset = array.offset() as usize;
let bit_width = array.bit_width() as usize;

let packed = array.packed_slice::<T>();

let patches = array.patches().map(SparseArray::try_from).transpose()?;

// if we have a small number of relatively large batches, we gain by slicing and then patching inside the loop
// if we have a large number of relatively small batches, the overhead isn't worth it, and we're better off with a bulk patch
// roughly, if we have an average of less than 64 elements per batch, we prefer bulk patching
let prefer_bulk_patch = relative_indices.len() * BULK_PATCH_THRESHOLD > indices.len();
// Group indices into 1024-element chunks and relativise them to the beginning of each chunk
// *without* allocating on the heap
let chunked_indices = &indices
.maybe_null_slice::<I>()
.iter()
.map(|i| {
i.to_usize()
.vortex_expect("index must be expressible as usize")
+ offset
})
.chunk_by(|idx| idx / 1024);

let mut output = Vec::with_capacity(indices.len());
let mut unpacked = [T::zero(); 1024];
for (chunk, offsets) in relative_indices {

let mut batch_count = 0_usize;
for (chunk, offsets) in chunked_indices {
batch_count += 1;
let chunk_size = 128 * bit_width / size_of::<T>();
let packed_chunk = &packed[chunk * chunk_size..][..chunk_size];
if offsets.len() > UNPACK_CHUNK_THRESHOLD {
unsafe {
BitPacking::unchecked_unpack(bit_width, packed_chunk, &mut unpacked);
}
for index in &offsets {
output.push(unpacked[*index as usize]);

// array_chunks produced a fixed size array, doesn't heap allocate
let mut have_unpacked = false;
let mut offset_chunk_iter = offsets
.map(|i| i % 1024)
.array_chunks::<UNPACK_CHUNK_THRESHOLD>();
while let Some(offset_chunk) = offset_chunk_iter.next() {
if !have_unpacked {
unsafe {
BitPacking::unchecked_unpack(bit_width, packed_chunk, &mut unpacked);
}
have_unpacked = true;
}
} else {
for index in &offsets {
output.push(unsafe {
unpack_single_primitive::<T>(packed_chunk, bit_width, *index as usize)
});

for index in offset_chunk {
output.push(unpacked[index]);
}
}

if !prefer_bulk_patch {
if let Some(ref patches) = patches {
// NOTE: we need to subtract the array offset before slicing into the patches.
// This is because BitPackedArray is rounded to block boundaries, but patches
// is sliced exactly.
let patches_start = if chunk == 0 {
0
} else {
(chunk * 1024) - offset
};
let patches_end = min((chunk + 1) * 1024 - offset, patches.len());
let patches_slice = slice(patches.as_ref(), patches_start, patches_end)?;
let patches_slice = SparseArray::try_from(patches_slice)?;
let offsets = PrimitiveArray::from(offsets);
do_patch_for_take_primitive(&patches_slice, &offsets, &mut output)?;
if let Some(remainder) = offset_chunk_iter.into_remainder() {
if have_unpacked {
// we already bulk unpacked this chunk, so we can just push the remaining elements
for index in remainder {
output.push(unpacked[index]);
}
} else {
// we had fewer than UNPACK_CHUNK_THRESHOLD offsets, so we just unpack each one individually
for index in remainder {
output.push(unsafe {
unpack_single_primitive::<T>(packed_chunk, bit_width, index)
});
}
}
}
}

if prefer_bulk_patch {
if let Some(ref patches) = patches {
do_patch_for_take_primitive(patches, indices, &mut output)?;
}
if let Some(ref patches) = patches {
patch_for_take_primitive::<T, I>(patches, indices, offset, batch_count, &mut output)?;
}

Ok(output)
}

fn do_patch_for_take_primitive<T: NativePType>(
fn patch_for_take_primitive<T: NativePType, I: NativePType>(
patches: &SparseArray,
indices: &PrimitiveArray,
offset: usize,
batch_count: usize,
output: &mut [T],
) -> VortexResult<()> {
let taken_patches = take(patches.as_ref(), indices.as_ref())?;
let taken_patches = SparseArray::try_from(taken_patches)?;

let base_index = output.len() - indices.len();
let output_patches = taken_patches
.values()
.into_primitive()?
.reinterpret_cast(T::PTYPE);
taken_patches
.resolved_indices()
#[inline]
fn inner_patch<T: NativePType>(
patches: &SparseArray,
indices: &PrimitiveArray,
output: &mut [T],
) -> VortexResult<()> {
let taken_patches = take(patches.as_ref(), indices.as_ref())?;
let taken_patches = SparseArray::try_from(taken_patches)?;

let base_index = output.len() - indices.len();
let output_patches = taken_patches
.values()
.into_primitive()?
.reinterpret_cast(T::PTYPE);
taken_patches
.resolved_indices()
.iter()
.map(|idx| base_index + *idx)
.zip_eq(output_patches.maybe_null_slice::<T>())
.for_each(|(idx, val)| {
output[idx] = *val;
});

Ok(())
}

// if we have a small number of relatively large batches, we gain by slicing and then patching inside the loop
// if we have a large number of relatively small batches, the overhead isn't worth it, and we're better off with a bulk patch
// roughly, if we have an average of less than 64 elements per batch, we prefer bulk patching
let prefer_bulk_patch = batch_count * BULK_PATCH_THRESHOLD > indices.len();
if prefer_bulk_patch {
return inner_patch(patches, indices, output);
}

let min_index = patches.min_index().unwrap_or_default();
let max_index = patches.max_index().unwrap_or_default();

// Group indices into 1024-element chunks and relativise them to the beginning of each chunk
let chunked_indices = &indices
.maybe_null_slice::<I>()
.iter()
.map(|idx| base_index + *idx)
.zip_eq(output_patches.maybe_null_slice::<T>())
.for_each(|(idx, val)| {
output[idx] = *val;
});
.map(|i| {
i.to_usize()
.vortex_expect("index must be expressible as usize")
+ offset
})
.filter(|i| *i >= min_index && *i <= max_index) // short-circuit
.chunk_by(|idx| idx / 1024);

for (chunk, offsets) in chunked_indices {
// NOTE: we need to subtract the array offset before slicing into the patches.
// This is because BitPackedArray is rounded to block boundaries, but patches
// is sliced exactly.
let patches_start = if chunk == 0 {
0
} else {
(chunk * 1024) - offset
};
let patches_end = min((chunk + 1) * 1024 - offset, patches.len());
let patches_slice = slice(patches.as_ref(), patches_start, patches_end)?;
let patches_slice = SparseArray::try_from(patches_slice)?;
if patches_slice.is_empty() {
continue;
}

let min_index = patches_slice.min_index().unwrap_or_default();
let max_index = patches_slice.max_index().unwrap_or_default();
let offsets = offsets
.map(|i| (i % 1024) as u16)
.filter(|i| *i as usize >= min_index && *i as usize <= max_index)
.collect_vec();
if offsets.is_empty() {
continue;
}

inner_patch(&patches_slice, &PrimitiveArray::from(offsets), output)?;
}

Ok(())
}
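The core idea in the rewritten take_primitive is to group indices into 1024-element blocks, bulk-unpack a block only once enough indices land in it, and fall back to single-value unpacking otherwise. Below is a minimal, self-contained sketch of that strategy; the plain Vec and the decode_block/decode_single helpers are illustrative stand-ins, not the real fastlanes BitPacking kernels.

const BLOCK: usize = 1024;
const UNPACK_THRESHOLD: usize = 8; // stand-in for UNPACK_CHUNK_THRESHOLD

/// Pretend "bit-packed" storage: just a flat Vec, so the sketch runs as-is.
struct PackedVec(Vec<u32>);

impl PackedVec {
    fn decode_block(&self, block: usize, out: &mut [u32; BLOCK]) {
        let start = block * BLOCK;
        for (i, slot) in out.iter_mut().enumerate() {
            *slot = self.0.get(start + i).copied().unwrap_or(0);
        }
    }

    fn decode_single(&self, block: usize, offset: usize) -> u32 {
        self.0[block * BLOCK + offset]
    }
}

fn take(packed: &PackedVec, indices: &[usize]) -> Vec<u32> {
    let mut output = Vec::with_capacity(indices.len());
    let mut scratch = [0u32; BLOCK];

    let mut i = 0;
    while i < indices.len() {
        // Collect the run of consecutive indices that fall into the same 1024-element block.
        let block = indices[i] / BLOCK;
        let run_start = i;
        while i < indices.len() && indices[i] / BLOCK == block {
            i += 1;
        }
        let run = &indices[run_start..i];

        if run.len() > UNPACK_THRESHOLD {
            // Many hits: decode the whole block once, then gather from scratch.
            packed.decode_block(block, &mut scratch);
            output.extend(run.iter().map(|idx| scratch[idx % BLOCK]));
        } else {
            // Few hits: decode each requested element individually.
            output.extend(run.iter().map(|idx| packed.decode_single(block, idx % BLOCK)));
        }
    }
    output
}

fn main() {
    let packed = PackedVec((0..4096u32).collect());
    let out = take(&packed, &[1, 2, 3, 1030, 2050, 2051, 2052]);
    assert_eq!(out, vec![1, 2, 3, 1030, 2050, 2051, 2052]);
    println!("{out:?}");
}

In the actual code the same decision is made with array_chunks::<UNPACK_CHUNK_THRESHOLD> batches and BitPacking::unchecked_unpack, so no per-block Vec of offsets needs to be allocated.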
1 change: 1 addition & 0 deletions encodings/fastlanes/src/lib.rs
@@ -1,6 +1,7 @@
#![allow(incomplete_features)]
#![feature(generic_const_exprs)]
#![feature(vec_into_raw_parts)]
#![feature(iter_array_chunks)]

pub use bitpacking::*;
pub use delta::*;
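The new iter_array_chunks feature gate is what lets the take loop above batch offsets into fixed-size stack arrays. A nightly-only, standalone sketch of the array_chunks / into_remainder pattern (not Vortex code):

#![feature(iter_array_chunks)]

fn main() {
    let offsets = [3usize, 17, 250, 511, 700, 900, 1001];

    let mut batches = offsets.into_iter().array_chunks::<4>();
    // `while let` (rather than `for`) keeps the iterator alive so we can still
    // ask it for the remainder afterwards.
    while let Some(batch) = batches.next() {
        println!("full batch of 4: {batch:?}");
    }
    if let Some(remainder) = batches.into_remainder() {
        println!("remainder: {:?}", remainder.collect::<Vec<_>>());
    }
}

Each full batch comes out as a fixed-size [T; N] with no heap allocation, and the leftover elements (here the last three offsets) are handled through into_remainder, mirroring the two paths in take_primitive.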
6 changes: 6 additions & 0 deletions vortex-array/src/array/sparse/compute/take.rs
@@ -42,11 +42,14 @@ fn take_map(
.enumerate()
.map(|(i, r)| (*r as u64, i as u64))
.collect();
let min_index = array.min_index().unwrap_or_default() as u64;
let max_index = array.max_index().unwrap_or_default() as u64;
let (positions, patch_indices): (Vec<u64>, Vec<u64>) = match_each_integer_ptype!(indices.ptype(), |$P| {
indices.maybe_null_slice::<$P>()
.iter()
.map(|pi| *pi as u64)
.enumerate()
.filter(|(_, pi)| *pi >= min_index && *pi <= max_index) // short-circuit
.filter_map(|(i, pi)| indices_map.get(&pi).map(|phy_idx| (i as u64, phy_idx)))
.unzip()
});
@@ -60,11 +63,14 @@ fn take_search_sorted(
array: &SparseArray,
indices: &PrimitiveArray,
) -> VortexResult<(PrimitiveArray, PrimitiveArray)> {
let min_index = array.min_index().unwrap_or_default() as u64;
let max_index = array.max_index().unwrap_or_default() as u64;
let resolved = match_each_integer_ptype!(indices.ptype(), |$P| {
indices
.maybe_null_slice::<$P>()
.iter()
.enumerate()
.filter(|(_, i)| **i as u64 >= min_index && **i as u64 <= max_index) // short-circuit
.map(|(pos, i)| {
array
.search_index(*i as usize)
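Both take paths here now skip query indices that fall outside the [min_index, max_index] range of the sparse array before doing any per-index lookup. A standalone sketch of the take_map variant of that short-circuit, using plain slices and a std HashMap instead of the actual Vortex arrays (the sorted patch_indices slice stands in for the array's resolved indices):

use std::collections::HashMap;

fn take_map(patch_indices: &[u64], take_indices: &[u64]) -> Vec<(u64, u64)> {
    // Physical position of each patch index; patch_indices is assumed sorted,
    // as SparseArray indices are.
    let indices_map: HashMap<u64, u64> = patch_indices
        .iter()
        .enumerate()
        .map(|(pos, idx)| (*idx, pos as u64))
        .collect();

    let min_index = patch_indices.first().copied().unwrap_or_default();
    let max_index = patch_indices.last().copied().unwrap_or_default();

    take_indices
        .iter()
        .enumerate()
        // short-circuit: anything outside the patch range can't possibly hit
        .filter(|(_, idx)| **idx >= min_index && **idx <= max_index)
        .filter_map(|(pos, idx)| indices_map.get(idx).map(|phy| (pos as u64, *phy)))
        .collect()
}

fn main() {
    // patches live at logical positions 100..=105
    let patches = [100u64, 101, 102, 105];
    let wanted = [0u64, 50, 101, 105, 9_999];
    // only 101 and 105 survive the range filter and resolve to physical slots
    assert_eq!(take_map(&patches, &wanted), vec![(2, 1), (3, 3)]);
    println!("{:?}", take_map(&patches, &wanted));
}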
14 changes: 13 additions & 1 deletion vortex-array/src/array/sparse/mod.rs
@@ -152,10 +152,22 @@ impl SparseArray {
let min_index: usize = scalar_at(self.indices(), 0)
.and_then(|s| s.as_ref().try_into())
.vortex_expect("SparseArray indices is non-empty");

min_index - self.indices_offset()
})
}

/// Return the maximum index if indices are present.
///
/// If this sparse array has no indices (i.e. all elements are equal to fill_value)
/// then it returns None.
pub fn max_index(&self) -> Option<usize> {
(!self.indices().is_empty()).then(|| {
let max_index: usize = scalar_at(self.indices(), self.indices().len() - 1)
.and_then(|s| s.as_ref().try_into())
.vortex_expect("SparseArray indices is non-empty");
max_index - self.indices_offset()
})
}
}

impl ArrayTrait for SparseArray {}
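Roughly, max_index mirrors min_index: the last element of the sorted indices buffer, rebased by the slicing offset. A small illustrative model of that behaviour (the Sparse struct below is a stand-in, not the real SparseArray, which goes through scalar_at and indices_offset()):

struct Sparse {
    indices: Vec<usize>,   // sorted logical positions of the patch values
    indices_offset: usize, // subtracted after slicing
}

impl Sparse {
    fn min_index(&self) -> Option<usize> {
        self.indices.first().map(|i| i - self.indices_offset)
    }

    fn max_index(&self) -> Option<usize> {
        // last element of the sorted indices buffer, rebased by the offset
        self.indices.last().map(|i| i - self.indices_offset)
    }
}

fn main() {
    let s = Sparse { indices: vec![12, 40, 97], indices_offset: 10 };
    assert_eq!(s.min_index(), Some(2));
    assert_eq!(s.max_index(), Some(87));

    let empty = Sparse { indices: vec![], indices_offset: 0 };
    assert_eq!(empty.max_index(), None); // every element equals the fill value
}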
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbinview/mod.rs
@@ -197,7 +197,7 @@ pub struct Buffers<'a> {
array: &'a VarBinViewArray,
}

impl<'a> Iterator for Buffers<'a> {
impl Iterator for Buffers<'_> {
type Item = Array;

fn next(&mut self) -> Option<Self::Item> {
12 changes: 10 additions & 2 deletions vortex-array/src/compute/take.rs
@@ -1,7 +1,7 @@
use log::info;
use vortex_error::{vortex_err, VortexResult};
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::{Array, IntoCanonical};
use crate::{Array, ArrayDType as _, IntoCanonical as _};

pub trait TakeFn {
fn take(&self, indices: &Array) -> VortexResult<Array>;
@@ -10,6 +10,14 @@ pub trait TakeFn {
pub fn take(array: impl AsRef<Array>, indices: impl AsRef<Array>) -> VortexResult<Array> {
let array = array.as_ref();
let indices = indices.as_ref();

if !indices.dtype().is_int() || indices.dtype().is_nullable() {
vortex_bail!(
"Take indices must be a non-nullable integer type, got {}",
indices.dtype()
);
}

array.with_dyn(|a| {
if let Some(take) = a.take() {
return take.take(indices);
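A short sketch of the new guard's behaviour, with a stand-in DType enum rather than the real vortex_dtype::DType; the error text mirrors the message added above:

#[derive(Debug, Clone, Copy, PartialEq)]
enum DType {
    Int { nullable: bool },
    Float,
    Utf8,
}

// Reject take indices whose dtype is not a non-nullable integer before
// dispatching to the encoding's TakeFn.
fn check_take_indices(dtype: DType) -> Result<(), String> {
    match dtype {
        DType::Int { nullable: false } => Ok(()),
        other => Err(format!(
            "Take indices must be a non-nullable integer type, got {other:?}"
        )),
    }
}

fn main() {
    assert!(check_take_indices(DType::Int { nullable: false }).is_ok());
    assert!(check_take_indices(DType::Int { nullable: true }).is_err());
    assert!(check_take_indices(DType::Float).is_err());
    assert!(check_take_indices(DType::Utf8).is_err());
}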
2 changes: 1 addition & 1 deletion vortex-array/src/tree.rs
@@ -22,7 +22,7 @@ impl<'a> TreeDisplayWrapper<'a> {
}
}

impl<'a> fmt::Display for TreeDisplayWrapper<'a> {
impl fmt::Display for TreeDisplayWrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let array = self.0;
let mut array_fmt = TreeFormatter::new(f, "".to_string());
4 changes: 2 additions & 2 deletions vortex-serde/src/layouts/read/recordbatchreader.rs
@@ -49,7 +49,7 @@ where
}
}

impl<'a, R, AR> Iterator for VortexRecordBatchReader<'a, R, AR>
impl<R, AR> Iterator for VortexRecordBatchReader<'_, R, AR>
where
R: VortexReadAt + Unpin + 'static,
AR: AsyncRuntime,
@@ -62,7 +62,7 @@ where
}
}

impl<'a, R, AR> RecordBatchReader for VortexRecordBatchReader<'a, R, AR>
impl<R, AR> RecordBatchReader for VortexRecordBatchReader<'_, R, AR>
where
R: VortexReadAt + Unpin + 'static,
AR: AsyncRuntime,
