Skip to content

Commit

Permalink
Expression proto serde (#351)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Jun 12, 2024
1 parent 74dceca commit 3f8282b
Show file tree
Hide file tree
Showing 19 changed files with 207 additions and 71 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vortex-array/benches/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use rand::{thread_rng, Rng};
use vortex::array::bool::BoolArray;
use vortex::IntoArray;
use vortex_error::VortexError;
use vortex_expr::operators::Operator;
use vortex_expr::Operator;

fn filter_bool_indices(c: &mut Criterion) {
let mut group = c.benchmark_group("compare");
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/benches/filter_indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use rand::{thread_rng, Rng};
use vortex::IntoArray;
use vortex_dtype::field_paths::FieldPath;
use vortex_error::VortexError;
use vortex_expr::expressions::{lit, Conjunction, Disjunction};
use vortex_expr::field_paths::FieldPathOperations;
use vortex_expr::FieldPathOperations;
use vortex_expr::{lit, Conjunction, Disjunction};

fn filter_indices(c: &mut Criterion) {
let mut group = c.benchmark_group("filter_indices");
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/bool/compute/compare.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ops::{BitAnd, BitOr, BitXor, Not};

use vortex_error::VortexResult;
use vortex_expr::operators::Operator;
use vortex_expr::Operator;

use crate::array::bool::BoolArray;
use crate::compute::compare::CompareFn;
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/primitive/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::ops::BitAnd;
use arrow_buffer::BooleanBuffer;
use vortex_dtype::{match_each_native_ptype, NativePType};
use vortex_error::VortexResult;
use vortex_expr::operators::Operator;
use vortex_expr::Operator;

use crate::array::bool::BoolArray;
use crate::array::primitive::PrimitiveArray;
Expand Down
6 changes: 3 additions & 3 deletions vortex-array/src/array/primitive/compute/filter_indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::ops::{BitAnd, BitOr};
use arrow_buffer::BooleanBuffer;
use vortex_dtype::{match_each_native_ptype, NativePType};
use vortex_error::{vortex_bail, VortexResult};
use vortex_expr::expressions::{Disjunction, Predicate, Value};
use vortex_expr::{Disjunction, Predicate, Value};

use crate::array::bool::BoolArray;
use crate::array::primitive::PrimitiveArray;
Expand Down Expand Up @@ -71,8 +71,8 @@ fn apply_predicate<T: NativePType, F: Fn(&T, &T) -> bool>(
mod test {
use itertools::Itertools;
use vortex_dtype::field_paths::FieldPathBuilder;
use vortex_expr::expressions::{lit, Conjunction};
use vortex_expr::field_paths::FieldPathOperations;
use vortex_expr::FieldPathOperations;
use vortex_expr::{lit, Conjunction};

use super::*;
use crate::validity::Validity;
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/compute/compare.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_expr::operators::Operator;
use vortex_expr::Operator;

use crate::{Array, ArrayDType};

Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/compute/filter_indices.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_expr::expressions::Disjunction;
use vortex_expr::Disjunction;

use crate::{Array, ArrayDType};

Expand Down
2 changes: 1 addition & 1 deletion vortex-dtype/proto/vortex/dtype/dtype.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ message Extension {
}

message DType {
oneof type {
oneof dtype_type {
Null null = 1;
Bool bool = 2;
Primitive primitive = 3;
Expand Down
14 changes: 14 additions & 0 deletions vortex-dtype/proto/vortex/dtype/field_path.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package vortex.dtype;

message FieldPath {
repeated Part parts = 1;

message Part {
oneof part_type {
string name = 1;
int32 index = 2;
}
}
}
35 changes: 16 additions & 19 deletions vortex-dtype/src/field_paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::fmt::{Display, Formatter};
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct FieldPath {
field_names: Vec<FieldIdentifier>,
parts: Vec<FieldIdentifier>,
}

impl FieldPath {
Expand All @@ -13,20 +13,20 @@ impl FieldPath {
}

pub fn head(&self) -> Option<&FieldIdentifier> {
self.field_names.first()
self.parts.first()
}

pub fn tail(&self) -> Option<Self> {
if self.head().is_none() {
None
} else {
let new_field_names = self.field_names[1..self.field_names.len()].to_vec();
Some(Self::builder().join_all(new_field_names).build())
let new_parts = self.parts[1..self.parts.len()].to_vec();
Some(Self::builder().join_all(new_parts).build())
}
}

pub fn parts(&self) -> &[FieldIdentifier] {
&self.field_names
&self.parts
}
}

Expand All @@ -38,31 +38,30 @@ pub enum FieldIdentifier {
}

pub struct FieldPathBuilder {
field_names: Vec<FieldIdentifier>,
parts: Vec<FieldIdentifier>,
}

impl FieldPathBuilder {
pub fn new() -> Self {
Self {
field_names: Vec::new(),
}
Self { parts: Vec::new() }
}

pub fn push<T: Into<FieldIdentifier>>(&mut self, identifier: T) {
self.parts.push(identifier.into());
}

pub fn join<T: Into<FieldIdentifier>>(mut self, identifier: T) -> Self {
self.field_names.push(identifier.into());
self.push(identifier);
self
}

pub fn join_all(mut self, identifiers: Vec<impl Into<FieldIdentifier>>) -> Self {
self.field_names
.extend(identifiers.into_iter().map(|v| v.into()));
self.parts.extend(identifiers.into_iter().map(|v| v.into()));
self
}

pub fn build(self) -> FieldPath {
FieldPath {
field_names: self.field_names,
}
FieldPath { parts: self.parts }
}
}

Expand All @@ -78,9 +77,7 @@ pub fn field(x: impl Into<FieldIdentifier>) -> FieldPath {

impl From<FieldIdentifier> for FieldPath {
fn from(value: FieldIdentifier) -> Self {
FieldPath {
field_names: vec![value],
}
FieldPath { parts: vec![value] }
}
}

Expand Down Expand Up @@ -108,7 +105,7 @@ impl Display for FieldIdentifier {
impl Display for FieldPath {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let formatted = self
.field_names
.parts
.iter()
.map(|fid| format!("{fid}"))
.collect::<Vec<_>>()
Expand Down
61 changes: 41 additions & 20 deletions vortex-dtype/src/serde/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,27 @@ use std::sync::Arc;

use vortex_error::{vortex_err, VortexError, VortexResult};

use crate::proto::dtype::d_type::Type;
use crate::field_paths::{FieldPath, FieldPathBuilder};
use crate::proto::dtype::d_type::DtypeType;
use crate::proto::dtype::field_path::part::PartType;
use crate::{proto::dtype as pb, DType, ExtDType, ExtID, ExtMetadata, PType, StructDType};

impl TryFrom<&pb::DType> for DType {
type Error = VortexError;

fn try_from(value: &pb::DType) -> Result<Self, Self::Error> {
match value
.r#type
.dtype_type
.as_ref()
.ok_or_else(|| vortex_err!(InvalidSerde: "Unrecognized DType"))?
{
Type::Null(_) => Ok(Self::Null),
Type::Bool(b) => Ok(Self::Bool(b.nullable.into())),
Type::Primitive(p) => Ok(Self::Primitive(p.r#type().into(), p.nullable.into())),
Type::Decimal(_) => todo!("Not Implemented"),
Type::Utf8(u) => Ok(Self::Utf8(u.nullable.into())),
Type::Binary(b) => Ok(Self::Binary(b.nullable.into())),
Type::Struct(s) => Ok(Self::Struct(
DtypeType::Null(_) => Ok(Self::Null),
DtypeType::Bool(b) => Ok(Self::Bool(b.nullable.into())),
DtypeType::Primitive(p) => Ok(Self::Primitive(p.r#type().into(), p.nullable.into())),
DtypeType::Decimal(_) => todo!("Not Implemented"),
DtypeType::Utf8(u) => Ok(Self::Utf8(u.nullable.into())),
DtypeType::Binary(b) => Ok(Self::Binary(b.nullable.into())),
DtypeType::Struct(s) => Ok(Self::Struct(
StructDType::new(
s.names.iter().map(|s| s.as_str().into()).collect(),
s.dtypes
Expand All @@ -32,7 +34,7 @@ impl TryFrom<&pb::DType> for DType {
),
s.nullable.into(),
)),
Type::List(l) => {
DtypeType::List(l) => {
let nullable = l.nullable.into();
Ok(Self::List(
l.element_type
Expand All @@ -44,7 +46,7 @@ impl TryFrom<&pb::DType> for DType {
nullable,
))
}
Type::Extension(e) => Ok(Self::Extension(
DtypeType::Extension(e) => Ok(Self::Extension(
ExtDType::new(
ExtID::from(e.id.as_str()),
e.metadata.as_ref().map(|m| ExtMetadata::from(m.as_ref())),
Expand All @@ -58,31 +60,31 @@ impl TryFrom<&pb::DType> for DType {
impl From<&DType> for pb::DType {
fn from(value: &DType) -> Self {
Self {
r#type: Some(match value {
DType::Null => Type::Null(pb::Null {}),
DType::Bool(n) => Type::Bool(pb::Bool {
dtype_type: Some(match value {
DType::Null => DtypeType::Null(pb::Null {}),
DType::Bool(n) => DtypeType::Bool(pb::Bool {
nullable: (*n).into(),
}),
DType::Primitive(ptype, n) => Type::Primitive(pb::Primitive {
DType::Primitive(ptype, n) => DtypeType::Primitive(pb::Primitive {
r#type: pb::PType::from(*ptype).into(),
nullable: (*n).into(),
}),
DType::Utf8(n) => Type::Utf8(pb::Utf8 {
DType::Utf8(n) => DtypeType::Utf8(pb::Utf8 {
nullable: (*n).into(),
}),
DType::Binary(n) => Type::Binary(pb::Binary {
DType::Binary(n) => DtypeType::Binary(pb::Binary {
nullable: (*n).into(),
}),
DType::Struct(s, n) => Type::Struct(pb::Struct {
DType::Struct(s, n) => DtypeType::Struct(pb::Struct {
names: s.names().iter().map(|s| s.as_ref().to_string()).collect(),
dtypes: s.dtypes().iter().map(Into::into).collect(),
nullable: (*n).into(),
}),
DType::List(l, n) => Type::List(Box::new(pb::List {
DType::List(l, n) => DtypeType::List(Box::new(pb::List {
element_type: Some(Box::new(l.as_ref().into())),
nullable: (*n).into(),
})),
DType::Extension(e, n) => Type::Extension(pb::Extension {
DType::Extension(e, n) => DtypeType::Extension(pb::Extension {
id: e.id().as_ref().into(),
metadata: e.metadata().map(|m| m.as_ref().into()),
nullable: (*n).into(),
Expand Down Expand Up @@ -129,3 +131,22 @@ impl From<PType> for pb::PType {
}
}
}

impl TryFrom<&pb::FieldPath> for FieldPath {
type Error = VortexError;

fn try_from(value: &pb::FieldPath) -> Result<Self, Self::Error> {
let mut builder = FieldPathBuilder::new();
for part in value.parts.iter() {
match part
.part_type
.as_ref()
.ok_or_else(|| vortex_err!(InvalidSerde: "FieldPath part missing type"))?
{
PartType::Name(name) => builder.push(name.as_str()),
PartType::Index(idx) => builder.push(*idx as u64),
}
}
Ok(builder.build())
}
}
7 changes: 5 additions & 2 deletions vortex-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ workspace = true
[dependencies]
datafusion-common = { workspace = true, optional = true }
datafusion-expr = { workspace = true, optional = true }
prost = { workspace = true, optional = true }
vortex-dtype = { path = "../vortex-dtype" }
vortex-error = { path = "../vortex-error" }
vortex-scalar = { path = "../vortex-scalar" }
serde = { workspace = true, optional = true, features = ["derive"] }

[dev-dependencies]
[build-dependencies]
build-vortex = { path = "../build-vortex" }

[features]
default = []
default = ["proto"]
datafusion = ["dep:datafusion-common", "dep:datafusion-expr", "vortex-scalar/datafusion"]
proto = ["dep:prost", "vortex-dtype/proto", "vortex-scalar/proto"]
serde = ["dep:serde", "vortex-dtype/serde", "vortex-scalar/serde"]
3 changes: 3 additions & 0 deletions vortex-expr/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub fn main() {
build_vortex::build();
}
33 changes: 33 additions & 0 deletions vortex-expr/proto/vortex/expr/expr.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
syntax = "proto3";

package vortex.expr;

import "vortex/dtype/field_path.proto";
import "vortex/scalar/scalar.proto";

message Disjunction {
repeated Conjunction conjunctions = 1;
}

message Conjunction {
repeated Predicate predicates = 1;
}

message Predicate {
vortex.dtype.FieldPath left = 1;
Operator op = 2;
oneof right {
vortex.dtype.FieldPath field = 3;
vortex.scalar.Scalar scalar = 4;
}
}

enum Operator {
UNKNOWN = 0;
EQ = 1;
NEQ = 2;
LT = 3;
LTE = 4;
GT = 5;
GTE = 6;
}
Loading

0 comments on commit 3f8282b

Please sign in to comment.