From 78cb4a3411a4a2d228d331c365e33988bb66f8e5 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Tue, 15 Oct 2024 17:51:09 -0400 Subject: [PATCH 01/15] wip on stats --- vortex-array/src/lib.rs | 2 + vortex-array/src/scalars.rs | 87 +++++++++++++++++++++ vortex-serde/src/layouts/write/writer.rs | 96 ++++++++++++++++-------- 3 files changed, 154 insertions(+), 31 deletions(-) create mode 100644 vortex-array/src/scalars.rs diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 2d554b99cc..9cc73b9d29 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -48,6 +48,8 @@ pub mod encoding; mod implementation; pub mod iter; mod metadata; +pub mod opaque; +pub mod scalars; pub mod stats; pub mod stream; mod tree; diff --git a/vortex-array/src/scalars.rs b/vortex-array/src/scalars.rs new file mode 100644 index 0000000000..a44856e23b --- /dev/null +++ b/vortex-array/src/scalars.rs @@ -0,0 +1,87 @@ +use std::collections::HashSet; + +use itertools::Itertools as _; +use vortex_dtype::{match_each_native_ptype, DType}; +use vortex_error::{vortex_bail, VortexExpect as _, VortexResult}; +use vortex_scalar::{Scalar, ScalarValue}; + +use crate::array::builder::VarBinBuilder; +use crate::array::{BoolArray, NullArray, PrimitiveArray}; +use crate::{Array, IntoArray as _}; + +impl Array { + pub fn from_scalar_values(dtype: &DType, values: Vec) -> VortexResult { + let mismatched_values = values + .iter() + .filter(|v| !v.is_instance_of(dtype)) + .collect_vec(); + if !mismatched_values.is_empty() { + let mismatch_str = mismatched_values.iter().map(|v| v.to_string()).join(", "); + vortex_bail!("Expected all scalars to be of type {dtype}; found {mismatch_str}"); + } + + Ok(match dtype { + DType::Bool(_) => BoolArray::from_iter(values.into_iter().map(|s| { + s.as_bool() + .vortex_expect("Expected ScalarValue to be a bool") + })) + .into_array(), + DType::Primitive(ptype, _) => { + match_each_native_ptype!(ptype, |$P| { + PrimitiveArray::from_nullable_vec(values.iter().map(|s| { + s.as_pvalue() + .vortex_expect("Expected ScalarValue to be a primitive") + .map(|p| p.as_primitive::<$P>().vortex_expect("Expected ScalarValue to be a primitive")) + }).collect_vec()) + .into_array() + }) + } + DType::Binary(_) => { + let mut builder = VarBinBuilder::::with_capacity(values.len()); + for value in values { + let buf = value.as_buffer()?; + builder.push(buf.as_ref().map(|b| b.as_slice())); + } + builder.finish(dtype.clone()).into_array() + } + DType::Utf8(_) => { + let mut builder = VarBinBuilder::::with_capacity(values.len()); + for value in values { + let buf_str = value.as_buffer_string()?; + builder.push(buf_str.as_ref().map(|b| b.as_bytes())); + } + builder.finish(dtype.clone()).into_array() + } + DType::List(..) => vortex_bail!("Cannot convert ScalarValues to ListArray"), + DType::Struct(..) => vortex_bail!("Cannot convert ScalarValues to StructArray"), + DType::Null => NullArray::new(values.len()).into_array(), + DType::Extension(..) 
=> vortex_bail!("Cannot convert ScalarValues to ExtensionArray"), + }) + } + + pub fn from_scalars(scalars: &[Scalar]) -> VortexResult { + if scalars.is_empty() { + vortex_bail!("Cannot convert empty Vec to canonical"); + } + + let scalar = scalars[0].clone(); + let dtype = scalar.dtype(); + + let mismatched_types: HashSet<&DType> = scalars + .iter() + .skip(1) + .filter(|s| s.dtype() != dtype) + .map(|s| s.dtype()) + .collect(); + if !mismatched_types.is_empty() { + let mismatch_str = mismatched_types.iter().map(|t| t.to_string()).join(", "); + vortex_bail!("Expected all scalars to be of type {dtype}; also found {mismatch_str}"); + } + + let scalar_values = scalars.iter().map(|s| s.value().clone()).collect_vec(); + Array::from_scalar_values(dtype, scalar_values) + } +} + +#[cfg(test)] +mod test {} diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index 9b442452b4..4de5ab398a 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -4,6 +4,7 @@ use std::{io, mem}; use flatbuffers::FlatBufferBuilder; use futures::{Stream, TryStreamExt}; use vortex::array::{ChunkedArray, StructArray}; +use vortex::stats::{ArrayStatistics, Stat}; use vortex::stream::ArrayStream; use vortex::validity::Validity; use vortex::{Array, ArrayDType, IntoArray}; @@ -11,6 +12,7 @@ use vortex_buffer::io_buf::IoBuf; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; use vortex_flatbuffers::WriteFlatBuffer; +use vortex_scalar::Scalar; use crate::io::VortexWrite; use crate::layouts::write::footer::{Footer, Postscript}; @@ -24,9 +26,11 @@ pub struct LayoutWriter { row_count: u64, dtype: Option, - column_chunks: Vec, + column_chunks: Vec, } +const PRUNING_STATS: [Stat; 4] = [Stat::Min, Stat::Max, Stat::NullCount, Stat::TrueCount]; + impl LayoutWriter { pub fn new(write: W) -> Self { LayoutWriter { @@ -85,42 +89,54 @@ impl LayoutWriter { where S: Stream> + Unpin, { - let mut row_offsets: Vec = Vec::new(); - let mut byte_offsets = vec![self.msgs.tell()]; - - let mut n_rows_written = match self.column_chunks.get(column_idx) { + let size_hint = stream.size_hint().0; + let accumulator = match self.column_chunks.get_mut(column_idx) { None => { - row_offsets.push(0); - 0 - } - Some(x) => { - let last = x.row_offsets.last(); - *last.vortex_expect("row offsets is non-empty") + self.column_chunks + .push(ColumnChunkAccumulator::new(size_hint)); + + assert_eq!( + self.column_chunks.len(), + column_idx + 1, + "write_column_chunks must be called in order by column index! got column index {} but column chunks has {} columns", + column_idx, + self.column_chunks.len() + ); + + self.column_chunks + .last_mut() + .vortex_expect("column chunks cannot be empty, just pushed") } + Some(x) => x, }; + let mut n_rows_written = *accumulator + .row_offsets + .last() + .vortex_expect("row offsets cannot be empty by construction"); + + let mut byte_offsets = Vec::with_capacity(size_hint); + byte_offsets.push(self.msgs.tell()); while let Some(chunk) = stream.try_next().await? 
{ + for stat in PRUNING_STATS { + accumulator.push_stat(stat, chunk.statistics().compute(stat)); + } + n_rows_written += chunk.len() as u64; - row_offsets.push(n_rows_written); + accumulator.push_row_offset(n_rows_written); + self.msgs.write_batch(chunk).await?; byte_offsets.push(self.msgs.tell()); } - - if let Some(batches) = self.column_chunks.get_mut(column_idx) { - batches.row_offsets.extend(row_offsets); - batches.batch_byte_offsets.push(byte_offsets); - } else { - self.column_chunks - .push(BatchOffsets::new(row_offsets, vec![byte_offsets])); - } + accumulator.push_batch_byte_offsets(byte_offsets); Ok(()) } async fn write_metadata_arrays(&mut self) -> VortexResult { let mut column_layouts = Vec::with_capacity(self.column_chunks.len()); - for mut chunk in mem::take(&mut self.column_chunks) { - let mut chunks: VecDeque = chunk + for mut column_accumulator in mem::take(&mut self.column_chunks) { + let mut chunks: VecDeque = column_accumulator .batch_byte_offsets .iter() .flat_map(|byte_offsets| { @@ -130,15 +146,17 @@ impl LayoutWriter { .map(|(begin, end)| Layout::flat(ByteRange::new(*begin, *end))) }) .collect(); - let len = chunk.row_offsets.len() - 1; - chunk.row_offsets.truncate(len); - assert_eq!(chunks.len(), chunk.row_offsets.len()); + // we don't need the last row offset; that's just the total number of rows + let length = column_accumulator.row_offsets.len() - 1; + column_accumulator.row_offsets.truncate(length); + + assert_eq!(chunks.len(), column_accumulator.row_offsets.len()); let metadata_array = StructArray::try_new( ["row_offset".into()].into(), - vec![chunk.row_offsets.into_array()], - len, + vec![column_accumulator.row_offsets.into_array()], + length, Validity::NonNullable, )?; @@ -201,18 +219,34 @@ async fn write_fb_raw(mut writer: W, fb: F) } #[derive(Clone, Debug)] -pub struct BatchOffsets { +pub struct ColumnChunkAccumulator { pub row_offsets: Vec, pub batch_byte_offsets: Vec>, + pub pruning_stats: HashMap>, } -impl BatchOffsets { - pub fn new(row_offsets: Vec, batch_byte_offsets: Vec>) -> Self { +impl ColumnChunkAccumulator { + pub fn new(size_hint: usize) -> Self { + let mut row_offsets = Vec::with_capacity(size_hint + 1); + row_offsets.push(0); Self { row_offsets, - batch_byte_offsets, + batch_byte_offsets: Vec::new(), + pruning_stats: HashMap::with_capacity(PRUNING_STATS.len()), } } + + pub fn push_row_offset(&mut self, row_offset: u64) { + self.row_offsets.push(row_offset); + } + + pub fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec) { + self.batch_byte_offsets.push(batch_byte_offsets); + } + + pub fn push_stat(&mut self, stat: Stat, value: Option) { + self.pruning_stats.insert(stat, value); + } } #[cfg(test)] From d132ff0b71ca8423e21de332bb9e626c3b112f54 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Tue, 15 Oct 2024 18:24:17 -0400 Subject: [PATCH 02/15] write stats --- vortex-array/src/scalars.rs | 6 +- vortex-serde/src/layouts/write/writer.rs | 109 +++++++++++++++++------ 2 files changed, 83 insertions(+), 32 deletions(-) diff --git a/vortex-array/src/scalars.rs b/vortex-array/src/scalars.rs index a44856e23b..81a910d0fe 100644 --- a/vortex-array/src/scalars.rs +++ b/vortex-array/src/scalars.rs @@ -10,10 +10,10 @@ use crate::array::{BoolArray, NullArray, PrimitiveArray}; use crate::{Array, IntoArray as _}; impl Array { - pub fn from_scalar_values(dtype: &DType, values: Vec) -> VortexResult { + pub fn from_scalar_values(dtype: DType, values: Vec) -> VortexResult { let mismatched_values = values .iter() - .filter(|v| !v.is_instance_of(dtype)) 
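+            // `dtype` is taken by value now, so the instance check borrows it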
+ .filter(|v| !v.is_instance_of(&dtype)) .collect_vec(); if !mismatched_values.is_empty() { let mismatch_str = mismatched_values.iter().map(|v| v.to_string()).join(", "); @@ -79,7 +79,7 @@ impl Array { } let scalar_values = scalars.iter().map(|s| s.value().clone()).collect_vec(); - Array::from_scalar_values(dtype, scalar_values) + Array::from_scalar_values(dtype.clone(), scalar_values) } } diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index 4de5ab398a..36700354dd 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -7,12 +7,15 @@ use vortex::array::{ChunkedArray, StructArray}; use vortex::stats::{ArrayStatistics, Stat}; use vortex::stream::ArrayStream; use vortex::validity::Validity; -use vortex::{Array, ArrayDType, IntoArray}; +use vortex::{Array, Array, ArrayDType, ArrayDType as _, IntoArray, IntoArray}; use vortex_buffer::io_buf::IoBuf; use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; +use vortex_error::{ + vortex_bail, vortex_bail, vortex_err, vortex_err, VortexExpect, VortexExpect, VortexResult, + VortexResult, +}; use vortex_flatbuffers::WriteFlatBuffer; -use vortex_scalar::Scalar; +use vortex_scalar::{Scalar, ScalarValue}; use crate::io::VortexWrite; use crate::layouts::write::footer::{Footer, Postscript}; @@ -136,34 +139,14 @@ impl LayoutWriter { async fn write_metadata_arrays(&mut self) -> VortexResult { let mut column_layouts = Vec::with_capacity(self.column_chunks.len()); for mut column_accumulator in mem::take(&mut self.column_chunks) { - let mut chunks: VecDeque = column_accumulator - .batch_byte_offsets - .iter() - .flat_map(|byte_offsets| { - byte_offsets - .iter() - .zip(byte_offsets.iter().skip(1)) - .map(|(begin, end)| Layout::flat(ByteRange::new(*begin, *end))) - }) - .collect(); - - // we don't need the last row offset; that's just the total number of rows - let length = column_accumulator.row_offsets.len() - 1; - column_accumulator.row_offsets.truncate(length); - - assert_eq!(chunks.len(), column_accumulator.row_offsets.len()); - - let metadata_array = StructArray::try_new( - ["row_offset".into()].into(), - vec![column_accumulator.row_offsets.into_array()], - length, - Validity::NonNullable, - )?; + let (mut chunks, metadata_array) = column_accumulator.into_chunks_and_metadata()?; let dtype_begin = self.msgs.tell(); self.msgs.write_dtype(metadata_array.dtype()).await?; let dtype_end = self.msgs.tell(); - self.msgs.write_batch(metadata_array.into_array()).await?; + self.msgs.write_batch(metadata_array).await?; + // push the metadata table as the first chunk + // NB(wmanning): I hate this so much chunks.push_front(Layout::inlined_schema( vec![Layout::flat(ByteRange::new(dtype_end, self.msgs.tell()))], ByteRange::new(dtype_begin, dtype_end), @@ -222,7 +205,7 @@ async fn write_fb_raw(mut writer: W, fb: F) pub struct ColumnChunkAccumulator { pub row_offsets: Vec, pub batch_byte_offsets: Vec>, - pub pruning_stats: HashMap>, + pub pruning_stats: HashMap>>, } impl ColumnChunkAccumulator { @@ -245,7 +228,75 @@ impl ColumnChunkAccumulator { } pub fn push_stat(&mut self, stat: Stat, value: Option) { - self.pruning_stats.insert(stat, value); + self.pruning_stats.entry(stat).or_default().push(value); + } + + pub fn into_chunks_and_metadata(mut self) -> VortexResult<(VecDeque, Array)> { + // we don't need the last row offset; that's just the total number of rows + let length = self.row_offsets.len() - 1; + 
self.row_offsets.truncate(length); + + let chunks: VecDeque = self + .batch_byte_offsets + .iter() + .flat_map(|byte_offsets| { + byte_offsets + .iter() + .zip(byte_offsets.iter().skip(1)) + .map(|(begin, end)| Layout::Flat(FlatLayout::new(*begin, *end))) + }) + .collect(); + + if chunks.len() != self.row_offsets.len() { + vortex_bail!( + "Expected {} chunks based on row offsets, found {} based on byte offsets", + self.row_offsets.len(), + chunks.len() + ); + } + + let mut names = vec!["row_offset".into()]; + let mut fields = vec![self.row_offsets.into_array()]; + + for stat in PRUNING_STATS { + let values = self.pruning_stats.entry(stat).or_default(); + if values.len() != length { + vortex_bail!( + "Expected {} values for stat {}, found {}", + length, + stat, + values.len() + ); + } + + let Some(dtype) = values + .iter() + .filter(|v| v.is_some()) + .flatten() + .map(|v| v.dtype()) + .next() + else { + // no point in writing all nulls + continue; + }; + let dtype = dtype.as_nullable(); + let values = values + .iter() + .map(|v| { + v.as_ref() + .map(|s| s.cast(&dtype).map(|s| s.into_value())) + .unwrap_or_else(|| Ok(ScalarValue::Null)) + }) + .collect::>>()?; + + names.push(format!("{stat}").to_lowercase().into()); + fields.push(Array::from_scalar_values(dtype, values)?); + } + + Ok(( + chunks, + StructArray::try_new(names.into(), fields, length, Validity::NonNullable)?.into_array(), + )) } } From 1ba8821013a0e3faa54c05efffd26d7672d6d3cd Mon Sep 17 00:00:00 2001 From: Will Manning Date: Tue, 22 Oct 2024 12:05:52 -0400 Subject: [PATCH 03/15] cosmetic change --- vortex-serde/src/layouts/write/layouts.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vortex-serde/src/layouts/write/layouts.rs b/vortex-serde/src/layouts/write/layouts.rs index cc6491aa37..9706b24ad9 100644 --- a/vortex-serde/src/layouts/write/layouts.rs +++ b/vortex-serde/src/layouts/write/layouts.rs @@ -16,10 +16,10 @@ pub struct Layout { } impl Layout { - pub fn flat(buffer: ByteRange) -> Self { + pub fn flat(byte_range: ByteRange) -> Self { Self { id: FLAT_LAYOUT_ID, - buffers: Some(vec![buffer]), + buffers: Some(vec![byte_range]), children: None, metadata: None, } From 9325ca7dc2862b308d64e27ad66b5699921213c0 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Tue, 22 Oct 2024 13:00:19 -0400 Subject: [PATCH 04/15] trying to do this with rust types is awful --- vortex-serde/src/layouts/write/writer.rs | 53 ++++++++++++++++++++---- 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index 36700354dd..6e8068a7d0 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -9,6 +9,7 @@ use vortex::stream::ArrayStream; use vortex::validity::Validity; use vortex::{Array, Array, ArrayDType, ArrayDType as _, IntoArray, IntoArray}; use vortex_buffer::io_buf::IoBuf; +use vortex_buffer::{Buffer, BufferString}; use vortex_dtype::DType; use vortex_error::{ vortex_bail, vortex_bail, vortex_err, vortex_err, VortexExpect, VortexExpect, VortexResult, @@ -201,37 +202,71 @@ async fn write_fb_raw(mut writer: W, fb: F) Ok(writer) } +fn new_accumulator(size_hint: usize, dtype: &DType) -> Box { + match dtype { + DType::Bool(_) => Box::new(TypedColumnChunkAccumulator::::new(size_hint)), + DType::Null => Box::new(TypedColumnChunkAccumulator::<()>::new(size_hint)), + DType::Primitive(ptype, nullability) => todo!(), + DType::Utf8(nullability) => 
Box::new(TypedColumnChunkAccumulator::::new(size_hint)), + DType::Binary(nullability) => Box::new(TypedColumnChunkAccumulator::::new(size_hint)), + DType::Struct(struct_dtype, nullability) => todo!(), + DType::List(arc, nullability) => todo!(), + DType::Extension(ext_dtype, nullability) => todo!(), + } +} + +trait ColumnChunkAccumulator { + fn push_row_offset(&mut self, row_offset: u64); + fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec); + fn push_stat(&mut self, stat: Stat, value: Option) -> VortexResult<()>; + fn into_chunks_and_metadata(self) -> VortexResult<(VecDeque, Array)>; +} + #[derive(Clone, Debug)] -pub struct ColumnChunkAccumulator { +struct TypedColumnChunkAccumulator { pub row_offsets: Vec, pub batch_byte_offsets: Vec>, - pub pruning_stats: HashMap>>, + pub minima: Vec>, + pub maxima: Vec>, + pub null_counts: Vec, + pub true_counts: Vec, } -impl ColumnChunkAccumulator { +impl TypedColumnChunkAccumulator { pub fn new(size_hint: usize) -> Self { let mut row_offsets = Vec::with_capacity(size_hint + 1); row_offsets.push(0); Self { row_offsets, batch_byte_offsets: Vec::new(), - pruning_stats: HashMap::with_capacity(PRUNING_STATS.len()), + minima: Vec::with_capacity(size_hint), + maxima: Vec::with_capacity(size_hint), + null_counts: Vec::with_capacity(size_hint), + true_counts: Vec::with_capacity(size_hint), } } +} - pub fn push_row_offset(&mut self, row_offset: u64) { +impl ColumnChunkAccumulator for TypedColumnChunkAccumulator { + fn push_row_offset(&mut self, row_offset: u64) { self.row_offsets.push(row_offset); } - pub fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec) { + fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec) { self.batch_byte_offsets.push(batch_byte_offsets); } - pub fn push_stat(&mut self, stat: Stat, value: Option) { - self.pruning_stats.entry(stat).or_default().push(value); + fn push_stat(&mut self, stat: Stat, value: Option) -> VortexResult<()> { + match stat { + Stat::Min => self.minima.push(value), + Stat::Max => self.maxima.push(value), + Stat::NullCount => self.null_counts.push(value.map_or(0, |v| v.into_value().as_pvalue().vortex_expect("null count is a primitive value").map(|v| v.as_u64().unwrap_or(0))),), + Stat::TrueCount => self.true_counts.push(value.map_or(0, |v| v.into_value().as_pvalue().vortex_expect("true count is a primitive value").map(|v| v.as_u64().unwrap_or(0))),), + _ => vortex_bail!("Unsupported pruning stat: {stat}"), + } } - pub fn into_chunks_and_metadata(mut self) -> VortexResult<(VecDeque, Array)> { + fn into_chunks_and_metadata(mut self) -> VortexResult<(VecDeque, Array)> { // we don't need the last row offset; that's just the total number of rows let length = self.row_offsets.len() - 1; self.row_offsets.truncate(length); From 9eefb9f52d66a5b1fe6c0de8efa06b41a5b1b9b0 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Tue, 22 Oct 2024 13:28:33 -0400 Subject: [PATCH 05/15] wip --- vortex-serde/src/layouts/mod.rs | 8 +++ vortex-serde/src/layouts/tests.rs | 7 +- vortex-serde/src/layouts/write/writer.rs | 84 ++++++++++++------------ 3 files changed, 55 insertions(+), 44 deletions(-) diff --git a/vortex-serde/src/layouts/mod.rs b/vortex-serde/src/layouts/mod.rs index d374ef8679..e6d1be25e1 100644 --- a/vortex-serde/src/layouts/mod.rs +++ b/vortex-serde/src/layouts/mod.rs @@ -1,3 +1,8 @@ +use std::sync::Arc; + +use vortex_dtype::FieldNames; +use vortex_stats::Stat; + mod read; mod write; @@ -15,5 +20,8 @@ pub const CHUNKED_LAYOUT_ID: LayoutId = LayoutId(2); pub const COLUMN_LAYOUT_ID: LayoutId = 
LayoutId(3); pub const INLINE_SCHEMA_LAYOUT_ID: LayoutId = LayoutId(4); +pub const PRUNING_STATS: [Stat; 4] = [Stat::Min, Stat::Max, Stat::NullCount, Stat::TrueCount]; +pub const METADATA_FIELD_NAMES: [&str; 5] = ["row_offset", "min", "max", "null_count", "true_count"]; + pub use read::*; pub use write::*; diff --git a/vortex-serde/src/layouts/tests.rs b/vortex-serde/src/layouts/tests.rs index 4ddd1c87c4..03423e2199 100644 --- a/vortex-serde/src/layouts/tests.rs +++ b/vortex-serde/src/layouts/tests.rs @@ -14,7 +14,12 @@ use vortex_dtype::{DType, Nullability, PType, StructDType}; use vortex_expr::{BinaryExpr, Column, Literal, Operator}; use crate::layouts::write::LayoutWriter; -use crate::layouts::{LayoutDeserializer, LayoutReaderBuilder, Projection, RowFilter}; +use crate::layouts::{LayoutDeserializer, LayoutReaderBuilder, Projection, RowFilter, METADATA_FIELD_NAMES, PRUNING_STATS}; + +#[test] +fn metadata_field_names() { + assert!(PRUNING_STATS.iter().all(|s| METADATA_FIELD_NAMES.contains(&format!("{}", s).into()))); +} #[tokio::test] #[cfg_attr(miri, ignore)] diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index 6e8068a7d0..0afd52e251 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -7,13 +7,12 @@ use vortex::array::{ChunkedArray, StructArray}; use vortex::stats::{ArrayStatistics, Stat}; use vortex::stream::ArrayStream; use vortex::validity::Validity; -use vortex::{Array, Array, ArrayDType, ArrayDType as _, IntoArray, IntoArray}; +use vortex::{Array, ArrayDType, ArrayDType as _, IntoArray, IntoArray}; use vortex_buffer::io_buf::IoBuf; use vortex_buffer::{Buffer, BufferString}; use vortex_dtype::DType; use vortex_error::{ - vortex_bail, vortex_bail, vortex_err, vortex_err, VortexExpect, VortexExpect, VortexResult, - VortexResult, + vortex_bail, vortex_err, vortex_err, VortexExpect, VortexExpect, VortexResult, VortexResult, }; use vortex_flatbuffers::WriteFlatBuffer; use vortex_scalar::{Scalar, ScalarValue}; @@ -21,7 +20,7 @@ use vortex_scalar::{Scalar, ScalarValue}; use crate::io::VortexWrite; use crate::layouts::write::footer::{Footer, Postscript}; use crate::layouts::write::layouts::Layout; -use crate::layouts::{EOF_SIZE, MAGIC_BYTES, VERSION}; +use crate::layouts::{EOF_SIZE, MAGIC_BYTES, METADATA_FIELD_NAMES, VERSION}; use crate::stream_writer::ByteRange; use crate::MessageWriter; @@ -33,8 +32,6 @@ pub struct LayoutWriter { column_chunks: Vec, } -const PRUNING_STATS: [Stat; 4] = [Stat::Min, Stat::Max, Stat::NullCount, Stat::TrueCount]; - impl LayoutWriter { pub fn new(write: W) -> Self { LayoutWriter { @@ -202,41 +199,22 @@ async fn write_fb_raw(mut writer: W, fb: F) Ok(writer) } -fn new_accumulator(size_hint: usize, dtype: &DType) -> Box { - match dtype { - DType::Bool(_) => Box::new(TypedColumnChunkAccumulator::::new(size_hint)), - DType::Null => Box::new(TypedColumnChunkAccumulator::<()>::new(size_hint)), - DType::Primitive(ptype, nullability) => todo!(), - DType::Utf8(nullability) => Box::new(TypedColumnChunkAccumulator::::new(size_hint)), - DType::Binary(nullability) => Box::new(TypedColumnChunkAccumulator::::new(size_hint)), - DType::Struct(struct_dtype, nullability) => todo!(), - DType::List(arc, nullability) => todo!(), - DType::Extension(ext_dtype, nullability) => todo!(), - } -} - -trait ColumnChunkAccumulator { - fn push_row_offset(&mut self, row_offset: u64); - fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec); - fn push_stat(&mut self, stat: Stat, value: 
Option) -> VortexResult<()>; - fn into_chunks_and_metadata(self) -> VortexResult<(VecDeque, Array)>; -} - -#[derive(Clone, Debug)] -struct TypedColumnChunkAccumulator { +struct ColumnChunkAccumulator { + pub dtype: DType, pub row_offsets: Vec, pub batch_byte_offsets: Vec>, - pub minima: Vec>, - pub maxima: Vec>, - pub null_counts: Vec, - pub true_counts: Vec, + pub minima: Vec>, + pub maxima: Vec>, + pub null_counts: Vec>, + pub true_counts: Vec>, } -impl TypedColumnChunkAccumulator { - pub fn new(size_hint: usize) -> Self { +impl ColumnChunkAccumulator { + pub fn new(size_hint: usize, dtype: DType) -> Self { let mut row_offsets = Vec::with_capacity(size_hint + 1); row_offsets.push(0); Self { + dtype, row_offsets, batch_byte_offsets: Vec::new(), minima: Vec::with_capacity(size_hint), @@ -245,9 +223,7 @@ impl TypedColumnChunkAccumulator { true_counts: Vec::with_capacity(size_hint), } } -} -impl ColumnChunkAccumulator for TypedColumnChunkAccumulator { fn push_row_offset(&mut self, row_offset: u64) { self.row_offsets.push(row_offset); } @@ -257,13 +233,35 @@ impl ColumnChunkAccumulator for TypedColumnChunkAccumulator { } fn push_stat(&mut self, stat: Stat, value: Option) -> VortexResult<()> { - match stat { - Stat::Min => self.minima.push(value), - Stat::Max => self.maxima.push(value), - Stat::NullCount => self.null_counts.push(value.map_or(0, |v| v.into_value().as_pvalue().vortex_expect("null count is a primitive value").map(|v| v.as_u64().unwrap_or(0))),), - Stat::TrueCount => self.true_counts.push(value.map_or(0, |v| v.into_value().as_pvalue().vortex_expect("true count is a primitive value").map(|v| v.as_u64().unwrap_or(0))),), - _ => vortex_bail!("Unsupported pruning stat: {stat}"), + if matches!(stat, Stat::Min | Stat::Max) { + if let Some(value) = value { + if !value.value().is_instance_of(&self.dtype) { + vortex_bail!( + "Expected all min/max values to have dtype {}, got {}", + self.dtype, + value.dtype() + ); + } + } } + + Ok(match stat { + Stat::Min => self.minima.push(value.map(|v| v.into_value())), + Stat::Max => self.maxima.push(value.map(|v| v.into_value())), + Stat::NullCount => self.null_counts.push(value.map_or(0, |v| { + v.into_value() + .as_pvalue() + .vortex_expect("null count is a primitive value") + .and_then(|v| v.as_u64()) + })), + Stat::TrueCount => self.true_counts.push(value.map_or(0, |v| { + v.into_value() + .as_pvalue() + .vortex_expect("true count is a primitive value") + .and_then(|v| v.as_u64()) + })), + _ => vortex_bail!("Unsupported pruning stat: {stat}"), + }) } fn into_chunks_and_metadata(mut self) -> VortexResult<(VecDeque, Array)> { @@ -290,7 +288,7 @@ impl ColumnChunkAccumulator for TypedColumnChunkAccumulator { ); } - let mut names = vec!["row_offset".into()]; + let mut names = METADATA_FIELD_NAMES.clone(); let mut fields = vec![self.row_offsets.into_array()]; for stat in PRUNING_STATS { From ef51d5cb6546b69c46831120f6b8c18fbdf0500f Mon Sep 17 00:00:00 2001 From: Will Manning Date: Fri, 25 Oct 2024 10:19:51 -0400 Subject: [PATCH 06/15] cleanup --- vortex-serde/src/layouts/mod.rs | 5 +- vortex-serde/src/layouts/tests.rs | 19 ++++- vortex-serde/src/layouts/write/writer.rs | 101 ++++++++++++++--------- 3 files changed, 80 insertions(+), 45 deletions(-) diff --git a/vortex-serde/src/layouts/mod.rs b/vortex-serde/src/layouts/mod.rs index e6d1be25e1..f1089d0634 100644 --- a/vortex-serde/src/layouts/mod.rs +++ b/vortex-serde/src/layouts/mod.rs @@ -1,7 +1,4 @@ -use std::sync::Arc; - -use vortex_dtype::FieldNames; -use vortex_stats::Stat; +use 
vortex::stats::Stat; mod read; mod write; diff --git a/vortex-serde/src/layouts/tests.rs b/vortex-serde/src/layouts/tests.rs index 03423e2199..32c32bfef6 100644 --- a/vortex-serde/src/layouts/tests.rs +++ b/vortex-serde/src/layouts/tests.rs @@ -4,6 +4,7 @@ use std::iter; use std::sync::Arc; use futures::StreamExt; +use itertools::Itertools as _; use vortex::accessor::ArrayAccessor; use vortex::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray}; use vortex::validity::Validity; @@ -14,11 +15,25 @@ use vortex_dtype::{DType, Nullability, PType, StructDType}; use vortex_expr::{BinaryExpr, Column, Literal, Operator}; use crate::layouts::write::LayoutWriter; -use crate::layouts::{LayoutDeserializer, LayoutReaderBuilder, Projection, RowFilter, METADATA_FIELD_NAMES, PRUNING_STATS}; +use crate::layouts::{ + LayoutDeserializer, LayoutReaderBuilder, Projection, RowFilter, EOF_SIZE, FOOTER_POSTSCRIPT_SIZE, MAGIC_BYTES, METADATA_FIELD_NAMES, PRUNING_STATS, VERSION +}; + +#[test] +fn format_constants() { + assert_eq!(VERSION, 1); + assert_eq!(FOOTER_POSTSCRIPT_SIZE, 32); // cannot change this without bumping the version + assert_eq!(MAGIC_BYTES, *b"VRTX"); // this can never change + assert_eq!(EOF_SIZE, 8); // this can never change +} #[test] fn metadata_field_names() { - assert!(PRUNING_STATS.iter().all(|s| METADATA_FIELD_NAMES.contains(&format!("{}", s).into()))); + let names = Some("row_offset".to_string()) + .into_iter() + .chain(PRUNING_STATS.iter().map(|s| format!("{}", s))) + .collect_vec(); + assert_eq!(names, METADATA_FIELD_NAMES); } #[tokio::test] diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index 0afd52e251..75dd89c521 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -1,4 +1,5 @@ use std::collections::VecDeque; +use std::sync::Arc; use std::{io, mem}; use flatbuffers::FlatBufferBuilder; @@ -10,9 +11,10 @@ use vortex::validity::Validity; use vortex::{Array, ArrayDType, ArrayDType as _, IntoArray, IntoArray}; use vortex_buffer::io_buf::IoBuf; use vortex_buffer::{Buffer, BufferString}; -use vortex_dtype::DType; +use vortex_dtype::{DType, Nullability, PType}; use vortex_error::{ - vortex_bail, vortex_err, vortex_err, VortexExpect, VortexExpect, VortexResult, VortexResult, + vortex_bail, vortex_bail, vortex_err, vortex_err, vortex_err, vortex_panic, VortexExpect, + VortexExpect, VortexExpect as _, VortexResult, VortexResult, VortexResult, }; use vortex_flatbuffers::WriteFlatBuffer; use vortex_scalar::{Scalar, ScalarValue}; @@ -20,7 +22,7 @@ use vortex_scalar::{Scalar, ScalarValue}; use crate::io::VortexWrite; use crate::layouts::write::footer::{Footer, Postscript}; use crate::layouts::write::layouts::Layout; -use crate::layouts::{EOF_SIZE, MAGIC_BYTES, METADATA_FIELD_NAMES, VERSION}; +use crate::layouts::{EOF_SIZE, MAGIC_BYTES, METADATA_FIELD_NAMES, PRUNING_STATS, VERSION}; use crate::stream_writer::ByteRange; use crate::MessageWriter; @@ -88,13 +90,13 @@ impl LayoutWriter { async fn write_column_chunks(&mut self, mut stream: S, column_idx: usize) -> VortexResult<()> where - S: Stream> + Unpin, + S: Stream> + Unpin + ArrayStream, { let size_hint = stream.size_hint().0; let accumulator = match self.column_chunks.get_mut(column_idx) { None => { self.column_chunks - .push(ColumnChunkAccumulator::new(size_hint)); + .push(ColumnChunkAccumulator::new(size_hint, stream.dtype())); assert_eq!( self.column_chunks.len(), @@ -120,7 +122,7 @@ impl LayoutWriter { while let Some(chunk) = 
stream.try_next().await? { for stat in PRUNING_STATS { - accumulator.push_stat(stat, chunk.statistics().compute(stat)); + accumulator.push_stat(stat, chunk.statistics().compute(stat))?; } n_rows_written += chunk.len() as u64; @@ -143,8 +145,6 @@ impl LayoutWriter { self.msgs.write_dtype(metadata_array.dtype()).await?; let dtype_end = self.msgs.tell(); self.msgs.write_batch(metadata_array).await?; - // push the metadata table as the first chunk - // NB(wmanning): I hate this so much chunks.push_front(Layout::inlined_schema( vec![Layout::flat(ByteRange::new(dtype_end, self.msgs.tell()))], ByteRange::new(dtype_begin, dtype_end), @@ -203,18 +203,18 @@ struct ColumnChunkAccumulator { pub dtype: DType, pub row_offsets: Vec, pub batch_byte_offsets: Vec>, - pub minima: Vec>, - pub maxima: Vec>, + pub minima: Vec, + pub maxima: Vec, pub null_counts: Vec>, pub true_counts: Vec>, } impl ColumnChunkAccumulator { - pub fn new(size_hint: usize, dtype: DType) -> Self { + pub fn new(size_hint: usize, dtype: &DType) -> Self { let mut row_offsets = Vec::with_capacity(size_hint + 1); row_offsets.push(0); Self { - dtype, + dtype: dtype.as_nullable(), row_offsets, batch_byte_offsets: Vec::new(), minima: Vec::with_capacity(size_hint), @@ -234,7 +234,7 @@ impl ColumnChunkAccumulator { fn push_stat(&mut self, stat: Stat, value: Option) -> VortexResult<()> { if matches!(stat, Stat::Min | Stat::Max) { - if let Some(value) = value { + if let Some(ref value) = value { if !value.value().is_instance_of(&self.dtype) { vortex_bail!( "Expected all min/max values to have dtype {}, got {}", @@ -245,23 +245,32 @@ impl ColumnChunkAccumulator { } } - Ok(match stat { - Stat::Min => self.minima.push(value.map(|v| v.into_value())), - Stat::Max => self.maxima.push(value.map(|v| v.into_value())), - Stat::NullCount => self.null_counts.push(value.map_or(0, |v| { + match stat { + Stat::Min => self.minima.push( + value + .map(|v| v.into_value()) + .unwrap_or_else(|| ScalarValue::Null), + ), + Stat::Max => self.maxima.push( + value + .map(|v| v.into_value()) + .unwrap_or_else(|| ScalarValue::Null), + ), + Stat::NullCount => self.null_counts.push(value.and_then(|v| { v.into_value() .as_pvalue() .vortex_expect("null count is a primitive value") .and_then(|v| v.as_u64()) })), - Stat::TrueCount => self.true_counts.push(value.map_or(0, |v| { + Stat::TrueCount => self.true_counts.push(value.and_then(|v| { v.into_value() .as_pvalue() .vortex_expect("true count is a primitive value") .and_then(|v| v.as_u64()) })), _ => vortex_bail!("Unsupported pruning stat: {stat}"), - }) + } + Ok(()) } fn into_chunks_and_metadata(mut self) -> VortexResult<(VecDeque, Array)> { @@ -288,11 +297,27 @@ impl ColumnChunkAccumulator { ); } - let mut names = METADATA_FIELD_NAMES.clone(); - let mut fields = vec![self.row_offsets.into_array()]; + let mut names: Vec> = vec!["row_offset".into()]; + let mut fields = vec![mem::take(&mut self.row_offsets).into_array()]; for stat in PRUNING_STATS { - let values = self.pruning_stats.entry(stat).or_default(); + let values = match stat { + Stat::Min => mem::take(&mut self.minima), + Stat::Max => mem::take(&mut self.maxima), + Stat::NullCount => self + .null_counts + .iter() + .cloned() + .map(ScalarValue::from) + .collect(), + Stat::TrueCount => self + .true_counts + .iter() + .cloned() + .map(ScalarValue::from) + .collect(), + _ => vortex_bail!("Unsupported pruning stat: {}", stat), + }; if values.len() != length { vortex_bail!( "Expected {} values for stat {}, found {}", @@ -302,29 +327,28 @@ impl ColumnChunkAccumulator { ); } 
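+            // min/max reuse the column's (nullable) dtype below; counts are stored as nullable u64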
- let Some(dtype) = values - .iter() - .filter(|v| v.is_some()) - .flatten() - .map(|v| v.dtype()) - .next() - else { + if values.iter().all(|v| v.is_null()) { // no point in writing all nulls continue; }; - let dtype = dtype.as_nullable(); - let values = values - .iter() - .map(|v| { - v.as_ref() - .map(|s| s.cast(&dtype).map(|s| s.into_value())) - .unwrap_or_else(|| Ok(ScalarValue::Null)) - }) - .collect::>>()?; + + let dtype = match stat { + Stat::Min | Stat::Max => self.dtype.clone(), + _ => DType::Primitive(PType::U64, Nullability::Nullable), + }; names.push(format!("{stat}").to_lowercase().into()); fields.push(Array::from_scalar_values(dtype, values)?); } + for name in &names { + if !METADATA_FIELD_NAMES.contains(&name.as_ref()) { + vortex_panic!( + "Found unexpected metadata field name {}, expected one of {:?}", + name, + METADATA_FIELD_NAMES + ); + } + } Ok(( chunks, @@ -376,6 +400,5 @@ mod tests { buffer[buffer_begin..buffer_end].len(), FOOTER_POSTSCRIPT_SIZE ); - assert_eq!(buffer[buffer_begin..buffer_end].len(), 32); } } From 6218d422e9bb095054056634cbd01656760464d5 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Tue, 29 Oct 2024 16:26:32 -0400 Subject: [PATCH 07/15] feat: store min, max, null count, and true count in column metadata --- docs/quickstart.rst | 2 +- vortex-array/src/lib.rs | 2 - vortex-array/src/scalars.rs | 87 ------ vortex-buffer/src/string.rs | 6 + vortex-dtype/src/dtype.rs | 10 + vortex-scalar/src/binary.rs | 45 ++- vortex-scalar/src/bool.rs | 41 ++- vortex-scalar/src/lib.rs | 1 + vortex-scalar/src/null.rs | 35 +++ vortex-scalar/src/primitive.rs | 24 +- vortex-scalar/src/utf8.rs | 59 +++- vortex-scalar/src/value.rs | 7 + vortex-serde/src/layouts/mod.rs | 5 - vortex-serde/src/layouts/tests.rs | 22 +- vortex-serde/src/layouts/write/layouts.rs | 4 +- .../layouts/write/metadata_accumulators.rs | 263 ++++++++++++++++++ vortex-serde/src/layouts/write/mod.rs | 1 + vortex-serde/src/layouts/write/writer.rs | 226 ++------------- 18 files changed, 501 insertions(+), 339 deletions(-) delete mode 100644 vortex-array/src/scalars.rs create mode 100644 vortex-scalar/src/null.rs create mode 100644 vortex-serde/src/layouts/write/metadata_accumulators.rs diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 65a71cb7c3..074913b48d 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -70,7 +70,7 @@ similar to or smaller than Parquet. >>> from os.path import getsize >>> getsize("example.vortex") / getsize("_static/example.parquet") - 2.1... + 2... 
Read ^^^^ diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 9cc73b9d29..2d554b99cc 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -48,8 +48,6 @@ pub mod encoding; mod implementation; pub mod iter; mod metadata; -pub mod opaque; -pub mod scalars; pub mod stats; pub mod stream; mod tree; diff --git a/vortex-array/src/scalars.rs b/vortex-array/src/scalars.rs deleted file mode 100644 index 81a910d0fe..0000000000 --- a/vortex-array/src/scalars.rs +++ /dev/null @@ -1,87 +0,0 @@ -use std::collections::HashSet; - -use itertools::Itertools as _; -use vortex_dtype::{match_each_native_ptype, DType}; -use vortex_error::{vortex_bail, VortexExpect as _, VortexResult}; -use vortex_scalar::{Scalar, ScalarValue}; - -use crate::array::builder::VarBinBuilder; -use crate::array::{BoolArray, NullArray, PrimitiveArray}; -use crate::{Array, IntoArray as _}; - -impl Array { - pub fn from_scalar_values(dtype: DType, values: Vec) -> VortexResult { - let mismatched_values = values - .iter() - .filter(|v| !v.is_instance_of(&dtype)) - .collect_vec(); - if !mismatched_values.is_empty() { - let mismatch_str = mismatched_values.iter().map(|v| v.to_string()).join(", "); - vortex_bail!("Expected all scalars to be of type {dtype}; found {mismatch_str}"); - } - - Ok(match dtype { - DType::Bool(_) => BoolArray::from_iter(values.into_iter().map(|s| { - s.as_bool() - .vortex_expect("Expected ScalarValue to be a bool") - })) - .into_array(), - DType::Primitive(ptype, _) => { - match_each_native_ptype!(ptype, |$P| { - PrimitiveArray::from_nullable_vec(values.iter().map(|s| { - s.as_pvalue() - .vortex_expect("Expected ScalarValue to be a primitive") - .map(|p| p.as_primitive::<$P>().vortex_expect("Expected ScalarValue to be a primitive")) - }).collect_vec()) - .into_array() - }) - } - DType::Binary(_) => { - let mut builder = VarBinBuilder::::with_capacity(values.len()); - for value in values { - let buf = value.as_buffer()?; - builder.push(buf.as_ref().map(|b| b.as_slice())); - } - builder.finish(dtype.clone()).into_array() - } - DType::Utf8(_) => { - let mut builder = VarBinBuilder::::with_capacity(values.len()); - for value in values { - let buf_str = value.as_buffer_string()?; - builder.push(buf_str.as_ref().map(|b| b.as_bytes())); - } - builder.finish(dtype.clone()).into_array() - } - DType::List(..) => vortex_bail!("Cannot convert ScalarValues to ListArray"), - DType::Struct(..) => vortex_bail!("Cannot convert ScalarValues to StructArray"), - DType::Null => NullArray::new(values.len()).into_array(), - DType::Extension(..) 
=> vortex_bail!("Cannot convert ScalarValues to ExtensionArray"), - }) - } - - pub fn from_scalars(scalars: &[Scalar]) -> VortexResult { - if scalars.is_empty() { - vortex_bail!("Cannot convert empty Vec to canonical"); - } - - let scalar = scalars[0].clone(); - let dtype = scalar.dtype(); - - let mismatched_types: HashSet<&DType> = scalars - .iter() - .skip(1) - .filter(|s| s.dtype() != dtype) - .map(|s| s.dtype()) - .collect(); - if !mismatched_types.is_empty() { - let mismatch_str = mismatched_types.iter().map(|t| t.to_string()).join(", "); - vortex_bail!("Expected all scalars to be of type {dtype}; also found {mismatch_str}"); - } - - let scalar_values = scalars.iter().map(|s| s.value().clone()).collect_vec(); - Array::from_scalar_values(dtype.clone(), scalar_values) - } -} - -#[cfg(test)] -mod test {} diff --git a/vortex-buffer/src/string.rs b/vortex-buffer/src/string.rs index 4e6ae2210b..de3c7748c3 100644 --- a/vortex-buffer/src/string.rs +++ b/vortex-buffer/src/string.rs @@ -66,3 +66,9 @@ impl AsRef for BufferString { self.as_str() } } + +impl AsRef<[u8]> for BufferString { + fn as_ref(&self) -> &[u8] { + self.as_str().as_bytes() + } +} diff --git a/vortex-dtype/src/dtype.rs b/vortex-dtype/src/dtype.rs index b2b16e6487..d941dce1e6 100644 --- a/vortex-dtype/src/dtype.rs +++ b/vortex-dtype/src/dtype.rs @@ -15,6 +15,16 @@ pub type FieldNames = Arc<[FieldName]>; pub type Metadata = Vec; +pub fn fieldnames_from_strings(value: Vec) -> FieldNames { + Arc::from( + value + .iter() + .map(|x| Arc::from(x.as_str())) + .collect::>() + .into_boxed_slice(), + ) +} + /// Array logical types. /// /// Vortex arrays preserve a single logical type, while the encodings allow for multiple diff --git a/vortex-scalar/src/binary.rs b/vortex-scalar/src/binary.rs index 10a6ee2f74..f4d8628620 100644 --- a/vortex-scalar/src/binary.rs +++ b/vortex-scalar/src/binary.rs @@ -51,9 +51,48 @@ impl<'a> TryFrom<&'a Scalar> for BinaryScalar<'a> { impl<'a> TryFrom<&'a Scalar> for Buffer { type Error = VortexError; - fn try_from(value: &'a Scalar) -> VortexResult { - BinaryScalar::try_from(value)? - .value() + fn try_from(scalar: &'a Scalar) -> VortexResult { + Buffer::try_from(scalar.value()) + } +} + +impl TryFrom for Buffer { + type Error = VortexError; + + fn try_from(scalar: Scalar) -> VortexResult { + Buffer::try_from(&scalar) + } +} + +impl TryFrom<&ScalarValue> for Buffer { + type Error = VortexError; + + fn try_from(value: &ScalarValue) -> Result { + Option::::try_from(value)? 
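+        // a null ScalarValue yields None here; the shared ok_or_else below rejects it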
.ok_or_else(|| vortex_err!("Can't extract present value from null scalar")) } } + +impl TryFrom for Buffer { + type Error = VortexError; + + fn try_from(value: ScalarValue) -> Result { + Buffer::try_from(&value) + } +} + +impl TryFrom<&ScalarValue> for Option { + type Error = VortexError; + + fn try_from(value: &ScalarValue) -> Result { + value.as_buffer() + } +} + +impl TryFrom for Option { + type Error = VortexError; + + fn try_from(value: ScalarValue) -> Result { + Option::::try_from(&value) + } +} diff --git a/vortex-scalar/src/bool.rs b/vortex-scalar/src/bool.rs index 263a8ec872..69cb194f35 100644 --- a/vortex-scalar/src/bool.rs +++ b/vortex-scalar/src/bool.rs @@ -64,6 +64,14 @@ impl TryFrom<&Scalar> for bool { } } +impl TryFrom for bool { + type Error = VortexError; + + fn try_from(value: Scalar) -> VortexResult { + bool::try_from(&value) + } +} + impl From for Scalar { fn from(value: bool) -> Self { Self { @@ -73,19 +81,42 @@ impl From for Scalar { } } +impl From for ScalarValue { + fn from(value: bool) -> Self { + ScalarValue::Bool(value) + } +} + +impl TryFrom<&ScalarValue> for Option { + type Error = VortexError; + + fn try_from(value: &ScalarValue) -> VortexResult { + value.as_bool() + } +} + +impl TryFrom for Option { + type Error = VortexError; + + fn try_from(value: ScalarValue) -> VortexResult { + Option::::try_from(&value) + } +} + impl TryFrom<&ScalarValue> for bool { type Error = VortexError; fn try_from(value: &ScalarValue) -> VortexResult { - value - .as_bool()? + Option::::try_from(value)? .ok_or_else(|| vortex_err!("Can't extract present value from null scalar")) } } -impl From for ScalarValue { - fn from(value: bool) -> Self { - ScalarValue::Bool(value) +impl TryFrom for bool { + type Error = VortexError; + + fn try_from(value: ScalarValue) -> VortexResult { + bool::try_from(&value) } } diff --git a/vortex-scalar/src/lib.rs b/vortex-scalar/src/lib.rs index f535cc1112..cc63358d14 100644 --- a/vortex-scalar/src/lib.rs +++ b/vortex-scalar/src/lib.rs @@ -12,6 +12,7 @@ mod datafusion; mod display; mod extension; mod list; +mod null; mod primitive; mod pvalue; mod scalar_type; diff --git a/vortex-scalar/src/null.rs b/vortex-scalar/src/null.rs new file mode 100644 index 0000000000..483c4f40e0 --- /dev/null +++ b/vortex-scalar/src/null.rs @@ -0,0 +1,35 @@ +use vortex_error::VortexError; + +use crate::{Scalar, ScalarValue}; + +impl TryFrom<&Scalar> for () { + type Error = VortexError; + + fn try_from(scalar: &Scalar) -> Result { + scalar.value().as_null() + } +} + +impl TryFrom for () { + type Error = VortexError; + + fn try_from(scalar: Scalar) -> Result { + <()>::try_from(&scalar) + } +} + +impl TryFrom<&ScalarValue> for () { + type Error = VortexError; + + fn try_from(value: &ScalarValue) -> Result { + value.as_null() + } +} + +impl TryFrom for () { + type Error = VortexError; + + fn try_from(value: ScalarValue) -> Result { + <()>::try_from(&value) + } +} diff --git a/vortex-scalar/src/primitive.rs b/vortex-scalar/src/primitive.rs index b7add0d85e..fedd92d7bc 100644 --- a/vortex-scalar/src/primitive.rs +++ b/vortex-scalar/src/primitive.rs @@ -154,19 +154,37 @@ macro_rules! primitive_scalar { impl TryFrom<&ScalarValue> for $T { type Error = VortexError; + fn try_from(value: &ScalarValue) -> Result { + Option::<$T>::try_from(value)? 
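+                // the macro emits this for each primitive type: go through the Option<$T> impl, then require a present value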
+ .ok_or_else(|| vortex_err!("Can't extract present value from null scalar")) + } + } + + impl TryFrom for $T { + type Error = VortexError; + + fn try_from(value: ScalarValue) -> Result { + <$T>::try_from(&value) + } + } + + impl TryFrom<&ScalarValue> for Option<$T> { + type Error = VortexError; + fn try_from(value: &ScalarValue) -> Result { match value { - ScalarValue::Primitive(pvalue) => <$T>::try_from(*pvalue), + ScalarValue::Null => Ok(None), + ScalarValue::Primitive(pvalue) => Ok(Some(<$T>::try_from(*pvalue)?)), _ => vortex_bail!("expected primitive"), } } } - impl TryFrom for $T { + impl TryFrom for Option<$T> { type Error = VortexError; fn try_from(value: ScalarValue) -> Result { - <$T>::try_from(&value) + Option::<$T>::try_from(&value) } } }; diff --git a/vortex-scalar/src/utf8.rs b/vortex-scalar/src/utf8.rs index cf74c43235..2bb4692f8f 100644 --- a/vortex-scalar/src/utf8.rs +++ b/vortex-scalar/src/utf8.rs @@ -62,16 +62,6 @@ impl<'a> TryFrom<&'a Scalar> for Utf8Scalar<'a> { } } -impl<'a> TryFrom<&'a Scalar> for BufferString { - type Error = VortexError; - - fn try_from(value: &'a Scalar) -> VortexResult { - Utf8Scalar::try_from(value)? - .value() - .ok_or_else(|| vortex_err!("Can't extract present value from null scalar")) - } -} - impl<'a> TryFrom<&'a Scalar> for String { type Error = VortexError; @@ -88,3 +78,52 @@ impl From<&str> for Scalar { } } } + +impl<'a> TryFrom<&'a Scalar> for BufferString { + type Error = VortexError; + + fn try_from(scalar: &'a Scalar) -> VortexResult { + BufferString::try_from(scalar.value()) + } +} + +impl TryFrom for BufferString { + type Error = VortexError; + + fn try_from(scalar: Scalar) -> Result { + BufferString::try_from(&scalar) + } +} + +impl TryFrom<&ScalarValue> for BufferString { + type Error = VortexError; + + fn try_from(value: &ScalarValue) -> Result { + Option::::try_from(value)? 
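+            // same pattern as the Buffer impl: delegate to Option, then require a present value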
+ .ok_or_else(|| vortex_err!("Can't extract present value from null scalar")) + } +} + +impl TryFrom for BufferString { + type Error = VortexError; + + fn try_from(value: ScalarValue) -> Result { + BufferString::try_from(&value) + } +} + +impl TryFrom<&ScalarValue> for Option { + type Error = VortexError; + + fn try_from(value: &ScalarValue) -> Result { + value.as_buffer_string() + } +} + +impl TryFrom for Option { + type Error = VortexError; + + fn try_from(value: ScalarValue) -> Result { + Option::::try_from(&value) + } +} diff --git a/vortex-scalar/src/value.rs b/vortex-scalar/src/value.rs index 89cbcfbb84..98ef02cda6 100644 --- a/vortex-scalar/src/value.rs +++ b/vortex-scalar/src/value.rs @@ -94,6 +94,13 @@ impl ScalarValue { } } + pub fn as_null(&self) -> VortexResult<()> { + match self { + Self::Null => Ok(()), + _ => Err(vortex_err!("Expected a Null scalar, found {:?}", self)), + } + } + pub fn as_bool(&self) -> VortexResult> { match self { Self::Null => Ok(None), diff --git a/vortex-serde/src/layouts/mod.rs b/vortex-serde/src/layouts/mod.rs index f1089d0634..d374ef8679 100644 --- a/vortex-serde/src/layouts/mod.rs +++ b/vortex-serde/src/layouts/mod.rs @@ -1,5 +1,3 @@ -use vortex::stats::Stat; - mod read; mod write; @@ -17,8 +15,5 @@ pub const CHUNKED_LAYOUT_ID: LayoutId = LayoutId(2); pub const COLUMN_LAYOUT_ID: LayoutId = LayoutId(3); pub const INLINE_SCHEMA_LAYOUT_ID: LayoutId = LayoutId(4); -pub const PRUNING_STATS: [Stat; 4] = [Stat::Min, Stat::Max, Stat::NullCount, Stat::TrueCount]; -pub const METADATA_FIELD_NAMES: [&str; 5] = ["row_offset", "min", "max", "null_count", "true_count"]; - pub use read::*; pub use write::*; diff --git a/vortex-serde/src/layouts/tests.rs b/vortex-serde/src/layouts/tests.rs index 32c32bfef6..4ddd1c87c4 100644 --- a/vortex-serde/src/layouts/tests.rs +++ b/vortex-serde/src/layouts/tests.rs @@ -4,7 +4,6 @@ use std::iter; use std::sync::Arc; use futures::StreamExt; -use itertools::Itertools as _; use vortex::accessor::ArrayAccessor; use vortex::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray}; use vortex::validity::Validity; @@ -15,26 +14,7 @@ use vortex_dtype::{DType, Nullability, PType, StructDType}; use vortex_expr::{BinaryExpr, Column, Literal, Operator}; use crate::layouts::write::LayoutWriter; -use crate::layouts::{ - LayoutDeserializer, LayoutReaderBuilder, Projection, RowFilter, EOF_SIZE, FOOTER_POSTSCRIPT_SIZE, MAGIC_BYTES, METADATA_FIELD_NAMES, PRUNING_STATS, VERSION -}; - -#[test] -fn format_constants() { - assert_eq!(VERSION, 1); - assert_eq!(FOOTER_POSTSCRIPT_SIZE, 32); // cannot change this without bumping the version - assert_eq!(MAGIC_BYTES, *b"VRTX"); // this can never change - assert_eq!(EOF_SIZE, 8); // this can never change -} - -#[test] -fn metadata_field_names() { - let names = Some("row_offset".to_string()) - .into_iter() - .chain(PRUNING_STATS.iter().map(|s| format!("{}", s))) - .collect_vec(); - assert_eq!(names, METADATA_FIELD_NAMES); -} +use crate::layouts::{LayoutDeserializer, LayoutReaderBuilder, Projection, RowFilter}; #[tokio::test] #[cfg_attr(miri, ignore)] diff --git a/vortex-serde/src/layouts/write/layouts.rs b/vortex-serde/src/layouts/write/layouts.rs index 9706b24ad9..cc6491aa37 100644 --- a/vortex-serde/src/layouts/write/layouts.rs +++ b/vortex-serde/src/layouts/write/layouts.rs @@ -16,10 +16,10 @@ pub struct Layout { } impl Layout { - pub fn flat(byte_range: ByteRange) -> Self { + pub fn flat(buffer: ByteRange) -> Self { Self { id: FLAT_LAYOUT_ID, - buffers: Some(vec![byte_range]), + buffers: 
Some(vec![buffer]), children: None, metadata: None, } diff --git a/vortex-serde/src/layouts/write/metadata_accumulators.rs b/vortex-serde/src/layouts/write/metadata_accumulators.rs new file mode 100644 index 0000000000..c11e0fb96c --- /dev/null +++ b/vortex-serde/src/layouts/write/metadata_accumulators.rs @@ -0,0 +1,263 @@ +//! Metadata accumulators track the per-chunk-of-a-column metadata, layout locations, and row counts. + +use std::collections::VecDeque; +use std::mem; + +use vortex::array::{BoolArray, NullArray, PrimitiveArray, StructArray, VarBinViewArray}; +use vortex::stats::{ArrayStatistics as _, Stat}; +use vortex::validity::Validity; +use vortex::{Array, IntoArray as _}; +use vortex_buffer::{Buffer, BufferString}; +use vortex_dtype::{ + fieldnames_from_strings, match_each_native_ptype, DType, NativePType, Nullability, +}; +use vortex_error::{vortex_bail, VortexError, VortexExpect as _, VortexResult}; +use vortex_scalar::Scalar; + +use super::layouts::Layout; +use crate::stream_writer::ByteRange; + +pub fn new_metadata_accumulator(hint: usize, dtype: &DType) -> Box { + match dtype { + DType::Null => Box::new(ExtremaAccumulator::<()>::new(hint, into_null_array)), + DType::Bool(..) => Box::new(ExtremaAccumulator::::new(hint, into_bool_array)), + DType::Primitive(ptype, ..) => { + match_each_native_ptype!(ptype, |$P| { + Box::new(ExtremaAccumulator::<$P>::new(hint, into_primitive_array::<$P>)) + }) + } + DType::Utf8(..) => Box::new(ExtremaAccumulator::::new( + hint, + into_utf8_array, + )), + DType::Binary(..) => Box::new(ExtremaAccumulator::::new(hint, into_binary_array)), + DType::Struct(..) => Box::new(BasicAccumulator::new(hint)), + DType::List(..) => Box::new(BasicAccumulator::new(hint)), + DType::Extension(..) => Box::new(BasicAccumulator::new(hint)), + } +} + +pub trait MetadataAccumulator { + fn push_chunk(&mut self, array: &Array) -> VortexResult<()>; + + fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec); + + fn into_layouts_and_metadata(self: Box) -> VortexResult<(VecDeque, StructArray)>; + + fn into_layouts_and_metadata_parts( + self, + ) -> VortexResult<(VecDeque, Vec, Vec)>; +} + +struct ExtremaAccumulator { + minima: Vec>, + maxima: Vec>, + to_array: fn(Vec>) -> Array, + basic_metadata: Box, +} + +impl ExtremaAccumulator { + fn new(size_hint: usize, to_array: fn(Vec>) -> Array) -> Self { + Self { + minima: Vec::with_capacity(size_hint), + maxima: Vec::with_capacity(size_hint), + to_array, + basic_metadata: Box::new(BasicAccumulator::new(size_hint)), + } + } +} + +impl MetadataAccumulator for ExtremaAccumulator +where + T: TryFrom, +{ + fn push_chunk(&mut self, array: &Array) -> VortexResult<()> { + self.minima.push( + array + .statistics() + .compute(Stat::Min) + .map(T::try_from) + .transpose()?, + ); + self.maxima.push( + array + .statistics() + .compute(Stat::Max) + .map(T::try_from) + .transpose()?, + ); + + self.basic_metadata.push_chunk(array) + } + + fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec) { + self.basic_metadata + .push_batch_byte_offsets(batch_byte_offsets); + } + + fn into_layouts_and_metadata(self: Box) -> VortexResult<(VecDeque, StructArray)> { + let (chunks, names, fields) = self.into_layouts_and_metadata_parts()?; + let n_chunks = chunks.len(); + let names = fieldnames_from_strings(names); + Ok(( + chunks, + StructArray::try_new(names, fields, n_chunks, Validity::NonNullable)?, + )) + } + + fn into_layouts_and_metadata_parts( + mut self, + ) -> VortexResult<(VecDeque, Vec, Vec)> { + let (chunks, mut names, mut fields) = 
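+        // start from the shared columns (row offsets, null/true counts), then append
+        // min/max only when at least one chunk produced a value: e.g. minima of
+        // [Some(1), None] (hypothetical values) still yield a nullable "min" field,
+        // while [None, None] writes no "min" field at all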
+ self.basic_metadata.into_layouts_and_metadata_parts()?; + + if self.minima.iter().any(Option::is_some) { + names.push("min".into()); + fields.push((self.to_array)(mem::take(&mut self.minima))); + } + + if self.maxima.iter().any(Option::is_some) { + names.push("max".into()); + fields.push((self.to_array)(mem::take(&mut self.maxima))); + } + + Ok((chunks, names, fields)) + } +} + +struct BasicAccumulator { + row_offsets: Vec, + batch_byte_offsets: Vec>, + null_counts: Vec>, + true_counts: Vec>, +} + +impl BasicAccumulator { + pub fn new(size_hint: usize) -> Self { + let mut row_offsets = Vec::with_capacity(size_hint + 1); + row_offsets.push(0); + Self { + row_offsets, + batch_byte_offsets: Vec::new(), + null_counts: Vec::with_capacity(size_hint), + true_counts: Vec::with_capacity(size_hint), + } + } + + fn n_rows_written(&self) -> u64 { + *self + .row_offsets + .last() + .vortex_expect("row offsets cannot be empty by construction") + } +} + +impl MetadataAccumulator for BasicAccumulator { + fn push_chunk(&mut self, array: &Array) -> VortexResult<()> { + self.row_offsets + .push(self.n_rows_written() + array.len() as u64); + + self.null_counts.push( + array + .statistics() + .compute(Stat::NullCount) + .map(u64::try_from) + .transpose()?, + ); + + self.true_counts.push( + array + .statistics() + .compute(Stat::TrueCount) + .map(u64::try_from) + .transpose()?, + ); + + Ok(()) + } + + fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec) { + self.batch_byte_offsets.push(batch_byte_offsets); + } + + fn into_layouts_and_metadata(self: Box) -> VortexResult<(VecDeque, StructArray)> { + let (chunks, names, fields) = self.into_layouts_and_metadata_parts()?; + let n_chunks = chunks.len(); + let names = fieldnames_from_strings(names); + Ok(( + chunks, + StructArray::try_new(names, fields, n_chunks, Validity::NonNullable)?, + )) + } + + fn into_layouts_and_metadata_parts( + mut self, + ) -> VortexResult<(VecDeque, Vec, Vec)> { + // we don't need the last row offset; that's just the total number of rows + let length = self.row_offsets.len() - 1; + self.row_offsets.truncate(length); + + let chunks: VecDeque = self + .batch_byte_offsets + .iter() + .flat_map(|byte_offsets| { + byte_offsets + .iter() + .zip(byte_offsets.iter().skip(1)) + .map(|(begin, end)| Layout::flat(ByteRange::new(*begin, *end))) + }) + .collect(); + + if chunks.len() != self.row_offsets.len() { + vortex_bail!( + "Expected {} chunks based on row offsets, found {} based on byte offsets", + self.row_offsets.len(), + chunks.len() + ); + } + + let mut names: Vec = vec!["row_offset".into()]; + let mut fields = vec![mem::take(&mut self.row_offsets).into_array()]; + + if self.null_counts.iter().any(Option::is_some) { + names.push("null_count".into()); + fields.push( + PrimitiveArray::from_nullable_vec(mem::take(&mut self.null_counts)).into_array(), + ); + } + + if self.true_counts.iter().any(Option::is_some) { + names.push("true_count".into()); + fields.push( + PrimitiveArray::from_nullable_vec(mem::take(&mut self.true_counts)).into_array(), + ); + } + + Ok((chunks, names, fields)) + } +} + +fn into_null_array(vec: Vec>) -> Array { + NullArray::new(vec.len()).into_array() +} + +fn into_bool_array(vec: Vec>) -> Array { + BoolArray::from_iter(vec).into_array() +} + +fn into_primitive_array

diff --git a/vortex-serde/src/layouts/write/mod.rs b/vortex-serde/src/layouts/write/mod.rs
index 2956adda4b..c6dbe070d4 100644
--- a/vortex-serde/src/layouts/write/mod.rs
+++ b/vortex-serde/src/layouts/write/mod.rs
@@ -2,4 +2,5 @@ pub use writer::LayoutWriter;
 
 mod footer;
 mod layouts;
+mod metadata_accumulators;
 mod writer;
diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs
index 75dd89c521..43382ad9bb 100644
--- a/vortex-serde/src/layouts/write/writer.rs
+++ b/vortex-serde/src/layouts/write/writer.rs
@@ -1,28 +1,20 @@
-use std::collections::VecDeque;
-use std::sync::Arc;
 use std::{io, mem};
 
 use flatbuffers::FlatBufferBuilder;
 use futures::{Stream, TryStreamExt};
 use vortex::array::{ChunkedArray, StructArray};
-use vortex::stats::{ArrayStatistics, Stat};
 use vortex::stream::ArrayStream;
-use vortex::validity::Validity;
-use vortex::{Array, ArrayDType, ArrayDType as _, IntoArray, IntoArray};
+use vortex::{Array, ArrayDType as _, IntoArray};
 use vortex_buffer::io_buf::IoBuf;
-use vortex_buffer::{Buffer, BufferString};
-use vortex_dtype::{DType, Nullability, PType};
-use vortex_error::{
-    vortex_bail, vortex_bail, vortex_err, vortex_err, vortex_err, vortex_panic, VortexExpect,
-    VortexExpect, VortexExpect as _, VortexResult, VortexResult, VortexResult,
-};
+use vortex_dtype::DType;
+use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult};
 use vortex_flatbuffers::WriteFlatBuffer;
-use vortex_scalar::{Scalar, ScalarValue};
 
 use crate::io::VortexWrite;
 use crate::layouts::write::footer::{Footer, Postscript};
 use crate::layouts::write::layouts::Layout;
-use crate::layouts::{EOF_SIZE, MAGIC_BYTES, METADATA_FIELD_NAMES, PRUNING_STATS, VERSION};
+use crate::layouts::write::metadata_accumulators::{new_metadata_accumulator, MetadataAccumulator};
+use crate::layouts::{EOF_SIZE, MAGIC_BYTES, VERSION};
 use crate::stream_writer::ByteRange;
 use crate::MessageWriter;
 
@@ -31,7 +23,7 @@ pub struct LayoutWriter<W: VortexWrite> {
     row_count: u64,
     dtype: Option<DType>,
 
-    column_chunks: Vec<ColumnChunkAccumulator>,
+    column_metadata: Vec<Box<dyn MetadataAccumulator>>,
 }
 
 impl<W: VortexWrite> LayoutWriter<W> {
@@ -39,7 +31,7 @@
         LayoutWriter {
             msgs: MessageWriter::new(write),
             dtype: None,
-            column_chunks: Vec::new(),
+            column_metadata: Vec::new(),
             row_count: 0,
         }
     }
@@ -93,63 +85,55 @@ impl<W: VortexWrite> LayoutWriter<W> {
         S: Stream<Item = VortexResult<Array>> + Unpin + ArrayStream,
     {
         let size_hint = stream.size_hint().0;
-        let accumulator = match self.column_chunks.get_mut(column_idx) {
+        let metadata = match self.column_metadata.get_mut(column_idx) {
             None => {
-                self.column_chunks
-                    .push(ColumnChunkAccumulator::new(size_hint, stream.dtype()));
+                self.column_metadata
+                    .push(new_metadata_accumulator(size_hint, stream.dtype()));
 
                 assert_eq!(
-                    self.column_chunks.len(),
+                    self.column_metadata.len(),
                     column_idx + 1,
-                    "write_column_chunks must be called in order by column index! got column index {} but column chunks has {} columns",
+                    "write_column_metadata must be called in order by column index! 
got column index {} but column chunks has {} columns", column_idx, - self.column_chunks.len() + self.column_metadata.len() ); - self.column_chunks + self.column_metadata .last_mut() .vortex_expect("column chunks cannot be empty, just pushed") } Some(x) => x, }; - let mut n_rows_written = *accumulator - .row_offsets - .last() - .vortex_expect("row offsets cannot be empty by construction"); - let mut byte_offsets = Vec::with_capacity(size_hint); + let mut byte_offsets = Vec::with_capacity(size_hint + 1); byte_offsets.push(self.msgs.tell()); - while let Some(chunk) = stream.try_next().await? { - for stat in PRUNING_STATS { - accumulator.push_stat(stat, chunk.statistics().compute(stat))?; - } - - n_rows_written += chunk.len() as u64; - accumulator.push_row_offset(n_rows_written); - + metadata.push_chunk(&chunk)?; self.msgs.write_batch(chunk).await?; byte_offsets.push(self.msgs.tell()); } - accumulator.push_batch_byte_offsets(byte_offsets); + + metadata.push_batch_byte_offsets(byte_offsets); Ok(()) } async fn write_metadata_arrays(&mut self) -> VortexResult { - let mut column_layouts = Vec::with_capacity(self.column_chunks.len()); - for mut column_accumulator in mem::take(&mut self.column_chunks) { - let (mut chunks, metadata_array) = column_accumulator.into_chunks_and_metadata()?; + let mut column_layouts = Vec::with_capacity(self.column_metadata.len()); + for column_metadata in mem::take(&mut self.column_metadata) { + let (mut chunk_layouts, metadata_array) = + column_metadata.into_layouts_and_metadata()?; let dtype_begin = self.msgs.tell(); self.msgs.write_dtype(metadata_array.dtype()).await?; let dtype_end = self.msgs.tell(); - self.msgs.write_batch(metadata_array).await?; - chunks.push_front(Layout::inlined_schema( + self.msgs.write_batch(metadata_array.into_array()).await?; + + chunk_layouts.push_front(Layout::inlined_schema( vec![Layout::flat(ByteRange::new(dtype_end, self.msgs.tell()))], ByteRange::new(dtype_begin, dtype_end), )); - column_layouts.push(Layout::chunked(chunks.into(), true)); + column_layouts.push(Layout::chunked(chunk_layouts.into(), true)); } Ok(Layout::column(column_layouts)) @@ -199,164 +183,6 @@ async fn write_fb_raw(mut writer: W, fb: F) Ok(writer) } -struct ColumnChunkAccumulator { - pub dtype: DType, - pub row_offsets: Vec, - pub batch_byte_offsets: Vec>, - pub minima: Vec, - pub maxima: Vec, - pub null_counts: Vec>, - pub true_counts: Vec>, -} - -impl ColumnChunkAccumulator { - pub fn new(size_hint: usize, dtype: &DType) -> Self { - let mut row_offsets = Vec::with_capacity(size_hint + 1); - row_offsets.push(0); - Self { - dtype: dtype.as_nullable(), - row_offsets, - batch_byte_offsets: Vec::new(), - minima: Vec::with_capacity(size_hint), - maxima: Vec::with_capacity(size_hint), - null_counts: Vec::with_capacity(size_hint), - true_counts: Vec::with_capacity(size_hint), - } - } - - fn push_row_offset(&mut self, row_offset: u64) { - self.row_offsets.push(row_offset); - } - - fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec) { - self.batch_byte_offsets.push(batch_byte_offsets); - } - - fn push_stat(&mut self, stat: Stat, value: Option) -> VortexResult<()> { - if matches!(stat, Stat::Min | Stat::Max) { - if let Some(ref value) = value { - if !value.value().is_instance_of(&self.dtype) { - vortex_bail!( - "Expected all min/max values to have dtype {}, got {}", - self.dtype, - value.dtype() - ); - } - } - } - - match stat { - Stat::Min => self.minima.push( - value - .map(|v| v.into_value()) - .unwrap_or_else(|| ScalarValue::Null), - ), - Stat::Max => 
self.maxima.push( - value - .map(|v| v.into_value()) - .unwrap_or_else(|| ScalarValue::Null), - ), - Stat::NullCount => self.null_counts.push(value.and_then(|v| { - v.into_value() - .as_pvalue() - .vortex_expect("null count is a primitive value") - .and_then(|v| v.as_u64()) - })), - Stat::TrueCount => self.true_counts.push(value.and_then(|v| { - v.into_value() - .as_pvalue() - .vortex_expect("true count is a primitive value") - .and_then(|v| v.as_u64()) - })), - _ => vortex_bail!("Unsupported pruning stat: {stat}"), - } - Ok(()) - } - - fn into_chunks_and_metadata(mut self) -> VortexResult<(VecDeque, Array)> { - // we don't need the last row offset; that's just the total number of rows - let length = self.row_offsets.len() - 1; - self.row_offsets.truncate(length); - - let chunks: VecDeque = self - .batch_byte_offsets - .iter() - .flat_map(|byte_offsets| { - byte_offsets - .iter() - .zip(byte_offsets.iter().skip(1)) - .map(|(begin, end)| Layout::Flat(FlatLayout::new(*begin, *end))) - }) - .collect(); - - if chunks.len() != self.row_offsets.len() { - vortex_bail!( - "Expected {} chunks based on row offsets, found {} based on byte offsets", - self.row_offsets.len(), - chunks.len() - ); - } - - let mut names: Vec> = vec!["row_offset".into()]; - let mut fields = vec![mem::take(&mut self.row_offsets).into_array()]; - - for stat in PRUNING_STATS { - let values = match stat { - Stat::Min => mem::take(&mut self.minima), - Stat::Max => mem::take(&mut self.maxima), - Stat::NullCount => self - .null_counts - .iter() - .cloned() - .map(ScalarValue::from) - .collect(), - Stat::TrueCount => self - .true_counts - .iter() - .cloned() - .map(ScalarValue::from) - .collect(), - _ => vortex_bail!("Unsupported pruning stat: {}", stat), - }; - if values.len() != length { - vortex_bail!( - "Expected {} values for stat {}, found {}", - length, - stat, - values.len() - ); - } - - if values.iter().all(|v| v.is_null()) { - // no point in writing all nulls - continue; - }; - - let dtype = match stat { - Stat::Min | Stat::Max => self.dtype.clone(), - _ => DType::Primitive(PType::U64, Nullability::Nullable), - }; - - names.push(format!("{stat}").to_lowercase().into()); - fields.push(Array::from_scalar_values(dtype, values)?); - } - for name in &names { - if !METADATA_FIELD_NAMES.contains(&name.as_ref()) { - vortex_panic!( - "Found unexpected metadata field name {}, expected one of {:?}", - name, - METADATA_FIELD_NAMES - ); - } - } - - Ok(( - chunks, - StructArray::try_new(names.into(), fields, length, Validity::NonNullable)?.into_array(), - )) - } -} - #[cfg(test)] mod tests { use flatbuffers::FlatBufferBuilder; From b9df1a57fcf4dd4e767fae824756673ef588cf42 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 31 Oct 2024 10:28:42 -0400 Subject: [PATCH 08/15] address comments --- .../layouts/write/metadata_accumulators.rs | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/vortex-serde/src/layouts/write/metadata_accumulators.rs b/vortex-serde/src/layouts/write/metadata_accumulators.rs index c11e0fb96c..7c07c1cd4e 100644 --- a/vortex-serde/src/layouts/write/metadata_accumulators.rs +++ b/vortex-serde/src/layouts/write/metadata_accumulators.rs @@ -43,17 +43,13 @@ pub trait MetadataAccumulator { fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec); fn into_layouts_and_metadata(self: Box) -> VortexResult<(VecDeque, StructArray)>; - - fn into_layouts_and_metadata_parts( - self, - ) -> VortexResult<(VecDeque, Vec, Vec)>; } struct ExtremaAccumulator { minima: Vec>, maxima: Vec>, 
to_array: fn(Vec>) -> Array, - basic_metadata: Box, + basic_metadata: BasicAccumulator, } impl ExtremaAccumulator { @@ -62,7 +58,7 @@ impl ExtremaAccumulator { minima: Vec::with_capacity(size_hint), maxima: Vec::with_capacity(size_hint), to_array, - basic_metadata: Box::new(BasicAccumulator::new(size_hint)), + basic_metadata: BasicAccumulator::new(size_hint), } } } @@ -95,19 +91,9 @@ where .push_batch_byte_offsets(batch_byte_offsets); } - fn into_layouts_and_metadata(self: Box) -> VortexResult<(VecDeque, StructArray)> { - let (chunks, names, fields) = self.into_layouts_and_metadata_parts()?; - let n_chunks = chunks.len(); - let names = fieldnames_from_strings(names); - Ok(( - chunks, - StructArray::try_new(names, fields, n_chunks, Validity::NonNullable)?, - )) - } - - fn into_layouts_and_metadata_parts( - mut self, - ) -> VortexResult<(VecDeque, Vec, Vec)> { + fn into_layouts_and_metadata( + mut self: Box, + ) -> VortexResult<(VecDeque, StructArray)> { let (chunks, mut names, mut fields) = self.basic_metadata.into_layouts_and_metadata_parts()?; @@ -121,7 +107,12 @@ where fields.push((self.to_array)(mem::take(&mut self.maxima))); } - Ok((chunks, names, fields)) + let n_chunks = chunks.len(); + let names = fieldnames_from_strings(names); + Ok(( + chunks, + StructArray::try_new(names, fields, n_chunks, Validity::NonNullable)?, + )) } } @@ -189,7 +180,9 @@ impl MetadataAccumulator for BasicAccumulator { StructArray::try_new(names, fields, n_chunks, Validity::NonNullable)?, )) } +} +impl BasicAccumulator { fn into_layouts_and_metadata_parts( mut self, ) -> VortexResult<(VecDeque, Vec, Vec)> { From 2c69ec68f48278f1e16174172cf981006e13423c Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 31 Oct 2024 11:35:46 -0400 Subject: [PATCH 09/15] address rob comments --- vortex-dtype/src/dtype.rs | 10 --------- .../layouts/write/metadata_accumulators.rs | 21 +++++++++---------- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/vortex-dtype/src/dtype.rs b/vortex-dtype/src/dtype.rs index d941dce1e6..b2b16e6487 100644 --- a/vortex-dtype/src/dtype.rs +++ b/vortex-dtype/src/dtype.rs @@ -15,16 +15,6 @@ pub type FieldNames = Arc<[FieldName]>; pub type Metadata = Vec; -pub fn fieldnames_from_strings(value: Vec) -> FieldNames { - Arc::from( - value - .iter() - .map(|x| Arc::from(x.as_str())) - .collect::>() - .into_boxed_slice(), - ) -} - /// Array logical types. 
/// /// Vortex arrays preserve a single logical type, while the encodings allow for multiple diff --git a/vortex-serde/src/layouts/write/metadata_accumulators.rs b/vortex-serde/src/layouts/write/metadata_accumulators.rs index 7c07c1cd4e..f0b34ec2d0 100644 --- a/vortex-serde/src/layouts/write/metadata_accumulators.rs +++ b/vortex-serde/src/layouts/write/metadata_accumulators.rs @@ -2,15 +2,14 @@ use std::collections::VecDeque; use std::mem; +use std::sync::Arc; use vortex::array::{BoolArray, NullArray, PrimitiveArray, StructArray, VarBinViewArray}; use vortex::stats::{ArrayStatistics as _, Stat}; use vortex::validity::Validity; use vortex::{Array, IntoArray as _}; use vortex_buffer::{Buffer, BufferString}; -use vortex_dtype::{ - fieldnames_from_strings, match_each_native_ptype, DType, NativePType, Nullability, -}; +use vortex_dtype::{match_each_native_ptype, DType, NativePType, Nullability}; use vortex_error::{vortex_bail, VortexError, VortexExpect as _, VortexResult}; use vortex_scalar::Scalar; @@ -98,17 +97,17 @@ where self.basic_metadata.into_layouts_and_metadata_parts()?; if self.minima.iter().any(Option::is_some) { - names.push("min".into()); + names.push(Arc::from("min")); fields.push((self.to_array)(mem::take(&mut self.minima))); } if self.maxima.iter().any(Option::is_some) { - names.push("max".into()); + names.push(Arc::from("max")); fields.push((self.to_array)(mem::take(&mut self.maxima))); } let n_chunks = chunks.len(); - let names = fieldnames_from_strings(names); + let names = Arc::from(names); Ok(( chunks, StructArray::try_new(names, fields, n_chunks, Validity::NonNullable)?, @@ -174,7 +173,7 @@ impl MetadataAccumulator for BasicAccumulator { fn into_layouts_and_metadata(self: Box) -> VortexResult<(VecDeque, StructArray)> { let (chunks, names, fields) = self.into_layouts_and_metadata_parts()?; let n_chunks = chunks.len(); - let names = fieldnames_from_strings(names); + let names = Arc::from(names); Ok(( chunks, StructArray::try_new(names, fields, n_chunks, Validity::NonNullable)?, @@ -185,7 +184,7 @@ impl MetadataAccumulator for BasicAccumulator { impl BasicAccumulator { fn into_layouts_and_metadata_parts( mut self, - ) -> VortexResult<(VecDeque, Vec, Vec)> { + ) -> VortexResult<(VecDeque, Vec>, Vec)> { // we don't need the last row offset; that's just the total number of rows let length = self.row_offsets.len() - 1; self.row_offsets.truncate(length); @@ -209,18 +208,18 @@ impl BasicAccumulator { ); } - let mut names: Vec = vec!["row_offset".into()]; + let mut names: Vec> = vec![Arc::from("row_offset")]; let mut fields = vec![mem::take(&mut self.row_offsets).into_array()]; if self.null_counts.iter().any(Option::is_some) { - names.push("null_count".into()); + names.push(Arc::from("null_count")); fields.push( PrimitiveArray::from_nullable_vec(mem::take(&mut self.null_counts)).into_array(), ); } if self.true_counts.iter().any(Option::is_some) { - names.push("true_count".into()); + names.push(Arc::from("true_count")); fields.push( PrimitiveArray::from_nullable_vec(mem::take(&mut self.true_counts)).into_array(), ); From 7e2bd0ab41df0b99d990c6e8074ed28b8ef0b11f Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 31 Oct 2024 12:00:14 -0400 Subject: [PATCH 10/15] clippy --- vortex-serde/src/layouts/write/metadata_accumulators.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vortex-serde/src/layouts/write/metadata_accumulators.rs b/vortex-serde/src/layouts/write/metadata_accumulators.rs index f0b34ec2d0..4d45d918cd 100644 --- 
a/vortex-serde/src/layouts/write/metadata_accumulators.rs
+++ b/vortex-serde/src/layouts/write/metadata_accumulators.rs
@@ -181,10 +181,10 @@ impl MetadataAccumulator for BasicAccumulator {
     }
 }
 
+type LayoutsAndMetadataParts = (VecDeque<Layout>, Vec<Arc<str>>, Vec<Array>);
+
 impl BasicAccumulator {
-    fn into_layouts_and_metadata_parts(
-        mut self,
-    ) -> VortexResult<(VecDeque<Layout>, Vec<Arc<str>>, Vec<Array>)> {
+    fn into_layouts_and_metadata_parts(mut self) -> VortexResult<LayoutsAndMetadataParts> {
         // we don't need the last row offset; that's just the total number of rows
         let length = self.row_offsets.len() - 1;
         self.row_offsets.truncate(length);

From a2e41f1d91c4c568f791ab4a88d6ea4b477d2418 Mon Sep 17 00:00:00 2001
From: Daniel King
Date: Thu, 31 Oct 2024 17:48:30 -0400
Subject: [PATCH 11/15] maybe address comments?

---
 vortex-array/src/array/from.rs                |  55 +++
 vortex-array/src/array/mod.rs                 |   1 +
 .../layouts/write/metadata_accumulators.rs    | 340 +++++++++---------
 vortex-serde/src/layouts/write/writer.rs      |  82 ++++-
 4 files changed, 284 insertions(+), 194 deletions(-)
 create mode 100644 vortex-array/src/array/from.rs

diff --git a/vortex-array/src/array/from.rs b/vortex-array/src/array/from.rs
new file mode 100644
index 0000000000..b92ffcba54
--- /dev/null
+++ b/vortex-array/src/array/from.rs
@@ -0,0 +1,55 @@
+use vortex_buffer::{Buffer, BufferString};
+use vortex_dtype::half::f16;
+use vortex_dtype::{DType, Nullability};
+
+use super::{BoolArray, PrimitiveArray, VarBinViewArray};
+use crate::validity::Validity;
+use crate::{Array, IntoArray as _};
+
+// `From<Vec<Option<!>>> for Array` requires the experimental uninhabited type: !.
+
+impl From<Vec<Option<bool>>> for Array {
+    fn from(value: Vec<Option<bool>>) -> Self {
+        BoolArray::from_iter(value).into_array()
+    }
+}
+
+macro_rules! impl_from_primitive_for_array {
+    ($P:ty) => {
+        impl From<Vec<$P>> for Array {
+            fn from(value: Vec<$P>) -> Self {
+                PrimitiveArray::from_vec(value, Validity::NonNullable).into_array()
+            }
+        }
+
+        impl From<Vec<Option<$P>>> for Array {
+            fn from(value: Vec<Option<$P>>) -> Self {
+                PrimitiveArray::from_nullable_vec(value).into_array()
+            }
+        }
+    };
+}
+
+impl_from_primitive_for_array!(u8);
+impl_from_primitive_for_array!(u16);
+impl_from_primitive_for_array!(u32);
+impl_from_primitive_for_array!(u64);
+impl_from_primitive_for_array!(i8);
+impl_from_primitive_for_array!(i16);
+impl_from_primitive_for_array!(i32);
+impl_from_primitive_for_array!(i64);
+impl_from_primitive_for_array!(f16);
+impl_from_primitive_for_array!(f32);
+impl_from_primitive_for_array!(f64);
+
+impl From<Vec<Option<BufferString>>> for Array {
+    fn from(value: Vec<Option<BufferString>>) -> Self {
+        VarBinViewArray::from_iter(value, DType::Utf8(Nullability::Nullable)).into_array()
+    }
+}
+
+impl From<Vec<Option<Buffer>>> for Array {
+    fn from(value: Vec<Option<Buffer>>) -> Self {
+        VarBinViewArray::from_iter(value, DType::Binary(Nullability::Nullable)).into_array()
+    }
+}
diff --git a/vortex-array/src/array/mod.rs b/vortex-array/src/array/mod.rs
index 34440bca85..e51cfa7f84 100644
--- a/vortex-array/src/array/mod.rs
+++ b/vortex-array/src/array/mod.rs
@@ -15,6 +15,7 @@ mod struct_;
 mod varbin;
 mod varbinview;
 
+pub mod from;
 pub mod visitor;
 
 #[cfg(feature = "arbitrary")]
diff --git a/vortex-serde/src/layouts/write/metadata_accumulators.rs b/vortex-serde/src/layouts/write/metadata_accumulators.rs
index 4d45d918cd..bd9f135a9a 100644
--- a/vortex-serde/src/layouts/write/metadata_accumulators.rs
+++ b/vortex-serde/src/layouts/write/metadata_accumulators.rs
@@ -1,255 +1,243 @@
 //! Metadata accumulators track the per-chunk-of-a-column metadata, layout locations, and row counts.
-use std::collections::VecDeque;
-use std::mem;
 use std::sync::Arc;
 
-use vortex::array::{BoolArray, NullArray, PrimitiveArray, StructArray, VarBinViewArray};
+use vortex::array::StructArray;
 use vortex::stats::{ArrayStatistics as _, Stat};
 use vortex::validity::Validity;
-use vortex::{Array, IntoArray as _};
+use vortex::{Array, IntoArray};
 use vortex_buffer::{Buffer, BufferString};
-use vortex_dtype::{match_each_native_ptype, DType, NativePType, Nullability};
-use vortex_error::{vortex_bail, VortexError, VortexExpect as _, VortexResult};
+use vortex_dtype::{match_each_native_ptype, DType, FieldName};
+use vortex_error::{VortexError, VortexResult};
 use vortex_scalar::Scalar;
 
-use super::layouts::Layout;
-use crate::stream_writer::ByteRange;
-
 pub fn new_metadata_accumulator(hint: usize, dtype: &DType) -> Box<dyn MetadataAccumulator> {
     match dtype {
-        DType::Null => Box::new(ExtremaAccumulator::<()>::new(hint, into_null_array)),
-        DType::Bool(..) => Box::new(ExtremaAccumulator::<bool>::new(hint, into_bool_array)),
+        DType::Null => Box::new(BasicAccumulator::new(hint)),
+        DType::Bool(..) => Box::new(BoolAccumulator::new(hint)),
         DType::Primitive(ptype, ..) => {
             match_each_native_ptype!(ptype, |$P| {
-                Box::new(ExtremaAccumulator::<$P>::new(hint, into_primitive_array::<$P>))
+                Box::new(StandardAccumulator::<$P>::new(hint))
             })
         }
-        DType::Utf8(..) => Box::new(ExtremaAccumulator::<BufferString>::new(
-            hint,
-            into_utf8_array,
-        )),
-        DType::Binary(..) => Box::new(ExtremaAccumulator::<Buffer>::new(hint, into_binary_array)),
+        DType::Utf8(..) => Box::new(StandardAccumulator::<BufferString>::new(hint)),
+        DType::Binary(..) => Box::new(StandardAccumulator::<Buffer>::new(hint)),
         DType::Struct(..) => Box::new(BasicAccumulator::new(hint)),
         DType::List(..) => Box::new(BasicAccumulator::new(hint)),
         DType::Extension(..) => Box::new(BasicAccumulator::new(hint)),
     }
 }
 
+/// Accumulates zero or more series of metadata across the chunks of a column.
 pub trait MetadataAccumulator {
     fn push_chunk(&mut self, array: &Array) -> VortexResult<()>;
 
-    fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec<u64>);
-
-    fn into_layouts_and_metadata(self: Box<Self>) -> VortexResult<(VecDeque<Layout>, StructArray)>;
+    fn into_array(self: Box<Self>) -> VortexResult<Array>;
 }
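[Editor's note] Each accumulator below materializes its columns with the same idiom: collect optional (name, column) pairs, drop the stats that never produced a value, and unzip into parallel name and field vectors. A standalone, runnable sketch of that step in plain Rust with made-up data (the patch writes it as `.filter_map(|o| o)` here; PATCH 14 later switches to the equivalent `.flatten()` used below):

    fn main() {
        // hypothetical per-stat columns; None means the stat was never computed
        let columns: Vec<Option<(&str, Vec<u64>)>> = vec![
            Some(("row_offsets", vec![0, 100, 350])),
            None, // e.g. max was unavailable for every chunk
            Some(("null_count", vec![0, 3, 1])),
        ];
        let (names, fields): (Vec<_>, Vec<_>) = columns.into_iter().flatten().unzip();
        assert_eq!(names, vec!["row_offsets", "null_count"]);
        assert_eq!(fields.len(), 2);
    }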
 
+/// Accumulator for bool-typed columns.
+struct BoolAccumulator {
+    row_offsets: RowOffsetsAccumulator,
+    maxima: UnwrappedStatAccumulator<bool>,
+    minima: UnwrappedStatAccumulator<bool>,
+    true_count: UnwrappedStatAccumulator<u64>,
+    null_count: UnwrappedStatAccumulator<u64>,
 }
 
-impl<T> ExtremaAccumulator<T> {
-    fn new(size_hint: usize, to_array: fn(Vec<Option<T>>) -> Array) -> Self {
+impl BoolAccumulator {
+    fn new(hint: usize) -> Self {
         Self {
-            minima: Vec::with_capacity(size_hint),
-            maxima: Vec::with_capacity(size_hint),
-            to_array,
-            basic_metadata: BasicAccumulator::new(size_hint),
+            row_offsets: RowOffsetsAccumulator::new(),
+            maxima: UnwrappedStatAccumulator::new(Stat::Max, "max".into(), hint),
+            minima: UnwrappedStatAccumulator::new(Stat::Min, "min".into(), hint),
+            true_count: UnwrappedStatAccumulator::new(Stat::TrueCount, "true_count".into(), hint),
+            null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into(), hint),
         }
     }
 }
 
-impl<T> MetadataAccumulator for ExtremaAccumulator<T>
-where
-    T: TryFrom<Scalar, Error = VortexError>,
-{
+impl MetadataAccumulator for BoolAccumulator {
     fn push_chunk(&mut self, array: &Array) -> VortexResult<()> {
-        self.minima.push(
-            array
-                .statistics()
-                .compute(Stat::Min)
-                .map(T::try_from)
-                .transpose()?,
-        );
-        self.maxima.push(
-            array
-                .statistics()
-                .compute(Stat::Max)
-                .map(T::try_from)
-                .transpose()?,
-        );
-
-        self.basic_metadata.push_chunk(array)
+        self.row_offsets.push_chunk(array)?;
+        self.maxima.push_chunk(array)?;
+        self.minima.push_chunk(array)?;
+        self.true_count.push_chunk(array)?;
+        self.null_count.push_chunk(array)
     }
 
-    fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec<u64>) {
-        self.basic_metadata
-            .push_batch_byte_offsets(batch_byte_offsets);
+    fn into_array(self: Box<Self>) -> VortexResult<Array> {
+        let (names, fields): (Vec<FieldName>, Vec<Array>) = [
+            self.row_offsets.into_column(),
+            self.maxima.into_column(),
+            self.minima.into_column(),
+            self.true_count.into_column(),
+            self.null_count.into_column(),
+        ]
+        .into_iter()
+        .filter_map(|o| o)
+        .unzip();
+        let names = Arc::from(names);
+        let n_chunks = fields[0].len();
+        StructArray::try_new(names, fields, n_chunks, Validity::NonNullable)
+            .map(IntoArray::into_array)
     }
+}
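[Editor's note] StandardAccumulator below leans on the `From` impls added in from.rs earlier in this patch: requiring `Array: From<Vec<Option<T>>>` is what lets one generic accumulator turn any stat series into a column. A standalone sketch of the same trick, using a stand-in Column type rather than Vortex's real Array:

    #[derive(Debug)]
    struct Column(usize); // stand-in for vortex's Array; stores only a length here

    impl<T> From<Vec<Option<T>>> for Column {
        fn from(values: Vec<Option<T>>) -> Self {
            Column(values.len())
        }
    }

    // generic over the stat type; no per-type glue code at the call site
    fn materialize<T>(values: Vec<Option<T>>) -> Column
    where
        Column: From<Vec<Option<T>>>,
    {
        Column::from(values)
    }

    fn main() {
        println!("{:?}", materialize(vec![Some(1u64), None]));
        println!("{:?}", materialize(vec![Some(true), Some(false)]));
    }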
 
+/// An accumulator for the minima, maxima, null counts, and row offsets.
+struct StandardAccumulator<T> {
+    row_offsets: RowOffsetsAccumulator,
+    maxima: UnwrappedStatAccumulator<T>,
+    minima: UnwrappedStatAccumulator<T>,
+    null_count: UnwrappedStatAccumulator<u64>,
+}
 
-    fn into_layouts_and_metadata(
-        mut self: Box<Self>,
-    ) -> VortexResult<(VecDeque<Layout>, StructArray)> {
-        let (chunks, mut names, mut fields) =
-            self.basic_metadata.into_layouts_and_metadata_parts()?;
-
-        if self.minima.iter().any(Option::is_some) {
-            names.push(Arc::from("min"));
-            fields.push((self.to_array)(mem::take(&mut self.minima)));
+impl<T> StandardAccumulator<T> {
+    fn new(hint: usize) -> Self {
+        Self {
+            row_offsets: RowOffsetsAccumulator::new(),
+            maxima: UnwrappedStatAccumulator::new(Stat::Max, "max".into(), hint),
+            minima: UnwrappedStatAccumulator::new(Stat::Min, "min".into(), hint),
+            null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into(), hint),
         }
+    }
+}
 
-        if self.maxima.iter().any(Option::is_some) {
-            names.push(Arc::from("max"));
-            fields.push((self.to_array)(mem::take(&mut self.maxima)));
-        }
+impl<T> MetadataAccumulator for StandardAccumulator<T>
+where
+    T: TryFrom<Scalar, Error = VortexError>,
+    Array: From<Vec<Option<T>>>,
+{
+    fn push_chunk(&mut self, array: &Array) -> VortexResult<()> {
+        self.row_offsets.push_chunk(array)?;
+        self.maxima.push_chunk(array)?;
+        self.minima.push_chunk(array)?;
+        self.null_count.push_chunk(array)
+    }
 
-        let n_chunks = chunks.len();
+    fn into_array(self: Box<Self>) -> VortexResult<Array> {
+        let (names, fields): (Vec<FieldName>, Vec<Array>) = [
+            self.row_offsets.into_column(),
+            self.maxima.into_column(),
+            self.minima.into_column(),
+            self.null_count.into_column(),
+        ]
+        .into_iter()
+ .filter_map(|o| o) + .unzip(); let names = Arc::from(names); - Ok(( - chunks, - StructArray::try_new(names, fields, n_chunks, Validity::NonNullable)?, - )) + let n_chunks = fields[0].len(); + StructArray::try_new(names, fields, n_chunks, Validity::NonNullable) + .map(IntoArray::into_array) } } -type LayoutsAndMetadataParts = (VecDeque, Vec>, Vec); +/// Accumulates a single series of values across the chunks of a column. +trait SingularAccumulator { + fn push_chunk(&mut self, array: &Array) -> VortexResult<()>; -impl BasicAccumulator { - fn into_layouts_and_metadata_parts(mut self) -> VortexResult { - // we don't need the last row offset; that's just the total number of rows - let length = self.row_offsets.len() - 1; - self.row_offsets.truncate(length); - - let chunks: VecDeque = self - .batch_byte_offsets - .iter() - .flat_map(|byte_offsets| { - byte_offsets - .iter() - .zip(byte_offsets.iter().skip(1)) - .map(|(begin, end)| Layout::flat(ByteRange::new(*begin, *end))) - }) - .collect(); - - if chunks.len() != self.row_offsets.len() { - vortex_bail!( - "Expected {} chunks based on row offsets, found {} based on byte offsets", - self.row_offsets.len(), - chunks.len() - ); - } + fn into_column(self) -> Option<(FieldName, Array)>; +} - let mut names: Vec> = vec![Arc::from("row_offset")]; - let mut fields = vec![mem::take(&mut self.row_offsets).into_array()]; +struct UnwrappedStatAccumulator { + stat: Stat, + name: FieldName, + values: Vec>, +} - if self.null_counts.iter().any(Option::is_some) { - names.push(Arc::from("null_count")); - fields.push( - PrimitiveArray::from_nullable_vec(mem::take(&mut self.null_counts)).into_array(), - ); +impl UnwrappedStatAccumulator { + fn new(stat: Stat, name: FieldName, hint: usize) -> Self { + Self { + stat, + name, + values: Vec::with_capacity(hint), } + } +} - if self.true_counts.iter().any(Option::is_some) { - names.push(Arc::from("true_count")); - fields.push( - PrimitiveArray::from_nullable_vec(mem::take(&mut self.true_counts)).into_array(), - ); - } +impl SingularAccumulator for UnwrappedStatAccumulator +where + T: TryFrom, + Array: From>>, +{ + fn push_chunk(&mut self, array: &Array) -> VortexResult<()> { + self.values.push( + array + .statistics() + .compute(self.stat) + .map(T::try_from) + .transpose()?, + ); + Ok(()) + } - Ok((chunks, names, fields)) + fn into_column(self) -> Option<(FieldName, Array)> { + if self.values.iter().any(Option::is_some) { + return Some((self.name, Array::from(self.values))); + } + None } } -fn into_null_array(vec: Vec>) -> Array { - NullArray::new(vec.len()).into_array() +struct RowOffsetsAccumulator { + row_offsets: Vec, + n_rows: u64, } -fn into_bool_array(vec: Vec>) -> Array { - BoolArray::from_iter(vec).into_array() +impl RowOffsetsAccumulator { + fn new() -> Self { + Self { + row_offsets: Vec::new(), + n_rows: 0, + } + } } -fn into_primitive_array

(vec: Vec>) -> Array -where - P: NativePType, - P: TryFrom, - P: 'static, -{ - PrimitiveArray::from_nullable_vec(vec).into_array() -} +impl SingularAccumulator for RowOffsetsAccumulator { + fn push_chunk(&mut self, array: &Array) -> VortexResult<()> { + self.row_offsets.push(self.n_rows); + self.n_rows += array.len() as u64; -fn into_utf8_array(x: Vec>) -> Array { - VarBinViewArray::from_iter(x, DType::Utf8(Nullability::Nullable)).into_array() -} + Ok(()) + } -fn into_binary_array(x: Vec>) -> Array { - VarBinViewArray::from_iter(x, DType::Binary(Nullability::Nullable)).into_array() + fn into_column(self) -> Option<(FieldName, Array)> { + // intentionally excluding the last n_rows, b/c it is just the total number of rows + return Some(("row_offsets".into(), Array::from(self.row_offsets))); + } } diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index 43382ad9bb..83e168cac8 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -1,10 +1,10 @@ use std::{io, mem}; use flatbuffers::FlatBufferBuilder; -use futures::{Stream, TryStreamExt}; +use futures::TryStreamExt; use vortex::array::{ChunkedArray, StructArray}; use vortex::stream::ArrayStream; -use vortex::{Array, ArrayDType as _, IntoArray}; +use vortex::{Array, ArrayDType as _}; use vortex_buffer::io_buf::IoBuf; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult}; @@ -23,7 +23,7 @@ pub struct LayoutWriter { row_count: u64, dtype: Option, - column_metadata: Vec>, + column_metadata: Vec, } impl LayoutWriter { @@ -82,13 +82,15 @@ impl LayoutWriter { async fn write_column_chunks(&mut self, mut stream: S, column_idx: usize) -> VortexResult<()> where - S: Stream> + Unpin + ArrayStream, + S: ArrayStream + Unpin, { let size_hint = stream.size_hint().0; let metadata = match self.column_metadata.get_mut(column_idx) { None => { - self.column_metadata - .push(new_metadata_accumulator(size_hint, stream.dtype())); + self.column_metadata.push(ColumnMetadataAndOffsets { + metadata: new_metadata_accumulator(size_hint, stream.dtype()), + batch_byte_offsets: Vec::new(), + }); assert_eq!( self.column_metadata.len(), @@ -121,18 +123,7 @@ impl LayoutWriter { async fn write_metadata_arrays(&mut self) -> VortexResult { let mut column_layouts = Vec::with_capacity(self.column_metadata.len()); for column_metadata in mem::take(&mut self.column_metadata) { - let (mut chunk_layouts, metadata_array) = - column_metadata.into_layouts_and_metadata()?; - - let dtype_begin = self.msgs.tell(); - self.msgs.write_dtype(metadata_array.dtype()).await?; - let dtype_end = self.msgs.tell(); - self.msgs.write_batch(metadata_array.into_array()).await?; - - chunk_layouts.push_front(Layout::inlined_schema( - vec![Layout::flat(ByteRange::new(dtype_end, self.msgs.tell()))], - ByteRange::new(dtype_begin, dtype_end), - )); + let chunk_layouts = column_metadata.write_into(&mut self.msgs).await?; column_layouts.push(Layout::chunked(chunk_layouts.into(), true)); } @@ -183,6 +174,60 @@ async fn write_fb_raw(mut writer: W, fb: F) Ok(writer) } +struct ColumnMetadataAndOffsets { + metadata: Box, + batch_byte_offsets: Vec>, +} + +impl ColumnMetadataAndOffsets { + fn push_chunk(&mut self, array: &Array) -> VortexResult<()> { + self.metadata.push_chunk(array) + } + + fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec) { + self.batch_byte_offsets.push(batch_byte_offsets); + } + + async fn write_into( + self, + msgs: &mut MessageWriter, + ) -> 
VortexResult> { + let metadata_array = self.metadata.into_array()?; + let expected_n_data_chunks = metadata_array.len(); + + let dtype_begin = msgs.tell(); + msgs.write_dtype(metadata_array.dtype()).await?; + let dtype_end = msgs.tell(); + msgs.write_batch(metadata_array).await?; + let metadata_array_end = msgs.tell(); + + let data_chunks = self.batch_byte_offsets.iter().flat_map(|byte_offsets| { + byte_offsets + .iter() + .zip(byte_offsets.iter().skip(1)) + .map(|(begin, end)| Layout::flat(ByteRange::new(*begin, *end))) + }); + + let layouts: Vec = [Layout::inlined_schema( + vec![Layout::flat(ByteRange::new(dtype_end, metadata_array_end))], + ByteRange::new(dtype_begin, dtype_end), + )] + .into_iter() + .chain(data_chunks) + .collect(); + + if layouts.len() != expected_n_data_chunks + 1 { + vortex_bail!( + "Expected {} layouts based on row offsets, found {} based on byte offsets", + expected_n_data_chunks + 1, + layouts.len() + ); + } + + Ok(layouts) + } +} + #[cfg(test)] mod tests { use flatbuffers::FlatBufferBuilder; @@ -226,5 +271,6 @@ mod tests { buffer[buffer_begin..buffer_end].len(), FOOTER_POSTSCRIPT_SIZE ); + assert_eq!(buffer[buffer_begin..buffer_end].len(), 32); } } From 278dc093f3e48e22543eb562894820880d88a6b3 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 31 Oct 2024 18:11:47 -0400 Subject: [PATCH 12/15] fix up the writer too --- vortex-serde/src/layouts/write/writer.rs | 74 +++++++++++++----------- 1 file changed, 41 insertions(+), 33 deletions(-) diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index 83e168cac8..caa49dfba6 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -23,7 +23,7 @@ pub struct LayoutWriter { row_count: u64, dtype: Option, - column_metadata: Vec, + column_writers: Vec, } impl LayoutWriter { @@ -31,7 +31,7 @@ impl LayoutWriter { LayoutWriter { msgs: MessageWriter::new(write), dtype: None, - column_metadata: Vec::new(), + column_writers: Vec::new(), row_count: 0, } } @@ -80,51 +80,39 @@ impl LayoutWriter { Ok(self) } - async fn write_column_chunks(&mut self, mut stream: S, column_idx: usize) -> VortexResult<()> + async fn write_column_chunks(&mut self, stream: S, column_idx: usize) -> VortexResult<()> where S: ArrayStream + Unpin, { let size_hint = stream.size_hint().0; - let metadata = match self.column_metadata.get_mut(column_idx) { + let column_writer = match self.column_writers.get_mut(column_idx) { None => { - self.column_metadata.push(ColumnMetadataAndOffsets { - metadata: new_metadata_accumulator(size_hint, stream.dtype()), - batch_byte_offsets: Vec::new(), - }); + self.column_writers + .push(ColumnWriter::new(size_hint, stream.dtype())); assert_eq!( - self.column_metadata.len(), + self.column_writers.len(), column_idx + 1, - "write_column_metadata must be called in order by column index! got column index {} but column chunks has {} columns", + "write_column_chunks must be called in order by column index! got column index {} but column chunks has {} columns", column_idx, - self.column_metadata.len() + self.column_writers.len() ); - self.column_metadata + self.column_writers .last_mut() .vortex_expect("column chunks cannot be empty, just pushed") } Some(x) => x, }; - let mut byte_offsets = Vec::with_capacity(size_hint + 1); - byte_offsets.push(self.msgs.tell()); - while let Some(chunk) = stream.try_next().await? 
{ - metadata.push_chunk(&chunk)?; - self.msgs.write_batch(chunk).await?; - byte_offsets.push(self.msgs.tell()); - } - - metadata.push_batch_byte_offsets(byte_offsets); - - Ok(()) + column_writer.write_chunks(stream, &mut self.msgs).await } async fn write_metadata_arrays(&mut self) -> VortexResult { - let mut column_layouts = Vec::with_capacity(self.column_metadata.len()); - for column_metadata in mem::take(&mut self.column_metadata) { - let chunk_layouts = column_metadata.write_into(&mut self.msgs).await?; - column_layouts.push(Layout::chunked(chunk_layouts.into(), true)); + let mut column_layouts = Vec::with_capacity(self.column_writers.len()); + for column_writer in mem::take(&mut self.column_writers) { + let chunk_layouts = column_writer.write_into(&mut self.msgs).await?; + column_layouts.push(Layout::chunked(chunk_layouts, true)); } Ok(Layout::column(column_layouts)) @@ -174,18 +162,38 @@ async fn write_fb_raw(mut writer: W, fb: F) Ok(writer) } -struct ColumnMetadataAndOffsets { +struct ColumnWriter { metadata: Box, batch_byte_offsets: Vec>, + size_hint: usize, } -impl ColumnMetadataAndOffsets { - fn push_chunk(&mut self, array: &Array) -> VortexResult<()> { - self.metadata.push_chunk(array) +impl ColumnWriter { + fn new(size_hint: usize, dtype: &DType) -> Self { + Self { + metadata: new_metadata_accumulator(size_hint, dtype), + batch_byte_offsets: Vec::new(), + size_hint, + } } - fn push_batch_byte_offsets(&mut self, batch_byte_offsets: Vec) { - self.batch_byte_offsets.push(batch_byte_offsets); + async fn write_chunks( + &mut self, + mut stream: S, + msgs: &mut MessageWriter, + ) -> VortexResult<()> { + let mut offsets = Vec::with_capacity(self.size_hint + 1); + offsets.push(msgs.tell()); + + while let Some(chunk) = stream.try_next().await? { + self.metadata.push_chunk(&chunk)?; + msgs.write_batch(chunk).await?; + offsets.push(msgs.tell()); + } + + self.batch_byte_offsets.push(offsets); + + Ok(()) } async fn write_into( From da68ffa346b69359344337053a9ae11e8637185f Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 31 Oct 2024 18:18:42 -0400 Subject: [PATCH 13/15] final cleanup of writer and accumulators --- .../layouts/write/metadata_accumulators.rs | 44 +++++++++---------- vortex-serde/src/layouts/write/writer.rs | 12 ++--- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/vortex-serde/src/layouts/write/metadata_accumulators.rs b/vortex-serde/src/layouts/write/metadata_accumulators.rs index bd9f135a9a..6fb7dfe260 100644 --- a/vortex-serde/src/layouts/write/metadata_accumulators.rs +++ b/vortex-serde/src/layouts/write/metadata_accumulators.rs @@ -11,20 +11,20 @@ use vortex_dtype::{match_each_native_ptype, DType, FieldName}; use vortex_error::{VortexError, VortexResult}; use vortex_scalar::Scalar; -pub fn new_metadata_accumulator(hint: usize, dtype: &DType) -> Box { +pub fn new_metadata_accumulator(dtype: &DType) -> Box { match dtype { - DType::Null => Box::new(BasicAccumulator::new(hint)), - DType::Bool(..) => Box::new(BoolAccumulator::new(hint)), + DType::Null => Box::new(BasicAccumulator::new()), + DType::Bool(..) => Box::new(BoolAccumulator::new()), DType::Primitive(ptype, ..) => { match_each_native_ptype!(ptype, |$P| { - Box::new(StandardAccumulator::<$P>::new(hint)) + Box::new(StandardAccumulator::<$P>::new()) }) } - DType::Utf8(..) => Box::new(StandardAccumulator::::new(hint)), - DType::Binary(..) => Box::new(StandardAccumulator::::new(hint)), - DType::Struct(..) => Box::new(BasicAccumulator::new(hint)), - DType::List(..) 
=> Box::new(BasicAccumulator::new(hint)), - DType::Extension(..) => Box::new(BasicAccumulator::new(hint)), + DType::Utf8(..) => Box::new(StandardAccumulator::::new()), + DType::Binary(..) => Box::new(StandardAccumulator::::new()), + DType::Struct(..) => Box::new(BasicAccumulator::new()), + DType::List(..) => Box::new(BasicAccumulator::new()), + DType::Extension(..) => Box::new(BasicAccumulator::new()), } } @@ -45,13 +45,13 @@ struct BoolAccumulator { } impl BoolAccumulator { - fn new(hint: usize) -> Self { + fn new() -> Self { Self { row_offsets: RowOffsetsAccumulator::new(), - maxima: UnwrappedStatAccumulator::new(Stat::Max, "max".into(), hint), - minima: UnwrappedStatAccumulator::new(Stat::Min, "min".into(), hint), - true_count: UnwrappedStatAccumulator::new(Stat::TrueCount, "true_count".into(), hint), - null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into(), hint), + maxima: UnwrappedStatAccumulator::new(Stat::Max, "max".into()), + minima: UnwrappedStatAccumulator::new(Stat::Min, "min".into()), + true_count: UnwrappedStatAccumulator::new(Stat::TrueCount, "true_count".into()), + null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into()), } } } @@ -92,12 +92,12 @@ struct StandardAccumulator { } impl StandardAccumulator { - fn new(hint: usize) -> Self { + fn new() -> Self { Self { row_offsets: RowOffsetsAccumulator::new(), - maxima: UnwrappedStatAccumulator::new(Stat::Max, "max".into(), hint), - minima: UnwrappedStatAccumulator::new(Stat::Min, "min".into(), hint), - null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into(), hint), + maxima: UnwrappedStatAccumulator::new(Stat::Max, "max".into()), + minima: UnwrappedStatAccumulator::new(Stat::Min, "min".into()), + null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into()), } } } @@ -138,10 +138,10 @@ struct BasicAccumulator { } impl BasicAccumulator { - fn new(hint: usize) -> Self { + fn new() -> Self { Self { row_offsets: RowOffsetsAccumulator::new(), - null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into(), hint), + null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into()), } } } @@ -181,11 +181,11 @@ struct UnwrappedStatAccumulator { } impl UnwrappedStatAccumulator { - fn new(stat: Stat, name: FieldName, hint: usize) -> Self { + fn new(stat: Stat, name: FieldName) -> Self { Self { stat, name, - values: Vec::with_capacity(hint), + values: Vec::new(), } } } diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index caa49dfba6..ba0fd6136c 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -84,11 +84,9 @@ impl LayoutWriter { where S: ArrayStream + Unpin, { - let size_hint = stream.size_hint().0; let column_writer = match self.column_writers.get_mut(column_idx) { None => { - self.column_writers - .push(ColumnWriter::new(size_hint, stream.dtype())); + self.column_writers.push(ColumnWriter::new(stream.dtype())); assert_eq!( self.column_writers.len(), @@ -165,15 +163,13 @@ async fn write_fb_raw(mut writer: W, fb: F) struct ColumnWriter { metadata: Box, batch_byte_offsets: Vec>, - size_hint: usize, } impl ColumnWriter { - fn new(size_hint: usize, dtype: &DType) -> Self { + fn new(dtype: &DType) -> Self { Self { - metadata: new_metadata_accumulator(size_hint, dtype), + metadata: new_metadata_accumulator(dtype), batch_byte_offsets: Vec::new(), - size_hint, } } @@ -182,7 +178,7 @@ impl ColumnWriter { mut stream: S, msgs: 
&mut MessageWriter, ) -> VortexResult<()> { - let mut offsets = Vec::with_capacity(self.size_hint + 1); + let mut offsets = Vec::with_capacity(stream.size_hint().0 + 1); offsets.push(msgs.tell()); while let Some(chunk) = stream.try_next().await? { From 68b68e314e15f6495fd0c083e936b6177ccfcac1 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Fri, 1 Nov 2024 10:04:01 -0400 Subject: [PATCH 14/15] clippy + added some metadata schema tests --- vortex-serde/src/layouts/tests.rs | 22 +++++- .../layouts/write/metadata_accumulators.rs | 75 ++++++++++++++++++- vortex-serde/src/layouts/write/writer.rs | 4 +- 3 files changed, 94 insertions(+), 7 deletions(-) diff --git a/vortex-serde/src/layouts/tests.rs b/vortex-serde/src/layouts/tests.rs index 4ddd1c87c4..4af61c58b7 100644 --- a/vortex-serde/src/layouts/tests.rs +++ b/vortex-serde/src/layouts/tests.rs @@ -14,7 +14,27 @@ use vortex_dtype::{DType, Nullability, PType, StructDType}; use vortex_expr::{BinaryExpr, Column, Literal, Operator}; use crate::layouts::write::LayoutWriter; -use crate::layouts::{LayoutDeserializer, LayoutReaderBuilder, Projection, RowFilter}; +use crate::layouts::{ + LayoutDeserializer, LayoutReaderBuilder, Projection, RowFilter, CHUNKED_LAYOUT_ID, + COLUMN_LAYOUT_ID, EOF_SIZE, FLAT_LAYOUT_ID, FOOTER_POSTSCRIPT_SIZE, INLINE_SCHEMA_LAYOUT_ID, + MAGIC_BYTES, VERSION, +}; + +#[test] +fn test_constants() { + // the footer postscript size can change iff we increment the version + // i.e., it must be 32 bytes iff VERSION == 1 + assert_eq!(VERSION, 1); + assert_eq!(FOOTER_POSTSCRIPT_SIZE, 32); + + // these constants can never change (without breaking all existing files) + assert_eq!(MAGIC_BYTES, *b"VRTX"); + assert_eq!(EOF_SIZE, 8); + assert_eq!(FLAT_LAYOUT_ID.0, 1); + assert_eq!(CHUNKED_LAYOUT_ID.0, 2); + assert_eq!(COLUMN_LAYOUT_ID.0, 3); + assert_eq!(INLINE_SCHEMA_LAYOUT_ID.0, 4); +} #[tokio::test] #[cfg_attr(miri, ignore)] diff --git a/vortex-serde/src/layouts/write/metadata_accumulators.rs b/vortex-serde/src/layouts/write/metadata_accumulators.rs index 6fb7dfe260..aacc1028b0 100644 --- a/vortex-serde/src/layouts/write/metadata_accumulators.rs +++ b/vortex-serde/src/layouts/write/metadata_accumulators.rs @@ -74,7 +74,7 @@ impl MetadataAccumulator for BoolAccumulator { self.null_count.into_column(), ] .into_iter() - .filter_map(|o| o) + .flatten() .unzip(); let names = Arc::from(names); let n_chunks = fields[0].len(); @@ -122,7 +122,7 @@ where self.null_count.into_column(), ] .into_iter() - .filter_map(|o| o) + .flatten() .unzip(); let names = Arc::from(names); let n_chunks = fields[0].len(); @@ -158,7 +158,7 @@ impl MetadataAccumulator for BasicAccumulator { self.null_count.into_column(), ] .into_iter() - .filter_map(|o| o) + .flatten() .unzip(); let names = Arc::from(names); let n_chunks = fields[0].len(); @@ -238,6 +238,73 @@ impl SingularAccumulator for RowOffsetsAccumulator { fn into_column(self) -> Option<(FieldName, Array)> { // intentionally excluding the last n_rows, b/c it is just the total number of rows - return Some(("row_offsets".into(), Array::from(self.row_offsets))); + Some(("row_offsets".into(), Array::from(self.row_offsets))) + } +} + +#[cfg(test)] +mod tests { + use vortex::array::{BoolArray, PrimitiveArray}; + use vortex::variants::StructArrayTrait; + + use super::*; + + fn assert_field_names(struct_array: &StructArray, names: &[&str]) { + assert_eq!( + struct_array.names(), + &names + .iter() + .map(|s| FieldName::from(s.to_string())) + .collect::>() + .into() + ); + } + + #[test] + fn 
test_bool_metadata_schema() { + let mut bool_accumulator = BoolAccumulator::new(); + let chunk = BoolArray::from_vec(vec![true], Validity::AllValid).into_array(); + bool_accumulator.push_chunk(&chunk).unwrap(); + + let struct_array = + StructArray::try_from(Box::new(bool_accumulator).into_array().unwrap()).unwrap(); + assert_eq!(struct_array.len(), 1); + assert_field_names(&struct_array, &["row_offsets", "max", "min", "true_count"]); + } + + #[test] + fn test_standard_metadata_schema_nonnullable() { + let mut standard_accumulator = StandardAccumulator::::new(); + let chunk = PrimitiveArray::from_nullable_vec(vec![Some(1u64)]).into_array(); + standard_accumulator.push_chunk(&chunk).unwrap(); + + let struct_array = + StructArray::try_from(Box::new(standard_accumulator).into_array().unwrap()).unwrap(); + assert_eq!(struct_array.len(), 1); + assert_field_names(&struct_array, &["row_offsets", "max", "min", "null_count"]); + } + + #[test] + fn test_standard_metadata_schema_nullable() { + let mut standard_accumulator = StandardAccumulator::::new(); + let chunk = PrimitiveArray::from_nullable_vec(vec![Some(1u64)]).into_array(); + standard_accumulator.push_chunk(&chunk).unwrap(); + + let struct_array = + StructArray::try_from(Box::new(standard_accumulator).into_array().unwrap()).unwrap(); + assert_eq!(struct_array.len(), 1); + assert_field_names(&struct_array, &["row_offsets", "max", "min", "null_count"]); + } + + #[test] + fn test_basic_metadata_schema() { + let mut basic_accumulator = BasicAccumulator::new(); + let chunk = PrimitiveArray::from_nullable_vec(vec![Some(1u64)]).into_array(); + basic_accumulator.push_chunk(&chunk).unwrap(); + + let struct_array = + StructArray::try_from(Box::new(basic_accumulator).into_array().unwrap()).unwrap(); + assert_eq!(struct_array.len(), 1); + assert_field_names(&struct_array, &["row_offsets", "null_count"]); } } diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index ba0fd6136c..5458732e5b 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -109,7 +109,7 @@ impl LayoutWriter { async fn write_metadata_arrays(&mut self) -> VortexResult { let mut column_layouts = Vec::with_capacity(self.column_writers.len()); for column_writer in mem::take(&mut self.column_writers) { - let chunk_layouts = column_writer.write_into(&mut self.msgs).await?; + let chunk_layouts = column_writer.write_metadata(&mut self.msgs).await?; column_layouts.push(Layout::chunked(chunk_layouts, true)); } @@ -192,7 +192,7 @@ impl ColumnWriter { Ok(()) } - async fn write_into( + async fn write_metadata( self, msgs: &mut MessageWriter, ) -> VortexResult> { From 14d11796fc1e24aad382e2b773af23a9fab2530e Mon Sep 17 00:00:00 2001 From: Will Manning Date: Fri, 1 Nov 2024 10:13:54 -0400 Subject: [PATCH 15/15] better stats coverage --- vortex-array/src/array/bool/stats.rs | 2 ++ vortex-array/src/array/constant/stats.rs | 15 ++++++++++++++- .../src/layouts/write/metadata_accumulators.rs | 11 ++++++++--- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/vortex-array/src/array/bool/stats.rs b/vortex-array/src/array/bool/stats.rs index d9703bca4e..5e8a09338b 100644 --- a/vortex-array/src/array/bool/stats.rs +++ b/vortex-array/src/array/bool/stats.rs @@ -120,6 +120,7 @@ impl BoolStatsAccumulator { StatsSet::from(HashMap::from([ (Stat::Min, (self.true_count == self.len).into()), (Stat::Max, (self.true_count > 0).into()), + (Stat::NullCount, self.null_count.into()), ( Stat::IsConstant, (self.null_count 
== 0 && (self.true_count == self.len || self.true_count == 0) @@ -155,6 +156,7 @@ mod test { assert!(!bool_arr.statistics().compute_is_constant().unwrap()); assert!(!bool_arr.statistics().compute_min::().unwrap()); assert!(bool_arr.statistics().compute_max::().unwrap()); + assert_eq!(bool_arr.statistics().compute_null_count().unwrap(), 0); assert_eq!(bool_arr.statistics().compute_run_count().unwrap(), 5); assert_eq!(bool_arr.statistics().compute_true_count().unwrap(), 4); } diff --git a/vortex-array/src/array/constant/stats.rs b/vortex-array/src/array/constant/stats.rs index 2fd147d9cb..8006124409 100644 --- a/vortex-array/src/array/constant/stats.rs +++ b/vortex-array/src/array/constant/stats.rs @@ -7,7 +7,11 @@ use crate::stats::{ArrayStatisticsCompute, Stat, StatsSet}; impl ArrayStatisticsCompute for ConstantArray { fn compute_statistics(&self, _stat: Stat) -> VortexResult { - let mut stats_map = HashMap::from([(Stat::IsConstant, true.into())]); + let mut stats_map = HashMap::from([ + (Stat::IsConstant, true.into()), + (Stat::IsSorted, true.into()), + (Stat::IsStrictSorted, (self.len() <= 1).into()), + ]); if let ScalarValue::Bool(b) = self.scalar_value() { let true_count = if *b { self.len() as u64 } else { 0 }; @@ -15,6 +19,15 @@ impl ArrayStatisticsCompute for ConstantArray { stats_map.insert(Stat::TrueCount, true_count.into()); } + stats_map.insert( + Stat::NullCount, + self.scalar_value() + .is_null() + .then_some(self.len() as u64) + .unwrap_or_default() + .into(), + ); + Ok(StatsSet::from(stats_map)) } } diff --git a/vortex-serde/src/layouts/write/metadata_accumulators.rs b/vortex-serde/src/layouts/write/metadata_accumulators.rs index aacc1028b0..5511f8bed5 100644 --- a/vortex-serde/src/layouts/write/metadata_accumulators.rs +++ b/vortex-serde/src/layouts/write/metadata_accumulators.rs @@ -244,8 +244,9 @@ impl SingularAccumulator for RowOffsetsAccumulator { #[cfg(test)] mod tests { - use vortex::array::{BoolArray, PrimitiveArray}; + use vortex::array::{BoolArray, ConstantArray, PrimitiveArray}; use vortex::variants::StructArrayTrait; + use vortex_dtype::Nullability; use super::*; @@ -269,7 +270,10 @@ mod tests { let struct_array = StructArray::try_from(Box::new(bool_accumulator).into_array().unwrap()).unwrap(); assert_eq!(struct_array.len(), 1); - assert_field_names(&struct_array, &["row_offsets", "max", "min", "true_count"]); + assert_field_names( + &struct_array, + &["row_offsets", "max", "min", "true_count", "null_count"], + ); } #[test] @@ -287,7 +291,8 @@ mod tests { #[test] fn test_standard_metadata_schema_nullable() { let mut standard_accumulator = StandardAccumulator::::new(); - let chunk = PrimitiveArray::from_nullable_vec(vec![Some(1u64)]).into_array(); + let chunk = + ConstantArray::new(Scalar::primitive(1u64, Nullability::Nullable), 10).into_array(); standard_accumulator.push_chunk(&chunk).unwrap(); let struct_array =