diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 9461e54b7b..d3d0779690 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -33,7 +33,7 @@ use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT}; use vortex::{ArrayData, IntoArrayData, IntoCanonical}; static DISPATCHER: LazyLock> = - LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1))); + LazyLock::new(|| Arc::new(IoDispatcher::default())); pub const BATCH_SIZE: usize = 65_536; diff --git a/vortex-buffer/src/string.rs b/vortex-buffer/src/string.rs index de3c7748c3..952d3f92e1 100644 --- a/vortex-buffer/src/string.rs +++ b/vortex-buffer/src/string.rs @@ -44,6 +44,12 @@ impl From for BufferString { } } +impl From<&str> for BufferString { + fn from(value: &str) -> Self { + Self(Buffer::from(String::from(value).into_bytes())) + } +} + impl TryFrom for BufferString { type Error = Utf8Error; diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index d408eb2d8d..6b418016a8 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -1,5 +1,5 @@ use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use arrow_schema::{Schema, SchemaRef}; use async_trait::async_trait; @@ -9,7 +9,9 @@ use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use datafusion::execution::SessionState; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; -use datafusion_common::{not_impl_err, DataFusionError, Result as DFResult, Statistics}; +use datafusion_common::{ + not_impl_err, ColumnStatistics, DataFusionError, Result as DFResult, Statistics, +}; use datafusion_expr::Expr; use datafusion_physical_expr::{LexRequirement, PhysicalExpr}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -17,10 +19,15 @@ use datafusion_physical_plan::ExecutionPlan; use object_store::{ObjectMeta, ObjectStore}; use vortex_array::arrow::infer_schema; use vortex_array::Context; -use vortex_file::{read_initial_bytes, VORTEX_FILE_EXTENSION}; -use vortex_io::ObjectStoreReadAt; +use vortex_file::metadata::MetadataFetcher; +use vortex_file::{ + read_initial_bytes, LayoutContext, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache, + Scan, VORTEX_FILE_EXTENSION, +}; +use vortex_io::{IoDispatcher, ObjectStoreReadAt}; use super::execution::VortexExec; +use super::statistics::array_to_col_statistics; use crate::can_be_pushed_down; #[derive(Debug, Default)] @@ -86,13 +93,48 @@ impl FileFormat for VortexFormat { let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?; let layout = initial_read.fb_layout()?; + let dtype = initial_read.lazy_dtype().map_err(|e| { + DataFusionError::External(Box::new( + e.with_context("Failed to fetch dtype from initial read"), + )) + })?; let row_count = layout.row_count(); - let stats = Statistics { - num_rows: Precision::Exact(row_count as usize), - total_byte_size: Precision::Absent, - column_statistics: Statistics::unknown_column(&table_schema), - }; + let layout_deserializer = + LayoutDeserializer::new(Context::default().into(), LayoutContext::default().into()); + let layout_message_cache = Arc::new(RwLock::new(LayoutMessageCache::new())); + let relative_message_cache = + RelativeLayoutCache::new(layout_message_cache.clone(), dtype.into()); + + let root_layout = vortex_file::read_layout_from_initial( + &initial_read, + &layout_deserializer, + Scan::empty(), + relative_message_cache, + )?; + + let io = IoDispatcher::default(); + let mut stats = Statistics::new_unknown(&table_schema); + stats.num_rows = Precision::Exact(row_count as usize); + + let metadata_table = + MetadataFetcher::fetch(os_read_at, io.into(), root_layout, layout_message_cache) + .await?; + + if let Some(metadata) = metadata_table { + let mut column_statistics = Vec::with_capacity(table_schema.fields().len()); + + for col_stats in metadata.into_iter() { + let col_stats = match col_stats { + Some(array) => array_to_col_statistics(array.try_into()?)?, + None => ColumnStatistics::new_unknown(), + }; + + column_statistics.push(col_stats); + } + + stats.column_statistics = column_statistics; + } Ok(stats) } diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 7a985d2e17..da33dd239c 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -2,3 +2,4 @@ pub mod config; pub mod execution; pub mod format; pub mod opener; +pub mod statistics; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 893ece2c50..b7fdb0e70c 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -14,7 +14,7 @@ use vortex_io::{IoDispatcher, ObjectStoreReadAt}; /// Share an IO dispatcher across all DataFusion instances. static IO_DISPATCHER: LazyLock> = - LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1))); + LazyLock::new(|| Arc::new(IoDispatcher::default())); pub struct VortexFileOpener { pub ctx: Arc, diff --git a/vortex-datafusion/src/persistent/statistics.rs b/vortex-datafusion/src/persistent/statistics.rs new file mode 100644 index 0000000000..de51e62b8f --- /dev/null +++ b/vortex-datafusion/src/persistent/statistics.rs @@ -0,0 +1,42 @@ +use arrow_array::cast::AsArray; +use arrow_array::types::UInt64Type; +use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; +use datafusion_common::stats::Precision; +use datafusion_common::ColumnStatistics; +use datafusion_expr::Accumulator; +use vortex_array::array::StructArray; +use vortex_array::variants::StructArrayTrait as _; +use vortex_array::IntoCanonical; +use vortex_error::VortexResult; + +pub fn array_to_col_statistics(array: StructArray) -> VortexResult { + let mut stats = ColumnStatistics::new_unknown(); + + if let Some(null_count_array) = array.field_by_name("null_count") { + let array = null_count_array.into_canonical()?.into_arrow()?; + let array = array.as_primitive::(); + + let null_count = array.iter().map(|v| v.unwrap_or_default()).sum::(); + stats.null_count = Precision::Exact(null_count as usize); + } + + if let Some(max_value_array) = array.field_by_name("max") { + let array = max_value_array.into_canonical()?.into_arrow()?; + let mut acc = MaxAccumulator::try_new(array.data_type())?; + acc.update_batch(&[array])?; + + let max_val = acc.evaluate()?; + stats.max_value = Precision::Exact(max_val) + } + + if let Some(min_value_array) = array.field_by_name("min") { + let array = min_value_array.into_canonical()?.into_arrow()?; + let mut acc = MinAccumulator::try_new(array.data_type())?; + acc.update_batch(&[array])?; + + let max_val = acc.evaluate()?; + stats.min_value = Precision::Exact(max_val) + } + + Ok(stats) +} diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 1e49922e8e..adacb109db 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -24,11 +24,7 @@ futures-executor = { workspace = true } futures-util = { workspace = true } itertools = { workspace = true } once_cell = { workspace = true } -tokio = { workspace = true, features = [ - "io-util", - "fs", - "rt-multi-thread", -] } +tokio = { workspace = true, features = ["io-util", "fs", "rt-multi-thread"] } tracing = { workspace = true, optional = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } @@ -50,8 +46,5 @@ workspace = true [features] futures = ["futures-util/io", "vortex-io/futures"] -object_store = [ - "vortex-error/object_store", - "vortex-io/object_store", -] +object_store = ["vortex-error/object_store", "vortex-io/object_store"] tracing = ["dep:tracing", "vortex-io/tracing"] diff --git a/vortex-file/src/read/builder/mod.rs b/vortex-file/src/read/builder/mod.rs index b32870396f..bbdd8e4cd1 100644 --- a/vortex-file/src/read/builder/mod.rs +++ b/vortex-file/src/read/builder/mod.rs @@ -165,9 +165,7 @@ impl VortexReadBuilder { .transpose()?; // Default: fallback to single-threaded tokio dispatcher. - let io_dispatcher = self - .io_dispatcher - .unwrap_or_else(|| Arc::new(IoDispatcher::new_tokio(1))); + let io_dispatcher = self.io_dispatcher.unwrap_or_default(); Ok(VortexFileArrayStream::new( self.read_at, diff --git a/vortex-file/src/read/layouts/chunked.rs b/vortex-file/src/read/layouts/chunked.rs index 21aa568bc2..c9b9346bfc 100644 --- a/vortex-file/src/read/layouts/chunked.rs +++ b/vortex-file/src/read/layouts/chunked.rs @@ -1,5 +1,5 @@ use std::collections::BTreeSet; -use std::sync::RwLock; +use std::sync::{OnceLock, RwLock}; use bytes::Bytes; use itertools::Itertools; @@ -14,7 +14,7 @@ use crate::read::cache::RelativeLayoutCache; use crate::read::mask::RowMask; use crate::{ BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, MessageLocator, - Scan, CHUNKED_LAYOUT_ID, + MetadataRead, Scan, CHUNKED_LAYOUT_ID, }; #[derive(Default, Debug)] @@ -84,7 +84,7 @@ impl ChunkedLayoutBuilder { self.fb_bytes.clone(), metadata_fb._tab.loc(), // TODO(robert): Create stats projection - Scan::new(None), + Scan::empty(), self.message_cache.unknown_dtype(METADATA_LAYOUT_PART_ID), ) }) @@ -170,6 +170,7 @@ pub struct ChunkedLayoutReader { layouts: Vec, metadata_layout: Option>, in_progress_ranges: InProgressLayoutRanges, + cached_metadata: OnceLock, } impl ChunkedLayoutReader { @@ -181,6 +182,7 @@ impl ChunkedLayoutReader { layouts, metadata_layout, in_progress_ranges: RwLock::new(HashMap::new()), + cached_metadata: OnceLock::new(), } } @@ -234,7 +236,6 @@ impl ChunkedLayoutReader { self.layouts.len() } - #[allow(dead_code)] pub fn metadata_layout(&self) -> Option<&dyn LayoutReader> { self.metadata_layout.as_deref() } @@ -277,6 +278,29 @@ impl LayoutReader for ChunkedLayoutReader { Ok(None) } } + + fn read_metadata(&self) -> VortexResult { + match self.metadata_layout() { + None => Ok(MetadataRead::None), + Some(metadata_layout) => { + if let Some(md) = self.cached_metadata.get() { + return Ok(MetadataRead::Batches(vec![Some(md.clone())])); + } + + match metadata_layout + .read_selection(&RowMask::new_valid_between(0, self.n_chunks()))? + { + Some(BatchRead::Batch(array)) => { + // We don't care if the write failed + _ = self.cached_metadata.set(array.clone()); + Ok(MetadataRead::Batches(vec![Some(array)])) + } + Some(BatchRead::ReadMore(messages)) => Ok(MetadataRead::ReadMore(messages)), + None => Ok(MetadataRead::None), + } + } + } + } } #[cfg(test)] diff --git a/vortex-file/src/read/layouts/columnar.rs b/vortex-file/src/read/layouts/columnar.rs index 4ce305e99a..0cdc2befc5 100644 --- a/vortex-file/src/read/layouts/columnar.rs +++ b/vortex-file/src/read/layouts/columnar.rs @@ -9,8 +9,8 @@ use vortex_array::stats::ArrayStatistics; use vortex_array::validity::Validity; use vortex_array::{ArrayData, IntoArrayData}; use vortex_dtype::field::Field; -use vortex_dtype::FieldNames; -use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult}; +use vortex_dtype::{FieldName, FieldNames}; +use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult}; use vortex_expr::{Column, Select, VortexExpr}; use vortex_flatbuffers::footer; @@ -18,7 +18,7 @@ use crate::read::cache::{LazyDType, RelativeLayoutCache}; use crate::read::expr_project::expr_project; use crate::read::mask::RowMask; use crate::{ - BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, RowFilter, Scan, + BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, MetadataRead, RowFilter, Scan, COLUMNAR_LAYOUT_ID, }; @@ -203,6 +203,7 @@ pub struct ColumnarLayoutReader { expr: Option>, // TODO(robert): This is a hack/optimization that tells us if we're reducing results with AND or not shortcircuit_siblings: bool, + in_progress_metadata: RwLock>>, } impl ColumnarLayoutReader { @@ -220,9 +221,10 @@ impl ColumnarLayoutReader { Self { names, children, - in_progress_ranges: RwLock::new(HashMap::new()), expr, shortcircuit_siblings, + in_progress_ranges: RwLock::new(HashMap::new()), + in_progress_metadata: RwLock::new(HashMap::new()), } } } @@ -303,6 +305,44 @@ impl LayoutReader for ColumnarLayoutReader { Ok(Some(BatchRead::ReadMore(messages))) } } + + fn read_metadata(&self) -> VortexResult { + let mut in_progress_metadata = self + .in_progress_metadata + .write() + .unwrap_or_else(|e| vortex_panic!("lock is poisoned: {e}")); + let mut messages = Vec::default(); + + for (name, child_reader) in self.names.iter().zip(self.children.iter()) { + match child_reader.read_metadata()? { + MetadataRead::Batches(data) => { + if data.len() != 1 { + vortex_bail!("expected exactly one metadata array per-child"); + } + in_progress_metadata.insert(name.clone(), data[0].clone()); + } + MetadataRead::ReadMore(rm) => { + messages.extend(rm); + } + MetadataRead::None => { + in_progress_metadata.insert(name.clone(), None); + } + } + } + + // We're done reading + if messages.is_empty() { + let child_arrays = self + .names + .iter() + .map(|name| in_progress_metadata[name].clone()) // TODO(Adam): Some columns might not have statistics + .collect::>(); + + Ok(MetadataRead::Batches(child_arrays)) + } else { + Ok(MetadataRead::ReadMore(messages)) + } + } } #[cfg(test)] diff --git a/vortex-file/src/read/layouts/flat.rs b/vortex-file/src/read/layouts/flat.rs index d1ef8e7c8e..148c24b50a 100644 --- a/vortex-file/src/read/layouts/flat.rs +++ b/vortex-file/src/read/layouts/flat.rs @@ -11,8 +11,8 @@ use vortex_ipc::stream_writer::ByteRange; use crate::read::cache::RelativeLayoutCache; use crate::read::mask::RowMask; use crate::{ - BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, MessageLocator, Scan, - FLAT_LAYOUT_ID, + BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, MessageLocator, MetadataRead, + Scan, FLAT_LAYOUT_ID, }; #[derive(Debug)] @@ -116,6 +116,10 @@ impl LayoutReader for FlatLayoutReader { Ok(Some(BatchRead::ReadMore(vec![self.own_message()]))) } } + + fn read_metadata(&self) -> VortexResult { + Ok(MetadataRead::None) + } } #[cfg(test)] diff --git a/vortex-file/src/read/layouts/inline_dtype.rs b/vortex-file/src/read/layouts/inline_dtype.rs index 074148cc12..f2ebc0cbd4 100644 --- a/vortex-file/src/read/layouts/inline_dtype.rs +++ b/vortex-file/src/read/layouts/inline_dtype.rs @@ -12,7 +12,7 @@ use crate::read::cache::{LazyDType, RelativeLayoutCache}; use crate::read::mask::RowMask; use crate::{ BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, MessageLocator, - Scan, INLINE_SCHEMA_LAYOUT_ID, + MetadataRead, Scan, INLINE_SCHEMA_LAYOUT_ID, }; #[derive(Debug)] @@ -136,4 +136,8 @@ impl LayoutReader for InlineDTypeLayoutReader { Ok(Some(BatchRead::ReadMore(vec![self.dtype_message()?]))) } } + + fn read_metadata(&self) -> VortexResult { + Ok(MetadataRead::None) + } } diff --git a/vortex-file/src/read/metadata.rs b/vortex-file/src/read/metadata.rs new file mode 100644 index 0000000000..2177ebb3a4 --- /dev/null +++ b/vortex-file/src/read/metadata.rs @@ -0,0 +1,265 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, RwLock}; +use std::task::{ready, Context, Poll}; + +use futures::future::BoxFuture; +use futures::FutureExt as _; +use vortex_array::ArrayData; +use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult}; +use vortex_io::{Dispatch as _, IoDispatcher, VortexReadAt}; + +use super::stream::{read_ranges, StreamMessages}; +use super::{LayoutMessageCache, LayoutReader, MessageLocator, MetadataRead}; +use crate::read::stream::Message; + +pub struct MetadataFetcher { + input: R, + dispatcher: Arc, + root_layout: Box, + layout_cache: Arc>, + state: State, +} + +enum State { + Initial, + Reading(BoxFuture<'static, VortexResult>), +} + +impl MetadataFetcher { + pub fn fetch( + input: R, + dispatcher: Arc, + root_layout: Box, + layout_cache: Arc>, + ) -> Self { + Self { + input, + dispatcher, + root_layout, + layout_cache, + state: State::Initial, + } + } + + /// Schedule an asynchronous read of several byte ranges. + /// + /// IO is scheduled on the provided IO dispatcher. + fn read_ranges( + &self, + ranges: Vec, + ) -> BoxFuture<'static, VortexResult> { + let reader = self.input.clone(); + + let result_rx = self + .dispatcher + .dispatch(move || async move { read_ranges(reader, ranges).await }) + .vortex_expect("dispatch async task"); + + result_rx + .map(|res| match res { + Ok(result) => result, + Err(e) => vortex_bail!("dispatcher channel canceled: {e}"), + }) + .boxed() + } +} + +impl Future for MetadataFetcher { + type Output = VortexResult>>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match &mut self.state { + State::Initial => match self.root_layout.read_metadata()? { + MetadataRead::ReadMore(messages) => { + let read_future = self.read_ranges(messages); + self.state = State::Reading(read_future); + } + MetadataRead::Batches(array_data) => { + return Poll::Ready(Ok(Some(array_data))); + } + MetadataRead::None => { + return Poll::Ready(Ok(None)); + } + }, + State::Reading(ref mut f) => { + let messages = ready!(f.poll_unpin(cx))?; + + match self.layout_cache.write() { + Ok(mut cache) => { + for Message(message_id, bytes) in messages.into_iter() { + cache.set(message_id, bytes); + } + } + Err(poison) => { + vortex_panic!("Failed to write to message cache: {poison}") + } + } + + self.state = State::Initial; + } + } + } + } +} + +#[cfg(test)] +mod test { + use std::sync::{Arc, RwLock}; + + use vortex_array::array::{ChunkedArray, StructArray}; + use vortex_array::compute::unary::scalar_at; + use vortex_array::{ArrayDType as _, ArrayData, IntoArrayData as _}; + use vortex_buffer::{Buffer, BufferString}; + use vortex_io::IoDispatcher; + + use crate::metadata::MetadataFetcher; + use crate::{ + read_initial_bytes, read_layout_from_initial, LayoutDeserializer, LayoutMessageCache, + RelativeLayoutCache, Scan, VortexFileWriter, + }; + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn read_metadata_works() { + let name_chunk1 = ArrayData::from_iter(vec![ + Some("Joseph".to_owned()), + Some("James".to_owned()), + Some("Angela".to_owned()), + ]); + let age_chunk1 = ArrayData::from_iter(vec![Some(25_i32), Some(31), None]); + let name_chunk2 = ArrayData::from_iter(vec![ + Some("Pharrell".to_owned()), + Some("Khalil".to_owned()), + Some("Mikhail".to_owned()), + None, + ]); + let age_chunk2 = ArrayData::from_iter(vec![Some(57_i32), Some(18), None, Some(32)]); + + let chunk1 = StructArray::from_fields(&[("name", name_chunk1), ("age", age_chunk1)]) + .unwrap() + .into_array(); + let chunk2 = StructArray::from_fields(&[("name", name_chunk2), ("age", age_chunk2)]) + .unwrap() + .into_array(); + let dtype = chunk1.dtype().clone(); + + let array = ChunkedArray::try_new(vec![chunk1, chunk2], dtype) + .unwrap() + .into_array(); + + let buffer = Vec::new(); + let written_bytes = VortexFileWriter::new(buffer) + .write_array_columns(array) + .await + .unwrap() + .finalize() + .await + .unwrap(); + let written_bytes = Buffer::from(written_bytes); + + let n_bytes = written_bytes.len(); + let initial_read = read_initial_bytes(&written_bytes, n_bytes as u64) + .await + .unwrap(); + let lazy_dtype = Arc::new(initial_read.lazy_dtype().unwrap()); + let layout_deserializer = LayoutDeserializer::default(); + let layout_message_cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let layout_reader = read_layout_from_initial( + &initial_read, + &layout_deserializer, + Scan::empty(), + RelativeLayoutCache::new(layout_message_cache.clone(), lazy_dtype.clone()), + ) + .unwrap(); + let io = IoDispatcher::default(); + let metadata_table = MetadataFetcher::fetch( + written_bytes, + io.into(), + layout_reader, + layout_message_cache, + ) + .await + .unwrap(); + + assert!(metadata_table.is_some()); + let metadata_table = metadata_table.unwrap(); + assert!(metadata_table.len() == 2); + assert!(metadata_table.iter().all(Option::is_some)); + + metadata_table[0] + .as_ref() + .unwrap() + .with_dyn(|name_metadata_table| { + let name_metadata_table = name_metadata_table.as_struct_array().unwrap(); + + let min = name_metadata_table.field_by_name("min").unwrap(); + let chunk1_min = scalar_at(&min, 0).unwrap(); + let chunk2_min = scalar_at(&min, 1).unwrap(); + assert_eq!( + chunk1_min.as_utf8().value(), + Some(BufferString::from("Angela")) + ); + assert_eq!( + chunk2_min.as_utf8().value(), + Some(BufferString::from("Khalil")) + ); + + let max = name_metadata_table.field_by_name("max").unwrap(); + let chunk1_max = scalar_at(&max, 0).unwrap(); + let chunk2_max = scalar_at(&max, 1).unwrap(); + assert_eq!( + chunk1_max.as_utf8().value(), + Some(BufferString::from("Joseph")) + ); + assert_eq!( + chunk2_max.as_utf8().value(), + Some(BufferString::from("Pharrell")) + ); + + let null_count = name_metadata_table.field_by_name("null_count").unwrap(); + let chunk1_null_count = scalar_at(&null_count, 0).unwrap(); + let chunk2_null_count = scalar_at(&null_count, 1).unwrap(); + assert_eq!( + chunk1_null_count.as_primitive().typed_value::(), + Some(0) + ); + assert_eq!( + chunk2_null_count.as_primitive().typed_value::(), + Some(1) + ); + }); + + metadata_table[1] + .as_ref() + .unwrap() + .with_dyn(|age_metadata_table| { + let age_metadata_table = age_metadata_table.as_struct_array().unwrap(); + + let min = age_metadata_table.field_by_name("min").unwrap(); + let chunk1_min = scalar_at(&min, 0).unwrap(); + let chunk2_min = scalar_at(&min, 1).unwrap(); + assert_eq!(chunk1_min.as_primitive().typed_value::(), Some(25)); + assert_eq!(chunk2_min.as_primitive().typed_value::(), Some(18)); + + let max = age_metadata_table.field_by_name("max").unwrap(); + let chunk1_max = scalar_at(&max, 0).unwrap(); + let chunk2_max = scalar_at(&max, 1).unwrap(); + assert_eq!(chunk1_max.as_primitive().typed_value::(), Some(31)); + assert_eq!(chunk2_max.as_primitive().typed_value::(), Some(57)); + + let null_count = age_metadata_table.field_by_name("null_count").unwrap(); + let chunk1_null_count = scalar_at(&null_count, 0).unwrap(); + let chunk2_null_count = scalar_at(&null_count, 1).unwrap(); + assert_eq!( + chunk1_null_count.as_primitive().typed_value::(), + Some(1) + ); + assert_eq!( + chunk2_null_count.as_primitive().typed_value::(), + Some(1) + ); + }); + } +} diff --git a/vortex-file/src/read/mod.rs b/vortex-file/src/read/mod.rs index 6125b5b945..e2f9ee3df3 100644 --- a/vortex-file/src/read/mod.rs +++ b/vortex-file/src/read/mod.rs @@ -11,6 +11,7 @@ mod expr_project; mod filtering; pub mod layouts; mod mask; +pub mod metadata; pub mod projection; mod recordbatchreader; mod splits; @@ -39,6 +40,10 @@ pub struct Scan { } impl Scan { + pub fn empty() -> Self { + Self { expr: None } + } + pub fn new(expr: Option) -> Self { Self { expr } } @@ -59,6 +64,15 @@ pub enum BatchRead { Batch(ArrayData), } +#[derive(Debug)] +pub enum MetadataRead { + /// Layout has no metadata + None, + /// Additional IO is required + ReadMore(Vec), + Batches(Vec>), +} + /// A reader for a layout, a serialized sequence of Vortex arrays. /// /// Some layouts are _horizontally divisble_: they can read a sub-sequence of rows independently of @@ -83,4 +97,7 @@ pub trait LayoutReader: Debug + Send { /// /// The layout is finished producing data for selection when it returns None fn read_selection(&self, selector: &RowMask) -> VortexResult>; + + /// Reads the metadata of the layout, if it exists. + fn read_metadata(&self) -> VortexResult; } diff --git a/vortex-file/src/read/stream.rs b/vortex-file/src/read/stream.rs index fbf3e78a86..c2550d2feb 100644 --- a/vortex-file/src/read/stream.rs +++ b/vortex-file/src/read/stream.rs @@ -89,10 +89,10 @@ impl VortexFileArrayStream { /// A message that has had its bytes materialized onto the heap. #[derive(Debug, Clone)] -struct Message(pub MessageId, pub Bytes); +pub(crate) struct Message(pub MessageId, pub Bytes); -type StreamMessages = Vec; -type StreamStateFuture = BoxFuture<'static, VortexResult>; +pub(crate) type StreamMessages = Vec; +pub(crate) type StreamStateFuture = BoxFuture<'static, VortexResult>; enum ReadingFor { Read(StreamStateFuture, RowMask, MaskIteratorRef), @@ -107,8 +107,7 @@ enum ReadingPoll { impl ReadingFor { fn future(&mut self) -> &mut StreamStateFuture { match self { - ReadingFor::Read(future, ..) => future, - ReadingFor::NextSplit(future, ..) => future, + ReadingFor::Read(future, ..) | ReadingFor::NextSplit(future, ..) => future, } } @@ -297,7 +296,7 @@ impl VortexFileArrayStream { } #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))] -async fn read_ranges( +pub async fn read_ranges( reader: R, ranges: Vec, ) -> VortexResult> { diff --git a/vortex-io/src/dispatcher/mod.rs b/vortex-io/src/dispatcher/mod.rs index 0434b0bca3..069cd0733f 100644 --- a/vortex-io/src/dispatcher/mod.rs +++ b/vortex-io/src/dispatcher/mod.rs @@ -66,6 +66,17 @@ enum Inner { Compio(CompioDispatcher), } +impl Default for IoDispatcher { + fn default() -> Self { + #[cfg(feature = "tokio")] + return Self(Inner::Tokio(TokioDispatcher::new(1))); + #[cfg(all(feature = "compio", not(feature = "tokio")))] + return Self(Inner::Compio(CompioDispatcher::new(1))); + #[cfg(not(any(feature = "compio", feature = "tokio")))] + return Self(Inner {}); + } +} + impl Dispatch for IoDispatcher { #[allow(unused_variables)] fn dispatch(&self, task: F) -> VortexResult>