Skip to content

Commit

Permalink
Change in semantics.
Browse files Browse the repository at this point in the history
Upon critical situation we flush and fsync.
  • Loading branch information
fulmicoton committed Mar 22, 2024
1 parent 33e5126 commit ab1d289
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 47 deletions.
8 changes: 5 additions & 3 deletions src/block_read_write.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::io;

use crate::PersistAction;

pub const BLOCK_NUM_BYTES: usize = 32_768;

pub trait BlockRead {
Expand All @@ -25,8 +27,8 @@ pub trait BlockRead {
pub trait BlockWrite {
/// Must panic if buf is larger than `num_bytes_remaining_in_block`.
fn write(&mut self, buf: &[u8]) -> io::Result<()>;
/// The semantics of flush may depend on the implementation.
fn flush(&mut self, fsync: bool) -> io::Result<()>;
/// Persist the data following the `persist_action`.
fn persist(&mut self, persist_action: PersistAction) -> io::Result<()>;
/// Number of bytes that can be added in the block.
fn num_bytes_remaining_in_block(&self) -> usize;
}
Expand Down Expand Up @@ -90,7 +92,7 @@ impl BlockWrite for VecBlockWriter {
Ok(())
}

fn flush(&mut self, _fsync: bool) -> io::Result<()> {
fn persist(&mut self, _persist_action: PersistAction) -> io::Result<()> {
Ok(())
}

Expand Down
10 changes: 5 additions & 5 deletions src/frame/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io;
use crate::block_read_write::{ArrayReader, VecBlockWriter};
use crate::frame::header::{FrameType, HEADER_LEN};
use crate::frame::{FrameReader, FrameWriter, ReadFrameError};
use crate::BLOCK_NUM_BYTES;
use crate::{PersistAction, BLOCK_NUM_BYTES};

#[test]
fn test_frame_simple() {
Expand All @@ -19,7 +19,7 @@ fn test_frame_simple() {
frame_writer
.write_frame(FrameType::Last, &b"fgh"[..])
.unwrap();
frame_writer.flush(false).unwrap();
frame_writer.persist(PersistAction::Flush).unwrap();
frame_writer.into_writer()
};
let buffer: Vec<u8> = block_writer.into();
Expand All @@ -45,9 +45,9 @@ fn test_frame_corruption_in_payload() -> io::Result<()> {
let mut buf: Vec<u8> = {
let mut frame_writer = FrameWriter::create(VecBlockWriter::default());
frame_writer.write_frame(FrameType::First, &b"abc"[..])?;
frame_writer.flush(false)?;
frame_writer.persist(PersistAction::Flush)?;
frame_writer.write_frame(FrameType::Middle, &b"de"[..])?;
frame_writer.flush(false)?;
frame_writer.persist(PersistAction::Flush)?;
frame_writer.into_writer().into()
};
buf[8] = 0u8;
Expand All @@ -68,7 +68,7 @@ fn repeat_empty_frame_util(repeat: usize) -> Vec<u8> {
for _ in 0..repeat {
frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap();
}
frame_writer.flush(false).unwrap();
frame_writer.persist(PersistAction::Flush).unwrap();
frame_writer.into_writer().into()
}

Expand Down
6 changes: 3 additions & 3 deletions src/frame/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io;

use crate::frame::{FrameType, Header, HEADER_LEN};
use crate::rolling::{Directory, RollingWriter};
use crate::{BlockWrite, BLOCK_NUM_BYTES};
use crate::{BlockWrite, PersistAction, BLOCK_NUM_BYTES};

pub struct FrameWriter<W> {
wrt: W,
Expand Down Expand Up @@ -41,8 +41,8 @@ impl<W: BlockWrite + Unpin> FrameWriter<W> {
/// When writing to a file, this performs a syscall and
/// the OS will be in charge of eventually writing the data
/// to disk, but this is not sufficient to ensure durability.
pub fn flush(&mut self, fsync: bool) -> io::Result<()> {
self.wrt.flush(fsync)
pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> {
self.wrt.persist(persist_action)
}

/// Returns the maximum amount of bytes that can be written.
Expand Down
22 changes: 11 additions & 11 deletions src/multi_record_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl MultiRecordLog {
}
let record = MultiPlexedRecord::RecordPosition { queue, position: 0 };
self.record_log_writer.write_record(record)?;
self.persist_and_maybe_fsync()?;
self.persist(PersistAction::FlushAndFsync)?;
self.in_mem_queues.create_queue(queue)?;
Ok(())
}
Expand All @@ -124,7 +124,7 @@ impl MultiRecordLog {
self.record_log_writer.write_record(record)?;
self.in_mem_queues.delete_queue(queue)?;
self.run_gc_if_necessary()?;
self.persist_and_maybe_fsync()?;
self.persist(PersistAction::FlushAndFsync)?;
Ok(())
}

Expand Down Expand Up @@ -216,9 +216,9 @@ impl MultiRecordLog {
has_empty_queues = true
}
if has_empty_queues {
// We need to sync here! We are remove files from the FS
// We need to fsync here! We are remove files from the FS
// so we need to make sure our empty queue positions are properly persisted.
self.persist_and_maybe_fsync()?;
self.persist(PersistAction::FlushAndFsync)?;
}
Ok(())
}
Expand Down Expand Up @@ -284,20 +284,20 @@ impl MultiRecordLog {

/// Flush if the policy says it should be done
fn persist_on_policy(&mut self) -> io::Result<()> {
if self.next_persist.should_persist() {
self.persist_and_maybe_fsync()?;
if let Some(persist_action) = self.next_persist.should_persist() {
self.persist(persist_action)?;
self.next_persist.update_persisted();
}
Ok(())
}

fn persist_and_maybe_fsync(&mut self) -> io::Result<()> {
self.persist(self.next_persist.action().is_fsync())
}
// fn persist_and_maybe_fsync(&mut self) -> io::Result<()> {
// self.persist(self.next_persist.action().is_fsync())
// }

/// Flush and optionnally fsync data
pub fn persist(&mut self, fsync: bool) -> io::Result<()> {
self.record_log_writer.flush(fsync)
pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> {
self.record_log_writer.persist(persist_action)
}

/// Returns the position of the last record appended to the queue.
Expand Down
104 changes: 104 additions & 0 deletions src/persist_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::time::{Duration, Instant};

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PersistAction {
/// The buffer will be flushed to the OS, but not necessarily to the disk.
Flush,
/// The buffer will be flushed to the OS, and the OS will be asked to flush
/// it to the disk.
FlushAndFsync,
}

impl PersistAction {
pub fn is_fsync(self) -> bool {
self == PersistAction::FlushAndFsync
}
}

/// We have two type of operations on the mrecordlog.
///
/// Critical records are relatively rare and really need to be persisted:
/// - RecordPosition { queue: &'a str, position: u64 },
/// - DeleteQueue.
///
/// For these operations, we want to always flush and fsync.
///
/// On the other hand,
/// - Truncate
/// - AppendRecords
/// are considered are more frequent and one might want to sacrifice
/// persistence guarantees for performance.
///
/// The `PersistPolicy` defines the trade-off applied for the second kind of
/// operations.
#[derive(Clone, Debug)]
pub enum PersistPolicy {
/// Only ensure data is persisted when critical records are written.
///
/// With this policy, the timing after which the data reaches the disk
/// is up to the OS.
DoNothing,
/// Pesiste data once every interval, and when critical records are written
OnDelay {
interval: Duration,
action: PersistAction,
},
/// Persist data after each action
Always(PersistAction),
}

#[derive(Debug)]
pub(crate) enum PersistState {
OnAppend(PersistAction),
OnDelay {
next_persist: Instant,
interval: Duration,
action: PersistAction,
},
NoOp,
}

impl PersistState {
pub fn should_persist(&self) -> Option<PersistAction> {
match self {
PersistState::OnAppend(action) => Some(*action),
PersistState::OnDelay {
action,
next_persist,
..
} => {
if *next_persist < Instant::now() {
Some(*action)
} else {
None
}
}
PersistState::NoOp => None,
}
}

pub fn update_persisted(&mut self) {
match self {
PersistState::OnAppend(_) | PersistState::NoOp => (),
PersistState::OnDelay {
ref mut next_persist,
interval,
..
} => *next_persist = Instant::now() + *interval,
}
}
}

impl From<PersistPolicy> for PersistState {
fn from(val: PersistPolicy) -> PersistState {
match val {
PersistPolicy::Always(action) => PersistState::OnAppend(action),
PersistPolicy::OnDelay { interval, action } => PersistState::OnDelay {
next_persist: Instant::now() + interval,
interval,
action,
},
PersistPolicy::DoNothing => PersistState::NoOp,
}
}
}
14 changes: 7 additions & 7 deletions src/recordlog/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{RecordReader, RecordWriter};
use crate::block_read_write::ArrayReader;
use crate::error::ReadRecordError;
use crate::frame::HEADER_LEN;
use crate::BLOCK_NUM_BYTES;
use crate::{PersistAction, BLOCK_NUM_BYTES};

#[test]
fn test_no_data() {
Expand All @@ -15,7 +15,7 @@ fn test_no_data() {
fn test_empty_record() {
let mut writer = RecordWriter::in_memory();
writer.write_record("").unwrap();
writer.flush(false).unwrap();
writer.persist(PersistAction::Flush).unwrap();
let buf: Vec<u8> = writer.into_writer().into();
let mut reader = RecordReader::open(ArrayReader::from(&buf[..]));
assert_eq!(reader.read_record::<&str>().unwrap(), Some(""));
Expand All @@ -27,7 +27,7 @@ fn test_simple_record() {
let mut writer = RecordWriter::in_memory();
let record = "hello";
writer.write_record(record).unwrap();
writer.flush(false).unwrap();
writer.persist(PersistAction::Flush).unwrap();
let buf: Vec<u8> = writer.into_writer().into();
let mut reader = RecordReader::open(ArrayReader::from(&buf[..]));
assert!(matches!(reader.read_record::<&str>(), Ok(Some("hello"))));
Expand All @@ -43,7 +43,7 @@ fn test_spans_over_more_than_one_block() {
let long_entry: String = make_long_entry(80_000);
let mut writer = RecordWriter::in_memory();
writer.write_record(long_entry.as_str()).unwrap();
writer.flush(false).unwrap();
writer.persist(PersistAction::Flush).unwrap();
let buf: Vec<u8> = writer.into_writer().into();
let mut reader = RecordReader::open(ArrayReader::from(&buf[..]));
let record_payload: &str = reader.read_record().unwrap().unwrap();
Expand All @@ -60,7 +60,7 @@ fn test_block_requires_padding() {
let mut writer = RecordWriter::in_memory();
writer.write_record(long_record.as_str()).unwrap();
writer.write_record(short_record).unwrap();
writer.flush(false).unwrap();
writer.persist(PersistAction::Flush).unwrap();
let buffer: Vec<u8> = writer.into_writer().into();
let mut reader = RecordReader::open(ArrayReader::from(&buffer[..]));
assert_eq!(
Expand All @@ -80,7 +80,7 @@ fn test_first_chunk_empty() {
let mut writer = RecordWriter::in_memory();
writer.write_record(&long_record[..]).unwrap();
writer.write_record(short_record).unwrap();
writer.flush(false).unwrap();
writer.persist(PersistAction::Flush).unwrap();
let buf: Vec<u8> = writer.into_writer().into();
let mut reader = RecordReader::open(ArrayReader::from(&buf[..]));
assert_eq!(
Expand All @@ -98,7 +98,7 @@ fn test_behavior_upon_corruption() {
for record in &records {
writer.write_record(record.as_str()).unwrap();
}
writer.flush(false).unwrap();
writer.persist(PersistAction::Flush).unwrap();
let mut buffer: Vec<u8> = writer.into_writer().into();
{
let mut reader = RecordReader::open(ArrayReader::from(&buffer[..]));
Expand Down
9 changes: 4 additions & 5 deletions src/recordlog/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io;
use crate::block_read_write::VecBlockWriter;
use crate::frame::{FrameType, FrameWriter};
use crate::rolling::{Directory, FileNumber, RollingWriter};
use crate::{BlockWrite, Serializable};
use crate::{BlockWrite, PersistAction, Serializable};

pub struct RecordWriter<W> {
frame_writer: FrameWriter<W>,
Expand Down Expand Up @@ -67,10 +67,9 @@ impl<W: BlockWrite + Unpin> RecordWriter<W> {
Ok(())
}

/// Flushes and sync the data to disk.
pub fn flush(&mut self, fsync: bool) -> io::Result<()> {
// Empty the application buffer and optionally fsyncs
self.frame_writer.flush(fsync)
/// Persist the data to disk, according to the persist_action.
pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> {
self.frame_writer.persist(persist_action)
}

pub fn get_underlying_wrt(&self) -> &W {
Expand Down
20 changes: 12 additions & 8 deletions src/rolling/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::info;

use super::{FileNumber, FileTracker};
use crate::rolling::{FILE_NUM_BYTES, FRAME_NUM_BYTES};
use crate::{BlockRead, BlockWrite, BLOCK_NUM_BYTES};
use crate::{BlockRead, BlockWrite, PersistAction, BLOCK_NUM_BYTES};

pub struct Directory {
dir: PathBuf,
Expand Down Expand Up @@ -267,13 +267,17 @@ impl BlockWrite for RollingWriter {
Ok(())
}

fn flush(&mut self, fsync: bool) -> io::Result<()> {
if fsync {
self.file.flush()?;
self.file.get_ref().sync_data()?;
self.directory.sync_directory()
} else {
self.file.flush()
fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> {
match persist_action {
PersistAction::FlushAndFsync => {
self.file.flush()?;
self.file.get_ref().sync_data()?;
self.directory.sync_directory()
}
PersistAction::Flush => {
// This will flush the buffer of the BufWriter to the underlying OS.
self.file.flush()
}
}
}

Expand Down
Loading

0 comments on commit ab1d289

Please sign in to comment.