Skip to content

Commit

Permalink
week 3 day 3 done. there is one assertion in test week3::test_task2_l…
Browse files Browse the repository at this point in the history
…sm_iterator_mvcc that is not passing. need to figure out why. some edge case about excluding the first and last key in an sst and the middle key is deleted so no values should be returned
  • Loading branch information
redixhumayun committed Jun 12, 2024
1 parent f5381e8 commit d447b67
Show file tree
Hide file tree
Showing 14 changed files with 475 additions and 63 deletions.
89 changes: 82 additions & 7 deletions mini-lsm-starter/src/lsm_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@ pub struct LsmIterator {
upper_bound: Bound<Bytes>,
is_valid: bool,
prev_key: Option<Bytes>,
read_ts: u64,
}

impl LsmIterator {
pub(crate) fn new(iter: LsmIteratorInner, upper_bound: Bound<Bytes>) -> Result<Self> {
pub(crate) fn new(
iter: LsmIteratorInner,
upper_bound: Bound<Bytes>,
read_ts: u64,
) -> Result<Self> {
let mut lsm_iter = Self {
is_valid: iter.is_valid(),
inner: iter,
upper_bound,
prev_key: None,
read_ts,
};

lsm_iter.skip_keys()?;
Expand Down Expand Up @@ -65,13 +71,25 @@ impl LsmIterator {
Ok(())
}

fn is_identical_key(&self, key: &Bytes) -> bool {
self.inner.key().key_ref() == key
}

fn is_invalid_read_ts(&self) -> bool {
self.inner.key().ts() > self.read_ts
}

fn should_skip(&self, key: &Bytes) -> bool {
self.is_identical_key(key) || self.is_invalid_read_ts()
}

fn skip_keys(&mut self) -> Result<()> {
loop {
let key = self.prev_key.take().unwrap_or(Bytes::from_static(&[]));
// skip all identical keys
while self.inner.is_valid() && self.inner.key().key_ref() == key {
while self.inner.is_valid() && self.should_skip(&key) {
self.next_inner()?;
}
if self.inner.is_valid() {}
if !self.inner.is_valid() {
self.is_valid = false;
break;
Expand Down Expand Up @@ -224,13 +242,12 @@ mod tests {
let sst = Arc::new(sst);
let l1_iter = SstConcatIterator::create_and_seek_to_first(vec![Arc::clone(&sst)]).unwrap();
let l1_merge_iter = MergeIterator::create(vec![Box::new(l1_iter)]);
println!("the l1 merge iter {:?}\n", l1_merge_iter);

let memtable = MemTable::create(1);
for i in 0..=100 {
let key = Bytes::from(format!("key{:03}", i));
let ts = 100 + i;
let value = Bytes::from(format!("value{:03}", i));
let value = Bytes::from(format!("memvalue{:03}", i));
memtable
.put(KeySlice::for_testing_from_slice_with_ts(&key, ts), &value)
.unwrap();
Expand All @@ -242,13 +259,71 @@ mod tests {
TwoMergeIterator::create(memtable_merge_iter, MergeIterator::create(Vec::new()))
.unwrap();
let lsm_iter_inner = TwoMergeIterator::create(two_merge_iter_1, l1_merge_iter).unwrap();
let mut lsm_iter = LsmIterator::new(lsm_iter_inner, Bound::Unbounded).unwrap();
let mut lsm_iter = LsmIterator::new(lsm_iter_inner, Bound::Unbounded, 201).unwrap();

// assert that each key is only seen once
let expected_keys = (0..=100).map(|i| Bytes::from(format!("key{:03}", i)));
for key in expected_keys {
let expected_values = (0..=100).map(|i| Bytes::from(format!("memvalue{:03}", i)));
for (key, value) in expected_keys.zip(expected_values) {
assert_eq!(lsm_iter.key(), key);
assert_eq!(lsm_iter.value(), value);
lsm_iter.next().unwrap();
}
assert!(!lsm_iter.is_valid());
}

/// This test asserts that an LSM iterator will respect snapshot txn semantics and return keys less than or equal
/// to a specific timestamp
#[test]
fn lsm_iter_read_ts() {
let dir = tempdir().unwrap();
let options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction);
let storage = LsmStorageInner::open(&dir, options).unwrap();
let mut sst_data: Vec<((Bytes, u64), Bytes)> = Vec::new();
for i in 0..=100 {
let key = Bytes::from(format!("key{:03}", i));
let ts = i;
let value: Bytes = Bytes::from(format!("value{:03}", i));
sst_data.push(((key, ts), value));
}
let sst = generate_sst_with_ts(
0,
&dir.path().join("0.sst"),
sst_data,
Some(storage.block_cache),
);
let sst = Arc::new(sst);
let l1_iter = SstConcatIterator::create_and_seek_to_first(vec![Arc::clone(&sst)]).unwrap();
let l1_merge_iter = MergeIterator::create(vec![Box::new(l1_iter)]);

let memtable = MemTable::create(1);
for i in 0..=100 {
let key = Bytes::from(format!("key{:03}", i));
let ts = 101 + i;
let value = Bytes::from(format!("memvalue{:03}", i));
memtable
.put(KeySlice::for_testing_from_slice_with_ts(&key, ts), &value)
.unwrap();
}
let memtable_iter = memtable.scan(Bound::Unbounded, Bound::Unbounded);
let memtable_merge_iter = MergeIterator::create(vec![Box::new(memtable_iter)]);

let two_merge_iter_1 =
TwoMergeIterator::create(memtable_merge_iter, MergeIterator::create(Vec::new()))
.unwrap();
let lsm_iter_inner = TwoMergeIterator::create(two_merge_iter_1, l1_merge_iter).unwrap();
let mut lsm_iter = LsmIterator::new(lsm_iter_inner, Bound::Unbounded, 100).unwrap();

// assert that each key is only seen once
let expected_keys = (0..=100).map(|i| Bytes::from(format!("key{:03}", i)));
let expected_values = (0..=100).map(|i| Bytes::from(format!("value{:03}", i)));
for (key, value) in expected_keys.zip(expected_values) {
assert_eq!(lsm_iter.key(), key);
assert_eq!(lsm_iter.value(), value);
if lsm_iter.is_valid() {
lsm_iter.next().unwrap();
}
}
assert!(!lsm_iter.is_valid());
}
}
67 changes: 44 additions & 23 deletions mini-lsm-starter/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::key::{Key, KeyBytes, KeySlice, TS_RANGE_BEGIN, TS_RANGE_END};
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::{Manifest, ManifestRecord};
use crate::mem_table::{map_bound, MemTable, MemTableIterator};
use crate::mvcc::txn::{Transaction, TxnIterator};
use crate::mvcc::LsmMvccInner;
use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};

Expand Down Expand Up @@ -213,7 +214,7 @@ impl MiniLsm {
}))
}

pub fn new_txn(&self) -> Result<()> {
pub fn new_txn(&self) -> Result<Arc<Transaction>> {
self.inner.new_txn()
}

Expand Down Expand Up @@ -241,11 +242,7 @@ impl MiniLsm {
self.inner.sync()
}

pub fn scan(
&self,
lower: Bound<&[u8]>,
upper: Bound<&[u8]>,
) -> Result<FusedIterator<LsmIterator>> {
pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {
self.inner.scan(lower, upper)
}

Expand Down Expand Up @@ -308,6 +305,7 @@ impl LsmStorageInner {
};

let mut next_sst_id = 1;
let mut recovered_ts = 0;
let manifest_path = path.join("MANIFEST");
let manifest = {
if !manifest_path.exists() {
Expand Down Expand Up @@ -361,6 +359,7 @@ impl LsmStorageInner {
Some(Arc::clone(&block_cache)),
FileObject::open(&Self::path_of_sst_static(path, *sst_id))?,
)?;
recovered_ts = recovered_ts.max(sst.max_ts());
state.sstables.insert(*sst_id, Arc::new(sst));
}
// read the remaining sstables from disk
Expand All @@ -371,6 +370,7 @@ impl LsmStorageInner {
Some(Arc::clone(&block_cache)),
FileObject::open(&Self::path_of_sst_static(path, *sst_id))?,
)?;
recovered_ts = recovered_ts.max(sst.max_ts());
state.sstables.insert(*sst_id, Arc::new(sst));
}
}
Expand All @@ -386,6 +386,7 @@ impl LsmStorageInner {
);
}
let memtable = MemTable::recover_from_wal(memtable_id, wal_path)?;
recovered_ts = recovered_ts.max(memtable.get_max_ts());
state.imm_memtables.insert(0, Arc::new(memtable));
}
}
Expand Down Expand Up @@ -419,13 +420,20 @@ impl LsmStorageInner {
compaction_controller,
manifest: Some(manifest),
options: options.into(),
mvcc: Some(LsmMvccInner::new(0)),
mvcc: Some(LsmMvccInner::new(recovered_ts)),
compaction_filters: Arc::new(Mutex::new(Vec::new())),
};

Ok(storage)
}

pub fn mvcc(&self) -> Result<&LsmMvccInner> {
let Some(mvcc) = self.mvcc.as_ref() else {
return Err(anyhow::anyhow!("MVCC not enabled"));
};
Ok(mvcc)
}

pub fn sync(&self) -> Result<()> {
self.state.read().memtable.sync_wal()
}
Expand All @@ -449,8 +457,7 @@ impl LsmStorageInner {
true
}

/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
pub fn get_with_ts(&self, key: &[u8], read_ts: u64) -> Result<Option<Bytes>> {
// probe the memtables
let snapshot = {
let state_guard = self.state.read();
Expand Down Expand Up @@ -521,15 +528,23 @@ impl LsmStorageInner {
let sst_iter = MergeIterator::create(level_iters);

let mem_l0_iter = TwoMergeIterator::create(memtable_iters, l0_iter)?;
let iter = TwoMergeIterator::create(mem_l0_iter, sst_iter)?;
let levels_iter = TwoMergeIterator::create(mem_l0_iter, sst_iter)?;
let iter = LsmIterator::new(levels_iter, Bound::Unbounded, read_ts)?;

if iter.is_valid() && iter.key().key_ref() == key && !iter.value().is_empty() {
if iter.is_valid() && iter.key() == key && !iter.value().is_empty() {
return Ok(Some(Bytes::copy_from_slice(iter.value())));
}

Ok(None)
}

/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
pub fn get(self: &Arc<Self>, key: &[u8]) -> Result<Option<Bytes>> {
self.mvcc()?
.new_txn(Arc::clone(&self), self.options.serializable)
.get(key)
}

/// Write a batch of data into the storage.
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
for record in batch {
Expand All @@ -539,13 +554,10 @@ impl LsmStorageInner {
}

fn write_record<T: AsRef<[u8]>>(&self, record: &WriteBatchRecord<T>) -> Result<()> {
let Some(mvcc) = self.mvcc.as_ref() else {
return Err(anyhow::anyhow!("MVCC not enabled"));
};
let state = self.state.read();
{
let _ = mvcc.write_lock.lock();
let ts = mvcc.latest_commit_ts() + 1;
let _ = self.mvcc()?.write_lock.lock();
let ts = self.mvcc()?.latest_commit_ts() + 1;
match record {
WriteBatchRecord::Put(key, value) => {
state
Expand All @@ -558,7 +570,7 @@ impl LsmStorageInner {
.put(KeySlice::from_slice(key.as_ref(), ts), &[])?;
}
}
mvcc.update_commit_ts(ts);
self.mvcc()?.update_commit_ts(ts);
}

if state.memtable.approximate_size() >= self.options.target_sst_size {
Expand Down Expand Up @@ -679,16 +691,17 @@ impl LsmStorageInner {
Ok(())
}

pub fn new_txn(&self) -> Result<()> {
// no-op
Ok(())
pub fn new_txn(self: &Arc<Self>) -> Result<Arc<Transaction>> {
Ok(self
.mvcc()?
.new_txn(Arc::clone(&self), self.options.serializable))
}

/// Create an iterator over a range of keys.
pub fn scan(
pub fn scan_with_ts(
&self,
lower: Bound<&[u8]>,
upper: Bound<&[u8]>,
read_ts: u64,
) -> Result<FusedIterator<LsmIterator>> {
let snapshot = {
let state_guard = self.state.read();
Expand Down Expand Up @@ -798,10 +811,18 @@ impl LsmStorageInner {

let iter = TwoMergeIterator::create(memtable_merge_iterator, l0_iter)?;
let iter = TwoMergeIterator::create(iter, sst_iter)?;
let lsm_iterator = LsmIterator::new(iter, map_bound(upper))?;
let lsm_iterator = LsmIterator::new(iter, map_bound(upper), read_ts)?;
Ok(FusedIterator::new(lsm_iterator))
}

/// Create an iterator over a range of keys.
pub fn scan(self: &Arc<Self>, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {
let txn = self
.mvcc()?
.new_txn(Arc::clone(&self), self.options.serializable);
txn.scan(lower, upper)
}

fn range_overlap(
&self,
lower: Bound<&[u8]>,
Expand Down
10 changes: 9 additions & 1 deletion mini-lsm-starter/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl MemTable {
self.approximate_size
.store(new_size, std::sync::atomic::Ordering::Relaxed);
if let Some(wal) = self.wal.as_ref() {
wal.put(key, value)?; // TODO: BRING THIS BACK
wal.put(key, value)?;
}
Ok(())
}
Expand Down Expand Up @@ -213,6 +213,14 @@ impl MemTable {
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}

pub fn get_max_ts(&self) -> u64 {
self.map
.iter()
.map(|entry| entry.key().ts())
.max()
.unwrap_or(TS_DEFAULT)
}
}

type SkipMapRangeIter<'a> = crossbeam_skiplist::map::Range<
Expand Down
11 changes: 9 additions & 2 deletions mini-lsm-starter/src/mvcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ mod watermark;

use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
sync::{atomic::AtomicBool, Arc},
};

use crossbeam_skiplist::SkipMap;
use parking_lot::Mutex;

use crate::lsm_storage::LsmStorageInner;
Expand Down Expand Up @@ -55,6 +56,12 @@ impl LsmMvccInner {
}

pub fn new_txn(&self, inner: Arc<LsmStorageInner>, serializable: bool) -> Arc<Transaction> {
unimplemented!()
Arc::new(Transaction {
read_ts: self.latest_commit_ts(),
inner,
local_storage: Arc::new(SkipMap::new()),
committed: Arc::new(AtomicBool::new(false)),
key_hashes: Some(Mutex::new((HashSet::new(), HashSet::new()))),
})
}
}
Loading

0 comments on commit d447b67

Please sign in to comment.