diff --git a/Cargo.lock b/Cargo.lock index c7ad543185..156b5fa1ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,9 +133,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +checksum = "37bf3594c4c988a53154954629820791dde498571819ae4ca50ca811e060cc95" [[package]] name = "arbitrary" @@ -899,9 +899,9 @@ dependencies = [ [[package]] name = "croaring" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f916e6377da947177a7db813c0a13589fe0ba39d0284d37444b1ce225129c905" +checksum = "79c3802db87f376a58d7a956f10f874110d0f7da89ba381cbeee94360c42a753" dependencies = [ "croaring-sys", ] @@ -3638,9 +3638,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.129" +version = "1.0.131" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbcf9b78a125ee667ae19388837dd12294b858d101fdd393cb9d5501ef09eb2" +checksum = "67d42a0bd4ac281beff598909bb56a86acaf979b84483e1c79c10dcaf98f8cf3" dependencies = [ "itoa", "memchr", diff --git a/README.md b/README.md index b3ee59eb39..59ac73af4c 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,12 @@ Vortex is an extensible, state-of-the-art columnar file format, with associated tools for working with compressed Apache Arrow arrays in-memory, on-disk, and over-the-wire. -Vortex is an aspiring successor to Apache Parquet, with dramatically faster random access reads (100-200x faster) -and scans (2-10x faster), while preserving approximately the same compression ratio and write throughput as Parquet with zstd. -It will also support very wide tables (at least 10s of thousands of columns) and (eventually) on-device decompression on GPUs. +Vortex is an aspiring successor to Apache Parquet, with dramatically faster random access reads (100-200x faster) and scans (2-10x faster), +while preserving approximately the same compression ratio and write throughput as Parquet with zstd. +It is designed to support very wide tables (at least 10s of thousands of columns) and (eventually) on-device decompression on GPUs. -Vortex is designed to be to columnar file formats what Apache DataFusion is to query engines: highly extensible, -extremely fast, batteries-included. +Vortex is intended to be to columnar file formats what Apache DataFusion is to query engines: highly extensible, +extremely fast, & batteries-included. > [!CAUTION] > This library is still under rapid development and is a work in progress! @@ -58,7 +58,12 @@ One of the unique attributes of the (in-progress) Vortex file format is that it file's footer. This allows the file format to be effectively self-describing and to evolve without breaking changes to the file format specification. -In fact, the format is designed to support forward compatibility by optionally embedding WASM decoders directly into the files +For example, the Compressor implementation can choose to chunk data into a Parquet-like layout with +row groups and aligned pages (ChunkedArray of StructArray of ChunkedArrays with equal chunk sizes). Alternatively, it can choose +to chunk different columns differently based on their compressed size and data distributions (e.g., a column that is constant +across all rows can be a single chunk, whereas a large string column may be split arbitrarily many times). 
+ +In the same vein, the format is designed to support forward compatibility by optionally embedding WASM decoders directly into the files themselves. This should help avoid the rapid calcification that has plagued other columnar file formats. ## Components @@ -120,10 +125,10 @@ in-memory array implementation, allowing us to defer decompression. Currently, t Vortex's default compression strategy is based on the [BtrBlocks](https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf) paper. -Roughly, for each chunk of data, a sample of at least ~1% of the data is taken. Compression is then attempted ( -recursively) with a set of lightweight encodings. The best-performing combination of encodings is then chosen to encode -the entire chunk. This sounds like it would be very expensive, but given basic statistics about a chunk, it is -possible to cheaply prune many encodings and ensure the search space does not explode in size. +Roughly, for each chunk of data, a sample of at least ~1% of the data is taken. Compression is then attempted +(recursively) with a set of lightweight encodings. The best-performing combination of encodings is then chosen to encode +the entire chunk. This sounds like it would be very expensive, but given the logical types and basic statistics about a +chunk, it is possible to cheaply prune many encodings and ensure the search space does not explode in size. ### Compute @@ -224,7 +229,7 @@ Expect more details on this in Q4 2024. This project is inspired by and--in some cases--directly based upon the existing, excellent work of many researchers and OSS developers. -In particular, the following academic papers greatly influenced the development: +In particular, the following academic papers have strongly influenced development: * Maximilian Kuschewski, David Sauerwein, Adnan Alhomssi, and Viktor Leis. [BtrBlocks: Efficient Columnar Compression for Data Lakes](https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf). @@ -240,12 +245,14 @@ In particular, the following academic papers greatly influenced the development: * Biswapesh Chattopadhyay, Priyam Dutta, Weiran Liu, Ott Tinn, Andrew Mccormick, Aniket Mokashi, Paul Harvey, Hector Gonzalez, David Lomax, Sagar Mittal, et al. [Procella: Unifying serving and analytical data at YouTube](https://dl.acm.org/citation.cfm?id=3360438). PVLDB, 12(12): 2022-2034, 2019. +* Dominik Durner, Viktor Leis, and Thomas Neumann. [Exploiting Cloud Object Storage for High-Performance + Analytics](https://www.durner.dev/app/media/papers/anyblob-vldb23.pdf). PVLDB, 16(11): 2769-2782, 2023. Additionally, we benefited greatly from: -* the existence, ideas, & implementation of [Apache Arrow](https://arrow.apache.org). -* likewise for the excellent [Apache DataFusion](https://github.com/apache/datafusion) project. +* the existence, ideas, & implementations of both [Apache Arrow](https://arrow.apache.org) and + [Apache DataFusion](https://github.com/apache/datafusion). * the [parquet2](https://github.com/jorgecarleitao/parquet2) project by [Jorge Leitao](https://github.com/jorgecarleitao). * the public discussions around choices of compression codecs, as well as the C++ implementations thereof, from [duckdb](https://github.com/duckdb/duckdb). 
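The layout flexibility described in the README hunk above can be sketched with the array types that appear later in this diff. The following is a minimal, hypothetical example of the Parquet-like arrangement (a ChunkedArray of StructArrays with equal chunk sizes); it assumes a `StructArray::from_fields` constructor, and the column name and values are invented for illustration:

```rust
use vortex::array::{ChunkedArray, PrimitiveArray, StructArray};
use vortex::{ArrayDType, IntoArray};
use vortex_error::VortexResult;

/// Build two equal-sized "row groups", each a struct chunk over the same column set.
fn row_group_layout() -> VortexResult<ChunkedArray> {
    // Hypothetical single-column table; `from_fields` pairs field names with arrays.
    let group = |vals: Vec<i64>| {
        StructArray::from_fields(&[("a", PrimitiveArray::from(vals).into_array())]).into_array()
    };
    let g0 = group(vec![1, 2, 3, 4]);
    let g1 = group(vec![5, 6, 7, 8]);
    let dtype = g0.dtype().clone();
    // Equal chunk sizes mirror Parquet's aligned row groups; a compressor is
    // equally free to chunk each column differently, as the README describes.
    ChunkedArray::try_new(vec![g0, g1], dtype)
}
```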
diff --git a/pyvortex/src/array.rs b/pyvortex/src/array.rs index 09500569fe..10b8043fe4 100644 --- a/pyvortex/src/array.rs +++ b/pyvortex/src/array.rs @@ -4,7 +4,8 @@ use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::types::{IntoPyDict, PyList}; use vortex::array::ChunkedArray; -use vortex::compute::take; +use vortex::compute::unary::fill_forward; +use vortex::compute::{slice, take}; use vortex::{Array, ArrayDType, IntoCanonical}; use crate::dtype::PyDType; @@ -138,8 +139,64 @@ impl PyArray { PyDType::wrap(self_.py(), self_.inner.dtype().clone()) } + /// Fill forward non-null values over runs of nulls. + /// + /// Leading nulls are replaced with the "zero" for that type. For integral and floating-point + /// types, this is zero. For the Boolean type, this is :obj:`False`. + /// + /// Returns + /// ------- + /// :class:`vortex.encoding.Array` + /// + /// Examples + /// -------- + /// + /// Fill forward sensor values over intermediate missing values. Note that leading nulls are + /// replaced with 0.0: + /// + /// >>> a = vortex.encoding.array([ + /// ... None, None, 30.29, 30.30, 30.30, None, None, 30.27, 30.25, + /// ... 30.22, None, None, None, None, 30.12, 30.11, 30.11, 30.11, + /// ... 30.10, 30.08, None, 30.21, 30.03, 30.03, 30.05, 30.07, 30.07, + /// ... ]) + /// >>> a.fill_forward().to_arrow_array() + /// + /// [ + /// 0, + /// 0, + /// 30.29, + /// 30.3, + /// 30.3, + /// 30.3, + /// 30.3, + /// 30.27, + /// 30.25, + /// 30.22, + /// ... + /// 30.11, + /// 30.1, + /// 30.08, + /// 30.08, + /// 30.21, + /// 30.03, + /// 30.03, + /// 30.05, + /// 30.07, + /// 30.07 + /// ] + fn fill_forward(&self) -> PyResult<PyArray> { + fill_forward(&self.inner) + .map_err(PyVortexError::map_err) + .map(|arr| PyArray { inner: arr }) + } + /// Filter, permute, and/or repeat elements by their index. /// + /// Parameters + /// ---------- + /// indices : :class:`vortex.encoding.Array` + /// An array of indices to keep. + /// /// Returns /// ------- /// :class:`vortex.encoding.Array` @@ -186,6 +243,63 @@ impl PyArray { .and_then(|arr| Bound::new(py, PyArray { inner: arr })) } + /// Keep only a contiguous subset of elements. + /// + /// Parameters + /// ---------- + /// start : :class:`int` + /// The start index of the range to keep, inclusive. + /// + /// end : :class:`int` + /// The end index, exclusive. + /// + /// Returns + /// ------- + /// :class:`vortex.encoding.Array` + /// + /// Examples + /// -------- + /// + /// Keep only the second through third elements: + /// + /// >>> a = vortex.encoding.array(['a', 'b', 'c', 'd']) + /// >>> a.slice(1, 3).to_arrow_array() + /// + /// [ + /// "b", + /// "c" + /// ] + /// + /// Keep none of the elements: + /// + /// >>> a = vortex.encoding.array(['a', 'b', 'c', 'd']) + /// >>> a.slice(3, 3).to_arrow_array() + /// + /// [] + /// + /// Unlike Python, it is an error to slice outside the bounds of the array: + /// + /// >>> a = vortex.encoding.array(['a', 'b', 'c', 'd']) + /// >>> a.slice(2, 10).to_arrow_array() + /// Traceback (most recent call last): + /// ... + /// ValueError: index 10 out of bounds from 0 to 4 + /// + /// Or to slice with a negative value: + /// + /// >>> a = vortex.encoding.array(['a', 'b', 'c', 'd']) + /// >>> a.slice(-2, -1).to_arrow_array() + /// Traceback (most recent call last): + /// ...
+ /// OverflowError: can't convert negative int to unsigned + /// + #[pyo3(signature = (start, end, *))] + fn slice(&self, start: usize, end: usize) -> PyResult<PyArray> { + slice(&self.inner, start, end) + .map(PyArray::new) + .map_err(PyVortexError::map_err) + } + /// Internal technical details about the encoding of this Array. /// /// Warnings diff --git a/vortex-array/src/arrow/dtype.rs b/vortex-array/src/arrow/dtype.rs index b71554bb2b..0bc4253229 100644 --- a/vortex-array/src/arrow/dtype.rs +++ b/vortex-array/src/arrow/dtype.rs @@ -1,10 +1,23 @@ +//! Convert between Vortex [vortex_dtype::DType] and Apache Arrow [arrow_schema::DataType]. +//! +//! Apache Arrow's type system includes physical information, which could lead to ambiguities as +//! Vortex treats encodings as separate from logical types. +//! +//! [`infer_schema`] and its sibling [`infer_data_type`] use a simple algorithm, where every +//! logical type is encoded in its simplest corresponding Arrow type. This reflects the reality that +//! most compute engines don't make use of the entire type range arrow-rs supports. +//! +//! For this reason, it's recommended to do as much computation as possible within Vortex, and then +//! materialize an Arrow ArrayRef at the very end of the processing chain. + use std::sync::Arc; -use arrow_schema::{DataType, Field, SchemaRef}; +use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef}; use itertools::Itertools; -use vortex_datetime_dtype::arrow::make_temporal_ext_dtype; +use vortex_datetime_dtype::arrow::{make_arrow_temporal_dtype, make_temporal_ext_dtype}; +use vortex_datetime_dtype::is_temporal_ext_type; use vortex_dtype::{DType, Nullability, PType, StructDType}; -use vortex_error::{vortex_err, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; use crate::arrow::{FromArrowType, TryFromArrowType}; @@ -91,3 +104,177 @@ impl FromArrowType<&Field> for DType { } } } + +/// Convert a Vortex [struct DType][DType] to an Arrow [Schema]. 
+pub fn infer_schema(dtype: &DType) -> VortexResult<Schema> { + let DType::Struct(struct_dtype, nullable) = dtype else { + vortex_bail!("only DType::Struct can be converted to arrow schema"); + }; + + if *nullable != Nullability::NonNullable { + vortex_bail!("top-level struct in Schema must be NonNullable"); + } + + let mut builder = SchemaBuilder::with_capacity(struct_dtype.names().len()); + for (field_name, field_dtype) in struct_dtype + .names() + .iter() + .zip(struct_dtype.dtypes().iter()) + { + builder.push(FieldRef::from(Field::new( + field_name.to_string(), + infer_data_type(field_dtype)?, + field_dtype.is_nullable(), + ))); + } + + Ok(builder.finish()) +} + +pub fn infer_data_type(dtype: &DType) -> VortexResult<DataType> { + Ok(match dtype { + DType::Null => DataType::Null, + DType::Bool(_) => DataType::Boolean, + DType::Primitive(ptype, _) => match ptype { + PType::U8 => DataType::UInt8, + PType::U16 => DataType::UInt16, + PType::U32 => DataType::UInt32, + PType::U64 => DataType::UInt64, + PType::I8 => DataType::Int8, + PType::I16 => DataType::Int16, + PType::I32 => DataType::Int32, + PType::I64 => DataType::Int64, + PType::F16 => DataType::Float16, + PType::F32 => DataType::Float32, + PType::F64 => DataType::Float64, + }, + DType::Utf8(_) => DataType::Utf8, + DType::Binary(_) => DataType::Binary, + DType::Struct(struct_dtype, _) => { + let mut fields = Vec::with_capacity(struct_dtype.names().len()); + for (field_name, field_dt) in struct_dtype + .names() + .iter() + .zip(struct_dtype.dtypes().iter()) + { + fields.push(FieldRef::from(Field::new( + field_name.to_string(), + infer_data_type(field_dt)?, + field_dt.is_nullable(), + ))); + } + + DataType::Struct(Fields::from(fields)) + } + // There are four kinds of lists: List (32-bit offsets), Large List (64-bit), List View + // (32-bit), Large List View (64-bit). We cannot both guarantee zero-copy and commit to an + // Arrow dtype because we do not know how large our offsets are. + DType::List(..) => vortex_bail!("Unsupported dtype: {}", dtype), + DType::Extension(ext_dtype, _) => { + // Try and match against the known extension DTypes. 
+ if is_temporal_ext_type(ext_dtype.id()) { + make_arrow_temporal_dtype(ext_dtype) + } else { + vortex_bail!("Unsupported extension type \"{}\"", ext_dtype.id()) + } + } + }) +} + +#[cfg(test)] +mod test { + use arrow_schema::{DataType, Field, FieldRef, Fields, Schema}; + use vortex_dtype::{ + DType, ExtDType, ExtID, FieldName, FieldNames, Nullability, PType, StructDType, + }; + + use super::*; + + #[test] + fn test_dtype_conversion_success() { + assert_eq!(infer_data_type(&DType::Null).unwrap(), DataType::Null); + + assert_eq!( + infer_data_type(&DType::Bool(Nullability::NonNullable)).unwrap(), + DataType::Boolean + ); + + assert_eq!( + infer_data_type(&DType::Primitive(PType::U64, Nullability::NonNullable)).unwrap(), + DataType::UInt64 + ); + + assert_eq!( + infer_data_type(&DType::Utf8(Nullability::NonNullable)).unwrap(), + DataType::Utf8 + ); + + assert_eq!( + infer_data_type(&DType::Binary(Nullability::NonNullable)).unwrap(), + DataType::Binary + ); + + assert_eq!( + infer_data_type(&DType::Struct( + StructDType::new( + FieldNames::from(vec![FieldName::from("field_a"), FieldName::from("field_b")]), + vec![DType::Bool(false.into()), DType::Utf8(true.into())], + ), + Nullability::NonNullable, + )) + .unwrap(), + DataType::Struct(Fields::from(vec![ + FieldRef::from(Field::new("field_a", DataType::Boolean, false)), + FieldRef::from(Field::new("field_b", DataType::Utf8, true)), + ])) + ); + } + + #[test] + #[should_panic] + fn test_dtype_conversion_panics() { + let _ = infer_data_type(&DType::Extension( + ExtDType::new(ExtID::from("my-fake-ext-dtype"), None), + Nullability::NonNullable, + )) + .unwrap(); + } + + #[test] + fn test_schema_conversion() { + let struct_dtype = the_struct(); + let schema_nonnull = DType::Struct(struct_dtype.clone(), Nullability::NonNullable); + + assert_eq!( + infer_schema(&schema_nonnull).unwrap(), + Schema::new(Fields::from(vec![ + Field::new("field_a", DataType::Boolean, false), + Field::new("field_b", DataType::Utf8, false), + Field::new("field_c", DataType::Int32, true), + ])) + ); + } + + #[test] + #[should_panic] + fn test_schema_conversion_panics() { + let struct_dtype = the_struct(); + let schema_null = DType::Struct(struct_dtype.clone(), Nullability::Nullable); + let _ = infer_schema(&schema_null).unwrap(); + } + + fn the_struct() -> StructDType { + StructDType::new( + FieldNames::from([ + FieldName::from("field_a"), + FieldName::from("field_b"), + FieldName::from("field_c"), + ]), + vec![ + DType::Bool(Nullability::NonNullable), + DType::Utf8(Nullability::NonNullable), + DType::Primitive(PType::I32, Nullability::Nullable), + ], + ) + } +} diff --git a/vortex-array/src/arrow/mod.rs b/vortex-array/src/arrow/mod.rs index 926760b553..4db9618e5c 100644 --- a/vortex-array/src/arrow/mod.rs +++ b/vortex-array/src/arrow/mod.rs @@ -1,5 +1,7 @@ use vortex_error::VortexResult; +pub use crate::arrow::dtype::{infer_data_type, infer_schema}; + mod array; mod dtype; mod recordbatch; diff --git a/vortex-datafusion/src/datatype.rs b/vortex-datafusion/src/datatype.rs deleted file mode 100644 index 6848850056..0000000000 --- a/vortex-datafusion/src/datatype.rs +++ /dev/null @@ -1,212 +0,0 @@ -//! Convert between Vortex [vortex_dtype::DType] and Apache Arrow [arrow_schema::DataType]. -//! -//! Apache Arrow's type system includes physical information, which could lead to ambiguities as -//! Vortex treats encodings as separate from logical types. -//! -//! [`infer_schema`] and its sibling [`infer_data_type`] use a simple algorithm, where every -//! 
logical type is encoded in its simplest corresponding Arrow type. This reflects the reality that -//! most compute engines don't make use of the entire type range arrow-rs supports. -//! -//! For this reason, it's recommended to do as much computation as possible within Vortex, and then -//! materialize an Arrow ArrayRef at the very end of the processing chain. - -use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaBuilder}; -use vortex_datetime_dtype::arrow::make_arrow_temporal_dtype; -use vortex_datetime_dtype::is_temporal_ext_type; -use vortex_dtype::{DType, Nullability, PType}; -use vortex_error::vortex_panic; - -/// Convert a Vortex [struct DType][DType] to an Arrow [Schema]. -/// -/// # Panics -/// -/// This function will panic if the provided `dtype` is not a StructDType, or if the struct DType -/// has top-level nullability. -pub(crate) fn infer_schema(dtype: &DType) -> Schema { - let DType::Struct(struct_dtype, nullable) = dtype else { - vortex_panic!("only DType::Struct can be converted to arrow schema"); - }; - - if *nullable != Nullability::NonNullable { - vortex_panic!("top-level struct in Schema must be NonNullable"); - } - - let mut builder = SchemaBuilder::with_capacity(struct_dtype.names().len()); - for (field_name, field_dtype) in struct_dtype - .names() - .iter() - .zip(struct_dtype.dtypes().iter()) - { - builder.push(FieldRef::from(Field::new( - field_name.to_string(), - infer_data_type(field_dtype), - field_dtype.is_nullable(), - ))); - } - - builder.finish() -} - -pub(crate) fn infer_data_type(dtype: &DType) -> DataType { - match dtype { - DType::Null => DataType::Null, - DType::Bool(_) => DataType::Boolean, - DType::Primitive(ptype, _) => match ptype { - PType::U8 => DataType::UInt8, - PType::U16 => DataType::UInt16, - PType::U32 => DataType::UInt32, - PType::U64 => DataType::UInt64, - PType::I8 => DataType::Int8, - PType::I16 => DataType::Int16, - PType::I32 => DataType::Int32, - PType::I64 => DataType::Int64, - PType::F16 => DataType::Float16, - PType::F32 => DataType::Float32, - PType::F64 => DataType::Float64, - }, - DType::Utf8(_) => DataType::Utf8, - DType::Binary(_) => DataType::Binary, - DType::Struct(struct_dtype, _) => { - let mut fields = Vec::with_capacity(struct_dtype.names().len()); - for (field_name, field_dt) in struct_dtype - .names() - .iter() - .zip(struct_dtype.dtypes().iter()) - { - fields.push(FieldRef::from(Field::new( - field_name.to_string(), - infer_data_type(field_dt), - field_dt.is_nullable(), - ))); - } - - DataType::Struct(Fields::from(fields)) - } - DType::List(list_dt, _) => { - let dtype: &DType = list_dt; - DataType::List(FieldRef::from(Field::new( - "element", - infer_data_type(dtype), - dtype.is_nullable(), - ))) - } - DType::Extension(ext_dtype, _) => { - // Try and match against the known extension DTypes. 
- if is_temporal_ext_type(ext_dtype.id()) { - make_arrow_temporal_dtype(ext_dtype) - } else { - vortex_panic!("Unsupported extension type \"{}\"", ext_dtype.id()) - } - } - } -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use arrow_schema::{DataType, Field, FieldRef, Fields, Schema}; - use vortex_dtype::{ - DType, ExtDType, ExtID, FieldName, FieldNames, Nullability, PType, StructDType, - }; - - use super::*; - - #[test] - fn test_dtype_conversion_success() { - assert_eq!(infer_data_type(&DType::Null), DataType::Null); - - assert_eq!( - infer_data_type(&DType::Bool(Nullability::NonNullable)), - DataType::Boolean - ); - - assert_eq!( - infer_data_type(&DType::Primitive(PType::U64, Nullability::NonNullable)), - DataType::UInt64 - ); - - assert_eq!( - infer_data_type(&DType::Utf8(Nullability::NonNullable)), - DataType::Utf8 - ); - - assert_eq!( - infer_data_type(&DType::Binary(Nullability::NonNullable)), - DataType::Binary - ); - - assert_eq!( - infer_data_type(&DType::List( - Arc::new(DType::Bool(Nullability::NonNullable)), - Nullability::Nullable, - )), - DataType::List(FieldRef::from(Field::new( - "element".to_string(), - DataType::Boolean, - false, - ))) - ); - - assert_eq!( - infer_data_type(&DType::Struct( - StructDType::new( - FieldNames::from(vec![FieldName::from("field_a"), FieldName::from("field_b")]), - vec![DType::Bool(false.into()), DType::Utf8(true.into())], - ), - Nullability::NonNullable, - )), - DataType::Struct(Fields::from(vec![ - FieldRef::from(Field::new("field_a", DataType::Boolean, false)), - FieldRef::from(Field::new("field_b", DataType::Utf8, true)), - ])) - ); - } - - #[test] - #[should_panic] - fn test_dtype_conversion_panics() { - let _ = infer_data_type(&DType::Extension( - ExtDType::new(ExtID::from("my-fake-ext-dtype"), None), - Nullability::NonNullable, - )); - } - - #[test] - fn test_schema_conversion() { - let struct_dtype = the_struct(); - let schema_nonnull = DType::Struct(struct_dtype.clone(), Nullability::NonNullable); - - assert_eq!( - infer_schema(&schema_nonnull), - Schema::new(Fields::from(vec![ - Field::new("field_a", DataType::Boolean, false), - Field::new("field_b", DataType::Utf8, false), - Field::new("field_c", DataType::Int32, true), - ])) - ); - } - - #[test] - #[should_panic] - fn test_schema_conversion_panics() { - let struct_dtype = the_struct(); - let schema_null = DType::Struct(struct_dtype.clone(), Nullability::Nullable); - let _ = infer_schema(&schema_null); - } - - fn the_struct() -> StructDType { - StructDType::new( - FieldNames::from([ - FieldName::from("field_a"), - FieldName::from("field_b"), - FieldName::from("field_c"), - ]), - vec![ - DType::Bool(Nullability::NonNullable), - DType::Utf8(Nullability::NonNullable), - DType::Primitive(PType::I32, Nullability::Nullable), - ], - ) - } -} diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index cfb501ad42..e731766d18 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -30,7 +30,6 @@ use crate::statistics::chunked_array_df_stats; pub mod memory; pub mod persistent; -mod datatype; mod plans; mod statistics; diff --git a/vortex-datafusion/src/memory.rs b/vortex-datafusion/src/memory.rs index 68d2ae0f21..257677c8d8 100644 --- a/vortex-datafusion/src/memory.rs +++ b/vortex-datafusion/src/memory.rs @@ -13,12 +13,12 @@ use datafusion_physical_expr::{create_physical_expr, EquivalenceProperties}; use datafusion_physical_plan::{ExecutionMode, ExecutionPlan, Partitioning, PlanProperties}; use itertools::Itertools; use 
vortex::array::ChunkedArray; +use vortex::arrow::infer_schema; use vortex::{Array, ArrayDType as _}; use vortex_error::{VortexError, VortexExpect as _}; use vortex_expr::datafusion::convert_expr_to_vortex; use vortex_expr::VortexExpr; -use crate::datatype::infer_schema; use crate::plans::{RowSelectorExec, TakeRowsExec}; use crate::{can_be_pushed_down, VortexScanExec}; @@ -40,7 +40,7 @@ impl VortexMemTable { /// /// Creation will panic if the provided array is not of `DType::Struct` type. pub fn new(array: Array, options: VortexMemTableOptions) -> Self { - let arrow_schema = infer_schema(array.dtype()); + let arrow_schema = infer_schema(array.dtype()).vortex_expect("schema is inferable"); let schema_ref = SchemaRef::new(arrow_schema); let array = match ChunkedArray::try_from(&array) { diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 44a607de14..92097ddb6a 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -386,13 +386,13 @@ mod test { use datafusion_physical_expr::create_physical_expr; use itertools::Itertools; use vortex::array::{BoolArray, ChunkedArray, PrimitiveArray, StructArray}; + use vortex::arrow::infer_schema; use vortex::validity::Validity; use vortex::{ArrayDType, IntoArray}; use vortex_dtype::field::Field; use vortex_dtype::FieldName; use vortex_expr::datafusion::convert_expr_to_vortex; - use crate::datatype::infer_schema; use crate::plans::{RowIndicesStream, ROW_SELECTOR_SCHEMA_REF}; #[tokio::test] @@ -414,7 +414,7 @@ mod test { let chunked_array = ChunkedArray::try_new(vec![chunk.clone(), chunk.clone()], dtype).unwrap(); - let schema = infer_schema(chunk.dtype()); + let schema = infer_schema(chunk.dtype()).unwrap(); let logical_expr = and((col("a")).eq(lit(2u64)), col("b").eq(lit(true))); let df_expr = create_physical_expr( &logical_expr,
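Taken together, these hunks move Arrow schema inference out of vortex-datafusion and re-export it from vortex-array as a fallible API. A minimal sketch of the new call path, adapted from the tests in this diff (the field names here are invented for illustration):

```rust
use vortex::arrow::{infer_data_type, infer_schema};
use vortex_dtype::{DType, FieldName, FieldNames, Nullability, PType, StructDType};
use vortex_error::VortexResult;

fn main() -> VortexResult<()> {
    // A NonNullable top-level struct, as `infer_schema` requires.
    let dtype = DType::Struct(
        StructDType::new(
            FieldNames::from([FieldName::from("id"), FieldName::from("score")]),
            vec![
                DType::Primitive(PType::I64, Nullability::NonNullable),
                DType::Primitive(PType::F64, Nullability::Nullable),
            ],
        ),
        Nullability::NonNullable,
    );

    // Unsupported cases (list dtypes, unknown extension types, nullable
    // top-level structs) now surface as errors instead of panics.
    let schema = infer_schema(&dtype)?;
    assert_eq!(schema.fields().len(), 2);

    let bool_dt = infer_data_type(&DType::Bool(Nullability::NonNullable))?;
    assert_eq!(bool_dt, arrow_schema::DataType::Boolean);
    Ok(())
}
```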