Skip to content

Commit

Permalink
Support arbitrary slicing of Bitpacked arrays (#209)
Browse files Browse the repository at this point in the history
This follows the pattern we use elsewhere: we do the maximum
possible work in slice, and for anything irreducible we keep extra state —
in this case an offset — and apply it when we take a value out of the
encoded array.
  • Loading branch information
robert3005 authored Apr 5, 2024
1 parent 79a0630 commit 0496b54
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 55 deletions.
22 changes: 20 additions & 2 deletions vortex-fastlanes/benches/bitpacking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ fn pack_unpack(c: &mut Criterion) {

let packed = bitpack_primitive(&values, bits);
c.bench_function("unpack_1M", |b| {
b.iter(|| black_box(unpack_primitive::<u32>(&packed, bits, values.len())));
b.iter(|| black_box(unpack_primitive::<u32>(&packed, bits, 0, values.len())));
});

c.bench_function("unpack_1M_offset", |b| {
b.iter(|| black_box(unpack_primitive::<u32>(&packed, bits, 768, values.len())));
});

c.bench_function("unpack_1M_singles", |b| {
Expand All @@ -40,7 +44,11 @@ fn pack_unpack(c: &mut Criterion) {
// 1024 elements pack into `128 * bits` bytes
let packed_1024 = &packed[0..128 * bits];
c.bench_function("unpack_1024_alloc", |b| {
b.iter(|| black_box(unpack_primitive::<u32>(&packed, bits, values.len())));
b.iter(|| black_box(unpack_primitive::<u32>(&packed, bits, 0, values.len())));
});

c.bench_function("unpack_1024_alloc_offset", |b| {
b.iter(|| black_box(unpack_primitive::<u32>(&packed, bits, 768, values.len())));
});

let mut output: Vec<u32> = Vec::with_capacity(1024);
Expand All @@ -52,6 +60,16 @@ fn pack_unpack(c: &mut Criterion) {
})
});

let mut output: Vec<u32> = Vec::with_capacity(1024);
c.bench_function("unpack_1024_noalloc_offset", |b| {
b.iter(|| {
output.clear();
TryBitPack::try_unpack_into(packed_1024, bits, &mut output).unwrap();
output.drain(0..768);
black_box(output[0])
})
});

c.bench_function("unpack_single", |b| {
b.iter(|| black_box(unsafe { unpack_single_primitive::<u32>(packed_1024, 8, 0) }));
});
Expand Down
76 changes: 54 additions & 22 deletions vortex-fastlanes/src/bitpacking/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,24 +165,25 @@ fn bitpack_patches(
pub fn unpack(array: &BitPackedArray) -> VortexResult<PrimitiveArray> {
let bit_width = array.bit_width();
let length = array.len();
let encoded = flatten_primitive(cast(array.encoded(), U8.into())?.as_ref())?;
let offset = array.offset();
let encoded = flatten_primitive(&cast(array.encoded(), U8.into())?)?;
let ptype: PType = array.dtype().try_into()?;

let mut unpacked = match ptype {
I8 | U8 => PrimitiveArray::from_nullable(
unpack_primitive::<u8>(encoded.typed_data::<u8>(), bit_width, length),
unpack_primitive::<u8>(encoded.typed_data::<u8>(), bit_width, offset, length),
array.validity().to_owned_view(),
),
I16 | U16 => PrimitiveArray::from_nullable(
unpack_primitive::<u16>(encoded.typed_data::<u8>(), bit_width, length),
unpack_primitive::<u16>(encoded.typed_data::<u8>(), bit_width, offset, length),
array.validity().to_owned_view(),
),
I32 | U32 => PrimitiveArray::from_nullable(
unpack_primitive::<u32>(encoded.typed_data::<u8>(), bit_width, length),
unpack_primitive::<u32>(encoded.typed_data::<u8>(), bit_width, offset, length),
array.validity().to_owned_view(),
),
I64 | U64 => PrimitiveArray::from_nullable(
unpack_primitive::<u64>(encoded.typed_data::<u8>(), bit_width, length),
unpack_primitive::<u64>(encoded.typed_data::<u8>(), bit_width, offset, length),
array.validity().to_owned_view(),
),
_ => panic!("Unsupported ptype {}", ptype),
Expand Down Expand Up @@ -216,14 +217,16 @@ fn patch_unpacked(array: PrimitiveArray, patches: &dyn Array) -> VortexResult<Pr
pub fn unpack_primitive<T: NativePType + TryBitPack>(
packed: &[u8],
bit_width: usize,
offset: usize,
length: usize,
) -> Vec<T> {
if bit_width == 0 {
return vec![T::zero(); length];
}

// How many fastlanes vectors we will process.
let num_chunks = (length + 1023) / 1024;
// Packed array might not start at 0 when the array is sliced. Offset is guaranteed to be < 1024.
let num_chunks = (offset + length + 1023) / 1024;
let bytes_per_chunk = 128 * bit_width;
assert_eq!(
packed.len(),
Expand All @@ -234,9 +237,19 @@ pub fn unpack_primitive<T: NativePType + TryBitPack>(
);

// Allocate a result vector.
let mut output = Vec::with_capacity(num_chunks * 1024);
let mut output = Vec::with_capacity(num_chunks * 1024 - offset);
// Handle first chunk if offset is non 0. We have to decode the chunk and skip first offset elements
let first_full_chunk = if offset != 0 {
let chunk: &[u8] = &packed[0..bytes_per_chunk];
TryBitPack::try_unpack_into(chunk, bit_width, &mut output).unwrap();
output.drain(0..offset);
1
} else {
0
};

// Loop over all the chunks.
(0..num_chunks).for_each(|i| {
(first_full_chunk..num_chunks).for_each(|i| {
let chunk: &[u8] = &packed[i * bytes_per_chunk..][0..bytes_per_chunk];
TryBitPack::try_unpack_into(chunk, bit_width, &mut output).unwrap();
});
Expand All @@ -250,30 +263,49 @@ pub fn unpack_primitive<T: NativePType + TryBitPack>(
if output.len() < 1024 {
output.shrink_to_fit();
}

assert_eq!(
output.len(),
length,
"Expected unpacked array to be of length {} but got {}",
length,
output.len()
);
output
}

pub(crate) fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResult<Scalar> {
let bit_width = array.bit_width();
let encoded = flatten_primitive(cast(array.encoded(), U8.into())?.as_ref())?;
let ptype: PType = array.dtype().try_into()?;
let index_in_encoded = index + array.offset();

let scalar: Scalar = unsafe {
match ptype {
I8 | U8 => unpack_single_primitive::<u8>(encoded.typed_data::<u8>(), bit_width, index)
.map(|v| v.into()),
I16 | U16 => {
unpack_single_primitive::<u16>(encoded.typed_data::<u8>(), bit_width, index)
.map(|v| v.into())
}
I32 | U32 => {
unpack_single_primitive::<u32>(encoded.typed_data::<u8>(), bit_width, index)
.map(|v| v.into())
}
I64 | U64 => {
unpack_single_primitive::<u64>(encoded.typed_data::<u8>(), bit_width, index)
.map(|v| v.into())
}
I8 | U8 => unpack_single_primitive::<u8>(
encoded.typed_data::<u8>(),
bit_width,
index_in_encoded,
)
.map(|v| v.into()),
I16 | U16 => unpack_single_primitive::<u16>(
encoded.typed_data::<u8>(),
bit_width,
index_in_encoded,
)
.map(|v| v.into()),
I32 | U32 => unpack_single_primitive::<u32>(
encoded.typed_data::<u8>(),
bit_width,
index_in_encoded,
)
.map(|v| v.into()),
I64 | U64 => unpack_single_primitive::<u64>(
encoded.typed_data::<u8>(),
bit_width,
index_in_encoded,
)
.map(|v| v.into()),
_ => vortex_bail!("Unsupported ptype {}", ptype),
}?
};
Expand Down
6 changes: 4 additions & 2 deletions vortex-fastlanes/src/bitpacking/compute.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::cmp::min;

use itertools::Itertools;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::{Array, ArrayRef};
Expand Down Expand Up @@ -37,7 +39,7 @@ impl FlattenFn for BitPackedArray {
impl ScalarAtFn for BitPackedArray {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
if index >= self.len() {
return Err(vortex_err!(OutOfBounds:index, 0, self.len()));
return Err(vortex_err!(OutOfBounds: index, 0, self.len()));
}
if let Some(patches) = self.patches() {
// NB: All non-null values are considered patches
Expand Down Expand Up @@ -67,7 +69,7 @@ impl TakeFn for BitPackedArray {
let taken = relative_indices
.into_iter()
.map(|(chunk, offsets)| {
let sliced = self.slice(chunk * 1024, (chunk + 1) * 1024)?;
let sliced = self.slice(chunk * 1024, min((chunk + 1) * 1024, self.len()))?;

take(
&unpack(sliced.as_bitpacked())?,
Expand Down
118 changes: 93 additions & 25 deletions vortex-fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::cmp::min;
use std::cmp::max;
use std::sync::{Arc, RwLock};

pub use compress::*;
use vortex::array::{Array, ArrayRef};
use vortex::array::{check_slice_bounds, Array, ArrayRef};
use vortex::compress::EncodingCompression;
use vortex::compute::flatten::flatten_primitive;
use vortex::compute::ArrayCompute;
use vortex::encoding::{Encoding, EncodingId, EncodingRef};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
Expand All @@ -27,6 +26,7 @@ pub struct BitPackedArray {
encoded: ArrayRef,
validity: Option<Validity>,
patches: Option<ArrayRef>,
offset: usize,
len: usize,
bit_width: usize,
dtype: DType,
Expand All @@ -44,6 +44,18 @@ impl BitPackedArray {
bit_width: usize,
dtype: DType,
len: usize,
) -> VortexResult<Self> {
Self::try_new_from_offset(encoded, validity, patches, bit_width, dtype, len, 0)
}

pub(crate) fn try_new_from_offset(
encoded: ArrayRef,
validity: Option<Validity>,
patches: Option<ArrayRef>,
bit_width: usize,
dtype: DType,
len: usize,
offset: usize,
) -> VortexResult<Self> {
if encoded.dtype() != &Self::ENCODED_DTYPE {
vortex_bail!(MismatchedTypes: Self::ENCODED_DTYPE, encoded.dtype());
Expand Down Expand Up @@ -71,8 +83,9 @@ impl BitPackedArray {
encoded,
validity,
patches,
bit_width,
offset,
len,
bit_width,
dtype,
stats: Arc::new(RwLock::new(StatsSet::new())),
})
Expand All @@ -92,6 +105,11 @@ impl BitPackedArray {
/// Exception values ("patches") that did not fit the chosen bit width, if any.
/// NOTE(review): semantics inferred from the sibling unpack path, which treats
/// all non-null patch values as overrides after decoding — confirm.
pub fn patches(&self) -> Option<&ArrayRef> {
    self.patches.as_ref()
}

/// Index within the first 1024-element bit-packed chunk at which this
/// array's logical element 0 lives. Non-zero only after slicing at a
/// position that is not a multiple of 1024 (`slice` computes it as
/// `start % 1024`, so it is always < 1024).
#[inline]
pub fn offset(&self) -> usize {
    self.offset
}
}

impl Array for BitPackedArray {
Expand Down Expand Up @@ -125,31 +143,21 @@ impl Array for BitPackedArray {
}

fn slice(&self, start: usize, stop: usize) -> VortexResult<ArrayRef> {
if start % 1024 != 0 || stop % 1024 != 0 {
return flatten_primitive(self)?.slice(start, stop);
}

if start > self.len() {
vortex_bail!(OutOfBounds: start, 0, self.len());
}
// If we are slicing more than one 1024 element chunk beyond end, we consider this out of bounds
if stop / 1024 > ((self.len() + 1023) / 1024) {
vortex_bail!(OutOfBounds: stop, 0, self.len());
}
check_slice_bounds(self, start, stop)?;
let offset = start % 1024;
let block_start = max(0, start - offset);
let block_stop = ((stop + 1023) / 1024) * 1024;

let encoded_start = (start / 8) * self.bit_width;
let encoded_stop = (stop / 8) * self.bit_width;
Self::try_new(
let encoded_start = (block_start / 8) * self.bit_width;
let encoded_stop = (block_stop / 8) * self.bit_width;
Self::try_new_from_offset(
self.encoded().slice(encoded_start, encoded_stop)?,
self.validity()
.map(|v| v.slice(start, min(stop, self.len())))
.transpose()?,
self.patches()
.map(|p| p.slice(start, min(stop, self.len())))
.transpose()?,
self.validity().map(|v| v.slice(start, stop)).transpose()?,
self.patches().map(|p| p.slice(start, stop)).transpose()?,
self.bit_width(),
self.dtype().clone(),
min(stop - start, self.len()),
stop - start,
offset,
)
.map(|a| a.into_array())
}
Expand Down Expand Up @@ -185,6 +193,7 @@ impl OwnedValidity for BitPackedArray {

impl ArrayDisplay for BitPackedArray {
fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result {
f.property("offset", self.offset)?;
f.property("packed", format!("u{}", self.bit_width()))?;
f.child("encoded", self.encoded())?;
f.maybe_child("patches", self.patches())?;
Expand Down Expand Up @@ -218,3 +227,62 @@ impl Encoding for BitPackedEncoding {
Some(self)
}
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use vortex::array::primitive::PrimitiveArray;
    use vortex::array::Array;
    use vortex::compress::{CompressConfig, CompressCtx};
    use vortex::compute::scalar_at::scalar_at;
    use vortex::encoding::EncodingRef;

    use crate::BitPackedEncoding;

    /// Bit-pack 10_000 u8 values of the form `i % 63`, slice `[start, stop)`,
    /// and verify the first and last elements survive the round trip.
    fn compress_and_check_slice(start: usize, stop: usize) {
        let config = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]);
        let ctx = CompressCtx::new(Arc::new(config));

        let source =
            PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::<Vec<_>>());
        let sliced = ctx
            .compress(&source, None)
            .unwrap()
            .slice(start, stop)
            .unwrap();

        assert_eq!(
            scalar_at(&sliced, 0).unwrap(),
            ((start % 63) as u8).into()
        );
        assert_eq!(
            scalar_at(&sliced, sliced.len() - 1).unwrap(),
            (((stop - 1) % 63) as u8).into()
        );
    }

    #[test]
    fn slice_within_block() {
        // Slice boundaries that fall inside 1024-element chunks.
        compress_and_check_slice(768, 9999);
    }

    #[test]
    fn slice_block_boundary() {
        // Slice boundaries that land exactly on 1024-element chunk edges.
        compress_and_check_slice(7168, 9216);
    }
}
Loading

0 comments on commit 0496b54

Please sign in to comment.