diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index d893fd242f..a42c5ddd31 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -12,18 +12,24 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Schema, SchemaRef}; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::prelude::{DataFrame, SessionContext}; -use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult}; +use datafusion_common::stats::Precision; +use datafusion_common::{ + exec_datafusion_err, ColumnStatistics, DataFusionError, Result as DFResult, ScalarValue, + Statistics, +}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{Expr, Operator}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::Stream; +use itertools::Itertools; use memory::{VortexMemTable, VortexMemTableOptions}; use persistent::config::VortexTableOptions; use persistent::provider::VortexFileTableProvider; use vortex::array::ChunkedArray; +use vortex::stats::{ArrayStatistics, Stat}; use vortex::{Array, ArrayDType, IntoArrayVariant}; use vortex_dtype::field::Field; -use vortex_error::vortex_err; +use vortex_error::{vortex_err, VortexExpect, VortexResult}; pub mod memory; pub mod persistent; @@ -209,23 +215,21 @@ pub(crate) struct VortexRecordBatchStream { impl Stream for VortexRecordBatchStream { type Item = DFResult; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - if this.idx >= this.num_chunks { + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if self.idx >= self.num_chunks { return Poll::Ready(None); } // Grab next chunk, project and convert to Arrow. - let chunk = this.chunks.chunk(this.idx)?; - this.idx += 1; + let chunk = self.chunks.chunk(self.idx)?; + self.idx += 1; let struct_array = chunk - .clone() .into_struct() .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; let projected_struct = struct_array - .project(&this.projection) + .project(&self.projection) .map_err(|vortex_err| { exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") })?; @@ -288,4 +292,56 @@ impl ExecutionPlan for VortexScanExec { .collect(), })) } + + fn statistics(&self) -> DFResult { + let mut nbytes: usize = 0; + let column_statistics = self.array.as_ref().with_dyn(|a| { + let struct_arr = a + .as_struct_array() + .ok_or_else(|| vortex_err!("Not a struct array"))?; + self.scan_projection + .iter() + .map(|i| { + struct_arr + .field(*i) + .ok_or_else(|| vortex_err!("Projection references unknown field {i}")) + }) + .map_ok(|arr| { + nbytes += arr.nbytes(); + ColumnStatistics { + null_count: arr + .statistics() + .get_as::(Stat::NullCount) + .map(|n| n as usize) + .map(Precision::Exact) + .unwrap_or(Precision::Absent), + max_value: arr + .statistics() + .get(Stat::Max) + .map(|n| { + ScalarValue::try_from(n) + .vortex_expect("cannot convert scalar to df scalar") + }) + .map(Precision::Exact) + .unwrap_or(Precision::Absent), + min_value: arr + .statistics() + .get(Stat::Min) + .map(|n| { + ScalarValue::try_from(n) + .vortex_expect("cannot convert scalar to df scalar") + }) + .map(Precision::Exact) + .unwrap_or(Precision::Absent), + distinct_count: Precision::Absent, + } + }) + .collect::>>() + })?; + Ok(Statistics { + num_rows: Precision::Exact(self.array.len()), + total_byte_size: Precision::Exact(nbytes), + column_statistics, + }) + } }