Skip to content

Commit

Permalink
storage part 2
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh committed Feb 20, 2022
1 parent 4298877 commit 2b3e25c
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 64 deletions.
1 change: 1 addition & 0 deletions code/03-00/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ log = "0.4"
prettytable-rs = { version = "0.8", default-features = false }
rustyline = "9"
sqlparser = "0.13"
tempfile = "3"
thiserror = "1"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "macros", "fs"] }
tokio-stream = "0.1"
Expand Down
12 changes: 3 additions & 9 deletions code/03-00/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::executor::{ExecuteError, ExecutorBuilder};
use crate::logical_planner::{LogicalPlanError, LogicalPlanner};
use crate::parser::{parse, ParserError};
use crate::physical_planner::{PhysicalPlanError, PhysicalPlanner};
use crate::storage::DiskStorage;
use crate::storage::{DiskStorage, StorageOptions};

/// The database instance.
pub struct Database {
Expand All @@ -21,17 +21,11 @@ pub struct Database {
runtime: Runtime,
}

impl Default for Database {
fn default() -> Self {
Self::new()
}
}

impl Database {
/// Create a new database instance.
pub fn new() -> Self {
pub fn new(options: StorageOptions) -> Self {
let catalog = Arc::new(DatabaseCatalog::new());
let storage = Arc::new(DiskStorage::new());
let storage = Arc::new(DiskStorage::new(options));
let parallel = matches!(std::env::var("LIGHT_PARALLEL"), Ok(s) if s == "1");
let runtime = if parallel {
tokio::runtime::Builder::new_multi_thread()
Expand Down
8 changes: 7 additions & 1 deletion code/03-00/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,18 @@ impl InsertExecutor {
)
.collect_vec();
let mut count = 0;

let mut txn = table.write().await?;

#[for_await]
for chunk in self.child {
let chunk = transform_chunk(chunk?, &output_columns);
count += chunk.cardinality();
table.append(chunk).await?;
txn.append(chunk).await?;
}

txn.commit().await?;

yield DataChunk::single(count as i32);
}
}
Expand Down
6 changes: 5 additions & 1 deletion code/03-00/src/executor/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ impl SeqScanExecutor {
#[try_stream(boxed, ok = DataChunk, error = ExecuteError)]
pub async fn execute(self) {
let table = self.storage.get_table(self.table_ref_id)?;
for chunk in table.all_chunks().await? {
let txn = table.read().await?;

for chunk in txn.all_chunks().await? {
yield chunk;
}

txn.commit().await?;
}
}
5 changes: 4 additions & 1 deletion code/03-00/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
//! A simple interactive shell of the database.
use risinglight_03_00::storage::StorageOptions;
use risinglight_03_00::Database;
use rustyline::error::ReadlineError;
use rustyline::Editor;

fn main() {
env_logger::init();

let db = Database::new();
let db = Database::new(StorageOptions {
base_path: "risinglight.db".into(),
});

let mut rl = Editor::<()>::new();
loop {
Expand Down
25 changes: 25 additions & 0 deletions code/03-00/src/storage/column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use anyhow::anyhow;
use bytes::{Buf, BufMut};

use super::StorageResult;
use crate::array::{Array, ArrayBuilder, I32Array, I32ArrayBuilder};

/// Encode an `I32Array` into a `Vec<u8>`.
pub fn encode_int32_column(a: &I32Array, mut buffer: impl BufMut) -> StorageResult<()> {
for item in a.iter() {
if let Some(item) = item {
buffer.put_i32_le(*item);
} else {
return Err(anyhow!("nullable encoding not supported!").into());
}
}
Ok(())
}

pub fn decode_int32_column(mut data: impl Buf) -> StorageResult<I32Array> {
let mut builder = I32ArrayBuilder::with_capacity(data.remaining() / 4);
while data.has_remaining() {
builder.push(Some(&data.get_i32_le()));
}
Ok(builder.finish())
}
157 changes: 106 additions & 51 deletions code/03-00/src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
//! On-disk storage
mod column;
mod rowset;

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::AtomicU32;
use std::sync::{Arc, RwLock};

use anyhow::anyhow;
use bytes::{Buf, BufMut};

use crate::array::{Array, ArrayBuilder, ArrayImpl, DataChunk, I32Array, I32ArrayBuilder};
use self::rowset::{DiskRowset, RowSetBuilder};
use crate::array::DataChunk;
use crate::catalog::{ColumnDesc, TableRefId};

/// The error type of storage operations.
Expand All @@ -26,13 +30,16 @@ pub struct DiskStorage {
/// All tables in the current storage engine.
tables: RwLock<HashMap<TableRefId, StorageTableRef>>,

/// Generator for RowSet id.
rowset_id_generator: Arc<AtomicU32>,

/// The storage options.
options: Arc<StorageOptions>,
}

pub struct StorageOptions {
/// The directory of the storage
base_path: PathBuf,
pub base_path: PathBuf,
}

pub fn err(error: impl Into<anyhow::Error>) -> StorageError {
Expand All @@ -49,22 +56,21 @@ pub struct DiskTable {

/// The storage options.
options: Arc<StorageOptions>,
}

impl Default for DiskStorage {
fn default() -> Self {
Self::new()
}
/// Generator for RowSet id.
rowset_id_generator: Arc<AtomicU32>,

/// RowSets in the table
rowsets: RwLock<Vec<DiskRowset>>,
}

impl DiskStorage {
/// Create a new in-memory storage.
pub fn new() -> Self {
pub fn new(options: StorageOptions) -> Self {
DiskStorage {
tables: RwLock::new(HashMap::new()),
options: Arc::new(StorageOptions {
base_path: "risinglight.db".into(),
}),
options: Arc::new(options),
rowset_id_generator: Arc::new(AtomicU32::new(0)),
}
}

Expand All @@ -75,6 +81,8 @@ impl DiskStorage {
id,
options: self.options.clone(),
column_descs: column_descs.into(),
rowsets: RwLock::new(Vec::new()),
rowset_id_generator: self.rowset_id_generator.clone(),
};
let res = tables.insert(id, table.into());
if res.is_some() {
Expand All @@ -93,61 +101,108 @@ impl DiskStorage {
}
}

/// Encode an `I32Array` into a `Vec<u8>`.
fn encode_int32_column(a: &I32Array) -> StorageResult<Vec<u8>> {
let mut buffer = Vec::with_capacity(a.len() * 4);
for item in a.iter() {
if let Some(item) = item {
buffer.put_i32_le(*item);
} else {
return Err(anyhow!("nullable encoding not supported!").into());
}
impl DiskTable {
/// Start a transaction which only contains write.
pub async fn write(self: &Arc<Self>) -> StorageResult<DiskTransaction> {
let rowsets = self.rowsets.read().unwrap();
Ok(DiskTransaction {
read_only: false,
table: self.clone(),
rowset_snapshot: rowsets.clone(),
builder: None,
finished: false,
})
}
Ok(buffer)
}

fn decode_int32_column(mut data: &[u8]) -> StorageResult<I32Array> {
let mut builder = I32ArrayBuilder::with_capacity(data.len() / 4);
while data.has_remaining() {
builder.push(Some(&data.get_i32_le()));
/// Start a transaction which only contains read.
pub async fn read(self: &Arc<Self>) -> StorageResult<DiskTransaction> {
let rowsets = self.rowsets.read().unwrap();
Ok(DiskTransaction {
read_only: true,
table: self.clone(),
rowset_snapshot: rowsets.clone(),
builder: None,
finished: false,
})
}
Ok(builder.finish())
}

impl DiskTable {
fn table_path(&self) -> PathBuf {
pub fn table_path(&self) -> PathBuf {
self.options.base_path.join(self.id.table_id.to_string())
}

fn column_path(&self, column_id: usize) -> PathBuf {
self.table_path().join(format!("{}.col", column_id))
pub fn rowset_path_of(&self, rowset_id: u32) -> PathBuf {
self.table_path().join(rowset_id.to_string())
}
}

pub struct DiskTransaction {
/// If this txn is read only.
read_only: bool,

/// Reference to table object
table: Arc<DiskTable>,

/// Current snapshot of RowSets
rowset_snapshot: Vec<DiskRowset>,

/// Builder for the RowSet
builder: Option<RowSetBuilder>,

/// Indicates whether the transaction is committed or aborted. If
/// the [`SecondaryTransaction`] object is dropped without finishing,
/// the transaction will panic.
finished: bool,
}

impl Drop for DiskTransaction {
fn drop(&mut self) {
if !self.finished {
warn!("Transaction dropped without committing or aborting");
}
}
}

impl DiskTransaction {
/// Append a chunk to the table.
pub async fn append(&self, chunk: DataChunk) -> StorageResult<()> {
for (idx, column) in chunk.arrays().iter().enumerate() {
if let ArrayImpl::Int32(column) = column {
let column_path = self.column_path(idx);
let data = encode_int32_column(column)?;
tokio::fs::create_dir_all(column_path.parent().unwrap())
.await
.map_err(err)?;
tokio::fs::write(column_path, data).await.map_err(err)?;
} else {
return Err(anyhow!("unsupported column type").into());
}
pub async fn append(&mut self, chunk: DataChunk) -> StorageResult<()> {
if self.read_only {
return Err(anyhow!("cannot append chunks in read only txn!").into());
}
if self.builder.is_none() {
self.builder = Some(RowSetBuilder::new(self.table.column_descs.clone()));
}
let builder = self.builder.as_mut().unwrap();

builder.append(chunk)?;

Ok(())
}

pub async fn commit(mut self) -> StorageResult<()> {
self.finished = true;

if let Some(builder) = self.builder.take() {
use std::sync::atomic::Ordering::SeqCst;
let rowset_id = self.table.rowset_id_generator.fetch_add(1, SeqCst);
let rowset_path = self
.table
.options
.base_path
.join(self.table.rowset_path_of(rowset_id));
let rowset = builder.flush(rowset_id, rowset_path).await?;
let mut rowsets = self.table.rowsets.write().unwrap();
rowsets.push(rowset);
}

Ok(())
}

/// Get all chunks of the table.
pub async fn all_chunks(&self) -> StorageResult<Vec<DataChunk>> {
let mut columns = vec![];
for (idx, _) in self.column_descs.iter().enumerate() {
let column_path = self.column_path(idx);
let data = tokio::fs::read(column_path).await.map_err(err)?;
columns.push(decode_int32_column(&data)?);
let mut chunks = vec![];
for rowset in &self.rowset_snapshot {
chunks.push(rowset.as_chunk().await?);
}
Ok(vec![columns.into_iter().map(ArrayImpl::Int32).collect()])
Ok(chunks)
}
}
Loading

0 comments on commit 2b3e25c

Please sign in to comment.