
Commit

initial horrific version of merge
ckampfe committed May 23, 2024
1 parent 9885cc6 commit 5bff041
Showing 5 changed files with 350 additions and 59 deletions.
246 changes: 212 additions & 34 deletions src/base.rs
@@ -26,21 +26,22 @@ where
tx_id: u128,
}

// public impls
impl<K> Base<K>
where
K: Eq + Hash + Serialize + DeserializeOwned + Send,
{
pub(crate) async fn new(db_directory: &Path, options: Options) -> crate::Result<Self> {
let mut db_file_ids = Self::db_file_ids(db_directory).await?;
let mut db_file_ids = Self::all_db_file_ids(db_directory).await?;

db_file_ids.sort();

let latest_file_id = db_file_ids.last().map(|id| id + 1).unwrap_or(0);
let latest_file_id = db_file_ids.last().unwrap_or(&0);

let active_file_id = latest_file_id + 1;

let all_entries_with_livenesses: HashMap<K, EntryWithLiveness> =
<EntryWithLiveness as Loadable<K>>::load_latest_entries(db_directory, db_file_ids)
<EntryWithLiveness as Loadable<K>>::load_latest_entries(db_directory, &db_file_ids)
.await?;

let all_entries = all_entries_with_livenesses
@@ -132,41 +133,194 @@ where
self.keydir.keys()
}

/// # invariants
///
/// ### affected data files
/// - only inactive data files may be merged.
/// - the active data file SHALL NOT be changed at all.
/// - opening a new database ALWAYS opens a new active file, no matter what
///
/// ### latest data
/// - for a given key `K`, only the record with the latest TXID
/// may remain in the dataset.
/// - if the record for a given key `K` with the latest TXID is a delete,
/// it SHALL NOT appear in the inactive data files
///
/// ### cannot add data
/// - the total size of all data on disk AFTER merge
/// SHALL be less than or equal to the total size of all data
/// on disk BEFORE merge.
///
/// ### no dangling files
/// - all database files SHALL have a size > 0 bytes.
pub(crate) async fn merge(&mut self) -> crate::Result<()> {
let all_db_files = Self::db_file_ids(&self.db_directory).await?;

let inactive_db_files: Vec<_> = all_db_files
.into_iter()
.filter(|file_id| *file_id != self.active_file_id)
.collect();
// TODO
// take current keydir into account when determining whether to keep
// a given merge record.
// when writing a merge record, only write it if it does not appear in the
// active file
let mut inactive_db_files = self.inactive_db_file_ids().await?;

let merge_pointers = <MergePointer as Loadable<K>>::load_latest_entries(
&self.db_directory,
inactive_db_files,
&inactive_db_files,
)
.await?;

let live_merge_pointers: Vec<_> = merge_pointers
let live_merge_pointers = merge_pointers
.into_iter()
.filter(|(key, merge_pointer)| merge_pointer.liveness == Liveness::Live)
.collect();
.filter(|(_key, merge_pointer)| merge_pointer.liveness == Liveness::Live);

// let mut current_write_file = tokio::io::BufWriter::new(write_file);
let mut current_write_file: Option<tokio::io::BufWriter<tokio::fs::File>> = None;

let mut offset = 0;

for (key, merge_pointer) in live_merge_pointers {
//
if let Some(entry) = self.keydir.get(&key) {
// IFF the entry has been written to the active file,
// do not process it at all, as its latest version
// is later than anything seen by the merge process,
// as the merge process only touches inactive files
if entry.tx_id > merge_pointer.tx_id {
continue;
}
}

let mut current_write_file_id = inactive_db_files.pop().unwrap();

let mut current_file_path = self.db_directory.to_owned();
let mut current_file_name = current_write_file_id.to_string();
current_file_name.push_str(".merge");
current_file_path.push(current_file_name);

// db directory
// merge pointers
// max file size in bytes
// nonactive_file_ids
if let Some(write_file) = current_write_file.as_mut() {
if offset > self.options.max_file_size_bytes {
write_file.flush().await?;

current_write_file_id = inactive_db_files.pop().unwrap();
// current_file_path = self.db_directory.to_owned();
self.db_directory.clone_into(&mut current_file_path);
current_file_name = current_write_file_id.to_string();
current_file_name.push_str(".merge");
current_file_path.push(current_file_name);

let write_file = tokio::fs::File::options()
.append(true)
.create_new(true)
.open(&current_file_path)
.await?;

current_write_file = Some(tokio::io::BufWriter::new(write_file));
}
} else {
let write_file = tokio::fs::File::options()
.append(true)
.create_new(true)
.open(&current_file_path)
.await?;

current_write_file = Some(tokio::io::BufWriter::new(write_file));
}

let mut reader_path = self.db_directory.to_owned();

reader_path.push(merge_pointer.file_id.to_string());

let read_file = tokio::fs::File::open(reader_path).await?;

let mut read_file = tokio::io::BufReader::new(read_file);

read_file
.seek(std::io::SeekFrom::Start(merge_pointer.record_offset))
.await?;

let mut take_handle = read_file.take(merge_pointer.record_size);

let bytes_read = {
// DUMB, BAD
let mut writer = current_write_file.unwrap();

let bytes_read = tokio::io::copy_buf(&mut take_handle, &mut writer).await?;

// DUMB, BAD
current_write_file = Some(writer);

bytes_read
};

assert!(bytes_read == merge_pointer.record_size);

let value_position =
offset + crate::record::Record::HEADER_SIZE as u64 + merge_pointer.key_size as u64;

offset += merge_pointer.record_size;

let new_entry = EntryPointer {
file_id: current_write_file_id,
value_position,
value_size: merge_pointer.value_size,
tx_id: merge_pointer.tx_id,
};

self.keydir.insert(key, new_entry);
}

if let Some(mut write_file) = current_write_file {
write_file.flush().await?;
}

// rm all inactive db files
for file_id in self.inactive_db_file_ids().await? {
let mut filename = self.db_directory.clone();
filename.push(file_id.to_string());
tokio::fs::remove_file(filename).await?;
}

// rename all .merge files
for merge_path in self.merge_files().await? {
if tokio::fs::metadata(&merge_path).await?.len() > 0 {
let normal_filename = merge_path.file_stem().unwrap();
let mut normal_path = self.db_directory.clone();
normal_path.push(normal_filename);
tokio::fs::rename(&merge_path, normal_path).await?;
} else {
tokio::fs::remove_file(merge_path).await?;
}
}

Ok(())
}

pub(crate) async fn flush(&mut self) -> crate::Result<()> {
self.active_file.flush().await.map_err(|e| e.into())
}
}

async fn write_delete(&mut self, k: K) -> crate::Result<()> {
// private impls
impl<K> Base<K>
where
K: Eq + Hash + Serialize + DeserializeOwned + Send,
{
// TODO investigate whether we can collapse write_delete and write_insert
async fn write_insert<V: Serialize + DeserializeOwned + Send>(
&mut self,
k: K,
v: V,
) -> crate::Result<()> {
self.tx_id += 1;
let encoded_tx_id = self.tx_id.to_be_bytes();
let encoded_key = bincode::serialize(&k).map_err(|e| error::SerializeError {
msg: "unable to serialize to bincode".to_string(),
source: e,
})?;

let encoded_value = bincode::serialize(&Tombstone).map_err(|e| error::SerializeError {
let encoded_value = bincode::serialize(&v).map_err(|e| error::SerializeError {
msg: "unable to serialize to bincode".to_string(),
source: e,
})?;
@@ -191,7 +345,17 @@ where
self.active_file.write_all(hash).await?;
self.active_file.write_all(&payload).await?;

self.keydir.remove(&k);
let value_position =
self.offset + crate::record::Record::HEADER_SIZE as u64 + key_size as u64;

let entry = EntryPointer {
file_id: self.active_file_id,
value_position,
value_size: value_size.try_into().unwrap(),
tx_id: self.tx_id,
};

self.keydir.insert(k, entry);

let entry_size = crate::record::Record::HEADER_SIZE + key_size + value_size;

@@ -223,19 +387,15 @@ where
}
}

async fn write_insert<V: Serialize + DeserializeOwned + Send>(
&mut self,
k: K,
v: V,
) -> crate::Result<()> {
async fn write_delete(&mut self, k: K) -> crate::Result<()> {
self.tx_id += 1;
let encoded_tx_id = self.tx_id.to_be_bytes();
let encoded_key = bincode::serialize(&k).map_err(|e| error::SerializeError {
msg: "unable to serialize to bincode".to_string(),
source: e,
})?;

let encoded_value = bincode::serialize(&v).map_err(|e| error::SerializeError {
let encoded_value = bincode::serialize(&Tombstone).map_err(|e| error::SerializeError {
msg: "unable to serialize to bincode".to_string(),
source: e,
})?;
@@ -260,17 +420,7 @@ where
self.active_file.write_all(hash).await?;
self.active_file.write_all(&payload).await?;

let value_position =
self.offset + crate::record::Record::HEADER_SIZE as u64 + key_size as u64;

let entry = EntryPointer {
file_id: self.active_file_id,
value_position,
value_size: value_size.try_into().unwrap(),
tx_id: self.tx_id,
};

self.keydir.insert(k, entry);
self.keydir.remove(&k);

let entry_size = crate::record::Record::HEADER_SIZE + key_size + value_size;

@@ -302,7 +452,7 @@ where
}
}

async fn db_file_ids(db_directory: &Path) -> crate::Result<Vec<u64>> {
async fn all_db_file_ids(db_directory: &Path) -> crate::Result<Vec<u64>> {
let mut file_ids = vec![];

let mut dir_reader = tokio::fs::read_dir(db_directory).await?;
@@ -321,6 +471,34 @@ where

Ok(file_ids)
}

async fn inactive_db_file_ids(&self) -> crate::Result<Vec<u64>> {
let mut db_file_ids = Self::all_db_file_ids(&self.db_directory).await?;

db_file_ids.retain(|file_id| *file_id != self.active_file_id);

Ok(db_file_ids)
}

async fn merge_files(&self) -> crate::Result<Vec<PathBuf>> {
let mut file_ids = vec![];

let mut dir_reader = tokio::fs::read_dir(&self.db_directory).await?;

while let Some(dir_entry) = dir_reader.next_entry().await? {
if dir_entry.file_type().await?.is_file() {
let path = dir_entry.path();

if let Some(extension) = path.extension() {
if extension == "merge" {
file_ids.push(path);
}
}
}
}

Ok(file_ids)
}
}

impl<K: Eq + Hash + Serialize + DeserializeOwned + Send> Drop for Base<K> {
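The merge above enforces a latest-TXID-wins rule across the inactive data files: for each key, only the newest record survives, and a key whose newest record is a tombstone is dropped entirely. The sketch below is not part of this commit; it is a simplified, synchronous, in-memory illustration of that rule, with hypothetical types standing in for the crate's on-disk records.

```rust
use std::collections::HashMap;

// Hypothetical, simplified record: (tx_id, value); value == None is a tombstone.
type Record = (u128, Option<Vec<u8>>);

/// For each key across the inactive files, keep only the record with the
/// highest tx_id, then drop any key whose newest record is a delete.
fn compact(inactive_files: &[HashMap<String, Record>]) -> HashMap<String, Record> {
    let mut latest: HashMap<String, Record> = HashMap::new();

    for file in inactive_files {
        for (key, record) in file {
            let newer = match latest.get(key) {
                Some(existing) => record.0 > existing.0,
                None => true,
            };
            if newer {
                latest.insert(key.clone(), record.clone());
            }
        }
    }

    // a key whose latest record is a tombstone must not survive the merge
    latest.retain(|_key, record| record.1.is_some());

    latest
}

fn main() {
    let file_1 = HashMap::from([
        ("a".to_string(), (1, Some(b"old".to_vec()))),
        ("b".to_string(), (2, Some(b"kept".to_vec()))),
    ]);
    // a later file deletes "a" with a higher tx_id
    let file_2 = HashMap::from([("a".to_string(), (3, None))]);

    let merged = compact(&[file_1, file_2]);
    assert!(!merged.contains_key("a")); // the tombstone wins, so "a" is gone
    assert!(merged.contains_key("b"));
}
```

The real merge in this commit does the same selection via `MergePointer`s, then additionally skips any key whose keydir entry has a newer TXID (i.e. was rewritten in the active file) and copies surviving records into `.merge` files before swapping them in.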
1 change: 1 addition & 0 deletions src/error.rs
@@ -1,6 +1,7 @@
use std::num::ParseIntError;
use thiserror::Error;

#[non_exhaustive]
#[derive(Debug, Error)]
pub enum Error {
#[error("io error occurred")]
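The only change shown for src/error.rs is the `#[non_exhaustive]` attribute on the error enum. The snippet below is not part of the commit; it sketches why that attribute is useful, using hypothetical variant names (the diff only shows the attribute, the "io error occurred" message, and the `ParseIntError` import): downstream crates matching on the enum are forced to keep a wildcard arm, so new variants can be added later without a breaking change.

```rust
use thiserror::Error;

// Variant names here are hypothetical; the commit does not show them.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum Error {
    #[error("io error occurred")]
    Io(#[from] std::io::Error),
    #[error("parse error occurred")]
    Parse(#[from] std::num::ParseIntError),
}

fn main() {
    let err: Error = std::io::Error::new(std::io::ErrorKind::Other, "disk on fire").into();

    // Within the defining crate this wildcard arm is optional, but any
    // downstream crate matching on Error must include one because of
    // #[non_exhaustive], so adding variants later is not a semver break.
    match &err {
        Error::Io(e) => eprintln!("io failure: {e}"),
        _ => eprintln!("other failure: {err}"),
    }
}
```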
