From 90700d91caaeb39aeeee1537d6f809d4b45e6de9 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 1 Mar 2024 12:12:52 +0100 Subject: [PATCH 1/2] add facility to fdatasync --- src/block_read_write.rs | 4 ++-- src/frame/tests.rs | 8 ++++---- src/frame/writer.rs | 4 ++-- src/multi_record_log.rs | 2 +- src/recordlog/tests.rs | 12 ++++++------ src/recordlog/writer.rs | 4 ++-- src/rolling/directory.rs | 22 ++++++++++++++++++++-- src/rolling/tests.rs | 8 ++++---- 8 files changed, 41 insertions(+), 23 deletions(-) diff --git a/src/block_read_write.rs b/src/block_read_write.rs index 74a0d0b..ab89a88 100644 --- a/src/block_read_write.rs +++ b/src/block_read_write.rs @@ -26,7 +26,7 @@ pub trait BlockWrite { /// Must panic if buf is larger than `num_bytes_remaining_in_block`. fn write(&mut self, buf: &[u8]) -> io::Result<()>; /// The semantics of flush may depend on the implementation. - fn flush(&mut self) -> io::Result<()>; + fn flush(&mut self, fsync: bool) -> io::Result<()>; /// Number of bytes that can be added in the block. 
fn num_bytes_remaining_in_block(&self) -> usize; } @@ -90,7 +90,7 @@ impl BlockWrite for VecBlockWriter { Ok(()) } - fn flush(&mut self) -> io::Result<()> { + fn flush(&mut self, _fsync: bool) -> io::Result<()> { Ok(()) } diff --git a/src/frame/tests.rs b/src/frame/tests.rs index ab219bd..e3d865b 100644 --- a/src/frame/tests.rs +++ b/src/frame/tests.rs @@ -19,7 +19,7 @@ fn test_frame_simple() { frame_writer .write_frame(FrameType::Last, &b"fgh"[..]) .unwrap(); - frame_writer.flush().unwrap(); + frame_writer.flush(false).unwrap(); frame_writer.into_writer() }; let buffer: Vec = block_writer.into(); @@ -45,9 +45,9 @@ fn test_frame_corruption_in_payload() -> io::Result<()> { let mut buf: Vec = { let mut frame_writer = FrameWriter::create(VecBlockWriter::default()); frame_writer.write_frame(FrameType::First, &b"abc"[..])?; - frame_writer.flush()?; + frame_writer.flush(false)?; frame_writer.write_frame(FrameType::Middle, &b"de"[..])?; - frame_writer.flush()?; + frame_writer.flush(false)?; frame_writer.into_writer().into() }; buf[8] = 0u8; @@ -68,7 +68,7 @@ fn repeat_empty_frame_util(repeat: usize) -> Vec { for _ in 0..repeat { frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap(); } - frame_writer.flush().unwrap(); + frame_writer.flush(false).unwrap(); frame_writer.into_writer().into() } diff --git a/src/frame/writer.rs b/src/frame/writer.rs index 0788417..6afb954 100644 --- a/src/frame/writer.rs +++ b/src/frame/writer.rs @@ -41,8 +41,8 @@ impl FrameWriter { /// When writing to a file, this performs a syscall and /// the OS will be in charge of eventually writing the data /// to disk, but this is not sufficient to ensure durability. - pub fn flush(&mut self) -> io::Result<()> { - self.wrt.flush() + pub fn flush(&mut self, fsync: bool) -> io::Result<()> { + self.wrt.flush(fsync) } /// Returns the maximum amount of bytes that can be written. 
diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index 0fe3f50..62e53a0 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -342,7 +342,7 @@ impl MultiRecordLog { } pub fn sync(&mut self) -> io::Result<()> { - self.record_log_writer.flush() + self.record_log_writer.flush(false) } /// Returns the position of the last record appended to the queue. diff --git a/src/recordlog/tests.rs b/src/recordlog/tests.rs index 2e80125..e293c30 100644 --- a/src/recordlog/tests.rs +++ b/src/recordlog/tests.rs @@ -15,7 +15,7 @@ fn test_no_data() { fn test_empty_record() { let mut writer = RecordWriter::in_memory(); writer.write_record("").unwrap(); - writer.flush().unwrap(); + writer.flush(false).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); assert_eq!(reader.read_record::<&str>().unwrap(), Some("")); @@ -27,7 +27,7 @@ fn test_simple_record() { let mut writer = RecordWriter::in_memory(); let record = "hello"; writer.write_record(record).unwrap(); - writer.flush().unwrap(); + writer.flush(false).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); assert!(matches!(reader.read_record::<&str>(), Ok(Some("hello")))); @@ -43,7 +43,7 @@ fn test_spans_over_more_than_one_block() { let long_entry: String = make_long_entry(80_000); let mut writer = RecordWriter::in_memory(); writer.write_record(long_entry.as_str()).unwrap(); - writer.flush().unwrap(); + writer.flush(false).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); let record_payload: &str = reader.read_record().unwrap().unwrap(); @@ -60,7 +60,7 @@ fn test_block_requires_padding() { let mut writer = RecordWriter::in_memory(); writer.write_record(long_record.as_str()).unwrap(); writer.write_record(short_record).unwrap(); - writer.flush().unwrap(); + writer.flush(false).unwrap(); let buffer: Vec = 
writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); assert_eq!( @@ -80,7 +80,7 @@ fn test_first_chunk_empty() { let mut writer = RecordWriter::in_memory(); writer.write_record(&long_record[..]).unwrap(); writer.write_record(short_record).unwrap(); - writer.flush().unwrap(); + writer.flush(false).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); assert_eq!( @@ -98,7 +98,7 @@ fn test_behavior_upon_corruption() { for record in &records { writer.write_record(record.as_str()).unwrap(); } - writer.flush().unwrap(); + writer.flush(false).unwrap(); let mut buffer: Vec = writer.into_writer().into(); { let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); diff --git a/src/recordlog/writer.rs b/src/recordlog/writer.rs index ccc1e1c..042a4e8 100644 --- a/src/recordlog/writer.rs +++ b/src/recordlog/writer.rs @@ -68,9 +68,9 @@ impl RecordWriter { } /// Flushes and sync the data to disk. - pub fn flush(&mut self) -> io::Result<()> { + pub fn flush(&mut self, fsync: bool) -> io::Result<()> { // Empty the application buffer. 
- self.frame_writer.flush() + self.frame_writer.flush(fsync) } pub fn get_underlying_wrt(&self) -> &W { diff --git a/src/rolling/directory.rs b/src/rolling/directory.rs index 449f5eb..28af25f 100644 --- a/src/rolling/directory.rs +++ b/src/rolling/directory.rs @@ -103,6 +103,16 @@ impl Directory { file.seek(SeekFrom::Start(0u64))?; Ok(file) } + + fn sync_directory(&self) -> io::Result<()> { + let mut open_opts = OpenOptions::new(); + // Linux needs read to be set, otherwise returns EINVAL + // write must not be set, or it fails with EISDIR + open_opts.read(true); + let fd = open_opts.open(&self.dir)?; + fd.sync_data()?; + Ok(()) + } } pub struct RollingReader { @@ -235,6 +245,8 @@ impl BlockWrite for RollingWriter { assert!(buf.len() <= self.num_bytes_remaining_in_block()); if self.offset + buf.len() > FILE_NUM_BYTES { self.file.flush()?; + self.file.get_ref().sync_data()?; + self.directory.sync_directory()?; let (file_number, file) = if let Some(next_file_number) = self.directory.files.next(&self.file_number) { @@ -255,8 +267,14 @@ impl BlockWrite for RollingWriter { Ok(()) } - fn flush(&mut self) -> io::Result<()> { - self.file.flush() + fn flush(&mut self, fsync: bool) -> io::Result<()> { + if fsync { + self.file.flush()?; + self.file.get_ref().sync_data()?; + self.directory.sync_directory() + } else { + self.file.flush() + } } fn num_bytes_remaining_in_block(&self) -> usize { diff --git a/src/rolling/tests.rs b/src/rolling/tests.rs index 4752916..d7f34ad 100644 --- a/src/rolling/tests.rs +++ b/src/rolling/tests.rs @@ -15,7 +15,7 @@ fn test_read_write() { writer.write(&buffer[..]).unwrap(); buffer.fill(2u8); writer.write(&buffer[..]).unwrap(); - writer.flush().unwrap(); + writer.flush(false).unwrap(); } let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); assert!(rolling_reader.block().iter().all(|&b| b == 0)); @@ -36,7 +36,7 @@ fn test_read_write_2nd_block() { buffer.fill(i); writer.write(&buffer[..]).unwrap(); } - 
writer.flush().unwrap(); + writer.flush(false).unwrap(); } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); @@ -50,7 +50,7 @@ fn test_read_write_2nd_block() { buffer.fill(i); writer.write(&buffer[..]).unwrap(); } - writer.flush().unwrap(); + writer.flush(false).unwrap(); } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); @@ -76,7 +76,7 @@ fn test_read_truncated() { buffer.fill(i as u8); writer.write(&buffer[..]).unwrap(); } - writer.flush().unwrap(); + writer.flush(false).unwrap(); let file_ids = writer.list_file_numbers(); let middle_file = file_ids[1]; let filepath = From be00ed2f12208e427cb48d7027661e84494e8783 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 1 Mar 2024 12:25:23 +0100 Subject: [PATCH 2/2] introduce PersistPolicy --- src/lib.rs | 2 +- src/multi_record_log.rs | 142 +++++++++++++++++++++++++++------------- src/rolling/tests.rs | 2 +- 3 files changed, 100 insertions(+), 46 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3def6f7..0a01450 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ mod record; mod recordlog; mod rolling; -pub use multi_record_log::{MultiRecordLog, SyncPolicy}; +pub use multi_record_log::{MultiRecordLog, PersistAction, PersistPolicy}; #[derive(Debug, PartialEq, Eq)] pub struct Record<'a> { diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index 62e53a0..8e9f0b1 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -18,71 +18,119 @@ use crate::{mem, Record}; pub struct MultiRecordLog { record_log_writer: crate::recordlog::RecordWriter, in_mem_queues: mem::MemQueues, - next_sync: SyncState, + next_persist: PersistState, // A simple buffer we reuse to avoid allocation. multi_record_spare_buffer: Vec, } -/// Policy for synchonizing and flushing data -pub enum SyncPolicy { - /// Sync and flush at each operation - OnAppend, - /// Sync and flush regularly. 
Sync is realized on the first operation after the delay since - /// last sync elapsed. This means if no new operation arrive, some content may not get - /// flushed for a while. - OnDelay(Duration), +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum PersistAction { + /// The buffer will be flushed to the OS, but not necessarily to the disk. + Flush, + /// The buffer will be flushed to the OS, and the OS will be asked to flush + /// it to the disk. + FlushAndFsync, +} + +impl PersistAction { + fn is_fsync(self) -> bool { + self == PersistAction::FlushAndFsync + } +} + +/// We have two types of operations on the mrecordlog. +/// +/// Critical records are relatively rare and really need to be persisted: +/// - RecordPosition { queue: &'a str, position: u64 }, +/// - DeleteQueue. +/// +/// For these operations, we want to always flush and fsync. +/// +/// On the other hand, +/// - Truncate +/// - AppendRecords +/// are considered more frequent and one might want to sacrifice +/// persistence guarantees for performance. +/// +/// The `PersistPolicy` defines the trade-off applied for the second kind of +/// operations. +#[derive(Clone, Debug)] +pub enum PersistPolicy { + /// Only persist data when asked for, and when critical records are written + DoNothing(PersistAction), + /// Persist data once every interval, and when critical records are written + OnDelay { + interval: Duration, + action: PersistAction, + }, + /// Persist data after each action + Always(PersistAction), } #[derive(Debug)] -enum SyncState { - OnAppend, +enum PersistState { + OnAppend(PersistAction), OnDelay { - next_sync: Instant, + next_persist: Instant, interval: Duration, + action: PersistAction, }, + OnRequest(PersistAction), } -impl SyncState { - fn should_sync(&self) -> bool { +impl PersistState { + fn should_persist(&self) -> bool { match self { - SyncState::OnAppend => true, - SyncState::OnDelay { next_sync, .. 
} => *next_sync < Instant::now(), + PersistState::OnAppend(_) => true, + PersistState::OnDelay { next_persist, .. } => *next_persist < Instant::now(), + PersistState::OnRequest(_) => false, } } - fn update_synced(&mut self) { + fn update_persisted(&mut self) { match self { - SyncState::OnAppend => (), - SyncState::OnDelay { - ref mut next_sync, + PersistState::OnAppend(_) | PersistState::OnRequest(_) => (), + PersistState::OnDelay { + ref mut next_persist, interval, - } => *next_sync = Instant::now() + *interval, + .. + } => *next_persist = Instant::now() + *interval, + } + } + + fn action(&self) -> PersistAction { + match self { + PersistState::OnAppend(action) => *action, + PersistState::OnDelay { action, .. } => *action, + PersistState::OnRequest(action) => *action, } } } -impl From<SyncPolicy> for SyncState { - fn from(val: SyncPolicy) -> SyncState { +impl From<PersistPolicy> for PersistState { + fn from(val: PersistPolicy) -> PersistState { match val { - SyncPolicy::OnAppend => SyncState::OnAppend, - SyncPolicy::OnDelay(dur) => SyncState::OnDelay { - next_sync: Instant::now() + dur, - interval: dur, + PersistPolicy::Always(action) => PersistState::OnAppend(action), + PersistPolicy::OnDelay { interval, action } => PersistState::OnDelay { + next_persist: Instant::now() + interval, + interval, + action, }, + PersistPolicy::DoNothing(action) => PersistState::OnRequest(action), } } } impl MultiRecordLog { - /// Open the multi record log, syncing after each operation. + /// Open the multi record log, flushing after each operation, but not fsyncing. pub fn open(directory_path: &Path) -> Result { - Self::open_with_prefs(directory_path, SyncPolicy::OnAppend) + Self::open_with_prefs(directory_path, PersistPolicy::Always(PersistAction::Flush)) } /// Open the multi record log, syncing following the provided policy. 
pub fn open_with_prefs( directory_path: &Path, - sync_policy: SyncPolicy, + persist_policy: PersistPolicy, ) -> Result { // io errors are non-recoverable let rolling_reader = crate::rolling::RollingReader::open(directory_path)?; @@ -140,7 +188,7 @@ impl MultiRecordLog { let mut multi_record_log = MultiRecordLog { record_log_writer, in_mem_queues, - next_sync: sync_policy.into(), + next_persist: persist_policy.into(), multi_record_spare_buffer: Vec::new(), }; multi_record_log.run_gc_if_necessary()?; @@ -163,7 +211,7 @@ impl MultiRecordLog { } let record = MultiPlexedRecord::RecordPosition { queue, position: 0 }; self.record_log_writer.write_record(record)?; - self.sync()?; + self.persist_and_maybe_fsync()?; self.in_mem_queues.create_queue(queue)?; Ok(()) } @@ -175,7 +223,7 @@ impl MultiRecordLog { self.record_log_writer.write_record(record)?; self.in_mem_queues.delete_queue(queue)?; self.run_gc_if_necessary()?; - self.sync()?; + self.persist_and_maybe_fsync()?; Ok(()) } @@ -205,8 +253,8 @@ impl MultiRecordLog { /// /// This operation is atomic: either all records get stored, or none do. /// However this function succeeding does not necessarily means records where stored, be sure - /// to call [`Self::sync`] to make sure changes are persisted if you don't use - /// [`SyncPolicy::OnAppend`] (which is the default). + /// to call [`Self::persist`] to make sure changes are persisted if you don't use + /// [`PersistPolicy::Always`] (which is the default). pub fn append_records>( &mut self, queue: &str, @@ -240,7 +288,7 @@ impl MultiRecordLog { records, }; self.record_log_writer.write_record(record)?; - self.sync_on_policy()?; + self.persist_on_policy()?; let mem_queue = self.in_mem_queues.get_queue_mut(queue)?; let mut max_position = position; @@ -269,7 +317,7 @@ impl MultiRecordLog { if has_empty_queues { // We need to sync here! We are remove files from the FS // so we need to make sure our empty queue positions are properly persisted. 
- self.sync()?; + self.persist_and_maybe_fsync()?; } Ok(()) } @@ -289,7 +337,7 @@ impl MultiRecordLog { .write_record(MultiPlexedRecord::Truncate { position, queue })?; let removed_count = self.in_mem_queues.truncate(queue, position).unwrap_or(0); self.run_gc_if_necessary()?; - self.sync_on_policy()?; + self.persist_on_policy()?; Ok(removed_count) } @@ -333,16 +381,22 @@ impl MultiRecordLog { self.in_mem_queues.range(queue, range) } - fn sync_on_policy(&mut self) -> io::Result<()> { - if self.next_sync.should_sync() { - self.sync()?; - self.next_sync.update_synced(); + /// Flush if the policy says it should be done + fn persist_on_policy(&mut self) -> io::Result<()> { + if self.next_persist.should_persist() { + self.persist_and_maybe_fsync()?; + self.next_persist.update_persisted(); } Ok(()) } - pub fn sync(&mut self) -> io::Result<()> { - self.record_log_writer.flush(false) + fn persist_and_maybe_fsync(&mut self) -> io::Result<()> { + self.persist(self.next_persist.action().is_fsync()) + } + + /// Flush and optionally fsync data + pub fn persist(&mut self, fsync: bool) -> io::Result<()> { + self.record_log_writer.flush(fsync) } /// Returns the position of the last record appended to the queue. diff --git a/src/rolling/tests.rs b/src/rolling/tests.rs index d7f34ad..c46e016 100644 --- a/src/rolling/tests.rs +++ b/src/rolling/tests.rs @@ -86,7 +86,7 @@ fn test_read_truncated() { std::fs::OpenOptions::new() .truncate(true) .write(true) - .open(&filepath) + .open(filepath) .unwrap(); } {