From e8e5557d9fb7353dad719f0469a5529f47a2f772 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 18:46:27 +0100 Subject: [PATCH] Remove AsContiguousFn (#346) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per the comments on #341 , this PR removes the AsContiguousFn trait and as_contiguous function entirely, and instead pushes the relevant details into the implementation of `ArrayFlatten::flatten` for the `ChunkedArray` type. A few questions I've bumped into while doing this refactor: * Can DType::List only reference primitive types? I see ListScalar, but it’s not clear to me how Vortex would encode e.g. `List` or `List` * **Answer**: Deferring for now as List DTypes not fully support yet anyway * How should this work for ExtensionArray? * **Answer**: ExtensionArray can contain a ChunkedArray for its internal storage Array --- Cargo.toml | 2 +- bench-vortex/benches/compress_benchmark.rs | 4 +- vortex-alp/src/compute.rs | 39 +--- .../src/array/bool/compute/as_contiguous.rs | 27 --- vortex-array/src/array/bool/compute/mod.rs | 6 - vortex-array/src/array/chunked/compute/mod.rs | 23 +-- .../src/array/chunked/compute/take.rs | 16 +- vortex-array/src/array/chunked/flatten.rs | 192 ++++++++++++++++++ vortex-array/src/array/chunked/mod.rs | 15 +- vortex-array/src/array/constant/as_arrow.rs | 50 +++++ vortex-array/src/array/constant/compute.rs | 32 +-- vortex-array/src/array/constant/mod.rs | 1 + .../src/array/datetime/localdatetime.rs | 2 +- vortex-array/src/array/extension/compute.rs | 16 -- .../array/primitive/compute/as_contiguous.rs | 31 --- .../src/array/primitive/compute/mod.rs | 6 - vortex-array/src/array/sparse/compute/mod.rs | 65 +----- vortex-array/src/array/sparse/flatten.rs | 79 +++++-- vortex-array/src/array/struct/compute.rs | 53 ----- vortex-array/src/array/struct/mod.rs | 15 +- vortex-array/src/array/varbin/compute/mod.rs | 46 +---- vortex-array/src/array/varbin/mod.rs | 2 +- vortex-array/src/compress.rs | 15 +- vortex-array/src/compute/as_contiguous.rs | 67 ------ vortex-array/src/compute/mod.rs | 6 - vortex-array/src/flatten.rs | 15 +- vortex-array/src/lib.rs | 10 + vortex-array/src/typed.rs | 6 +- vortex-array/src/validity.rs | 29 +-- vortex-datetime-parts/src/compute.rs | 42 +--- vortex-dict/src/compute.rs | 9 +- vortex-fastlanes/src/bitpacking/compress.rs | 4 +- vortex-fastlanes/src/for/compress.rs | 6 + vortex-fastlanes/src/for/compute.rs | 37 +--- 34 files changed, 398 insertions(+), 570 deletions(-) delete mode 100644 vortex-array/src/array/bool/compute/as_contiguous.rs create mode 100644 vortex-array/src/array/chunked/flatten.rs create mode 100644 vortex-array/src/array/constant/as_arrow.rs delete mode 100644 vortex-array/src/array/primitive/compute/as_contiguous.rs delete mode 100644 vortex-array/src/compute/as_contiguous.rs diff --git a/Cargo.toml b/Cargo.toml index f04c450fbc..080a89a81a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,5 +107,5 @@ zigzag = "0.1.0" warnings = "deny" [workspace.lints.clippy] -all = "deny" +all = { level = "deny", priority = -1 } or_fun_call = "deny" diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs index b41bda0499..e586cc404d 100644 --- a/bench-vortex/benches/compress_benchmark.rs +++ b/bench-vortex/benches/compress_benchmark.rs @@ -7,7 +7,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; fn vortex_compress_taxi(c: &mut Criterion) { taxi_data_parquet(); - let mut group = c.benchmark_group("end to end"); 
+ let mut group = c.benchmark_group("end to end - taxi"); group.sample_size(10); group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data()))); group.finish() @@ -16,7 +16,7 @@ fn vortex_compress_taxi(c: &mut Criterion) { fn vortex_compress_medicare1(c: &mut Criterion) { let dataset = BenchmarkDatasets::PBI(Medicare1); dataset.as_uncompressed(); - let mut group = c.benchmark_group("end to end"); + let mut group = c.benchmark_group("end to end - medicare"); group.sample_size(10); group.bench_function("compress", |b| { b.iter(|| black_box(dataset.compress_to_vortex())) diff --git a/vortex-alp/src/compute.rs b/vortex-alp/src/compute.rs index aa5c3d9c18..19f7e36366 100644 --- a/vortex-alp/src/compute.rs +++ b/vortex-alp/src/compute.rs @@ -1,16 +1,13 @@ -use vortex::compute::as_contiguous::AsContiguousFn; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::slice::{slice, SliceFn}; use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; -use vortex::{impl_default_as_contiguous_fn, Array, ArrayDType, IntoArray}; +use vortex::{Array, ArrayDType, IntoArray}; use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::{match_each_alp_float_ptype, ALPArray}; -impl_default_as_contiguous_fn!(ALPArray); - impl ArrayCompute for ALPArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) @@ -23,10 +20,6 @@ impl ArrayCompute for ALPArray { fn take(&self) -> Option<&dyn TakeFn> { Some(self) } - - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } } impl ScalarAtFn for ALPArray { @@ -68,33 +61,3 @@ impl SliceFn for ALPArray { .into_array()) } } - -#[cfg(test)] -mod test { - use vortex::array::primitive::PrimitiveArray; - use vortex::compute::as_contiguous::AsContiguousFn; - use vortex::compute::scalar_at::scalar_at; - use vortex::validity::Validity; - use vortex::IntoArray; - - use crate::ALPArray; - - #[test] - fn test_as_contiguous() { - let values = vec![1.0, 2.0, 3.0]; - let primitives = PrimitiveArray::from_vec(values, Validity::NonNullable); - let encoded = ALPArray::encode(primitives.into_array()).unwrap(); - let alp = ALPArray::try_from(&encoded).unwrap(); - - let flat = alp.as_contiguous(&[encoded]).unwrap(); - - let a: f64 = scalar_at(&flat, 0).unwrap().try_into().unwrap(); - let b: f64 = scalar_at(&flat, 1).unwrap().try_into().unwrap(); - - let c: f64 = scalar_at(&flat, 2).unwrap().try_into().unwrap(); - - assert_eq!(a, 1.0); - assert_eq!(b, 2.0); - assert_eq!(c, 3.0); - } -} diff --git a/vortex-array/src/array/bool/compute/as_contiguous.rs b/vortex-array/src/array/bool/compute/as_contiguous.rs deleted file mode 100644 index 54f5aeba3a..0000000000 --- a/vortex-array/src/array/bool/compute/as_contiguous.rs +++ /dev/null @@ -1,27 +0,0 @@ -use arrow_buffer::BooleanBuffer; -use vortex_error::VortexResult; - -use crate::array::bool::BoolArray; -use crate::compute::as_contiguous::AsContiguousFn; -use crate::validity::Validity; -use crate::{Array, ArrayDType, IntoArray}; - -impl AsContiguousFn for BoolArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let validity = if self.dtype().is_nullable() { - Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) - } else { - Validity::NonNullable - }; - - let mut bools = Vec::with_capacity(arrays.iter().map(|a| a.len()).sum()); - for buffer in arrays - .iter() - .map(|a| Self::try_from(a).unwrap().boolean_buffer()) - { - bools.extend(buffer.iter()) - } - - Ok(Self::try_new(BooleanBuffer::from(bools), 
validity)?.into_array()) - } -} diff --git a/vortex-array/src/array/bool/compute/mod.rs b/vortex-array/src/array/bool/compute/mod.rs index b8832b8113..36970c63c6 100644 --- a/vortex-array/src/array/bool/compute/mod.rs +++ b/vortex-array/src/array/bool/compute/mod.rs @@ -1,6 +1,5 @@ use crate::array::bool::BoolArray; use crate::compute::as_arrow::AsArrowArray; -use crate::compute::as_contiguous::AsContiguousFn; use crate::compute::compare::CompareFn; use crate::compute::fill::FillForwardFn; use crate::compute::scalar_at::ScalarAtFn; @@ -9,7 +8,6 @@ use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; mod as_arrow; -mod as_contiguous; mod compare; mod fill; mod flatten; @@ -22,10 +20,6 @@ impl ArrayCompute for BoolArray { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn compare(&self) -> Option<&dyn CompareFn> { Some(self) } diff --git a/vortex-array/src/array/chunked/compute/mod.rs b/vortex-array/src/array/chunked/compute/mod.rs index 7d2950a583..0469a11b97 100644 --- a/vortex-array/src/array/chunked/compute/mod.rs +++ b/vortex-array/src/array/chunked/compute/mod.rs @@ -2,23 +2,21 @@ use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::array::chunked::ChunkedArray; -use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::scalar_subtract::SubtractScalarFn; use crate::compute::slice::SliceFn; use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; -use crate::Array; mod slice; mod take; impl ArrayCompute for ChunkedArray { - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> { Some(self) } @@ -29,23 +27,6 @@ impl ArrayCompute for ChunkedArray { fn take(&self) -> Option<&dyn TakeFn> { Some(self) } - - fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> { - Some(self) - } -} - -impl AsContiguousFn for ChunkedArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - // Combine all the chunks into one, then call as_contiguous again. 
- let mut chunks = Vec::with_capacity(self.nchunks()); - for array in arrays { - for chunk in Self::try_from(array).unwrap().chunks() { - chunks.push(chunk); - } - } - as_contiguous(&chunks) - } } impl ScalarAtFn for ChunkedArray { diff --git a/vortex-array/src/array/chunked/compute/take.rs b/vortex-array/src/array/chunked/compute/take.rs index 96b357af7c..a4c6e5671b 100644 --- a/vortex-array/src/array/chunked/compute/take.rs +++ b/vortex-array/src/array/chunked/compute/take.rs @@ -52,10 +52,7 @@ impl TakeFn for ChunkedArray { #[cfg(test)] mod test { - use itertools::Itertools; - use crate::array::chunked::ChunkedArray; - use crate::compute::as_contiguous::as_contiguous; use crate::compute::take::take; use crate::{ArrayDType, ArrayTrait, AsArray, IntoArray}; @@ -68,14 +65,11 @@ mod test { assert_eq!(arr.len(), 9); let indices = vec![0, 0, 6, 4].into_array(); - let result = as_contiguous( - &ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap()) - .unwrap() - .chunks() - .collect_vec(), - ) - .unwrap() - .into_primitive(); + let result = &ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap()) + .unwrap() + .into_array() + .flatten_primitive() + .unwrap(); assert_eq!(result.typed_data::(), &[1, 1, 1, 2]); } } diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs new file mode 100644 index 0000000000..c70b5083d6 --- /dev/null +++ b/vortex-array/src/array/chunked/flatten.rs @@ -0,0 +1,192 @@ +use arrow_buffer::{BooleanBuffer, MutableBuffer, ScalarBuffer}; +use itertools::Itertools; +use vortex_dtype::{match_each_native_ptype, DType, Nullability, PType, StructDType}; +use vortex_error::{vortex_bail, ErrString, VortexResult}; +use vortex_scalar::Scalar; + +use crate::accessor::ArrayAccessor; +use crate::array::bool::BoolArray; +use crate::array::chunked::ChunkedArray; +use crate::array::constant::ConstantArray; +use crate::array::extension::ExtensionArray; +use crate::array::primitive::PrimitiveArray; +use crate::array::r#struct::StructArray; +use crate::array::varbin::builder::VarBinBuilder; +use crate::array::varbin::VarBinArray; +use crate::validity::Validity; +use crate::{Array, ArrayDType, ArrayFlatten, ArrayTrait, ArrayValidity, Flattened, IntoArray}; + +impl ArrayFlatten for ChunkedArray { + fn flatten(self) -> VortexResult { + try_flatten_chunks(self.chunks().collect(), self.dtype().clone()) + } +} + +pub(crate) fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { + let mismatched = chunks + .iter() + .filter(|chunk| !chunk.dtype().eq(&dtype)) + .collect::>(); + if !mismatched.is_empty() { + vortex_bail!(MismatchedTypes: dtype, ErrString::from(format!("{:?}", mismatched))) + } + + match &dtype { + // Structs can have their internal field pointers swizzled to push the chunking down + // one level internally without copying or decompressing any data. + DType::Struct(struct_dtype, _) => { + let struct_array = swizzle_struct_chunks(chunks.as_slice(), struct_dtype)?; + Ok(Flattened::Struct(struct_array)) + } + + // Extension arrays wrap an internal storage array, which can hold a ChunkedArray until + // it is safe to unpack them. + DType::Extension(ext_dtype, _) => { + let ext_array = ExtensionArray::new( + ext_dtype.clone(), + ChunkedArray::try_new(chunks, dtype.clone())?.into_array(), + ); + + Ok(Flattened::Extension(ext_array)) + } + + // Lists just flatten into their inner PType + DType::List(..) 
=> { + todo!() + } + + DType::Bool(nullability) => { + let bool_array = pack_bools(chunks.as_slice(), *nullability)?; + Ok(Flattened::Bool(bool_array)) + } + DType::Primitive(ptype, nullability) => { + let prim_array = pack_primitives(chunks.as_slice(), *ptype, *nullability)?; + Ok(Flattened::Primitive(prim_array)) + } + DType::Utf8(nullability) => { + let varbin_array = pack_varbin(chunks.as_slice(), &dtype, *nullability)?; + Ok(Flattened::VarBin(varbin_array)) + } + DType::Binary(nullability) => { + let varbin_array = pack_varbin(chunks.as_slice(), &dtype, *nullability)?; + Ok(Flattened::VarBin(varbin_array)) + } + DType::Null => { + let len = chunks.iter().map(|chunk| chunk.len()).sum(); + let const_array = ConstantArray::new(Scalar::null(DType::Null), len); + Ok(Flattened::Null(const_array)) + } + } +} + +/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single +/// StructArray, where the Array for each Field is a ChunkedArray. +/// +/// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have +/// been checked to have the same DType already. +fn swizzle_struct_chunks( + chunks: &[Array], + struct_dtype: &StructDType, +) -> VortexResult { + let chunks: Vec = chunks.iter().map(StructArray::try_from).try_collect()?; + + let len = chunks.iter().map(|chunk| chunk.len()).sum(); + let validity = chunks + .iter() + .map(|chunk| chunk.logical_validity()) + .collect::(); + + let mut field_arrays = Vec::new(); + + for (field_idx, field_dtype) in struct_dtype.dtypes().iter().enumerate() { + let mut field_chunks = Vec::new(); + for chunk in &chunks { + field_chunks.push( + chunk + .field(field_idx) + .expect("all chunks must have same dtype"), + ); + } + let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?; + field_arrays.push(field_array.into_array()); + } + + StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity) +} + +/// Builds a new [BoolArray] by repacking the values from the chunks in a single contiguous array. +/// +/// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have +/// been checked to have the same DType already. +fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult { + let len = chunks.iter().map(|chunk| chunk.len()).sum(); + let validity = validity_from_chunks(chunks, nullability); + let mut bools = Vec::with_capacity(len); + for chunk in chunks { + let chunk = chunk.clone().flatten_bool()?; + bools.extend(chunk.boolean_buffer().iter()); + } + + BoolArray::try_new(BooleanBuffer::from(bools), validity) +} + +/// Builds a new [PrimitiveArray] by repacking the values from the chunks into a single +/// contiguous array. +/// +/// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have +/// been checked to have the same DType already. +fn pack_primitives( + chunks: &[Array], + ptype: PType, + nullability: Nullability, +) -> VortexResult { + let len: usize = chunks.iter().map(|chunk| chunk.len()).sum(); + let validity = validity_from_chunks(chunks, nullability); + let mut buffer = MutableBuffer::with_capacity(len * ptype.byte_width()); + for chunk in chunks { + let chunk = chunk.clone().flatten_primitive()?; + buffer.extend_from_slice(chunk.buffer()); + } + + match_each_native_ptype!(ptype, |$T| { + Ok(PrimitiveArray::try_new( + ScalarBuffer::<$T>::from(buffer), + validity)?) 
+ }) +} + +/// Builds a new [VarBinArray] by repacking the values from the chunks into a single +/// contiguous array. +/// +/// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have +/// been checked to have the same DType already. +fn pack_varbin( + chunks: &[Array], + dtype: &DType, + _nullability: Nullability, +) -> VortexResult { + let len = chunks.iter().map(|chunk| chunk.len()).sum(); + let mut builder = VarBinBuilder::::with_capacity(len); + + for chunk in chunks { + let chunk = chunk.clone().flatten_varbin()?; + chunk.with_iterator(|iter| { + for datum in iter { + builder.push(datum); + } + })?; + } + + Ok(builder.finish(dtype.clone())) +} + +fn validity_from_chunks(chunks: &[Array], nullability: Nullability) -> Validity { + if nullability == Nullability::NonNullable { + Validity::NonNullable + } else { + chunks + .iter() + .map(|chunk| chunk.with_dyn(|a| a.logical_validity())) + .collect() + } +} diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index 25d5eb360e..8d13e642aa 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -6,7 +6,6 @@ use vortex_error::vortex_bail; use vortex_scalar::Scalar; use crate::array::primitive::PrimitiveArray; -use crate::compute::as_contiguous::as_contiguous; use crate::compute::scalar_at::scalar_at; use crate::compute::scalar_subtract::{subtract_scalar, SubtractScalarFn}; use crate::compute::search_sorted::{search_sorted, SearchSortedSide}; @@ -15,9 +14,10 @@ use crate::stream::{ArrayStream, ArrayStreamAdapter}; use crate::validity::Validity::NonNullable; use crate::validity::{ArrayValidity, LogicalValidity}; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; -use crate::{impl_encoding, ArrayDType, ArrayFlatten, IntoArrayData, ToArrayData}; +use crate::{impl_encoding, ArrayDType, IntoArrayData, ToArrayData}; mod compute; +mod flatten; mod stats; impl_encoding!("vortex.chunked", Chunked); @@ -114,17 +114,6 @@ impl FromIterator for ChunkedArray { } } -impl ArrayFlatten for ChunkedArray { - fn flatten(self) -> VortexResult { - let chunks = self.chunks().collect_vec(); - if chunks.is_empty() { - // TODO(ngates): return an empty FlattenedArray with the correct DType. - panic!("Cannot yet flatten an empty chunked array"); - } - as_contiguous(chunks.as_slice())?.flatten() - } -} - impl AcceptArrayVisitor for ChunkedArray { fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { visitor.visit_child("chunk_ends", &self.chunk_ends())?; diff --git a/vortex-array/src/array/constant/as_arrow.rs b/vortex-array/src/array/constant/as_arrow.rs new file mode 100644 index 0000000000..f2e75cb0e0 --- /dev/null +++ b/vortex-array/src/array/constant/as_arrow.rs @@ -0,0 +1,50 @@ +//! Implementation of the [AsArrowArray] trait for [ConstantArray] that is representing +//! [DType::Null] values. 
+ +use std::sync::Arc; + +use arrow_array::{ArrayRef as ArrowArrayRef, NullArray}; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, VortexResult}; + +use crate::array::constant::ConstantArray; +use crate::compute::as_arrow::AsArrowArray; +use crate::{ArrayDType, ArrayTrait}; + +impl AsArrowArray for ConstantArray { + fn as_arrow(&self) -> VortexResult { + if self.dtype() != &DType::Null { + vortex_bail!(InvalidArgument: "only null ConstantArrays convert to arrow"); + } + + let arrow_null = NullArray::new(self.len()); + Ok(Arc::new(arrow_null)) + } +} + +#[cfg(test)] +mod test { + use arrow_array::{Array, NullArray}; + + use crate::array::constant::ConstantArray; + use crate::arrow::FromArrowArray; + use crate::compute::as_arrow::AsArrowArray; + use crate::{ArrayData, IntoArray}; + + #[test] + fn test_round_trip() { + let arrow_nulls = NullArray::new(10); + let vortex_nulls = ArrayData::from_arrow(&arrow_nulls, true).into_array(); + + assert_eq!( + *ConstantArray::try_from(vortex_nulls) + .unwrap() + .as_arrow() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + arrow_nulls + ); + } +} diff --git a/vortex-array/src/array/constant/compute.rs b/vortex-array/src/array/constant/compute.rs index 4183e7e8ba..103b1693f7 100644 --- a/vortex-array/src/array/constant/compute.rs +++ b/vortex-array/src/array/constant/compute.rs @@ -1,19 +1,13 @@ -use itertools::Itertools; -use vortex_error::{vortex_err, VortexResult}; +use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::array::constant::ConstantArray; -use crate::compute::as_contiguous::AsContiguousFn; use crate::compute::scalar_at::ScalarAtFn; use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; -use crate::{Array, ArrayTrait, IntoArray}; +use crate::{Array, IntoArray}; impl ArrayCompute for ConstantArray { - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -23,28 +17,6 @@ impl ArrayCompute for ConstantArray { } } -impl AsContiguousFn for ConstantArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let chunks = arrays - .iter() - .map(|a| Self::try_from(a).unwrap()) - .collect_vec(); - - if chunks.iter().map(|c| c.scalar()).all_equal() { - Ok(Self::new( - chunks.first().unwrap().scalar().clone(), - chunks.iter().map(|c| c.len()).sum(), - ) - .into_array()) - } else { - // TODO(ngates): we need to flatten the constant arrays and then concatenate them - Err(vortex_err!( - "Cannot concatenate constant arrays with differing scalars" - )) - } - } -} - impl ScalarAtFn for ConstantArray { fn scalar_at(&self, _index: usize) -> VortexResult { Ok(self.scalar().clone()) diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs index 0292398166..d3344fe6c9 100644 --- a/vortex-array/src/array/constant/mod.rs +++ b/vortex-array/src/array/constant/mod.rs @@ -7,6 +7,7 @@ use crate::impl_encoding; use crate::stats::Stat; use crate::validity::{ArrayValidity, LogicalValidity}; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; +mod as_arrow; mod compute; mod flatten; mod stats; diff --git a/vortex-array/src/array/datetime/localdatetime.rs b/vortex-array/src/array/datetime/localdatetime.rs index 849d84cf9c..5d854731bc 100644 --- a/vortex-array/src/array/datetime/localdatetime.rs +++ b/vortex-array/src/array/datetime/localdatetime.rs @@ -16,7 +16,7 @@ use crate::validity::ArrayValidity; use crate::{Array, ArrayDType, ArrayData, IntoArrayData}; lazy_static! 
{ - static ref ID: ExtID = ExtID::from(LocalDateTimeArray::ID); + pub static ref ID: ExtID = ExtID::from(LocalDateTimeArray::ID); } pub struct LocalDateTimeArray { diff --git a/vortex-array/src/array/extension/compute.rs b/vortex-array/src/array/extension/compute.rs index 70ab9706c3..d7611d3f73 100644 --- a/vortex-array/src/array/extension/compute.rs +++ b/vortex-array/src/array/extension/compute.rs @@ -5,7 +5,6 @@ use vortex_scalar::Scalar; use crate::array::datetime::LocalDateTimeArray; use crate::array::extension::ExtensionArray; use crate::compute::as_arrow::AsArrowArray; -use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::cast::CastFn; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::slice::{slice, SliceFn}; @@ -18,10 +17,6 @@ impl ArrayCompute for ExtensionArray { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn cast(&self) -> Option<&dyn CastFn> { // It's not possible to cast an extension array to another type. // TODO(ngates): we should allow some extension arrays to implement a callback @@ -54,17 +49,6 @@ impl AsArrowArray for ExtensionArray { } } -impl AsContiguousFn for ExtensionArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let storage_arrays = arrays - .iter() - .map(|a| Self::try_from(a).expect("not an extension array").storage()) - .collect::>(); - - Ok(Self::new(self.ext_dtype().clone(), as_contiguous(&storage_arrays)?).into_array()) - } -} - impl ScalarAtFn for ExtensionArray { fn scalar_at(&self, index: usize) -> VortexResult { Ok(Scalar::extension( diff --git a/vortex-array/src/array/primitive/compute/as_contiguous.rs b/vortex-array/src/array/primitive/compute/as_contiguous.rs deleted file mode 100644 index 150849ef32..0000000000 --- a/vortex-array/src/array/primitive/compute/as_contiguous.rs +++ /dev/null @@ -1,31 +0,0 @@ -use arrow_buffer::{MutableBuffer, ScalarBuffer}; -use vortex_dtype::match_each_native_ptype; -use vortex_error::VortexResult; - -use crate::array::primitive::PrimitiveArray; -use crate::compute::as_contiguous::AsContiguousFn; -use crate::validity::Validity; -use crate::ArrayDType; -use crate::{Array, IntoArray}; - -impl AsContiguousFn for PrimitiveArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let validity = if self.dtype().is_nullable() { - Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) - } else { - Validity::NonNullable - }; - - let mut buffer = MutableBuffer::with_capacity( - arrays.iter().map(|a| a.len()).sum::() * self.ptype().byte_width(), - ); - for array in arrays { - buffer.extend_from_slice(array.as_primitive().buffer()) - } - match_each_native_ptype!(self.ptype(), |$T| { - Ok(PrimitiveArray::try_new(ScalarBuffer::<$T>::from(buffer), validity) - .unwrap() - .into_array()) - }) - } -} diff --git a/vortex-array/src/array/primitive/compute/mod.rs b/vortex-array/src/array/primitive/compute/mod.rs index 8b87128a11..637f63f366 100644 --- a/vortex-array/src/array/primitive/compute/mod.rs +++ b/vortex-array/src/array/primitive/compute/mod.rs @@ -1,6 +1,5 @@ use crate::array::primitive::PrimitiveArray; use crate::compute::as_arrow::AsArrowArray; -use crate::compute::as_contiguous::AsContiguousFn; use crate::compute::cast::CastFn; use crate::compute::compare::CompareFn; use crate::compute::fill::FillForwardFn; @@ -13,7 +12,6 @@ use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; mod as_arrow; -mod as_contiguous; mod cast; mod compare; mod fill; 
@@ -29,10 +27,6 @@ impl ArrayCompute for PrimitiveArray { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn cast(&self) -> Option<&dyn CastFn> { Some(self) } diff --git a/vortex-array/src/array/sparse/compute/mod.rs b/vortex-array/src/array/sparse/compute/mod.rs index 722ba3e082..bb772bd0bd 100644 --- a/vortex-array/src/array/sparse/compute/mod.rs +++ b/vortex-array/src/array/sparse/compute/mod.rs @@ -2,25 +2,20 @@ use std::collections::HashMap; use itertools::Itertools; use vortex_dtype::match_each_integer_ptype; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::array::primitive::PrimitiveArray; use crate::array::sparse::SparseArray; -use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::slice::SliceFn; use crate::compute::take::{take, TakeFn}; use crate::compute::ArrayCompute; -use crate::{Array, ArrayDType, ArrayTrait, IntoArray}; +use crate::{Array, ArrayDType, IntoArray}; mod slice; impl ArrayCompute for SparseArray { - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -34,27 +29,6 @@ impl ArrayCompute for SparseArray { } } -impl AsContiguousFn for SparseArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let sparse = arrays - .iter() - .map(|a| Self::try_from(a).unwrap()) - .collect_vec(); - - if !sparse.iter().map(|a| a.fill_value()).all_equal() { - vortex_bail!("Cannot concatenate SparseArrays with differing fill values"); - } - - Ok(Self::new( - as_contiguous(&sparse.iter().map(|a| a.indices()).collect_vec())?, - as_contiguous(&sparse.iter().map(|a| a.values()).collect_vec())?, - sparse.iter().map(|a| a.len()).sum(), - self.fill_value().clone(), - ) - .into_array()) - } -} - impl ScalarAtFn for SparseArray { fn scalar_at(&self, index: usize) -> VortexResult { match self.find_index(index)? 
{ @@ -144,8 +118,6 @@ mod test { use crate::array::primitive::PrimitiveArray; use crate::array::sparse::compute::take_map; use crate::array::sparse::SparseArray; - use crate::compute::as_contiguous::as_contiguous; - use crate::compute::slice::slice; use crate::compute::take::take; use crate::validity::Validity; use crate::{Array, ArrayTrait, IntoArray}; @@ -206,39 +178,6 @@ mod test { assert_eq!(taken.len(), 2); } - #[test] - fn take_slices_and_reassemble() { - let sparse = sparse_array(); - let slices = (0..10) - .map(|i| slice(&sparse, i * 10, (i + 1) * 10).unwrap()) - .collect_vec(); - - let taken = slices - .iter() - .map(|s| take(s, &(0u64..10).collect_vec().into_array()).unwrap()) - .collect_vec(); - for i in [1, 2, 5, 6, 7, 8] { - assert_eq!(SparseArray::try_from(&taken[i]).unwrap().indices().len(), 0); - } - for i in [0, 3, 4, 9] { - assert_eq!(SparseArray::try_from(&taken[i]).unwrap().indices().len(), 1); - } - - let contiguous = SparseArray::try_from(as_contiguous(&taken).unwrap()).unwrap(); - assert_eq!( - contiguous.indices().into_primitive().typed_data::(), - [0u64, 7, 7, 9] // relative offsets - ); - assert_eq!( - contiguous.values().into_primitive().typed_data::(), - SparseArray::try_from(sparse) - .unwrap() - .values() - .into_primitive() - .typed_data::() - ); - } - #[test] fn test_take_map() { let sparse = SparseArray::try_from(sparse_array()).unwrap(); diff --git a/vortex-array/src/array/sparse/flatten.rs b/vortex-array/src/array/sparse/flatten.rs index 8d13350d8b..0ebbef70cb 100644 --- a/vortex-array/src/array/sparse/flatten.rs +++ b/vortex-array/src/array/sparse/flatten.rs @@ -1,13 +1,14 @@ -use arrow_buffer::BooleanBufferBuilder; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use itertools::Itertools; -use vortex_dtype::{match_each_native_ptype, NativePType}; +use vortex_dtype::{match_each_native_ptype, DType, NativePType}; use vortex_error::{VortexError, VortexResult}; use vortex_scalar::Scalar; +use crate::array::bool::BoolArray; use crate::array::primitive::PrimitiveArray; use crate::array::sparse::SparseArray; use crate::validity::Validity; -use crate::{ArrayFlatten, ArrayTrait, Flattened}; +use crate::{ArrayDType, ArrayFlatten, ArrayTrait, Flattened}; impl ArrayFlatten for SparseArray { fn flatten(self) -> VortexResult { @@ -16,20 +17,50 @@ impl ArrayFlatten for SparseArray { let mut validity = BooleanBufferBuilder::new(self.len()); validity.append_n(self.len(), false); - let values = self.values().flatten_primitive()?; - match_each_native_ptype!(values.ptype(), |$P| { - flatten_sparse_values( - values.typed_data::<$P>(), - &indices, - self.len(), - self.fill_value(), - validity - ) - }) + + if matches!(self.dtype(), DType::Bool(_)) { + let values = self.values().flatten_bool()?.boolean_buffer(); + flatten_sparse_bools(values, &indices, self.len(), self.fill_value(), validity) + } else { + let values = self.values().flatten_primitive()?; + match_each_native_ptype!(values.ptype(), |$P| { + flatten_sparse_primitives( + values.typed_data::<$P>(), + &indices, + self.len(), + self.fill_value(), + validity + ) + }) + } + } +} + +fn flatten_sparse_bools( + values: BooleanBuffer, + indices: &[usize], + len: usize, + fill_value: &Scalar, + mut validity: BooleanBufferBuilder, +) -> VortexResult { + let fill_bool: bool = if fill_value.is_null() { + bool::default() + } else { + fill_value.try_into()? 
+ }; + let mut flat_bools = vec![fill_bool; len]; + for idx in indices { + flat_bools[*idx] = values.value(*idx); + validity.set_bit(*idx, true); } + + let validity = Validity::from(validity.finish()); + let bool_values = BoolArray::from_vec(flat_bools, validity); + + Ok(Flattened::Bool(bool_values)) } -fn flatten_sparse_values TryFrom<&'a Scalar, Error = VortexError>>( +fn flatten_sparse_primitives TryFrom<&'a Scalar, Error = VortexError>>( values: &[T], indices: &[usize], len: usize, @@ -56,3 +87,23 @@ fn flatten_sparse_values TryFrom<&'a Scalar, Error = Vo }; Ok(Flattened::Primitive(array)) } + +#[cfg(test)] +mod test { + use vortex_dtype::{DType, Nullability}; + + use crate::array::bool::BoolArray; + use crate::array::sparse::SparseArray; + use crate::validity::Validity; + use crate::{ArrayDType, ArrayFlatten, Flattened, IntoArray}; + + #[test] + fn test_sparse_bool() { + let indices = vec![0u64].into_array(); + let values = BoolArray::from_vec(vec![true], Validity::NonNullable).into_array(); + let sparse_bools = SparseArray::new(indices, values, 10, true.into()); + assert_eq!(*sparse_bools.dtype(), DType::Bool(Nullability::NonNullable)); + let flat_bools = sparse_bools.flatten().unwrap(); + assert!(matches!(flat_bools, Flattened::Bool(_))); + } +} diff --git a/vortex-array/src/array/struct/compute.rs b/vortex-array/src/array/struct/compute.rs index e7751652f1..7b46317b5b 100644 --- a/vortex-array/src/array/struct/compute.rs +++ b/vortex-array/src/array/struct/compute.rs @@ -10,12 +10,10 @@ use vortex_scalar::Scalar; use crate::array::r#struct::StructArray; use crate::compute::as_arrow::{as_arrow, AsArrowArray}; -use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::slice::{slice, SliceFn}; use crate::compute::take::{take, TakeFn}; use crate::compute::ArrayCompute; -use crate::validity::Validity; use crate::{Array, ArrayDType, IntoArray}; impl ArrayCompute for StructArray { @@ -23,10 +21,6 @@ impl ArrayCompute for StructArray { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -68,53 +62,6 @@ impl AsArrowArray for StructArray { } } -impl AsContiguousFn for StructArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let struct_arrays = arrays - .iter() - .map(Self::try_from) - .collect::>>()?; - let mut fields = vec![Vec::new(); self.dtypes().len()]; - for array in struct_arrays.iter() { - for (f, field) in fields.iter_mut().enumerate() { - field.push(array.field(f).unwrap()); - } - } - - let fields_len = fields - .first() - .map(|field| field.iter().map(|a| a.len()).sum()) - .unwrap_or_default(); - - let validity = if self.dtype().is_nullable() { - Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) - } else { - Validity::NonNullable - }; - - Self::try_new( - self.names().clone(), - fields - .iter() - .map(|field_arrays| { - // Currently, as_contiguous cannot handle sub-arrays with differing encodings. - // So, first flatten each constituent array, then as_contiguous them back into - // a single array. 
- let flattened = field_arrays - .iter() - .cloned() - .map(|array| array.flatten().unwrap().into_array()) - .collect::>(); - as_contiguous(flattened.as_slice()) - }) - .try_collect()?, - fields_len, - validity, - ) - .map(|a| a.into_array()) - } -} - impl ScalarAtFn for StructArray { fn scalar_at(&self, index: usize) -> VortexResult { Ok(Scalar::r#struct( diff --git a/vortex-array/src/array/struct/mod.rs b/vortex-array/src/array/struct/mod.rs index 91d4c6c10b..f25ddcc0ba 100644 --- a/vortex-array/src/array/struct/mod.rs +++ b/vortex-array/src/array/struct/mod.rs @@ -99,20 +99,9 @@ impl StructArray { } impl ArrayFlatten for StructArray { + /// StructEncoding is the canonical form for a [DType::Struct] array, so return self. fn flatten(self) -> VortexResult { - Ok(Flattened::Struct(Self::try_new( - self.names().clone(), - (0..self.nfields()) - .map(|i| { - self.field(i) - .expect("Missing child") - .flatten() - .map(|f| f.into_array()) - }) - .collect::>>()?, - self.len(), - self.validity(), - )?)) + Ok(Flattened::Struct(self)) } } diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs index fbf94b6e77..7d3c31401b 100644 --- a/vortex-array/src/array/varbin/compute/mod.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -3,24 +3,21 @@ use std::sync::Arc; use arrow_array::{ ArrayRef as ArrowArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray, StringArray, }; -use itertools::Itertools; use vortex_dtype::DType; use vortex_dtype::PType; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::Scalar; -use crate::array::primitive::PrimitiveArray; use crate::array::varbin::{varbin_scalar, VarBinArray}; use crate::arrow::wrappers::as_offset_buffer; use crate::compute::as_arrow::AsArrowArray; -use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::cast::cast; use crate::compute::scalar_at::ScalarAtFn; use crate::compute::slice::SliceFn; use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; -use crate::validity::{ArrayValidity, Validity}; -use crate::{Array, ArrayDType, IntoArray, ToArray}; +use crate::validity::ArrayValidity; +use crate::{ArrayDType, ToArray}; mod slice; mod take; @@ -30,10 +27,6 @@ impl ArrayCompute for VarBinArray { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -47,41 +40,6 @@ impl ArrayCompute for VarBinArray { } } -impl AsContiguousFn for VarBinArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let bytes_chunks: Vec = arrays - .iter() - .map(|a| Self::try_from(a).unwrap().sliced_bytes()) - .try_collect()?; - let bytes = as_contiguous(&bytes_chunks)?; - - let validity = if self.dtype().is_nullable() { - Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) - } else { - Validity::NonNullable - }; - - let mut offsets = Vec::new(); - offsets.push(0); - for a in arrays.iter().map(|a| Self::try_from(a).unwrap()) { - let first_offset: u64 = a.first_offset()?; - let offsets_array = cast(&a.offsets(), PType::U64.into())?.flatten_primitive()?; - let shift = offsets.last().copied().unwrap_or(0); - offsets.extend( - offsets_array - .typed_data::() - .iter() - .skip(1) // Ignore the zero offset for each array - .map(|o| o + shift - first_offset), - ); - } - - let offsets_array = PrimitiveArray::from(offsets).into_array(); - - Self::try_new(offsets_array, bytes, self.dtype().clone(), validity).map(|a| a.into_array()) - } 
-} - impl AsArrowArray for VarBinArray { fn as_arrow(&self) -> VortexResult { // Ensure the offsets are either i32 or i64 diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index 4a7ec6afab..cfd8de1337 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -41,7 +41,7 @@ impl VarBinArray { if !offsets.dtype().is_int() || offsets.dtype().is_nullable() { vortex_bail!(MismatchedTypes: "non nullable int", offsets.dtype()); } - if !matches!(bytes.dtype(), &DType::BYTES,) { + if !matches!(bytes.dtype(), &DType::BYTES) { vortex_bail!(MismatchedTypes: "u8", bytes.dtype()); } if !matches!(dtype, DType::Binary(_) | DType::Utf8(_)) { diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index 26cf51cc46..8e1d0843d2 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -13,7 +13,7 @@ use crate::encoding::{ArrayEncoding, EncodingRef}; use crate::sampling::stratified_slices; use crate::stats::ArrayStatistics; use crate::validity::Validity; -use crate::{compute, Array, ArrayDType, ArrayDef, ArrayTrait, Context, IntoArray}; +use crate::{Array, ArrayDType, ArrayDef, ArrayFlatten, ArrayTrait, Context, IntoArray}; pub trait EncodingCompression: ArrayEncoding { fn cost(&self) -> u8 { @@ -311,16 +311,19 @@ pub fn sampled_compression(array: &Array, compressor: &Compressor) -> VortexResu } // Take a sample of the array, then ask codecs for their best compression estimate. - let sample = compute::as_contiguous::as_contiguous( - &stratified_slices( + let sample = ChunkedArray::try_new( + stratified_slices( array.len(), compressor.options.sample_size, compressor.options.sample_count, ) .into_iter() - .map(|(start, stop)| slice(array, start, stop).unwrap()) - .collect::>(), - )?; + .map(|(start, stop)| slice(array, start, stop)) + .collect::>>()?, + array.dtype().clone(), + )? + .flatten()? + .into_array(); find_best_compression(candidates, &sample, compressor)? .map(|(compression, best)| { diff --git a/vortex-array/src/compute/as_contiguous.rs b/vortex-array/src/compute/as_contiguous.rs deleted file mode 100644 index 3dd51c39d9..0000000000 --- a/vortex-array/src/compute/as_contiguous.rs +++ /dev/null @@ -1,67 +0,0 @@ -use itertools::Itertools; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; - -use crate::{Array, ArrayDType}; - -/// Trait that exposes an operation for repacking (and possibly decompressing) an [Array] into -/// a new Array that occupies a contiguous memory range. -pub trait AsContiguousFn { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult; -} - -#[macro_export] -macro_rules! 
impl_default_as_contiguous_fn { - ($typ:ty) => { - impl $crate::compute::as_contiguous::AsContiguousFn for $typ { - fn as_contiguous(&self, arrays: &[$crate::Array]) -> vortex_error::VortexResult<$crate::Array> { - let dtype = $crate::ArrayDType::dtype(self).clone(); - if !arrays - .iter() - .map(|array| $crate::ArrayDType::dtype(array).clone()) - .all(|dty| dty == dtype) - { - vortex_error::vortex_bail!(ComputeError: "mismatched dtypes in call to as_contiguous"); - } - - let mut chunks = Vec::with_capacity(arrays.len()); - for array in arrays { - chunks.push(array.clone().flatten()?.into_array()); - } - - let chunked_array = $crate::array::chunked::ChunkedArray::try_new(chunks, dtype)?.into_array(); - $crate::compute::as_contiguous::as_contiguous(&[chunked_array]) - } - } - }; -} - -pub fn as_contiguous(arrays: &[Array]) -> VortexResult { - // Simple case: slice with 1 element - if arrays.len() == 1 { - return Ok(arrays[0].clone()); - } - - if arrays.is_empty() { - vortex_bail!(ComputeError: "No arrays to concatenate"); - } - if !arrays.iter().map(|chunk| chunk.encoding().id()).all_equal() { - vortex_bail!(ComputeError: "Chunks have differing encodings"); - } - if !arrays.iter().map(|chunk| chunk.dtype()).all_equal() { - vortex_bail!(ComputeError: - "Chunks have differing dtypes", - ); - } - - let first = arrays.first().unwrap(); - first.with_dyn(|a| { - a.as_contiguous() - .map(|f| f.as_contiguous(arrays)) - .unwrap_or_else(|| { - Err(vortex_err!( - NotImplemented: "as_contiguous", - first.encoding().id() - )) - }) - }) -} diff --git a/vortex-array/src/compute/mod.rs b/vortex-array/src/compute/mod.rs index 8ca8fd1815..9f2f18df63 100644 --- a/vortex-array/src/compute/mod.rs +++ b/vortex-array/src/compute/mod.rs @@ -1,5 +1,4 @@ use as_arrow::AsArrowArray; -use as_contiguous::AsContiguousFn; use cast::CastFn; use compare::CompareFn; use fill::FillForwardFn; @@ -13,7 +12,6 @@ use crate::compute::filter_indices::FilterIndicesFn; use crate::compute::scalar_subtract::SubtractScalarFn; pub mod as_arrow; -pub mod as_contiguous; pub mod cast; pub mod compare; pub mod fill; @@ -30,10 +28,6 @@ pub trait ArrayCompute { None } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - None - } - fn cast(&self) -> Option<&dyn CastFn> { None } diff --git a/vortex-array/src/flatten.rs b/vortex-array/src/flatten.rs index a51f31fbcc..da991117a7 100644 --- a/vortex-array/src/flatten.rs +++ b/vortex-array/src/flatten.rs @@ -1,6 +1,7 @@ use vortex_error::VortexResult; use crate::array::bool::BoolArray; +use crate::array::constant::ConstantArray; use crate::array::extension::ExtensionArray; use crate::array::primitive::PrimitiveArray; use crate::array::r#struct::StructArray; @@ -11,6 +12,7 @@ use crate::{Array, IntoArray}; /// The set of encodings that can be converted to Arrow with zero-copy. pub enum Flattened { + Null(ConstantArray), Bool(BoolArray), Primitive(PrimitiveArray), Struct(StructArray), @@ -19,12 +21,12 @@ pub enum Flattened { Extension(ExtensionArray), } -/// Support trait for decompressing arrays that have been encoded via a [crate::compress::Compressor]. +/// Support trait for transmuting an array into its [vortex_dtype::DType]'s canonical encoding. /// -/// A flattened array is a copying operation, returning new memory holding the same data in -/// its simplest form. +/// Flattening an Array ensures that the array's encoding matches one of the builtin canonical +/// encodings, each of which has a corresponding [Flattened] variant. 
/// -/// DType remains the same before and after a flatten operation. +/// **Important**: DType remains the same before and after a flatten operation. pub trait ArrayFlatten { fn flatten(self) -> VortexResult; } @@ -34,6 +36,10 @@ impl Array { ArrayEncoding::flatten(self.encoding(), self) } + pub fn flatten_extension(self) -> VortexResult { + ExtensionArray::try_from(self.flatten()?.into_array()) + } + pub fn flatten_bool(self) -> VortexResult { BoolArray::try_from(self.flatten()?.into_array()) } @@ -50,6 +56,7 @@ impl Array { impl IntoArray for Flattened { fn into_array(self) -> Array { match self { + Self::Null(a) => a.into_array(), Self::Bool(a) => a.into_array(), Self::Primitive(a) => a.into_array(), Self::Struct(a) => a.into_array(), diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index ed3955222e..b60e554d7e 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -1,3 +1,13 @@ +//! Vortex crate containing core logic for encoding and memory representation of [arrays](Array). +//! +//! At the heart of Vortex are [arrays](Array) and [encodings](crate::encoding::EncodingCompression). +//! Arrays are typed views of memory buffers that hold [scalars](vortex_scalar::Scalar). These +//! buffers can be held in a number of physical encodings to perform lightweight compression that +//! exploits the particular data distribution of the array's values. +//! +//! Every data type recognized by Vortex also has a canonical physical encoding format, which +//! arrays can be [flattened](Flattened) into for ease of access in compute functions. +//! pub mod accessor; pub mod array; pub mod arrow; diff --git a/vortex-array/src/typed.rs b/vortex-array/src/typed.rs index 49184e8c5e..4284284d6d 100644 --- a/vortex-array/src/typed.rs +++ b/vortex-array/src/typed.rs @@ -48,7 +48,11 @@ impl TryFrom for TypedArray { fn try_from(array: Array) -> Result { if array.encoding().id() != D::ENCODING.id() { - vortex_bail!("incorrect encoding"); + vortex_bail!( + "incorrect encoding {}, expected {}", + array.encoding().id().as_ref(), + D::ENCODING.id().as_ref(), + ); } let metadata = match &array { Array::Data(d) => d diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index bf0aa4967b..a88fa66714 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -1,10 +1,9 @@ -use arrow_buffer::{BooleanBuffer, NullBuffer}; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer}; use serde::{Deserialize, Serialize}; use vortex_dtype::{DType, Nullability}; use vortex_error::{vortex_bail, VortexResult}; use crate::array::bool::BoolArray; -use crate::compute::as_contiguous::as_contiguous; use crate::compute::scalar_at::scalar_at; use crate::compute::slice::slice; use crate::compute::take::take; @@ -196,17 +195,23 @@ impl FromIterator for Validity { return Self::AllInvalid; } - // Otherwise, map each to a bool array and concatenate them. 
- let arrays = validities - .iter() - .map(|v| { - v.to_present_null_buffer() - .unwrap() - .into_array_data() + // Else, construct the boolean buffer + let mut buffer = BooleanBufferBuilder::new(validities.iter().map(|v| v.len()).sum()); + for validity in validities { + let present = match validity { + LogicalValidity::AllValid(count) => BooleanBuffer::new_set(count), + LogicalValidity::AllInvalid(count) => BooleanBuffer::new_unset(count), + LogicalValidity::Array(array) => array .into_array() - }) - .collect::>(); - Self::Array(as_contiguous(&arrays).unwrap()) + .flatten_bool() + .expect("validity must flatten to BoolArray") + .boolean_buffer(), + }; + buffer.append_buffer(&present); + } + let bool_array = BoolArray::try_new(buffer.finish(), Validity::NonNullable) + .expect("BoolArray::try_new from BooleanBuffer should always succeed"); + Self::Array(bool_array.into_array()) } } diff --git a/vortex-datetime-parts/src/compute.rs b/vortex-datetime-parts/src/compute.rs index 4a525af07f..e19241a29f 100644 --- a/vortex-datetime-parts/src/compute.rs +++ b/vortex-datetime-parts/src/compute.rs @@ -1,12 +1,11 @@ use vortex::array::datetime::{try_parse_time_unit, LocalDateTimeArray, TimeUnit}; use vortex::array::primitive::PrimitiveArray; -use vortex::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::slice::{slice, SliceFn}; use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; use vortex::validity::ArrayValidity; -use vortex::{Array, ArrayDType, ArrayFlatten, IntoArray}; +use vortex::{Array, ArrayDType, IntoArray}; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::Scalar; @@ -14,19 +13,15 @@ use vortex_scalar::Scalar; use crate::DateTimePartsArray; impl ArrayCompute for DateTimePartsArray { - fn slice(&self) -> Option<&dyn SliceFn> { - Some(self) - } - - fn take(&self) -> Option<&dyn TakeFn> { + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + fn slice(&self) -> Option<&dyn SliceFn> { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { + fn take(&self) -> Option<&dyn TakeFn> { Some(self) } } @@ -142,35 +137,6 @@ pub fn decode_to_localdatetime(array: &Array) -> VortexResult VortexResult { - let dtype = self.dtype().clone(); - - if !arrays - .iter() - .map(|array| array.dtype().clone()) - .all(|dty| dty == dtype) - { - vortex_bail!(ComputeError: "mismatched dtypes in call to as_contiguous"); - } - - let mut chunks = Vec::with_capacity(arrays.len()); - - for array in arrays { - let dt_parts = Self::try_from(array)?; - chunks.push(dt_parts.flatten()?.into_array()); - } - - // Reduces down to as_contiguous on the flattened variants. 
- as_contiguous(chunks.as_slice()) - } -} - #[cfg(test)] mod test { use vortex::array::datetime::{LocalDateTimeArray, TimeUnit}; diff --git a/vortex-dict/src/compute.rs b/vortex-dict/src/compute.rs index f59febc8aa..3ceba51342 100644 --- a/vortex-dict/src/compute.rs +++ b/vortex-dict/src/compute.rs @@ -1,16 +1,13 @@ -use vortex::compute::as_contiguous::AsContiguousFn; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::slice::{slice, SliceFn}; use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; -use vortex::{impl_default_as_contiguous_fn, Array, IntoArray}; +use vortex::{Array, IntoArray}; use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::DictArray; -impl_default_as_contiguous_fn!(DictArray); - impl ArrayCompute for DictArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) @@ -23,10 +20,6 @@ impl ArrayCompute for DictArray { fn take(&self) -> Option<&dyn TakeFn> { Some(self) } - - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } } impl ScalarAtFn for DictArray { diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index 63ed4dba55..bde98d42cf 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -300,7 +300,9 @@ pub fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResult( packed: &[u8], bit_width: usize, diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs index 7f56d821b1..e7a3cc82f0 100644 --- a/vortex-fastlanes/src/for/compress.rs +++ b/vortex-fastlanes/src/for/compress.rs @@ -4,6 +4,7 @@ use vortex::array::constant::ConstantArray; use vortex::array::primitive::PrimitiveArray; use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::stats::{ArrayStatistics, Stat}; +use vortex::validity::ArrayValidity; use vortex::{Array, ArrayDType, ArrayTrait, IntoArray}; use vortex_dtype::{match_each_integer_ptype, NativePType, PType}; use vortex_error::{vortex_err, VortexResult}; @@ -29,6 +30,11 @@ impl EncodingCompression for FoREncoding { return None; } + // For all-null, cannot encode. 
+ if parray.logical_validity().all_invalid() { + return None; + } + // Nothing for us to do if the min is already zero and tz == 0 let shift = trailing_zeros(array); let min = parray.statistics().compute_as_cast::(Stat::Min)?; diff --git a/vortex-fastlanes/src/for/compute.rs b/vortex-fastlanes/src/for/compute.rs index 4b18d308fe..463ae7c6ce 100644 --- a/vortex-fastlanes/src/for/compute.rs +++ b/vortex-fastlanes/src/for/compute.rs @@ -1,17 +1,14 @@ -use vortex::compute::as_contiguous::AsContiguousFn; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::slice::{slice, SliceFn}; use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; -use vortex::{impl_default_as_contiguous_fn, Array, IntoArray}; +use vortex::{Array, IntoArray}; use vortex_dtype::match_each_integer_ptype; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::{PrimitiveScalar, Scalar, ScalarValue}; use crate::FoRArray; -impl_default_as_contiguous_fn!(FoRArray); - impl ArrayCompute for FoRArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) @@ -24,10 +21,6 @@ impl ArrayCompute for FoRArray { fn take(&self) -> Option<&dyn TakeFn> { Some(self) } - - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } } impl TakeFn for FoRArray { @@ -75,7 +68,6 @@ impl SliceFn for FoRArray { mod test { use vortex::array::primitive::PrimitiveArray; use vortex::compress::{Compressor, EncodingCompression}; - use vortex::compute::as_contiguous::as_contiguous; use vortex::compute::scalar_at::scalar_at; use vortex::Context; @@ -94,31 +86,4 @@ mod test { assert_eq!(scalar_at(&forarr, 1).unwrap(), 15.into()); assert_eq!(scalar_at(&forarr, 2).unwrap(), 19.into()); } - - #[test] - fn for_as_contiguous() { - let forarr1 = FoREncoding - .compress( - PrimitiveArray::from(vec![1, 2, 3, 4]).array(), - None, - Compressor::new(&Context::default()), - ) - .unwrap(); - let forarr2 = FoREncoding - .compress( - PrimitiveArray::from(vec![5, 6, 7, 8]).array(), - None, - Compressor::new(&Context::default()), - ) - .unwrap(); - - let flattened = as_contiguous(&[forarr1, forarr2]).unwrap(); - - [1, 2, 3, 4, 5, 6, 7, 8] - .iter() - .enumerate() - .for_each(|(idx, value)| { - assert_eq!(scalar_at(&flattened, idx).unwrap(), (*value).into()); - }); - } }
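
Illustrative note (not part of the patch): for call sites outside this diff that previously relied on as_contiguous, the replacement pattern the patch itself uses in sampled_compression is to wrap the pieces in a ChunkedArray and then flatten it into the dtype's canonical encoding. The sketch below shows that pattern as a small standalone helper; the function name concatenate is hypothetical, while ChunkedArray::try_new, flatten, into_array, and the vortex_bail! error form are the APIs that appear in the diff above.

use vortex::array::chunked::ChunkedArray;
use vortex::{Array, ArrayDType, ArrayFlatten, IntoArray};
use vortex_error::{vortex_bail, VortexResult};

// Hypothetical helper: concatenate same-dtype arrays into one contiguous Array by
// wrapping them in a ChunkedArray and flattening into the dtype's canonical encoding.
fn concatenate(arrays: Vec<Array>) -> VortexResult<Array> {
    let Some(first) = arrays.first() else {
        vortex_bail!(ComputeError: "no arrays to concatenate");
    };
    let dtype = first.dtype().clone();
    // flatten() dispatches to try_flatten_chunks, which checks that every chunk
    // matches `dtype` and repacks the values into a single contiguous array.
    Ok(ChunkedArray::try_new(arrays, dtype)?
        .flatten()?
        .into_array())
}

The same pattern appears in the updated chunked take test, where the result of take is flattened with flatten_primitive instead of being reassembled with as_contiguous.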