From 6e9d779a693b2041fa5c481ddd6e861652fab4e5 Mon Sep 17 00:00:00 2001
From: Andrew Duffy
Date: Fri, 6 Dec 2024 16:40:00 -0500
Subject: [PATCH] feat: add into_arrow to IntoCanonicalVTable (#1604)

Historically, we've gated the ability to go from Vortex -> Arrow arrays
behind the `Canonical` type, which picks one "blessed" Arrow encoding for
each of our DTypes. Since the introduction of VarBinView in #1082, there
are now two Vortex string encodings that can each be converted directly
to Arrow.

What's more, FSSTArray internally uses a `VarBin` array to hold the
FSST-compressed strings. Its CompareFn implementation delegates
comparisons to those values, which are `VarBin` and therefore hit the
default `compare` codepath: `into_canonical()?.into_arrow()?` followed by
the Arrow comparison kernel. This is slow, because
`VarBin::into_canonical()` iterates over all the strings to build a
canonical `VarBinView`, forcing a full decompression that makes the
pushdown pointless.

This PR augments the existing `IntoCanonicalVTable`, allowing encodings
to implement their own `into_arrow()` method. The default implementation
continues to call `into_canonical().into_arrow()`, but we provide a fast
version for VarBin.
---
 bench-vortex/benches/compress.rs          |  2 +-
 bench-vortex/src/lib.rs                   |  4 +--
 pyvortex/src/array.rs                     |  8 ++----
 vortex-array/src/array/varbin/flatten.rs  |  7 +++++
 vortex-array/src/arrow/datum.rs           |  4 +--
 vortex-array/src/canonical.rs             | 28 +++++++++++--------
 vortex-array/src/compute/filter.rs        |  2 +-
 vortex-array/src/encoding/opaque.rs       |  8 ++++++
 vortex-datafusion/src/memory/plans.rs     |  6 ++--
 .../src/persistent/statistics.rs          |  8 +++---
 vortex-ipc/src/stream_writer/tests.rs     |  2 +-
 11 files changed, 47 insertions(+), 32 deletions(-)

diff --git a/bench-vortex/benches/compress.rs b/bench-vortex/benches/compress.rs
index 2cfd12d7df..ab7016958d 100644
--- a/bench-vortex/benches/compress.rs
+++ b/bench-vortex/benches/compress.rs
@@ -146,7 +146,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult
diff --git a/pyvortex/src/array.rs b/pyvortex/src/array.rs
--- a/pyvortex/src/array.rs
+++ b/pyvortex/src/array.rs
         let chunks: Vec<ArrayRef> = chunked_array
             .chunks()
-            .map(|chunk| -> PyResult<ArrayRef> {
-                let canonical = chunk.into_canonical()?;
-                Ok(canonical.into_arrow()?)
-            })
+            .map(|chunk| -> PyResult<ArrayRef> { Ok(chunk.into_arrow()?) })
             .collect::<PyResult<Vec<ArrayRef>>>()?;
         if chunks.is_empty() {
             return Err(PyValueError::new_err("No chunks in array"));
         }
@@ -145,8 +142,7 @@ impl PyArray {
         } else {
             Ok(vortex
                 .clone()
-                .into_canonical()
-                .and_then(|arr| arr.into_arrow())?
+                .into_arrow()?
                 .into_data()
                 .to_pyarrow(py)?
                 .into_bound(py))
diff --git a/vortex-array/src/array/varbin/flatten.rs b/vortex-array/src/array/varbin/flatten.rs
index 67a4d12505..b8245fb11f 100644
--- a/vortex-array/src/array/varbin/flatten.rs
+++ b/vortex-array/src/array/varbin/flatten.rs
@@ -1,3 +1,4 @@
+use arrow_array::ArrayRef;
 use arrow_schema::DataType;
 use vortex_dtype::DType;
 use vortex_error::VortexResult;
@@ -21,6 +22,12 @@ impl IntoCanonical for VarBinArray {
         VarBinViewArray::try_from(ArrayData::from_arrow(array, nullable)).map(Canonical::VarBinView)
     }
+
+    fn into_arrow(self) -> VortexResult<ArrayRef> {
+        // Specialized implementation of `into_arrow` for VarBin since it has a direct
+        // Arrow representation.
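+        // This skips building a canonical `VarBinView`, which would otherwise
+        // iterate over every string in the array.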
+        varbin_to_arrow(&self)
+    }
 }
 
 #[cfg(test)]
diff --git a/vortex-array/src/arrow/datum.rs b/vortex-array/src/arrow/datum.rs
index 4dca544bf5..b0d0ba08a6 100644
--- a/vortex-array/src/arrow/datum.rs
+++ b/vortex-array/src/arrow/datum.rs
@@ -21,12 +21,12 @@ impl TryFrom<ArrayData> for Datum {
             .unwrap_or_default()
         {
             Ok(Self {
-                array: slice(array, 0, 1)?.into_canonical()?.into_arrow()?,
+                array: slice(array, 0, 1)?.into_arrow()?,
                 is_scalar: true,
             })
         } else {
             Ok(Self {
-                array: array.into_canonical()?.into_arrow()?,
+                array: array.into_arrow()?,
                 is_scalar: false,
             })
         }
diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs
index 11d4f0ca15..3cbeb0debc 100644
--- a/vortex-array/src/canonical.rs
+++ b/vortex-array/src/canonical.rs
@@ -84,7 +84,7 @@ impl Canonical {
                 // Convert storage array directly into arrow, losing type information
                 // that will let us round-trip.
                 // TODO(aduffy): https://github.com/spiraldb/vortex/issues/1167
-                a.storage().into_canonical()?.into_arrow()?
+                a.storage().into_arrow()?
             }
         }
     })
@@ -234,7 +234,7 @@ fn list_to_arrow(list: ListArray) -> VortexResult<ArrayRef> {
         list.validity().nullability().into(),
     ));
 
-    let values = list.elements().into_canonical()?.into_arrow()?;
+    let values = list.elements().into_arrow()?;
     let nulls = list.logical_validity().to_null_buffer()?;
 
     Ok(match offsets.ptype() {
@@ -330,7 +330,7 @@ fn temporal_to_arrow(temporal_array: TemporalArray) -> VortexResult<ArrayRef> {
     })
 }
 
-/// Support trait for transmuting an array into its [vortex_dtype::DType]'s canonical encoding.
+/// Support trait for transmuting an array into the canonical encoding for its [vortex_dtype::DType].
 ///
 /// This conversion ensures that the array's encoding matches one of the builtin canonical
 /// encodings, each of which has a corresponding [Canonical] variant.
@@ -340,12 +340,21 @@ fn temporal_to_arrow(temporal_array: TemporalArray) -> VortexResult<ArrayRef> {
 /// The DType of the array will be unchanged by canonicalization.
 pub trait IntoCanonical {
     fn into_canonical(self) -> VortexResult<Canonical>;
+
+    fn into_arrow(self) -> VortexResult<ArrayRef>
+    where
+        Self: Sized,
+    {
+        self.into_canonical()?.into_arrow()
+    }
 }
 
 /// Encoding VTable for canonicalizing an array.
 #[allow(clippy::wrong_self_convention)]
 pub trait IntoCanonicalVTable {
     fn into_canonical(&self, array: ArrayData) -> VortexResult<Canonical>;
+
+    fn into_arrow(&self, array: ArrayData) -> VortexResult<ArrayRef>;
 }
 
 /// Implement the [IntoCanonicalVTable] for all encodings with arrays implementing [IntoCanonical].
@@ -359,6 +368,10 @@ where
         canonical.inherit_statistics(data.statistics());
         Ok(canonical)
     }
+
+    fn into_arrow(&self, array: ArrayData) -> VortexResult<ArrayRef> {
+        E::Array::try_from(array)?.into_arrow()
+    }
 }
 
 /// Trait for types that can be converted from an owned type into an owned array variant.
@@ -525,8 +538,6 @@ mod test {
             .unwrap();
 
         let arrow_struct = nested_struct_array
-            .into_canonical()
-            .unwrap()
             .into_arrow()
             .unwrap()
             .as_any()
@@ -597,12 +608,7 @@ mod test {
 
         assert_eq!(
             &arrow_struct,
-            vortex_struct
-                .into_canonical()
-                .unwrap()
-                .into_arrow()
-                .unwrap()
-                .as_struct()
+            vortex_struct.into_arrow().unwrap().as_struct()
         );
     }
 }
diff --git a/vortex-array/src/compute/filter.rs b/vortex-array/src/compute/filter.rs
index 21ecf6fb7f..5c951ccba6 100644
--- a/vortex-array/src/compute/filter.rs
+++ b/vortex-array/src/compute/filter.rs
@@ -102,7 +102,7 @@ pub fn filter(array: &ArrayData, mask: FilterMask) -> VortexResult<ArrayData> {
         array.encoding().id(),
     );
 
-    let array_ref = array.clone().into_canonical()?.into_arrow()?;
+    let array_ref = array.clone().into_arrow()?;
     let mask_array = BooleanArray::new(mask.to_boolean_buffer()?, None);
     let filtered = arrow_select::filter::filter(array_ref.as_ref(), &mask_array)?;
diff --git a/vortex-array/src/encoding/opaque.rs b/vortex-array/src/encoding/opaque.rs
index 6b00b97809..b25c0763cd 100644
--- a/vortex-array/src/encoding/opaque.rs
+++ b/vortex-array/src/encoding/opaque.rs
@@ -2,6 +2,7 @@ use std::any::Any;
 use std::fmt::{Debug, Display, Formatter};
 use std::sync::Arc;
 
+use arrow_array::ArrayRef;
 use vortex_error::{vortex_bail, vortex_panic, VortexResult};
 
 use crate::compute::ComputeVTable;
@@ -47,6 +48,13 @@ impl IntoCanonicalVTable for OpaqueEncoding {
             self.0
         )
     }
+
+    fn into_arrow(&self, _array: ArrayData) -> VortexResult<ArrayRef> {
+        vortex_bail!(
+            "OpaqueEncoding: into_arrow cannot be called for opaque array ({})",
+            self.0
+        )
+    }
 }
 
 impl ComputeVTable for OpaqueEncoding {}
diff --git a/vortex-datafusion/src/memory/plans.rs b/vortex-datafusion/src/memory/plans.rs
index c999849299..1a05c7640e 100644
--- a/vortex-datafusion/src/memory/plans.rs
+++ b/vortex-datafusion/src/memory/plans.rs
@@ -160,7 +160,6 @@ impl Stream for RowIndicesStream {
             .conjunction_expr
             .evaluate(vortex_struct.as_ref())
             .map_err(|e| DataFusionError::External(e.into()))?
-            .into_canonical()?
             .into_arrow()?;
 
         // Convert the `selection` BooleanArray into a UInt64Array of indices.
@@ -349,9 +348,8 @@ where
         // We should find a way to avoid decoding the filter columns and only decode the other
         // columns, then stitch the StructArray back together from those.
         let projected_for_output = chunk.project(this.output_projection)?;
-        let decoded = take(projected_for_output, &row_indices, TakeOptions::default())?
-            .into_canonical()?
-            .into_arrow()?;
+        let decoded =
+            take(projected_for_output, &row_indices, TakeOptions::default())?.into_arrow()?;
 
         // Send back a single record batch of the decoded data.
         let output_batch = RecordBatch::from(decoded.as_struct());
diff --git a/vortex-datafusion/src/persistent/statistics.rs b/vortex-datafusion/src/persistent/statistics.rs
index 99409676db..3113a755be 100644
--- a/vortex-datafusion/src/persistent/statistics.rs
+++ b/vortex-datafusion/src/persistent/statistics.rs
@@ -14,7 +14,7 @@ pub fn array_to_col_statistics(array: &StructArray) -> VortexResult
         let array = array.as_primitive::<UInt64Type>();
 
         let null_count = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();
@@ -22,7 +22,7 @@ pub fn array_to_col_statistics(array: &StructArray) -> VortexResult
 pub fn uncompressed_col_size(array: &StructArray) -> VortexResult<Option<u64>> {
     match array.field_by_name(Stat::UncompressedSizeInBytes.name()) {
         None => Ok(None),
         Some(array) => {
-            let array = array.into_canonical()?.into_arrow()?;
+            let array = array.into_arrow()?;
             let array = array.as_primitive::<UInt64Type>();
 
             let uncompressed_size = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();
diff --git a/vortex-ipc/src/stream_writer/tests.rs b/vortex-ipc/src/stream_writer/tests.rs
index bf3f4c6f60..15d32bad5d 100644
--- a/vortex-ipc/src/stream_writer/tests.rs
+++ b/vortex-ipc/src/stream_writer/tests.rs
@@ -34,6 +34,6 @@ async fn broken_data() {
         .collect_chunked()
         .await
         .unwrap();
-    let round_tripped = arr.into_canonical().unwrap().into_arrow().unwrap();
+    let round_tripped = arr.into_arrow().unwrap();
     assert_eq!(&arrow_arr, round_tripped.as_primitive::());
 }
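
Reviewer note (not part of the patch): a minimal sketch of how an encoding
opts into the new fast path. `MyArray` is hypothetical and the import paths
are approximate; the trait signatures match the ones added to
vortex-array/src/canonical.rs above.

    use arrow_array::ArrayRef;
    use vortex_array::{Canonical, IntoCanonical};
    use vortex_error::VortexResult;

    // Stand-in for a real encoding's array type, for illustration only.
    struct MyArray;

    impl IntoCanonical for MyArray {
        fn into_canonical(self) -> VortexResult<Canonical> {
            // Slow path: decompress into the canonical encoding for this DType.
            todo!()
        }

        // The trait's default body is `self.into_canonical()?.into_arrow()`;
        // override it only when the encoding has a direct Arrow representation,
        // as VarBin does via `varbin_to_arrow` in this PR.
        fn into_arrow(self) -> VortexResult<ArrayRef> {
            todo!()
        }
    }

Call sites such as `filter` above just invoke `.into_arrow()` and dispatch
through `IntoCanonicalVTable`, so they pick up a specialized implementation
without any changes.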