Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
DAG creation first draft
Browse files Browse the repository at this point in the history
  • Loading branch information
connortsui20 committed Feb 24, 2024
1 parent da5647b commit fe38e4b
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 488 deletions.
1 change: 1 addition & 0 deletions eggstrain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ arrow = "50"
async-trait = "0.1"
datafusion = "35"
datafusion-common = "35"
futures = "0.3"
serde_json = "1"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
Expand Down
3 changes: 0 additions & 3 deletions eggstrain/src/execution/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
pub mod operators;
pub mod query_dag;

#[cfg(test)]
mod tests;
76 changes: 0 additions & 76 deletions eggstrain/src/execution/operators/forward_toy.rs

This file was deleted.

16 changes: 10 additions & 6 deletions eggstrain/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
use async_trait::async_trait;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
use tokio::sync::broadcast::{Receiver, Sender};

pub mod forward_toy;
pub mod order_by;
pub mod project;

/// Interface for execution operators that can report their child plans.
///
/// In this module it is a supertrait of both `UnaryOperator` and `BinaryOperator`.
pub trait Operator {
/// Returns this operator's child `ExecutionPlan` nodes as shared (`Arc`) handles.
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
}

#[async_trait]
pub(crate) trait UnaryOperator: Send {
pub(crate) trait UnaryOperator: Operator + Send + Sync {
type In;
type Out;

fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>>;
fn into_unary(self) -> Arc<dyn UnaryOperator<In = Self::In, Out = Self::Out>>;

async fn execute(&self, rx: Receiver<Self::In>, tx: Sender<Self::Out>);
}

#[async_trait]
pub(crate) trait BinaryOperator: Send {
pub(crate) trait BinaryOperator: Operator + Send + Sync {
type InLeft;
type InRight;
type Out;

fn into_binary(
self,
) -> Box<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>>;
) -> Arc<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>>;

async fn execute(
&self,
Expand Down
102 changes: 0 additions & 102 deletions eggstrain/src/execution/operators/order_by.rs

This file was deleted.

86 changes: 46 additions & 40 deletions eggstrain/src/execution/operators/project.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,50 @@
use super::*;
use arrow::{
array::RecordBatch,
datatypes::{DataType, Field, Fields, Schema},
};
use super::{Operator, UnaryOperator};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use async_trait::async_trait;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::{projection::ProjectionExec, ExecutionPlan};
use datafusion_common::Result;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;

pub struct Project {
pub schema: Arc<Schema>,
/// TODO:
/// https://docs.rs/substrait/latest/substrait/proto/struct.ProjectRel.html
/// Need to make these expressions rather than a bunch of columns
pub expressions: Vec<usize>,
pub output_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
pub input_schema: SchemaRef, // TODO
pub output_schema: SchemaRef,
pub children: Vec<Arc<dyn ExecutionPlan>>,
}

impl Project {
pub fn new(_schema: Arc<Schema>, expressions: Vec<usize>) -> Self {
// TODO placeholder
let field_a = Field::new("a", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a]));
let res = Self {
schema,
expressions,
};
pub fn new(input_schema: SchemaRef, projection_plan: &ProjectionExec) -> Self {
Self {
output_expr: Vec::from(projection_plan.expr()),
input_schema,
output_schema: projection_plan.schema(),
children: projection_plan.children(),
}
}

let fields = res.schema.fields();
fn apply_projection(&self, rb: RecordBatch) -> Result<RecordBatch> {
assert_eq!(rb.schema(), self.input_schema);

debug_assert!(res.is_valid_projection(fields));
res
}
let num_rows = rb.num_rows();

fn is_valid_projection(&self, fields: &Fields) -> bool {
self.expressions.iter().all(|&col| col < fields.len())
}
let mut columns = Vec::with_capacity(self.output_expr.len());

fn project_record_batch(&self, batch: RecordBatch) -> RecordBatch {
let schema = batch.schema();
for (expr, name) in &self.output_expr {
let col_val = expr.evaluate(&rb).expect("expr.evaluate() fails");
let column = col_val.into_array(num_rows)?;
columns.push((name, column, expr.nullable(&self.input_schema)?));
}

let projected = self
.expressions
.iter()
.map(|&i| (schema.field(i).name(), batch.column(i).clone()));
Ok(RecordBatch::try_from_iter_with_nullable(columns)?)
}
}

RecordBatch::try_from_iter(projected).expect("Unable to create the RecordBatch")
/// `Operator` implementation for `Project`.
impl Operator for Project {
/// Returns a clone of the stored `children` vector.
///
/// The elements are `Arc` handles, so the clone copies the handles, not the plans behind them.
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
self.children.clone()
}
}

Expand All @@ -52,19 +53,24 @@ impl UnaryOperator for Project {
type In = RecordBatch;
type Out = RecordBatch;

fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>> {
Box::new(self)
fn into_unary(self) -> Arc<dyn UnaryOperator<In = Self::In, Out = Self::Out>> {
Arc::new(self)
}

async fn execute(&self, mut rx: Receiver<Self::In>, tx: Sender<Self::Out>) {
// For now assume that each record batch has the same type

async fn execute(
&self,
mut rx: broadcast::Receiver<Self::In>,
tx: broadcast::Sender<Self::Out>,
) {
loop {
match rx.recv().await {
Ok(batch) => {
debug_assert!(batch.schema() == self.schema, "RecordBatch {:?} does not have the correct schema. Schema is {:?}, supposed to be {:?}", batch, batch.schema(), self.schema);
tx.send(self.project_record_batch(batch))
.expect("Sending failed");
let projected_batch = self
.apply_projection(batch)
.expect("Unable to apply projection");

tx.send(projected_batch)
.expect("Unable to send the projected batch");
}
Err(e) => match e {
RecvError::Closed => break,
Expand Down
Loading

0 comments on commit fe38e4b

Please sign in to comment.