Skip to content

Commit

Permalink
Reorder row filters (#825)
Browse files Browse the repository at this point in the history
Reorder filtering expressions by very roughly estimated cost, this
change is intended as a building block for more advanced IO pruning.
  • Loading branch information
AdamGS authored Sep 16, 2024
1 parent cd6ccd8 commit f590725
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions vortex-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,19 @@ vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-proto = { workspace = true, optional = true }
vortex-scalar = { workspace = true }
vortex-schema = { workspace = true }

[features]
datafusion = ["dep:datafusion-common", "dep:datafusion-expr", "dep:datafusion-physical-expr", "dep:arrow-schema", "vortex-scalar/datafusion"]
proto = ["dep:prost", "vortex-dtype/proto", "vortex-scalar/proto", "vortex-proto/expr"]
datafusion = [
"dep:datafusion-common",
"dep:datafusion-expr",
"dep:datafusion-physical-expr",
"vortex-scalar/datafusion",
]
proto = [
"dep:prost",
"vortex-dtype/proto",
"vortex-scalar/proto",
"vortex-proto/expr",
]
serde = ["dep:serde", "vortex-dtype/serde", "vortex-scalar/serde"]
36 changes: 35 additions & 1 deletion vortex-expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@ use vortex::compute::{compare, Operator as ArrayOperator};
use vortex::variants::StructArrayTrait;
use vortex::{Array, IntoArray};
use vortex_dtype::field::Field;
use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult, VortexUnwrap};
use vortex_scalar::Scalar;
use vortex_schema::Schema;

use crate::Operator;

const NON_PRIMITIVE_COST_ESTIMATE: usize = 64;
const COLUMN_COST_MULTIPLIER: usize = 1024;

pub trait VortexExpr: Debug + Send + Sync + PartialEq<dyn Any> {
fn as_any(&self) -> &dyn Any;

fn evaluate(&self, array: &Array) -> VortexResult<Array>;

fn references(&self) -> HashSet<Field>;

fn estimate_cost(&self, schema: &Schema) -> usize;
}

// Taken from apache-datafusion, necessary since you can't require VortexExpr implement PartialEq<dyn VortexExpr>
Expand Down Expand Up @@ -110,6 +117,12 @@ impl VortexExpr for Column {
fn references(&self) -> HashSet<Field> {
HashSet::from([self.field.clone()])
}

fn estimate_cost(&self, schema: &Schema) -> usize {
let field_dtype = schema.field_type(self.field()).vortex_unwrap();

dtype_cost_estimate(&field_dtype) * COLUMN_COST_MULTIPLIER
}
}

