From f59072554679a03c804b29ad7afa58cce89f02ad Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 16 Sep 2024 14:06:58 +0100 Subject: [PATCH] Reorder row filters (#825) Reorder filtering expressions by very roughly estimated cost, this change is intended as a building block for more advanced IO pruning. --- Cargo.lock | 1 + vortex-expr/Cargo.toml | 15 ++++++- vortex-expr/src/expr.rs | 36 ++++++++++++++++- vortex-schema/src/lib.rs | 17 +++++++- vortex-serde/src/layouts/read/batch.rs | 8 ++-- vortex-serde/src/layouts/read/buffered.rs | 6 +-- vortex-serde/src/layouts/read/builder.rs | 10 ++++- vortex-serde/src/layouts/read/filtering.rs | 46 ++++++++++++++++++++-- vortex-serde/src/layouts/read/footer.rs | 4 +- vortex-serde/src/layouts/read/layouts.rs | 12 +++--- vortex-serde/src/layouts/read/mod.rs | 4 +- vortex-serde/src/layouts/read/stream.rs | 6 +-- vortex-serde/src/message_reader.rs | 3 +- 13 files changed, 138 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f7ef62324c..e358c8d643 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4475,6 +4475,7 @@ dependencies = [ "vortex-error", "vortex-proto", "vortex-scalar", + "vortex-schema", ] [[package]] diff --git a/vortex-expr/Cargo.toml b/vortex-expr/Cargo.toml index c364fdf761..dfaf023f56 100644 --- a/vortex-expr/Cargo.toml +++ b/vortex-expr/Cargo.toml @@ -31,8 +31,19 @@ vortex-dtype = { workspace = true } vortex-error = { workspace = true } vortex-proto = { workspace = true, optional = true } vortex-scalar = { workspace = true } +vortex-schema = { workspace = true } [features] -datafusion = ["dep:datafusion-common", "dep:datafusion-expr", "dep:datafusion-physical-expr", "dep:arrow-schema", "vortex-scalar/datafusion"] -proto = ["dep:prost", "vortex-dtype/proto", "vortex-scalar/proto", "vortex-proto/expr"] +datafusion = [ + "dep:datafusion-common", + "dep:datafusion-expr", + "dep:datafusion-physical-expr", + "vortex-scalar/datafusion", +] +proto = [ + "dep:prost", + "vortex-dtype/proto", + "vortex-scalar/proto", + "vortex-proto/expr", +] serde = ["dep:serde", "vortex-dtype/serde", "vortex-scalar/serde"] diff --git a/vortex-expr/src/expr.rs b/vortex-expr/src/expr.rs index 1164349208..b2db523d6e 100644 --- a/vortex-expr/src/expr.rs +++ b/vortex-expr/src/expr.rs @@ -8,17 +8,24 @@ use vortex::compute::{compare, Operator as ArrayOperator}; use vortex::variants::StructArrayTrait; use vortex::{Array, IntoArray}; use vortex_dtype::field::Field; -use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult}; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult, VortexUnwrap}; use vortex_scalar::Scalar; +use vortex_schema::Schema; use crate::Operator; +const NON_PRIMITIVE_COST_ESTIMATE: usize = 64; +const COLUMN_COST_MULTIPLIER: usize = 1024; + pub trait VortexExpr: Debug + Send + Sync + PartialEq { fn as_any(&self) -> &dyn Any; fn evaluate(&self, array: &Array) -> VortexResult; fn references(&self) -> HashSet; + + fn estimate_cost(&self, schema: &Schema) -> usize; } // Taken from apache-datafusion, necessary since you can't require VortexExpr implement PartialEq @@ -110,6 +117,12 @@ impl VortexExpr for Column { fn references(&self) -> HashSet { HashSet::from([self.field.clone()]) } + + fn estimate_cost(&self, schema: &Schema) -> usize { + let field_dtype = schema.field_type(self.field()).vortex_unwrap(); + + dtype_cost_estimate(&field_dtype) * COLUMN_COST_MULTIPLIER + } } impl PartialEq for Column { @@ -144,6 +157,10 @@ impl VortexExpr for Literal { fn references(&self) -> HashSet { HashSet::new() } + + fn estimate_cost(&self, _schema: &Schema) -> usize { + dtype_cost_estimate(self.value.dtype()) + } } impl PartialEq for Literal { @@ -183,6 +200,10 @@ impl VortexExpr for BinaryExpr { res.extend(self.rhs.references()); res } + + fn estimate_cost(&self, schema: &Schema) -> usize { + self.lhs.estimate_cost(schema) + self.rhs.estimate_cost(schema) + } } impl PartialEq for BinaryExpr { @@ -206,6 +227,10 @@ impl VortexExpr for NoOp { fn references(&self) -> HashSet { HashSet::new() } + + fn estimate_cost(&self, _schema: &Schema) -> usize { + 0 + } } impl PartialEq for NoOp { @@ -213,3 +238,12 @@ impl PartialEq for NoOp { unbox_any(other).downcast_ref::().is_some() } } + +fn dtype_cost_estimate(dtype: &DType) -> usize { + match dtype { + vortex_dtype::DType::Null => 0, + vortex_dtype::DType::Bool(_) => 1, + vortex_dtype::DType::Primitive(p, _) => p.byte_width(), + _ => NON_PRIMITIVE_COST_ESTIMATE, + } +} diff --git a/vortex-schema/src/lib.rs b/vortex-schema/src/lib.rs index 98266e8523..ebc3c36adc 100644 --- a/vortex-schema/src/lib.rs +++ b/vortex-schema/src/lib.rs @@ -1,5 +1,6 @@ +use vortex_dtype::field::Field; use vortex_dtype::DType; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; use self::projection::Projection; @@ -29,6 +30,20 @@ impl Schema { pub fn dtype(&self) -> &DType { &self.0 } + + pub fn field_type(&self, field: &Field) -> VortexResult { + let DType::Struct(s, _) = &self.0 else { + vortex_bail!("Can't project non struct types") + }; + + let idx = match field { + Field::Name(name) => s.find_name(name), + Field::Index(i) => Some(*i), + }; + + idx.and_then(|idx| s.dtypes().get(idx).cloned()) + .ok_or_else(|| vortex_err!("Couldn't find struct by {field}")) + } } impl From for DType { diff --git a/vortex-serde/src/layouts/read/batch.rs b/vortex-serde/src/layouts/read/batch.rs index c48b0c2ff9..dfe824cbba 100644 --- a/vortex-serde/src/layouts/read/batch.rs +++ b/vortex-serde/src/layouts/read/batch.rs @@ -24,7 +24,7 @@ impl BatchReader { } } - pub fn read(&mut self) -> VortexResult> { + pub(crate) fn read(&mut self) -> VortexResult> { let mut messages = Vec::new(); for (i, child_array) in self .arrays @@ -32,9 +32,9 @@ impl BatchReader { .enumerate() .filter(|(_, a)| a.is_none()) { - match self.children[i].read()? { + match self.children[i].read_next()? { Some(rr) => match rr { - ReadResult::GetMsgs(message) => { + ReadResult::ReadMore(message) => { messages.extend(message); } ReadResult::Batch(a) => *child_array = Some(a), @@ -60,7 +60,7 @@ impl BatchReader { .into_array(), ))); } else { - Ok(Some(ReadResult::GetMsgs(messages))) + Ok(Some(ReadResult::ReadMore(messages))) } } } diff --git a/vortex-serde/src/layouts/read/buffered.rs b/vortex-serde/src/layouts/read/buffered.rs index 61e342ca33..bb5a3176c3 100644 --- a/vortex-serde/src/layouts/read/buffered.rs +++ b/vortex-serde/src/layouts/read/buffered.rs @@ -34,11 +34,11 @@ impl BufferedReader { fn buffer(&mut self) -> VortexResult> { while self.buffered_row_count() < self.batch_size { if let Some(mut layout) = self.layouts.pop_front() { - let read = layout.read()?; + let read = layout.read_next()?; if let Some(rr) = read { self.layouts.push_front(layout); match rr { - g @ ReadResult::GetMsgs(..) => { + g @ ReadResult::ReadMore(..) => { return Ok(Some(g)); } ReadResult::Batch(a) => self.arrays.push_back(a), @@ -60,7 +60,7 @@ impl BufferedReader { if let Some(rr) = self.buffer()? { match rr { - g @ ReadResult::GetMsgs(..) => return Ok(Some(g)), + g @ ReadResult::ReadMore(..) => return Ok(Some(g)), ReadResult::Batch(_) => { unreachable!("Batches should be handled inside the buffer call") } diff --git a/vortex-serde/src/layouts/read/builder.rs b/vortex-serde/src/layouts/read/builder.rs index ccbfd56f02..3ef829d46c 100644 --- a/vortex-serde/src/layouts/read/builder.rs +++ b/vortex-serde/src/layouts/read/builder.rs @@ -6,6 +6,7 @@ use vortex::{Array, ArrayDType}; use vortex_dtype::field::Field; use vortex_error::{vortex_bail, VortexResult}; use vortex_schema::projection::Projection; +use vortex_schema::Schema; use crate::io::VortexReadAt; use crate::layouts::read::cache::{LayoutMessageCache, RelativeLayoutCache}; @@ -76,7 +77,7 @@ impl LayoutReaderBuilder { let (read_projection, result_projection) = if let Some(filter_columns) = self .row_filter .as_ref() - .map(|f| f.filter.references()) + .map(|f| f.references()) .filter(|refs| !refs.is_empty()) .map(|refs| footer.resolve_references(&refs.into_iter().collect::>())) .transpose()? @@ -108,10 +109,15 @@ impl LayoutReaderBuilder { Projection::Flat(projection) => footer.projected_dtype(projection)?, }; + let filter = self.row_filter.map(|f| { + let schema = Schema::new(projected_dtype.clone()); + f.reorder(&schema) + }); + let scan = Scan { projection: read_projection, indices: self.indices, - filter: self.row_filter, + filter, batch_size, }; diff --git a/vortex-serde/src/layouts/read/filtering.rs b/vortex-serde/src/layouts/read/filtering.rs index b4c6e18c87..0d06ad7386 100644 --- a/vortex-serde/src/layouts/read/filtering.rs +++ b/vortex-serde/src/layouts/read/filtering.rs @@ -1,12 +1,17 @@ +use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; -use vortex_dtype::field::FieldPath; -use vortex_expr::VortexExpr; +use vortex::Array; +use vortex_dtype::field::{Field, FieldPath}; +use vortex_error::VortexResult; +use vortex_expr::{BinaryExpr, VortexExpr}; + +use crate::layouts::Schema; #[derive(Debug, Clone)] pub struct RowFilter { - pub(crate) filter: Arc, + filter: Arc, } impl RowFilter { @@ -14,7 +19,42 @@ impl RowFilter { Self { filter } } + // Evaluate the underlying filter against a target array, returning a boolean mask + pub fn evaluate(&self, target: &Array) -> VortexResult { + self.filter.evaluate(target) + } + + /// Returns a set of all referenced fields in the underlying filter + pub fn references(&self) -> HashSet { + self.filter.references() + } + pub fn project(&self, _fields: &[FieldPath]) -> Self { todo!() } + + /// Re-order the expression so the sub-expressions estimated to be the "cheapest" are first (to the left of the expression) + pub fn reorder(mut self, schema: &Schema) -> RowFilter { + let expr = reorder_expr_impl(self.filter.clone(), schema); + self.filter = expr; + self + } +} + +fn reorder_expr_impl(expr: Arc, schema: &Schema) -> Arc { + if let Some(binary) = expr.as_any().downcast_ref::() { + let lhs = reorder_expr_impl(binary.lhs().clone(), schema); + let rhs = reorder_expr_impl(binary.rhs().clone(), schema); + + let (lhs, rhs, operator) = + if binary.lhs().estimate_cost(schema) > binary.rhs().estimate_cost(schema) { + (rhs, lhs, binary.op().swap()) + } else { + (lhs, rhs, binary.op()) + }; + + Arc::new(BinaryExpr::new(lhs, operator, rhs)) + } else { + expr + } } diff --git a/vortex-serde/src/layouts/read/footer.rs b/vortex-serde/src/layouts/read/footer.rs index 99d4f08ebf..7a6d7409ab 100644 --- a/vortex-serde/src/layouts/read/footer.rs +++ b/vortex-serde/src/layouts/read/footer.rs @@ -44,11 +44,11 @@ pub struct Footer { } impl Footer { - pub fn leftovers_layout_offset(&self) -> usize { + fn leftovers_layout_offset(&self) -> usize { (self.layout_offset - self.leftovers_offset) as usize } - pub fn leftovers_schema_offset(&self) -> usize { + fn leftovers_schema_offset(&self) -> usize { (self.schema_offset - self.leftovers_offset) as usize } diff --git a/vortex-serde/src/layouts/read/layouts.rs b/vortex-serde/src/layouts/read/layouts.rs index 23087a0b41..981bb8c242 100644 --- a/vortex-serde/src/layouts/read/layouts.rs +++ b/vortex-serde/src/layouts/read/layouts.rs @@ -45,11 +45,11 @@ impl FlatLayout { } impl Layout for FlatLayout { - fn read(&mut self) -> VortexResult> { + fn read_next(&mut self) -> VortexResult> { match self.state { FlatLayoutState::Init => { self.state = FlatLayoutState::ReadBatch; - Ok(Some(ReadResult::GetMsgs(vec![( + Ok(Some(ReadResult::ReadMore(vec![( self.cache.absolute_id(&[]), self.range, )]))) @@ -177,7 +177,7 @@ impl ColumnLayout { } impl Layout for ColumnLayout { - fn read(&mut self) -> VortexResult> { + fn read_next(&mut self) -> VortexResult> { match &mut self.state { ColumnLayoutState::Init => { let DType::Struct(s, ..) = self.message_cache.dtype() else { @@ -210,7 +210,7 @@ impl Layout for ColumnLayout { let reader = BatchReader::new(s.names().clone(), column_layouts); self.state = ColumnLayoutState::ReadColumns(reader); - self.read() + self.read_next() } ColumnLayoutState::ReadColumns(br) => br.read(), } @@ -297,7 +297,7 @@ impl ChunkedLayout { } impl Layout for ChunkedLayout { - fn read(&mut self) -> VortexResult> { + fn read_next(&mut self) -> VortexResult> { match &mut self.state { ChunkedLayoutState::Init => { let children = self @@ -320,7 +320,7 @@ impl Layout for ChunkedLayout { .collect::>>()?; let reader = BufferedReader::new(children, self.scan.batch_size); self.state = ChunkedLayoutState::ReadChunks(reader); - self.read() + self.read_next() } ChunkedLayoutState::ReadChunks(cr) => cr.read(), } diff --git a/vortex-serde/src/layouts/read/mod.rs b/vortex-serde/src/layouts/read/mod.rs index c62a29db48..3285278883 100644 --- a/vortex-serde/src/layouts/read/mod.rs +++ b/vortex-serde/src/layouts/read/mod.rs @@ -43,7 +43,7 @@ pub type MessageId = Vec; #[derive(Debug)] pub enum ReadResult { - GetMsgs(Vec<(MessageId, ByteRange)>), + ReadMore(Vec<(MessageId, ByteRange)>), Batch(Array), } @@ -55,7 +55,7 @@ pub trait Layout: Debug + Send { /// and then call back into this function. /// /// The layout is finished reading when it returns None - fn read(&mut self) -> VortexResult>; + fn read_next(&mut self) -> VortexResult>; // TODO(robert): Support stats pruning via planning. Requires propagating all the metadata // to top level and then pushing down the result of it diff --git a/vortex-serde/src/layouts/read/stream.rs b/vortex-serde/src/layouts/read/stream.rs index dd09bec253..1a017fc009 100644 --- a/vortex-serde/src/layouts/read/stream.rs +++ b/vortex-serde/src/layouts/read/stream.rs @@ -100,9 +100,9 @@ impl Stream for LayoutBatchStream { loop { match &mut self.state { StreamingState::Init => { - if let Some(read) = self.layout.read()? { + if let Some(read) = self.layout.read_next()? { match read { - ReadResult::GetMsgs(messages) => { + ReadResult::ReadMore(messages) => { let reader = mem::take(&mut self.reader) .ok_or_else(|| vortex_err!("Invalid state transition"))?; let read_future = read_ranges(reader, messages).boxed(); @@ -121,7 +121,7 @@ impl Stream for LayoutBatchStream { } if let Some(row_filter) = &self.scan.filter { - let mask = row_filter.filter.evaluate(&batch)?; + let mask = row_filter.evaluate(&batch)?; let filter_array = null_as_false(mask.into_bool()?)?; batch = filter(&batch, &filter_array)?; } diff --git a/vortex-serde/src/message_reader.rs b/vortex-serde/src/message_reader.rs index 4f088eecc5..0144c3ce2c 100644 --- a/vortex-serde/src/message_reader.rs +++ b/vortex-serde/src/message_reader.rs @@ -78,7 +78,7 @@ impl MessageReader { async fn next(&mut self) -> VortexResult { if self.finished { - vortex_bail!("Reader is finished, should've checked peek!") + vortex_bail!("Reader is finished, should've peeked!") } self.prev_message = self.message.split(); if !self.load_next_message().await? { @@ -314,6 +314,7 @@ impl ArrayBufferReader { .ok_or_else(|| vortex_err!("Checked in previous step")) } + /// Produce the array buffered in the reader pub fn into_array(self, ctx: Arc, dtype: DType) -> VortexResult { let length = self.fb_bytes_as_batch()?.length() as usize; let fb_msg = self