Skip to content

Commit

Permalink
VortexScanExec reports statistics to datafusion (#909)
Browse files Browse the repository at this point in the history
This accounts for the majority (~25-30%) of the 40% difference in Q22. I guess
the rest is our overhead of converting to Arrow.
  • Loading branch information
robert3005 authored Sep 23, 2024
1 parent 71f699d commit 77d155e
Showing 1 changed file with 65 additions and 9 deletions.
74 changes: 65 additions & 9 deletions vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@ use arrow_array::RecordBatch;
use arrow_schema::{DataType, Schema, SchemaRef};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult};
use datafusion_common::stats::Precision;
use datafusion_common::{
exec_datafusion_err, ColumnStatistics, DataFusionError, Result as DFResult, ScalarValue,
Statistics,
};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{Expr, Operator};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::Stream;
use itertools::Itertools;
use memory::{VortexMemTable, VortexMemTableOptions};
use persistent::config::VortexTableOptions;
use persistent::provider::VortexFileTableProvider;
use vortex::array::ChunkedArray;
use vortex::stats::{ArrayStatistics, Stat};
use vortex::{Array, ArrayDType, IntoArrayVariant};
use vortex_dtype::field::Field;
use vortex_error::vortex_err;
use vortex_error::{vortex_err, VortexExpect, VortexResult};

pub mod memory;
pub mod persistent;
Expand Down Expand Up @@ -209,23 +215,21 @@ pub(crate) struct VortexRecordBatchStream {
impl Stream for VortexRecordBatchStream {
type Item = DFResult<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.idx >= this.num_chunks {
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.idx >= self.num_chunks {
return Poll::Ready(None);
}

// Grab next chunk, project and convert to Arrow.
let chunk = this.chunks.chunk(this.idx)?;
this.idx += 1;
let chunk = self.chunks.chunk(self.idx)?;
self.idx += 1;

let struct_array = chunk
.clone()
.into_struct()
.map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?;

let projected_struct = struct_array
.project(&this.projection)
.project(&self.projection)
.map_err(|vortex_err| {
exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
})?;
Expand Down Expand Up @@ -288,4 +292,56 @@ impl ExecutionPlan for VortexScanExec {
.collect(),
}))
}

fn statistics(&self) -> DFResult<Statistics> {
let mut nbytes: usize = 0;
let column_statistics = self.array.as_ref().with_dyn(|a| {
let struct_arr = a
.as_struct_array()
.ok_or_else(|| vortex_err!("Not a struct array"))?;
self.scan_projection
.iter()
.map(|i| {
struct_arr
.field(*i)
.ok_or_else(|| vortex_err!("Projection references unknown field {i}"))
})
.map_ok(|arr| {
nbytes += arr.nbytes();
ColumnStatistics {
null_count: arr
.statistics()
.get_as::<u64>(Stat::NullCount)
.map(|n| n as usize)
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
max_value: arr
.statistics()
.get(Stat::Max)
.map(|n| {
ScalarValue::try_from(n)
.vortex_expect("cannot convert scalar to df scalar")
})
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
min_value: arr
.statistics()
.get(Stat::Min)
.map(|n| {
ScalarValue::try_from(n)
.vortex_expect("cannot convert scalar to df scalar")
})
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
distinct_count: Precision::Absent,
}
})
.collect::<VortexResult<Vec<_>>>()
})?;
Ok(Statistics {
num_rows: Precision::Exact(self.array.len()),
total_byte_size: Precision::Exact(nbytes),
column_statistics,
})
}
}

0 comments on commit 77d155e

Please sign in to comment.