Skip to content

Commit

Permalink
added gc for completed txns
Browse files Browse the repository at this point in the history
  • Loading branch information
redixhumayun committed Jun 17, 2024
1 parent a44011a commit 27fe151
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 9 deletions.
4 changes: 1 addition & 3 deletions mini-lsm-starter/src/mvcc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod

pub mod txn;
pub mod watermark;

Expand All @@ -17,6 +14,7 @@ use crate::lsm_storage::LsmStorageInner;

use self::{txn::Transaction, watermark::Watermark};

#[derive(Debug, Clone)]
pub(crate) struct CommittedTxnData {
pub(crate) key_hashes: HashSet<u32>,
#[allow(dead_code)]
Expand Down
16 changes: 11 additions & 5 deletions mini-lsm-starter/src/mvcc/txn.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod

use core::fmt;
use std::{
collections::HashSet,
Expand Down Expand Up @@ -167,6 +164,7 @@ impl Transaction {
let expected_commit_ts = self.inner.mvcc()?.latest_commit_ts() + 1;
self.is_txn_valid(self.read_ts, expected_commit_ts, &rwset)?;

// valid txn that can be committed
self.committed
.store(true, std::sync::atomic::Ordering::SeqCst);
let batch = self
Expand All @@ -186,6 +184,14 @@ impl Transaction {
commit_ts: expected_commit_ts,
},
);

// clean up old txn's below watermark
let watermark = self.inner.mvcc()?.watermark();
self.inner
.mvcc()?
.committed_txns
.lock()
.retain(|ts, _| *ts >= watermark);
Ok(())
}

Expand All @@ -197,11 +203,11 @@ impl Transaction {
) -> Result<()> {
let txns = self.inner.mvcc()?.committed_txns.lock();
let txns = txns.range((Bound::Excluded(lower), Bound::Excluded(upper)));
for (ts, txn_data) in txns.into_iter() {
for (_, txn_data) in txns.into_iter() {
let is_overlap = rwset
.read_set
.iter()
.any(|write_key| txn_data.key_hashes.contains(write_key));
.any(|read_key| txn_data.key_hashes.contains(read_key));
if is_overlap {
return Err(anyhow::anyhow!(
"this txn cannot be committed because it violates SSI"
Expand Down
17 changes: 16 additions & 1 deletion mini-lsm-starter/src/tests/week3_day6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ fn test_serializable_3_ts_range() {
assert_eq!(storage.get(b"key2").unwrap(), Some(Bytes::from("2")));
}

// TODO: Failing test FIX!!
#[test]
fn test_serializable_4_scan() {
let dir = tempdir().unwrap();
Expand Down Expand Up @@ -107,3 +106,19 @@ fn test_serializable_5_read_only() {
assert_eq!(storage.get(b"key1").unwrap(), Some(Bytes::from("2")));
assert_eq!(storage.get(b"key2").unwrap(), Some(Bytes::from("2")));
}

#[test]
fn test_serializable_6_gc() {
let dir = tempdir().unwrap();
let mut options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction);
options.serializable = true;
let storage = MiniLsm::open(&dir, options.clone()).unwrap();
let txn1 = storage.new_txn().unwrap();
let txn2 = storage.new_txn().unwrap();
txn1.put(b"key1", b"1");
txn2.put(b"key1", b"2");
txn2.commit().unwrap();
txn1.commit().unwrap();
let v = storage.get(b"key1").unwrap().unwrap();
assert_eq!(v, Bytes::from("1"));
}

0 comments on commit 27fe151

Please sign in to comment.