Skip to content

Commit

Permalink
Struct Array
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn committed Apr 8, 2024
1 parent 0cf5158 commit c530cf8
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 51 deletions.
19 changes: 13 additions & 6 deletions vortex-array2/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,15 @@ impl<'a> TryFromArrayParts<'a, PrimitiveMetadata> for PrimitiveArray<'a> {

impl PrimitiveData {
fn try_new<T: NativePType>(buffer: ScalarBuffer<T>, validity: Validity) -> VortexResult<Self> {
ArrayData::try_new(
&PrimitiveEncoding,
Ok(Self::new_unchecked(
DType::from(T::PTYPE).with_nullability(validity.nullability()),
Arc::new(PrimitiveMetadata {
ptype: T::PTYPE,
validity: validity.to_metadata(buffer.len() / T::PTYPE.byte_width())?,
}),
vec![buffer.into_inner()].into(),
vec![validity.into_array_data()].into(),
)
.unwrap()
.try_into()
))
}

pub fn from_vec<T: NativePType + ArrowNativeType>(values: Vec<T>) -> Self {
Expand All @@ -96,7 +93,17 @@ impl ArrayValidity for PrimitiveArray<'_> {

impl ToArrayData for PrimitiveArray<'_> {
fn to_array_data(&self) -> ArrayData {
todo!()
ArrayData::try_new(
&PrimitiveEncoding,
self.dtype().clone(),
Arc::new(PrimitiveMetadata {
ptype: self.ptype,
validity: self.validity().to_metadata(self.len()).unwrap(),
}),
vec![self.buffer().clone()].into(),
vec![].into(),
)
.unwrap()
}
}

Expand Down
12 changes: 4 additions & 8 deletions vortex-array2/src/array/ree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,15 @@ impl REEArray<'_> {

impl REEData {
pub fn try_new(ends: ArrayData, values: ArrayData, length: usize) -> VortexResult<Self> {
ArrayData::try_new(
&REEEncoding,
Ok(Self::new_unchecked(
values.dtype().clone(),
REEMetadata {
Arc::new(REEMetadata {
length,
ends_dtype: ends.dtype().clone(),
}
.into_arc(),
}),
vec![].into(),
vec![Some(ends), Some(values)].into(),
)
.unwrap()
.try_into()
))
}
}

Expand Down
43 changes: 40 additions & 3 deletions vortex-array2/src/array/struct/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use serde::{Deserialize, Serialize};
use vortex_error::{vortex_bail, VortexResult};
use vortex_schema::{DType, FieldNames};

use crate::impl_encoding;
use crate::validity::ArrayValidity;
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
use crate::{impl_encoding, ToArray, WithArray};
use crate::{Array, ArrayMetadata};
use crate::{ArrayData, TypedArrayData};
use crate::{ArrayView, ToArrayData};
Expand Down Expand Up @@ -50,7 +50,7 @@ impl<'a> StructArray<'a> {
fields.as_slice()
}

pub fn ncolumns(&self) -> usize {
pub fn nfields(&self) -> usize {
self.fields().len()
}

Expand All @@ -59,6 +59,30 @@ impl<'a> StructArray<'a> {
}
}

impl StructData {
    /// Builds struct array data from parallel lists of field names and field arrays.
    ///
    /// The resulting dtype is `DType::Struct(names, field_dtypes)`, where the field dtypes are
    /// read from `fields` in order. Every entry of `fields` becomes a child of the struct, and
    /// `length` is recorded in the struct metadata.
    ///
    /// # Errors
    ///
    /// Returns an error if `names` and `fields` differ in length, or if any field reports a
    /// length other than `length`.
    pub fn try_new(names: FieldNames, fields: Vec<ArrayData>, length: usize) -> VortexResult<Self> {
        if names.len() != fields.len() {
            vortex_bail!("Got {} names and {} fields", names.len(), fields.len());
        }

        if fields
            .iter()
            .any(|field| field.to_array().with_array(|a| a.len()) != length)
        {
            vortex_bail!("Expected all struct fields to have length {}", length);
        }

        // Capture the dtypes while `fields` is still borrowed...
        let field_dtypes: Vec<_> = fields.iter().map(|field| field.dtype().clone()).collect();
        // ...then move the owned children into place instead of cloning each one.
        let children: Vec<_> = fields.into_iter().map(Some).collect();
        Ok(Self::new_unchecked(
            DType::Struct(names, field_dtypes),
            Arc::new(StructMetadata { length }),
            vec![].into(),
            children.into(),
        ))
    }
}

impl<'v> TryFromArrayParts<'v, StructMetadata> for StructArray<'v> {
fn try_from_parts(
parts: &'v dyn ArrayParts,
Expand Down Expand Up @@ -100,7 +124,20 @@ impl ArrayValidity for StructArray<'_> {

impl ToArrayData for StructArray<'_> {
fn to_array_data(&self) -> ArrayData {
todo!()
ArrayData::try_new(
&StructEncoding,
self.dtype().clone(),
Arc::new(StructMetadata {
length: self.length,
}),
vec![].into(),
(0..self.nfields())
.map(|idx| self.child(idx).unwrap())
.map(|a| Some(a.to_array_data()))
.collect::<Vec<_>>()
.into(),
)
.unwrap()
}
}

Expand Down
89 changes: 70 additions & 19 deletions vortex-array2/src/batch.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,85 @@
use arrow_buffer::Buffer;
use vortex::serde::data::ArrayData;
use vortex_error::VortexResult;
use vortex_schema::DType;

use crate::visitor::ArrayVisitor;
use crate::Array;
use crate::{Array, ArrayData, ArrayTrait, ToArrayData, WithArray};

/// A column batch contains the flattened list of columns.
#[derive(Debug)]
pub struct ColumnBatch {
dtype: DType,
columns: Vec<Array<'static>>,
columns: Vec<ArrayData>,
length: usize,
}

pub struct ColumnBatchBuilder {
columns: Vec<ArrayData>,
impl ColumnBatch {
    /// Creates a batch by running this type's visitor over `array`.
    ///
    /// The batch length is taken from `array.len()`; the column list starts empty and is
    /// populated while `array.accept` drives the visitor.
    ///
    /// # Panics
    ///
    /// Panics if the traversal returns an error.
    pub fn from_array(array: &dyn ArrayTrait) -> Self {
        let mut collector = Self {
            columns: Vec::new(),
            length: array.len(),
        };
        array.accept(&mut collector).unwrap();
        collector
    }

