diff --git a/crates/engine/src/memory_engine/mod.rs b/crates/engine/src/memory_engine/mod.rs index 7260e9681..ccab2e308 100644 --- a/crates/engine/src/memory_engine/mod.rs +++ b/crates/engine/src/memory_engine/mod.rs @@ -26,7 +26,7 @@ use crate::{ type MemoryTable = HashMap, Vec>; /// Memory Storage Engine Implementation -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct MemoryEngine { /// The inner storage engine of `MemoryStorage` inner: Arc>>, @@ -62,7 +62,15 @@ impl StorageEngine for MemoryEngine { #[inline] fn transaction(&self) -> MemoryTransaction { - MemoryTransaction {} + let inner_r = self.inner.read(); + let mut state = HashMap::new(); + for table in inner_r.keys() { + let _ignore = state.insert(table.clone(), HashMap::new()); + } + MemoryTransaction { + db: self.clone(), + state: RwLock::new(state), + } } #[inline] diff --git a/crates/engine/src/memory_engine/transaction.rs b/crates/engine/src/memory_engine/transaction.rs index aeb3876bd..9ebf34880 100644 --- a/crates/engine/src/memory_engine/transaction.rs +++ b/crates/engine/src/memory_engine/transaction.rs @@ -1,18 +1,154 @@ #![allow(clippy::module_name_repetitions)] #![allow(clippy::multiple_inherent_impl)] -use crate::TransactionApi; +use std::{cmp::Ordering, collections::HashMap}; + +use parking_lot::{RwLock, RwLockWriteGuard}; + +use crate::{ + api::transaction_api::TransactionApi, error::EngineError, memory_engine::MemoryEngine, + StorageOps, WriteOperation, +}; + +/// The memory table used in transaction state +type StateMemoryTable = HashMap, Option>>; /// A transaction of the `MemoryEngine` -#[derive(Copy, Clone, Debug, Default)] -pub struct MemoryTransaction; +#[derive(Debug, Default)] +pub struct MemoryTransaction { + /// The memory engine + pub(super) db: MemoryEngine, + /// The inner storage engine of `MemoryStorage` + pub(super) state: RwLock>, +} + +impl StorageOps for MemoryTransaction { + fn write(&self, op: WriteOperation<'_>, _sync: bool) -> Result<(), EngineError> { + let mut state_w = self.state.write(); + self.write_op(&mut state_w, op) + } + + fn write_multi(&self, ops: Vec>, _sync: bool) -> Result<(), EngineError> { + let mut state_w = self.state.write(); + for op in ops { + self.write_op(&mut state_w, op)?; + } + Ok(()) + } + + fn get(&self, table: &str, key: impl AsRef<[u8]>) -> Result>, EngineError> { + let state_r = self.state.read(); + let state_table = state_r + .get(table) + .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?; + + if let Some(val) = state_table.get(key.as_ref()) { + return Ok(val.clone()); + } + + let db_inner_r = self.db.inner.read(); + let db_table = db_inner_r + .get(table) + .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?; + + Ok(db_table.get(key.as_ref()).cloned()) + } + + fn get_multi( + &self, + table: &str, + keys: &[impl AsRef<[u8]>], + ) -> Result>>, EngineError> { + keys.iter().map(|key| self.get(table, key)).collect() + } +} impl TransactionApi for MemoryTransaction { - fn commit(self) -> Result<(), crate::EngineError> { + fn commit(self) -> Result<(), EngineError> { + let mut state_w = self.state.write(); + let mut db_inner_w = self.db.inner.write(); + for (name, mut table) in state_w.drain() { + let db_table = db_inner_w + .get_mut(&name) + .ok_or_else(|| EngineError::TableNotFound(name.clone()))?; + + for (key, val_opt) in table.drain() { + if let Some(val) = val_opt { + let _ignore = db_table.insert(key, val); + } else { + let _ignore = db_table.remove(&key); + } + } + } + Ok(()) } - fn rollback(&self) -> Result<(), crate::EngineError> { + fn rollback(&self) -> Result<(), EngineError> { + let mut state_w = self.state.write(); + for table in state_w.values_mut() { + table.clear(); + } + + Ok(()) + } +} + +impl MemoryTransaction { + /// Write an op to the transaction + fn write_op( + &self, + state_w: &mut RwLockWriteGuard<'_, HashMap>, + op: WriteOperation<'_>, + ) -> Result<(), EngineError> { + match op { + WriteOperation::Put { table, key, value } => { + let table = state_w + .get_mut(table) + .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?; + let _ignore = table.insert(key, Some(value)); + } + WriteOperation::Delete { table, key } => { + let table = state_w + .get_mut(table) + .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?; + let _ignore = table.insert(key.to_vec(), None); + } + WriteOperation::DeleteRange { table, from, to } => { + let db_inner_r = self.db.inner.read(); + let db_table = db_inner_r + .get(table) + .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?; + let state_table = state_w + .get(table) + .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?; + + let to_delete: Vec<_> = db_table + .keys() + .chain(state_table.keys()) + .filter(|key| { + let key_slice = key.as_slice(); + match key_slice.cmp(from) { + Ordering::Less => false, + Ordering::Equal => true, + Ordering::Greater => match key_slice.cmp(to) { + Ordering::Less => true, + Ordering::Equal | Ordering::Greater => false, + }, + } + }) + .cloned() + .collect(); + + let table = state_w + .get_mut(table) + .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?; + // `None` works as a tombstone of the key + for key in to_delete { + let _ignore = table.insert(key.clone(), None); + } + } + } Ok(()) } }