Skip to content

Commit

Permalink
Add method for converting VortexExpr into equivalent pruning expressi…
Browse files Browse the repository at this point in the history
…on (#701)

A pruning expression, when evaluated on the statistics of a block of data,
tells us whether the block MIGHT contain matching values.
  • Loading branch information
robert3005 authored Aug 30, 2024
1 parent 7c017cb commit cdfb190
Show file tree
Hide file tree
Showing 7 changed files with 556 additions and 33 deletions.
2 changes: 1 addition & 1 deletion vortex-expr/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub fn convert_expr_to_vortex(
.as_any()
.downcast_ref::<datafusion_physical_expr::expressions::Column>()
{
let expr = Column::new(col_expr.name().to_owned());
let expr = Column::from(col_expr.name().to_owned());

return Ok(Arc::new(expr) as _);
}
Expand Down
127 changes: 106 additions & 21 deletions vortex-expr/src/expr.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::any::Any;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
Expand All @@ -12,46 +13,85 @@ use vortex_scalar::Scalar;

use crate::Operator;

pub trait VortexExpr: Debug + Send + Sync {
pub trait VortexExpr: Debug + Send + Sync + PartialEq<dyn Any> {
fn as_any(&self) -> &dyn Any;

fn evaluate(&self, array: &Array) -> VortexResult<Array>;

fn references(&self) -> HashSet<Field>;
}

#[derive(Debug)]
/// Peels a smart-pointer wrapper off a type-erased expression.
///
/// If `any` is an `Arc<dyn VortexExpr>` or a `Box<dyn VortexExpr>`, returns the
/// wrapped expression's own `as_any()` view so callers can downcast to the
/// concrete expression type. Any other value is returned unchanged.
///
/// Taken from apache-datafusion; necessary since you can't require `VortexExpr`
/// to implement `PartialEq<dyn VortexExpr>`.
fn unbox_any(any: &dyn Any) -> &dyn Any {
    // A single `downcast_ref` per wrapper type both tests and extracts, so no
    // separate `is::<T>()` probe or `unwrap()` is needed.
    if let Some(expr) = any.downcast_ref::<Arc<dyn VortexExpr>>() {
        expr.as_any()
    } else if let Some(expr) = any.downcast_ref::<Box<dyn VortexExpr>>() {
        expr.as_any()
    } else {
        any
    }
}

/// Placeholder expression that carries no data.
///
/// Its `VortexExpr::evaluate` implementation bails with an error rather than
/// producing an array, so it is not meant to be evaluated.
#[derive(Debug, PartialEq, Hash, Clone)]
pub struct NoOp;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BinaryExpr {
left: Arc<dyn VortexExpr>,
right: Arc<dyn VortexExpr>,
lhs: Arc<dyn VortexExpr>,
operator: Operator,
rhs: Arc<dyn VortexExpr>,
}

impl BinaryExpr {
pub fn new(left: Arc<dyn VortexExpr>, operator: Operator, right: Arc<dyn VortexExpr>) -> Self {
Self {
left,
right,
operator,
}
pub fn new(lhs: Arc<dyn VortexExpr>, operator: Operator, rhs: Arc<dyn VortexExpr>) -> Self {
Self { lhs, rhs, operator }
}

pub fn lhs(&self) -> &Arc<dyn VortexExpr> {
&self.lhs
}

pub fn rhs(&self) -> &Arc<dyn VortexExpr> {
&self.rhs
}

pub fn op(&self) -> Operator {
self.operator
}
}

#[derive(Debug)]
#[derive(Debug, PartialEq, Hash, Clone)]
pub struct Column {
field: Field,
}

impl Column {
pub fn new(field: String) -> Self {
Self {
field: Field::from(field),
}
pub fn new(field: Field) -> Self {
Self { field }
}

pub fn field(&self) -> &Field {
&self.field
}
}

impl From<String> for Column {
fn from(value: String) -> Self {
Column::new(value.into())
}
}

impl From<usize> for Column {
fn from(value: usize) -> Self {
Column::new(value.into())
}
}

impl VortexExpr for Column {
fn as_any(&self) -> &dyn Any {
self
}

fn evaluate(&self, array: &Array) -> VortexResult<Array> {
let s = StructArray::try_from(array)?;

Expand All @@ -68,7 +108,16 @@ impl VortexExpr for Column {
}
}

#[derive(Debug)]
impl PartialEq<dyn Any> for Column {
    /// Type-erased equality: `other` is first unwrapped via `unbox_any`, then
    /// must downcast to `Column` and compare equal under the derived
    /// `PartialEq`. Any other concrete type compares unequal.
    fn eq(&self, other: &dyn Any) -> bool {
        let candidate = unbox_any(other).downcast_ref::<Self>();
        candidate.is_some_and(|that| that == self)
    }
}

#[derive(Debug, PartialEq)]
pub struct Literal {
value: Scalar,
}
Expand All @@ -80,6 +129,10 @@ impl Literal {
}

impl VortexExpr for Literal {
fn as_any(&self) -> &dyn Any {
self
}

fn evaluate(&self, array: &Array) -> VortexResult<Array> {
Ok(ConstantArray::new(self.value.clone(), array.len()).into_array())
}
Expand All @@ -89,10 +142,23 @@ impl VortexExpr for Literal {
}
}

impl PartialEq<dyn Any> for Literal {
    /// Type-erased equality: `other` is first unwrapped via `unbox_any`, then
    /// must downcast to `Literal` and compare equal under the derived
    /// `PartialEq`. Any other concrete type compares unequal.
    fn eq(&self, other: &dyn Any) -> bool {
        let candidate = unbox_any(other).downcast_ref::<Self>();
        candidate.is_some_and(|that| that == self)
    }
}

impl VortexExpr for BinaryExpr {
fn as_any(&self) -> &dyn Any {
self
}

fn evaluate(&self, array: &Array) -> VortexResult<Array> {
let lhs = self.left.evaluate(array)?;
let rhs = self.right.evaluate(array)?;
let lhs = self.lhs.evaluate(array)?;
let rhs = self.rhs.evaluate(array)?;

let array = match self.operator {
Operator::Eq => compare(&lhs, &rhs, ArrayOperator::Eq)?,
Expand All @@ -109,13 +175,26 @@ impl VortexExpr for BinaryExpr {
}

fn references(&self) -> HashSet<Field> {
let mut res = self.left.references();
res.extend(self.right.references());
let mut res = self.lhs.references();
res.extend(self.rhs.references());
res
}
}

impl PartialEq<dyn Any> for BinaryExpr {
    /// Type-erased structural equality.
    ///
    /// Returns `false` unless `other` (after `unbox_any`) is a `BinaryExpr`.
    /// Otherwise the operators must match and both operands must compare
    /// equal through their own `PartialEq<dyn Any>` implementations.
    fn eq(&self, other: &dyn Any) -> bool {
        let Some(that) = unbox_any(other).downcast_ref::<Self>() else {
            return false;
        };

        // Operator is checked first so the operand comparisons are skipped
        // when it differs.
        that.operator == self.operator && that.lhs.eq(&self.lhs) && that.rhs.eq(&self.rhs)
    }
}

impl VortexExpr for NoOp {
fn as_any(&self) -> &dyn Any {
self
}

fn evaluate(&self, _array: &Array) -> VortexResult<Array> {
vortex_bail!("NoOp::evaluate() should not be called")
}
Expand All @@ -124,3 +203,9 @@ impl VortexExpr for NoOp {
HashSet::new()
}
}

impl PartialEq<dyn Any> for NoOp {
    /// Type-erased equality: `NoOp` has no fields, so it equals `other`
    /// exactly when `other` (after `unbox_any`) is itself a `NoOp`.
    fn eq(&self, other: &dyn Any) -> bool {
        let unwrapped = unbox_any(other);
        unwrapped.is::<Self>()
    }
}
2 changes: 1 addition & 1 deletion vortex-expr/src/operators.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::fmt;
use std::fmt::{Display, Formatter};

#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Operator {
// comparison
Expand Down
1 change: 1 addition & 0 deletions vortex-serde/src/layouts/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod read;
mod write;

mod pruning;
#[cfg(test)]
mod tests;

Expand Down
Loading

0 comments on commit cdfb190

Please sign in to comment.