From ab1d28951dae7c1cd66bc19cf8624b3f56f462ca Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 22 Mar 2024 18:50:59 +0900 Subject: [PATCH] Change in semantics. Upon critical situation we flush and fsync. --- src/block_read_write.rs | 8 +-- src/frame/tests.rs | 10 ++-- src/frame/writer.rs | 6 +-- src/multi_record_log.rs | 22 ++++----- src/persist_policy.rs | 104 +++++++++++++++++++++++++++++++++++++++ src/recordlog/tests.rs | 14 +++--- src/recordlog/writer.rs | 9 ++-- src/rolling/directory.rs | 20 +++++--- src/rolling/tests.rs | 10 ++-- 9 files changed, 156 insertions(+), 47 deletions(-) create mode 100644 src/persist_policy.rs diff --git a/src/block_read_write.rs b/src/block_read_write.rs index ab89a88..f2bd521 100644 --- a/src/block_read_write.rs +++ b/src/block_read_write.rs @@ -1,5 +1,7 @@ use std::io; +use crate::PersistAction; + pub const BLOCK_NUM_BYTES: usize = 32_768; pub trait BlockRead { @@ -25,8 +27,8 @@ pub trait BlockRead { pub trait BlockWrite { /// Must panic if buf is larger than `num_bytes_remaining_in_block`. fn write(&mut self, buf: &[u8]) -> io::Result<()>; - /// The semantics of flush may depend on the implementation. - fn flush(&mut self, fsync: bool) -> io::Result<()>; + /// Persist the data following the `persist_action`. + fn persist(&mut self, persist_action: PersistAction) -> io::Result<()>; /// Number of bytes that can be added in the block. fn num_bytes_remaining_in_block(&self) -> usize; } @@ -90,7 +92,7 @@ impl BlockWrite for VecBlockWriter { Ok(()) } - fn flush(&mut self, _fsync: bool) -> io::Result<()> { + fn persist(&mut self, _persist_action: PersistAction) -> io::Result<()> { Ok(()) } diff --git a/src/frame/tests.rs b/src/frame/tests.rs index e3d865b..1ea8a7b 100644 --- a/src/frame/tests.rs +++ b/src/frame/tests.rs @@ -3,7 +3,7 @@ use std::io; use crate::block_read_write::{ArrayReader, VecBlockWriter}; use crate::frame::header::{FrameType, HEADER_LEN}; use crate::frame::{FrameReader, FrameWriter, ReadFrameError}; -use crate::BLOCK_NUM_BYTES; +use crate::{PersistAction, BLOCK_NUM_BYTES}; #[test] fn test_frame_simple() { @@ -19,7 +19,7 @@ fn test_frame_simple() { frame_writer .write_frame(FrameType::Last, &b"fgh"[..]) .unwrap(); - frame_writer.flush(false).unwrap(); + frame_writer.persist(PersistAction::Flush).unwrap(); frame_writer.into_writer() }; let buffer: Vec = block_writer.into(); @@ -45,9 +45,9 @@ fn test_frame_corruption_in_payload() -> io::Result<()> { let mut buf: Vec = { let mut frame_writer = FrameWriter::create(VecBlockWriter::default()); frame_writer.write_frame(FrameType::First, &b"abc"[..])?; - frame_writer.flush(false)?; + frame_writer.persist(PersistAction::Flush)?; frame_writer.write_frame(FrameType::Middle, &b"de"[..])?; - frame_writer.flush(false)?; + frame_writer.persist(PersistAction::Flush)?; frame_writer.into_writer().into() }; buf[8] = 0u8; @@ -68,7 +68,7 @@ fn repeat_empty_frame_util(repeat: usize) -> Vec { for _ in 0..repeat { frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap(); } - frame_writer.flush(false).unwrap(); + frame_writer.persist(PersistAction::Flush).unwrap(); frame_writer.into_writer().into() } diff --git a/src/frame/writer.rs b/src/frame/writer.rs index 6afb954..69d23c0 100644 --- a/src/frame/writer.rs +++ b/src/frame/writer.rs @@ -2,7 +2,7 @@ use std::io; use crate::frame::{FrameType, Header, HEADER_LEN}; use crate::rolling::{Directory, RollingWriter}; -use crate::{BlockWrite, BLOCK_NUM_BYTES}; +use crate::{BlockWrite, PersistAction, BLOCK_NUM_BYTES}; pub struct FrameWriter { wrt: W, @@ -41,8 +41,8 @@ impl FrameWriter { /// When writing to a file, this performs a syscall and /// the OS will be in charge of eventually writing the data /// to disk, but this is not sufficient to ensure durability. - pub fn flush(&mut self, fsync: bool) -> io::Result<()> { - self.wrt.flush(fsync) + pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> { + self.wrt.persist(persist_action) } /// Returns the maximum amount of bytes that can be written. diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index 068a59d..89b5944 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -112,7 +112,7 @@ impl MultiRecordLog { } let record = MultiPlexedRecord::RecordPosition { queue, position: 0 }; self.record_log_writer.write_record(record)?; - self.persist_and_maybe_fsync()?; + self.persist(PersistAction::FlushAndFsync)?; self.in_mem_queues.create_queue(queue)?; Ok(()) } @@ -124,7 +124,7 @@ impl MultiRecordLog { self.record_log_writer.write_record(record)?; self.in_mem_queues.delete_queue(queue)?; self.run_gc_if_necessary()?; - self.persist_and_maybe_fsync()?; + self.persist(PersistAction::FlushAndFsync)?; Ok(()) } @@ -216,9 +216,9 @@ impl MultiRecordLog { has_empty_queues = true } if has_empty_queues { - // We need to sync here! We are remove files from the FS + // We need to fsync here! We are remove files from the FS // so we need to make sure our empty queue positions are properly persisted. - self.persist_and_maybe_fsync()?; + self.persist(PersistAction::FlushAndFsync)?; } Ok(()) } @@ -284,20 +284,20 @@ impl MultiRecordLog { /// Flush if the policy says it should be done fn persist_on_policy(&mut self) -> io::Result<()> { - if self.next_persist.should_persist() { - self.persist_and_maybe_fsync()?; + if let Some(persist_action) = self.next_persist.should_persist() { + self.persist(persist_action)?; self.next_persist.update_persisted(); } Ok(()) } - fn persist_and_maybe_fsync(&mut self) -> io::Result<()> { - self.persist(self.next_persist.action().is_fsync()) - } + // fn persist_and_maybe_fsync(&mut self) -> io::Result<()> { + // self.persist(self.next_persist.action().is_fsync()) + // } /// Flush and optionnally fsync data - pub fn persist(&mut self, fsync: bool) -> io::Result<()> { - self.record_log_writer.flush(fsync) + pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> { + self.record_log_writer.persist(persist_action) } /// Returns the position of the last record appended to the queue. diff --git a/src/persist_policy.rs b/src/persist_policy.rs new file mode 100644 index 0000000..07cb6d0 --- /dev/null +++ b/src/persist_policy.rs @@ -0,0 +1,104 @@ +use std::time::{Duration, Instant}; + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum PersistAction { + /// The buffer will be flushed to the OS, but not necessarily to the disk. + Flush, + /// The buffer will be flushed to the OS, and the OS will be asked to flush + /// it to the disk. + FlushAndFsync, +} + +impl PersistAction { + pub fn is_fsync(self) -> bool { + self == PersistAction::FlushAndFsync + } +} + +/// We have two type of operations on the mrecordlog. +/// +/// Critical records are relatively rare and really need to be persisted: +/// - RecordPosition { queue: &'a str, position: u64 }, +/// - DeleteQueue. +/// +/// For these operations, we want to always flush and fsync. +/// +/// On the other hand, +/// - Truncate +/// - AppendRecords +/// are considered are more frequent and one might want to sacrifice +/// persistence guarantees for performance. +/// +/// The `PersistPolicy` defines the trade-off applied for the second kind of +/// operations. +#[derive(Clone, Debug)] +pub enum PersistPolicy { + /// Only ensure data is persisted when critical records are written. + /// + /// With this policy, the timing after which the data reaches the disk + /// is up to the OS. + DoNothing, + /// Pesiste data once every interval, and when critical records are written + OnDelay { + interval: Duration, + action: PersistAction, + }, + /// Persist data after each action + Always(PersistAction), +} + +#[derive(Debug)] +pub(crate) enum PersistState { + OnAppend(PersistAction), + OnDelay { + next_persist: Instant, + interval: Duration, + action: PersistAction, + }, + NoOp, +} + +impl PersistState { + pub fn should_persist(&self) -> Option { + match self { + PersistState::OnAppend(action) => Some(*action), + PersistState::OnDelay { + action, + next_persist, + .. + } => { + if *next_persist < Instant::now() { + Some(*action) + } else { + None + } + } + PersistState::NoOp => None, + } + } + + pub fn update_persisted(&mut self) { + match self { + PersistState::OnAppend(_) | PersistState::NoOp => (), + PersistState::OnDelay { + ref mut next_persist, + interval, + .. + } => *next_persist = Instant::now() + *interval, + } + } +} + +impl From for PersistState { + fn from(val: PersistPolicy) -> PersistState { + match val { + PersistPolicy::Always(action) => PersistState::OnAppend(action), + PersistPolicy::OnDelay { interval, action } => PersistState::OnDelay { + next_persist: Instant::now() + interval, + interval, + action, + }, + PersistPolicy::DoNothing => PersistState::NoOp, + } + } +} diff --git a/src/recordlog/tests.rs b/src/recordlog/tests.rs index e293c30..59f21df 100644 --- a/src/recordlog/tests.rs +++ b/src/recordlog/tests.rs @@ -2,7 +2,7 @@ use super::{RecordReader, RecordWriter}; use crate::block_read_write::ArrayReader; use crate::error::ReadRecordError; use crate::frame::HEADER_LEN; -use crate::BLOCK_NUM_BYTES; +use crate::{PersistAction, BLOCK_NUM_BYTES}; #[test] fn test_no_data() { @@ -15,7 +15,7 @@ fn test_no_data() { fn test_empty_record() { let mut writer = RecordWriter::in_memory(); writer.write_record("").unwrap(); - writer.flush(false).unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); assert_eq!(reader.read_record::<&str>().unwrap(), Some("")); @@ -27,7 +27,7 @@ fn test_simple_record() { let mut writer = RecordWriter::in_memory(); let record = "hello"; writer.write_record(record).unwrap(); - writer.flush(false).unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); assert!(matches!(reader.read_record::<&str>(), Ok(Some("hello")))); @@ -43,7 +43,7 @@ fn test_spans_over_more_than_one_block() { let long_entry: String = make_long_entry(80_000); let mut writer = RecordWriter::in_memory(); writer.write_record(long_entry.as_str()).unwrap(); - writer.flush(false).unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); let record_payload: &str = reader.read_record().unwrap().unwrap(); @@ -60,7 +60,7 @@ fn test_block_requires_padding() { let mut writer = RecordWriter::in_memory(); writer.write_record(long_record.as_str()).unwrap(); writer.write_record(short_record).unwrap(); - writer.flush(false).unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let buffer: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); assert_eq!( @@ -80,7 +80,7 @@ fn test_first_chunk_empty() { let mut writer = RecordWriter::in_memory(); writer.write_record(&long_record[..]).unwrap(); writer.write_record(short_record).unwrap(); - writer.flush(false).unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); assert_eq!( @@ -98,7 +98,7 @@ fn test_behavior_upon_corruption() { for record in &records { writer.write_record(record.as_str()).unwrap(); } - writer.flush(false).unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let mut buffer: Vec = writer.into_writer().into(); { let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); diff --git a/src/recordlog/writer.rs b/src/recordlog/writer.rs index 9ea6dad..0a44d6e 100644 --- a/src/recordlog/writer.rs +++ b/src/recordlog/writer.rs @@ -3,7 +3,7 @@ use std::io; use crate::block_read_write::VecBlockWriter; use crate::frame::{FrameType, FrameWriter}; use crate::rolling::{Directory, FileNumber, RollingWriter}; -use crate::{BlockWrite, Serializable}; +use crate::{BlockWrite, PersistAction, Serializable}; pub struct RecordWriter { frame_writer: FrameWriter, @@ -67,10 +67,9 @@ impl RecordWriter { Ok(()) } - /// Flushes and sync the data to disk. - pub fn flush(&mut self, fsync: bool) -> io::Result<()> { - // Empty the application buffer and optionally fsyncs - self.frame_writer.flush(fsync) + /// Persist the data to disk, according to the persist_action. + pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> { + self.frame_writer.persist(persist_action) } pub fn get_underlying_wrt(&self) -> &W { diff --git a/src/rolling/directory.rs b/src/rolling/directory.rs index 28af25f..ac64bac 100644 --- a/src/rolling/directory.rs +++ b/src/rolling/directory.rs @@ -6,7 +6,7 @@ use tracing::info; use super::{FileNumber, FileTracker}; use crate::rolling::{FILE_NUM_BYTES, FRAME_NUM_BYTES}; -use crate::{BlockRead, BlockWrite, BLOCK_NUM_BYTES}; +use crate::{BlockRead, BlockWrite, PersistAction, BLOCK_NUM_BYTES}; pub struct Directory { dir: PathBuf, @@ -267,13 +267,17 @@ impl BlockWrite for RollingWriter { Ok(()) } - fn flush(&mut self, fsync: bool) -> io::Result<()> { - if fsync { - self.file.flush()?; - self.file.get_ref().sync_data()?; - self.directory.sync_directory() - } else { - self.file.flush() + fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> { + match persist_action { + PersistAction::FlushAndFsync => { + self.file.flush()?; + self.file.get_ref().sync_data()?; + self.directory.sync_directory() + } + PersistAction::Flush => { + // This will flush the buffer of the BufWriter to the underlying OS. + self.file.flush() + } } } diff --git a/src/rolling/tests.rs b/src/rolling/tests.rs index c46e016..1c017d8 100644 --- a/src/rolling/tests.rs +++ b/src/rolling/tests.rs @@ -1,5 +1,5 @@ use super::*; -use crate::{BlockRead, BlockWrite, BLOCK_NUM_BYTES}; +use crate::{BlockRead, BlockWrite, PersistAction, BLOCK_NUM_BYTES}; #[test] fn test_read_write() { @@ -15,7 +15,7 @@ fn test_read_write() { writer.write(&buffer[..]).unwrap(); buffer.fill(2u8); writer.write(&buffer[..]).unwrap(); - writer.flush(false).unwrap(); + writer.persist(PersistAction::Flush).unwrap(); } let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); assert!(rolling_reader.block().iter().all(|&b| b == 0)); @@ -36,7 +36,7 @@ fn test_read_write_2nd_block() { buffer.fill(i); writer.write(&buffer[..]).unwrap(); } - writer.flush(false).unwrap(); + writer.persist(PersistAction::Flush).unwrap(); } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); @@ -50,7 +50,7 @@ fn test_read_write_2nd_block() { buffer.fill(i); writer.write(&buffer[..]).unwrap(); } - writer.flush(false).unwrap(); + writer.persist(PersistAction::Flush).unwrap(); } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); @@ -76,7 +76,7 @@ fn test_read_truncated() { buffer.fill(i as u8); writer.write(&buffer[..]).unwrap(); } - writer.flush(false).unwrap(); + writer.persist(PersistAction::Flush).unwrap(); let file_ids = writer.list_file_numbers(); let middle_file = file_ids[1]; let filepath =