Skip to content

Commit

Permalink
Fulmicoton fsync data (#60)
Browse files Browse the repository at this point in the history
Add facility to fdatasync.

This PR also refactors the concept of SyncPolicy into a PersistPolicy.
See comment on PersistPolicy for more details.

---------

Co-authored-by: trinity-1686a <[email protected]>
  • Loading branch information
fulmicoton and trinity-1686a authored Mar 22, 2024
1 parent a9b003b commit 8a77dd2
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 103 deletions.
8 changes: 5 additions & 3 deletions src/block_read_write.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::io;

use crate::PersistAction;

pub const BLOCK_NUM_BYTES: usize = 32_768;

pub trait BlockRead {
Expand All @@ -25,8 +27,8 @@ pub trait BlockRead {
pub trait BlockWrite {
/// Must panic if buf is larger than `num_bytes_remaining_in_block`.
fn write(&mut self, buf: &[u8]) -> io::Result<()>;
/// The semantics of flush may depend on the implementation.
fn flush(&mut self) -> io::Result<()>;
/// Persist the data following the `persist_action`.
fn persist(&mut self, persist_action: PersistAction) -> io::Result<()>;
/// Number of bytes that can be added in the block.
fn num_bytes_remaining_in_block(&self) -> usize;
}
Expand Down Expand Up @@ -90,7 +92,7 @@ impl BlockWrite for VecBlockWriter {
Ok(())
}

fn flush(&mut self) -> io::Result<()> {
fn persist(&mut self, _persist_action: PersistAction) -> io::Result<()> {
Ok(())
}

Expand Down
10 changes: 5 additions & 5 deletions src/frame/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io;
use crate::block_read_write::{ArrayReader, VecBlockWriter};
use crate::frame::header::{FrameType, HEADER_LEN};
use crate::frame::{FrameReader, FrameWriter, ReadFrameError};
use crate::BLOCK_NUM_BYTES;
use crate::{PersistAction, BLOCK_NUM_BYTES};

#[test]
fn test_frame_simple() {
Expand All @@ -19,7 +19,7 @@ fn test_frame_simple() {
frame_writer
.write_frame(FrameType::Last, &b"fgh"[..])
.unwrap();
frame_writer.flush().unwrap();
frame_writer.persist(PersistAction::Flush).unwrap();
frame_writer.into_writer()
};
let buffer: Vec<u8> = block_writer.into();
Expand All @@ -45,9 +45,9 @@ fn test_frame_corruption_in_payload() -> io::Result<()> {
let mut buf: Vec<u8> = {
let mut frame_writer = FrameWriter::create(VecBlockWriter::default());
frame_writer.write_frame(FrameType::First, &b"abc"[..])?;
frame_writer.flush()?;
frame_writer.persist(PersistAction::Flush)?;
frame_writer.write_frame(FrameType::Middle, &b"de"[..])?;
frame_writer.flush()?;
frame_writer.persist(PersistAction::Flush)?;
frame_writer.into_writer().into()
};
buf[8] = 0u8;
Expand All @@ -68,7 +68,7 @@ fn repeat_empty_frame_util(repeat: usize) -> Vec<u8> {
for _ in 0..repeat {
frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap();
}
frame_writer.flush().unwrap();
frame_writer.persist(PersistAction::Flush).unwrap();
frame_writer.into_writer().into()
}

Expand Down
6 changes: 3 additions & 3 deletions src/frame/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io;

use crate::frame::{FrameType, Header, HEADER_LEN};
use crate::rolling::{Directory, RollingWriter};
use crate::{BlockWrite, BLOCK_NUM_BYTES};
use crate::{BlockWrite, PersistAction, BLOCK_NUM_BYTES};

pub struct FrameWriter<W> {
wrt: W,
Expand Down Expand Up @@ -41,8 +41,8 @@ impl<W: BlockWrite + Unpin> FrameWriter<W> {
/// When writing to a file, this performs a syscall and
/// the OS will be in charge of eventually writing the data
/// to disk, but this is not sufficient to ensure durability.
pub fn flush(&mut self) -> io::Result<()> {
self.wrt.flush()
pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> {
self.wrt.persist(persist_action)
}

/// Returns the maximum amount of bytes that can be written.
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ pub mod error;
mod frame;
mod mem;
mod multi_record_log;
mod persist_policy;
mod record;
mod recordlog;
mod rolling;

pub use multi_record_log::{MultiRecordLog, SyncPolicy};
pub use multi_record_log::MultiRecordLog;
pub(crate) use persist_policy::PersistState;
pub use persist_policy::{PersistAction, PersistPolicy};

#[derive(Debug, PartialEq, Eq)]
pub struct Record<'a> {
Expand Down
93 changes: 22 additions & 71 deletions src/multi_record_log.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::io;
use std::ops::RangeBounds;
use std::path::Path;
use std::time::{Duration, Instant};

use bytes::Buf;
use tracing::{debug, event_enabled, info, warn, Level};
Expand All @@ -13,70 +12,20 @@ use crate::mem::{MemQueue, QueuesSummary};
use crate::record::{MultiPlexedRecord, MultiRecord};
use crate::recordlog::RecordWriter;
use crate::rolling::RollingWriter;
use crate::{mem, Record, ResourceUsage};
use crate::{mem, PersistAction, PersistPolicy, PersistState, Record, ResourceUsage};

pub struct MultiRecordLog {
record_log_writer: crate::recordlog::RecordWriter<RollingWriter>,
in_mem_queues: mem::MemQueues,
next_sync: SyncState,
next_persist: PersistState,
// A simple buffer we reuse to avoid allocation.
multi_record_spare_buffer: Vec<u8>,
}

/// Policy for synchonizing and flushing data
pub enum SyncPolicy {
/// Sync and flush at each operation
OnAppend,
/// Sync and flush regularly. Sync is realized on the first operation after the delay since
/// last sync elapsed. This means if no new operation arrive, some content may not get
/// flushed for a while.
OnDelay(Duration),
}

#[derive(Debug)]
enum SyncState {
OnAppend,
OnDelay {
next_sync: Instant,
interval: Duration,
},
}

impl SyncState {
fn should_sync(&self) -> bool {
match self {
SyncState::OnAppend => true,
SyncState::OnDelay { next_sync, .. } => *next_sync < Instant::now(),
}
}

fn update_synced(&mut self) {
match self {
SyncState::OnAppend => (),
SyncState::OnDelay {
ref mut next_sync,
interval,
} => *next_sync = Instant::now() + *interval,
}
}
}

impl From<SyncPolicy> for SyncState {
fn from(val: SyncPolicy) -> SyncState {
match val {
SyncPolicy::OnAppend => SyncState::OnAppend,
SyncPolicy::OnDelay(dur) => SyncState::OnDelay {
next_sync: Instant::now() + dur,
interval: dur,
},
}
}
}

impl MultiRecordLog {
/// Open the multi record log, syncing after each operation.
/// Open the multi record log, flushing after each operation, but not fsyncing.
pub fn open(directory_path: &Path) -> Result<Self, ReadRecordError> {
Self::open_with_prefs(directory_path, SyncPolicy::OnAppend)
Self::open_with_prefs(directory_path, PersistPolicy::Always(PersistAction::Flush))
}

pub fn summary(&self) -> QueuesSummary {
Expand All @@ -86,7 +35,7 @@ impl MultiRecordLog {
/// Open the multi record log, syncing following the provided policy.
pub fn open_with_prefs(
directory_path: &Path,
sync_policy: SyncPolicy,
persist_policy: PersistPolicy,
) -> Result<Self, ReadRecordError> {
// io errors are non-recoverable
let rolling_reader = crate::rolling::RollingReader::open(directory_path)?;
Expand Down Expand Up @@ -144,7 +93,7 @@ impl MultiRecordLog {
let mut multi_record_log = MultiRecordLog {
record_log_writer,
in_mem_queues,
next_sync: sync_policy.into(),
next_persist: persist_policy.into(),
multi_record_spare_buffer: Vec::new(),
};
multi_record_log.run_gc_if_necessary()?;
Expand All @@ -167,7 +116,7 @@ impl MultiRecordLog {
}
let record = MultiPlexedRecord::RecordPosition { queue, position: 0 };
self.record_log_writer.write_record(record)?;
self.sync()?;
self.persist_on_policy()?;
self.in_mem_queues.create_queue(queue)?;
Ok(())
}
Expand All @@ -179,7 +128,7 @@ impl MultiRecordLog {
self.record_log_writer.write_record(record)?;
self.in_mem_queues.delete_queue(queue)?;
self.run_gc_if_necessary()?;
self.sync()?;
self.persist(PersistAction::FlushAndFsync)?;
Ok(())
}

Expand Down Expand Up @@ -209,8 +158,8 @@ impl MultiRecordLog {
///
/// This operation is atomic: either all records get stored, or none do.
/// However this function succeeding does not necessarily means records where stored, be sure
/// to call [`Self::sync`] to make sure changes are persisted if you don't use
/// [`SyncPolicy::OnAppend`] (which is the default).
/// to call [`Self::persist`] to make sure changes are persisted if you don't use
/// [`PersistPolicy::Always`] (which is the default).
pub fn append_records<T: Iterator<Item = impl Buf>>(
&mut self,
queue: &str,
Expand Down Expand Up @@ -244,7 +193,7 @@ impl MultiRecordLog {
records,
};
self.record_log_writer.write_record(record)?;
self.sync_on_policy()?;
self.persist_on_policy()?;

let mem_queue = self.in_mem_queues.get_queue_mut(queue)?;
let mut max_position = position;
Expand All @@ -271,9 +220,9 @@ impl MultiRecordLog {
has_empty_queues = true
}
if has_empty_queues {
// We need to sync here! We are remove files from the FS
// We need to fsync here! We are remove files from the FS
// so we need to make sure our empty queue positions are properly persisted.
self.sync()?;
self.persist(PersistAction::FlushAndFsync)?;
}
Ok(())
}
Expand All @@ -293,7 +242,7 @@ impl MultiRecordLog {
.write_record(MultiPlexedRecord::Truncate { position, queue })?;
let removed_count = self.in_mem_queues.truncate(queue, position).unwrap_or(0);
self.run_gc_if_necessary()?;
self.sync_on_policy()?;
self.persist_on_policy()?;
Ok(removed_count)
}

Expand Down Expand Up @@ -337,16 +286,18 @@ impl MultiRecordLog {
self.in_mem_queues.range(queue, range)
}

fn sync_on_policy(&mut self) -> io::Result<()> {
if self.next_sync.should_sync() {
self.sync()?;
self.next_sync.update_synced();
/// Flush if the policy says it should be done
fn persist_on_policy(&mut self) -> io::Result<()> {
if let Some(persist_action) = self.next_persist.should_persist() {
self.persist(persist_action)?;
self.next_persist.update_persisted();
}
Ok(())
}

pub fn sync(&mut self) -> io::Result<()> {
self.record_log_writer.flush()
/// Flush and optionnally fsync data
pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> {
self.record_log_writer.persist(persist_action)
}

/// Returns the position of the last record appended to the queue.
Expand Down
104 changes: 104 additions & 0 deletions src/persist_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::time::{Duration, Instant};

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PersistAction {
/// The buffer will be flushed to the OS, but not necessarily to the disk.
Flush,
/// The buffer will be flushed to the OS, and the OS will be asked to flush
/// it to the disk.
FlushAndFsync,
}

impl PersistAction {
pub fn is_fsync(self) -> bool {
self == PersistAction::FlushAndFsync
}
}

/// We have two type of operations on the mrecordlog.
///
/// Critical records are relatively rare and really need to be persisted:
/// - RecordPosition { queue: &'a str, position: u64 },
/// - DeleteQueue.
///
/// For these operations, we want to always flush and fsync.
///
/// On the other hand,
/// - Truncate
/// - AppendRecords
/// are considered are more frequent and one might want to sacrifice
/// persistence guarantees for performance.
///
/// The `PersistPolicy` defines the trade-off applied for the second kind of
/// operations.
#[derive(Clone, Debug)]
pub enum PersistPolicy {
/// Only ensure data is persisted when critical records are written.
///
/// With this policy, the timing after which the data reaches the disk
/// is up to the OS.
DoNothing,
/// Pesiste data once every interval, and when critical records are written
OnDelay {
interval: Duration,
action: PersistAction,
},
/// Persist data after each action
Always(PersistAction),
}

#[derive(Debug)]
pub(crate) enum PersistState {
OnAppend(PersistAction),
OnDelay {
next_persist: Instant,
interval: Duration,
action: PersistAction,
},
NoOp,
}

impl PersistState {
pub fn should_persist(&self) -> Option<PersistAction> {
match self {
PersistState::OnAppend(action) => Some(*action),
PersistState::OnDelay {
action,
next_persist,
..
} => {
if *next_persist < Instant::now() {
Some(*action)
} else {
None
}
}
PersistState::NoOp => None,
}
}

pub fn update_persisted(&mut self) {
match self {
PersistState::OnAppend(_) | PersistState::NoOp => (),
PersistState::OnDelay {
ref mut next_persist,
interval,
..
} => *next_persist = Instant::now() + *interval,
}
}
}

impl From<PersistPolicy> for PersistState {
fn from(val: PersistPolicy) -> PersistState {
match val {
PersistPolicy::Always(action) => PersistState::OnAppend(action),
PersistPolicy::OnDelay { interval, action } => PersistState::OnDelay {
next_persist: Instant::now() + interval,
interval,
action,
},
PersistPolicy::DoNothing => PersistState::NoOp,
}
}
}
Loading

0 comments on commit 8a77dd2

Please sign in to comment.