Skip to content

Commit

Permalink
Primitive Iterator API (#689)
Browse files Browse the repository at this point in the history
This PR includes a new batched iterator API that yields a `Vec<T>` and
validity data for batches of items from arrays. It can dynamically
dispatch over the type of the underlying arrays, so recursive compression
should hold.
The PR includes implementations for primitive arrays, as well as for ALP (as a
test case for more complex encodings) and for the constant array (as it's fairly
trivial).

I also added a bunch of benchmarks to various pieces it touches to have
some initial performance numbers and test basic correctness.
  • Loading branch information
AdamGS authored Aug 30, 2024
1 parent 53178e1 commit 7c017cb
Show file tree
Hide file tree
Showing 14 changed files with 814 additions and 25 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ allocator-api2 = "0.2.16"
anyhow = "1.0"
arbitrary = "1.3.2"
arrayref = "0.3.7"
arrow = { version = "52.0.0" }
arrow = { version = "52.0.0", default-features = false }
arrow-arith = "52.0.0"
arrow-array = "52.0.0"
arrow-buffer = "52.0.0"
Expand Down
1 change: 1 addition & 0 deletions encodings/alp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ vortex-error = { workspace = true }
vortex-scalar = { workspace = true }

[dev-dependencies]
arrow = { workspace = true }
divan = { workspace = true }

[[bench]]
Expand Down
85 changes: 84 additions & 1 deletion encodings/alp/benches/alp_compress.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
use vortex_alp::{ALPFloat, Exponents};
use arrow::array::{as_primitive_array, ArrowNativeTypeOp, ArrowPrimitiveType};
use arrow::datatypes::{Float32Type, Float64Type};
use divan::{black_box, Bencher};
use vortex::array::PrimitiveArray;
use vortex::validity::Validity;
use vortex::variants::PrimitiveArrayTrait;
use vortex::IntoCanonical;
use vortex_alp::{alp_encode_components, ALPArray, ALPFloat, Exponents};
use vortex_dtype::NativePType;

fn main() {
divan::main();
Expand All @@ -9,3 +17,78 @@ fn alp_compress<T: ALPFloat>(n: usize) -> (Exponents, Vec<T::ALPInt>, Vec<u64>,
let values: Vec<T> = vec![T::from(1.234).unwrap(); n];
T::encode(values.as_slice(), None)
}

/// Benchmarks summing an ALP-encoded array of `n` identical floats through the
/// batched iterator API (see `alp_sum`).
///
/// The array is built and encoded once, outside the measured closure; each
/// measured run clones the `ALPArray` and sums it.
#[divan::bench(types = [f32, f64], args = [100_000, 1_000_000, 10_000_000])]
fn alp_iter<T>(bencher: Bencher, n: usize)
where
    T: ALPFloat + NativePType,
    T::ALPInt: NativePType,
{
    let input = vec![T::from(1.234).unwrap(); n];
    let primitive = PrimitiveArray::from_vec(input, Validity::AllValid);

    let (exponents, encoded, patches) = alp_encode_components::<T>(&primitive, None);
    let compressed = ALPArray::try_new(encoded, exponents, patches).unwrap();

    bencher.bench_local(move || {
        let total = alp_sum(compressed.clone());
        black_box(total)
    });
}

/// Benchmarks summing an ALP-encoded array of `n` identical floats by first
/// canonicalizing it and converting it to Arrow (see `alp_canonicalize_sum`).
///
/// The array is built and encoded once, outside the measured closure; each
/// measured run clones the `ALPArray`, converts it, and sums the result.
#[divan::bench(types = [Float32Type, Float64Type], args = [100_000, 1_000_000, 10_000_000])]
fn alp_iter_to_arrow<T>(bencher: Bencher, n: usize)
where
    T: ArrowPrimitiveType,
    T::Native: ALPFloat + NativePType + From<f32>,
    <T::Native as ALPFloat>::ALPInt: NativePType,
{
    let input = vec![T::Native::from(1.234_f32); n];
    let primitive = PrimitiveArray::from_vec(input, Validity::AllValid);

    let (exponents, encoded, patches) = alp_encode_components::<T::Native>(&primitive, None);
    let compressed = ALPArray::try_new(encoded, exponents, patches).unwrap();

    bencher.bench_local(move || {
        let total = alp_canonicalize_sum::<T>(compressed.clone());
        black_box(total)
    });
}

/// Converts `array` to its canonical form, then to an Arrow array, and returns
/// the wrapping sum of all non-null values.
///
/// Panics if canonicalization fails.
fn alp_canonicalize_sum<T: ArrowPrimitiveType>(array: ALPArray) -> T::Native {
    let arrow_array = array.into_canonical().unwrap().into_arrow();
    let primitive = as_primitive_array::<T>(arrow_array.as_ref());

    // `flatten` drops the `None` (null) entries, leaving only the present values.
    primitive
        .iter()
        .flatten()
        .fold(T::default_value(), |total, value| total.add_wrapping(value))
}

/// Sums every valid value of `array` using the batched iterator API.
///
/// The `f32` iterator is tried first and its `f32` total widened to `f64`;
/// otherwise the `f64` iterator is used. Reaching neither is treated as
/// impossible and hits `unreachable!()`.
fn alp_sum(array: ALPArray) -> f64 {
    if let Some(batches) = array.f32_iter() {
        let mut total = 0.0_f32;

        for batch in batches {
            for position in (0..batch.len()).filter(|&i| batch.is_valid(i)) {
                // SAFETY: `position` is drawn from `0..batch.len()`, which is
                // presumably the only precondition of `get_unchecked`.
                total += unsafe { batch.get_unchecked(position) };
            }
        }

        f64::from(total)
    } else if let Some(batches) = array.f64_iter() {
        let mut total = 0.0_f64;

        for batch in batches {
            for position in (0..batch.len()).filter(|&i| batch.is_valid(i)) {
                // SAFETY: `position` is drawn from `0..batch.len()`, which is
                // presumably the only precondition of `get_unchecked`.
                total += unsafe { batch.get_unchecked(position) };
            }
        }

        total
    } else {
        unreachable!()
    }
}
120 changes: 118 additions & 2 deletions encodings/alp/src/array.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::fmt::Debug;
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use vortex::array::PrimitiveArray;
use vortex::iter::{Accessor, AccessorRef};
use vortex::stats::ArrayStatisticsCompute;
use vortex::validity::{ArrayValidity, LogicalValidity};
use vortex::validity::{ArrayValidity, LogicalValidity, Validity};
use vortex::variants::{ArrayVariants, PrimitiveArrayTrait};
use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
use vortex::{
Expand All @@ -14,6 +16,7 @@ use vortex_error::{vortex_bail, VortexResult};

use crate::alp::Exponents;
use crate::compress::{alp_encode, decompress};
use crate::ALPFloat;

impl_encoding!("vortex.alp", 13u16, ALP);

Expand Down Expand Up @@ -115,7 +118,120 @@ impl ArrayVariants for ALPArray {
}
}

impl PrimitiveArrayTrait for ALPArray {}
/// Accessor over an ALP-encoded array that decodes values as they are read.
struct ALPAccessor<F: ALPFloat> {
    /// Accessor over the encoded integer values (`F::ALPInt`).
    encoded: Arc<dyn Accessor<F::ALPInt>>,
    /// Optional accessor over patch values; where a patch is valid it is
    /// returned in place of the decoded value.
    patches: Option<Arc<dyn Accessor<F>>>,
    /// Validity reported for the array as a whole.
    validity: Validity,
    /// Exponents handed to `F::decode_single` when decoding.
    exponents: Exponents,
}

impl<F: ALPFloat> ALPAccessor<F> {
    /// Bundles the encoded-value accessor, optional patch accessor, exponents,
    /// and validity into a new accessor.
    fn new(
        encoded: AccessorRef<F::ALPInt>,
        patches: Option<AccessorRef<F>>,
        exponents: Exponents,
        validity: Validity,
    ) -> Self {
        ALPAccessor {
            encoded,
            patches,
            validity,
            exponents,
        }
    }
}

impl<F: ALPFloat> Accessor<F> for ALPAccessor<F> {
    /// Length of the array, taken from the encoded child accessor.
    fn array_len(&self) -> usize {
        self.encoded.array_len()
    }

    /// Whether the value at `index` is valid according to the stored validity.
    fn is_valid(&self, index: usize) -> bool {
        self.validity.is_valid(index)
    }

    /// Returns the value at `index`: the patch value when a valid patch exists
    /// there, otherwise the decoded form of the encoded value.
    fn value_unchecked(&self, index: usize) -> F {
        if let Some(patches) = &self.patches {
            if patches.is_valid(index) {
                return patches.value_unchecked(index);
            }
        }

        let raw = self.encoded.value_unchecked(index);
        F::decode_single(raw, self.exponents)
    }

    /// Returns a copy of the stored validity.
    fn array_validity(&self) -> Validity {
        self.validity.clone()
    }

    /// Decodes the batch that starts at `start_idx`, then overwrites every
    /// position that has a valid patch with the patch value.
    fn decode_batch(&self, start_idx: usize) -> Vec<F> {
        let exponents = self.exponents;
        let raw_batch = self.encoded.decode_batch(start_idx);

        let mut decoded: Vec<F> = raw_batch
            .into_iter()
            .map(|raw| F::decode_single(raw, exponents))
            .collect();

        if let Some(patches) = &self.patches {
            for (offset, slot) in decoded.iter_mut().enumerate() {
                // Patches are addressed by absolute array position, not by
                // offset within the batch.
                let position = start_idx + offset;

                if patches.is_valid(position) {
                    *slot = patches.value_unchecked(position);
                }
            }
        }

        decoded
    }
}

impl PrimitiveArrayTrait for ALPArray {
    /// Returns an accessor that yields decoded `f32` values, or `None` when
    /// this array's dtype is not `F32`.
    ///
    /// # Panics
    ///
    /// Panics if the encoded child array does not provide an `i32` accessor.
    fn f32_accessor(&self) -> Option<AccessorRef<f32>> {
        match self.dtype() {
            DType::Primitive(PType::F32, _) => {
                let patches = self
                    .patches()
                    .and_then(|p| p.with_dyn(|a| a.as_primitive_array_unchecked().f32_accessor()));

                // A missing integer accessor is a broken invariant rather than
                // a recoverable "not applicable" case, so it panics instead of
                // returning `None`.
                let encoded = self
                    .encoded()
                    .with_dyn(|a| a.as_primitive_array_unchecked().i32_accessor())
                    .unwrap_or_else(|| {
                        panic!("ALP-encoded f32 values must be stored as i32: this is an invariant of the ALP algorithm")
                    });

                Some(Arc::new(ALPAccessor::new(
                    encoded,
                    patches,
                    self.exponents(),
                    self.logical_validity().into_validity(),
                )))
            }
            _ => None,
        }
    }

    /// Returns an accessor that yields decoded `f64` values, or `None` when
    /// this array's dtype is not `F64`.
    ///
    /// # Panics
    ///
    /// Panics if the encoded child array does not provide an `i64` accessor.
    fn f64_accessor(&self) -> Option<AccessorRef<f64>> {
        match self.dtype() {
            DType::Primitive(PType::F64, _) => {
                let patches = self
                    .patches()
                    .and_then(|p| p.with_dyn(|a| a.as_primitive_array_unchecked().f64_accessor()));

                // Handled exactly like the `f32` case above so the two methods
                // stay parallel and need no lint suppression.
                let encoded = self
                    .encoded()
                    .with_dyn(|a| a.as_primitive_array_unchecked().i64_accessor())
                    .unwrap_or_else(|| {
                        panic!("ALP-encoded f64 values must be stored as i64: this is an invariant of the ALP algorithm")
                    });

                Some(Arc::new(ALPAccessor::new(
                    encoded,
                    patches,
                    self.exponents(),
                    self.logical_validity().into_validity(),
                )))
            }
            _ => None,
        }
    }
}

impl ArrayValidity for ALPArray {
fn is_valid(&self, index: usize) -> bool {
Expand Down
4 changes: 4 additions & 0 deletions vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,7 @@ harness = false
[[bench]]
name = "compare"
harness = false

[[bench]]
name = "iter"
harness = false
100 changes: 100 additions & 0 deletions vortex-array/benches/iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use itertools::Itertools;
use vortex::array::PrimitiveArray;
use vortex::iter::VectorizedArrayIter;
use vortex::validity::Validity;
use vortex::variants::ArrayVariants;

/// Baseline benchmark: sums one million `Some(u32)` values from a plain `Vec`
/// using a standard iterator and `do_work`.
fn std_iter(c: &mut Criterion) {
    let values: Vec<Option<u32>> = (0_u32..1_000_000).map(Some).collect();

    c.bench_function("std_iter", |bencher| {
        bencher.iter_batched(|| values.iter().copied(), do_work, BatchSize::SmallInput);
    });
}

/// Baseline benchmark: sums one million plain `u32` values (no `Option`
/// wrapper) from a `Vec` using a standard iterator.
fn std_iter_no_option(c: &mut Criterion) {
    let data = (0_u32..1_000_000).collect_vec();
    c.bench_function("std_iter_no_option", |b| {
        b.iter_batched(
            || data.iter().copied(),
            |mut iter| {
                let mut u = 0_u32;
                for n in iter.by_ref() {
                    // The sum of 0..1_000_000 (499_999_500_000) does not fit in
                    // a `u32`; wrap explicitly so the benchmark does not panic
                    // when overflow checks are enabled. Without overflow checks
                    // this is the same arithmetic as `u += n`.
                    u = u.wrapping_add(n);
                }
                u
            },
            BatchSize::SmallInput,
        )
    });
}

/// Benchmarks summing one million `u32` values from a `PrimitiveArray` through
/// the batched iterator, batch by batch (see `do_work_vortex`).
fn vortex_iter(c: &mut Criterion) {
    let values: Vec<u32> = (0_u32..1_000_000).collect();
    let array = PrimitiveArray::from_vec(values, Validity::AllValid);

    c.bench_function("vortex_iter", |bencher| {
        bencher.iter_batched(
            || array.as_primitive_array_unchecked().u32_iter().unwrap(),
            do_work_vortex,
            BatchSize::SmallInput,
        );
    });
}

/// Benchmarks summing one million `u32` values from a `PrimitiveArray` through
/// the batched iterator flattened into a stream of `Option<u32>` items, so it
/// can share `do_work` with the std and Arrow benchmarks.
fn vortex_iter_flat(c: &mut Criterion) {
    let values: Vec<u32> = (0_u32..1_000_000).collect();
    let array = PrimitiveArray::from_vec(values, Validity::AllValid);

    c.bench_function("vortex_iter_flat", |bencher| {
        bencher.iter_batched(
            || {
                let batches = array.as_primitive_array_unchecked().u32_iter().unwrap();
                batches.flatten()
            },
            do_work,
            BatchSize::SmallInput,
        );
    });
}

/// Benchmarks summing one million `u32` values from an Arrow `UInt32Array`
/// using its iterator and `do_work`.
fn arrow_iter(c: &mut Criterion) {
    let array = arrow_array::UInt32Array::from_iter(0_u32..1_000_000);

    c.bench_function("arrow_iter", |bencher| {
        bencher.iter_batched(|| array.iter(), do_work, BatchSize::SmallInput);
    });
}

/// Drains `iter`, returning the wrapping sum of its values together with the
/// exhausted iterator.
///
/// The iterator is handed back to the caller rather than dropped here,
/// presumably so that the benchmark harness drops it outside the timed routine.
///
/// # Panics
///
/// Panics if any item is `None`.
fn do_work(
    mut iter: impl Iterator<Item = Option<u32>>,
) -> (u32, impl Iterator<Item = Option<u32>>) {
    let mut sum = 0_u32;
    for value in iter.by_ref() {
        // The benchmark inputs (0..1_000_000) add up to more than `u32::MAX`,
        // so wrap explicitly instead of panicking when overflow checks are on.
        sum = sum.wrapping_add(value.unwrap());
    }
    (sum, iter)
}

/// Consumes the batched iterator and returns the wrapping sum of every valid
/// value across all batches.
fn do_work_vortex(iter: VectorizedArrayIter<u32>) -> u32 {
    let mut sum = 0_u32;
    for batch in iter {
        for idx in 0..batch.len() {
            if batch.is_valid(idx) {
                // SAFETY: `idx` is drawn from `0..batch.len()`, which is
                // presumably the only precondition of `get_unchecked`.
                let value = unsafe { *batch.get_unchecked(idx) };
                // The benchmark input (0..1_000_000) adds up to more than
                // `u32::MAX`, so wrap explicitly instead of panicking when
                // overflow checks are on.
                sum = sum.wrapping_add(value);
            }
        }
    }

    sum
}

// Registers the five iteration benchmarks as the `benches` group, using a
// Criterion configuration with a sample size of 100, and generates the
// benchmark entry point for that group.
criterion_group!(
name = benches;
config = Criterion::default().sample_size(100);
targets = std_iter_no_option,
std_iter,
vortex_iter,
vortex_iter_flat,
arrow_iter,
);
criterion_main!(benches);
Loading

0 comments on commit 7c017cb

Please sign in to comment.