diff --git a/encodings/fastlanes/src/bitpacking/compute/take.rs b/encodings/fastlanes/src/bitpacking/compute/take.rs index 7eb6d4e840..8d9451e975 100644 --- a/encodings/fastlanes/src/bitpacking/compute/take.rs +++ b/encodings/fastlanes/src/bitpacking/compute/take.rs @@ -8,7 +8,7 @@ use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant, IntoCanonical}; use vortex_dtype::{ match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType, }; -use vortex_error::VortexResult; +use vortex_error::{VortexExpect as _, VortexResult}; use crate::{unpack_single_primitive, BitPackedArray}; @@ -35,111 +35,177 @@ impl TakeFn for BitPackedArray { let indices = indices.clone().into_primitive()?; let taken = match_each_unsigned_integer_ptype!(ptype, |$T| { - PrimitiveArray::from_vec(take_primitive::<$T>(self, &indices)?, taken_validity) + match_each_integer_ptype!(indices.ptype(), |$I| { + PrimitiveArray::from_vec(take_primitive::<$T, $I>(self, &indices)?, taken_validity) + }) }); Ok(taken.reinterpret_cast(ptype).into_array()) } } -fn take_primitive( +// array_chunks must use while let so that we can get the remainder +#[allow(clippy::while_let_on_iterator)] +fn take_primitive( array: &BitPackedArray, indices: &PrimitiveArray, ) -> VortexResult> { - let offset = array.offset() as usize; - - // Group indices into 1024-element chunks and relativise them to the beginning of each chunk - let relative_indices: Vec<(usize, Vec)> = match_each_integer_ptype!(indices.ptype(), |$P| { - indices - .maybe_null_slice::<$P>() - .iter() - .copied() - .map(|i| i as usize + offset) - .chunk_by(|idx| idx / 1024) - .into_iter() - .map(|(k, g)| (k, g.map(|idx| (idx % 1024) as u16).collect())) - .collect() - }); + if indices.is_empty() { + return Ok(vec![]); + } + let offset = array.offset() as usize; let bit_width = array.bit_width() as usize; let packed = array.packed_slice::(); - let patches = array.patches().map(SparseArray::try_from).transpose()?; - // if we have a small number of relatively large batches, we gain by slicing and then patching inside the loop - // if we have a large number of relatively small batches, the overhead isn't worth it, and we're better off with a bulk patch - // roughly, if we have an average of less than 64 elements per batch, we prefer bulk patching - let prefer_bulk_patch = relative_indices.len() * BULK_PATCH_THRESHOLD > indices.len(); + // Group indices into 1024-element chunks and relativise them to the beginning of each chunk + // *without* allocating on the heap + let chunked_indices = &indices + .maybe_null_slice::() + .iter() + .map(|i| { + i.to_usize() + .vortex_expect("index must be expressible as usize") + + offset + }) + .chunk_by(|idx| idx / 1024); let mut output = Vec::with_capacity(indices.len()); let mut unpacked = [T::zero(); 1024]; - for (chunk, offsets) in relative_indices { + + let mut batch_count = 0_usize; + for (chunk, offsets) in chunked_indices { + batch_count += 1; let chunk_size = 128 * bit_width / size_of::(); let packed_chunk = &packed[chunk * chunk_size..][..chunk_size]; - if offsets.len() > UNPACK_CHUNK_THRESHOLD { - unsafe { - BitPacking::unchecked_unpack(bit_width, packed_chunk, &mut unpacked); - } - for index in &offsets { - output.push(unpacked[*index as usize]); + + // array_chunks produced a fixed size array, doesn't heap allocate + let mut have_unpacked = false; + let mut offset_chunk_iter = offsets + .map(|i| i % 1024) + .array_chunks::(); + while let Some(offset_chunk) = offset_chunk_iter.next() { + if !have_unpacked { + unsafe { + BitPacking::unchecked_unpack(bit_width, packed_chunk, &mut unpacked); + } + have_unpacked = true; } - } else { - for index in &offsets { - output.push(unsafe { - unpack_single_primitive::(packed_chunk, bit_width, *index as usize) - }); + + for index in offset_chunk { + output.push(unpacked[index]); } } - if !prefer_bulk_patch { - if let Some(ref patches) = patches { - // NOTE: we need to subtract the array offset before slicing into the patches. - // This is because BitPackedArray is rounded to block boundaries, but patches - // is sliced exactly. - let patches_start = if chunk == 0 { - 0 - } else { - (chunk * 1024) - offset - }; - let patches_end = min((chunk + 1) * 1024 - offset, patches.len()); - let patches_slice = slice(patches.as_ref(), patches_start, patches_end)?; - let patches_slice = SparseArray::try_from(patches_slice)?; - let offsets = PrimitiveArray::from(offsets); - do_patch_for_take_primitive(&patches_slice, &offsets, &mut output)?; + if let Some(remainder) = offset_chunk_iter.into_remainder() { + if have_unpacked { + // we already bulk unpacked this chunk, so we can just push the remaining elements + for index in remainder { + output.push(unpacked[index]); + } + } else { + // we had fewer than UNPACK_CHUNK_THRESHOLD offsets, so we just unpack each one individually + for index in remainder { + output.push(unsafe { + unpack_single_primitive::(packed_chunk, bit_width, index) + }); + } } } } - if prefer_bulk_patch { - if let Some(ref patches) = patches { - do_patch_for_take_primitive(patches, indices, &mut output)?; - } + if let Some(ref patches) = patches { + patch_for_take_primitive::(patches, indices, offset, batch_count, &mut output)?; } Ok(output) } -fn do_patch_for_take_primitive( +fn patch_for_take_primitive( patches: &SparseArray, indices: &PrimitiveArray, + offset: usize, + batch_count: usize, output: &mut [T], ) -> VortexResult<()> { - let taken_patches = take(patches.as_ref(), indices.as_ref())?; - let taken_patches = SparseArray::try_from(taken_patches)?; - - let base_index = output.len() - indices.len(); - let output_patches = taken_patches - .values() - .into_primitive()? - .reinterpret_cast(T::PTYPE); - taken_patches - .resolved_indices() + #[inline] + fn inner_patch( + patches: &SparseArray, + indices: &PrimitiveArray, + output: &mut [T], + ) -> VortexResult<()> { + let taken_patches = take(patches.as_ref(), indices.as_ref())?; + let taken_patches = SparseArray::try_from(taken_patches)?; + + let base_index = output.len() - indices.len(); + let output_patches = taken_patches + .values() + .into_primitive()? + .reinterpret_cast(T::PTYPE); + taken_patches + .resolved_indices() + .iter() + .map(|idx| base_index + *idx) + .zip_eq(output_patches.maybe_null_slice::()) + .for_each(|(idx, val)| { + output[idx] = *val; + }); + + Ok(()) + } + + // if we have a small number of relatively large batches, we gain by slicing and then patching inside the loop + // if we have a large number of relatively small batches, the overhead isn't worth it, and we're better off with a bulk patch + // roughly, if we have an average of less than 64 elements per batch, we prefer bulk patching + let prefer_bulk_patch = batch_count * BULK_PATCH_THRESHOLD > indices.len(); + if prefer_bulk_patch { + return inner_patch(patches, indices, output); + } + + let min_index = patches.min_index().unwrap_or_default(); + let max_index = patches.max_index().unwrap_or_default(); + + // Group indices into 1024-element chunks and relativise them to the beginning of each chunk + let chunked_indices = &indices + .maybe_null_slice::() .iter() - .map(|idx| base_index + *idx) - .zip_eq(output_patches.maybe_null_slice::()) - .for_each(|(idx, val)| { - output[idx] = *val; - }); + .map(|i| { + i.to_usize() + .vortex_expect("index must be expressible as usize") + + offset + }) + .filter(|i| *i >= min_index && *i <= max_index) // short-circuit + .chunk_by(|idx| idx / 1024); + + for (chunk, offsets) in chunked_indices { + // NOTE: we need to subtract the array offset before slicing into the patches. + // This is because BitPackedArray is rounded to block boundaries, but patches + // is sliced exactly. + let patches_start = if chunk == 0 { + 0 + } else { + (chunk * 1024) - offset + }; + let patches_end = min((chunk + 1) * 1024 - offset, patches.len()); + let patches_slice = slice(patches.as_ref(), patches_start, patches_end)?; + let patches_slice = SparseArray::try_from(patches_slice)?; + if patches_slice.is_empty() { + continue; + } + + let min_index = patches_slice.min_index().unwrap_or_default(); + let max_index = patches_slice.max_index().unwrap_or_default(); + let offsets = offsets + .map(|i| (i % 1024) as u16) + .filter(|i| *i as usize >= min_index && *i as usize <= max_index) + .collect_vec(); + if offsets.is_empty() { + continue; + } + + inner_patch(&patches_slice, &PrimitiveArray::from(offsets), output)?; + } Ok(()) } diff --git a/encodings/fastlanes/src/lib.rs b/encodings/fastlanes/src/lib.rs index 528a3d31ae..530896efc0 100644 --- a/encodings/fastlanes/src/lib.rs +++ b/encodings/fastlanes/src/lib.rs @@ -1,6 +1,7 @@ #![allow(incomplete_features)] #![feature(generic_const_exprs)] #![feature(vec_into_raw_parts)] +#![feature(iter_array_chunks)] pub use bitpacking::*; pub use delta::*; diff --git a/vortex-array/src/array/sparse/compute/take.rs b/vortex-array/src/array/sparse/compute/take.rs index 2b44ecbfa5..d2f97590b6 100644 --- a/vortex-array/src/array/sparse/compute/take.rs +++ b/vortex-array/src/array/sparse/compute/take.rs @@ -42,11 +42,14 @@ fn take_map( .enumerate() .map(|(i, r)| (*r as u64, i as u64)) .collect(); + let min_index = array.min_index().unwrap_or_default() as u64; + let max_index = array.max_index().unwrap_or_default() as u64; let (positions, patch_indices): (Vec, Vec) = match_each_integer_ptype!(indices.ptype(), |$P| { indices.maybe_null_slice::<$P>() .iter() .map(|pi| *pi as u64) .enumerate() + .filter(|(_, pi)| *pi >= min_index && *pi <= max_index) // short-circuit .filter_map(|(i, pi)| indices_map.get(&pi).map(|phy_idx| (i as u64, phy_idx))) .unzip() }); @@ -60,11 +63,14 @@ fn take_search_sorted( array: &SparseArray, indices: &PrimitiveArray, ) -> VortexResult<(PrimitiveArray, PrimitiveArray)> { + let min_index = array.min_index().unwrap_or_default() as u64; + let max_index = array.max_index().unwrap_or_default() as u64; let resolved = match_each_integer_ptype!(indices.ptype(), |$P| { indices .maybe_null_slice::<$P>() .iter() .enumerate() + .filter(|(_, i)| **i as u64 >= min_index && **i as u64 <= max_index) // short-circuit .map(|(pos, i)| { array .search_index(*i as usize) diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs index 7379217f71..33ca689760 100644 --- a/vortex-array/src/array/sparse/mod.rs +++ b/vortex-array/src/array/sparse/mod.rs @@ -152,10 +152,22 @@ impl SparseArray { let min_index: usize = scalar_at(self.indices(), 0) .and_then(|s| s.as_ref().try_into()) .vortex_expect("SparseArray indices is non-empty"); - min_index - self.indices_offset() }) } + + /// Return the maximum index if indices are present. + /// + /// If this sparse array has no indices (i.e. all elements are equal to fill_value) + /// then it returns None. + pub fn max_index(&self) -> Option { + (!self.indices().is_empty()).then(|| { + let max_index: usize = scalar_at(self.indices(), self.indices().len() - 1) + .and_then(|s| s.as_ref().try_into()) + .vortex_expect("SparseArray indices is non-empty"); + max_index - self.indices_offset() + }) + } } impl ArrayTrait for SparseArray {} diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index 97bedd5be2..54889e22d9 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -197,7 +197,7 @@ pub struct Buffers<'a> { array: &'a VarBinViewArray, } -impl<'a> Iterator for Buffers<'a> { +impl Iterator for Buffers<'_> { type Item = Array; fn next(&mut self) -> Option { diff --git a/vortex-array/src/compute/take.rs b/vortex-array/src/compute/take.rs index 76ecb767a3..c7fa05f451 100644 --- a/vortex-array/src/compute/take.rs +++ b/vortex-array/src/compute/take.rs @@ -1,7 +1,7 @@ use log::info; -use vortex_error::{vortex_err, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; -use crate::{Array, IntoCanonical}; +use crate::{Array, ArrayDType as _, IntoCanonical as _}; pub trait TakeFn { fn take(&self, indices: &Array) -> VortexResult; @@ -10,6 +10,14 @@ pub trait TakeFn { pub fn take(array: impl AsRef, indices: impl AsRef) -> VortexResult { let array = array.as_ref(); let indices = indices.as_ref(); + + if !indices.dtype().is_int() || indices.dtype().is_nullable() { + vortex_bail!( + "Take indices must be a non-nullable integer type, got {}", + indices.dtype() + ); + } + array.with_dyn(|a| { if let Some(take) = a.take() { return take.take(indices); diff --git a/vortex-array/src/tree.rs b/vortex-array/src/tree.rs index eaaeaeafae..2af9475c4f 100644 --- a/vortex-array/src/tree.rs +++ b/vortex-array/src/tree.rs @@ -22,7 +22,7 @@ impl<'a> TreeDisplayWrapper<'a> { } } -impl<'a> fmt::Display for TreeDisplayWrapper<'a> { +impl fmt::Display for TreeDisplayWrapper<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let array = self.0; let mut array_fmt = TreeFormatter::new(f, "".to_string()); diff --git a/vortex-serde/src/layouts/read/recordbatchreader.rs b/vortex-serde/src/layouts/read/recordbatchreader.rs index 6e02bb347e..81ad736bfb 100644 --- a/vortex-serde/src/layouts/read/recordbatchreader.rs +++ b/vortex-serde/src/layouts/read/recordbatchreader.rs @@ -49,7 +49,7 @@ where } } -impl<'a, R, AR> Iterator for VortexRecordBatchReader<'a, R, AR> +impl Iterator for VortexRecordBatchReader<'_, R, AR> where R: VortexReadAt + Unpin + 'static, AR: AsyncRuntime, @@ -62,7 +62,7 @@ where } } -impl<'a, R, AR> RecordBatchReader for VortexRecordBatchReader<'a, R, AR> +impl RecordBatchReader for VortexRecordBatchReader<'_, R, AR> where R: VortexReadAt + Unpin + 'static, AR: AsyncRuntime,