From b8bd7e63c647745240648840ecf673e20f9237ee Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 12 Apr 2024 15:03:43 -0400 Subject: [PATCH] Array2 compute (#224) --- Cargo.lock | 9 + Cargo.toml | 5 +- bench-vortex/Cargo.toml | 7 +- vortex-array/flatbuffers/array.fbs | 7 +- vortex-array/src/serde/view.rs | 14 +- vortex-array2/Cargo.toml | 5 + vortex-array2/src/array/bool/compute.rs | 26 -- .../src/array/bool/compute/as_arrow.rs | 17 + .../src/array/bool/compute/as_contiguous.rs | 27 ++ vortex-array2/src/array/bool/compute/fill.rs | 49 +++ .../src/array/bool/compute/flatten.rs | 1 + vortex-array2/src/array/bool/compute/mod.rs | 36 +++ .../src/array/bool/compute/scalar_at.rs | 19 ++ vortex-array2/src/array/bool/compute/take.rs | 59 ++++ vortex-array2/src/array/bool/mod.rs | 136 ++++---- vortex-array2/src/array/bool/stats.rs | 44 +++ vortex-array2/src/array/mod.rs | 1 - vortex-array2/src/array/primitive/compute.rs | 26 -- .../src/array/primitive/compute/as_arrow.rs | 41 +++ .../array/primitive/compute/as_contiguous.rs | 30 ++ .../src/array/primitive/compute/cast.rs | 75 +++++ .../src/array/primitive/compute/fill.rs | 71 +++++ .../src/array/primitive/compute/mod.rs | 47 +++ .../src/array/primitive/compute/scalar_at.rs | 22 ++ .../array/primitive/compute/search_sorted.rs | 45 +++ .../src/array/primitive/compute/take.rs | 43 +++ vortex-array2/src/array/primitive/mod.rs | 132 ++++---- vortex-array2/src/array/primitive/stats.rs | 293 ++++++++++++++++++ vortex-array2/src/array/ree/compute.rs | 17 - vortex-array2/src/array/ree/mod.rs | 104 ------- vortex-array2/src/array/struct/compute.rs | 17 - vortex-array2/src/array/struct/mod.rs | 99 ++---- vortex-array2/src/arrow/array.rs | 283 +++++++++++++++++ vortex-array2/src/arrow/mod.rs | 2 + vortex-array2/src/arrow/recordbatch.rs | 29 ++ vortex-array2/src/batch.rs | 90 ------ vortex-array2/src/buffer.rs | 89 ++++++ vortex-array2/src/compute.rs | 55 ---- vortex-array2/src/compute/as_arrow.rs | 37 +++ vortex-array2/src/compute/as_contiguous.rs | 36 +++ vortex-array2/src/compute/cast.rs | 21 ++ vortex-array2/src/compute/fill.rs | 24 ++ vortex-array2/src/compute/mod.rs | 51 +++ vortex-array2/src/compute/patch.rs | 28 ++ vortex-array2/src/compute/scalar_at.rs | 22 ++ vortex-array2/src/compute/search_sorted.rs | 144 +++++++++ vortex-array2/src/compute/take.rs | 24 ++ vortex-array2/src/data.rs | 147 +++------ vortex-array2/src/encoding.rs | 88 +++--- vortex-array2/src/flatten.rs | 44 +++ vortex-array2/src/implementation.rs | 69 +++-- vortex-array2/src/lib.rs | 128 +++++--- vortex-array2/src/metadata.rs | 2 +- vortex-array2/src/stats.rs | 48 ++- vortex-array2/src/tree.rs | 37 +-- vortex-array2/src/typed.rs | 160 ++++++++++ vortex-array2/src/validity.rs | 118 ++++++- vortex-array2/src/view.rs | 29 +- vortex-array2/src/visitor.rs | 7 +- vortex-dict/src/compute.rs | 3 + vortex-ipc/Cargo.toml | 11 +- vortex-ipc/benches/ipc_take.rs | 62 ++++ vortex-ipc/src/lib.rs | 23 +- vortex-ipc/src/messages.rs | 8 +- vortex-ipc/src/reader.rs | 18 +- 65 files changed, 2579 insertions(+), 882 deletions(-) delete mode 100644 vortex-array2/src/array/bool/compute.rs create mode 100644 vortex-array2/src/array/bool/compute/as_arrow.rs create mode 100644 vortex-array2/src/array/bool/compute/as_contiguous.rs create mode 100644 vortex-array2/src/array/bool/compute/fill.rs create mode 100644 vortex-array2/src/array/bool/compute/flatten.rs create mode 100644 vortex-array2/src/array/bool/compute/mod.rs create mode 100644 vortex-array2/src/array/bool/compute/scalar_at.rs create mode 100644 vortex-array2/src/array/bool/compute/take.rs create mode 100644 vortex-array2/src/array/bool/stats.rs delete mode 100644 vortex-array2/src/array/primitive/compute.rs create mode 100644 vortex-array2/src/array/primitive/compute/as_arrow.rs create mode 100644 vortex-array2/src/array/primitive/compute/as_contiguous.rs create mode 100644 vortex-array2/src/array/primitive/compute/cast.rs create mode 100644 vortex-array2/src/array/primitive/compute/fill.rs create mode 100644 vortex-array2/src/array/primitive/compute/mod.rs create mode 100644 vortex-array2/src/array/primitive/compute/scalar_at.rs create mode 100644 vortex-array2/src/array/primitive/compute/search_sorted.rs create mode 100644 vortex-array2/src/array/primitive/compute/take.rs create mode 100644 vortex-array2/src/array/primitive/stats.rs delete mode 100644 vortex-array2/src/array/ree/compute.rs delete mode 100644 vortex-array2/src/array/ree/mod.rs delete mode 100644 vortex-array2/src/array/struct/compute.rs create mode 100644 vortex-array2/src/arrow/array.rs create mode 100644 vortex-array2/src/arrow/mod.rs create mode 100644 vortex-array2/src/arrow/recordbatch.rs delete mode 100644 vortex-array2/src/batch.rs create mode 100644 vortex-array2/src/buffer.rs delete mode 100644 vortex-array2/src/compute.rs create mode 100644 vortex-array2/src/compute/as_arrow.rs create mode 100644 vortex-array2/src/compute/as_contiguous.rs create mode 100644 vortex-array2/src/compute/cast.rs create mode 100644 vortex-array2/src/compute/fill.rs create mode 100644 vortex-array2/src/compute/mod.rs create mode 100644 vortex-array2/src/compute/patch.rs create mode 100644 vortex-array2/src/compute/scalar_at.rs create mode 100644 vortex-array2/src/compute/search_sorted.rs create mode 100644 vortex-array2/src/compute/take.rs create mode 100644 vortex-array2/src/flatten.rs create mode 100644 vortex-array2/src/typed.rs create mode 100644 vortex-ipc/benches/ipc_take.rs diff --git a/Cargo.lock b/Cargo.lock index c55210ea94..c93951c46f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -974,6 +974,7 @@ dependencies = [ "tokio", "vortex-alp", "vortex-array", + "vortex-array2", "vortex-datetime", "vortex-dict", "vortex-error", @@ -5197,12 +5198,17 @@ dependencies = [ name = "vortex-array2" version = "0.1.0" dependencies = [ + "arrow-array 51.0.0", "arrow-buffer 51.0.0", + "arrow-schema 51.0.0", "flatbuffers", "flexbuffers", "half", "humansize", + "itertools 0.12.1", "linkme", + "log", + "num-traits", "paste", "serde", "vortex-array", @@ -5282,11 +5288,14 @@ name = "vortex-ipc" version = "0.1.0" dependencies = [ "arrow-buffer 51.0.0", + "criterion", "flatbuffers", "flatc", "itertools 0.12.1", "lending-iterator", "nougat", + "rand", + "simplelog", "streaming-iterator", "vortex-array", "vortex-array2", diff --git a/Cargo.toml b/Cargo.toml index 3eafabf689..3a3e642b08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,9 +49,12 @@ bindgen = "0.69.4" criterion = { version = "0.5.1", features = ["html_reports"] } croaring = "1.0.1" divan = "0.1.14" +enum_dispatch = "0.3.13" +enum-iterator = "2.0.0" flatbuffers = "23.5.26" flexbuffers = "2.0.0" flatc = "0.2.2" +fs_extra = "1.3.0" half = { version = "^2", features = ["std", "num-traits"] } hashbrown = "0.14.3" humansize = "2.1.3" @@ -78,8 +81,6 @@ bzip2 = "0.4.4" csv = "1.3.0" arrow-csv = "51.0.0" lazy_static = "1.4.0" -enum-iterator = "2.0.0" -fs_extra = "1.3.0" [workspace.lints.rust] warnings = "deny" diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index 6e570a0ff4..e6d9f95f5d 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -17,8 +17,11 @@ workspace = true [dependencies] arrow-array = { workspace = true } arrow-select = { workspace = true } +enum-iterator = { workspace = true } +fs_extra = { workspace = true } vortex-alp = { path = "../vortex-alp" } vortex-array = { path = "../vortex-array" } +vortex-array2 = { path = "../vortex-array2" } vortex-datetime = { path = "../vortex-datetime" } vortex-dict = { path = "../vortex-dict" } vortex-error = { path = "../vortex-error", features = ["parquet"] } @@ -32,6 +35,7 @@ itertools = { workspace = true } lance = { version = "0.10.5", features = [] } lance-arrow-array = { package = "arrow-array", version = "50.0" } lance-parquet = { package = "parquet", version = "50.0", features = [] } +lazy_static = { workspace = true } log = { workspace = true } parquet = { workspace = true, features = [] } reqwest = { workspace = true } @@ -41,9 +45,6 @@ bzip2 = { workspace = true } csv = { workspace = true } arrow-csv = { workspace = true } arrow = { workspace = true } -lazy_static = { workspace = true } -enum-iterator = { workspace = true } -fs_extra = { workspace = true } humansize = { workspace = true } [dev-dependencies] diff --git a/vortex-array/flatbuffers/array.fbs b/vortex-array/flatbuffers/array.fbs index 3a317fb090..2dedd3927c 100644 --- a/vortex-array/flatbuffers/array.fbs +++ b/vortex-array/flatbuffers/array.fbs @@ -4,16 +4,11 @@ enum Version: uint8 { V0 = 0, } -// TODO(ngates): figure out if flatbuffers supports optional elements in a vector. -table ArrayChild { - child: Array; -} - table Array { version: Version = V0; encoding: uint16; metadata: [ubyte]; - children: [ArrayChild]; + children: [Array]; nbuffers: uint16; } diff --git a/vortex-array/src/serde/view.rs b/vortex-array/src/serde/view.rs index 87e00c3517..5a0cafebbc 100644 --- a/vortex-array/src/serde/view.rs +++ b/vortex-array/src/serde/view.rs @@ -101,12 +101,7 @@ impl<'a> ArrayView<'a> { .children()? .iter() .take(idx) - .map(|child| { - child - .child() - .map(|c| Self::cumulative_nbuffers(c)) - .unwrap_or_default() - }) + .map(|child| Self::cumulative_nbuffers(child)) .sum(); let buffer_count = Self::cumulative_nbuffers(child); @@ -124,7 +119,7 @@ impl<'a> ArrayView<'a> { fn array_child(&self, idx: usize) -> Option> { let children = self.array.children()?; if idx < children.len() { - children.get(idx).child() + Some(children.get(idx)) } else { None } @@ -139,10 +134,7 @@ impl<'a> ArrayView<'a> { fn cumulative_nbuffers(array: fb::Array) -> usize { let mut nbuffers = array.nbuffers() as usize; for child in array.children().unwrap_or_default() { - nbuffers += child - .child() - .map(|c| Self::cumulative_nbuffers(c)) - .unwrap_or_default(); + nbuffers += Self::cumulative_nbuffers(child) } nbuffers } diff --git a/vortex-array2/Cargo.toml b/vortex-array2/Cargo.toml index ddd336f579..7cd7911db0 100644 --- a/vortex-array2/Cargo.toml +++ b/vortex-array2/Cargo.toml @@ -12,12 +12,17 @@ edition = { workspace = true } rust-version = { workspace = true } [dependencies] +arrow-array = { workspace = true } arrow-buffer = { workspace = true } +arrow-schema = { workspace = true } flatbuffers = { workspace = true } flexbuffers = { workspace = true } half = { workspace = true } humansize = { workspace = true } +itertools = { workspace = true } linkme = { workspace = true } +log = { workspace = true } +num-traits = { workspace = true } paste = { workspace = true } serde = { workspace = true, features = ["derive"] } vortex-array = { path = "../vortex-array", features = ["serde"] } diff --git a/vortex-array2/src/array/bool/compute.rs b/vortex-array2/src/array/bool/compute.rs deleted file mode 100644 index 0e172887ad..0000000000 --- a/vortex-array2/src/array/bool/compute.rs +++ /dev/null @@ -1,26 +0,0 @@ -use vortex::scalar::{BoolScalar, Scalar}; -use vortex_error::VortexResult; - -use crate::array::bool::BoolArray; -use crate::compute::{ArrayCompute, ScalarAtFn}; -use crate::validity::ArrayValidity; -use crate::ArrayTrait; - -impl ArrayCompute for BoolArray<'_> { - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { - Some(self) - } -} - -impl ScalarAtFn for BoolArray<'_> { - fn scalar_at(&self, index: usize) -> VortexResult { - if self.is_valid(index) { - let value = self.boolean_buffer().value(index); - Ok(Scalar::Bool( - BoolScalar::try_new(Some(value), self.dtype().nullability()).unwrap(), - )) - } else { - Ok(Scalar::null(self.dtype())) - } - } -} diff --git a/vortex-array2/src/array/bool/compute/as_arrow.rs b/vortex-array2/src/array/bool/compute/as_arrow.rs new file mode 100644 index 0000000000..6eb760efe1 --- /dev/null +++ b/vortex-array2/src/array/bool/compute/as_arrow.rs @@ -0,0 +1,17 @@ +use std::sync::Arc; + +use arrow_array::{ArrayRef as ArrowArrayRef, BooleanArray as ArrowBoolArray}; +use vortex_error::VortexResult; + +use crate::array::bool::BoolArray; +use crate::compute::as_arrow::AsArrowArray; +use crate::validity::ArrayValidity; + +impl AsArrowArray for BoolArray<'_> { + fn as_arrow(&self) -> VortexResult { + Ok(Arc::new(ArrowBoolArray::new( + self.boolean_buffer().clone(), + self.logical_validity().to_null_buffer()?, + ))) + } +} diff --git a/vortex-array2/src/array/bool/compute/as_contiguous.rs b/vortex-array2/src/array/bool/compute/as_contiguous.rs new file mode 100644 index 0000000000..c6f1c85715 --- /dev/null +++ b/vortex-array2/src/array/bool/compute/as_contiguous.rs @@ -0,0 +1,27 @@ +use arrow_buffer::BooleanBuffer; +use vortex_error::VortexResult; + +use crate::array::bool::BoolArray; +use crate::compute::as_contiguous::AsContiguousFn; +use crate::validity::Validity; +use crate::{Array, ArrayTrait, IntoArray}; + +impl AsContiguousFn for BoolArray<'_> { + fn as_contiguous(&self, arrays: &[Array]) -> VortexResult> { + let validity = if self.dtype().is_nullable() { + Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) + } else { + Validity::NonNullable + }; + + let mut bools = Vec::with_capacity(arrays.iter().map(|a| a.len()).sum()); + for buffer in arrays + .iter() + .map(|a| BoolArray::try_from(a.clone()).unwrap().boolean_buffer()) + { + bools.extend(buffer.iter()) + } + + Ok(BoolArray::try_new(BooleanBuffer::from(bools), validity)?.into_array()) + } +} diff --git a/vortex-array2/src/array/bool/compute/fill.rs b/vortex-array2/src/array/bool/compute/fill.rs new file mode 100644 index 0000000000..bfb85ba344 --- /dev/null +++ b/vortex-array2/src/array/bool/compute/fill.rs @@ -0,0 +1,49 @@ +use vortex_error::VortexResult; +use vortex_schema::Nullability; + +use crate::array::bool::BoolArray; +use crate::compute::fill::FillForwardFn; +use crate::validity::ArrayValidity; +use crate::{Array, ArrayTrait, IntoArray, ToArrayData}; + +impl FillForwardFn for BoolArray<'_> { + fn fill_forward(&self) -> VortexResult> { + if self.dtype().nullability() == Nullability::NonNullable { + return Ok(self.to_array_data().into_array()); + } + + let validity = self.logical_validity().to_null_buffer()?.unwrap(); + let bools = self.boolean_buffer(); + let mut last_value = false; + let filled = bools + .iter() + .zip(validity.inner().iter()) + .map(|(v, valid)| { + if valid { + last_value = v; + } + last_value + }) + .collect::>(); + Ok(BoolArray::from(filled).into_array()) + } +} + +#[cfg(test)] +mod test { + use crate::array::bool::BoolArray; + use crate::validity::Validity; + use crate::{compute, IntoArray}; + + #[test] + fn fill_forward() { + let barr = + BoolArray::from_iter(vec![None, Some(false), None, Some(true), None]).into_array(); + let filled_bool = BoolArray::try_from(compute::fill::fill_forward(&barr).unwrap()).unwrap(); + assert_eq!( + filled_bool.boolean_buffer().iter().collect::>(), + vec![false, false, false, true, true] + ); + assert_eq!(filled_bool.validity(), Validity::NonNullable); + } +} diff --git a/vortex-array2/src/array/bool/compute/flatten.rs b/vortex-array2/src/array/bool/compute/flatten.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/vortex-array2/src/array/bool/compute/flatten.rs @@ -0,0 +1 @@ + diff --git a/vortex-array2/src/array/bool/compute/mod.rs b/vortex-array2/src/array/bool/compute/mod.rs new file mode 100644 index 0000000000..f804de0538 --- /dev/null +++ b/vortex-array2/src/array/bool/compute/mod.rs @@ -0,0 +1,36 @@ +use crate::array::bool::BoolArray; +use crate::compute::as_arrow::AsArrowArray; +use crate::compute::as_contiguous::AsContiguousFn; +use crate::compute::fill::FillForwardFn; +use crate::compute::scalar_at::ScalarAtFn; +use crate::compute::take::TakeFn; +use crate::compute::ArrayCompute; + +mod as_arrow; +mod as_contiguous; +mod fill; +mod flatten; +mod scalar_at; +mod take; + +impl ArrayCompute for BoolArray<'_> { + fn as_arrow(&self) -> Option<&dyn AsArrowArray> { + Some(self) + } + + fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { + Some(self) + } + + fn fill_forward(&self) -> Option<&dyn FillForwardFn> { + Some(self) + } + + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + Some(self) + } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } +} diff --git a/vortex-array2/src/array/bool/compute/scalar_at.rs b/vortex-array2/src/array/bool/compute/scalar_at.rs new file mode 100644 index 0000000000..09b4ac5679 --- /dev/null +++ b/vortex-array2/src/array/bool/compute/scalar_at.rs @@ -0,0 +1,19 @@ +use vortex::scalar::{BoolScalar, Scalar}; +use vortex_error::VortexResult; + +use crate::array::bool::BoolArray; +use crate::compute::scalar_at::ScalarAtFn; +use crate::validity::ArrayValidity; +use crate::ArrayTrait; + +impl ScalarAtFn for BoolArray<'_> { + fn scalar_at(&self, index: usize) -> VortexResult { + Ok(BoolScalar::try_new( + self.is_valid(index) + .then(|| self.boolean_buffer().value(index)), + self.dtype().nullability(), + ) + .unwrap() + .into()) + } +} diff --git a/vortex-array2/src/array/bool/compute/take.rs b/vortex-array2/src/array/bool/compute/take.rs new file mode 100644 index 0000000000..7b4f16024d --- /dev/null +++ b/vortex-array2/src/array/bool/compute/take.rs @@ -0,0 +1,59 @@ +use arrow_buffer::BooleanBuffer; +use num_traits::AsPrimitive; +use vortex::match_each_integer_ptype; +use vortex_error::VortexResult; + +use crate::array::bool::BoolArray; +use crate::compute::take::TakeFn; +use crate::IntoArray; +use crate::{Array, OwnedArray}; + +impl TakeFn for BoolArray<'_> { + fn take(&self, indices: &Array) -> VortexResult { + let validity = self.validity(); + let indices = indices.clone().flatten_primitive()?; + match_each_integer_ptype!(indices.ptype(), |$I| { + Ok(BoolArray::from_vec( + take_bool(&self.boolean_buffer(), indices.typed_data::<$I>()), + validity.take(indices.array())?, + ).into_array()) + }) + } +} + +fn take_bool>(bools: &BooleanBuffer, indices: &[I]) -> Vec { + indices.iter().map(|&idx| bools.value(idx.as_())).collect() +} + +#[cfg(test)] +mod test { + use crate::array::bool::BoolArray; + use crate::array::primitive::PrimitiveArray; + use crate::compute::take::take; + use crate::IntoArray; + + #[test] + fn take_nullable() { + let reference = BoolArray::from_iter(vec![ + Some(false), + Some(true), + Some(false), + None, + Some(false), + ]) + .into_array(); + + let b = BoolArray::try_from( + take( + &reference, + &PrimitiveArray::from(vec![0, 3, 4]).into_array(), + ) + .unwrap(), + ) + .unwrap(); + assert_eq!( + b.boolean_buffer(), + BoolArray::from_iter(vec![Some(false), None, Some(false)]).boolean_buffer() + ); + } +} diff --git a/vortex-array2/src/array/bool/mod.rs b/vortex-array2/src/array/bool/mod.rs index f7bb943ee7..640832babd 100644 --- a/vortex-array2/src/array/bool/mod.rs +++ b/vortex-array2/src/array/bool/mod.rs @@ -1,18 +1,20 @@ mod compute; +mod stats; -use arrow_buffer::{BooleanBuffer, Buffer}; +use std::collections::HashMap; + +use arrow_buffer::BooleanBuffer; +use itertools::Itertools; use serde::{Deserialize, Serialize}; use vortex_error::VortexResult; use vortex_schema::DType; -use crate::impl_encoding; -use crate::stats::{ArrayStatistics, Statistics}; -use crate::validity::Validity; +use crate::buffer::Buffer; use crate::validity::{ArrayValidity, ValidityMetadata}; +use crate::validity::{LogicalValidity, Validity}; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; use crate::ArrayMetadata; -use crate::{ArrayData, TypedArrayData}; -use crate::{ArrayView, ToArrayData}; +use crate::{impl_encoding, ArrayFlatten}; impl_encoding!("vortex.bool", Bool); @@ -22,71 +24,93 @@ pub struct BoolMetadata { length: usize, } -pub struct BoolArray<'a> { - dtype: &'a DType, - buffer: &'a Buffer, - validity: Validity<'a>, - length: usize, - statistics: &'a dyn Statistics, -} - impl BoolArray<'_> { pub fn buffer(&self) -> &Buffer { - self.buffer - } - - pub fn validity(&self) -> &Validity { - &self.validity + self.array().buffer(0).expect("missing buffer") } pub fn boolean_buffer(&self) -> BooleanBuffer { - BooleanBuffer::new(self.buffer.clone(), 0, self.length) + BooleanBuffer::new(BoolArray::buffer(self).clone().into(), 0, self.len()) } -} -impl<'v> TryFromArrayParts<'v, BoolMetadata> for BoolArray<'v> { - fn try_from_parts(parts: &'v dyn ArrayParts, metadata: &'v BoolMetadata) -> VortexResult { - Ok(BoolArray { - dtype: parts.dtype(), - // FIXME(ngates): implement our own BooleanBuffer that doesn't take ownership of the bytes - buffer: parts - .buffer(0) - .ok_or(vortex_err!("BoolArray requires a buffer"))?, - validity: metadata - .validity - .to_validity(parts.child(0, &Validity::DTYPE)), - length: metadata.length, - statistics: parts.statistics(), - }) + pub fn validity(&self) -> Validity { + self.metadata() + .validity + .to_validity(self.array().child(0, &Validity::DTYPE)) } } -impl BoolData { +impl BoolArray<'_> { pub fn try_new(buffer: BooleanBuffer, validity: Validity) -> VortexResult { - Ok(Self::new_unchecked( + Self::try_from_parts( DType::Bool(validity.nullability()), - Arc::new(BoolMetadata { + BoolMetadata { validity: validity.to_metadata(buffer.len())?, length: buffer.len(), - }), - vec![buffer.into_inner()].into(), - vec![validity.into_array_data()].into(), - )) + }, + vec![Buffer::Owned(buffer.into_inner())].into(), + validity.to_array_data().into_iter().collect_vec().into(), + HashMap::default(), + ) } - pub fn from_vec(bools: Vec) -> Self { + pub fn from_vec(bools: Vec, validity: Validity) -> Self { let buffer = BooleanBuffer::from(bools); - Self::try_new(buffer, Validity::NonNullable).unwrap() + Self::try_new(buffer, validity).unwrap() + } +} + +impl From for OwnedBoolArray { + fn from(value: BooleanBuffer) -> Self { + BoolArray::try_new(value, Validity::NonNullable).unwrap() + } +} + +impl From> for OwnedBoolArray { + fn from(value: Vec) -> Self { + BoolArray::from_vec(value, Validity::NonNullable) + } +} + +impl FromIterator> for OwnedBoolArray { + fn from_iter>>(iter: I) -> Self { + let iter = iter.into_iter(); + let (lower, _) = iter.size_hint(); + + let mut validity: Vec = Vec::with_capacity(lower); + let values: Vec = iter + .map(|i| { + validity.push(i.is_some()); + i.unwrap_or_default() + }) + .collect::>(); + + BoolArray::try_new(BooleanBuffer::from(values), Validity::from(validity)).unwrap() } } impl ArrayTrait for BoolArray<'_> { fn dtype(&self) -> &DType { - self.dtype + // FIXME(ngates): move this + self.array().dtype() } fn len(&self) -> usize { - self.length + self.metadata().length + } + + fn metadata(&self) -> Arc { + // FIXME(ngates): move this + Arc::new(self.metadata().clone()) + } +} + +impl ArrayFlatten for BoolArray<'_> { + fn flatten<'a>(self) -> VortexResult> + where + Self: 'a, + { + Ok(Flattened::Bool(self)) } } @@ -94,36 +118,28 @@ impl ArrayValidity for BoolArray<'_> { fn is_valid(&self, index: usize) -> bool { self.validity().is_valid(index) } -} -impl ToArrayData for BoolArray<'_> { - fn to_array_data(&self) -> ArrayData { - todo!() + fn logical_validity(&self) -> LogicalValidity { + self.validity().to_logical(self.len()) } } impl AcceptArrayVisitor for BoolArray<'_> { fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { visitor.visit_buffer(self.buffer())?; - visitor.visit_validity(self.validity()) - } -} - -impl ArrayStatistics for BoolArray<'_> { - fn statistics(&self) -> &dyn Statistics { - self.statistics + visitor.visit_validity(&self.validity()) } } #[cfg(test)] mod tests { - use crate::array::bool::BoolData; - use crate::compute::scalar_at; + use crate::array::bool::BoolArray; + use crate::compute::scalar_at::scalar_at; use crate::IntoArray; #[test] fn bool_array() { - let arr = BoolData::from_vec(vec![true, false, true]).into_array(); + let arr = BoolArray::from(vec![true, false, true]).into_array(); let scalar: bool = scalar_at(&arr, 0).unwrap().try_into().unwrap(); assert!(scalar); } diff --git a/vortex-array2/src/array/bool/stats.rs b/vortex-array2/src/array/bool/stats.rs new file mode 100644 index 0000000000..67439408f4 --- /dev/null +++ b/vortex-array2/src/array/bool/stats.rs @@ -0,0 +1,44 @@ +use std::collections::HashMap; + +use vortex::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::array::bool::BoolArray; +use crate::stats::{ArrayStatisticsCompute, Stat}; +use crate::ArrayTrait; + +impl ArrayStatisticsCompute for BoolArray<'_> { + fn compute_statistics(&self, _stat: Stat) -> VortexResult> { + if self.len() == 0 { + return Ok(HashMap::from([ + (Stat::TrueCount, 0.into()), + (Stat::RunCount, 0.into()), + ])); + } + + let mut prev_bit = self.boolean_buffer().value(0); + let mut true_count: usize = if prev_bit { 1 } else { 0 }; + let mut run_count: usize = 0; + for bit in self.boolean_buffer().iter().skip(1) { + if bit { + true_count += 1 + } + if bit != prev_bit { + run_count += 1; + prev_bit = bit; + } + } + run_count += 1; + + Ok(HashMap::from([ + (Stat::Min, (true_count == self.len()).into()), + (Stat::Max, (true_count > 0).into()), + ( + Stat::IsConstant, + (true_count == self.len() || true_count == 0).into(), + ), + (Stat::RunCount, run_count.into()), + (Stat::TrueCount, true_count.into()), + ])) + } +} diff --git a/vortex-array2/src/array/mod.rs b/vortex-array2/src/array/mod.rs index 8bb4a1d37a..48103db386 100644 --- a/vortex-array2/src/array/mod.rs +++ b/vortex-array2/src/array/mod.rs @@ -1,4 +1,3 @@ pub mod bool; pub mod primitive; -pub mod ree; pub mod r#struct; diff --git a/vortex-array2/src/array/primitive/compute.rs b/vortex-array2/src/array/primitive/compute.rs deleted file mode 100644 index 09763a1db6..0000000000 --- a/vortex-array2/src/array/primitive/compute.rs +++ /dev/null @@ -1,26 +0,0 @@ -use vortex::match_each_native_ptype; -use vortex::scalar::Scalar; -use vortex_error::VortexResult; - -use crate::array::primitive::PrimitiveArray; -use crate::compute::{ArrayCompute, ScalarAtFn}; -use crate::validity::ArrayValidity; -use crate::ArrayTrait; - -impl ArrayCompute for PrimitiveArray<'_> { - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { - Some(self) - } -} - -impl ScalarAtFn for PrimitiveArray<'_> { - fn scalar_at(&self, index: usize) -> VortexResult { - if self.is_valid(index) { - match_each_native_ptype!(self.ptype(), |$T| { - Scalar::from(self.buffer().typed_data::<$T>()[index]).cast(self.dtype()) - }) - } else { - Ok(Scalar::null(self.dtype())) - } - } -} diff --git a/vortex-array2/src/array/primitive/compute/as_arrow.rs b/vortex-array2/src/array/primitive/compute/as_arrow.rs new file mode 100644 index 0000000000..58abc4c7fc --- /dev/null +++ b/vortex-array2/src/array/primitive/compute/as_arrow.rs @@ -0,0 +1,41 @@ +use std::sync::Arc; + +use arrow_array::{ + ArrayRef as ArrowArrayRef, ArrowPrimitiveType, PrimitiveArray as ArrowPrimitiveArray, +}; +use arrow_buffer::ScalarBuffer; +use vortex::ptype::PType; +use vortex_error::VortexResult; + +use crate::array::primitive::PrimitiveArray; +use crate::compute::as_arrow::AsArrowArray; +use crate::validity::ArrayValidity; +use crate::ArrayTrait; + +impl AsArrowArray for PrimitiveArray<'_> { + fn as_arrow(&self) -> VortexResult { + use arrow_array::types::*; + Ok(match self.ptype() { + PType::U8 => Arc::new(as_arrow_array_primitive::(self)?), + PType::U16 => Arc::new(as_arrow_array_primitive::(self)?), + PType::U32 => Arc::new(as_arrow_array_primitive::(self)?), + PType::U64 => Arc::new(as_arrow_array_primitive::(self)?), + PType::I8 => Arc::new(as_arrow_array_primitive::(self)?), + PType::I16 => Arc::new(as_arrow_array_primitive::(self)?), + PType::I32 => Arc::new(as_arrow_array_primitive::(self)?), + PType::I64 => Arc::new(as_arrow_array_primitive::(self)?), + PType::F16 => Arc::new(as_arrow_array_primitive::(self)?), + PType::F32 => Arc::new(as_arrow_array_primitive::(self)?), + PType::F64 => Arc::new(as_arrow_array_primitive::(self)?), + }) + } +} + +fn as_arrow_array_primitive( + array: &PrimitiveArray, +) -> VortexResult> { + Ok(ArrowPrimitiveArray::new( + ScalarBuffer::::new(array.buffer().clone().into(), 0, array.len()), + array.logical_validity().to_null_buffer()?, + )) +} diff --git a/vortex-array2/src/array/primitive/compute/as_contiguous.rs b/vortex-array2/src/array/primitive/compute/as_contiguous.rs new file mode 100644 index 0000000000..e94251c438 --- /dev/null +++ b/vortex-array2/src/array/primitive/compute/as_contiguous.rs @@ -0,0 +1,30 @@ +use arrow_buffer::{MutableBuffer, ScalarBuffer}; +use vortex::match_each_native_ptype; +use vortex_error::VortexResult; + +use crate::array::primitive::PrimitiveArray; +use crate::compute::as_contiguous::AsContiguousFn; +use crate::validity::Validity; +use crate::{Array, ArrayTrait, IntoArray, OwnedArray}; + +impl AsContiguousFn for PrimitiveArray<'_> { + fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { + let validity = if self.dtype().is_nullable() { + Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) + } else { + Validity::NonNullable + }; + + let mut buffer = MutableBuffer::with_capacity( + arrays.iter().map(|a| a.len()).sum::() * self.ptype().byte_width(), + ); + for array in arrays { + buffer.extend_from_slice(PrimitiveArray::try_from(array).unwrap().buffer().as_slice()) + } + match_each_native_ptype!(self.ptype(), |$T| { + Ok(PrimitiveArray::try_new(ScalarBuffer::<$T>::from(buffer), validity) + .unwrap() + .into_array()) + }) + } +} diff --git a/vortex-array2/src/array/primitive/compute/cast.rs b/vortex-array2/src/array/primitive/compute/cast.rs new file mode 100644 index 0000000000..78016aedb7 --- /dev/null +++ b/vortex-array2/src/array/primitive/compute/cast.rs @@ -0,0 +1,75 @@ +use vortex::match_each_native_ptype; +use vortex::ptype::{NativePType, PType}; +use vortex_error::{vortex_err, VortexResult}; +use vortex_schema::DType; + +use crate::array::primitive::PrimitiveArray; +use crate::compute::cast::CastFn; +use crate::{IntoArray, OwnedArray, ToArrayData}; + +impl CastFn for PrimitiveArray<'_> { + fn cast(&self, dtype: &DType) -> VortexResult { + // TODO(ngates): check validity + let ptype = PType::try_from(dtype)?; + if ptype == self.ptype() { + return Ok(self.to_array_data().into_array()); + } + + match_each_native_ptype!(ptype, |$T| { + Ok(PrimitiveArray::from_vec( + cast::<$T>(self)?, + self.validity().clone(), + ).into_array()) + }) + } +} + +fn cast(array: &PrimitiveArray) -> VortexResult> { + match_each_native_ptype!(array.ptype(), |$E| { + array + .typed_data::<$E>() + .iter() + // TODO(ngates): allow configurable checked/unchecked casting + .map(|&v| { + T::from(v).ok_or_else(|| { + vortex_err!(ComputeError: "Failed to cast {} to {:?}", v, T::PTYPE) + }) + }) + .collect() + }) +} + +#[cfg(test)] +mod test { + use vortex::ptype::PType; + use vortex_error::VortexError; + + use crate::array::primitive::PrimitiveArray; + use crate::{compute, IntoArray}; + + #[test] + fn cast_u32_u8() { + let arr = vec![0u32, 10, 200].into_array(); + let p = + PrimitiveArray::try_from(compute::cast::cast(&arr, PType::U8.into()).unwrap()).unwrap(); + assert_eq!(p.typed_data::(), vec![0u8, 10, 200]); + } + + #[test] + fn cast_u32_f32() { + let arr = vec![0u32, 10, 200].into_array(); + let u8arr = PrimitiveArray::try_from(compute::cast::cast(&arr, PType::F32.into()).unwrap()) + .unwrap(); + assert_eq!(u8arr.typed_data::(), vec![0.0f32, 10., 200.]); + } + + #[test] + fn cast_i32_u32() { + let arr = vec![-1i32].into_array(); + let error = compute::cast::cast(&arr, PType::U32.into()).err().unwrap(); + let VortexError::ComputeError(s, _) = error else { + unreachable!() + }; + assert_eq!(s.to_string(), "Failed to cast -1 to U32"); + } +} diff --git a/vortex-array2/src/array/primitive/compute/fill.rs b/vortex-array2/src/array/primitive/compute/fill.rs new file mode 100644 index 0000000000..f361e82b76 --- /dev/null +++ b/vortex-array2/src/array/primitive/compute/fill.rs @@ -0,0 +1,71 @@ +use vortex::match_each_native_ptype; +use vortex_error::VortexResult; + +use crate::array::primitive::PrimitiveArray; +use crate::compute::fill::FillForwardFn; +use crate::validity::ArrayValidity; +use crate::{IntoArray, OwnedArray, ToArrayData}; + +impl FillForwardFn for PrimitiveArray<'_> { + fn fill_forward(&self) -> VortexResult { + let validity = self.logical_validity(); + let Some(nulls) = validity.to_null_buffer()? else { + return Ok(self.to_array_data().into_array()); + }; + match_each_native_ptype!(self.ptype(), |$T| { + let typed_data = self.typed_data::<$T>(); + let mut last_value = $T::default(); + let filled = typed_data + .iter() + .zip(nulls.into_iter()) + .map(|(v, valid)| { + if valid { + last_value = *v; + } + last_value + }) + .collect::>(); + Ok(filled.into_array()) + }) + } +} + +#[cfg(test)] +mod test { + use crate::array::bool::BoolArray; + use crate::array::primitive::PrimitiveArray; + use crate::validity::{ArrayValidity, Validity}; + use crate::{compute, IntoArray}; + + #[test] + fn leading_none() { + let arr = PrimitiveArray::from_nullable_vec(vec![None, Some(8u8), None, Some(10), None]) + .into_array(); + let p = PrimitiveArray::try_from(compute::fill::fill_forward(&arr).unwrap()).unwrap(); + assert_eq!(p.typed_data::(), vec![0, 8, 8, 10, 10]); + assert!(p.logical_validity().is_all_valid()); + } + + #[test] + fn all_none() { + let arr = + PrimitiveArray::from_nullable_vec(vec![Option::::None, None, None, None, None]) + .into_array(); + + let p = PrimitiveArray::try_from(compute::fill::fill_forward(&arr).unwrap()).unwrap(); + assert_eq!(p.typed_data::(), vec![0, 0, 0, 0, 0]); + assert!(p.logical_validity().is_all_valid()); + } + + #[test] + fn nullable_non_null() { + let arr = PrimitiveArray::from_vec( + vec![8u8, 10u8, 12u8, 14u8, 16u8], + Validity::Array(BoolArray::from(vec![true, true, true, true, true]).into_array()), + ) + .into_array(); + let p = PrimitiveArray::try_from(compute::fill::fill_forward(&arr).unwrap()).unwrap(); + assert_eq!(p.typed_data::(), vec![8, 10, 12, 14, 16]); + assert!(p.logical_validity().is_all_valid()); + } +} diff --git a/vortex-array2/src/array/primitive/compute/mod.rs b/vortex-array2/src/array/primitive/compute/mod.rs new file mode 100644 index 0000000000..17667643fa --- /dev/null +++ b/vortex-array2/src/array/primitive/compute/mod.rs @@ -0,0 +1,47 @@ +use crate::array::primitive::PrimitiveArray; +use crate::compute::as_arrow::AsArrowArray; +use crate::compute::as_contiguous::AsContiguousFn; +use crate::compute::cast::CastFn; +use crate::compute::fill::FillForwardFn; +use crate::compute::scalar_at::ScalarAtFn; +use crate::compute::search_sorted::SearchSortedFn; +use crate::compute::take::TakeFn; +use crate::compute::ArrayCompute; + +mod as_arrow; +mod as_contiguous; +mod cast; +mod fill; +mod scalar_at; +mod search_sorted; +mod take; + +impl ArrayCompute for PrimitiveArray<'_> { + fn as_arrow(&self) -> Option<&dyn AsArrowArray> { + Some(self) + } + + fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { + Some(self) + } + + fn cast(&self) -> Option<&dyn CastFn> { + Some(self) + } + + fn fill_forward(&self) -> Option<&dyn FillForwardFn> { + Some(self) + } + + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + Some(self) + } + + fn search_sorted(&self) -> Option<&dyn SearchSortedFn> { + Some(self) + } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } +} diff --git a/vortex-array2/src/array/primitive/compute/scalar_at.rs b/vortex-array2/src/array/primitive/compute/scalar_at.rs new file mode 100644 index 0000000000..aceb4ccfa5 --- /dev/null +++ b/vortex-array2/src/array/primitive/compute/scalar_at.rs @@ -0,0 +1,22 @@ +use vortex::match_each_native_ptype; +use vortex::scalar::PrimitiveScalar; +use vortex::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::array::primitive::PrimitiveArray; +use crate::compute::scalar_at::ScalarAtFn; +use crate::validity::ArrayValidity; +use crate::ArrayTrait; + +impl ScalarAtFn for PrimitiveArray<'_> { + fn scalar_at(&self, index: usize) -> VortexResult { + match_each_native_ptype!(self.ptype(), |$T| { + Ok(PrimitiveScalar::try_new( + self.is_valid(index) + .then(|| self.typed_data::<$T>()[index]), + self.dtype().nullability(), + )? + .into()) + }) + } +} diff --git a/vortex-array2/src/array/primitive/compute/search_sorted.rs b/vortex-array2/src/array/primitive/compute/search_sorted.rs new file mode 100644 index 0000000000..1cd34324ce --- /dev/null +++ b/vortex-array2/src/array/primitive/compute/search_sorted.rs @@ -0,0 +1,45 @@ +use vortex::match_each_native_ptype; +use vortex::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::array::primitive::PrimitiveArray; +use crate::compute::search_sorted::SearchSorted; +use crate::compute::search_sorted::{SearchSortedFn, SearchSortedSide}; + +impl SearchSortedFn for PrimitiveArray<'_> { + fn search_sorted(&self, value: &Scalar, side: SearchSortedSide) -> VortexResult { + match_each_native_ptype!(self.ptype(), |$T| { + let pvalue: $T = value.try_into()?; + Ok(self.typed_data::<$T>().search_sorted(&pvalue, side)) + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::compute::search_sorted::search_sorted; + use crate::IntoArray; + + #[test] + fn test_searchsorted_primitive() { + let values = vec![1u16, 2, 3].into_array(); + + assert_eq!( + search_sorted(&values, 0, SearchSortedSide::Left).unwrap(), + 0 + ); + assert_eq!( + search_sorted(&values, 1, SearchSortedSide::Left).unwrap(), + 0 + ); + assert_eq!( + search_sorted(&values, 1, SearchSortedSide::Right).unwrap(), + 1 + ); + assert_eq!( + search_sorted(&values, 4, SearchSortedSide::Left).unwrap(), + 3 + ); + } +} diff --git a/vortex-array2/src/array/primitive/compute/take.rs b/vortex-array2/src/array/primitive/compute/take.rs new file mode 100644 index 0000000000..2df9e9c8c4 --- /dev/null +++ b/vortex-array2/src/array/primitive/compute/take.rs @@ -0,0 +1,43 @@ +use num_traits::PrimInt; +use vortex::ptype::NativePType; +use vortex::{match_each_integer_ptype, match_each_native_ptype}; +use vortex_error::VortexResult; + +use crate::array::primitive::PrimitiveArray; +use crate::compute::take::TakeFn; +use crate::IntoArray; +use crate::{Array, OwnedArray}; + +impl TakeFn for PrimitiveArray<'_> { + fn take(&self, indices: &Array) -> VortexResult { + let validity = self.validity(); + let indices = indices.clone().flatten_primitive()?; + match_each_native_ptype!(self.ptype(), |$T| { + match_each_integer_ptype!(indices.ptype(), |$I| { + Ok(PrimitiveArray::from_vec( + take_primitive(self.typed_data::<$T>(), indices.typed_data::<$I>()), + validity.take(indices.array())?, + ).into_array()) + }) + }) + } +} + +fn take_primitive(array: &[T], indices: &[I]) -> Vec { + indices + .iter() + .map(|&idx| array[idx.to_usize().unwrap()]) + .collect() +} + +#[cfg(test)] +mod test { + use crate::array::primitive::compute::take::take_primitive; + + #[test] + fn test_take() { + let a = vec![1i32, 2, 3, 4, 5]; + let result = take_primitive(&a, &[0, 0, 4, 2]); + assert_eq!(result, vec![1i32, 1, 5, 3]); + } +} diff --git a/vortex-array2/src/array/primitive/mod.rs b/vortex-array2/src/array/primitive/mod.rs index 4e141acf77..1bb35dbd8e 100644 --- a/vortex-array2/src/array/primitive/mod.rs +++ b/vortex-array2/src/array/primitive/mod.rs @@ -1,18 +1,20 @@ mod compute; +mod stats; -use arrow_buffer::{ArrowNativeType, Buffer, ScalarBuffer}; +use std::collections::HashMap; + +use arrow_buffer::{ArrowNativeType, ScalarBuffer}; +use itertools::Itertools; use serde::{Deserialize, Serialize}; use vortex::ptype::{NativePType, PType}; use vortex_error::VortexResult; use vortex_schema::DType; -use crate::impl_encoding; -use crate::stats::{ArrayStatistics, Statistics}; -use crate::validity::{ArrayValidity, Validity, ValidityMetadata}; +use crate::buffer::Buffer; +use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata}; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; -use crate::ArrayMetadata; -use crate::{ArrayData, TypedArrayData}; -use crate::{ArrayView, ToArrayData}; +use crate::{impl_encoding, IntoArray}; +use crate::{ArrayFlatten, ArrayMetadata}; impl_encoding!("vortex.primitive", Primitive); @@ -21,102 +23,102 @@ pub struct PrimitiveMetadata { validity: ValidityMetadata, } -pub struct PrimitiveArray<'a> { - ptype: PType, - dtype: &'a DType, - buffer: &'a Buffer, - validity: Validity<'a>, - statistics: &'a dyn Statistics, -} - impl PrimitiveArray<'_> { pub fn buffer(&self) -> &Buffer { - self.buffer + self.array().buffer(0).expect("missing buffer") } - pub fn validity(&self) -> &Validity { - &self.validity + pub fn validity(&self) -> Validity { + self.metadata() + .validity + .to_validity(self.array().child(0, &Validity::DTYPE)) } pub fn ptype(&self) -> PType { - self.ptype + // TODO(ngates): we can't really cache this anywhere? + self.dtype().try_into().unwrap() + } + + pub fn typed_data(&self) -> &[T] { + self.buffer().typed_data::() } } -impl<'a> TryFromArrayParts<'a, PrimitiveMetadata> for PrimitiveArray<'a> { - fn try_from_parts( - parts: &'a dyn ArrayParts, - metadata: &'a PrimitiveMetadata, +impl PrimitiveArray<'_> { + pub fn try_new( + buffer: ScalarBuffer, + validity: Validity, ) -> VortexResult { - let buffer = parts.buffer(0).unwrap(); - let ptype: PType = parts.dtype().try_into()?; - Ok(PrimitiveArray { - ptype, - dtype: parts.dtype(), - buffer, - validity: metadata.validity.to_validity(parts.child(0, parts.dtype())), - statistics: parts.statistics(), - }) + Self::try_from_parts( + DType::from(T::PTYPE).with_nullability(validity.nullability()), + PrimitiveMetadata { + validity: validity.to_metadata(buffer.len())?, + }, + vec![Buffer::Owned(buffer.into_inner())].into(), + validity.to_array_data().into_iter().collect_vec().into(), + HashMap::default(), + ) + } + + pub fn from_vec(values: Vec, validity: Validity) -> Self { + Self::try_new(ScalarBuffer::from(values), validity).unwrap() + } + + pub fn from_nullable_vec(values: Vec>) -> Self { + let elems: Vec = values.iter().map(|v| v.unwrap_or_default()).collect(); + let validity = Validity::from(values.iter().map(|v| v.is_some()).collect::>()); + Self::from_vec(elems, validity) } } -impl PrimitiveData { - fn try_new(buffer: ScalarBuffer, validity: Validity) -> VortexResult { - Ok(Self::new_unchecked( - DType::from(T::PTYPE).with_nullability(validity.nullability()), - Arc::new(PrimitiveMetadata { - validity: validity.to_metadata(buffer.len() / T::PTYPE.byte_width())?, - }), - vec![buffer.into_inner()].into(), - vec![validity.into_array_data()].into(), - )) +impl From> for PrimitiveArray<'_> { + fn from(values: Vec) -> Self { + PrimitiveArray::from_vec(values, Validity::NonNullable) + } +} + +impl IntoArray<'static> for Vec { + fn into_array(self) -> Array<'static> { + PrimitiveArray::from(self).into_array() } +} - pub fn from_vec(values: Vec) -> Self { - Self::try_new(ScalarBuffer::from(values), Validity::NonNullable).unwrap() +impl ArrayFlatten for PrimitiveArray<'_> { + fn flatten<'a>(self) -> VortexResult> + where + Self: 'a, + { + Ok(Flattened::Primitive(self)) } } impl ArrayTrait for PrimitiveArray<'_> { fn dtype(&self) -> &DType { - self.dtype + self.array().dtype() } fn len(&self) -> usize { self.buffer().len() / self.ptype().byte_width() } + + fn metadata(&self) -> Arc { + Arc::new(self.metadata().clone()) + } } impl ArrayValidity for PrimitiveArray<'_> { fn is_valid(&self, index: usize) -> bool { self.validity().is_valid(index) } -} -impl ToArrayData for PrimitiveArray<'_> { - fn to_array_data(&self) -> ArrayData { - ArrayData::try_new( - &PrimitiveEncoding, - self.dtype().clone(), - Arc::new(PrimitiveMetadata { - validity: self.validity().to_metadata(self.len()).unwrap(), - }), - vec![self.buffer().clone()].into(), - vec![].into(), - ) - .unwrap() + fn logical_validity(&self) -> LogicalValidity { + self.validity().to_logical(self.len()) } } impl AcceptArrayVisitor for PrimitiveArray<'_> { fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { visitor.visit_buffer(self.buffer())?; - visitor.visit_validity(self.validity()) - } -} - -impl ArrayStatistics for PrimitiveArray<'_> { - fn statistics(&self) -> &dyn Statistics { - self.statistics + visitor.visit_validity(&self.validity()) } } diff --git a/vortex-array2/src/array/primitive/stats.rs b/vortex-array2/src/array/primitive/stats.rs new file mode 100644 index 0000000000..e7a74eaf62 --- /dev/null +++ b/vortex-array2/src/array/primitive/stats.rs @@ -0,0 +1,293 @@ +use std::collections::HashMap; +use std::mem::size_of; + +use arrow_buffer::buffer::BooleanBuffer; +use vortex::match_each_native_ptype; +use vortex::ptype::NativePType; +use vortex::scalar::Scalar; +use vortex::scalar::{ListScalarVec, PScalar}; +use vortex_error::VortexResult; + +use crate::array::primitive::PrimitiveArray; +use crate::stats::{ArrayStatisticsCompute, Stat}; +use crate::validity::ArrayValidity; +use crate::validity::LogicalValidity; +use crate::IntoArray; + +impl ArrayStatisticsCompute for PrimitiveArray<'_> { + fn compute_statistics(&self, stat: Stat) -> VortexResult> { + match_each_native_ptype!(self.ptype(), |$P| { + match self.logical_validity() { + LogicalValidity::AllValid(_) => self.typed_data::<$P>().compute_statistics(stat), + LogicalValidity::AllInvalid(_) => all_null_stats::<$P>(), + LogicalValidity::Array(a) => NullableValues( + self.typed_data::<$P>(), + &a.into_array().flatten_bool()?.boolean_buffer(), + ) + .compute_statistics(stat), + } + }) + } +} + +impl ArrayStatisticsCompute for &[T] { + fn compute_statistics(&self, _stat: Stat) -> VortexResult> { + if self.is_empty() { + return Ok(HashMap::default()); + } + let mut stats = StatsAccumulator::new(self[0]); + self.iter().skip(1).for_each(|next| stats.next(*next)); + Ok(stats.into_map()) + } +} + +fn all_null_stats() -> VortexResult> { + Ok(HashMap::from([ + (Stat::Min, Option::::None.into()), + (Stat::Max, Option::::None.into()), + (Stat::IsConstant, true.into()), + (Stat::IsSorted, true.into()), + (Stat::IsStrictSorted, true.into()), + (Stat::RunCount, 1.into()), + (Stat::NullCount, 1.into()), + ( + Stat::BitWidthFreq, + ListScalarVec(vec![0; size_of::() * 8 + 1]).into(), + ), + ( + Stat::TrailingZeroFreq, + ListScalarVec(vec![size_of::() * 8; size_of::() * 8 + 1]).into(), + ), + ])) +} + +struct NullableValues<'a, T: NativePType>(&'a [T], &'a BooleanBuffer); + +impl<'a, T: NativePType> ArrayStatisticsCompute for NullableValues<'a, T> { + fn compute_statistics(&self, _stat: Stat) -> VortexResult> { + let values = self.0; + if values.is_empty() { + return Ok(HashMap::default()); + } + + let first_non_null = self + .1 + .iter() + .enumerate() + .skip_while(|(_, valid)| !*valid) + .map(|(idx, _)| values[idx]) + .next() + .expect("Must be at least one non-null value"); + + let mut stats = StatsAccumulator::new(first_non_null); + values + .iter() + .zip(self.1.iter()) + .skip(1) + .map(|(next, valid)| valid.then_some(*next)) + .for_each(|next| stats.nullable_next(next)); + Ok(stats.into_map()) + } +} + +trait BitWidth { + fn bit_width(self) -> usize; + fn trailing_zeros(self) -> usize; +} + +impl> BitWidth for T { + fn bit_width(self) -> usize { + let bit_width = size_of::() * 8; + let scalar: PScalar = self.into(); + match scalar { + PScalar::U8(i) => bit_width - i.leading_zeros() as usize, + PScalar::U16(i) => bit_width - i.leading_zeros() as usize, + PScalar::U32(i) => bit_width - i.leading_zeros() as usize, + PScalar::U64(i) => bit_width - i.leading_zeros() as usize, + PScalar::I8(i) => bit_width - i.leading_zeros() as usize, + PScalar::I16(i) => bit_width - i.leading_zeros() as usize, + PScalar::I32(i) => bit_width - i.leading_zeros() as usize, + PScalar::I64(i) => bit_width - i.leading_zeros() as usize, + PScalar::F16(_) => bit_width, + PScalar::F32(_) => bit_width, + PScalar::F64(_) => bit_width, + } + } + + fn trailing_zeros(self) -> usize { + let scalar: PScalar = self.into(); + match scalar { + PScalar::U8(i) => i.trailing_zeros() as usize, + PScalar::U16(i) => i.trailing_zeros() as usize, + PScalar::U32(i) => i.trailing_zeros() as usize, + PScalar::U64(i) => i.trailing_zeros() as usize, + PScalar::I8(i) => i.trailing_zeros() as usize, + PScalar::I16(i) => i.trailing_zeros() as usize, + PScalar::I32(i) => i.trailing_zeros() as usize, + PScalar::I64(i) => i.trailing_zeros() as usize, + PScalar::F16(_) => 0, + PScalar::F32(_) => 0, + PScalar::F64(_) => 0, + } + } +} + +struct StatsAccumulator { + prev: T, + min: T, + max: T, + is_sorted: bool, + is_strict_sorted: bool, + run_count: usize, + null_count: usize, + bit_widths: Vec, + trailing_zeros: Vec, +} + +impl StatsAccumulator { + fn new(first_value: T) -> Self { + let mut stats = Self { + prev: first_value, + min: first_value, + max: first_value, + is_sorted: true, + is_strict_sorted: true, + run_count: 1, + null_count: 0, + bit_widths: vec![0; size_of::() * 8 + 1], + trailing_zeros: vec![0; size_of::() * 8 + 1], + }; + stats.bit_widths[first_value.bit_width()] += 1; + stats.trailing_zeros[first_value.trailing_zeros()] += 1; + stats + } + + pub fn nullable_next(&mut self, next: Option) { + match next { + Some(n) => self.next(n), + None => { + self.bit_widths[0] += 1; + self.trailing_zeros[T::PTYPE.bit_width()] += 1; + self.null_count += 1; + } + } + } + + pub fn next(&mut self, next: T) { + self.bit_widths[next.bit_width()] += 1; + self.trailing_zeros[next.trailing_zeros()] += 1; + + if self.prev == next { + self.is_strict_sorted = false; + } else { + if next < self.prev { + self.is_sorted = false; + } + self.run_count += 1; + } + if next < self.min { + self.min = next; + } else if next > self.max { + self.max = next; + } + self.prev = next; + } + + pub fn into_map(self) -> HashMap { + HashMap::from([ + (Stat::Min, self.min.into()), + (Stat::Max, self.max.into()), + (Stat::NullCount, self.null_count.into()), + (Stat::IsConstant, (self.min == self.max).into()), + (Stat::BitWidthFreq, ListScalarVec(self.bit_widths).into()), + ( + Stat::TrailingZeroFreq, + ListScalarVec(self.trailing_zeros).into(), + ), + (Stat::IsSorted, self.is_sorted.into()), + ( + Stat::IsStrictSorted, + (self.is_sorted && self.is_strict_sorted).into(), + ), + (Stat::RunCount, self.run_count.into()), + ]) + } +} + +#[cfg(test)] +mod test { + use vortex::scalar::ListScalarVec; + + use crate::array::primitive::PrimitiveArray; + use crate::stats::{ArrayStatistics, Stat}; + + #[test] + fn stats() { + let arr = PrimitiveArray::from(vec![1, 2, 3, 4, 5]); + let min: i32 = arr.statistics().compute_as(Stat::Min).unwrap(); + let max: i32 = arr.statistics().compute_as(Stat::Max).unwrap(); + let is_sorted: bool = arr.statistics().compute_as(Stat::IsSorted).unwrap(); + let is_strict_sorted: bool = arr.statistics().compute_as(Stat::IsStrictSorted).unwrap(); + let is_constant: bool = arr.statistics().compute_as(Stat::IsConstant).unwrap(); + let bit_width_freq: Vec = arr + .statistics() + .compute_as::>(Stat::BitWidthFreq) + .unwrap() + .0; + let trailing_zeros_freq: Vec = arr + .statistics() + .compute_as::>(Stat::TrailingZeroFreq) + .unwrap() + .0; + let run_count: u64 = arr.statistics().compute_as(Stat::RunCount).unwrap(); + assert_eq!(min, 1); + assert_eq!(max, 5); + assert!(is_sorted); + assert!(is_strict_sorted); + assert!(!is_constant); + assert_eq!( + bit_width_freq, + vec![ + 0u64, 1, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, + ] + ); + assert_eq!( + trailing_zeros_freq, + vec![ + // 1, 3, 5 have 0 trailing zeros + // 2 has 1 trailing zero, 4 has 2 trailing zeros + 3u64, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, + ] + ); + assert_eq!(run_count, 5); + } + + #[test] + fn stats_u8() { + let arr = PrimitiveArray::from(vec![1u8, 2, 3, 4, 5]); + let min: u8 = arr.statistics().compute_as(Stat::Min).unwrap(); + let max: u8 = arr.statistics().compute_as(Stat::Max).unwrap(); + assert_eq!(min, 1); + assert_eq!(max, 5); + } + + #[test] + fn nullable_stats_u8() { + let arr = PrimitiveArray::from_nullable_vec(vec![None, Some(1i32), None, Some(2)]); + let min: Option = arr.statistics().compute_as(Stat::Min); + let max: Option = arr.statistics().compute_as(Stat::Max); + assert_eq!(min, Some(1)); + assert_eq!(max, Some(2)); + } + + #[test] + fn all_null() { + let arr = PrimitiveArray::from_nullable_vec(vec![Option::::None, None, None]); + let min: Option = arr.statistics().compute_as(Stat::Min); + let max: Option = arr.statistics().compute_as(Stat::Max); + assert_eq!(min, None); + assert_eq!(max, None); + } +} diff --git a/vortex-array2/src/array/ree/compute.rs b/vortex-array2/src/array/ree/compute.rs deleted file mode 100644 index d4e78e7e0f..0000000000 --- a/vortex-array2/src/array/ree/compute.rs +++ /dev/null @@ -1,17 +0,0 @@ -use vortex::scalar::Scalar; -use vortex_error::VortexResult; - -use crate::array::ree::REEArray; -use crate::compute::{ArrayCompute, ScalarAtFn}; - -impl ArrayCompute for REEArray<'_> { - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { - Some(self) - } -} - -impl ScalarAtFn for REEArray<'_> { - fn scalar_at(&self, _index: usize) -> VortexResult { - todo!() - } -} diff --git a/vortex-array2/src/array/ree/mod.rs b/vortex-array2/src/array/ree/mod.rs deleted file mode 100644 index 8097e41105..0000000000 --- a/vortex-array2/src/array/ree/mod.rs +++ /dev/null @@ -1,104 +0,0 @@ -mod compute; - -use serde::{Deserialize, Serialize}; -use vortex_error::VortexResult; -use vortex_schema::DType; - -use crate::impl_encoding; -use crate::stats::{ArrayStatistics, Statistics}; -use crate::validity::ArrayValidity; -use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; -use crate::{Array, ArrayMetadata}; -use crate::{ArrayData, TypedArrayData}; -use crate::{ArrayView, ToArrayData}; - -impl_encoding!("vortex.ree", REE); - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct REEMetadata { - length: usize, - ends_dtype: DType, -} - -pub struct REEArray<'a> { - dtype: &'a DType, - values: Array<'a>, - run_ends: Array<'a>, - length: usize, - statistics: &'a dyn Statistics, -} - -impl REEArray<'_> { - pub fn values(&self) -> &Array { - &self.values - } - - pub fn run_ends(&self) -> &Array { - &self.run_ends - } -} - -impl REEData { - pub fn try_new(ends: ArrayData, values: ArrayData, length: usize) -> VortexResult { - Ok(Self::new_unchecked( - values.dtype().clone(), - Arc::new(REEMetadata { - length, - ends_dtype: ends.dtype().clone(), - }), - vec![].into(), - vec![Some(ends), Some(values)].into(), - )) - } -} - -impl<'v> TryFromArrayParts<'v, REEMetadata> for REEArray<'v> { - fn try_from_parts(parts: &'v dyn ArrayParts, metadata: &'v REEMetadata) -> VortexResult { - Ok(REEArray { - dtype: parts.dtype(), - values: parts - .child(0, parts.dtype()) - .ok_or_else(|| vortex_err!("REEArray missing values"))?, - run_ends: parts - .child(1, &metadata.ends_dtype) - .ok_or_else(|| vortex_err!("REEArray missing run_ends"))?, - length: metadata.length, - statistics: parts.statistics(), - }) - } -} - -impl ArrayTrait for REEArray<'_> { - fn dtype(&self) -> &DType { - self.values.dtype() - } - - fn len(&self) -> usize { - self.length - } -} - -impl ArrayValidity for REEArray<'_> { - fn is_valid(&self, _index: usize) -> bool { - todo!() - } -} - -impl ToArrayData for REEArray<'_> { - fn to_array_data(&self) -> ArrayData { - todo!() - } -} - -impl AcceptArrayVisitor for REEArray<'_> { - fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { - visitor.visit_child("values", self.values())?; - visitor.visit_child("run_ends", self.run_ends()) - } -} - -impl ArrayStatistics for REEArray<'_> { - fn statistics(&self) -> &dyn Statistics { - self.statistics - } -} diff --git a/vortex-array2/src/array/struct/compute.rs b/vortex-array2/src/array/struct/compute.rs deleted file mode 100644 index 83ff492373..0000000000 --- a/vortex-array2/src/array/struct/compute.rs +++ /dev/null @@ -1,17 +0,0 @@ -use vortex::scalar::Scalar; -use vortex_error::VortexResult; - -use crate::array::r#struct::StructArray; -use crate::compute::{ArrayCompute, ScalarAtFn}; - -impl ArrayCompute for StructArray<'_> { - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { - Some(self) - } -} - -impl ScalarAtFn for StructArray<'_> { - fn scalar_at(&self, _index: usize) -> VortexResult { - todo!() - } -} diff --git a/vortex-array2/src/array/struct/mod.rs b/vortex-array2/src/array/struct/mod.rs index 03b0e8cde7..3d97321bf6 100644 --- a/vortex-array2/src/array/struct/mod.rs +++ b/vortex-array2/src/array/struct/mod.rs @@ -1,16 +1,16 @@ -mod compute; +use std::collections::HashMap; use serde::{Deserialize, Serialize}; use vortex_error::{vortex_bail, VortexResult}; use vortex_schema::{DType, FieldNames}; -use crate::stats::ArrayStatistics; -use crate::validity::ArrayValidity; +use crate::compute::ArrayCompute; +use crate::stats::ArrayStatisticsCompute; +use crate::validity::{ArrayValidity, LogicalValidity}; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; -use crate::{impl_encoding, ToArray, WithArray}; -use crate::{Array, ArrayMetadata}; -use crate::{ArrayData, TypedArrayData}; -use crate::{ArrayView, ToArrayData}; +use crate::ArrayData; +use crate::{impl_encoding, ToArray}; +use crate::{ArrayFlatten, ArrayMetadata}; impl_encoding!("vortex.struct", Struct); @@ -19,22 +19,13 @@ pub struct StructMetadata { length: usize, } -#[derive(Clone)] -pub struct StructArray<'a> { - dtype: &'a DType, - // Note(ngates): for arrays with variable-length children, we don't want to - // allocate a Vec, so instead we defer child access by storing a reference to the parts. - parts: &'a dyn ArrayParts, - length: usize, -} - -impl<'a> StructArray<'a> { - pub fn child(&'a self, idx: usize) -> Option> { +impl StructArray<'_> { + pub fn child(&self, idx: usize) -> Option { let DType::Struct(_, fields) = self.dtype() else { unreachable!() }; let dtype = fields.get(idx)?; - self.parts.child(idx, dtype) + self.array().child(idx, dtype) } pub fn names(&self) -> &FieldNames { @@ -56,7 +47,7 @@ impl<'a> StructArray<'a> { } } -impl StructData { +impl StructArray<'_> { pub fn try_new(names: FieldNames, fields: Vec, length: usize) -> VortexResult { if names.len() != fields.len() { vortex_bail!("Got {} names and {} fields", names.len(), fields.len()); @@ -64,52 +55,42 @@ impl StructData { if fields .iter() - .any(|a| a.to_array().with_array(|a| a.len()) != length) + .any(|a| a.to_array().with_dyn(|a| a.len()) != length) { vortex_bail!("Expected all struct fields to have length {}", length); } let field_dtypes: Vec<_> = fields.iter().map(|d| d.dtype()).cloned().collect(); - let fields: Vec<_> = fields.iter().cloned().map(Some).collect(); - Ok(Self::new_unchecked( + Self::try_from_parts( DType::Struct(names, field_dtypes), - Arc::new(StructMetadata { length }), + StructMetadata { length }, vec![].into(), fields.into(), - )) + HashMap::default(), + ) } } -impl<'v> TryFromArrayParts<'v, StructMetadata> for StructArray<'v> { - fn try_from_parts( - parts: &'v dyn ArrayParts, - metadata: &'v StructMetadata, - ) -> VortexResult { - let DType::Struct(_names, dtypes) = parts.dtype() else { - unreachable!() - }; - if parts.nchildren() != dtypes.len() { - vortex_bail!( - "Expected {} children, found {}", - dtypes.len(), - parts.nchildren() - ); - } - Ok(StructArray { - dtype: parts.dtype(), - parts, - length: metadata.length, - }) +impl ArrayFlatten for StructArray<'_> { + fn flatten<'a>(self) -> VortexResult> + where + Self: 'a, + { + todo!() } } impl ArrayTrait for StructArray<'_> { fn dtype(&self) -> &DType { - self.dtype + self.array().dtype() } fn len(&self) -> usize { - self.length + self.metadata().length + } + + fn metadata(&self) -> Arc { + Arc::new(self.metadata().clone()) } } @@ -117,24 +98,9 @@ impl ArrayValidity for StructArray<'_> { fn is_valid(&self, _index: usize) -> bool { todo!() } -} -impl ToArrayData for StructArray<'_> { - fn to_array_data(&self) -> ArrayData { - ArrayData::try_new( - &StructEncoding, - self.dtype().clone(), - Arc::new(StructMetadata { - length: self.length, - }), - vec![].into(), - (0..self.nfields()) - .map(|idx| self.child(idx).unwrap()) - .map(|a| Some(a.to_array_data())) - .collect::>() - .into(), - ) - .unwrap() + fn logical_validity(&self) -> LogicalValidity { + todo!() } } @@ -142,10 +108,11 @@ impl AcceptArrayVisitor for StructArray<'_> { fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { for (idx, name) in self.names().iter().enumerate() { let child = self.child(idx).unwrap(); - visitor.visit_column(name, &child)?; + visitor.visit_child(name, &child)?; } Ok(()) } } -impl ArrayStatistics for StructArray<'_> {} +impl ArrayStatisticsCompute for StructArray<'_> {} +impl ArrayCompute for StructArray<'_> {} diff --git a/vortex-array2/src/arrow/array.rs b/vortex-array2/src/arrow/array.rs new file mode 100644 index 0000000000..dc3d43d349 --- /dev/null +++ b/vortex-array2/src/arrow/array.rs @@ -0,0 +1,283 @@ +use std::sync::Arc; + +use arrow_array::array::{ + Array as ArrowArray, ArrayRef as ArrowArrayRef, BooleanArray as ArrowBooleanArray, + GenericByteArray, NullArray as ArrowNullArray, PrimitiveArray as ArrowPrimitiveArray, + StructArray as ArrowStructArray, +}; +use arrow_array::array::{ArrowPrimitiveType, OffsetSizeTrait}; +use arrow_array::cast::{as_null_array, AsArray}; +use arrow_array::types::{ + ByteArrayType, ByteViewType, Date32Type, Date64Type, DurationMicrosecondType, + DurationMillisecondType, DurationNanosecondType, DurationSecondType, Time32MillisecondType, + Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, +}; +use arrow_array::types::{ + Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, + UInt32Type, UInt64Type, UInt8Type, +}; +use arrow_array::{BinaryViewArray, GenericByteViewArray, StringViewArray}; +use arrow_buffer::buffer::{NullBuffer, OffsetBuffer}; +use arrow_buffer::{ArrowNativeType, Buffer, ScalarBuffer}; +use arrow_schema::{DataType, TimeUnit}; +use vortex::ptype::NativePType; +use vortex_schema::DType; + +use crate::array::bool::BoolArray; +use crate::array::primitive::PrimitiveArray; +use crate::array::r#struct::StructArray; +use crate::stats::{Stat, Statistics}; +use crate::validity::Validity; +use crate::{ArrayData, IntoArrayData}; + +pub trait FromArrowArray { + fn from_arrow(array: A, nullable: bool) -> Self; +} + +impl IntoArrayData for Buffer { + fn into_array_data(self) -> ArrayData { + let length = self.len(); + PrimitiveArray::try_new( + ScalarBuffer::::new(self, 0, length), + Validity::NonNullable, + ) + .unwrap() + .into_array_data() + } +} + +impl IntoArrayData for NullBuffer { + fn into_array_data(self) -> ArrayData { + BoolArray::try_new(self.into_inner(), Validity::NonNullable) + .unwrap() + .into_array_data() + } +} + +impl IntoArrayData for ScalarBuffer { + fn into_array_data(self) -> ArrayData { + let length = self.len(); + PrimitiveArray::try_new( + ScalarBuffer::::new(self.into_inner(), 0, length), + Validity::NonNullable, + ) + .unwrap() + .into_array_data() + } +} + +impl IntoArrayData for OffsetBuffer { + fn into_array_data(self) -> ArrayData { + let length = self.len(); + let array = PrimitiveArray::try_new( + ScalarBuffer::::new(self.into_inner().into_inner(), 0, length), + Validity::NonNullable, + ) + .unwrap() + .into_array_data(); + array.set(Stat::IsSorted, true.into()); + array.set(Stat::IsStrictSorted, true.into()); + array + } +} + +impl FromArrowArray<&ArrowPrimitiveArray> for ArrayData +where + ::Native: NativePType, +{ + fn from_arrow(value: &ArrowPrimitiveArray, nullable: bool) -> Self { + let arr = PrimitiveArray::try_new(value.values().clone(), nulls(value.nulls(), nullable)) + .unwrap() + .into_array_data(); + + if T::DATA_TYPE.is_numeric() { + return arr; + } + + match T::DATA_TYPE { + DataType::Timestamp(_time_unit, _tz) => { + todo!("Port from vortex1") + } + DataType::Date32 => todo!(), + DataType::Date64 => todo!(), + DataType::Time32(_) => todo!(), + DataType::Time64(_) => todo!(), + DataType::Duration(_) => todo!(), + DataType::Interval(_) => todo!(), + _ => panic!("Invalid data type for PrimitiveArray"), + } + } +} + +impl FromArrowArray<&GenericByteArray> for ArrayData { + fn from_arrow(_value: &GenericByteArray, nullable: bool) -> Self { + let _dtype = match T::DATA_TYPE { + DataType::Binary | DataType::LargeBinary => DType::Binary(nullable.into()), + DataType::Utf8 | DataType::LargeUtf8 => DType::Utf8(nullable.into()), + _ => panic!("Invalid data type for ByteArray"), + }; + todo!("PORT") + } +} + +impl FromArrowArray<&GenericByteViewArray> for ArrayData { + fn from_arrow(_value: &GenericByteViewArray, nullable: bool) -> Self { + let _dtype = match T::DATA_TYPE { + DataType::BinaryView => DType::Binary(nullable.into()), + DataType::Utf8View => DType::Utf8(nullable.into()), + _ => panic!("Invalid data type for ByteViewArray"), + }; + todo!("PORT") + } +} + +impl FromArrowArray<&ArrowBooleanArray> for ArrayData { + fn from_arrow(value: &ArrowBooleanArray, nullable: bool) -> Self { + BoolArray::try_new(value.values().clone(), nulls(value.nulls(), nullable)) + .unwrap() + .into_array_data() + } +} + +impl FromArrowArray<&ArrowStructArray> for ArrayData { + fn from_arrow(value: &ArrowStructArray, nullable: bool) -> Self { + // TODO(ngates): how should we deal with Arrow "logical nulls"? + assert!(!nullable); + StructArray::try_new( + value + .column_names() + .iter() + .map(|s| s.to_string()) + .map(Arc::new) + .collect(), + value + .columns() + .iter() + .zip(value.fields()) + .map(|(c, field)| ArrayData::from_arrow(c.clone(), field.is_nullable())) + .collect(), + value.len(), + ) + .unwrap() + .into_array_data() + } +} + +impl FromArrowArray<&ArrowNullArray> for ArrayData { + fn from_arrow(_value: &ArrowNullArray, nullable: bool) -> Self { + assert!(nullable); + todo!("PORT") + // ConstantArray::new(NullScalar::new(), value.len()).to_array_data() + } +} + +fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity { + if nullable { + nulls + .map(|nulls| { + if nulls.null_count() == nulls.len() { + Validity::AllInvalid + } else { + Validity::from(nulls.inner().clone()) + } + }) + .unwrap_or_else(|| Validity::AllValid) + } else { + assert!(nulls.is_none()); + Validity::NonNullable + } +} + +impl FromArrowArray for ArrayData { + fn from_arrow(array: ArrowArrayRef, nullable: bool) -> Self { + match array.data_type() { + DataType::Boolean => ArrayData::from_arrow(array.as_boolean(), nullable), + DataType::UInt8 => ArrayData::from_arrow(array.as_primitive::(), nullable), + DataType::UInt16 => ArrayData::from_arrow(array.as_primitive::(), nullable), + DataType::UInt32 => ArrayData::from_arrow(array.as_primitive::(), nullable), + DataType::UInt64 => ArrayData::from_arrow(array.as_primitive::(), nullable), + DataType::Int8 => ArrayData::from_arrow(array.as_primitive::(), nullable), + DataType::Int16 => ArrayData::from_arrow(array.as_primitive::(), nullable), + DataType::Int32 => ArrayData::from_arrow(array.as_primitive::(), nullable), + DataType::Int64 => ArrayData::from_arrow(array.as_primitive::(), nullable), + DataType::Float16 => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + DataType::Float32 => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + DataType::Float64 => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + DataType::Utf8 => ArrayData::from_arrow(array.as_string::(), nullable), + DataType::LargeUtf8 => ArrayData::from_arrow(array.as_string::(), nullable), + DataType::Binary => ArrayData::from_arrow(array.as_binary::(), nullable), + DataType::LargeBinary => ArrayData::from_arrow(array.as_binary::(), nullable), + DataType::BinaryView => ArrayData::from_arrow( + array.as_any().downcast_ref::().unwrap(), + nullable, + ), + DataType::Utf8View => ArrayData::from_arrow( + array.as_any().downcast_ref::().unwrap(), + nullable, + ), + DataType::Struct(_) => ArrayData::from_arrow(array.as_struct(), nullable), + DataType::Null => ArrayData::from_arrow(as_null_array(&array), nullable), + DataType::Timestamp(u, _) => match u { + TimeUnit::Second => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + TimeUnit::Millisecond => ArrayData::from_arrow( + array.as_primitive::(), + nullable, + ), + TimeUnit::Microsecond => ArrayData::from_arrow( + array.as_primitive::(), + nullable, + ), + TimeUnit::Nanosecond => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + }, + DataType::Date32 => ArrayData::from_arrow(array.as_primitive::(), nullable), + DataType::Date64 => ArrayData::from_arrow(array.as_primitive::(), nullable), + DataType::Time32(u) => match u { + TimeUnit::Second => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + TimeUnit::Millisecond => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + _ => unreachable!(), + }, + DataType::Time64(u) => match u { + TimeUnit::Microsecond => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + TimeUnit::Nanosecond => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + _ => unreachable!(), + }, + DataType::Duration(u) => match u { + TimeUnit::Second => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + TimeUnit::Millisecond => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + TimeUnit::Microsecond => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + TimeUnit::Nanosecond => { + ArrayData::from_arrow(array.as_primitive::(), nullable) + } + }, + _ => panic!( + "TODO(robert): Missing array encoding for dtype {}", + array.data_type().clone() + ), + } + } +} diff --git a/vortex-array2/src/arrow/mod.rs b/vortex-array2/src/arrow/mod.rs new file mode 100644 index 0000000000..20e0143414 --- /dev/null +++ b/vortex-array2/src/arrow/mod.rs @@ -0,0 +1,2 @@ +mod array; +mod recordbatch; diff --git a/vortex-array2/src/arrow/recordbatch.rs b/vortex-array2/src/arrow/recordbatch.rs new file mode 100644 index 0000000000..a00b27a110 --- /dev/null +++ b/vortex-array2/src/arrow/recordbatch.rs @@ -0,0 +1,29 @@ +use std::sync::Arc; + +use arrow_array::RecordBatch; + +use crate::array::r#struct::StructArray; +use crate::arrow::array::FromArrowArray; +use crate::{ArrayData, IntoArrayData, ToArrayData}; + +impl ToArrayData for RecordBatch { + fn to_array_data(&self) -> ArrayData { + StructArray::try_new( + self.schema() + .fields() + .iter() + .map(|f| f.name()) + .map(|s| s.to_owned()) + .map(Arc::new) + .collect(), + self.columns() + .iter() + .zip(self.schema().fields()) + .map(|(array, field)| ArrayData::from_arrow(array.clone(), field.is_nullable())) + .collect(), + self.num_rows(), + ) + .unwrap() + .into_array_data() + } +} diff --git a/vortex-array2/src/batch.rs b/vortex-array2/src/batch.rs deleted file mode 100644 index 6f05b27e38..0000000000 --- a/vortex-array2/src/batch.rs +++ /dev/null @@ -1,90 +0,0 @@ -use vortex_error::VortexResult; - -use crate::visitor::ArrayVisitor; -use crate::{Array, ArrayData, ArrayTrait, ToArrayData, WithArray}; - -/// TODO(ngates): do we want this to be references? -#[derive(Debug)] -pub struct ColumnBatch { - columns: Vec, - length: usize, -} - -impl ColumnBatch { - pub fn from_array(array: &dyn ArrayTrait) -> Self { - // We want to walk the struct array extracting all nested columns - let mut batch = ColumnBatch { - columns: vec![], - length: array.len(), - }; - array.accept(&mut batch).unwrap(); - batch - } - - pub fn columns(&self) -> &[ArrayData] { - self.columns.as_slice() - } - - pub fn ncolumns(&self) -> usize { - self.columns.len() - } -} - -impl From<&Array<'_>> for ColumnBatch { - fn from(value: &Array) -> Self { - value.with_array(|a| ColumnBatch::from_array(a)) - } -} - -/// Collect all the nested column leaves from an array. -impl ArrayVisitor for ColumnBatch { - fn visit_column(&mut self, _name: &str, array: &Array) -> VortexResult<()> { - let ncols = self.columns.len(); - array.with_array(|a| a.accept(self))?; - if ncols == self.columns.len() { - assert_eq!(self.length, array.len()); - self.columns.push(array.to_array_data()) - } - Ok(()) - } - - fn visit_child(&mut self, _name: &str, array: &Array) -> VortexResult<()> { - // Stop traversing when we hit the first non-column array. - assert_eq!(self.length, array.len()); - self.columns.push(array.to_array_data()); - Ok(()) - } -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use crate::array::primitive::PrimitiveData; - use crate::array::r#struct::StructData; - use crate::batch::ColumnBatch; - use crate::IntoArray; - - #[test] - fn batch_visitor() { - let col = PrimitiveData::from_vec(vec![0, 1, 2]).into_data(); - let nested_struct = StructData::try_new( - vec![Arc::new("x".into()), Arc::new("y".into())], - vec![col.clone(), col.clone()], - 3, - ) - .unwrap() - .into_data(); - - let arr = StructData::try_new( - vec![Arc::new("a".into()), Arc::new("b".into())], - vec![col.clone(), nested_struct], - 3, - ) - .unwrap() - .into_array(); - - let batch = ColumnBatch::from(&arr); - assert_eq!(batch.columns().len(), 3); - } -} diff --git a/vortex-array2/src/buffer.rs b/vortex-array2/src/buffer.rs new file mode 100644 index 0000000000..8be566938b --- /dev/null +++ b/vortex-array2/src/buffer.rs @@ -0,0 +1,89 @@ +use arrow_buffer::Buffer as ArrowBuffer; +use vortex::ptype::NativePType; + +use crate::ToStatic; + +#[derive(Debug, Clone)] +pub enum Buffer<'a> { + Owned(ArrowBuffer), + View(&'a [u8]), +} + +pub type OwnedBuffer = Buffer<'static>; + +impl Buffer<'_> { + pub fn len(&self) -> usize { + match self { + Buffer::Owned(buffer) => buffer.len(), + Buffer::View(slice) => slice.len(), + } + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn as_slice(&self) -> &[u8] { + match self { + Buffer::Owned(buffer) => buffer.as_slice(), + Buffer::View(slice) => slice, + } + } + + pub fn typed_data(&self) -> &[T] { + match self { + Buffer::Owned(buffer) => buffer.typed_data::(), + Buffer::View(slice) => { + // From ArrowBuffer::typed_data + let (prefix, offsets, suffix) = unsafe { slice.align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + offsets + } + } + } +} + +impl<'a> Buffer<'a> { + pub fn into_vec(self) -> Result, Buffer<'a>> { + match self { + Buffer::Owned(buffer) => buffer.into_vec().map_err(Buffer::Owned), + Buffer::View(_) => Err(self), + } + } +} + +impl ToStatic for Buffer<'_> { + type Static = Buffer<'static>; + + fn to_static(&self) -> Self::Static { + match self { + Buffer::Owned(d) => Buffer::Owned(d.clone()), + Buffer::View(_) => Buffer::Owned(self.into()), + } + } +} + +impl From for OwnedBuffer { + fn from(value: ArrowBuffer) -> Self { + Buffer::Owned(value) + } +} + +impl From> for ArrowBuffer { + fn from(value: Buffer<'_>) -> Self { + match value { + Buffer::Owned(b) => b, + Buffer::View(_) => ArrowBuffer::from(&value), + } + } +} + +impl From<&Buffer<'_>> for ArrowBuffer { + fn from(value: &Buffer<'_>) -> Self { + match value { + Buffer::Owned(b) => b.clone(), + // FIXME(ngates): this conversion loses alignment information since go via u8. + Buffer::View(v) => ArrowBuffer::from_vec(v.to_vec()), + } + } +} diff --git a/vortex-array2/src/compute.rs b/vortex-array2/src/compute.rs deleted file mode 100644 index bc8e538fb4..0000000000 --- a/vortex-array2/src/compute.rs +++ /dev/null @@ -1,55 +0,0 @@ -use vortex::scalar::Scalar; -use vortex_error::{vortex_err, VortexResult}; - -use crate::array::bool::BoolData; -use crate::array::primitive::PrimitiveData; -use crate::{Array, WithArray}; - -pub trait ArrayCompute { - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { - None - } - fn flatten(&self) -> Option<&dyn FlattenFn> { - None - } -} - -pub trait ScalarAtFn { - fn scalar_at(&self, index: usize) -> VortexResult; -} - -pub fn scalar_at(array: &Array, index: usize) -> VortexResult { - array.with_array(|a| { - a.scalar_at() - .ok_or_else(|| vortex_err!("Not implemented: scalar_at"))? - .scalar_at(index) - }) -} - -pub trait FlattenFn { - fn flatten(&self) -> VortexResult; -} - -pub enum FlattenedArray { - Bool(BoolData), - Primitive(PrimitiveData), -} - -pub fn flatten(array: &Array) -> VortexResult { - array.with_array(|a| { - a.flatten() - .ok_or_else(|| vortex_err!("Not implemented: flatten"))? - .flatten() - }) -} - -pub fn flatten_primitive(array: &Array) -> VortexResult { - if let FlattenedArray::Primitive(p) = flatten(array)? { - Ok(p) - } else { - Err(vortex_err!( - "Cannot flatten array {:?} into primitive", - array - )) - } -} diff --git a/vortex-array2/src/compute/as_arrow.rs b/vortex-array2/src/compute/as_arrow.rs new file mode 100644 index 0000000000..bab448221c --- /dev/null +++ b/vortex-array2/src/compute/as_arrow.rs @@ -0,0 +1,37 @@ +use arrow_array::ArrayRef as ArrowArrayRef; +use vortex_error::{vortex_err, VortexResult}; + +use crate::{Array, IntoArray}; + +pub trait AsArrowArray { + fn as_arrow(&self) -> VortexResult; +} + +pub fn as_arrow(array: &Array) -> VortexResult { + array.with_dyn(|a| { + // If as_arrow is implemented, then invoke that. + if let Some(a) = a.as_arrow() { + return a.as_arrow(); + } + + // Otherwise, flatten and try again. + let array = array.clone().flatten()?.into_array(); + a.as_arrow().map(|a| a.as_arrow()).unwrap_or_else(|| { + Err(vortex_err!(NotImplemented: "as_arrow", array.encoding().id().name())) + }) + }) +} + +// TODO(ngates): return a RecordBatchReader instead? +pub fn as_arrow_chunks(_array: &Array) -> VortexResult> { + todo!("PORT") + // if let Some(chunked) = array.as_data::() { + // chunked + // .chunks() + // .iter() + // .map(|a| as_arrow(a.as_ref())) + // .try_collect() + // } else { + // as_arrow(array).map(|a| vec![a]) + // } +} diff --git a/vortex-array2/src/compute/as_contiguous.rs b/vortex-array2/src/compute/as_contiguous.rs new file mode 100644 index 0000000000..ae6a9f006b --- /dev/null +++ b/vortex-array2/src/compute/as_contiguous.rs @@ -0,0 +1,36 @@ +use itertools::Itertools; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; + +use crate::Array; + +pub trait AsContiguousFn { + fn as_contiguous(&self, arrays: &[Array]) -> VortexResult>; +} + +pub fn as_contiguous(arrays: &[Array]) -> VortexResult> { + if arrays.is_empty() { + vortex_bail!(ComputeError: "No arrays to concatenate"); + } + if !arrays.iter().map(|chunk| chunk.encoding().id()).all_equal() { + vortex_bail!(ComputeError: + "Chunks have differing encodings", + ); + } + if !arrays.iter().map(|chunk| chunk.dtype()).all_equal() { + vortex_bail!(ComputeError: + "Chunks have differing dtypes", + ); + } + + let first = arrays.first().unwrap(); + first.with_dyn(|a| { + a.as_contiguous() + .map(|f| f.as_contiguous(arrays)) + .unwrap_or_else(|| { + Err(vortex_err!( + NotImplemented: "as_contiguous", + first.encoding().id().name() + )) + }) + }) +} diff --git a/vortex-array2/src/compute/cast.rs b/vortex-array2/src/compute/cast.rs new file mode 100644 index 0000000000..15fa2694e6 --- /dev/null +++ b/vortex-array2/src/compute/cast.rs @@ -0,0 +1,21 @@ +use vortex_error::{vortex_err, VortexResult}; +use vortex_schema::DType; + +use crate::{Array, OwnedArray, ToStatic}; + +pub trait CastFn { + fn cast(&self, dtype: &DType) -> VortexResult; +} + +pub fn cast(array: &Array, dtype: &DType) -> VortexResult { + if array.dtype() == dtype { + return Ok(array.to_static()); + } + + // TODO(ngates): check for null_count if dtype is non-nullable + array.with_dyn(|a| { + a.cast().map(|f| f.cast(dtype)).unwrap_or_else(|| { + Err(vortex_err!(NotImplemented: "cast", array.encoding().id().name())) + }) + }) +} diff --git a/vortex-array2/src/compute/fill.rs b/vortex-array2/src/compute/fill.rs new file mode 100644 index 0000000000..e984fc2231 --- /dev/null +++ b/vortex-array2/src/compute/fill.rs @@ -0,0 +1,24 @@ +use vortex_error::{vortex_err, VortexResult}; + +use crate::{Array, OwnedArray, ToStatic}; + +pub trait FillForwardFn { + fn fill_forward(&self) -> VortexResult; +} + +pub fn fill_forward(array: &Array) -> VortexResult { + if !array.dtype().is_nullable() { + return Ok(array.to_static()); + } + + array.with_dyn(|a| { + a.fill_forward() + .map(|t| t.fill_forward()) + .unwrap_or_else(|| { + Err(vortex_err!( + NotImplemented: "fill_forward", + array.encoding().id().name() + )) + }) + }) +} diff --git a/vortex-array2/src/compute/mod.rs b/vortex-array2/src/compute/mod.rs new file mode 100644 index 0000000000..c46dda21d3 --- /dev/null +++ b/vortex-array2/src/compute/mod.rs @@ -0,0 +1,51 @@ +use as_arrow::AsArrowArray; +use as_contiguous::AsContiguousFn; +use cast::CastFn; +use fill::FillForwardFn; +use patch::PatchFn; +use scalar_at::ScalarAtFn; +use search_sorted::SearchSortedFn; +use take::TakeFn; + +pub mod as_arrow; +pub mod as_contiguous; +pub mod cast; +pub mod fill; +pub mod patch; +pub mod scalar_at; +pub mod search_sorted; +pub mod take; + +pub trait ArrayCompute { + fn as_arrow(&self) -> Option<&dyn AsArrowArray> { + None + } + + fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { + None + } + + fn cast(&self) -> Option<&dyn CastFn> { + None + } + + fn fill_forward(&self) -> Option<&dyn FillForwardFn> { + None + } + + fn patch(&self) -> Option<&dyn PatchFn> { + None + } + + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + None + } + + fn search_sorted(&self) -> Option<&dyn SearchSortedFn> { + None + } + + fn take(&self) -> Option<&dyn TakeFn> { + None + } +} diff --git a/vortex-array2/src/compute/patch.rs b/vortex-array2/src/compute/patch.rs new file mode 100644 index 0000000000..cd8a14d318 --- /dev/null +++ b/vortex-array2/src/compute/patch.rs @@ -0,0 +1,28 @@ +use vortex_error::{vortex_bail, vortex_err, VortexResult}; + +use crate::{Array, OwnedArray}; + +pub trait PatchFn { + fn patch(&self, patch: &Array) -> VortexResult; +} + +/// Returns a new array where the non-null values from the patch array are replaced in the original. +pub fn patch(array: &Array, patch: &Array) -> VortexResult { + if array.len() != patch.len() { + vortex_bail!( + "patch array {} must have the same length as the original array {}", + patch, + array + ); + } + + if array.dtype().as_nullable() != patch.dtype().as_nullable() { + vortex_bail!(MismatchedTypes: array.dtype(), patch.dtype()); + } + + array.with_dyn(|a| { + a.patch().map(|t| t.patch(patch)).unwrap_or_else(|| { + Err(vortex_err!(NotImplemented: "take", array.encoding().id().name())) + }) + }) +} diff --git a/vortex-array2/src/compute/scalar_at.rs b/vortex-array2/src/compute/scalar_at.rs new file mode 100644 index 0000000000..34d4f0df65 --- /dev/null +++ b/vortex-array2/src/compute/scalar_at.rs @@ -0,0 +1,22 @@ +use vortex::scalar::Scalar; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; + +use crate::Array; + +pub trait ScalarAtFn { + fn scalar_at(&self, index: usize) -> VortexResult; +} + +pub fn scalar_at(array: &Array, index: usize) -> VortexResult { + if index >= array.len() { + vortex_bail!(OutOfBounds: index, 0, array.len()); + } + + array.with_dyn(|a| { + a.scalar_at() + .map(|t| t.scalar_at(index)) + .unwrap_or_else(|| { + Err(vortex_err!(NotImplemented: "scalar_at", array.encoding().id().name())) + }) + }) +} diff --git a/vortex-array2/src/compute/search_sorted.rs b/vortex-array2/src/compute/search_sorted.rs new file mode 100644 index 0000000000..4bd44e3adc --- /dev/null +++ b/vortex-array2/src/compute/search_sorted.rs @@ -0,0 +1,144 @@ +use std::cmp::Ordering; +use std::cmp::Ordering::{Equal, Greater, Less}; + +use vortex::scalar::Scalar; +use vortex_error::{vortex_err, VortexResult}; + +use crate::compute::scalar_at::scalar_at; +use crate::Array; + +#[derive(Debug, Copy, Clone)] +pub enum SearchSortedSide { + Left, + Right, +} + +pub trait SearchSortedFn { + fn search_sorted(&self, value: &Scalar, side: SearchSortedSide) -> VortexResult; +} + +pub fn search_sorted>( + array: &Array, + target: T, + side: SearchSortedSide, +) -> VortexResult { + let scalar = target.into().cast(array.dtype())?; + array.with_dyn(|a| { + if let Some(search_sorted) = a.search_sorted() { + return search_sorted.search_sorted(&scalar, side); + } + + if a.scalar_at().is_some() { + return Ok(SearchSorted::search_sorted(&array, &scalar, side)); + } + + Err(vortex_err!( + NotImplemented: "search_sorted", + array.encoding().id().name() + )) + }) +} + +pub trait IndexOrd { + fn index_cmp(&self, idx: usize, elem: &V) -> Option; + + fn index_lt(&self, idx: usize, elem: &V) -> bool { + matches!(self.index_cmp(idx, elem), Some(Less)) + } + + fn index_le(&self, idx: usize, elem: &V) -> bool { + matches!(self.index_cmp(idx, elem), Some(Less | Equal)) + } + + fn index_gt(&self, idx: usize, elem: &V) -> bool { + matches!(self.index_cmp(idx, elem), Some(Greater)) + } + + fn index_ge(&self, idx: usize, elem: &V) -> bool { + matches!(self.index_cmp(idx, elem), Some(Greater | Equal)) + } +} + +#[allow(clippy::len_without_is_empty)] +pub trait Len { + fn len(&self) -> usize; +} + +pub trait SearchSorted { + fn search_sorted(&self, value: &T, side: SearchSortedSide) -> usize + where + Self: IndexOrd, + { + match side { + SearchSortedSide::Left => self.search_sorted_by(|idx| { + if self.index_lt(idx, value) { + Less + } else { + Greater + } + }), + SearchSortedSide::Right => self.search_sorted_by(|idx| { + if self.index_le(idx, value) { + Less + } else { + Greater + } + }), + } + } + + fn search_sorted_by Ordering>(&self, f: F) -> usize; +} + +impl + Len + ?Sized, T> SearchSorted for S { + // Code adapted from Rust standard library slice::binary_search_by + fn search_sorted_by Ordering>(&self, mut f: F) -> usize { + // INVARIANTS: + // - 0 <= left <= left + size = right <= self.len() + // - f returns Less for everything in self[..left] + // - f returns Greater for everything in self[right..] + let mut size = self.len(); + let mut left = 0; + let mut right = size; + while left < right { + let mid = left + size / 2; + let cmp = f(mid); + + left = if cmp == Less { mid + 1 } else { left }; + right = if cmp == Greater { mid } else { right }; + if cmp == Equal { + return mid; + } + + size = right - left; + } + + left + } +} + +impl IndexOrd for &Array<'_> { + fn index_cmp(&self, idx: usize, elem: &Scalar) -> Option { + let scalar_a = scalar_at(self, idx).ok()?; + scalar_a.partial_cmp(elem) + } +} + +impl IndexOrd for [T] { + fn index_cmp(&self, idx: usize, elem: &T) -> Option { + // SAFETY: Used in search_sorted_by same as the standard library. The search_sorted ensures idx is in bounds + unsafe { self.get_unchecked(idx) }.partial_cmp(elem) + } +} + +impl Len for &Array<'_> { + fn len(&self) -> usize { + Array::len(self) + } +} + +impl Len for [T] { + fn len(&self) -> usize { + self.len() + } +} diff --git a/vortex-array2/src/compute/take.rs b/vortex-array2/src/compute/take.rs new file mode 100644 index 0000000000..adc519870c --- /dev/null +++ b/vortex-array2/src/compute/take.rs @@ -0,0 +1,24 @@ +use log::info; +use vortex_error::{vortex_err, VortexResult}; + +use crate::{Array, IntoArray, OwnedArray}; + +pub trait TakeFn { + fn take(&self, indices: &Array) -> VortexResult; +} + +pub fn take(array: &Array, indices: &Array) -> VortexResult { + array.with_dyn(|a| { + if let Some(take) = a.take() { + return take.take(indices); + } + + // Otherwise, flatten and try again. + info!("TakeFn not implemented for {}, flattening", array); + array.clone().flatten()?.into_array().with_dyn(|a| { + a.take().map(|t| t.take(indices)).unwrap_or_else(|| { + Err(vortex_err!(NotImplemented: "take", array.encoding().id().name())) + }) + }) + }) +} diff --git a/vortex-array2/src/data.rs b/vortex-array2/src/data.rs index 73a43d8bf3..677fdbae69 100644 --- a/vortex-array2/src/data.rs +++ b/vortex-array2/src/data.rs @@ -1,26 +1,24 @@ use std::collections::HashMap; -use std::marker::PhantomData; use std::sync::{Arc, RwLock}; -use arrow_buffer::Buffer; use vortex::scalar::Scalar; -use vortex::stats::Stat; -use vortex_error::{vortex_bail, VortexError, VortexResult}; +use vortex_error::VortexResult; use vortex_schema::DType; +use crate::buffer::{Buffer, OwnedBuffer}; use crate::encoding::EncodingRef; +use crate::stats::Stat; use crate::stats::Statistics; -use crate::{Array, ArrayDef, ArrayMetadata, ArrayParts, IntoArray, ToArray}; +use crate::{Array, ArrayMetadata, ArrayParts, IntoArray, ToArray}; -#[allow(dead_code)] #[derive(Clone, Debug)] pub struct ArrayData { encoding: EncodingRef, dtype: DType, metadata: Arc, - buffers: Arc<[Buffer]>, // Should this just be an Option, not an Arc? - children: Arc<[Option]>, - stats_set: Arc>>, + buffers: Arc<[OwnedBuffer]>, // Should this just be an Option, not an Arc? How many multi-buffer arrays are there? + children: Arc<[ArrayData]>, + stats_map: Arc>>, } impl ArrayData { @@ -28,8 +26,9 @@ impl ArrayData { encoding: EncodingRef, dtype: DType, metadata: Arc, - buffers: Arc<[Buffer]>, - children: Arc<[Option]>, + buffers: Arc<[OwnedBuffer]>, + children: Arc<[ArrayData]>, + statistics: HashMap, ) -> VortexResult { let data = Self { encoding, @@ -37,12 +36,13 @@ impl ArrayData { metadata, buffers, children, - stats_set: Arc::new(RwLock::new(HashMap::new())), + stats_map: Arc::new(RwLock::new(statistics)), }; // Validate here that the metadata correctly parses, so that an encoding can infallibly - // implement Encoding::with_data(). - // encoding.with_data_mut(&data, &mut |_| Ok(()))?; + let array = data.to_array(); + // FIXME(ngates): run some validation function + encoding.with_dyn(&array, &mut |_| Ok(()))?; Ok(data) } @@ -63,11 +63,17 @@ impl ArrayData { &self.buffers } - pub fn child(&self, index: usize) -> Option<&ArrayData> { - self.children.get(index).and_then(|c| c.as_ref()) + pub fn child(&self, index: usize, dtype: &DType) -> Option<&ArrayData> { + match self.children.get(index) { + None => None, + Some(child) => { + assert_eq!(child.dtype(), dtype); + Some(child) + } + } } - pub fn children(&self) -> &[Option] { + pub fn children(&self) -> &[ArrayData] { &self.children } @@ -106,7 +112,7 @@ impl<'a> Iterator for ArrayDataIterator<'a> { fn next(&mut self) -> Option { let next = self.stack.pop()?; - for child in next.children.as_ref().iter().flatten() { + for child in next.children.as_ref().iter() { self.stack.push(child); } Some(next) @@ -125,82 +131,6 @@ impl IntoArray<'static> for ArrayData { } } -#[derive(Debug)] -pub struct TypedArrayData { - data: ArrayData, - phantom: PhantomData, -} - -impl TypedArrayData { - pub fn new_unchecked( - dtype: DType, - metadata: Arc, - buffers: Arc<[Buffer]>, - children: Arc<[Option]>, - ) -> Self { - Self::from_data_unchecked( - ArrayData::try_new(D::ENCODING, dtype, metadata, buffers, children).unwrap(), - ) - } - - pub fn from_data_unchecked(data: ArrayData) -> Self { - Self { - data, - phantom: PhantomData, - } - } - - pub fn data(&self) -> &ArrayData { - &self.data - } - - pub fn into_data(self) -> ArrayData { - self.data - } - - pub fn metadata(&self) -> &D::Metadata { - self.data - .metadata() - .as_any() - .downcast_ref::() - .unwrap() - } - - pub fn into_metadata(self) -> Arc { - self.data - .metadata - .as_any_arc() - .downcast::() - .unwrap() - } -} - -impl ToArray for TypedArrayData { - fn to_array(&self) -> Array { - Array::DataRef(&self.data) - } -} - -impl IntoArray<'static> for TypedArrayData { - fn into_array(self) -> Array<'static> { - Array::Data(self.data) - } -} - -impl TryFrom for TypedArrayData { - type Error = VortexError; - - fn try_from(data: ArrayData) -> Result { - if data.encoding().id() != D::ID { - vortex_bail!("Invalid encoding for array") - } - Ok(Self { - data, - phantom: PhantomData, - }) - } -} - impl ArrayParts for ArrayData { fn dtype(&self) -> &DType { &self.dtype @@ -210,34 +140,43 @@ impl ArrayParts for ArrayData { self.buffers().get(idx) } - fn child(&self, idx: usize, _dtype: &DType) -> Option { - // TODO(ngates): validate the DType - self.child(idx).map(move |a| a.to_array()) + fn child(&self, idx: usize, dtype: &DType) -> Option { + self.child(idx, dtype).map(move |a| a.to_array()) } fn nchildren(&self) -> usize { self.children.len() } - fn statistics(&self) -> &dyn Statistics { + fn statistics<'a>(&'a self) -> &'a (dyn Statistics + 'a) { self } } impl Statistics for ArrayData { - fn compute(&self, stat: Stat) -> VortexResult> { - let mut locked = self.stats_set.write().unwrap(); + fn compute(&self, stat: Stat) -> Option { + let mut locked = self.stats_map.write().unwrap(); let stats = self - .encoding() - .with_data(self, |a| a.compute_statistics(stat))?; + .to_array() + .with_dyn(|a| a.compute_statistics(stat)) + .ok()?; for (k, v) in &stats { locked.insert(*k, v.clone()); } - Ok(stats.get(&stat).cloned()) + stats.get(&stat).cloned() } fn get(&self, stat: Stat) -> Option { - let locked = self.stats_set.read().unwrap(); + let locked = self.stats_map.read().unwrap(); locked.get(&stat).cloned() } + + fn set(&self, stat: Stat, value: Scalar) { + let mut locked = self.stats_map.write().unwrap(); + locked.insert(stat, value); + } + + fn to_map(&self) -> HashMap { + self.stats_map.read().unwrap().clone() + } } diff --git a/vortex-array2/src/encoding.rs b/vortex-array2/src/encoding.rs index e253be95c5..b217f08321 100644 --- a/vortex-array2/src/encoding.rs +++ b/vortex-array2/src/encoding.rs @@ -1,11 +1,13 @@ +use std::any::Any; use std::fmt::{Debug, Formatter}; use linkme::distributed_slice; pub use vortex::encoding::EncodingId; use vortex_error::VortexResult; -use crate::ArrayView; -use crate::{ArrayData, ArrayTrait}; +use crate::flatten::{ArrayFlatten, Flattened}; +use crate::ArrayDef; +use crate::{Array, ArrayTrait}; #[distributed_slice] pub static VORTEX_ENCODINGS: [EncodingRef] = [..]; @@ -19,66 +21,52 @@ pub fn find_encoding(id: &str) -> Option { .cloned() } -/// Dynamic trait representing an array type. -#[allow(dead_code)] +/// Object-safe encoding trait for an array. pub trait ArrayEncoding: 'static + Sync + Send { + fn as_any(&self) -> &dyn Any; + fn id(&self) -> EncodingId; - fn with_view_mut<'v>( - &self, - view: &'v ArrayView<'v>, - f: &mut dyn FnMut(&dyn ArrayTrait) -> VortexResult<()>, - ) -> VortexResult<()>; + /// Flatten the given array. + fn flatten<'a>(&self, array: Array<'a>) -> VortexResult>; - fn with_data_mut( + /// Unwrap the provided array into an implementation of ArrayTrait + fn with_dyn<'a>( &self, - data: &ArrayData, - f: &mut dyn FnMut(&dyn ArrayTrait) -> VortexResult<()>, + array: &'a Array<'a>, + f: &mut dyn for<'b> FnMut(&'b (dyn ArrayTrait + 'a)) -> VortexResult<()>, ) -> VortexResult<()>; } +/// Non-object-safe extensions to the ArrayEncoding trait. +pub trait ArrayEncodingExt { + type D: ArrayDef; + + fn flatten<'a>(array: Array<'a>) -> VortexResult> + where + ::D: 'a, + { + let typed = <::Array<'a> as TryFrom>::try_from(array)?; + ArrayFlatten::flatten(typed) + } + + fn with_dyn<'a, R, F>(array: &'a Array<'a>, mut f: F) -> R + where + F: for<'b> FnMut(&'b (dyn ArrayTrait + 'a)) -> R, + ::D: 'a, + { + let typed = + <::Array<'a> as TryFrom>::try_from(array.clone()).unwrap(); + f(&typed) + } +} + impl Debug for dyn ArrayEncoding + '_ { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { Debug::fmt(&self.id(), f) } } -impl dyn ArrayEncoding { - pub(crate) fn with_view<'v, R, F: FnMut(&dyn ArrayTrait) -> R>( - &self, - view: &'v ArrayView<'v>, - mut f: F, - ) -> R { - let mut result = None; - - // Unwrap the result. This is safe since we validate that encoding against the - // ArrayData during ArrayData::try_new. - self.with_view_mut(view, &mut |array| { - result = Some(f(array)); - Ok(()) - }) - .unwrap(); - - // Now we unwrap the optional, which we know to be populated in the closure. - result.unwrap() - } - - pub(crate) fn with_data R>( - &self, - data: &ArrayData, - mut f: F, - ) -> R { - let mut result = None; - - // Unwrap the result. This is safe since we validate that encoding against the - // ArrayData during ArrayData::try_new. - self.with_data_mut(data, &mut |array| { - result = Some(f(array)); - Ok(()) - }) - .unwrap(); - - // Now we unwrap the optional, which we know to be populated in the closure. - result.unwrap() - } +pub trait ArrayEncodingRef { + fn encoding(&self) -> EncodingRef; } diff --git a/vortex-array2/src/flatten.rs b/vortex-array2/src/flatten.rs new file mode 100644 index 0000000000..1ab7105499 --- /dev/null +++ b/vortex-array2/src/flatten.rs @@ -0,0 +1,44 @@ +use vortex_error::VortexResult; + +use crate::array::bool::BoolArray; +use crate::array::primitive::PrimitiveArray; +use crate::array::r#struct::StructArray; +use crate::encoding::ArrayEncoding; +use crate::{Array, IntoArray}; + +/// The set of encodings that can be converted to Arrow with zero-copy. +pub enum Flattened<'a> { + Bool(BoolArray<'a>), + Primitive(PrimitiveArray<'a>), + Struct(StructArray<'a>), +} + +pub trait ArrayFlatten { + fn flatten<'a>(self) -> VortexResult> + where + Self: 'a; +} + +impl<'a> Array<'a> { + pub fn flatten(self) -> VortexResult> { + ArrayEncoding::flatten(self.encoding(), self) + } + + pub fn flatten_bool(self) -> VortexResult> { + BoolArray::try_from(self.flatten()?.into_array()) + } + + pub fn flatten_primitive(self) -> VortexResult> { + PrimitiveArray::try_from(self.flatten()?.into_array()) + } +} + +impl<'a> IntoArray<'a> for Flattened<'a> { + fn into_array(self) -> Array<'a> { + match self { + Flattened::Bool(a) => a.into_array(), + Flattened::Primitive(a) => a.into_array(), + Flattened::Struct(a) => a.into_array(), + } + } +} diff --git a/vortex-array2/src/implementation.rs b/vortex-array2/src/implementation.rs index 9ae6019e21..5e762fcf7b 100644 --- a/vortex-array2/src/implementation.rs +++ b/vortex-array2/src/implementation.rs @@ -1,6 +1,8 @@ -use crate::encoding::EncodingId; +use vortex_error::VortexError; + use crate::encoding::{ArrayEncoding, EncodingRef}; -use crate::{ArrayMetadata, TryFromArrayParts}; +use crate::encoding::{ArrayEncodingExt, EncodingId}; +use crate::{Array, ArrayMetadata}; use crate::{ArrayTrait, TryDeserializeArrayMetadata, TrySerializeArrayMetadata}; /// Trait the defines the set of types relating to an array. @@ -9,11 +11,12 @@ pub trait ArrayDef { const ID: EncodingId; const ENCODING: EncodingRef; - type Array<'a>: ArrayTrait + TryFromArrayParts<'a, Self::Metadata> + 'a; + type Array<'a>: ArrayTrait + TryFrom, Error = VortexError> + 'a; type Metadata: ArrayMetadata + + Clone + TrySerializeArrayMetadata + for<'a> TryDeserializeArrayMetadata<'a>; - type Encoding: ArrayEncoding; + type Encoding: ArrayEncoding + ArrayEncodingExt; } #[macro_export] @@ -23,11 +26,19 @@ macro_rules! impl_encoding { paste! { use $crate::{ - ArrayDef, ArrayParts, ArrayTrait, TryFromArrayParts, - TryDeserializeArrayMetadata, + Array, + ArrayDef, + ArrayTrait, + Flattened, + TypedArray, + }; + use $crate::encoding::{ + ArrayEncoding, + ArrayEncodingExt, + EncodingId, + EncodingRef, + VORTEX_ENCODINGS, }; - use $crate::encoding::{ArrayEncoding, EncodingId, EncodingRef, VORTEX_ENCODINGS}; - use vortex_error::vortex_err; use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -35,8 +46,8 @@ macro_rules! impl_encoding { /// The array definition trait #[derive(Debug)] - pub struct [<$Name Def>]; - impl ArrayDef for [<$Name Def>] { + pub struct $Name; + impl ArrayDef for $Name { const ID: EncodingId = EncodingId::new($id); const ENCODING: EncodingRef = &[<$Name Encoding>]; type Array<'a> = [<$Name Array>]<'a>; @@ -44,7 +55,8 @@ macro_rules! impl_encoding { type Encoding = [<$Name Encoding>]; } - pub type [<$Name Data>] = TypedArrayData<[<$Name Def>]>; + pub type [<$Name Array>]<'a> = TypedArray<'a, $Name>; + pub type [] = TypedArray<'static, $Name>; /// The array encoding pub struct [<$Name Encoding>]; @@ -52,35 +64,30 @@ macro_rules! impl_encoding { #[allow(non_upper_case_globals)] static []: EncodingRef = &[<$Name Encoding>]; impl ArrayEncoding for [<$Name Encoding>] { + fn as_any(&self) -> &dyn Any { + self + } + fn id(&self) -> EncodingId { - [<$Name Def>]::ID + $Name::ID } - fn with_view_mut<'v>( - &self, - view: &'v ArrayView<'v>, - f: &mut dyn FnMut(&dyn ArrayTrait) -> VortexResult<()>, - ) -> VortexResult<()> { - // Convert ArrayView -> PrimitiveArray, then call compute. - let metadata = [<$Name Metadata>]::try_deserialize_metadata(view.metadata())?; - let array = [<$Name Array>]::try_from_parts(view as &dyn ArrayParts, &metadata)?; - f(&array) + fn flatten<'a>(&self, array: Array<'a>) -> VortexResult> { + ::flatten(array) } - fn with_data_mut( + #[inline] + fn with_dyn<'a>( &self, - data: &ArrayData, - f: &mut dyn FnMut(&dyn ArrayTrait) -> VortexResult<()>, + array: &'a Array<'a>, + f: &mut dyn for<'b> FnMut(&'b (dyn ArrayTrait + 'a)) -> VortexResult<()>, ) -> VortexResult<()> { - let metadata = data.metadata() - .as_any() - .downcast_ref::<[<$Name Metadata>]>() - .ok_or_else(|| vortex_err!("Failed to downcast metadata"))? - .clone(); - let array = [<$Name Array>]::try_from_parts(data as &dyn ArrayParts, &metadata)?; - f(&array) + ::with_dyn(array, f) } } + impl ArrayEncodingExt for [<$Name Encoding>] { + type D = $Name; + } /// Implement ArrayMetadata impl ArrayMetadata for [<$Name Metadata>] { diff --git a/vortex-array2/src/lib.rs b/vortex-array2/src/lib.rs index 84eb002956..5c9f3e8716 100644 --- a/vortex-array2/src/lib.rs +++ b/vortex-array2/src/lib.rs @@ -1,37 +1,40 @@ -#![allow(dead_code)] - extern crate core; pub mod array; -mod batch; +mod arrow; +pub mod buffer; pub mod compute; mod context; mod data; pub mod encoding; +mod flatten; mod implementation; mod metadata; mod stats; mod tree; -mod validity; +mod typed; +pub mod validity; mod view; mod visitor; use std::fmt::{Debug, Display, Formatter}; +use std::sync::Arc; -use arrow_buffer::Buffer; -pub use batch::*; pub use context::*; pub use data::*; +pub use flatten::*; pub use implementation::*; pub use linkme; pub use metadata::*; +pub use typed::*; pub use view::*; use vortex_error::VortexResult; use vortex_schema::DType; +use crate::buffer::Buffer; use crate::compute::ArrayCompute; -use crate::encoding::EncodingRef; -use crate::stats::{ArrayStatistics, Statistics}; +use crate::encoding::{ArrayEncodingRef, EncodingRef}; +use crate::stats::{ArrayStatistics, ArrayStatisticsCompute, Statistics}; use crate::validity::ArrayValidity; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; @@ -42,6 +45,8 @@ pub enum Array<'v> { View(ArrayView<'v>), } +pub type OwnedArray = Array<'static>; + impl Array<'_> { pub fn encoding(&self) -> EncodingRef { match self { @@ -60,11 +65,35 @@ impl Array<'_> { } pub fn len(&self) -> usize { - self.with_array(|a| a.len()) + self.with_dyn(|a| a.len()) } pub fn is_empty(&self) -> bool { - self.with_array(|a| a.is_empty()) + self.with_dyn(|a| a.is_empty()) + } + + pub fn child<'a>(&'a self, idx: usize, dtype: &'a DType) -> Option> { + match self { + Array::Data(d) => d.child(idx, dtype).map(Array::DataRef), + Array::DataRef(d) => d.child(idx, dtype).map(Array::DataRef), + Array::View(v) => v.child(idx, dtype).map(Array::View), + } + } + + pub fn buffer(&self, idx: usize) -> Option<&Buffer> { + match self { + Array::Data(d) => d.buffer(idx), + Array::DataRef(d) => d.buffer(idx), + Array::View(v) => v.buffer(idx), + } + } +} + +impl ToStatic for Array<'_> { + type Static = OwnedArray; + + fn to_static(&self) -> Self::Static { + Array::Data(self.to_array_data()) } } @@ -80,8 +109,14 @@ pub trait ToArrayData { fn to_array_data(&self) -> ArrayData; } -pub trait WithArray { - fn with_array R>(&self, f: F) -> R; +pub trait IntoArrayData { + fn into_array_data(self) -> ArrayData; +} + +pub trait ToStatic { + type Static; + + fn to_static(&self) -> Self::Static; } pub trait ArrayParts { @@ -89,16 +124,24 @@ pub trait ArrayParts { fn buffer(&self, idx: usize) -> Option<&Buffer>; fn child<'a>(&'a self, idx: usize, dtype: &'a DType) -> Option; fn nchildren(&self) -> usize; - fn statistics(&self) -> &dyn Statistics; + fn statistics<'a>(&'a self) -> &'a (dyn Statistics + 'a); } +// TODO(ngates): I think we should separate the parts and metadata lifetimes. pub trait TryFromArrayParts<'v, M: ArrayMetadata>: Sized + 'v { fn try_from_parts(parts: &'v dyn ArrayParts, metadata: &'v M) -> VortexResult; } /// Collects together the behaviour of an array. pub trait ArrayTrait: - ArrayCompute + ArrayValidity + AcceptArrayVisitor + ArrayStatistics + ToArrayData + ArrayEncodingRef + + ArrayCompute + + ArrayFlatten + + ArrayValidity + + AcceptArrayVisitor + + ArrayStatistics + + ArrayStatisticsCompute + + ToArrayData { fn dtype(&self) -> &DType; @@ -109,6 +152,8 @@ pub trait ArrayTrait: self.len() == 0 } + fn metadata(&self) -> Arc; + fn nbytes(&self) -> usize { let mut visitor = NBytesVisitor(0); self.accept(&mut visitor).unwrap(); @@ -118,12 +163,8 @@ pub trait ArrayTrait: struct NBytesVisitor(usize); impl ArrayVisitor for NBytesVisitor { - fn visit_column(&mut self, name: &str, array: &Array) -> VortexResult<()> { - self.visit_child(name, array) - } - fn visit_child(&mut self, _name: &str, array: &Array) -> VortexResult<()> { - self.0 += array.with_array(|a| a.nbytes()); + self.0 += array.with_dyn(|a| a.nbytes()); Ok(()) } @@ -133,23 +174,22 @@ impl ArrayVisitor for NBytesVisitor { } } -impl ToArrayData for Array<'_> { - fn to_array_data(&self) -> ArrayData { - match self { - Array::Data(d) => d.clone(), - Array::DataRef(d) => (*d).clone(), - Array::View(v) => v.encoding().with_view(v, |a| a.to_array_data()), - } - } -} - -impl WithArray for Array<'_> { - fn with_array R>(&self, f: F) -> R { - match self { - Array::Data(d) => d.encoding().with_data(d, f), - Array::DataRef(d) => d.encoding().with_data(d, f), - Array::View(v) => v.encoding().with_view(v, f), - } +impl<'a> Array<'a> { + pub fn with_dyn(&'a self, mut f: F) -> R + where + F: FnMut(&dyn ArrayTrait) -> R, + { + let mut result = None; + + self.encoding() + .with_dyn(self, &mut |array| { + result = Some(f(array)); + Ok(()) + }) + .unwrap(); + + // Now we unwrap the optional, which we know to be populated by the closure. + result.unwrap() } } @@ -170,3 +210,19 @@ impl Display for Array<'_> { ) } } + +impl IntoArrayData for Array<'_> { + fn into_array_data(self) -> ArrayData { + match self { + Array::Data(d) => d, + Array::DataRef(d) => d.clone(), + Array::View(_) => self.with_dyn(|a| a.to_array_data()), + } + } +} + +impl ToArrayData for Array<'_> { + fn to_array_data(&self) -> ArrayData { + self.clone().into_array_data() + } +} diff --git a/vortex-array2/src/metadata.rs b/vortex-array2/src/metadata.rs index f27f4f5a5b..bfc5935b56 100644 --- a/vortex-array2/src/metadata.rs +++ b/vortex-array2/src/metadata.rs @@ -10,7 +10,6 @@ use vortex_error::{vortex_err, VortexResult}; /// Note that this allows us to restrict the ('static + Send + Sync) requirement to just the /// metadata trait, and not the entire array trait. We require 'static so that we can downcast /// use the Any trait. -#[allow(dead_code)] pub trait ArrayMetadata: 'static + Send + Sync + Debug + TrySerializeArrayMetadata { fn as_any(&self) -> &dyn Any; fn as_any_arc(self: Arc) -> Arc; @@ -21,6 +20,7 @@ pub trait TrySerializeArrayMetadata { } pub trait TryDeserializeArrayMetadata<'m>: Sized { + // FIXME(ngates): we could push buffer/child validation into here. fn try_deserialize_metadata(metadata: Option<&'m [u8]>) -> VortexResult; } diff --git a/vortex-array2/src/stats.rs b/vortex-array2/src/stats.rs index c6cd05e45b..c0fb6dcaf2 100644 --- a/vortex-array2/src/stats.rs +++ b/vortex-array2/src/stats.rs @@ -1,15 +1,29 @@ use std::collections::HashMap; -use vortex::ptype::NativePType; use vortex::scalar::Scalar; -use vortex::stats::Stat; use vortex_error::VortexResult; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Stat { + BitWidthFreq, + TrailingZeroFreq, + IsConstant, + IsSorted, + IsStrictSorted, + Max, + Min, + RunCount, + TrueCount, + NullCount, +} + pub trait ArrayStatistics { - fn statistics(&self) -> &dyn Statistics { + fn statistics(&self) -> &(dyn Statistics + '_) { &EmptyStatistics } +} +pub trait ArrayStatisticsCompute { /// Compute the requested statistic. Can return additional stats. fn compute_statistics(&self, _stat: Stat) -> VortexResult> { Ok(HashMap::new()) @@ -17,32 +31,32 @@ pub trait ArrayStatistics { } pub trait Statistics { - fn compute(&self, stat: Stat) -> VortexResult>; + fn compute(&self, stat: Stat) -> Option; fn get(&self, stat: Stat) -> Option; + fn set(&self, stat: Stat, value: Scalar); + fn to_map(&self) -> HashMap; } -impl dyn Statistics { - fn compute_as>(&self, _stat: Stat) -> VortexResult> { - // TODO(ngates): should we panic if conversion fails? - todo!() - } - - fn get_as>(&self, _stat: Stat) -> Option { - todo!() +impl dyn Statistics + '_ { + pub fn compute_as>(&self, stat: Stat) -> Option { + self.compute(stat).and_then(|s| T::try_from(s).ok()) } - fn compute_min(&self, default: T) -> VortexResult { - Ok(self.compute_as::(Stat::Min)?.unwrap_or(default)) + pub fn get_as>(&self, stat: Stat) -> Option { + self.get(stat).and_then(|s| T::try_from(s).ok()) } } pub struct EmptyStatistics; impl Statistics for EmptyStatistics { - fn compute(&self, _stat: Stat) -> VortexResult> { - Ok(None) + fn compute(&self, _stat: Stat) -> Option { + None } - fn get(&self, _stat: Stat) -> Option { None } + fn set(&self, _stat: Stat, _value: Scalar) {} + fn to_map(&self) -> HashMap { + HashMap::default() + } } diff --git a/vortex-array2/src/tree.rs b/vortex-array2/src/tree.rs index 764453d24c..f9f554e9ed 100644 --- a/vortex-array2/src/tree.rs +++ b/vortex-array2/src/tree.rs @@ -1,12 +1,12 @@ use std::fmt; -use arrow_buffer::Buffer; use humansize::{format_size, DECIMAL}; use serde::ser::Error; use vortex_error::{VortexError, VortexResult}; +use crate::buffer::Buffer; use crate::visitor::ArrayVisitor; -use crate::{Array, WithArray}; +use crate::Array; impl Array<'_> { pub fn tree_display(&self) -> TreeDisplayWrapper { @@ -24,7 +24,7 @@ impl<'a> TreeDisplayWrapper<'a> { impl<'a, 'fmt: 'a> fmt::Display for TreeDisplayWrapper<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let array = self.0; - let nbytes = array.with_array(|a| a.nbytes()); + let nbytes = array.with_dyn(|a| a.nbytes()); let mut array_fmt = TreeFormatter::new(f, "".to_string(), nbytes); array_fmt .visit_child("root", array) @@ -41,12 +41,8 @@ pub struct TreeFormatter<'a, 'b: 'a> { /// TODO(ngates): I think we want to go back to the old explicit style. It gives arrays more /// control over how their metadata etc is displayed. impl<'a, 'b: 'a> ArrayVisitor for TreeFormatter<'a, 'b> { - fn visit_column(&mut self, name: &str, array: &Array) -> VortexResult<()> { - self.visit_child(name, array) - } - fn visit_child(&mut self, name: &str, array: &Array) -> VortexResult<()> { - array.with_array(|a| { + array.with_dyn(|a| { let nbytes = a.nbytes(); writeln!( self.fmt, @@ -96,6 +92,7 @@ impl<'a, 'b: 'a> TreeFormatter<'a, 'b> { res } + #[allow(dead_code)] pub fn new_total_size(&mut self, total: usize, new_total: F) -> fmt::Result where F: FnOnce(&mut TreeFormatter) -> fmt::Result, @@ -107,27 +104,3 @@ impl<'a, 'b: 'a> TreeFormatter<'a, 'b> { res } } - -#[cfg(test)] -mod tests { - use std::fmt::Write; - - use vortex_error::VortexResult; - - use crate::array::primitive::PrimitiveData; - use crate::array::ree::REEData; - use crate::IntoArray; - - #[test] - fn tree() -> VortexResult<()> { - let primitive = PrimitiveData::from_vec(vec![2i32, 3, 4, 5]); - let ree = REEData::try_new(primitive.data().clone(), primitive.data().clone(), 4)?; - let arr = ree.into_array(); - - let mut str = String::new(); - write!(str, "{}", arr.tree_display())?; - println!("{}", str); - // assert_eq!(str.as_str(), "hello"); - Ok(()) - } -} diff --git a/vortex-array2/src/typed.rs b/vortex-array2/src/typed.rs new file mode 100644 index 0000000000..6555c7fc59 --- /dev/null +++ b/vortex-array2/src/typed.rs @@ -0,0 +1,160 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use vortex::scalar::Scalar; +use vortex_error::{vortex_err, VortexError, VortexResult}; +use vortex_schema::DType; + +use crate::buffer::{Buffer, OwnedBuffer}; +use crate::encoding::{ArrayEncodingRef, EncodingRef}; +use crate::stats::{ArrayStatistics, Stat, Statistics}; +use crate::visitor::ArrayVisitor; +use crate::{ + Array, ArrayData, ArrayDef, ArrayParts, IntoArray, IntoArrayData, ToArrayData, ToStatic, + TryDeserializeArrayMetadata, +}; + +#[derive(Debug)] +pub struct TypedArray<'a, D: ArrayDef> { + array: Array<'a>, + metadata: D::Metadata, +} + +impl TypedArray<'_, D> { + pub fn try_from_parts( + dtype: DType, + metadata: D::Metadata, + buffers: Arc<[OwnedBuffer]>, + children: Arc<[ArrayData]>, + stats: HashMap, + ) -> VortexResult { + let array = Array::Data(ArrayData::try_new( + D::ENCODING, + dtype, + Arc::new(metadata.clone()), + buffers, + children, + stats, + )?); + Ok(Self { array, metadata }) + } + + pub fn array(&self) -> &Array { + &self.array + } + + pub fn metadata(&self) -> &D::Metadata { + &self.metadata + } +} + +impl Clone for TypedArray<'_, D> { + fn clone(&self) -> Self { + Self { + array: self.array.clone(), + metadata: self.metadata.clone(), + } + } +} + +impl<'a, D: ArrayDef> TryFrom> for TypedArray<'a, D> { + type Error = VortexError; + + fn try_from(array: Array<'a>) -> Result { + if array.encoding().id() != D::ENCODING.id() { + return Err(vortex_err!("incorrect encoding")); + } + let metadata = match &array { + Array::Data(d) => d + .metadata() + .as_any() + .downcast_ref::() + .unwrap() + .clone(), + Array::DataRef(d) => d + .metadata() + .as_any() + .downcast_ref::() + .unwrap() + .clone(), + Array::View(v) => D::Metadata::try_deserialize_metadata(v.metadata())?, + }; + Ok(TypedArray { array, metadata }) + } +} + +impl<'a, D: ArrayDef> TryFrom<&'a Array<'a>> for TypedArray<'a, D> { + type Error = VortexError; + + fn try_from(value: &'a Array<'a>) -> Result { + value.clone().try_into() + } +} + +impl ArrayEncodingRef for TypedArray<'_, D> { + fn encoding(&self) -> EncodingRef { + self.array().encoding() + } +} + +impl ArrayStatistics for TypedArray<'_, D> { + fn statistics(&self) -> &(dyn Statistics + '_) { + match self.array() { + Array::Data(d) => d.statistics(), + Array::DataRef(d) => d.statistics(), + Array::View(v) => v.statistics(), + } + } +} + +impl<'a, D: ArrayDef> IntoArray<'a> for TypedArray<'a, D> { + fn into_array(self) -> Array<'a> { + self.array + } +} + +impl IntoArrayData for TypedArray<'_, D> { + fn into_array_data(self) -> ArrayData { + match self.array { + Array::Data(d) => d, + Array::DataRef(d) => d.clone(), + Array::View(_) => { + struct Visitor { + buffers: Vec, + children: Vec, + } + impl ArrayVisitor for Visitor { + fn visit_child(&mut self, _name: &str, array: &Array) -> VortexResult<()> { + self.children.push(array.to_array_data()); + Ok(()) + } + + fn visit_buffer(&mut self, buffer: &Buffer) -> VortexResult<()> { + self.buffers.push(buffer.to_static()); + Ok(()) + } + } + let mut visitor = Visitor { + buffers: vec![], + children: vec![], + }; + self.array().with_dyn(|a| a.accept(&mut visitor).unwrap()); + ArrayData::try_new( + self.encoding(), + self.array().dtype().clone(), + Arc::new(self.metadata().clone()), + visitor.buffers.into(), + visitor.children.into(), + self.statistics().to_map(), + ) + .unwrap() + } + } + } +} + +impl ToArrayData for TypedArray<'_, D> { + fn to_array_data(&self) -> ArrayData { + self.clone().into_array_data() + } +} diff --git a/vortex-array2/src/validity.rs b/vortex-array2/src/validity.rs index 9037df01fe..433ab80056 100644 --- a/vortex-array2/src/validity.rs +++ b/vortex-array2/src/validity.rs @@ -1,14 +1,16 @@ +use arrow_buffer::{BooleanBuffer, NullBuffer}; use serde::{Deserialize, Serialize}; use vortex_error::{vortex_bail, VortexResult}; use vortex_schema::{DType, Nullability}; -use crate::array::bool::BoolData; -use crate::compute::scalar_at; -use crate::{Array, ArrayData, IntoArray, ToArrayData, WithArray}; +use crate::array::bool::BoolArray; +use crate::compute::scalar_at::scalar_at; +use crate::compute::take::take; +use crate::{Array, ArrayData, IntoArray, ToArray, ToArrayData}; pub trait ArrayValidity { fn is_valid(&self, index: usize) -> bool; - // Maybe add to_bool_array() here? + fn logical_validity(&self) -> LogicalValidity; } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -42,7 +44,7 @@ pub enum Validity<'v> { impl<'v> Validity<'v> { pub const DTYPE: DType = DType::Bool(Nullability::NonNullable); - pub fn into_array_data(self) -> Option { + pub fn to_array_data(self) -> Option { match self { Validity::Array(a) => Some(a.to_array_data()), _ => None, @@ -56,7 +58,7 @@ impl<'v> Validity<'v> { Validity::AllInvalid => Ok(ValidityMetadata::AllInvalid), Validity::Array(a) => { // We force the caller to validate the length here. - let validity_len = a.with_array(|a| a.len()); + let validity_len = a.with_dyn(|a| a.len()); if validity_len != length { vortex_bail!( "Validity array length {} doesn't match array length {}", @@ -90,17 +92,113 @@ impl<'v> Validity<'v> { Validity::Array(a) => scalar_at(a, index).unwrap().try_into().unwrap(), } } + + pub fn take(&self, indices: &Array) -> VortexResult { + match self { + Validity::NonNullable => Ok(Validity::NonNullable), + Validity::AllValid => Ok(Validity::AllValid), + Validity::AllInvalid => Ok(Validity::AllInvalid), + Validity::Array(a) => Ok(Validity::Array(take(a, indices)?)), + } + } + + // TODO(ngates): into_logical + pub fn to_logical(&self, length: usize) -> LogicalValidity { + match self { + Validity::NonNullable => LogicalValidity::AllValid(length), + Validity::AllValid => LogicalValidity::AllValid(length), + Validity::AllInvalid => LogicalValidity::AllInvalid(length), + Validity::Array(a) => LogicalValidity::Array(a.to_array_data()), + } + } + + pub fn to_static(&self) -> Validity<'static> { + match self { + Validity::NonNullable => Validity::NonNullable, + Validity::AllValid => Validity::AllValid, + Validity::AllInvalid => Validity::AllInvalid, + Validity::Array(a) => Validity::Array(a.to_array_data().into_array()), + } + } } -impl<'a, E> FromIterator<&'a Option> for Validity<'static> { - fn from_iter>>(iter: T) -> Self { - let bools: Vec = iter.into_iter().map(|option| option.is_some()).collect(); +impl PartialEq for Validity<'_> { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Validity::NonNullable, Validity::NonNullable) => true, + (Validity::AllValid, Validity::AllValid) => true, + (Validity::AllInvalid, Validity::AllInvalid) => true, + (Validity::Array(a), Validity::Array(b)) => { + a.clone().flatten_bool().unwrap().boolean_buffer() + == b.clone().flatten_bool().unwrap().boolean_buffer() + } + _ => false, + } + } +} + +impl From> for Validity<'static> { + fn from(bools: Vec) -> Self { if bools.iter().all(|b| *b) { Validity::AllValid } else if !bools.iter().any(|b| *b) { Validity::AllInvalid } else { - Validity::Array(BoolData::from_vec(bools).into_array()) + Validity::Array(BoolArray::from_vec(bools, Validity::NonNullable).into_array()) } } } + +impl From for Validity<'static> { + fn from(value: BooleanBuffer) -> Self { + if value.count_set_bits() == value.len() { + Validity::AllValid + } else if value.count_set_bits() == 0 { + Validity::AllInvalid + } else { + Validity::Array(BoolArray::from(value).into_array()) + } + } +} + +impl<'a> FromIterator> for Validity<'static> { + fn from_iter>>(_iter: T) -> Self { + todo!() + } +} + +impl FromIterator for Validity<'static> { + fn from_iter>(_iter: T) -> Self { + todo!() + } +} + +impl<'a, E> FromIterator<&'a Option> for Validity<'static> { + fn from_iter>>(iter: T) -> Self { + let bools: Vec = iter.into_iter().map(|option| option.is_some()).collect(); + Validity::from(bools) + } +} + +#[derive(Clone, Debug)] +pub enum LogicalValidity { + AllValid(usize), + AllInvalid(usize), + Array(ArrayData), +} + +impl LogicalValidity { + pub fn to_null_buffer(&self) -> VortexResult> { + match self { + LogicalValidity::AllValid(_) => Ok(None), + LogicalValidity::AllInvalid(l) => Ok(Some(NullBuffer::new_null(*l))), + LogicalValidity::Array(a) => Ok(Some(NullBuffer::new( + a.to_array().flatten_bool()?.boolean_buffer(), + ))), + } + } + + pub fn is_all_valid(&self) -> bool { + matches!(self, LogicalValidity::AllValid(_)) + } +} diff --git a/vortex-array2/src/view.rs b/vortex-array2/src/view.rs index 8ffaa0851e..57dbd18c5b 100644 --- a/vortex-array2/src/view.rs +++ b/vortex-array2/src/view.rs @@ -1,10 +1,10 @@ use std::fmt::{Debug, Formatter}; -use arrow_buffer::Buffer; use vortex::flatbuffers::array as fb; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; use vortex_schema::DType; +use crate::buffer::Buffer; use crate::encoding::EncodingRef; use crate::stats::{EmptyStatistics, Statistics}; use crate::{Array, IntoArray, ToArray}; @@ -15,7 +15,7 @@ pub struct ArrayView<'v> { encoding: EncodingRef, dtype: &'v DType, array: fb::Array<'v>, - buffers: &'v [Buffer], + buffers: &'v [Buffer<'v>], ctx: &'v SerdeContext, // TODO(ngates): a store a Projection. A projected ArrayView contains the full fb::Array // metadata, but only the buffers from the selected columns. Therefore we need to know @@ -63,7 +63,8 @@ impl<'v> ArrayView<'v> { // Validate here that the metadata correctly parses, so that an encoding can infallibly // implement Encoding::with_view(). - encoding.with_view_mut(&view, &mut |_| Ok(()))?; + // FIXME(ngates): validate the metadata + view.to_array().with_dyn(|_| Ok::<(), VortexError>(()))?; Ok(view) } @@ -80,7 +81,8 @@ impl<'v> ArrayView<'v> { self.array.metadata().map(|m| m.bytes()) } - pub fn child(&self, idx: usize, dtype: &'v DType) -> Option> { + // TODO(ngates): should we separate self and DType lifetimes? Should DType be cloned? + pub fn child(&'v self, idx: usize, dtype: &'v DType) -> Option> { let child = self.array_child(idx)?; // Figure out how many buffers to skip... @@ -90,12 +92,7 @@ impl<'v> ArrayView<'v> { .children()? .iter() .take(idx) - .map(|child| { - child - .child() - .map(|c| Self::cumulative_nbuffers(c)) - .unwrap_or_default() - }) + .map(|child| Self::cumulative_nbuffers(child)) .sum(); let buffer_count = Self::cumulative_nbuffers(child); @@ -113,7 +110,7 @@ impl<'v> ArrayView<'v> { fn array_child(&self, idx: usize) -> Option> { let children = self.array.children()?; if idx < children.len() { - children.get(idx).child() + Some(children.get(idx)) } else { None } @@ -128,10 +125,7 @@ impl<'v> ArrayView<'v> { fn cumulative_nbuffers(array: fb::Array) -> usize { let mut nbuffers = array.nbuffers() as usize; for child in array.children().unwrap_or_default() { - nbuffers += child - .child() - .map(|c| Self::cumulative_nbuffers(c)) - .unwrap_or_default(); + nbuffers += Self::cumulative_nbuffers(child) } nbuffers } @@ -171,8 +165,7 @@ impl ArrayParts for ArrayView<'_> { self.array.children().map(|c| c.len()).unwrap_or_default() } - fn statistics(&self) -> &dyn Statistics { - // TODO(ngates): serialize statistics into fb::Array + fn statistics<'a>(&'a self) -> &'a (dyn Statistics + 'a) { &EmptyStatistics } } diff --git a/vortex-array2/src/visitor.rs b/vortex-array2/src/visitor.rs index db69096d7e..1442b17113 100644 --- a/vortex-array2/src/visitor.rs +++ b/vortex-array2/src/visitor.rs @@ -1,6 +1,6 @@ -use arrow_buffer::Buffer; use vortex_error::VortexResult; +use crate::buffer::Buffer; use crate::validity::Validity; use crate::Array; @@ -10,11 +10,6 @@ pub trait AcceptArrayVisitor { // TODO(ngates): maybe we make this more like the inverse of TryFromParts? pub trait ArrayVisitor { - /// Visit a child column of this array. - fn visit_column(&mut self, _name: &str, _array: &Array) -> VortexResult<()> { - Ok(()) - } - /// Visit a child of this array. fn visit_child(&mut self, _name: &str, _array: &Array) -> VortexResult<()> { Ok(()) diff --git a/vortex-dict/src/compute.rs b/vortex-dict/src/compute.rs index 561cf88183..91ffc167e3 100644 --- a/vortex-dict/src/compute.rs +++ b/vortex-dict/src/compute.rs @@ -37,6 +37,9 @@ impl ScalarAtFn for DictArray { impl TakeFn for DictArray { fn take(&self, indices: &dyn Array) -> VortexResult { + // Dict + // codes: 0 0 1 + // dict: a b c d e f g h let codes = take(self.codes(), indices)?; Ok(DictArray::new(codes, self.values().clone()).into_array()) } diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index 9d2bba8ebe..d9f9b3dbb8 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -28,5 +28,14 @@ vortex-schema = { path = "../vortex-schema" } flatc = { workspace = true } walkdir = { workspace = true } +[dev-dependencies] +criterion = { workspace = true } +rand = { workspace = true } +simplelog = { workspace = true } + [lints] -# workspace = false +workspace = true + +[[bench]] +name = "ipc_take" +harness = false diff --git a/vortex-ipc/benches/ipc_take.rs b/vortex-ipc/benches/ipc_take.rs new file mode 100644 index 0000000000..279746111e --- /dev/null +++ b/vortex-ipc/benches/ipc_take.rs @@ -0,0 +1,62 @@ +use std::io::Cursor; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use vortex_array2::array::primitive::PrimitiveArray; +use vortex_array2::compute::take::take; +use vortex_array2::{IntoArray, SerdeContext}; +use vortex_ipc::iter::FallibleLendingIterator; +use vortex_ipc::reader::StreamReader; +use vortex_ipc::writer::StreamWriter; + +fn ipc_take(c: &mut Criterion) { + let indices = PrimitiveArray::from(vec![10, 11, 12, 13, 100_000, 2_999_999]).into_array(); + let data = PrimitiveArray::from(vec![5; 3_000_000]).into_array(); + // + // c.bench_function("take_data", |b| { + // b.iter(|| black_box(take(&data, &indices).unwrap())); + // }); + + // Try running take over an ArrayView. + let mut buffer = vec![]; + { + let mut cursor = Cursor::new(&mut buffer); + let mut writer = StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap(); + data.with_dyn(|a| writer.write(a)).unwrap(); + } + + c.bench_function("take_view", |b| { + b.iter(|| { + let mut cursor = Cursor::new(&buffer); + let mut reader = StreamReader::try_new(&mut cursor).unwrap(); + let mut array_reader = reader.next().unwrap().unwrap(); + let array_view = array_reader.next().unwrap().unwrap().into_array(); + black_box(take(&array_view, &indices)) + }); + }); +} +// +// #[allow(dead_code)] +// fn ipc_take_old(c: &mut Criterion) { +// // Try the old way of taking data. +// let arr = PrimitiveArray::from(vec![5; 3_000_000]); +// let indices = PrimitiveArray::from(vec![10, 11, 12, 13, 100_000, 2_999_999]); +// +// let mut buffer = vec![]; +// { +// let mut cursor = Cursor::new(&mut buffer); +// let mut ctx = WriteCtx::new(&mut cursor); +// arr.serde().unwrap().write(&mut ctx).unwrap(); +// } +// +// c.bench_function("take_old", |b| { +// b.iter(|| { +// let mut cursor = Cursor::new(&buffer); +// let mut ctx = ReadCtx::new(arr.dtype(), &mut cursor); +// let arr = ctx.read().unwrap(); +// black_box(vortex::compute::take::take(arr.as_ref(), &indices).unwrap()) +// }); +// }); +// } + +criterion_group!(benches, ipc_take); +criterion_main!(benches); diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index b700590e26..560f9c54ad 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -40,10 +40,10 @@ mod tests { use std::io::{Cursor, Write}; use std::sync::Arc; - use vortex_array2::array::primitive::PrimitiveData; - use vortex_array2::array::r#struct::StructData; - use vortex_array2::{IntoArray, WithArray}; - use vortex_array2::{SerdeContext, ToArray, ToArrayData}; + use vortex_array2::array::primitive::PrimitiveArray; + use vortex_array2::array::r#struct::StructArray; + use vortex_array2::{IntoArray, IntoArrayData}; + use vortex_array2::{SerdeContext, ToArray}; use crate::iter::FallibleLendingIterator; use crate::reader::StreamReader; @@ -51,18 +51,17 @@ mod tests { #[test] fn test_write_flatbuffer() { - let col = PrimitiveData::from_vec(vec![0, 1, 2]).into_data(); - let nested_struct = StructData::try_new( + let col = PrimitiveArray::from(vec![0, 1, 2]).into_array_data(); + let nested_struct = StructArray::try_new( vec![Arc::new("x".into()), Arc::new("y".into())], vec![col.clone(), col.clone()], 3, ) - .unwrap() - .into_data(); + .unwrap(); - let arr = StructData::try_new( + let arr = StructArray::try_new( vec![Arc::new("a".into()), Arc::new("b".into())], - vec![col.clone(), nested_struct], + vec![col.clone(), nested_struct.into_array_data()], 3, ) .unwrap() @@ -73,7 +72,7 @@ mod tests { let mut cursor = Cursor::new(Vec::new()); let ctx = SerdeContext::default(); let mut writer = StreamWriter::try_new_unbuffered(&mut cursor, ctx).unwrap(); - arr.with_array(|a| writer.write(a)).unwrap(); + arr.with_dyn(|a| writer.write(a)).unwrap(); cursor.flush().unwrap(); cursor.set_position(0); @@ -86,7 +85,7 @@ mod tests { // Read some number of chunks from the stream. while let Some(chunk) = array_reader.next().unwrap() { println!("VIEW: {:?}", &chunk); - let _data = chunk.to_array().to_array_data(); + let _data = chunk.to_array().into_array_data(); // let taken = take(&chunk, &PrimitiveArray::from(vec![0, 3, 0, 1])).unwrap(); // let taken = taken.as_primitive().typed_data::(); // println!("Taken: {:?}", &taken); diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index dceae9f0a1..646de10da1 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -1,7 +1,6 @@ use flatbuffers::{FlatBufferBuilder, WIPOffset}; use itertools::Itertools; use vortex::flatbuffers::array as fba; -use vortex::flatbuffers::array::{ArrayChild, ArrayChildArgs}; use vortex_array2::encoding::find_encoding; use vortex_array2::{ArrayData, SerdeContext}; use vortex_error::{vortex_err, VortexError}; @@ -177,12 +176,7 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { let children = column_data .children() .iter() - .map(|child| { - let child = child - .as_ref() - .map(|c| IPCArray(self.0, c).write_flatbuffer(fbb)); - ArrayChild::create(fbb, &ArrayChildArgs { child }) - }) + .map(|child| IPCArray(self.0, child).write_flatbuffer(fbb)) .collect_vec(); let children = Some(fbb.create_vector(&children)); diff --git a/vortex-ipc/src/reader.rs b/vortex-ipc/src/reader.rs index 23d6839658..c5cfc0c862 100644 --- a/vortex-ipc/src/reader.rs +++ b/vortex-ipc/src/reader.rs @@ -1,10 +1,11 @@ use std::io; use std::io::{BufReader, Read}; -use arrow_buffer::Buffer; +use arrow_buffer::MutableBuffer; use nougat::gat; use vortex::array::composite::COMPOSITE_EXTENSIONS; -use vortex_array2::{ArrayView, SerdeContext, ToArray, WithArray}; +use vortex_array2::buffer::Buffer; +use vortex_array2::{ArrayView, SerdeContext, ToArray}; use vortex_error::{vortex_err, VortexError, VortexResult}; use vortex_flatbuffers::{FlatBufferReader, ReadFlatBuffer}; use vortex_schema::{DType, DTypeSerdeContext}; @@ -98,7 +99,7 @@ pub struct StreamArrayChunkReader<'a, R: Read> { read: &'a mut R, ctx: &'a SerdeContext, dtype: DType, - buffers: Vec, + buffers: Vec>, column_msg_buffer: Vec, } @@ -140,9 +141,10 @@ impl<'iter, R: Read> FallibleLendingIterator for StreamArrayChunkReader<'iter, R let to_kill = buffer.offset() - offset; io::copy(&mut self.read.take(to_kill), &mut io::sink()).unwrap(); - let mut bytes = vec![0u8; buffer.length() as usize]; - self.read.read_exact(&mut bytes).unwrap(); - self.buffers.push(Buffer::from(bytes)); + let mut bytes = MutableBuffer::with_capacity(buffer.length() as usize); + unsafe { bytes.set_len(buffer.length() as usize) } + self.read.read_exact(bytes.as_slice_mut()).unwrap(); + self.buffers.push(Buffer::Owned(bytes.into())); offset = buffer.offset() + buffer.length(); } @@ -151,10 +153,10 @@ impl<'iter, R: Read> FallibleLendingIterator for StreamArrayChunkReader<'iter, R let to_kill = chunk_msg.buffer_size() - offset; io::copy(&mut self.read.take(to_kill), &mut io::sink()).unwrap(); - let view = ArrayView::try_new(self.ctx, &self.dtype, col_array, &self.buffers)?; + let view = ArrayView::try_new(self.ctx, &self.dtype, col_array, self.buffers.as_slice())?; // Validate it - view.to_array().with_array(|_| Ok::<(), VortexError>(()))?; + view.to_array().with_dyn(|_| Ok::<(), VortexError>(()))?; Ok(Some(view)) }