From 0496b548b71979772a7b781281acc3277414657c Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 5 Apr 2024 17:35:14 +0100 Subject: [PATCH] Support arbitrary slicing of Bitpacked arrays (#209) This follows the pattern we use elsewhere where we do the maximum possible work in slice and for anything irreducible we keep extra state, in this case offset, and apply it when we take value out of the encoded array. --- vortex-fastlanes/benches/bitpacking.rs | 22 +++- vortex-fastlanes/src/bitpacking/compress.rs | 76 +++++++++---- vortex-fastlanes/src/bitpacking/compute.rs | 6 +- vortex-fastlanes/src/bitpacking/mod.rs | 118 +++++++++++++++----- vortex-fastlanes/src/bitpacking/serde.rs | 11 +- 5 files changed, 178 insertions(+), 55 deletions(-) diff --git a/vortex-fastlanes/benches/bitpacking.rs b/vortex-fastlanes/benches/bitpacking.rs index 34611dcabf..a9f3ee649b 100644 --- a/vortex-fastlanes/benches/bitpacking.rs +++ b/vortex-fastlanes/benches/bitpacking.rs @@ -30,7 +30,11 @@ fn pack_unpack(c: &mut Criterion) { let packed = bitpack_primitive(&values, bits); c.bench_function("unpack_1M", |b| { - b.iter(|| black_box(unpack_primitive::(&packed, bits, values.len()))); + b.iter(|| black_box(unpack_primitive::(&packed, bits, 0, values.len()))); + }); + + c.bench_function("unpack_1M_offset", |b| { + b.iter(|| black_box(unpack_primitive::(&packed, bits, 768, values.len()))); }); c.bench_function("unpack_1M_singles", |b| { @@ -40,7 +44,11 @@ fn pack_unpack(c: &mut Criterion) { // 1024 elements pack into `128 * bits` bytes let packed_1024 = &packed[0..128 * bits]; c.bench_function("unpack_1024_alloc", |b| { - b.iter(|| black_box(unpack_primitive::(&packed, bits, values.len()))); + b.iter(|| black_box(unpack_primitive::(&packed, bits, 0, values.len()))); + }); + + c.bench_function("unpack_1024_alloc_offset", |b| { + b.iter(|| black_box(unpack_primitive::(&packed, bits, 768, values.len()))); }); let mut output: Vec = Vec::with_capacity(1024); @@ -52,6 +60,16 @@ fn pack_unpack(c: &mut Criterion) { }) }); + let mut output: Vec = Vec::with_capacity(1024); + c.bench_function("unpack_1024_noalloc_offset", |b| { + b.iter(|| { + output.clear(); + TryBitPack::try_unpack_into(packed_1024, bits, &mut output).unwrap(); + output.drain(0..768); + black_box(output[0]) + }) + }); + c.bench_function("unpack_single", |b| { b.iter(|| black_box(unsafe { unpack_single_primitive::(packed_1024, 8, 0) })); }); diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index 0df9899e37..4ffd8570c7 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -165,24 +165,25 @@ fn bitpack_patches( pub fn unpack(array: &BitPackedArray) -> VortexResult { let bit_width = array.bit_width(); let length = array.len(); - let encoded = flatten_primitive(cast(array.encoded(), U8.into())?.as_ref())?; + let offset = array.offset(); + let encoded = flatten_primitive(&cast(array.encoded(), U8.into())?)?; let ptype: PType = array.dtype().try_into()?; let mut unpacked = match ptype { I8 | U8 => PrimitiveArray::from_nullable( - unpack_primitive::(encoded.typed_data::(), bit_width, length), + unpack_primitive::(encoded.typed_data::(), bit_width, offset, length), array.validity().to_owned_view(), ), I16 | U16 => PrimitiveArray::from_nullable( - unpack_primitive::(encoded.typed_data::(), bit_width, length), + unpack_primitive::(encoded.typed_data::(), bit_width, offset, length), array.validity().to_owned_view(), ), I32 | U32 => PrimitiveArray::from_nullable( - unpack_primitive::(encoded.typed_data::(), bit_width, length), + unpack_primitive::(encoded.typed_data::(), bit_width, offset, length), array.validity().to_owned_view(), ), I64 | U64 => PrimitiveArray::from_nullable( - unpack_primitive::(encoded.typed_data::(), bit_width, length), + unpack_primitive::(encoded.typed_data::(), bit_width, offset, length), array.validity().to_owned_view(), ), _ => panic!("Unsupported ptype {}", ptype), @@ -216,6 +217,7 @@ fn patch_unpacked(array: PrimitiveArray, patches: &dyn Array) -> VortexResult( packed: &[u8], bit_width: usize, + offset: usize, length: usize, ) -> Vec { if bit_width == 0 { @@ -223,7 +225,8 @@ pub fn unpack_primitive( } // How many fastlanes vectors we will process. - let num_chunks = (length + 1023) / 1024; + // Packed array might not start at 0 when the array is sliced. Offset is guaranteed to be < 1024. + let num_chunks = (offset + length + 1023) / 1024; let bytes_per_chunk = 128 * bit_width; assert_eq!( packed.len(), @@ -234,9 +237,19 @@ pub fn unpack_primitive( ); // Allocate a result vector. - let mut output = Vec::with_capacity(num_chunks * 1024); + let mut output = Vec::with_capacity(num_chunks * 1024 - offset); + // Handle first chunk if offset is non 0. We have to decode the chunk and skip first offset elements + let first_full_chunk = if offset != 0 { + let chunk: &[u8] = &packed[0..bytes_per_chunk]; + TryBitPack::try_unpack_into(chunk, bit_width, &mut output).unwrap(); + output.drain(0..offset); + 1 + } else { + 0 + }; + // Loop over all the chunks. - (0..num_chunks).for_each(|i| { + (first_full_chunk..num_chunks).for_each(|i| { let chunk: &[u8] = &packed[i * bytes_per_chunk..][0..bytes_per_chunk]; TryBitPack::try_unpack_into(chunk, bit_width, &mut output).unwrap(); }); @@ -250,6 +263,14 @@ pub fn unpack_primitive( if output.len() < 1024 { output.shrink_to_fit(); } + + assert_eq!( + output.len(), + length, + "Expected unpacked array to be of length {} but got {}", + length, + output.len() + ); output } @@ -257,23 +278,34 @@ pub(crate) fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResul let bit_width = array.bit_width(); let encoded = flatten_primitive(cast(array.encoded(), U8.into())?.as_ref())?; let ptype: PType = array.dtype().try_into()?; + let index_in_encoded = index + array.offset(); let scalar: Scalar = unsafe { match ptype { - I8 | U8 => unpack_single_primitive::(encoded.typed_data::(), bit_width, index) - .map(|v| v.into()), - I16 | U16 => { - unpack_single_primitive::(encoded.typed_data::(), bit_width, index) - .map(|v| v.into()) - } - I32 | U32 => { - unpack_single_primitive::(encoded.typed_data::(), bit_width, index) - .map(|v| v.into()) - } - I64 | U64 => { - unpack_single_primitive::(encoded.typed_data::(), bit_width, index) - .map(|v| v.into()) - } + I8 | U8 => unpack_single_primitive::( + encoded.typed_data::(), + bit_width, + index_in_encoded, + ) + .map(|v| v.into()), + I16 | U16 => unpack_single_primitive::( + encoded.typed_data::(), + bit_width, + index_in_encoded, + ) + .map(|v| v.into()), + I32 | U32 => unpack_single_primitive::( + encoded.typed_data::(), + bit_width, + index_in_encoded, + ) + .map(|v| v.into()), + I64 | U64 => unpack_single_primitive::( + encoded.typed_data::(), + bit_width, + index_in_encoded, + ) + .map(|v| v.into()), _ => vortex_bail!("Unsupported ptype {}", ptype), }? }; diff --git a/vortex-fastlanes/src/bitpacking/compute.rs b/vortex-fastlanes/src/bitpacking/compute.rs index 95d29b5c9d..1b69074d2e 100644 --- a/vortex-fastlanes/src/bitpacking/compute.rs +++ b/vortex-fastlanes/src/bitpacking/compute.rs @@ -1,3 +1,5 @@ +use std::cmp::min; + use itertools::Itertools; use vortex::array::primitive::PrimitiveArray; use vortex::array::{Array, ArrayRef}; @@ -37,7 +39,7 @@ impl FlattenFn for BitPackedArray { impl ScalarAtFn for BitPackedArray { fn scalar_at(&self, index: usize) -> VortexResult { if index >= self.len() { - return Err(vortex_err!(OutOfBounds:index, 0, self.len())); + return Err(vortex_err!(OutOfBounds: index, 0, self.len())); } if let Some(patches) = self.patches() { // NB: All non-null values are considered patches @@ -67,7 +69,7 @@ impl TakeFn for BitPackedArray { let taken = relative_indices .into_iter() .map(|(chunk, offsets)| { - let sliced = self.slice(chunk * 1024, (chunk + 1) * 1024)?; + let sliced = self.slice(chunk * 1024, min((chunk + 1) * 1024, self.len()))?; take( &unpack(sliced.as_bitpacked())?, diff --git a/vortex-fastlanes/src/bitpacking/mod.rs b/vortex-fastlanes/src/bitpacking/mod.rs index 5d7b153fde..88639cccc9 100644 --- a/vortex-fastlanes/src/bitpacking/mod.rs +++ b/vortex-fastlanes/src/bitpacking/mod.rs @@ -1,10 +1,9 @@ -use std::cmp::min; +use std::cmp::max; use std::sync::{Arc, RwLock}; pub use compress::*; -use vortex::array::{Array, ArrayRef}; +use vortex::array::{check_slice_bounds, Array, ArrayRef}; use vortex::compress::EncodingCompression; -use vortex::compute::flatten::flatten_primitive; use vortex::compute::ArrayCompute; use vortex::encoding::{Encoding, EncodingId, EncodingRef}; use vortex::formatter::{ArrayDisplay, ArrayFormatter}; @@ -27,6 +26,7 @@ pub struct BitPackedArray { encoded: ArrayRef, validity: Option, patches: Option, + offset: usize, len: usize, bit_width: usize, dtype: DType, @@ -44,6 +44,18 @@ impl BitPackedArray { bit_width: usize, dtype: DType, len: usize, + ) -> VortexResult { + Self::try_new_from_offset(encoded, validity, patches, bit_width, dtype, len, 0) + } + + pub(crate) fn try_new_from_offset( + encoded: ArrayRef, + validity: Option, + patches: Option, + bit_width: usize, + dtype: DType, + len: usize, + offset: usize, ) -> VortexResult { if encoded.dtype() != &Self::ENCODED_DTYPE { vortex_bail!(MismatchedTypes: Self::ENCODED_DTYPE, encoded.dtype()); @@ -71,8 +83,9 @@ impl BitPackedArray { encoded, validity, patches, - bit_width, + offset, len, + bit_width, dtype, stats: Arc::new(RwLock::new(StatsSet::new())), }) @@ -92,6 +105,11 @@ impl BitPackedArray { pub fn patches(&self) -> Option<&ArrayRef> { self.patches.as_ref() } + + #[inline] + pub fn offset(&self) -> usize { + self.offset + } } impl Array for BitPackedArray { @@ -125,31 +143,21 @@ impl Array for BitPackedArray { } fn slice(&self, start: usize, stop: usize) -> VortexResult { - if start % 1024 != 0 || stop % 1024 != 0 { - return flatten_primitive(self)?.slice(start, stop); - } - - if start > self.len() { - vortex_bail!(OutOfBounds: start, 0, self.len()); - } - // If we are slicing more than one 1024 element chunk beyond end, we consider this out of bounds - if stop / 1024 > ((self.len() + 1023) / 1024) { - vortex_bail!(OutOfBounds: stop, 0, self.len()); - } + check_slice_bounds(self, start, stop)?; + let offset = start % 1024; + let block_start = max(0, start - offset); + let block_stop = ((stop + 1023) / 1024) * 1024; - let encoded_start = (start / 8) * self.bit_width; - let encoded_stop = (stop / 8) * self.bit_width; - Self::try_new( + let encoded_start = (block_start / 8) * self.bit_width; + let encoded_stop = (block_stop / 8) * self.bit_width; + Self::try_new_from_offset( self.encoded().slice(encoded_start, encoded_stop)?, - self.validity() - .map(|v| v.slice(start, min(stop, self.len()))) - .transpose()?, - self.patches() - .map(|p| p.slice(start, min(stop, self.len()))) - .transpose()?, + self.validity().map(|v| v.slice(start, stop)).transpose()?, + self.patches().map(|p| p.slice(start, stop)).transpose()?, self.bit_width(), self.dtype().clone(), - min(stop - start, self.len()), + stop - start, + offset, ) .map(|a| a.into_array()) } @@ -185,6 +193,7 @@ impl OwnedValidity for BitPackedArray { impl ArrayDisplay for BitPackedArray { fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result { + f.property("offset", self.offset)?; f.property("packed", format!("u{}", self.bit_width()))?; f.child("encoded", self.encoded())?; f.maybe_child("patches", self.patches())?; @@ -218,3 +227,62 @@ impl Encoding for BitPackedEncoding { Some(self) } } + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use vortex::array::primitive::PrimitiveArray; + use vortex::array::Array; + use vortex::compress::{CompressConfig, CompressCtx}; + use vortex::compute::scalar_at::scalar_at; + use vortex::encoding::EncodingRef; + + use crate::BitPackedEncoding; + + #[test] + fn slice_within_block() { + let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); + let ctx = CompressCtx::new(Arc::new(cfg)); + + let compressed = ctx + .compress( + &PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()), + None, + ) + .unwrap() + .slice(768, 9999) + .unwrap(); + assert_eq!( + scalar_at(&compressed, 0).unwrap(), + ((768 % 63) as u8).into() + ); + assert_eq!( + scalar_at(&compressed, compressed.len() - 1).unwrap(), + ((9998 % 63) as u8).into() + ); + } + + #[test] + fn slice_block_boundary() { + let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); + let ctx = CompressCtx::new(Arc::new(cfg)); + + let compressed = ctx + .compress( + &PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()), + None, + ) + .unwrap() + .slice(7168, 9216) + .unwrap(); + assert_eq!( + scalar_at(&compressed, 0).unwrap(), + ((7168 % 63) as u8).into() + ); + assert_eq!( + scalar_at(&compressed, compressed.len() - 1).unwrap(), + ((9215 % 63) as u8).into() + ); + } +} diff --git a/vortex-fastlanes/src/bitpacking/serde.rs b/vortex-fastlanes/src/bitpacking/serde.rs index 717bcbd74d..3474f09166 100644 --- a/vortex-fastlanes/src/bitpacking/serde.rs +++ b/vortex-fastlanes/src/bitpacking/serde.rs @@ -11,7 +11,8 @@ impl ArraySerde for BitPackedArray { ctx.write_validity(self.validity())?; ctx.write_optional_array(self.patches())?; ctx.write_usize(self.bit_width())?; - ctx.write_usize(self.len()) + ctx.write_usize(self.len())?; + ctx.write_usize(self.offset()) } fn metadata(&self) -> VortexResult>> { @@ -19,6 +20,7 @@ impl ArraySerde for BitPackedArray { let mut ctx = WriteCtx::new(&mut vec); ctx.write_usize(self.bit_width())?; ctx.write_usize(self.len())?; + ctx.write_usize(self.offset())?; Ok(Some(vec)) } } @@ -30,15 +32,16 @@ impl EncodingSerde for BitPackedEncoding { let patches = ctx.read_optional_array()?; let bit_width = ctx.read_usize()?; let len = ctx.read_usize()?; - Ok(BitPackedArray::try_new( + let offset = ctx.read_usize()?; + Ok(BitPackedArray::try_new_from_offset( encoded, validity, patches, bit_width, ctx.schema().clone(), len, - ) - .unwrap() + offset, + )? .into_array()) } }