Skip to content

Commit

Permalink
week 3 day 6 done
Browse files Browse the repository at this point in the history
  • Loading branch information
redixhumayun committed Jun 14, 2024
1 parent 3601aa8 commit d76535a
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 19 deletions.
54 changes: 39 additions & 15 deletions mini-lsm-starter/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,19 +544,7 @@ impl LsmStorageInner {
txn.get(key)
}

/// Write a batch of data into the storage.
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
let _ = self.mvcc()?.write_lock.lock();
let ts = self.mvcc()?.latest_commit_ts() + 1;

for record in batch {
self.write_record(record, ts)?;
}
self.mvcc()?.update_commit_ts(ts);

Ok(())
}

/// Helper method to make writes to the engine
fn write_record<T: AsRef<[u8]>>(&self, record: &WriteBatchRecord<T>, ts: u64) -> Result<()> {
let state = self.state.read();
{
Expand All @@ -583,13 +571,49 @@ impl LsmStorageInner {
Ok(())
}

pub fn write_batch_inner<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<u64> {
let _ = self.mvcc()?.write_lock.lock();
let ts = self.mvcc()?.latest_commit_ts() + 1;

for record in batch {
self.write_record(record, ts)?;
}
self.mvcc()?.update_commit_ts(ts);
Ok(ts)
}

/// Write a batch of data into the storage.
pub fn write_batch<T: AsRef<[u8]>>(
self: &Arc<Self>,
batch: &[WriteBatchRecord<T>],
) -> Result<()> {
if !self.options.serializable {
self.write_batch_inner(batch)?;
return Ok(());
}

let txn = self
.mvcc()?
.new_txn(Arc::clone(self), self.options.serializable);
for record in batch {
match record {
WriteBatchRecord::Put(key, value) => {
txn.put(key.as_ref(), value.as_ref());
}
WriteBatchRecord::Del(key) => txn.delete(key.as_ref()),
}
}
txn.commit()?;
Ok(())
}

/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
pub fn put(self: &Arc<Self>, key: &[u8], value: &[u8]) -> Result<()> {
self.write_batch(&[WriteBatchRecord::Put(key, value)])
}

/// Remove a key from the storage by writing an empty value.
pub fn delete(&self, key: &[u8]) -> Result<()> {
pub fn delete(self: &Arc<Self>, key: &[u8]) -> Result<()> {
self.write_batch(&[WriteBatchRecord::Del(key)])
}

Expand Down
13 changes: 12 additions & 1 deletion mini-lsm-starter/src/mvcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{

use crossbeam_skiplist::SkipMap;
use parking_lot::Mutex;
use txn::ReadWriteSets;

use crate::lsm_storage::LsmStorageInner;

Expand Down Expand Up @@ -59,12 +60,22 @@ impl LsmMvccInner {
let mut ts = self.ts.lock();
let timestamp = ts.0;
ts.1.add_reader(timestamp);
let key_hashes = {
if serializable {
Some(Mutex::new(ReadWriteSets::new(
HashSet::new(),
HashSet::new(),
)))
} else {
None
}
};
Arc::new(Transaction {
read_ts: timestamp,
inner,
local_storage: Arc::new(SkipMap::new()),
committed: Arc::new(AtomicBool::new(false)),
key_hashes: Some(Mutex::new((HashSet::new(), HashSet::new()))),
key_hashes,
})
}
}
129 changes: 126 additions & 3 deletions mini-lsm-starter/src/mvcc/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,47 @@ use anyhow::{Ok, Result};
use bytes::Bytes;
use crossbeam_skiplist::SkipMap;
use ouroboros::self_referencing;
use parking_lot::Mutex;
use parking_lot::{lock_api::MutexGuard, Mutex, RawMutex};

use crate::{
iterators::{two_merge_iterator::TwoMergeIterator, StorageIterator},
lsm_iterator::{FusedIterator, LsmIterator},
lsm_storage::{LsmStorageInner, WriteBatchRecord},
};

use super::CommittedTxnData;

pub struct ReadWriteSets {
read_set: HashSet<u32>,
write_set: HashSet<u32>,
}

impl ReadWriteSets {
pub fn new(read_set: HashSet<u32>, write_set: HashSet<u32>) -> Self {
Self {
read_set,
write_set,
}
}

pub fn add_to_read_set<T: AsRef<[u8]>>(&mut self, key: T) {
let hash = farmhash::fingerprint32(key.as_ref());
self.read_set.insert(hash);
}

pub fn add_to_write_set<T: AsRef<[u8]>>(&mut self, key: T) {
let hash = farmhash::fingerprint32(key.as_ref());
self.write_set.insert(hash);
}
}

pub struct Transaction {
pub(crate) read_ts: u64,
pub(crate) inner: Arc<LsmStorageInner>,
pub(crate) local_storage: Arc<SkipMap<Bytes, Bytes>>,
pub(crate) committed: Arc<AtomicBool>,
/// Write set and read set
pub(crate) key_hashes: Option<Mutex<(HashSet<u32>, HashSet<u32>)>>,
pub(crate) key_hashes: Option<Mutex<ReadWriteSets>>,
}

impl Transaction {
Expand All @@ -39,6 +65,13 @@ impl Transaction {
}

pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
if self.committed.load(std::sync::atomic::Ordering::SeqCst) {
return Err(anyhow::anyhow!("Transaction already committed"));
}
if let Some(key_hashes) = self.key_hashes.as_ref() {
let mut rwset = key_hashes.lock();
rwset.add_to_read_set(key);
}
if let Some(entry) = self.local_storage.get(key) {
match entry.value().is_empty() {
true => return Ok(None),
Expand All @@ -65,19 +98,75 @@ impl Transaction {
}

pub fn put(&self, key: &[u8], value: &[u8]) {
if self.committed.load(std::sync::atomic::Ordering::SeqCst) {
panic!("Transaction already committed");
}
self.local_storage
.insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value));
if let Some(key_hashes) = self.key_hashes.as_ref() {
let mut rwset = key_hashes.lock();
rwset.add_to_write_set(key);
}
}

pub fn delete(&self, key: &[u8]) {
if self.committed.load(std::sync::atomic::Ordering::SeqCst) {
panic!("Transaction already committed");
}
self.local_storage
.insert(Bytes::copy_from_slice(key), Bytes::from(Vec::new()));
if let Some(key_hashes) = self.key_hashes.as_ref() {
let mut rwset = key_hashes.lock();
rwset.add_to_write_set(key);
}
}

pub fn commit(&self) -> Result<()> {
if self.committed.load(std::sync::atomic::Ordering::SeqCst) {
return Err(anyhow::anyhow!("Transaction already committed"));
}
// txn's aren't serializable, no checks
if matches!(self.key_hashes.as_ref(), None) {
self.committed
.store(true, std::sync::atomic::Ordering::SeqCst);
let batch = self
.local_storage
.iter()
.map(|entry| match entry.value().is_empty() {
true => WriteBatchRecord::Del(entry.key().clone()),
false => WriteBatchRecord::Put(entry.key().clone(), entry.value().clone()),
})
.collect::<Vec<WriteBatchRecord<Bytes>>>();
self.inner.write_batch_inner(&batch)?;
return Ok(());
}
// txn's are serializable, must check for conflicts
let _guard = self.inner.mvcc()?.commit_lock.lock();
let rwset = self
.key_hashes
.as_ref()
.expect("could not obtain key hashes for a serializable txn")
.lock();

// no write set, read-only txn's require no checks
if rwset.write_set.is_empty() {
self.committed
.store(true, std::sync::atomic::Ordering::SeqCst);
let batch = self
.local_storage
.iter()
.map(|entry| match entry.value().is_empty() {
true => WriteBatchRecord::Del(entry.key().clone()),
false => WriteBatchRecord::Put(entry.key().clone(), entry.value().clone()),
})
.collect::<Vec<WriteBatchRecord<Bytes>>>();
self.inner.write_batch_inner(&batch)?;
return Ok(());
}

let expected_commit_ts = self.inner.mvcc()?.latest_commit_ts() + 1;
self.is_txn_valid(self.read_ts, expected_commit_ts, &rwset)?;

self.committed
.store(true, std::sync::atomic::Ordering::SeqCst);
let batch = self
Expand All @@ -88,7 +177,38 @@ impl Transaction {
false => WriteBatchRecord::Put(entry.key().clone(), entry.value().clone()),
})
.collect::<Vec<WriteBatchRecord<Bytes>>>();
self.inner.write_batch(&batch)
let commit_ts = self.inner.write_batch_inner(&batch)?;
self.inner.mvcc()?.committed_txns.lock().insert(
commit_ts,
CommittedTxnData {
key_hashes: rwset.write_set.clone(),
read_ts: self.read_ts,
commit_ts: expected_commit_ts,
},
);
Ok(())
}

fn is_txn_valid(
&self,
lower: u64,
upper: u64,
rwset: &MutexGuard<RawMutex, ReadWriteSets>,
) -> Result<()> {
let txns = self.inner.mvcc()?.committed_txns.lock();
let txns = txns.range((Bound::Excluded(lower), Bound::Excluded(upper)));
for (ts, txn_data) in txns.into_iter() {
let is_overlap = rwset
.read_set
.iter()
.any(|write_key| txn_data.key_hashes.contains(write_key));
if is_overlap {
return Err(anyhow::anyhow!(
"this txn cannot be committed because it violates SSI"
));
}
}
Ok(())
}
}

Expand Down Expand Up @@ -188,6 +308,9 @@ impl StorageIterator for TxnIterator {

fn next(&mut self) -> Result<()> {
if self.is_valid() {
if let Some(key_hashes) = &self.txn.key_hashes {
key_hashes.lock().add_to_read_set(self.key());
}
self.iter.next()?;
}
while self.iter.is_valid() && self.iter.value().is_empty() {
Expand Down
1 change: 1 addition & 0 deletions mini-lsm-starter/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ mod week3_day2;
mod week3_day3;
mod week3_day4;
mod week3_day5;
mod week3_day6;
Loading

0 comments on commit d76535a

Please sign in to comment.