diff --git a/crates/curp/src/server/storage/wal/config.rs b/crates/curp/src/server/storage/wal/config.rs index c6e2627b3..70157ce0f 100644 --- a/crates/curp/src/server/storage/wal/config.rs +++ b/crates/curp/src/server/storage/wal/config.rs @@ -5,7 +5,16 @@ const DEFAULT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024; /// The config for WAL #[derive(Debug, Clone)] -pub(crate) struct WALConfig { +pub(crate) enum WALConfig { + /// Persistent implementation + Persistent(PersistentConfig), + /// Mock memory implementation + Memory, +} + +/// The config for persistent WAL +#[derive(Debug, Clone)] +pub(crate) struct PersistentConfig { /// The path of this config pub(super) dir: PathBuf, /// The maximum size of this segment @@ -17,17 +26,28 @@ pub(crate) struct WALConfig { impl WALConfig { /// Creates a new `WALConfig` pub(crate) fn new(dir: impl AsRef) -> Self { - Self { + Self::Persistent(PersistentConfig { dir: dir.as_ref().into(), max_segment_size: DEFAULT_SEGMENT_SIZE, - } + }) + } + + /// Creates a new memory `WALConfig` + pub(crate) fn new_memory() -> Self { + Self::Memory } /// Sets the `max_segment_size` pub(crate) fn with_max_segment_size(self, size: u64) -> Self { - Self { - dir: self.dir, - max_segment_size: size, + match self { + Self::Persistent(PersistentConfig { + dir, + max_segment_size, + }) => Self::Persistent(PersistentConfig { + dir, + max_segment_size: size, + }), + Self::Memory => Self::Memory, } } } diff --git a/crates/curp/src/server/storage/wal/fs.rs b/crates/curp/src/server/storage/wal/fs.rs new file mode 100644 index 000000000..a711bef5e --- /dev/null +++ b/crates/curp/src/server/storage/wal/fs.rs @@ -0,0 +1,5 @@ +#[cfg(not(madsim))] +pub(super) use std::fs::*; + +#[cfg(madsim)] +pub(super) use super::mock::fs::*; diff --git a/crates/curp/src/server/storage/wal/mock/fs.rs b/crates/curp/src/server/storage/wal/mock/fs.rs new file mode 100644 index 000000000..02d433838 --- /dev/null +++ b/crates/curp/src/server/storage/wal/mock/fs.rs @@ -0,0 +1,542 @@ +#![allow( + clippy::unnecessary_wraps, + unreachable_pub, + clippy::arithmetic_side_effects, + clippy::indexing_slicing +)] + +use std::{ + collections::HashMap, + ffi::OsString, + io::{self, Read, Write}, + path::{Path, PathBuf}, + sync::{Arc, OnceLock}, +}; + +use parking_lot::Mutex; + +/// The mock file system +static FS: OnceLock> = OnceLock::new(); + +/// Performs an operation on the global `Fs` instance +fn with_fs(op: F) -> R +where + F: FnOnce(&mut Fs) -> R, +{ + let fs = FS.get_or_init(|| Mutex::new(Fs::default())); + op(&mut fs.lock()) +} + +/// The mock file system +/// +/// TODO: This currently doesn't create a tree structure for directories. +/// Future improvements could address this. +#[derive(Default)] +struct Fs { + /// All files + entries: HashMap, +} + +impl Fs { + /// Creates a file + fn create_file(&mut self, path: PathBuf, file: File) -> io::Result<()> { + self.parent_should_exist(&path)?; + #[allow(clippy::pattern_type_mismatch)] + if self.entries.get(&path).is_some_and(File::is_dir) { + return Err(io::Error::from(io::ErrorKind::AlreadyExists)); + } + let _ignore = self.entries.insert(path, file); + + Ok(()) + } + + /// Opens a file + fn open_file(&self, path: &PathBuf) -> io::Result { + self.parent_should_exist(path)?; + self.entries + .get(path) + .map(File::clone) + .ok_or(io::Error::from(io::ErrorKind::NotFound)) + } + + /// Checks if the parent directory exists + fn parent_should_exist(&self, path: &Path) -> io::Result<()> { + let err = io::Error::from(io::ErrorKind::NotFound); + let Some(parent) = path.parent() else { + return Err(err); + }; + self.entries.contains_key(parent).then_some(()).ok_or(err) + } +} + +/// An object providing access to an open file on the filesystem. +#[derive(Debug)] +pub struct File { + /// Inner + inner: Arc>, + /// File offset + offset: usize, + /// The file type + ftype: Ftype, +} + +/// File data +#[derive(Debug, Default)] +struct Inner { + /// Buffered data + buffer: Vec, + /// Synced data + synced: Vec, + /// Metadata of the file + metadata: Metadata, + /// Indicate the file is locked + is_locked: bool, +} + +/// The type of the file +#[derive(Clone, Copy, Debug)] +enum Ftype { + /// A file + File, + /// A directory + Dir, +} + +impl File { + /// Attempts to open a file in read-only mode. + pub fn open>(path: P) -> io::Result { + open_file(path, false) + } + + /// Opens a file in write-only mode. + pub fn create>(path: P) -> io::Result { + open_file(path, true) + } + + /// Attempts to sync all OS-internal metadata to disk. + pub fn sync_all(&self) -> io::Result<()> { + self.sync_data() + } + + /// This function is similar to [`sync_all`], except that it might not + /// synchronize file metadata to the filesystem. + pub fn sync_data(&self) -> io::Result<()> { + let mut inner = self.inner.lock(); + let buffer = inner.buffer.drain(..).collect::>(); + inner.synced.extend(buffer); + Ok(()) + } + + /// Queries metadata about the underlying file. + pub fn metadata(&self) -> io::Result { + Ok(self.inner.lock().metadata.clone()) + } + + /// Creates a new file + fn new_file() -> Self { + Self { + inner: Arc::default(), + offset: 0, + ftype: Ftype::File, + } + } + + /// Creates a new directory + fn new_dir() -> Self { + Self { + inner: Arc::default(), + offset: 0, + ftype: Ftype::Dir, + } + } + + /// Clone the file pointer + fn clone(&self) -> Self { + File { + inner: Arc::clone(&self.inner), + offset: 0, + ftype: self.ftype, + } + } + + /// Returns the type of this file + fn is_file(&self) -> bool { + matches!(self.ftype, Ftype::File) + } + + /// Returns the type of this file + fn is_dir(&self) -> bool { + matches!(self.ftype, Ftype::Dir) + } + + /// Returns the type of this file + fn ftype(&self) -> Ftype { + self.ftype + } +} + +impl Drop for File { + fn drop(&mut self) { + self.inner.lock().is_locked = false; + } +} + +impl Read for File { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let inner = self.inner.lock(); + let total = inner.synced.len() + inner.buffer.len(); + if self.offset == total { + return Ok(0); + } + let to_copy = (total - self.offset).min(buf.len()); + let tmp: Vec<_> = inner + .synced + .iter() + .chain(inner.buffer.iter()) + .skip(self.offset) + .take(to_copy) + .copied() + .collect(); + buf[..to_copy].copy_from_slice(&tmp); + self.offset += to_copy; + Ok(to_copy) + } +} + +impl Write for File { + #[allow(clippy::as_conversions)] + fn write(&mut self, buf: &[u8]) -> io::Result { + let mut inner = self.inner.lock(); + inner.buffer.extend_from_slice(buf); + self.offset += buf.len(); + if self.offset as u64 > inner.metadata.len { + inner.metadata.len = self.offset as u64; + } + + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +#[allow(clippy::unimplemented)] +impl fs2::FileExt for File { + fn duplicate(&self) -> io::Result { + unimplemented!(); + } + + fn allocated_size(&self) -> io::Result { + unimplemented!(); + } + + #[allow(clippy::cast_possible_truncation, clippy::as_conversions)] + fn allocate(&self, len: u64) -> io::Result<()> { + let mut inner = self.inner.lock(); + inner.metadata.len += len; + + Ok(()) + } + + fn lock_shared(&self) -> io::Result<()> { + unimplemented!(); + } + + fn lock_exclusive(&self) -> io::Result<()> { + unimplemented!(); + } + + fn try_lock_shared(&self) -> io::Result<()> { + unimplemented!(); + } + + fn try_lock_exclusive(&self) -> io::Result<()> { + let mut inner = self.inner.lock(); + if inner.is_locked { + return Err(io::Error::from(io::ErrorKind::PermissionDenied)); + } + inner.is_locked = true; + Ok(()) + } + + fn unlock(&self) -> io::Result<()> { + unimplemented!(); + } +} + +/// Iterator over the entries in a directory. +pub struct ReadDir { + /// Entries + entries: Box>, +} + +impl Iterator for ReadDir { + type Item = io::Result; + + fn next(&mut self) -> Option { + self.entries.next().map(DirEntry).map(Ok) + } +} + +/// Entries returned by the [`ReadDir`] iterator. +pub struct DirEntry(PathBuf); + +impl DirEntry { + /// Returns the full path to the file that this entry represents. + #[must_use] + pub fn path(&self) -> PathBuf { + self.0.clone() + } + + /// Returns the file name of this directory entry without any + /// leading path component(s). + #[must_use] + pub fn file_name(&self) -> OsString { + #[allow(clippy::unwrap_used)] + self.0.file_name().unwrap().to_os_string() + } +} + +/// Metadata information about a file. +#[derive(Debug, Default, Clone)] +pub struct Metadata { + /// The length of the file + len: u64, +} + +impl Metadata { + /// Returns the size of the file, in bytes, this metadata is for. + #[must_use] + pub fn len(&self) -> u64 { + self.len + } +} + +/// Options and flags which can be used to configure how a file is opened. +#[derive(Clone, Debug)] +pub struct OpenOptions { + /// Open mode + mode: u8, +} + +impl OpenOptions { + /// Creates a blank new set of options ready for configuration. + #[must_use] + pub fn new() -> Self { + OpenOptions { mode: 0 } + } + + /// Sets the option for read access. + pub fn read(&mut self, read: bool) -> &mut Self { + self.mode |= 1; + self + } + + /// Sets the option for write access. + pub fn write(&mut self, write: bool) -> &mut Self { + self.mode |= 1 << 1; + self + } + + /// Sets the option to create a new file, or open it if it already exists. + pub fn create(&mut self, create: bool) -> &mut Self { + self.mode |= 1 << 2; + self + } + + /// Opens a file at `path` with the options specified by `self`. + pub fn open>(&self, path: P) -> io::Result { + open_file(path, (self.mode >> 2 & 1) != 0) + } +} + +/// Recursively create a directory and all of its parent components if they +/// are missing. +pub fn create_dir_all>(path: P) -> io::Result<()> { + let mut path = path.as_ref().to_path_buf(); + assert!(path.is_absolute(), "relative path not supported yet"); + with_fs(|fs| { + while { + let _ignore = fs.entries.entry(path.clone()).or_insert_with(File::new_dir); + path.pop() + } {} + }); + Ok(()) +} + +/// Rename a file or directory to a new name, replacing the original file if +/// `to` already exists. +pub fn rename, Q: AsRef>(from: P, to: Q) -> io::Result<()> { + let from = from.as_ref().to_path_buf(); + let to = to.as_ref().to_path_buf(); + with_fs(|fs| { + let f = fs + .entries + .remove(&from) + .ok_or(io::Error::from(io::ErrorKind::NotFound))?; + let _ignore = fs.entries.insert(to, f); + Ok(()) + }) +} + +/// Removes a file from the filesystem. +pub fn remove_file>(path: P) -> io::Result<()> { + let path = path.as_ref().to_path_buf(); + with_fs(|fs| fs.entries.remove(&path).map(|_| ())) + .ok_or(io::Error::from(io::ErrorKind::NotFound)) +} + +/// Returns an iterator over the entries within a directory. +pub fn read_dir>(path: P) -> io::Result { + let entries = path.as_ref(); + let entries = with_fs(|fs| { + fs.entries + .keys() + .filter(|p| p.parent().is_some_and(|pp| pp == entries)) + .map(PathBuf::clone) + .collect::>() + }); + Ok(ReadDir { + entries: Box::new(entries.into_iter()), + }) +} + +/// Given a path, query the file system to get information about a file, +/// directory, etc. +pub fn metadata>(path: P) -> io::Result { + let path = path.as_ref().to_path_buf(); + with_fs(|fs| fs.entries.get(&path).map(File::metadata)) + .ok_or(io::Error::from(io::ErrorKind::NotFound))? +} + +/// Adds a new file to the fs +fn open_file(path: impl AsRef, create: bool) -> io::Result { + let path = path.as_ref().to_path_buf(); + with_fs(|fs| { + if create { + if let Ok(file) = fs.open_file(&path) { + return Ok(file); + } + let file = File::new_file(); + fs.create_file(path, file.clone())?; + Ok(file) + } else { + fs.open_file(&path) + } + }) +} + +#[cfg(test)] +mod test { + use fs2::FileExt; + + use super::*; + use std::fs; + + #[test] + #[allow(clippy::verbose_file_reads)] + fn file_read_write_is_ok() { + create_dir_all("/tmp").unwrap(); + let path = PathBuf::from("/tmp/test_file_read_write_is_ok"); + let mut mock_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&path) + .unwrap(); + let mut file = fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&path) + .unwrap(); + mock_file.write_all(&[1, 2, 3]).unwrap(); + file.write_all(&[1, 2, 3]).unwrap(); + + assert_eq!(file.read_to_end(&mut Vec::new()).unwrap(), 0); + assert_eq!(mock_file.read_to_end(&mut Vec::new()).unwrap(), 0); + + let mut mock_file = OpenOptions::new().read(true).open(&path).unwrap(); + let mut file = fs::OpenOptions::new().read(true).open(&path).unwrap(); + let mut buf0 = Vec::new(); + let mut buf1 = Vec::new(); + file.read_to_end(&mut buf0).unwrap(); + mock_file.read_to_end(&mut buf1).unwrap(); + assert_eq!(buf0, buf1); + + assert_eq!( + metadata(&path).unwrap().len(), + fs::metadata(&path).unwrap().len() + ); + + fs::remove_file(&path).unwrap(); + } + + #[test] + fn read_dir_is_ok() { + create_dir_all("/a/b/c").unwrap(); + File::create("/a/b/1").unwrap(); + File::create("/a/b/2").unwrap(); + let names: Vec<_> = read_dir("/a/b") + .unwrap() + .into_iter() + .flatten() + .map(|d| d.file_name().into_string().unwrap()) + .collect(); + assert_eq!(names.len(), 3); + names.contains(&"c".to_string()); + names.contains(&"1".to_string()); + names.contains(&"2".to_string()); + } + + #[test] + fn dir_rename_is_ok() { + create_dir_all("/a/b/c").unwrap(); + File::create("/a/b/1").unwrap(); + rename("/a/b/c", "/a/b/d").unwrap(); + rename("/a/b/1", "/a/b/3").unwrap(); + + let names: Vec<_> = read_dir("/a/b") + .unwrap() + .into_iter() + .flatten() + .map(|d| d.file_name().into_string().unwrap()) + .collect(); + assert_eq!(names.len(), 2); + names.contains(&"d".to_string()); + names.contains(&"3".to_string()); + } + + #[test] + fn detect_file_exist_is_ok() { + assert!(metadata("/a/b").is_err()); + } + + #[test] + fn open_files_only_when_parent_dir_exists() { + assert!(File::create("/a/1").is_err()); + assert!(File::open("/a/1").is_err()); + assert!(fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open("/a/1") + .is_err()); + } + + #[test] + fn file_size_is_corrent() { + create_dir_all("/a/b").unwrap(); + let mut file = File::create("/a/b/1").unwrap(); + assert_eq!(file.metadata().unwrap().len(), 0); + file.allocate(128); + assert_eq!(file.metadata().unwrap().len(), 128); + file.write_all(&[0; 256]).unwrap(); + assert_eq!(file.metadata().unwrap().len(), 256); + file.sync_all(); + assert_eq!(file.metadata().unwrap().len(), 256); + file.allocate(128); + assert_eq!(file.metadata().unwrap().len(), 384); + } +} diff --git a/crates/curp/src/server/storage/wal/mock/mod.rs b/crates/curp/src/server/storage/wal/mock/mod.rs new file mode 100644 index 000000000..74cbb5b33 --- /dev/null +++ b/crates/curp/src/server/storage/wal/mock/mod.rs @@ -0,0 +1,64 @@ +/// Mock fs implementations +pub(super) mod fs; + +use std::{collections::VecDeque, io, marker::PhantomData}; + +use curp_external_api::LogIndex; +use serde::{de::DeserializeOwned, Serialize}; + +use crate::log_entry::LogEntry; + +use super::{codec::DataFrame, config::WALConfig, WALStorageOps}; + +/// The mock WAL storage +#[derive(Debug)] +pub(crate) struct WALStorage { + /// Storage + entries: VecDeque>, +} + +impl WALStorage { + /// Creates a new mock `WALStorage` + pub(super) fn new() -> WALStorage { + Self { + entries: VecDeque::new(), + } + } +} + +impl WALStorageOps for WALStorage +where + C: Clone, +{ + fn recover(&mut self) -> io::Result>> { + Ok(self.entries.clone().into_iter().collect()) + } + + fn send_sync(&mut self, item: Vec>) -> io::Result<()> { + for frame in item { + if let DataFrame::Entry(entry) = frame { + self.entries.push_back(entry.clone()); + } + } + + Ok(()) + } + + fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()> { + while self + .entries + .front() + .is_some_and(|e| e.index <= compact_index) + { + let _ignore = self.entries.pop_front(); + } + Ok(()) + } + + fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()> { + while self.entries.back().is_some_and(|e| e.index > max_index) { + let _ignore = self.entries.pop_back(); + } + Ok(()) + } +} diff --git a/crates/curp/src/server/storage/wal/mod.rs b/crates/curp/src/server/storage/wal/mod.rs index fb86b4410..a2b91692e 100644 --- a/crates/curp/src/server/storage/wal/mod.rs +++ b/crates/curp/src/server/storage/wal/mod.rs @@ -32,269 +32,92 @@ mod util; /// Framed mod framed; -use std::{io, marker::PhantomData, ops::Mul}; +/// Mock WAL storage +mod mock; -use clippy_utilities::OverflowArithmetic; +/// WAL storage +mod storage; + +/// Fs components +mod fs; + +use std::{io, marker::PhantomData}; + +use codec::DataFrame; +use config::WALConfig; use curp_external_api::LogIndex; -use futures::{future::join_all, Future, SinkExt, StreamExt}; -use itertools::Itertools; use serde::{de::DeserializeOwned, Serialize}; -use tokio_util::codec::Framed; -use tracing::{debug, error, info, warn}; use crate::log_entry::LogEntry; -use self::{ - codec::{DataFrame, DataFrameOwned, WAL}, - config::WALConfig, - error::{CorruptType, WALError}, - pipeline::FilePipeline, - remover::SegmentRemover, - segment::WALSegment, - util::LockedFile, -}; +/// The wal file extension +const WAL_FILE_EXT: &str = ".wal"; -/// The magic of the WAL file -const WAL_MAGIC: u32 = 0xd86e_0be2; +/// Operations of a WAL storage +pub(crate) trait WALStorageOps { + /// Recover from the given directory if there's any segments + fn recover(&mut self) -> io::Result>>; -/// The current WAL version -const WAL_VERSION: u8 = 0x00; + /// Send frames with fsync + fn send_sync(&mut self, item: Vec>) -> io::Result<()>; -/// The wal file extension -const WAL_FILE_EXT: &str = ".wal"; + /// Tuncate all the logs whose index is less than or equal to `compact_index` + /// + /// `compact_index` should be the smallest index required in CURP + fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()>; -/// The WAL storage + /// Tuncate all the logs whose index is greater than `max_index` + fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()>; +} + +/// The WAL Storage #[derive(Debug)] -pub(super) struct WALStorage { - /// The config of wal files - config: WALConfig, - /// The pipeline that pre-allocates files - pipeline: FilePipeline, - /// WAL segments - segments: Vec, - /// The next segment id - next_segment_id: u64, - /// The next log index - next_log_index: LogIndex, - /// The phantom data - _phantom: PhantomData, +pub(crate) enum WALStorage { + /// Persistent storage + Persistent(storage::WALStorage), + /// Mock memory storage + Memory(mock::WALStorage), } impl WALStorage { - /// Creates a new `LogStorage` - pub(super) fn new(config: WALConfig) -> io::Result> { - if !config.dir.try_exists()? { - std::fs::create_dir_all(&config.dir); - } - let mut pipeline = FilePipeline::new(config.dir.clone(), config.max_segment_size); - Ok(Self { - config, - pipeline, - segments: vec![], - next_segment_id: 0, - next_log_index: 0, - _phantom: PhantomData, + /// Creates a new `WALStorage` + pub(crate) fn new(config: WALConfig) -> io::Result { + Ok(match config { + WALConfig::Persistent(conf) => Self::Persistent(storage::WALStorage::new(conf)?), + WALConfig::Memory => Self::Memory(mock::WALStorage::new()), }) } } -impl WALStorage +impl WALStorageOps for WALStorage where - C: Serialize + DeserializeOwned + Unpin + 'static + std::fmt::Debug, + C: Serialize + DeserializeOwned + Unpin + 'static + std::fmt::Debug + Clone, { - /// Recover from the given directory if there's any segments - pub(super) fn recover(&mut self) -> io::Result>> { - /// Number of lines printed around the missing log in debug information - const NUM_LINES_DEBUG: usize = 3; - // We try to recover the removal first - SegmentRemover::recover(&self.config.dir)?; - - let file_paths = util::get_file_paths_with_ext(&self.config.dir, WAL_FILE_EXT)?; - let lfiles: Vec<_> = file_paths - .into_iter() - .map(LockedFile::open_rw) - .collect::>()?; - - let segment_opening = lfiles - .into_iter() - .map(|f| WALSegment::open(f, self.config.max_segment_size)); - - let mut segments = Self::take_until_io_error(segment_opening)?; - segments.sort_unstable(); - debug!("Recovered segments: {:?}", segments); - - let logs_iter = segments.iter_mut().map(WALSegment::recover_segment_logs); - - let logs_batches = Self::take_until_io_error(logs_iter)?; - let mut logs: Vec<_> = logs_batches.into_iter().flatten().collect(); - - let pos = Self::highest_valid_pos(&logs[..]); - if pos != logs.len() { - let debug_logs: Vec<_> = logs - .iter() - .skip(pos.overflow_sub(pos.min(NUM_LINES_DEBUG))) - .take(NUM_LINES_DEBUG.mul(2)) - .collect(); - error!( - "WAL corrupted: {}, truncated at position: {pos}, logs around this position: {debug_logs:?}", - CorruptType::LogNotContinue - ); - logs.truncate(pos); + fn recover(&mut self) -> io::Result>> { + match *self { + WALStorage::Persistent(ref mut s) => s.recover(), + WALStorage::Memory(ref mut s) => s.recover(), } - - let next_segment_id = segments.last().map_or(0, |s| s.id().overflow_add(1)); - let next_log_index = logs.last().map_or(1, |l| l.index.overflow_add(1)); - self.next_segment_id = next_segment_id; - self.next_log_index = next_log_index; - self.segments = segments; - - self.open_new_segment()?; - info!("WAL successfully recovered"); - - Ok(logs) } - /// Send frames with fsync - #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy - pub(super) fn send_sync(&mut self, item: Vec>) -> io::Result<()> { - let last_segment = self - .segments - .last_mut() - .unwrap_or_else(|| unreachable!("there should be at least on segment")); - if let Some(DataFrame::Entry(entry)) = item.last() { - self.next_log_index = entry.index.overflow_add(1); - } - last_segment.write_sync(item, WAL::new())?; - - if last_segment.is_full() { - self.open_new_segment()?; + fn send_sync(&mut self, item: Vec>) -> io::Result<()> { + match *self { + WALStorage::Persistent(ref mut s) => s.send_sync(item), + WALStorage::Memory(ref mut s) => s.send_sync(item), } - - Ok(()) } - /// Truncate all the logs whose index is less than or equal to `compact_index` - /// - /// `compact_index` should be the smallest index required in CURP - pub(super) fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()> { - if compact_index >= self.next_log_index { - warn!( - "head truncation: compact index too large, compact index: {}, storage next index: {}", - compact_index, self.next_log_index - ); - return Ok(()); + fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()> { + match *self { + WALStorage::Persistent(ref mut s) => s.truncate_head(compact_index), + WALStorage::Memory(ref mut s) => s.truncate_head(compact_index), } - - debug!("performing head truncation on index: {compact_index}"); - - let mut to_remove_num = self - .segments - .iter() - .take_while(|s| s.base_index() <= compact_index) - .count() - .saturating_sub(1); - - if to_remove_num == 0 { - return Ok(()); - } - - // The last segment does not need to be removed - let to_remove: Vec<_> = self.segments.drain(0..to_remove_num).collect(); - SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?; - - Ok(()) } - /// Truncate all the logs whose index is greater than `max_index` - pub(super) fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()> { - // segments to truncate - let segments: Vec<_> = self - .segments - .iter_mut() - .rev() - .take_while_inclusive::<_>(|s| s.base_index() > max_index) - .collect(); - - for segment in segments { - segment.seal::(max_index)?; + fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()> { + match *self { + WALStorage::Persistent(ref mut s) => s.truncate_tail(max_index), + WALStorage::Memory(ref mut s) => s.truncate_tail(max_index), } - - let to_remove = self.update_segments(); - SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?; - - self.next_log_index = max_index.overflow_add(1); - self.open_new_segment()?; - - Ok(()) - } - - /// Opens a new WAL segment - fn open_new_segment(&mut self) -> io::Result<()> { - let lfile = self - .pipeline - .next() - .ok_or(io::Error::from(io::ErrorKind::BrokenPipe))??; - - let segment = WALSegment::create( - lfile, - self.next_log_index, - self.next_segment_id, - self.config.max_segment_size, - )?; - - self.segments.push(segment); - self.next_segment_id = self.next_segment_id.overflow_add(1); - - Ok(()) - } - - /// Removes segments that are no longer needed - #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy - fn update_segments(&mut self) -> Vec { - let flags: Vec<_> = self.segments.iter().map(WALSegment::is_redundant).collect(); - let (to_remove, remaining): (Vec<_>, Vec<_>) = - self.segments.drain(..).zip(flags).partition(|(_, f)| *f); - - self.segments = remaining.into_iter().map(|(s, _)| s).collect(); - - to_remove.into_iter().map(|(s, _)| s).collect() - } - - /// Returns the highest valid position of the log entries, - /// the logs are continuous before this position - #[allow(clippy::pattern_type_mismatch)] // can't fix - fn highest_valid_pos(entries: &[LogEntry]) -> usize { - let iter = entries.iter(); - iter.clone() - .zip(iter.skip(1)) - .enumerate() - .find(|(_, (x, y))| x.index.overflow_add(1) != y.index) - .map_or(entries.len(), |(i, _)| i) - } - - /// Iterates until an `io::Error` occurs. - fn take_until_io_error(opening: I) -> io::Result> - where - I: IntoIterator>, - { - let mut ts = vec![]; - - for result in opening { - match result { - Ok(t) => ts.push(t), - Err(e) => { - let e = e.io_or_corrupt()?; - error!("WAL corrupted: {e}"); - } - } - } - - Ok(ts) - } -} - -impl Drop for WALStorage { - fn drop(&mut self) { - self.pipeline.stop(); } } diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index 5e8385a37..8f4e88c42 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -14,7 +14,7 @@ use event_listener::Event; use thiserror::Error; use tracing::error; -use super::util::LockedFile; +use super::{fs, util::LockedFile}; /// The temp file extension const TEMP_FILE_EXT: &str = ".tmp"; @@ -116,11 +116,11 @@ impl FilePipeline { /// Cleans up all unused tempfiles fn clean_up(dir: &PathBuf) -> io::Result<()> { - for result in std::fs::read_dir(dir)? { + for result in fs::read_dir(dir)? { let file = result?; if let Some(filename) = file.file_name().to_str() { if filename.ends_with(TEMP_FILE_EXT) { - std::fs::remove_file(file.path())?; + fs::remove_file(file.path())?; } } } @@ -175,12 +175,12 @@ impl std::fmt::Debug for FilePipeline { #[cfg(test)] mod tests { use super::*; - use crate::server::storage::wal::util::get_file_paths_with_ext; + use crate::server::storage::wal::util::{get_file_paths_with_ext, tempdir}; #[tokio::test] async fn file_pipeline_is_ok() { let file_size = 1024; - let dir = tempfile::tempdir().unwrap(); + let dir = tempdir(); let mut pipeline = FilePipeline::new(dir.as_ref().into(), file_size); let check_size = |mut file: LockedFile| { diff --git a/crates/curp/src/server/storage/wal/remover.rs b/crates/curp/src/server/storage/wal/remover.rs index 854c641c8..d5dc77a1b 100644 --- a/crates/curp/src/server/storage/wal/remover.rs +++ b/crates/curp/src/server/storage/wal/remover.rs @@ -12,6 +12,7 @@ use tokio::{ }; use super::{ + fs, segment::WALSegment, util::{get_checksum, get_file_paths_with_ext, is_exist, parse_u64, validate_data, LockedFile}, }; @@ -140,8 +141,8 @@ impl SegmentRemover { } for path in to_remove_paths.clone() { - let _ignore = std::fs::metadata(path.as_ref())?; - std::fs::remove_file(path.as_ref())?; + let _ignore = fs::metadata(path.as_ref())?; + fs::remove_file(path.as_ref())?; } // Check if all record have been removed from fs @@ -157,7 +158,7 @@ impl SegmentRemover { /// Remove the RWAL fn remove_rwal(wal_path: impl AsRef) -> io::Result<()> { - std::fs::remove_file(wal_path.as_ref())?; + fs::remove_file(wal_path.as_ref())?; if is_exist(wal_path.as_ref()) { return Err(io::Error::from(io::ErrorKind::Other)); } @@ -176,11 +177,13 @@ impl SegmentRemover { mod tests { use futures::future::join_all; + use crate::server::storage::wal::util::tempdir; + use super::*; #[test] fn wal_removal_is_ok() { - let temp_dir = tempfile::tempdir().unwrap(); + let temp_dir = tempdir(); let dir_path = PathBuf::from(temp_dir.path()); let mut segments = vec![]; @@ -205,7 +208,7 @@ mod tests { #[test] fn wal_remove_recover_is_ok() { - let temp_dir = tempfile::tempdir().unwrap(); + let temp_dir = tempdir(); let dir_path = PathBuf::from(temp_dir.path()); ABORT_REMOVE.with(|f| *f.borrow_mut() = true); diff --git a/crates/curp/src/server/storage/wal/segment.rs b/crates/curp/src/server/storage/wal/segment.rs index c50ab6573..1fc487b56 100644 --- a/crates/curp/src/server/storage/wal/segment.rs +++ b/crates/curp/src/server/storage/wal/segment.rs @@ -1,5 +1,4 @@ use std::{ - fs::File, io::{self, Read, Write}, iter, pin::Pin, @@ -21,11 +20,18 @@ use super::{ codec::{DataFrame, DataFrameOwned, WAL}, error::{CorruptType, WALError}, framed::{Decoder, Encoder}, + fs, util::{get_checksum, parse_u64, validate_data, LockedFile}, - WAL_FILE_EXT, WAL_MAGIC, WAL_VERSION, + WAL_FILE_EXT, }; use crate::log_entry::LogEntry; +/// The magic of the WAL file +const WAL_MAGIC: u32 = 0xd86e_0be2; + +/// The current WAL version +const WAL_VERSION: u8 = 0x00; + /// The size of wal file header in bytes pub(super) const WAL_HEADER_SIZE: usize = 56; @@ -39,7 +45,7 @@ pub(super) struct WALSegment { /// The soft size limit of this segment size_limit: u64, /// The opened file of this segment - file: File, + file: fs::File, /// The file size of the segment size: u64, /// The highest index of the segment @@ -314,7 +320,7 @@ mod tests { use curp_test_utils::test_cmd::TestCommand; use super::*; - use crate::log_entry::EntryData; + use crate::{log_entry::EntryData, server::storage::wal::util::tempdir}; #[test] fn gen_parse_header_is_correct() { @@ -341,7 +347,7 @@ mod tests { const BASE_INDEX: u64 = 17; const SEGMENT_ID: u64 = 2; const SIZE_LIMIT: u64 = 5; - let dir = tempfile::tempdir().unwrap(); + let dir = tempdir(); let mut tmp_path = dir.path().to_path_buf(); tmp_path.push("test.tmp"); let segment_name = WALSegment::segment_name(SEGMENT_ID, BASE_INDEX); @@ -365,7 +371,7 @@ mod tests { const BASE_INDEX: u64 = 1; const SEGMENT_ID: u64 = 1; const SIZE_LIMIT: u64 = 5; - let dir = tempfile::tempdir().unwrap(); + let dir = tempdir(); let mut tmp_path = dir.path().to_path_buf(); tmp_path.push("test.tmp"); let segment_name = WALSegment::segment_name(SEGMENT_ID, BASE_INDEX); diff --git a/crates/curp/src/server/storage/wal/storage.rs b/crates/curp/src/server/storage/wal/storage.rs new file mode 100644 index 000000000..1e39dc395 --- /dev/null +++ b/crates/curp/src/server/storage/wal/storage.rs @@ -0,0 +1,263 @@ +use std::{io, marker::PhantomData, ops::Mul}; + +use clippy_utilities::OverflowArithmetic; +use curp_external_api::LogIndex; +use futures::{future::join_all, Future, SinkExt, StreamExt}; +use itertools::Itertools; +use serde::{de::DeserializeOwned, Serialize}; +use tokio_util::codec::Framed; +use tracing::{debug, error, info, warn}; + +use crate::log_entry::LogEntry; + +use super::{ + codec::{DataFrame, DataFrameOwned, WAL}, + config::PersistentConfig, + error::{CorruptType, WALError}, + fs, + pipeline::FilePipeline, + remover::SegmentRemover, + segment::WALSegment, + util::{self, LockedFile}, + WALStorageOps, WAL_FILE_EXT, +}; + +/// The WAL storage +#[derive(Debug)] +pub(crate) struct WALStorage { + /// The config of wal files + config: PersistentConfig, + /// The pipeline that pre-allocates files + pipeline: FilePipeline, + /// WAL segments + segments: Vec, + /// The next segment id + next_segment_id: u64, + /// The next log index + next_log_index: LogIndex, + /// The phantom data + _phantom: PhantomData, +} + +impl WALStorage { + /// Creates a new `LogStorage` + pub(super) fn new(config: PersistentConfig) -> io::Result> { + if fs::metadata(&config.dir).is_err() { + fs::create_dir_all(&config.dir)?; + } + let mut pipeline = FilePipeline::new(config.dir.clone(), config.max_segment_size); + Ok(Self { + config, + pipeline, + segments: vec![], + next_segment_id: 0, + next_log_index: 0, + _phantom: PhantomData, + }) + } +} + +impl WALStorageOps for WALStorage +where + C: Serialize + DeserializeOwned + Unpin + 'static + std::fmt::Debug, +{ + /// Recover from the given directory if there's any segments + fn recover(&mut self) -> io::Result>> { + /// Number of lines printed around the missing log in debug information + const NUM_LINES_DEBUG: usize = 3; + // We try to recover the removal first + SegmentRemover::recover(&self.config.dir)?; + + let file_paths = util::get_file_paths_with_ext(&self.config.dir, WAL_FILE_EXT)?; + let lfiles: Vec<_> = file_paths + .into_iter() + .map(LockedFile::open_rw) + .collect::>()?; + + let segment_opening = lfiles + .into_iter() + .map(|f| WALSegment::open(f, self.config.max_segment_size)); + + let mut segments = Self::take_until_io_error(segment_opening)?; + segments.sort_unstable(); + debug!("Recovered segments: {:?}", segments); + + let logs_iter = segments.iter_mut().map(WALSegment::recover_segment_logs); + + let logs_batches = Self::take_until_io_error(logs_iter)?; + let mut logs: Vec<_> = logs_batches.into_iter().flatten().collect(); + + let pos = Self::highest_valid_pos(&logs[..]); + if pos != logs.len() { + let debug_logs: Vec<_> = logs + .iter() + .skip(pos.overflow_sub(pos.min(NUM_LINES_DEBUG))) + .take(NUM_LINES_DEBUG.mul(2)) + .collect(); + error!( + "WAL corrupted: {}, truncated at position: {pos}, logs around this position: {debug_logs:?}", + CorruptType::LogNotContinue + ); + logs.truncate(pos); + } + + let next_segment_id = segments.last().map_or(0, |s| s.id().overflow_add(1)); + let next_log_index = logs.last().map_or(1, |l| l.index.overflow_add(1)); + self.next_segment_id = next_segment_id; + self.next_log_index = next_log_index; + self.segments = segments; + + self.open_new_segment()?; + info!("WAL successfully recovered"); + + Ok(logs) + } + + #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy + fn send_sync(&mut self, item: Vec>) -> io::Result<()> { + let last_segment = self + .segments + .last_mut() + .unwrap_or_else(|| unreachable!("there should be at least on segment")); + if let Some(DataFrame::Entry(entry)) = item.last() { + self.next_log_index = entry.index.overflow_add(1); + } + last_segment.write_sync(item, WAL::new())?; + + if last_segment.is_full() { + self.open_new_segment()?; + } + + Ok(()) + } + + /// Truncate all the logs whose index is less than or equal to `compact_index` + /// + /// `compact_index` should be the smallest index required in CURP + fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()> { + if compact_index >= self.next_log_index { + warn!( + "head truncation: compact index too large, compact index: {}, storage next index: {}", + compact_index, self.next_log_index + ); + return Ok(()); + } + + debug!("performing head truncation on index: {compact_index}"); + + let mut to_remove_num = self + .segments + .iter() + .take_while(|s| s.base_index() <= compact_index) + .count() + .saturating_sub(1); + + if to_remove_num == 0 { + return Ok(()); + } + + // The last segment does not need to be removed + let to_remove: Vec<_> = self.segments.drain(0..to_remove_num).collect(); + SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?; + + Ok(()) + } + + /// Truncate all the logs whose index is greater than `max_index` + fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()> { + // segments to truncate + let segments: Vec<_> = self + .segments + .iter_mut() + .rev() + .take_while_inclusive::<_>(|s| s.base_index() > max_index) + .collect(); + + for segment in segments { + segment.seal::(max_index)?; + } + + let to_remove = self.update_segments(); + SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?; + + self.next_log_index = max_index.overflow_add(1); + self.open_new_segment()?; + + Ok(()) + } +} + +impl WALStorage +where + C: Serialize + DeserializeOwned + Unpin + 'static + std::fmt::Debug, +{ + /// Opens a new WAL segment + fn open_new_segment(&mut self) -> io::Result<()> { + let lfile = self + .pipeline + .next() + .ok_or(io::Error::from(io::ErrorKind::BrokenPipe))??; + + let segment = WALSegment::create( + lfile, + self.next_log_index, + self.next_segment_id, + self.config.max_segment_size, + )?; + + self.segments.push(segment); + self.next_segment_id = self.next_segment_id.overflow_add(1); + + Ok(()) + } + + /// Removes segments that are no longer needed + #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy + fn update_segments(&mut self) -> Vec { + let flags: Vec<_> = self.segments.iter().map(WALSegment::is_redundant).collect(); + let (to_remove, remaining): (Vec<_>, Vec<_>) = + self.segments.drain(..).zip(flags).partition(|(_, f)| *f); + + self.segments = remaining.into_iter().map(|(s, _)| s).collect(); + + to_remove.into_iter().map(|(s, _)| s).collect() + } + + /// Returns the highest valid position of the log entries, + /// the logs are continuous before this position + #[allow(clippy::pattern_type_mismatch)] // can't fix + fn highest_valid_pos(entries: &[LogEntry]) -> usize { + let iter = entries.iter(); + iter.clone() + .zip(iter.skip(1)) + .enumerate() + .find(|(_, (x, y))| x.index.overflow_add(1) != y.index) + .map_or(entries.len(), |(i, _)| i) + } + + /// Iterates until an `io::Error` occurs. + fn take_until_io_error(opening: I) -> io::Result> + where + I: IntoIterator>, + { + let mut ts = vec![]; + + for result in opening { + match result { + Ok(t) => ts.push(t), + Err(e) => { + let e = e.io_or_corrupt()?; + error!("WAL corrupted: {e}"); + } + } + } + + Ok(ts) + } +} + +impl Drop for WALStorage { + fn drop(&mut self) { + self.pipeline.stop(); + } +} diff --git a/crates/curp/src/server/storage/wal/tests.rs b/crates/curp/src/server/storage/wal/tests.rs index cbb942837..4df61a218 100644 --- a/crates/curp/src/server/storage/wal/tests.rs +++ b/crates/curp/src/server/storage/wal/tests.rs @@ -3,8 +3,8 @@ use std::{fs, path::Path, sync::Arc}; use bytes::BytesMut; use curp_test_utils::test_cmd::TestCommand; use parking_lot::Mutex; -use tempfile::TempDir; use tokio_util::codec::Encoder; +use util::tempdir; use crate::{ log_entry::{EntryData, LogEntry}, @@ -20,7 +20,7 @@ const TEST_SEGMENT_SIZE: u64 = 512; #[test] fn simple_append_and_recovery_is_ok() { - let wal_test_path = tempfile::tempdir().unwrap(); + let wal_test_path = tempdir(); test_follow_up_append_recovery(wal_test_path.path(), 100); } @@ -28,7 +28,7 @@ fn simple_append_and_recovery_is_ok() { fn log_head_truncation_is_ok() { for num_entries in 1..40 { for truncate_at in 1..=num_entries { - let wal_test_path = tempfile::tempdir().unwrap(); + let wal_test_path = tempdir(); test_head_truncate_at(wal_test_path.path(), num_entries, truncate_at as u64); test_follow_up_append_recovery(wal_test_path.path(), 10); } @@ -39,7 +39,7 @@ fn log_head_truncation_is_ok() { fn log_tail_truncation_is_ok() { for num_entries in 1..40 { for truncate_at in 1..=num_entries { - let wal_test_path = tempfile::tempdir().unwrap(); + let wal_test_path = tempdir(); test_tail_truncate_at(wal_test_path.path(), num_entries, truncate_at as u64); test_follow_up_append_recovery(wal_test_path.path(), 10); } diff --git a/crates/curp/src/server/storage/wal/util.rs b/crates/curp/src/server/storage/wal/util.rs index 080990990..04833218f 100644 --- a/crates/curp/src/server/storage/wal/util.rs +++ b/crates/curp/src/server/storage/wal/util.rs @@ -1,5 +1,4 @@ use std::{ - fs::{File as StdFile, OpenOptions}, io, path::{Path, PathBuf}, }; @@ -8,6 +7,8 @@ use fs2::FileExt; use sha2::{digest::Output, Digest, Sha256}; use tokio::fs::File as TokioFile; +use super::fs::{self, File as StdFile, OpenOptions}; + /// File that is exclusively locked #[derive(Debug)] pub(super) struct LockedFile { @@ -53,7 +54,7 @@ impl LockedFile { pub(super) fn rename(mut self, new_name: impl AsRef) -> io::Result { let mut new_path = parent_dir(&self.path); new_path.push(new_name.as_ref()); - std::fs::rename(&self.path, &new_path)?; + fs::rename(&self.path, &new_path)?; sync_parent_dir(&new_path)?; Ok(Self { @@ -81,7 +82,7 @@ impl LockedFile { impl Drop for LockedFile { fn drop(&mut self) { if self.file.is_some() && is_exist(self.path()) { - let _ignore = std::fs::remove_file(self.path()); + let _ignore = fs::remove_file(self.path()); } } } @@ -92,7 +93,7 @@ pub(super) fn get_file_paths_with_ext( ext: &str, ) -> io::Result> { let mut files = vec![]; - for result in std::fs::read_dir(dir)? { + for result in fs::read_dir(dir)? { let file = result?; if let Some(filename) = file.file_name().to_str() { if filename.ends_with(ext) { @@ -113,7 +114,7 @@ pub(super) fn parent_dir(dir: impl AsRef) -> PathBuf { /// Fsyncs the parent directory pub(super) fn sync_parent_dir(dir: impl AsRef) -> io::Result<()> { let parent_dir = parent_dir(&dir); - let parent = std::fs::File::open(parent_dir)?; + let parent = StdFile::open(parent_dir)?; parent.sync_all()?; Ok(()) @@ -133,7 +134,7 @@ pub(super) fn validate_data(data: &[u8], checksum: &[u8]) -> bool { /// Checks whether the file exist pub(super) fn is_exist(path: impl AsRef) -> bool { - std::fs::metadata(path).is_ok() + fs::metadata(path).is_ok() } /// Parses a u64 from u8 slice @@ -146,6 +147,14 @@ pub(super) fn parse_u64(bytes_le: &[u8]) -> u64 { ) } +#[cfg(test)] +pub(super) fn tempdir() -> tempfile::TempDir { + use super::fs::create_dir_all; + let dir = tempfile::tempdir().unwrap(); + create_dir_all(dir.path()); + dir +} + #[cfg(test)] mod tests { use std::{io::Read, process::Command}; @@ -154,7 +163,7 @@ mod tests { #[test] fn file_rename_is_ok() { - let mut tempdir = tempfile::tempdir().unwrap(); + let mut tempdir = tempdir(); let mut path = PathBuf::from(tempdir.path()); path.push("file.test"); let lfile = LockedFile::open_rw(&path).unwrap(); @@ -169,7 +178,7 @@ mod tests { #[test] #[allow(clippy::verbose_file_reads)] // false positive fn file_open_is_exclusive() { - let mut tempdir = tempfile::tempdir().unwrap(); + let mut tempdir = tempdir(); let mut path = PathBuf::from(tempdir.path()); path.push("file.test"); let mut lfile = LockedFile::open_rw(&path).unwrap(); @@ -181,13 +190,13 @@ mod tests { #[test] fn get_file_paths_with_ext_is_ok() { - let dir = tempfile::tempdir().unwrap(); + let dir = tempdir(); let num_paths = 10; let paths_create: Vec<_> = (0..num_paths) .map(|i| { let mut path = PathBuf::from(dir.path()); path.push(format!("{i}.test")); - std::fs::File::create(&path).unwrap(); + fs::File::create(&path).unwrap(); path }) .collect();