Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

call fsyncdata on flush #54

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/block_read_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub trait BlockWrite {
/// Must panic if buf is larger than `num_bytes_remaining_in_block`.
fn write(&mut self, buf: &[u8]) -> io::Result<()>;
/// The semantics of flush may depend on the implementation.
fn flush(&mut self) -> io::Result<()>;
fn flush(&mut self, fsync: bool) -> io::Result<()>;
/// Number of bytes that can be added in the block.
fn num_bytes_remaining_in_block(&self) -> usize;
}
Expand Down Expand Up @@ -90,7 +90,7 @@ impl BlockWrite for VecBlockWriter {
Ok(())
}

fn flush(&mut self) -> io::Result<()> {
fn flush(&mut self, _fsync: bool) -> io::Result<()> {
Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions src/frame/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn test_frame_simple() {
frame_writer
.write_frame(FrameType::Last, &b"fgh"[..])
.unwrap();
frame_writer.flush().unwrap();
frame_writer.flush(false).unwrap();
frame_writer.into_writer()
};
let buffer: Vec<u8> = block_writer.into();
Expand All @@ -45,9 +45,9 @@ fn test_frame_corruption_in_payload() -> io::Result<()> {
let mut buf: Vec<u8> = {
let mut frame_writer = FrameWriter::create(VecBlockWriter::default());
frame_writer.write_frame(FrameType::First, &b"abc"[..])?;
frame_writer.flush()?;
frame_writer.flush(false)?;
frame_writer.write_frame(FrameType::Middle, &b"de"[..])?;
frame_writer.flush()?;
frame_writer.flush(false)?;
frame_writer.into_writer().into()
};
buf[8] = 0u8;
Expand All @@ -68,7 +68,7 @@ fn repeat_empty_frame_util(repeat: usize) -> Vec<u8> {
for _ in 0..repeat {
frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap();
}
frame_writer.flush().unwrap();
frame_writer.flush(false).unwrap();
frame_writer.into_writer().into()
}

Expand Down
4 changes: 2 additions & 2 deletions src/frame/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ impl<W: BlockWrite + Unpin> FrameWriter<W> {
/// When writing to a file, this performs a syscall and
/// the OS will be in charge of eventually writing the data
/// to disk, but this is not sufficient to ensure durability.
pub fn flush(&mut self) -> io::Result<()> {
self.wrt.flush()
pub fn flush(&mut self, fsync: bool) -> io::Result<()> {
self.wrt.flush(fsync)
}

/// Returns the maximum amount of bytes that can be written.
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod record;
mod recordlog;
mod rolling;

pub use multi_record_log::{MultiRecordLog, SyncPolicy};
pub use multi_record_log::{MultiRecordLog, PersistAction, PersistPolicy};

#[derive(Debug, PartialEq, Eq)]
pub struct Record<'a> {
Expand Down
142 changes: 98 additions & 44 deletions src/multi_record_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,71 +18,119 @@
pub struct MultiRecordLog {
record_log_writer: crate::recordlog::RecordWriter<RollingWriter>,
in_mem_queues: mem::MemQueues,
next_sync: SyncState,
next_persist: PersistState,
// A simple buffer we reuse to avoid allocation.
multi_record_spare_buffer: Vec<u8>,
}

/// Policy for synchonizing and flushing data
pub enum SyncPolicy {
/// Sync and flush at each operation
OnAppend,
/// Sync and flush regularly. Sync is realized on the first operation after the delay since
/// last sync elapsed. This means if no new operation arrive, some content may not get
/// flushed for a while.
OnDelay(Duration),
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PersistAction {
/// The buffer will be flushed to the OS, but not necessarily to the disk.
Flush,
/// The buffer will be flushed to the OS, and the OS will be asked to flush
/// it to the disk.
FlushAndFsync,
}

impl PersistAction {
fn is_fsync(self) -> bool {
self == PersistAction::FlushAndFsync
}
}

/// We have two type of operations on the mrecordlog.
///
/// Critical records are relatively rare and really need to be persisted:
/// - RecordPosition { queue: &'a str, position: u64 },
/// - DeleteQueue.
///
/// For these operations, we want to always flush and fsync.
///
/// On the other hand,
/// - Truncate
/// - AppendRecords
/// are considered are more frequent and one might want to sacrifice
/// persistence guarantees for performance.
///
/// The `PersistPolicy` defines the trade-off applied for the second kind of
/// operations.
#[derive(Clone, Debug)]
pub enum PersistPolicy {
/// Only persist data when asked for, and when critical records are written
DoNothing(PersistAction),
/// Pesiste data once every interval, and when critical records are written
OnDelay {
interval: Duration,
action: PersistAction,
},
/// Persist data after each action
Always(PersistAction),
}

#[derive(Debug)]
enum SyncState {
OnAppend,
enum PersistState {
OnAppend(PersistAction),
OnDelay {
next_sync: Instant,
next_persist: Instant,
interval: Duration,
action: PersistAction,
},
OnRequest(PersistAction),
}

Check warning on line 79 in src/multi_record_log.rs

View workflow job for this annotation

GitHub Actions / clippy

all variants have the same prefix: `On`

warning: all variants have the same prefix: `On` --> src/multi_record_log.rs:71:1 | 71 | / enum PersistState { 72 | | OnAppend(PersistAction), 73 | | OnDelay { 74 | | next_persist: Instant, ... | 78 | | OnRequest(PersistAction), 79 | | } | |_^ | = help: remove the prefixes and use full paths to the variants instead of glob imports = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#enum_variant_names = note: `#[warn(clippy::enum_variant_names)]` on by default

Check warning on line 79 in src/multi_record_log.rs

View workflow job for this annotation

GitHub Actions / clippy

all variants have the same prefix: `On`

warning: all variants have the same prefix: `On` --> src/multi_record_log.rs:71:1 | 71 | / enum PersistState { 72 | | OnAppend(PersistAction), 73 | | OnDelay { 74 | | next_persist: Instant, ... | 78 | | OnRequest(PersistAction), 79 | | } | |_^ | = help: remove the prefixes and use full paths to the variants instead of glob imports = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#enum_variant_names = note: `#[warn(clippy::enum_variant_names)]` on by default

impl SyncState {
fn should_sync(&self) -> bool {
impl PersistState {
fn should_persist(&self) -> bool {
match self {
SyncState::OnAppend => true,
SyncState::OnDelay { next_sync, .. } => *next_sync < Instant::now(),
PersistState::OnAppend(_) => true,
PersistState::OnDelay { next_persist, .. } => *next_persist < Instant::now(),
PersistState::OnRequest(_) => false,
}
}

fn update_synced(&mut self) {
fn update_persisted(&mut self) {
match self {
SyncState::OnAppend => (),
SyncState::OnDelay {
ref mut next_sync,
PersistState::OnAppend(_) | PersistState::OnRequest(_) => (),
PersistState::OnDelay {
ref mut next_persist,
interval,
} => *next_sync = Instant::now() + *interval,
..
} => *next_persist = Instant::now() + *interval,
}
}

fn action(&self) -> PersistAction {
match self {
PersistState::OnAppend(action) => *action,
PersistState::OnDelay { action, .. } => *action,
PersistState::OnRequest(action) => *action,
}
}
}

impl From<SyncPolicy> for SyncState {
fn from(val: SyncPolicy) -> SyncState {
impl From<PersistPolicy> for PersistState {
fn from(val: PersistPolicy) -> PersistState {
match val {
SyncPolicy::OnAppend => SyncState::OnAppend,
SyncPolicy::OnDelay(dur) => SyncState::OnDelay {
next_sync: Instant::now() + dur,
interval: dur,
PersistPolicy::Always(action) => PersistState::OnAppend(action),
PersistPolicy::OnDelay { interval, action } => PersistState::OnDelay {
next_persist: Instant::now() + interval,
interval,
action,
},
PersistPolicy::DoNothing(action) => PersistState::OnRequest(action),
}
}
}

impl MultiRecordLog {
/// Open the multi record log, syncing after each operation.
/// Open the multi record log, flushing after each operation, bit not fsyncing.
pub fn open(directory_path: &Path) -> Result<Self, ReadRecordError> {
Self::open_with_prefs(directory_path, SyncPolicy::OnAppend)
Self::open_with_prefs(directory_path, PersistPolicy::Always(PersistAction::Flush))
}

/// Open the multi record log, syncing following the provided policy.
pub fn open_with_prefs(
directory_path: &Path,
sync_policy: SyncPolicy,
persist_policy: PersistPolicy,
) -> Result<Self, ReadRecordError> {
// io errors are non-recoverable
let rolling_reader = crate::rolling::RollingReader::open(directory_path)?;
Expand Down Expand Up @@ -140,7 +188,7 @@
let mut multi_record_log = MultiRecordLog {
record_log_writer,
in_mem_queues,
next_sync: sync_policy.into(),
next_persist: persist_policy.into(),
multi_record_spare_buffer: Vec::new(),
};
multi_record_log.run_gc_if_necessary()?;
Expand All @@ -163,7 +211,7 @@
}
let record = MultiPlexedRecord::RecordPosition { queue, position: 0 };
self.record_log_writer.write_record(record)?;
self.sync()?;
self.persist_and_maybe_fsync()?;
self.in_mem_queues.create_queue(queue)?;
Ok(())
}
Expand All @@ -175,7 +223,7 @@
self.record_log_writer.write_record(record)?;
self.in_mem_queues.delete_queue(queue)?;
self.run_gc_if_necessary()?;
self.sync()?;
self.persist_and_maybe_fsync()?;
Ok(())
}

Expand Down Expand Up @@ -205,8 +253,8 @@
///
/// This operation is atomic: either all records get stored, or none do.
/// However this function succeeding does not necessarily means records where stored, be sure
/// to call [`Self::sync`] to make sure changes are persisted if you don't use
/// [`SyncPolicy::OnAppend`] (which is the default).
/// to call [`Self::persist`] to make sure changes are persisted if you don't use
/// [`PersistPolicy::Always`] (which is the default).
pub fn append_records<T: Iterator<Item = impl Buf>>(
&mut self,
queue: &str,
Expand Down Expand Up @@ -240,7 +288,7 @@
records,
};
self.record_log_writer.write_record(record)?;
self.sync_on_policy()?;
self.persist_on_policy()?;

let mem_queue = self.in_mem_queues.get_queue_mut(queue)?;
let mut max_position = position;
Expand Down Expand Up @@ -269,7 +317,7 @@
if has_empty_queues {
// We need to sync here! We are remove files from the FS
// so we need to make sure our empty queue positions are properly persisted.
self.sync()?;
self.persist_and_maybe_fsync()?;
}
Ok(())
}
Expand All @@ -289,7 +337,7 @@
.write_record(MultiPlexedRecord::Truncate { position, queue })?;
let removed_count = self.in_mem_queues.truncate(queue, position).unwrap_or(0);
self.run_gc_if_necessary()?;
self.sync_on_policy()?;
self.persist_on_policy()?;
Ok(removed_count)
}

Expand Down Expand Up @@ -333,16 +381,22 @@
self.in_mem_queues.range(queue, range)
}

fn sync_on_policy(&mut self) -> io::Result<()> {
if self.next_sync.should_sync() {
self.sync()?;
self.next_sync.update_synced();
/// Flush if the policy says it should be done
fn persist_on_policy(&mut self) -> io::Result<()> {
if self.next_persist.should_persist() {
self.persist_and_maybe_fsync()?;
self.next_persist.update_persisted();
}
Ok(())
}

pub fn sync(&mut self) -> io::Result<()> {
self.record_log_writer.flush()
fn persist_and_maybe_fsync(&mut self) -> io::Result<()> {
self.persist(self.next_persist.action().is_fsync())
}

/// Flush and optionnally fsync data
pub fn persist(&mut self, fsync: bool) -> io::Result<()> {
self.record_log_writer.flush(fsync)
}

/// Returns the position of the last record appended to the queue.
Expand Down
12 changes: 6 additions & 6 deletions src/recordlog/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn test_no_data() {
fn test_empty_record() {
let mut writer = RecordWriter::in_memory();
writer.write_record("").unwrap();
writer.flush().unwrap();
writer.flush(false).unwrap();
let buf: Vec<u8> = writer.into_writer().into();
let mut reader = RecordReader::open(ArrayReader::from(&buf[..]));
assert_eq!(reader.read_record::<&str>().unwrap(), Some(""));
Expand All @@ -27,7 +27,7 @@ fn test_simple_record() {
let mut writer = RecordWriter::in_memory();
let record = "hello";
writer.write_record(record).unwrap();
writer.flush().unwrap();
writer.flush(false).unwrap();
let buf: Vec<u8> = writer.into_writer().into();
let mut reader = RecordReader::open(ArrayReader::from(&buf[..]));
assert!(matches!(reader.read_record::<&str>(), Ok(Some("hello"))));
Expand All @@ -43,7 +43,7 @@ fn test_spans_over_more_than_one_block() {
let long_entry: String = make_long_entry(80_000);
let mut writer = RecordWriter::in_memory();
writer.write_record(long_entry.as_str()).unwrap();
writer.flush().unwrap();
writer.flush(false).unwrap();
let buf: Vec<u8> = writer.into_writer().into();
let mut reader = RecordReader::open(ArrayReader::from(&buf[..]));
let record_payload: &str = reader.read_record().unwrap().unwrap();
Expand All @@ -60,7 +60,7 @@ fn test_block_requires_padding() {
let mut writer = RecordWriter::in_memory();
writer.write_record(long_record.as_str()).unwrap();
writer.write_record(short_record).unwrap();
writer.flush().unwrap();
writer.flush(false).unwrap();
let buffer: Vec<u8> = writer.into_writer().into();
let mut reader = RecordReader::open(ArrayReader::from(&buffer[..]));
assert_eq!(
Expand All @@ -80,7 +80,7 @@ fn test_first_chunk_empty() {
let mut writer = RecordWriter::in_memory();
writer.write_record(&long_record[..]).unwrap();
writer.write_record(short_record).unwrap();
writer.flush().unwrap();
writer.flush(false).unwrap();
let buf: Vec<u8> = writer.into_writer().into();
let mut reader = RecordReader::open(ArrayReader::from(&buf[..]));
assert_eq!(
Expand All @@ -98,7 +98,7 @@ fn test_behavior_upon_corruption() {
for record in &records {
writer.write_record(record.as_str()).unwrap();
}
writer.flush().unwrap();
writer.flush(false).unwrap();
let mut buffer: Vec<u8> = writer.into_writer().into();
{
let mut reader = RecordReader::open(ArrayReader::from(&buffer[..]));
Expand Down
4 changes: 2 additions & 2 deletions src/recordlog/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ impl<W: BlockWrite + Unpin> RecordWriter<W> {
}

/// Flushes and sync the data to disk.
pub fn flush(&mut self) -> io::Result<()> {
pub fn flush(&mut self, fsync: bool) -> io::Result<()> {
// Empty the application buffer.
self.frame_writer.flush()
self.frame_writer.flush(fsync)
}

pub fn get_underlying_wrt(&self) -> &W {
Expand Down
Loading
Loading