    /// Returns the collected columns as a slice.
    pub fn columns(&self) -> &[ArrayData] {
        &self.columns
    }
}

impl ArrayVisitor for ColumnBatchBuilder {
fn visit_column(&mut self, _name: &str, _array: &Array) -> VortexResult<()> {
todo!()
impl From<&Array<'_>> for ColumnBatch {
fn from(value: &Array) -> Self {
value.with_array(|a| ColumnBatch::from_array(a))
}
}

fn visit_child(&mut self, _name: &str, _array: &Array) -> VortexResult<()> {
// If the array is a struct, then pull out each column.
// But we can't do this in case some non-column child is a struct.
// Can we ask an array for column(idx)? Seems like a lot of work.
todo!()
/// Collect all the nested column leaves from an array.
impl ArrayVisitor for ColumnBatch {
fn visit_column(&mut self, _name: &str, array: &Array) -> VortexResult<()> {
let ncols = self.columns.len();
array.with_array(|a| a.accept(self))?;
if ncols == self.columns.len() {
assert_eq!(self.length, array.len());
self.columns.push(array.to_array_data())
}
Ok(())
}

fn visit_buffer(&mut self, _buffer: &Buffer) -> VortexResult<()> {
todo!()
fn visit_child(&mut self, _name: &str, array: &Array) -> VortexResult<()> {
// Stop traversing when we hit the first non-column array.
assert_eq!(self.length, array.len());
self.columns.push(array.to_array_data());
Ok(())
}
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use crate::array::primitive::PrimitiveData;
    use crate::array::r#struct::StructData;
    use crate::batch::ColumnBatch;
    use crate::IntoArray;

    /// A struct with fields `a` (primitive) and `b` (a struct of `x` and `y`) should yield
    /// three columns: `a`, `x` and `y`.
    #[test]
    fn batch_visitor() {
        let leaf = PrimitiveData::from_vec(vec![0, 1, 2]).into_data();

        // Inner struct: two primitive fields of length 3.
        let inner_names = vec![Arc::new("x".into()), Arc::new("y".into())];
        let inner = StructData::try_new(inner_names, vec![leaf.clone(), leaf.clone()], 3)
            .unwrap()
            .into_data();

        // Outer struct: one primitive field plus the nested struct.
        let outer_names = vec![Arc::new("a".into()), Arc::new("b".into())];
        let outer = StructData::try_new(outer_names, vec![leaf.clone(), inner], 3)
            .unwrap()
            .into_array();

        let batch = ColumnBatch::from(&outer);
        assert_eq!(batch.columns().len(), 3);
    }
}
8 changes: 0 additions & 8 deletions vortex-array2/src/implementation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,6 @@ macro_rules! impl_encoding {
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self
}

fn to_arc(&self) -> Arc<dyn ArrayMetadata> {
Arc::new(self.clone())
}

fn into_arc(self) -> Arc<dyn ArrayMetadata> {
Arc::new(self)
}
}
}
};
Expand Down
4 changes: 2 additions & 2 deletions vortex-array2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ impl ArrayVisitor for NBytesVisitor {
impl ToArrayData for Array<'_> {
fn to_array_data(&self) -> ArrayData {
match self {
Array::Data(d) => d.encoding().with_data(d, |a| a.to_array_data()),
Array::DataRef(d) => d.encoding().with_data(d, |a| a.to_array_data()),
Array::Data(d) => d.clone(),
Array::DataRef(d) => (*d).clone(),
Array::View(v) => v.encoding().with_view(v, |a| a.to_array_data()),
}
}
Expand Down
2 changes: 0 additions & 2 deletions vortex-array2/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use vortex_error::{vortex_err, VortexResult};
pub trait ArrayMetadata: 'static + Send + Sync + Debug {
fn as_any(&self) -> &dyn Any;
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
fn to_arc(&self) -> Arc<dyn ArrayMetadata>;
fn into_arc(self) -> Arc<dyn ArrayMetadata>;
}

pub trait TrySerializeArrayMetadata {
Expand Down
12 changes: 9 additions & 3 deletions vortex-array2/src/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ pub trait AcceptArrayVisitor {
// TODO(ngates): maybe we make this more like the inverse of TryFromParts?
pub trait ArrayVisitor {
/// Visit a child column of this array.
fn visit_column(&mut self, name: &str, array: &Array) -> VortexResult<()>;
fn visit_column(&mut self, _name: &str, _array: &Array) -> VortexResult<()> {
Ok(())
}

/// Visit a child of this array.
fn visit_child(&mut self, name: &str, array: &Array) -> VortexResult<()>;
fn visit_child(&mut self, _name: &str, _array: &Array) -> VortexResult<()> {
Ok(())
}

/// Utility for visiting Array validity.
fn visit_validity(&mut self, validity: &Validity) -> VortexResult<()> {
Expand All @@ -25,5 +29,7 @@ pub trait ArrayVisitor {
}
}

fn visit_buffer(&mut self, buffer: &Buffer) -> VortexResult<()>;
fn visit_buffer(&mut self, _buffer: &Buffer) -> VortexResult<()> {
Ok(())
}
}

0 comments on commit c530cf8

Please sign in to comment.