diff --git a/src/block_read_write.rs b/src/block_read_write.rs index 74a0d0b..f2bd521 100644 --- a/src/block_read_write.rs +++ b/src/block_read_write.rs @@ -1,5 +1,7 @@ use std::io; +use crate::PersistAction; + pub const BLOCK_NUM_BYTES: usize = 32_768; pub trait BlockRead { @@ -25,8 +27,8 @@ pub trait BlockRead { pub trait BlockWrite { /// Must panic if buf is larger than `num_bytes_remaining_in_block`. fn write(&mut self, buf: &[u8]) -> io::Result<()>; - /// The semantics of flush may depend on the implementation. - fn flush(&mut self) -> io::Result<()>; + /// Persist the data following the `persist_action`. + fn persist(&mut self, persist_action: PersistAction) -> io::Result<()>; /// Number of bytes that can be added in the block. fn num_bytes_remaining_in_block(&self) -> usize; } @@ -90,7 +92,7 @@ impl BlockWrite for VecBlockWriter { Ok(()) } - fn flush(&mut self) -> io::Result<()> { + fn persist(&mut self, _persist_action: PersistAction) -> io::Result<()> { Ok(()) } diff --git a/src/frame/tests.rs b/src/frame/tests.rs index ab219bd..1ea8a7b 100644 --- a/src/frame/tests.rs +++ b/src/frame/tests.rs @@ -3,7 +3,7 @@ use std::io; use crate::block_read_write::{ArrayReader, VecBlockWriter}; use crate::frame::header::{FrameType, HEADER_LEN}; use crate::frame::{FrameReader, FrameWriter, ReadFrameError}; -use crate::BLOCK_NUM_BYTES; +use crate::{PersistAction, BLOCK_NUM_BYTES}; #[test] fn test_frame_simple() { @@ -19,7 +19,7 @@ fn test_frame_simple() { frame_writer .write_frame(FrameType::Last, &b"fgh"[..]) .unwrap(); - frame_writer.flush().unwrap(); + frame_writer.persist(PersistAction::Flush).unwrap(); frame_writer.into_writer() }; let buffer: Vec = block_writer.into(); @@ -45,9 +45,9 @@ fn test_frame_corruption_in_payload() -> io::Result<()> { let mut buf: Vec = { let mut frame_writer = FrameWriter::create(VecBlockWriter::default()); frame_writer.write_frame(FrameType::First, &b"abc"[..])?; - frame_writer.flush()?; + frame_writer.persist(PersistAction::Flush)?; frame_writer.write_frame(FrameType::Middle, &b"de"[..])?; - frame_writer.flush()?; + frame_writer.persist(PersistAction::Flush)?; frame_writer.into_writer().into() }; buf[8] = 0u8; @@ -68,7 +68,7 @@ fn repeat_empty_frame_util(repeat: usize) -> Vec { for _ in 0..repeat { frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap(); } - frame_writer.flush().unwrap(); + frame_writer.persist(PersistAction::Flush).unwrap(); frame_writer.into_writer().into() } diff --git a/src/frame/writer.rs b/src/frame/writer.rs index 0788417..69d23c0 100644 --- a/src/frame/writer.rs +++ b/src/frame/writer.rs @@ -2,7 +2,7 @@ use std::io; use crate::frame::{FrameType, Header, HEADER_LEN}; use crate::rolling::{Directory, RollingWriter}; -use crate::{BlockWrite, BLOCK_NUM_BYTES}; +use crate::{BlockWrite, PersistAction, BLOCK_NUM_BYTES}; pub struct FrameWriter { wrt: W, @@ -41,8 +41,8 @@ impl FrameWriter { /// When writing to a file, this performs a syscall and /// the OS will be in charge of eventually writing the data /// to disk, but this is not sufficient to ensure durability. - pub fn flush(&mut self) -> io::Result<()> { - self.wrt.flush() + pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> { + self.wrt.persist(persist_action) } /// Returns the maximum amount of bytes that can be written. diff --git a/src/lib.rs b/src/lib.rs index 0a6d1ce..c2a865c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,11 +7,14 @@ pub mod error; mod frame; mod mem; mod multi_record_log; +mod persist_policy; mod record; mod recordlog; mod rolling; -pub use multi_record_log::{MultiRecordLog, SyncPolicy}; +pub use multi_record_log::MultiRecordLog; +pub(crate) use persist_policy::PersistState; +pub use persist_policy::{PersistAction, PersistPolicy}; #[derive(Debug, PartialEq, Eq)] pub struct Record<'a> { diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index ca3ca22..c878bed 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -1,7 +1,6 @@ use std::io; use std::ops::RangeBounds; use std::path::Path; -use std::time::{Duration, Instant}; use bytes::Buf; use tracing::{debug, event_enabled, info, warn, Level}; @@ -13,70 +12,20 @@ use crate::mem::{MemQueue, QueuesSummary}; use crate::record::{MultiPlexedRecord, MultiRecord}; use crate::recordlog::RecordWriter; use crate::rolling::RollingWriter; -use crate::{mem, Record, ResourceUsage}; +use crate::{mem, PersistAction, PersistPolicy, PersistState, Record, ResourceUsage}; pub struct MultiRecordLog { record_log_writer: crate::recordlog::RecordWriter, in_mem_queues: mem::MemQueues, - next_sync: SyncState, + next_persist: PersistState, // A simple buffer we reuse to avoid allocation. multi_record_spare_buffer: Vec, } -/// Policy for synchonizing and flushing data -pub enum SyncPolicy { - /// Sync and flush at each operation - OnAppend, - /// Sync and flush regularly. Sync is realized on the first operation after the delay since - /// last sync elapsed. This means if no new operation arrive, some content may not get - /// flushed for a while. - OnDelay(Duration), -} - -#[derive(Debug)] -enum SyncState { - OnAppend, - OnDelay { - next_sync: Instant, - interval: Duration, - }, -} - -impl SyncState { - fn should_sync(&self) -> bool { - match self { - SyncState::OnAppend => true, - SyncState::OnDelay { next_sync, .. } => *next_sync < Instant::now(), - } - } - - fn update_synced(&mut self) { - match self { - SyncState::OnAppend => (), - SyncState::OnDelay { - ref mut next_sync, - interval, - } => *next_sync = Instant::now() + *interval, - } - } -} - -impl From for SyncState { - fn from(val: SyncPolicy) -> SyncState { - match val { - SyncPolicy::OnAppend => SyncState::OnAppend, - SyncPolicy::OnDelay(dur) => SyncState::OnDelay { - next_sync: Instant::now() + dur, - interval: dur, - }, - } - } -} - impl MultiRecordLog { - /// Open the multi record log, syncing after each operation. + /// Open the multi record log, flushing after each operation, but not fsyncing. pub fn open(directory_path: &Path) -> Result { - Self::open_with_prefs(directory_path, SyncPolicy::OnAppend) + Self::open_with_prefs(directory_path, PersistPolicy::Always(PersistAction::Flush)) } pub fn summary(&self) -> QueuesSummary { @@ -86,7 +35,7 @@ impl MultiRecordLog { /// Open the multi record log, syncing following the provided policy. pub fn open_with_prefs( directory_path: &Path, - sync_policy: SyncPolicy, + persist_policy: PersistPolicy, ) -> Result { // io errors are non-recoverable let rolling_reader = crate::rolling::RollingReader::open(directory_path)?; @@ -144,7 +93,7 @@ impl MultiRecordLog { let mut multi_record_log = MultiRecordLog { record_log_writer, in_mem_queues, - next_sync: sync_policy.into(), + next_persist: persist_policy.into(), multi_record_spare_buffer: Vec::new(), }; multi_record_log.run_gc_if_necessary()?; @@ -167,7 +116,7 @@ impl MultiRecordLog { } let record = MultiPlexedRecord::RecordPosition { queue, position: 0 }; self.record_log_writer.write_record(record)?; - self.sync()?; + self.persist_on_policy()?; self.in_mem_queues.create_queue(queue)?; Ok(()) } @@ -179,7 +128,7 @@ impl MultiRecordLog { self.record_log_writer.write_record(record)?; self.in_mem_queues.delete_queue(queue)?; self.run_gc_if_necessary()?; - self.sync()?; + self.persist(PersistAction::FlushAndFsync)?; Ok(()) } @@ -209,8 +158,8 @@ impl MultiRecordLog { /// /// This operation is atomic: either all records get stored, or none do. /// However this function succeeding does not necessarily means records where stored, be sure - /// to call [`Self::sync`] to make sure changes are persisted if you don't use - /// [`SyncPolicy::OnAppend`] (which is the default). + /// to call [`Self::persist`] to make sure changes are persisted if you don't use + /// [`PersistPolicy::Always`] (which is the default). pub fn append_records>( &mut self, queue: &str, @@ -244,7 +193,7 @@ impl MultiRecordLog { records, }; self.record_log_writer.write_record(record)?; - self.sync_on_policy()?; + self.persist_on_policy()?; let mem_queue = self.in_mem_queues.get_queue_mut(queue)?; let mut max_position = position; @@ -271,9 +220,9 @@ impl MultiRecordLog { has_empty_queues = true } if has_empty_queues { - // We need to sync here! We are remove files from the FS + // We need to fsync here! We are remove files from the FS // so we need to make sure our empty queue positions are properly persisted. - self.sync()?; + self.persist(PersistAction::FlushAndFsync)?; } Ok(()) } @@ -293,7 +242,7 @@ impl MultiRecordLog { .write_record(MultiPlexedRecord::Truncate { position, queue })?; let removed_count = self.in_mem_queues.truncate(queue, position).unwrap_or(0); self.run_gc_if_necessary()?; - self.sync_on_policy()?; + self.persist_on_policy()?; Ok(removed_count) } @@ -337,16 +286,18 @@ impl MultiRecordLog { self.in_mem_queues.range(queue, range) } - fn sync_on_policy(&mut self) -> io::Result<()> { - if self.next_sync.should_sync() { - self.sync()?; - self.next_sync.update_synced(); + /// Flush if the policy says it should be done + fn persist_on_policy(&mut self) -> io::Result<()> { + if let Some(persist_action) = self.next_persist.should_persist() { + self.persist(persist_action)?; + self.next_persist.update_persisted(); } Ok(()) } - pub fn sync(&mut self) -> io::Result<()> { - self.record_log_writer.flush() + /// Flush and optionnally fsync data + pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> { + self.record_log_writer.persist(persist_action) } /// Returns the position of the last record appended to the queue. diff --git a/src/persist_policy.rs b/src/persist_policy.rs new file mode 100644 index 0000000..07cb6d0 --- /dev/null +++ b/src/persist_policy.rs @@ -0,0 +1,104 @@ +use std::time::{Duration, Instant}; + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum PersistAction { + /// The buffer will be flushed to the OS, but not necessarily to the disk. + Flush, + /// The buffer will be flushed to the OS, and the OS will be asked to flush + /// it to the disk. + FlushAndFsync, +} + +impl PersistAction { + pub fn is_fsync(self) -> bool { + self == PersistAction::FlushAndFsync + } +} + +/// We have two type of operations on the mrecordlog. +/// +/// Critical records are relatively rare and really need to be persisted: +/// - RecordPosition { queue: &'a str, position: u64 }, +/// - DeleteQueue. +/// +/// For these operations, we want to always flush and fsync. +/// +/// On the other hand, +/// - Truncate +/// - AppendRecords +/// are considered are more frequent and one might want to sacrifice +/// persistence guarantees for performance. +/// +/// The `PersistPolicy` defines the trade-off applied for the second kind of +/// operations. +#[derive(Clone, Debug)] +pub enum PersistPolicy { + /// Only ensure data is persisted when critical records are written. + /// + /// With this policy, the timing after which the data reaches the disk + /// is up to the OS. + DoNothing, + /// Pesiste data once every interval, and when critical records are written + OnDelay { + interval: Duration, + action: PersistAction, + }, + /// Persist data after each action + Always(PersistAction), +} + +#[derive(Debug)] +pub(crate) enum PersistState { + OnAppend(PersistAction), + OnDelay { + next_persist: Instant, + interval: Duration, + action: PersistAction, + }, + NoOp, +} + +impl PersistState { + pub fn should_persist(&self) -> Option { + match self { + PersistState::OnAppend(action) => Some(*action), + PersistState::OnDelay { + action, + next_persist, + .. + } => { + if *next_persist < Instant::now() { + Some(*action) + } else { + None + } + } + PersistState::NoOp => None, + } + } + + pub fn update_persisted(&mut self) { + match self { + PersistState::OnAppend(_) | PersistState::NoOp => (), + PersistState::OnDelay { + ref mut next_persist, + interval, + .. + } => *next_persist = Instant::now() + *interval, + } + } +} + +impl From for PersistState { + fn from(val: PersistPolicy) -> PersistState { + match val { + PersistPolicy::Always(action) => PersistState::OnAppend(action), + PersistPolicy::OnDelay { interval, action } => PersistState::OnDelay { + next_persist: Instant::now() + interval, + interval, + action, + }, + PersistPolicy::DoNothing => PersistState::NoOp, + } + } +} diff --git a/src/recordlog/tests.rs b/src/recordlog/tests.rs index 2e80125..59f21df 100644 --- a/src/recordlog/tests.rs +++ b/src/recordlog/tests.rs @@ -2,7 +2,7 @@ use super::{RecordReader, RecordWriter}; use crate::block_read_write::ArrayReader; use crate::error::ReadRecordError; use crate::frame::HEADER_LEN; -use crate::BLOCK_NUM_BYTES; +use crate::{PersistAction, BLOCK_NUM_BYTES}; #[test] fn test_no_data() { @@ -15,7 +15,7 @@ fn test_no_data() { fn test_empty_record() { let mut writer = RecordWriter::in_memory(); writer.write_record("").unwrap(); - writer.flush().unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); assert_eq!(reader.read_record::<&str>().unwrap(), Some("")); @@ -27,7 +27,7 @@ fn test_simple_record() { let mut writer = RecordWriter::in_memory(); let record = "hello"; writer.write_record(record).unwrap(); - writer.flush().unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); assert!(matches!(reader.read_record::<&str>(), Ok(Some("hello")))); @@ -43,7 +43,7 @@ fn test_spans_over_more_than_one_block() { let long_entry: String = make_long_entry(80_000); let mut writer = RecordWriter::in_memory(); writer.write_record(long_entry.as_str()).unwrap(); - writer.flush().unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); let record_payload: &str = reader.read_record().unwrap().unwrap(); @@ -60,7 +60,7 @@ fn test_block_requires_padding() { let mut writer = RecordWriter::in_memory(); writer.write_record(long_record.as_str()).unwrap(); writer.write_record(short_record).unwrap(); - writer.flush().unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let buffer: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); assert_eq!( @@ -80,7 +80,7 @@ fn test_first_chunk_empty() { let mut writer = RecordWriter::in_memory(); writer.write_record(&long_record[..]).unwrap(); writer.write_record(short_record).unwrap(); - writer.flush().unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); assert_eq!( @@ -98,7 +98,7 @@ fn test_behavior_upon_corruption() { for record in &records { writer.write_record(record.as_str()).unwrap(); } - writer.flush().unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let mut buffer: Vec = writer.into_writer().into(); { let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); diff --git a/src/recordlog/writer.rs b/src/recordlog/writer.rs index ccc1e1c..0a44d6e 100644 --- a/src/recordlog/writer.rs +++ b/src/recordlog/writer.rs @@ -3,7 +3,7 @@ use std::io; use crate::block_read_write::VecBlockWriter; use crate::frame::{FrameType, FrameWriter}; use crate::rolling::{Directory, FileNumber, RollingWriter}; -use crate::{BlockWrite, Serializable}; +use crate::{BlockWrite, PersistAction, Serializable}; pub struct RecordWriter { frame_writer: FrameWriter, @@ -67,10 +67,9 @@ impl RecordWriter { Ok(()) } - /// Flushes and sync the data to disk. - pub fn flush(&mut self) -> io::Result<()> { - // Empty the application buffer. - self.frame_writer.flush() + /// Persist the data to disk, according to the persist_action. + pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> { + self.frame_writer.persist(persist_action) } pub fn get_underlying_wrt(&self) -> &W { diff --git a/src/rolling/directory.rs b/src/rolling/directory.rs index 449f5eb..ac64bac 100644 --- a/src/rolling/directory.rs +++ b/src/rolling/directory.rs @@ -6,7 +6,7 @@ use tracing::info; use super::{FileNumber, FileTracker}; use crate::rolling::{FILE_NUM_BYTES, FRAME_NUM_BYTES}; -use crate::{BlockRead, BlockWrite, BLOCK_NUM_BYTES}; +use crate::{BlockRead, BlockWrite, PersistAction, BLOCK_NUM_BYTES}; pub struct Directory { dir: PathBuf, @@ -103,6 +103,16 @@ impl Directory { file.seek(SeekFrom::Start(0u64))?; Ok(file) } + + fn sync_directory(&self) -> io::Result<()> { + let mut open_opts = OpenOptions::new(); + // Linux needs read to be set, otherwise returns EINVAL + // write must not be set, or it fails with EISDIR + open_opts.read(true); + let fd = open_opts.open(&self.dir)?; + fd.sync_data()?; + Ok(()) + } } pub struct RollingReader { @@ -235,6 +245,8 @@ impl BlockWrite for RollingWriter { assert!(buf.len() <= self.num_bytes_remaining_in_block()); if self.offset + buf.len() > FILE_NUM_BYTES { self.file.flush()?; + self.file.get_ref().sync_data()?; + self.directory.sync_directory()?; let (file_number, file) = if let Some(next_file_number) = self.directory.files.next(&self.file_number) { @@ -255,8 +267,18 @@ impl BlockWrite for RollingWriter { Ok(()) } - fn flush(&mut self) -> io::Result<()> { - self.file.flush() + fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> { + match persist_action { + PersistAction::FlushAndFsync => { + self.file.flush()?; + self.file.get_ref().sync_data()?; + self.directory.sync_directory() + } + PersistAction::Flush => { + // This will flush the buffer of the BufWriter to the underlying OS. + self.file.flush() + } + } } fn num_bytes_remaining_in_block(&self) -> usize { diff --git a/src/rolling/tests.rs b/src/rolling/tests.rs index 0e25717..1c017d8 100644 --- a/src/rolling/tests.rs +++ b/src/rolling/tests.rs @@ -1,5 +1,5 @@ use super::*; -use crate::{BlockRead, BlockWrite, BLOCK_NUM_BYTES}; +use crate::{BlockRead, BlockWrite, PersistAction, BLOCK_NUM_BYTES}; #[test] fn test_read_write() { @@ -15,7 +15,7 @@ fn test_read_write() { writer.write(&buffer[..]).unwrap(); buffer.fill(2u8); writer.write(&buffer[..]).unwrap(); - writer.flush().unwrap(); + writer.persist(PersistAction::Flush).unwrap(); } let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); assert!(rolling_reader.block().iter().all(|&b| b == 0)); @@ -36,7 +36,7 @@ fn test_read_write_2nd_block() { buffer.fill(i); writer.write(&buffer[..]).unwrap(); } - writer.flush().unwrap(); + writer.persist(PersistAction::Flush).unwrap(); } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); @@ -50,7 +50,7 @@ fn test_read_write_2nd_block() { buffer.fill(i); writer.write(&buffer[..]).unwrap(); } - writer.flush().unwrap(); + writer.persist(PersistAction::Flush).unwrap(); } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); @@ -76,7 +76,7 @@ fn test_read_truncated() { buffer.fill(i as u8); writer.write(&buffer[..]).unwrap(); } - writer.flush().unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let file_ids = writer.list_file_numbers(); let middle_file = file_ids[1]; let filepath =