Skip to content

Commit

Permalink
bitbox sync sync
Browse files Browse the repository at this point in the history
  • Loading branch information
pepyakin committed Aug 24, 2024
1 parent 95a36d2 commit 53e2db0
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 506 deletions.
142 changes: 26 additions & 116 deletions nomt/src/bitbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use parking_lot::{ArcRwLockReadGuard, RwLock};
use std::{
collections::{HashMap, HashSet},
fs::File,
os::fd::{AsRawFd, RawFd},
os::fd::RawFd,
sync::Arc,
};

Expand All @@ -13,13 +13,10 @@ use crate::{
page_cache::PageDiff,
};

use self::{
meta_map::MetaMap,
store::Store,
wal::{Batch as WalBatch, ConsistencyError, Entry as WalEntry, WalWriter},
};
use self::{meta_map::MetaMap, store::Store};

pub use self::store::create;
pub use wal::WalBlobBuilder;

mod meta_map;
mod store;
Expand All @@ -35,8 +32,6 @@ pub struct DB {

/// State of the bitbox database that `DB` holds behind an `Arc`.
pub struct Shared {
store: Store,
// TODO: the RwLock can probably be avoided, since the WAL writer is only used during commit
wal: RwLock<WalWriter>,
meta_map: Arc<RwLock<MetaMap>>,
io_handle: IoHandle,
}
Expand All @@ -60,46 +55,12 @@ impl DB {
}
};

// Open the WAL, check its integrity and make sure the store is consistent with it
let wal = wal::WalChecker::open_and_recover(wal_fd);
let _pending_batch = match wal.check_consistency(sync_seqn) {
Ok(()) => {
println!("Wal and Store are consistent, last sequence number: {sync_seqn}");
None
}
Err(ConsistencyError::LastBatchCrashed(crashed_batch)) => {
println!(
"Wal and Store are not consistent, pending sequence number: {}",
sync_seqn + 1
);
Some(crashed_batch)
}
Err(ConsistencyError::NotConsistent(wal_seqn)) => {
// This is useful for testing. If the WAL sequence number is zero, it means the WAL is empty.
// For example, it could have been deleted, and it's okay to continue working on the store
// by appending new batches to the new WAL
if wal_seqn == 0 {
None
} else {
panic!(
"Store and Wal have two inconsistent serial numbers. wal: {wal_seqn}, store: {sync_seqn}"
);
}
}
};

// Create a WalWriter, able to append new batch and prune older ones
let wal = match wal::WalWriter::open(wal_fd) {
Ok(x) => x,
Err(e) => {
anyhow::bail!("encountered error in opening wal: {e:?}")
}
};
// TODO: implement WAL recovery.
let _ = (sync_seqn, wal_fd);

Ok(Self {
shared: Arc::new(Shared {
store,
wal: RwLock::new(wal),
meta_map: Arc::new(RwLock::new(meta_map)),
io_handle: io_pool.make_handle(),
}),
Expand All @@ -115,13 +76,11 @@ impl DB {
}
}

// TODO: update with async sync approach
pub fn sync_begin(
pub fn prepare_sync(
&self,
changes: Vec<(PageId, BucketIndex, Option<(Vec<u8>, PageDiff)>)>,
sync_seqn: u32,
ht_fd: &std::fs::File,
) -> anyhow::Result<u64> {
wal_blob_builder: &mut WalBlobBuilder,
) -> anyhow::Result<WriteoutData> {
// Steps are:
// 0. Increase sequence number
// 1. compute the WalBatch
Expand All @@ -132,13 +91,10 @@ impl DB {
// 7. write meta page and fsync
// 8. prune the wal

let mut wal = self.shared.wal.write();
let mut meta_map = self.shared.meta_map.write();

let mut changed_meta_pages = HashSet::new();
let next_sequence_number = sync_seqn as u64;
let mut wal_batch = WalBatch::new(next_sequence_number);
let mut bucket_writes = Vec::new();
let mut ht_pages = Vec::new();

for (page_id, BucketIndex(bucket), page_info) in changes {
// let's extract its bucket
Expand All @@ -158,85 +114,39 @@ impl DB {
changed_meta_pages.insert(meta_map.page_index(bucket as usize));
}

// fill the WalBatch and the pages that need to be written to disk
wal_batch.append_entry(WalEntry::Update {
page_id: page_id.encode(),
changed: get_changed(&page, &page_diff),
page_diff: page_diff.get_raw(),
bucket_index: bucket as u64,
});
let changed = get_changed(&page, &page_diff);
wal_blob_builder.write_update(
page_id.encode(),
page_diff.get_raw(),
changed,
bucket,
);

bucket_writes.push((bucket, page));
let pn = self.shared.store.data_page_index(bucket);
ht_pages.push((pn, page));
}
None => {
meta_map.set_tombstone(bucket as usize);
changed_meta_pages.insert(meta_map.page_index(bucket as usize));

wal_batch.append_entry(WalEntry::Clear {
bucket_index: bucket as u64,
});
wal_blob_builder.write_clear(bucket);
}
};
}

let prev_wal_size = wal.file_size();
wal_batch.data().len();
wal.apply_batch(&wal_batch).unwrap();

let mut submitted: u32 = 0;
let mut completed: u32 = 0;

// Issue all bucket writes
for (bucket, page) in bucket_writes {
let command = IoCommand {
kind: IoKind::Write(
ht_fd.as_raw_fd(),
self.shared.store.data_page_index(bucket),
page,
),
user_data: 0, // unimportant.
};
// TODO: handle error
self.shared.io_handle.send(command).unwrap();
submitted += 1;
}

// apply changed meta pages
for changed_meta_page in changed_meta_pages {
let mut buf = Box::new(Page::zeroed());
buf[..].copy_from_slice(meta_map.page_slice(changed_meta_page));
let command = IoCommand {
kind: IoKind::Write(
ht_fd.as_raw_fd(),
self.shared.store.meta_bytes_index(changed_meta_page as u64),
buf,
),
user_data: 0, // unimportant
};
submitted += 1;
// TODO: handle error
self.shared.io_handle.send(command).unwrap();
}

// wait for all write commands to finish
while completed < submitted {
let completion = self.shared.io_handle.recv().expect("I/O worker dropped");
assert!(completion.result.is_ok());
completed += 1;
let pn = self.shared.store.meta_bytes_index(changed_meta_page as u64);
ht_pages.push((pn, buf));
}

// sync all writes
ht_fd.sync_all().expect("ht file: error performing fsync");

Ok(prev_wal_size)
Ok(WriteoutData { ht_pages })
}
}

pub fn sync_end(&self, prev_wal_size: u64) -> anyhow::Result<()> {
// clear the WAL.
let wal = self.shared.wal.write();
wal.prune_front(prev_wal_size);
Ok(())
}
/// The output of `DB::prepare_sync`: the data that still has to be written out.
pub struct WriteoutData {
/// The pages to write out to the ht file.
///
/// Each entry is `(index, page)`, where `index` is the value returned by the store's
/// `data_page_index` (for bucket pages) or `meta_bytes_index` (for changed meta pages)
/// and `page` is the content to write there.
pub ht_pages: Vec<(u64, Box<Page>)>,
}

/// A utility for loading pages from bitbox.
Expand Down
Loading

0 comments on commit 53e2db0

Please sign in to comment.