This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

final proposal commit
connortsui20 committed Jan 31, 2024
1 parent 6310e0c commit 100954e
Showing 2 changed files with 101 additions and 70 deletions.
169 changes: 100 additions & 69 deletions proposal/presentation.md
@@ -17,93 +17,87 @@ Vectorized Push-Based inspired Execution Engine
---


# Overview

We will be taking heavy inspiration from:
* [DataFusion](https://arrow.apache.org/datafusion/)
* [Velox](https://velox-lib.io/)
* [InfluxDB](https://github.com/influxdata/influxdb)
  * which is built on top of DataFusion


---


# Our Design Goals

![bg right:50% 120%](./images/robustness.png)

* Robustness
* Modularity
* Extensibility
* Forward Compatibility


---


# Features

* Encode behavior in the type system
* Provide bare minimum statistics the optimizer needs
  * Timing
  * Cardinality
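
As a rough sketch of these bullets (all names here are hypothetical, not a committed API):

```rust
use std::time::Duration;

/// Hypothetical: the bare-minimum statistics an operator hands back
/// to the optimizer.
pub struct OperatorStats {
    /// Cardinality: number of rows produced, if known.
    pub output_rows: Option<usize>,
    /// Timing: wall-clock time spent inside the operator.
    pub elapsed: Duration,
}

/// Hypothetical trait: encoding the requirement in the type system means
/// an operator cannot be implemented without reporting its statistics.
pub trait PhysicalOperator {
    type Output;

    fn execute(&mut self) -> (Self::Output, OperatorStats);
}
```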

---


# List of Rust crates we plan to use

* `arrow`: for handling the Apache Arrow format
* `tokio`: high-performance `async` runtime
* `rayon`: data parallelism crate
* `anyhow`: ergonomic `Error` handling
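
A minimal sketch showing these crates working together (the schema and values are made up for illustration):

```rust
use std::sync::Arc;

use anyhow::Result;
use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use rayon::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // `arrow`: build a single-column `RecordBatch`.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let columns: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4]))];
    let batch = RecordBatch::try_new(schema, columns)?;

    // `rayon`: CPU-bound work on a compute thread pool.
    let row_sum: usize = (0..batch.num_rows()).into_par_iter().sum();
    println!("{row_sum} rows worth of work done");

    // `anyhow` powers the `?` conversions above; `tokio` drives `main`.
    Ok(())
}
```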


---


# Design Rationale

Push vs Pull Based

| Push | Pull |
| --- | --- |
| Improves cache efficiency by removing control flow logic | Easier to implement |
| Forking is efficient: You push a thing only once | Operators like LIMIT make their producers aware of when to stop running (Headache for the optimizer) |
| Parallelization is easier | Parallelization is harder |
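
A toy sketch of the two styles (trait names hypothetical, not our actual interfaces); the key difference is who drives the loop:

```rust
use arrow::record_batch::RecordBatch;

/// Pull-based (Volcano-style): the consumer drives execution by repeatedly
/// asking its child for the next batch, so control flow lives in the consumer.
trait PullOperator {
    fn next_batch(&mut self) -> Option<RecordBatch>;
}

/// Push-based: the producer drives execution and hands each batch to its
/// consumer exactly once, which is why forking to several consumers is cheap.
trait PushOperator {
    fn push(&mut self, batch: RecordBatch);
    fn finish(&mut self);
}
```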

---


# Step 1: Finalize Interfaces

Finalize API with other teams:

* I/O Service
* Catalog
* Scheduler
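
A hedged sketch of what these boundaries might look like (every trait name and signature below is a placeholder until the APIs are finalized with the other teams):

```rust
use arrow::record_batch::RecordBatch;

/// Hypothetical boundary with the I/O service: give us data as batches.
trait IoService {
    fn scan(&self, table: &str) -> Vec<RecordBatch>;
}

/// Hypothetical boundary with the Catalog: resolve where a table lives.
trait Catalog {
    fn locate(&self, table: &str) -> Option<String>;
}

/// Hypothetical boundary with the Scheduler: accept units of work from us.
trait Scheduler {
    fn submit(&self, task: Box<dyn FnOnce() + Send>);
}
```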


---


# Step 2: Buffer Pool Manager

![bg right:50% 80%](./images/bufferpool.png)

Need to spill the data to local disk.

* Can potentially rip out the [`memory_pool`](https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/index.html)
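
A minimal std-only sketch of the spilling idea, with a hypothetical `BufferPool` holding a fixed byte budget (a real version would likely build on DataFusion's `memory_pool` abstractions):

```rust
use std::fs::File;
use std::io::Write;

/// Hypothetical buffer pool with a fixed in-memory byte budget.
struct BufferPool {
    budget: usize,
    used: usize,
}

impl BufferPool {
    /// Try to keep `data` in memory; once the budget is exhausted,
    /// spill it to a file on local disk instead.
    fn admit(&mut self, name: &str, data: &[u8]) -> std::io::Result<Option<File>> {
        if self.used + data.len() <= self.budget {
            self.used += data.len();
            Ok(None) // still fits in memory
        } else {
            let mut file = File::create(std::env::temp_dir().join(name))?;
            file.write_all(data)?;
            Ok(Some(file)) // spilled to disk
        }
    }
}
```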


---

@@ -134,17 +128,6 @@ Need to spill the data to local disk.
---




# Testing

* Unit tests for each operator
* Timing each operator's performance to benchmark our code
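
A sketch of the pattern (the operator under test is a stand-in; real tests would exercise our actual operators):

```rust
#[cfg(test)]
mod tests {
    use std::time::Instant;

    /// Hypothetical stand-in for one of our operators.
    fn run_table_scan() -> usize {
        (0..10_000).sum()
    }

    #[test]
    fn table_scan_produces_rows_and_is_timed() {
        let start = Instant::now();
        let rows = run_table_scan();
        let elapsed = start.elapsed();

        assert!(rows > 0);
        // Record the timing so we can benchmark regressions over time.
        println!("table scan took {elapsed:?}");
    }
}
```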
@@ -173,8 +156,56 @@
---


# Potential `StorageClient` API

```rust
/// Will probably end up re-exporting this type:
pub type SendableRecordBatchStream = Pin<
    Box<dyn RecordBatchStream<Item = Result<RecordBatch, DataFusionError>> + Send>,
>;

pub struct StorageClient;

impl StorageClient {
    /// Have some sort of way to create a `StorageClient` on our local node.
    pub fn new(_id: usize) -> Self {
        Self
    }

    pub async fn request_data(
        &self,
        _request: BlobData,
    ) -> SendableRecordBatchStream {
        todo!()
    }
}
```


---


# Example usage of the storage client

```rust
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize a storage client
    let sc = storage_client::StorageClient::new(42);

    // Formulate a request we want to make to the storage client
    let request = create_column_request();

    // Request data from the storage client
    // Note that this request could fail
    let stream = sc.request_data(request).await?;

    // Executor node returns a future containing
    // another stream that can be sent to another operator
    let table_scan_node = operators::TableScan::new();
    let result = table_scan_node.execute_with_stream(stream);

    Ok(())
}
```
2 changes: 1 addition & 1 deletion proposal/proposal.md
@@ -101,7 +101,7 @@ The IO Service will retrieve the data (presumably by talking with the Catalog) f
A potential `StorageClient` API in Rust that the IO service would expose:

```rust
//! Right now we have this in a submodule `storage_client.rs`, but the IO service
//! team would probably create a crate and we could import it easily into our `Cargo.toml` file

use anyhow::Result;
