Skip to content

Commit

Permalink
wal tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
redixhumayun committed Jun 7, 2024
1 parent bca0057 commit d5df57f
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 89 deletions.
31 changes: 14 additions & 17 deletions mini-lsm-starter/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl LsmStorageInner {
/// run the compaction task to get the id's that need to be added and then apply the compaction on the state
/// to get the id's that need to be removed
fn trigger_compaction(&self) -> Result<()> {
let mut snapshot = {
let snapshot = {
let state = self.state.read();
state.as_ref().clone()
};
Expand All @@ -458,22 +458,18 @@ impl LsmStorageInner {
self.dump_structure();
println!("created a compaction task {:?}", task);
let sstables_to_add = self.compact(&task)?;
let table_ids_to_add: Vec<usize> = {
let _state_lock = self.state_lock.lock();
let mut table_ids_to_add = Vec::with_capacity(sstables_to_add.len());

let (sstable_ids_to_add, sstable_ids_to_remove) = {
let state_lock = self.state_lock.lock();
let mut snapshot = self.state.read().as_ref().clone();
let mut sstable_ids_to_add = Vec::with_capacity(sstables_to_add.len());
for sstable in sstables_to_add {
table_ids_to_add.push(sstable.sst_id());
sstable_ids_to_add.push(sstable.sst_id());
snapshot.sstables.insert(sstable.sst_id(), sstable);
}
table_ids_to_add
};

let (mut new_state, sstable_ids_to_remove) = self
.compaction_controller
.apply_compaction_result(&snapshot, &task, &table_ids_to_add);

{
let state_lock = self.state_lock.lock();
let (mut new_state, sstable_ids_to_remove) = self
.compaction_controller
.apply_compaction_result(&snapshot, &task, &sstable_ids_to_add);
for sstable_id in &sstable_ids_to_remove {
let sst = new_state.sstables.remove(&sstable_id);
assert!(sst.is_some());
Expand All @@ -490,13 +486,14 @@ impl LsmStorageInner {
})?
.add_record(
&state_lock,
ManifestRecord::Compaction(task, table_ids_to_add.clone()),
ManifestRecord::Compaction(task, sstable_ids_to_add.clone()),
)?;
}
(sstable_ids_to_add, sstable_ids_to_remove)
};

println!(
"compaction finished -> files added {:?}, files removed {:?}",
table_ids_to_add, sstable_ids_to_remove
sstable_ids_to_add, sstable_ids_to_remove
);
for sstable_id in &sstable_ids_to_remove {
std::fs::remove_file(self.path_of_sst(*sstable_id))?;
Expand Down
169 changes: 115 additions & 54 deletions mini-lsm-starter/src/lsm_storage.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(dead_code)] // REMOVE THIS LINE after fully implementing this functionality

use core::fmt;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::ops::Bound;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -174,6 +174,12 @@ impl MiniLsm {
}
}

if self.inner.options.enable_wal {
self.inner.sync()?;
self.inner.sync_dir()?;
return Ok(());
}

if !self.inner.state.read().memtable.is_empty() {
let state_lock = self.inner.state_lock.lock();
self.inner.force_freeze_memtable(&state_lock)?;
Expand Down Expand Up @@ -303,63 +309,106 @@ impl LsmStorageInner {

let mut next_sst_id = 1;
let manifest_path = path.join("MANIFEST");
let manifest =
{
if !manifest_path.exists() {
let manifest = Manifest::create(manifest_path)?;
manifest
} else {
let (manifest, existing_records) = Manifest::recover(manifest_path)?;
for record in existing_records {
match record {
ManifestRecord::Flush(sst_id) => {
if CompactionController::flush_to_l0(&compaction_controller) {
state.l0_sstables.insert(0, sst_id);
} else {
state.levels.insert(0, (sst_id, vec![sst_id]));
}
}
ManifestRecord::NewMemtable(_memtable_id) => {
// how do i store the data of the memtable?
println!("read new memtable");
}
ManifestRecord::Compaction(task, new_sst_ids) => {
let (new_snapshot, _) = compaction_controller
.apply_compaction_result(&state, &task, &new_sst_ids);
state = new_snapshot;
next_sst_id = next_sst_id
.max(new_sst_ids.iter().max().copied().unwrap_or_default());
let manifest = {
if !manifest_path.exists() {
if options.enable_wal {
state.memtable = Arc::new(MemTable::create_with_wal(
state.memtable.id(),
Self::path_of_wal_static(path, state.memtable.id()),
)?);
}
let manifest = Manifest::create(manifest_path)?;
manifest.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?;
manifest
} else {
let (manifest, existing_records) = Manifest::recover(manifest_path)?;
let mut recovered_memtable_ids = BTreeSet::new();
for record in existing_records {
match record {
ManifestRecord::Flush(sst_id) => {
assert!(
recovered_memtable_ids.remove(&sst_id),
"memtable {} in manifest does not exist",
sst_id
);
if CompactionController::flush_to_l0(&compaction_controller) {
state.l0_sstables.insert(0, sst_id);
} else {
state.levels.insert(0, (sst_id, vec![sst_id]));
}
next_sst_id = next_sst_id.max(sst_id);
}
ManifestRecord::NewMemtable(memtable_id) => {
recovered_memtable_ids.insert(memtable_id);
next_sst_id = next_sst_id.max(memtable_id);
}
ManifestRecord::Compaction(task, new_sst_ids) => {
let (new_snapshot, _) = compaction_controller.apply_compaction_result(
&state,
&task,
&new_sst_ids,
);
state = new_snapshot;
next_sst_id = next_sst_id
.max(new_sst_ids.iter().max().copied().unwrap_or_default());
}
}
// read the l0 sstables from disk
for sst_id in &state.l0_sstables {
}
// read the l0 sstables from disk
for sst_id in &state.l0_sstables {
let sst = SsTable::open(
*sst_id,
Some(Arc::clone(&block_cache)),
FileObject::open(&Self::path_of_sst_static(path, *sst_id))?,
)?;
state.sstables.insert(*sst_id, Arc::new(sst));
}
// read the remaining sstables from disk
for (_, sst_ids) in &state.levels {
for sst_id in sst_ids {
let sst = SsTable::open(
*sst_id,
Some(Arc::clone(&block_cache)),
FileObject::open(&Self::path_of_sst_static(path, *sst_id))?,
)?;
state.sstables.insert(*sst_id, Arc::new(sst));
}
// read the remaining sstables from disk
for (_, sst_ids) in &state.levels {
for sst_id in sst_ids {
let sst = SsTable::open(
*sst_id,
Some(Arc::clone(&block_cache)),
FileObject::open(&Self::path_of_sst_static(path, *sst_id))?,
)?;
state.sstables.insert(*sst_id, Arc::new(sst));
}

// read the recovered memtable ids
if options.enable_wal {
for memtable_id in recovered_memtable_ids {
let wal_path = Self::path_of_wal_static(path, memtable_id);
if !wal_path.exists() {
eprintln!(
"wal file not found for memtable {} while recovering from disk",
memtable_id
);
}
let memtable = MemTable::recover_from_wal(memtable_id, wal_path)?;
state.imm_memtables.insert(0, Arc::new(memtable));
}
next_sst_id += 1;
let memtable = Arc::new(MemTable::create(next_sst_id));
state.memtable = memtable;
next_sst_id += 1;

manifest
}
};

// create a new memtable for the engine
next_sst_id += 1;
let memtable = {
if options.enable_wal {
Arc::new(MemTable::create_with_wal(
next_sst_id,
Self::path_of_wal_static(path, next_sst_id),
)?)
} else {
Arc::new(MemTable::create(next_sst_id))
}
};
state.memtable = memtable;
next_sst_id += 1;
manifest.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?;

manifest
}
};

let storage = Self {
state: Arc::new(RwLock::new(Arc::new(state))),
Expand All @@ -378,7 +427,7 @@ impl LsmStorageInner {
}

pub fn sync(&self) -> Result<()> {
unimplemented!()
self.state.read().memtable.sync_wal()
}

pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) {
Expand Down Expand Up @@ -533,20 +582,32 @@ impl LsmStorageInner {
};

let memtable_id = self.next_sst_id();
let new_memtable = Arc::new(MemTable::create(memtable_id));
let new_memtable = {
if self.options.enable_wal {
let wal_path = self.path_of_wal(memtable_id);
Arc::new(MemTable::create_with_wal(memtable_id, wal_path)?)
} else {
Arc::new(MemTable::create(memtable_id))
}
};

let mut state_guard = self.state.write();
let state = Arc::make_mut(&mut state_guard);
state.imm_memtables.insert(0, current_memtable);
state.memtable = new_memtable;

if let Some(manifest) = self.manifest.as_ref() {
manifest.add_record(
&_state_lock_observer,
ManifestRecord::NewMemtable(memtable_id),
)?;
} else {
eprintln!("no manifest handle found while freezing memtable");
if self.options.enable_wal {
self.manifest
.as_ref()
.map(|manifest| {
manifest.add_record(
&_state_lock_observer,
ManifestRecord::NewMemtable(memtable_id),
)
})
.ok_or_else(|| {
anyhow::anyhow!("no manifest handle found while freezing memtable")
})??;
}
self.sync_dir()?;
Ok(())
Expand Down
37 changes: 29 additions & 8 deletions mini-lsm-starter/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ impl fmt::Debug for MemTable {

write!(
f,
"Key range: {} -> {} | Value range: {} -> {}",
first_key, last_key, trimmed_first_value, trimmed_last_value
"memtable id: {}, key range: {} -> {} | Value range: {} -> {}",
self.id(),
first_key,
last_key,
trimmed_first_value,
trimmed_last_value
)
}
_ => {
Expand All @@ -66,23 +70,36 @@ pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound<Bytes> {

impl MemTable {
/// Create a new mem-table.
pub fn create(_id: usize) -> Self {
pub fn create(id: usize) -> Self {
MemTable {
map: Arc::new(SkipMap::new()),
wal: Option::None,
id: _id,
id,
approximate_size: Arc::new(AtomicUsize::new(0)),
}
}

/// Create a new mem-table with WAL
pub fn create_with_wal(_id: usize, _path: impl AsRef<Path>) -> Result<Self> {
unimplemented!()
pub fn create_with_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> {
let wal = Wal::create(path)?;
Ok(MemTable {
map: Arc::new(SkipMap::new()),
wal: Some(wal),
id,
approximate_size: Arc::new(AtomicUsize::new(0)),
})
}

/// Create a memtable from WAL
pub fn recover_from_wal(_id: usize, _path: impl AsRef<Path>) -> Result<Self> {
unimplemented!()
pub fn recover_from_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> {
let skiplist: SkipMap<Bytes, Bytes> = SkipMap::new();
let wal = Wal::recover(path, &skiplist)?;
Ok(MemTable {
map: Arc::new(skiplist),
wal: Some(wal),
id,
approximate_size: Arc::new(AtomicUsize::new(0)),
})
}

pub fn for_testing_put_slice(&self, key: &[u8], value: &[u8]) -> Result<()> {
Expand Down Expand Up @@ -123,6 +140,10 @@ impl MemTable {
let new_size = current_size + key_value_size;
self.approximate_size
.store(new_size, std::sync::atomic::Ordering::Relaxed);
if let Some(wal) = self.wal.as_ref() {
wal.put(key, value)?;
}
self.sync_wal()?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion mini-lsm-starter/src/tests/week2_day6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ fn test_integration(compaction_options: CompactionOptions) {
storage.dump_structure();
drop(storage);
dump_files_in_dir(&dir);

// loop {}
let storage = MiniLsm::open(&dir, options).unwrap();
assert_eq!(&storage.get(b"0").unwrap().unwrap()[..], b"v20".as_slice());
assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"v20".as_slice());
Expand Down
15 changes: 6 additions & 9 deletions mini-lsm-starter/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ impl Wal {
let mut reader = BufReader::new(&file);
loop {
let mut key_length_buf = [0u8; 8];
if let Err(e) = reader.read_exact(&mut key_length_buf) {
println!(
"encountered error while reading wal. maybe reached eof {}",
e
);
break;
};
match reader.read_exact(&mut key_length_buf) {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => eprintln!("encountered error while reading wal: {}", e),
}
let key_length = u64::from_be_bytes(key_length_buf);

let mut key = vec![0u8; key_length as usize];
Expand Down Expand Up @@ -87,8 +85,7 @@ impl Wal {
.map_err(|e| anyhow::anyhow!("failed to flush wal from program buffer: {}", e))?;
file.get_mut()
.sync_all()
.map_err(|e| anyhow::anyhow!("failed to flush wal from os buffer: {}", e))?;
unimplemented!()
.map_err(|e| anyhow::anyhow!("failed to flush wal from os buffer: {}", e))
}
}

Expand Down

0 comments on commit d5df57f

Please sign in to comment.