impl PartialEq<dyn Any> for Column {
Expand Down Expand Up @@ -144,6 +157,10 @@ impl VortexExpr for Literal {
fn references(&self) -> HashSet<Field> {
HashSet::new()
}

fn estimate_cost(&self, _schema: &Schema) -> usize {
dtype_cost_estimate(self.value.dtype())
}
}

impl PartialEq<dyn Any> for Literal {
Expand Down Expand Up @@ -183,6 +200,10 @@ impl VortexExpr for BinaryExpr {
res.extend(self.rhs.references());
res
}

fn estimate_cost(&self, schema: &Schema) -> usize {
self.lhs.estimate_cost(schema) + self.rhs.estimate_cost(schema)
}
}

impl PartialEq<dyn Any> for BinaryExpr {
Expand All @@ -206,10 +227,23 @@ impl VortexExpr for NoOp {
fn references(&self) -> HashSet<Field> {
HashSet::new()
}

fn estimate_cost(&self, _schema: &Schema) -> usize {
0
}
}

impl PartialEq<dyn Any> for NoOp {
fn eq(&self, other: &dyn Any) -> bool {
unbox_any(other).downcast_ref::<Self>().is_some()
}
}

fn dtype_cost_estimate(dtype: &DType) -> usize {
match dtype {
vortex_dtype::DType::Null => 0,
vortex_dtype::DType::Bool(_) => 1,
vortex_dtype::DType::Primitive(p, _) => p.byte_width(),
_ => NON_PRIMITIVE_COST_ESTIMATE,
}
}
17 changes: 16 additions & 1 deletion vortex-schema/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use vortex_dtype::field::Field;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use self::projection::Projection;

Expand Down Expand Up @@ -29,6 +30,20 @@ impl Schema {
pub fn dtype(&self) -> &DType {
&self.0
}

pub fn field_type(&self, field: &Field) -> VortexResult<DType> {
let DType::Struct(s, _) = &self.0 else {
vortex_bail!("Can't project non struct types")
};

let idx = match field {
Field::Name(name) => s.find_name(name),
Field::Index(i) => Some(*i),
};

idx.and_then(|idx| s.dtypes().get(idx).cloned())
.ok_or_else(|| vortex_err!("Couldn't find struct by {field}"))
}
}

impl From<Schema> for DType {
Expand Down
8 changes: 4 additions & 4 deletions vortex-serde/src/layouts/read/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ impl BatchReader {
}
}

pub fn read(&mut self) -> VortexResult<Option<ReadResult>> {
pub(crate) fn read(&mut self) -> VortexResult<Option<ReadResult>> {
let mut messages = Vec::new();
for (i, child_array) in self
.arrays
.iter_mut()
.enumerate()
.filter(|(_, a)| a.is_none())
{
match self.children[i].read()? {
match self.children[i].read_next()? {
Some(rr) => match rr {
ReadResult::GetMsgs(message) => {
ReadResult::ReadMore(message) => {
messages.extend(message);
}
ReadResult::Batch(a) => *child_array = Some(a),
Expand All @@ -60,7 +60,7 @@ impl BatchReader {
.into_array(),
)));
} else {
Ok(Some(ReadResult::GetMsgs(messages)))
Ok(Some(ReadResult::ReadMore(messages)))
}
}
}
6 changes: 3 additions & 3 deletions vortex-serde/src/layouts/read/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ impl BufferedReader {
fn buffer(&mut self) -> VortexResult<Option<ReadResult>> {
while self.buffered_row_count() < self.batch_size {
if let Some(mut layout) = self.layouts.pop_front() {
let read = layout.read()?;
let read = layout.read_next()?;
if let Some(rr) = read {
self.layouts.push_front(layout);
match rr {
g @ ReadResult::GetMsgs(..) => {
g @ ReadResult::ReadMore(..) => {
return Ok(Some(g));
}
ReadResult::Batch(a) => self.arrays.push_back(a),
Expand All @@ -60,7 +60,7 @@ impl BufferedReader {

if let Some(rr) = self.buffer()? {
match rr {
g @ ReadResult::GetMsgs(..) => return Ok(Some(g)),
g @ ReadResult::ReadMore(..) => return Ok(Some(g)),
ReadResult::Batch(_) => {
unreachable!("Batches should be handled inside the buffer call")
}
Expand Down
10 changes: 8 additions & 2 deletions vortex-serde/src/layouts/read/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use vortex::{Array, ArrayDType};
use vortex_dtype::field::Field;
use vortex_error::{vortex_bail, VortexResult};
use vortex_schema::projection::Projection;
use vortex_schema::Schema;

use crate::io::VortexReadAt;
use crate::layouts::read::cache::{LayoutMessageCache, RelativeLayoutCache};
Expand Down Expand Up @@ -76,7 +77,7 @@ impl<R: VortexReadAt> LayoutReaderBuilder<R> {
let (read_projection, result_projection) = if let Some(filter_columns) = self
.row_filter
.as_ref()
.map(|f| f.filter.references())
.map(|f| f.references())
.filter(|refs| !refs.is_empty())
.map(|refs| footer.resolve_references(&refs.into_iter().collect::<Vec<_>>()))
.transpose()?
Expand Down Expand Up @@ -108,10 +109,15 @@ impl<R: VortexReadAt> LayoutReaderBuilder<R> {
Projection::Flat(projection) => footer.projected_dtype(projection)?,
};

let filter = self.row_filter.map(|f| {
let schema = Schema::new(projected_dtype.clone());
f.reorder(&schema)
});

let scan = Scan {
projection: read_projection,
indices: self.indices,
filter: self.row_filter,
filter,
batch_size,
};

Expand Down
46 changes: 43 additions & 3 deletions vortex-serde/src/layouts/read/filtering.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,60 @@
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;

use vortex_dtype::field::FieldPath;
use vortex_expr::VortexExpr;
use vortex::Array;
use vortex_dtype::field::{Field, FieldPath};
use vortex_error::VortexResult;
use vortex_expr::{BinaryExpr, VortexExpr};

use crate::layouts::Schema;

#[derive(Debug, Clone)]
pub struct RowFilter {
pub(crate) filter: Arc<dyn VortexExpr>,
filter: Arc<dyn VortexExpr>,
}

impl RowFilter {
pub fn new(filter: Arc<dyn VortexExpr>) -> Self {
Self { filter }
}

// Evaluate the underlying filter against a target array, returning a boolean mask
pub fn evaluate(&self, target: &Array) -> VortexResult<Array> {
self.filter.evaluate(target)
}

/// Returns a set of all referenced fields in the underlying filter
pub fn references(&self) -> HashSet<Field> {
self.filter.references()
}

pub fn project(&self, _fields: &[FieldPath]) -> Self {
todo!()
}

/// Re-order the expression so the sub-expressions estimated to be the "cheapest" are first (to the left of the expression)
pub fn reorder(mut self, schema: &Schema) -> RowFilter {
let expr = reorder_expr_impl(self.filter.clone(), schema);
self.filter = expr;
self
}
}

fn reorder_expr_impl(expr: Arc<dyn VortexExpr>, schema: &Schema) -> Arc<dyn VortexExpr> {
if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
let lhs = reorder_expr_impl(binary.lhs().clone(), schema);
let rhs = reorder_expr_impl(binary.rhs().clone(), schema);

let (lhs, rhs, operator) =
if binary.lhs().estimate_cost(schema) > binary.rhs().estimate_cost(schema) {
(rhs, lhs, binary.op().swap())
} else {
(lhs, rhs, binary.op())
};

Arc::new(BinaryExpr::new(lhs, operator, rhs))
} else {
expr
}
}
4 changes: 2 additions & 2 deletions vortex-serde/src/layouts/read/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ pub struct Footer {
}

impl Footer {
pub fn leftovers_layout_offset(&self) -> usize {
fn leftovers_layout_offset(&self) -> usize {
(self.layout_offset - self.leftovers_offset) as usize
}

pub fn leftovers_schema_offset(&self) -> usize {
fn leftovers_schema_offset(&self) -> usize {
(self.schema_offset - self.leftovers_offset) as usize
}

Expand Down
12 changes: 6 additions & 6 deletions vortex-serde/src/layouts/read/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ impl FlatLayout {
}

impl Layout for FlatLayout {
fn read(&mut self) -> VortexResult<Option<ReadResult>> {
fn read_next(&mut self) -> VortexResult<Option<ReadResult>> {
match self.state {
FlatLayoutState::Init => {
self.state = FlatLayoutState::ReadBatch;
Ok(Some(ReadResult::GetMsgs(vec![(
Ok(Some(ReadResult::ReadMore(vec![(
self.cache.absolute_id(&[]),
self.range,
)])))
Expand Down Expand Up @@ -177,7 +177,7 @@ impl ColumnLayout {
}

impl Layout for ColumnLayout {
fn read(&mut self) -> VortexResult<Option<ReadResult>> {
fn read_next(&mut self) -> VortexResult<Option<ReadResult>> {
match &mut self.state {
ColumnLayoutState::Init => {
let DType::Struct(s, ..) = self.message_cache.dtype() else {
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Layout for ColumnLayout {

let reader = BatchReader::new(s.names().clone(), column_layouts);
self.state = ColumnLayoutState::ReadColumns(reader);
self.read()
self.read_next()
}
ColumnLayoutState::ReadColumns(br) => br.read(),
}
Expand Down Expand Up @@ -297,7 +297,7 @@ impl ChunkedLayout {
}

impl Layout for ChunkedLayout {
fn read(&mut self) -> VortexResult<Option<ReadResult>> {
fn read_next(&mut self) -> VortexResult<Option<ReadResult>> {
match &mut self.state {
ChunkedLayoutState::Init => {
let children = self
Expand All @@ -320,7 +320,7 @@ impl Layout for ChunkedLayout {
.collect::<VortexResult<VecDeque<_>>>()?;
let reader = BufferedReader::new(children, self.scan.batch_size);
self.state = ChunkedLayoutState::ReadChunks(reader);
self.read()
self.read_next()
}
ChunkedLayoutState::ReadChunks(cr) => cr.read(),
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-serde/src/layouts/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub type MessageId = Vec<LayoutPartId>;

#[derive(Debug)]
pub enum ReadResult {
GetMsgs(Vec<(MessageId, ByteRange)>),
ReadMore(Vec<(MessageId, ByteRange)>),
Batch(Array),
}

Expand All @@ -55,7 +55,7 @@ pub trait Layout: Debug + Send {
/// and then call back into this function.
///
/// The layout is finished reading when it returns None
fn read(&mut self) -> VortexResult<Option<ReadResult>>;
fn read_next(&mut self) -> VortexResult<Option<ReadResult>>;

// TODO(robert): Support stats pruning via planning. Requires propagating all the metadata
// to top level and then pushing down the result of it
Expand Down
6 changes: 3 additions & 3 deletions vortex-serde/src/layouts/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for LayoutBatchStream<R> {
loop {
match &mut self.state {
StreamingState::Init => {
if let Some(read) = self.layout.read()? {
if let Some(read) = self.layout.read_next()? {
match read {
ReadResult::GetMsgs(messages) => {
ReadResult::ReadMore(messages) => {
let reader = mem::take(&mut self.reader)
.ok_or_else(|| vortex_err!("Invalid state transition"))?;
let read_future = read_ranges(reader, messages).boxed();
Expand All @@ -121,7 +121,7 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for LayoutBatchStream<R> {
}

if let Some(row_filter) = &self.scan.filter {
let mask = row_filter.filter.evaluate(&batch)?;
let mask = row_filter.evaluate(&batch)?;
let filter_array = null_as_false(mask.into_bool()?)?;
batch = filter(&batch, &filter_array)?;
}
Expand Down
Loading

0 comments on commit f590725

Please sign in to comment.