Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fulmicoton fsync data #60

Merged
merged 5 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/block_read_write.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::io;

use crate::PersistAction;

pub const BLOCK_NUM_BYTES: usize = 32_768;

pub trait BlockRead {
Expand All @@ -25,8 +27,8 @@ pub trait BlockRead {
pub trait BlockWrite {
/// Must panic if buf is larger than `num_bytes_remaining_in_block`.
fn write(&mut self, buf: &[u8]) -> io::Result<()>;
/// The semantics of flush may depend on the implementation.
fn flush(&mut self) -> io::Result<()>;
/// Persist the data following the `persist_action`.
fn persist(&mut self, persist_action: PersistAction) -> io::Result<()>;
/// Number of bytes that can be added in the block.
fn num_bytes_remaining_in_block(&self) -> usize;
}
Expand Down Expand Up @@ -90,7 +92,7 @@ impl BlockWrite for VecBlockWriter {
Ok(())
}

fn flush(&mut self) -> io::Result<()> {
fn persist(&mut self, _persist_action: PersistAction) -> io::Result<()> {
Ok(())
}

Expand Down
10 changes: 5 additions & 5 deletions src/frame/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io;
use crate::block_read_write::{ArrayReader, VecBlockWriter};
use crate::frame::header::{FrameType, HEADER_LEN};
use crate::frame::{FrameReader, FrameWriter, ReadFrameError};
use crate::BLOCK_NUM_BYTES;
use crate::{PersistAction, BLOCK_NUM_BYTES};

#[test]
fn test_frame_simple() {
Expand All @@ -19,7 +19,7 @@ fn test_frame_simple() {
frame_writer
.write_frame(FrameType::Last, &b"fgh"[..])
.unwrap();
frame_writer.flush().unwrap();
frame_writer.persist(PersistAction::Flush).unwrap();
frame_writer.into_writer()
};
let buffer: Vec<u8> = block_writer.into();
Expand All @@ -45,9 +45,9 @@ fn test_frame_corruption_in_payload() -> io::Result<()> {
let mut buf: Vec<u8> = {
let mut frame_writer = FrameWriter::create(VecBlockWriter::default());
frame_writer.write_frame(FrameType::First, &b"abc"[..])?;
frame_writer.flush()?;
frame_writer.persist(PersistAction::Flush)?;
frame_writer.write_frame(FrameType::Middle, &b"de"[..])?;
frame_writer.flush()?;
frame_writer.persist(PersistAction::Flush)?;
frame_writer.into_writer().into()
};
buf[8] = 0u8;
Expand All @@ -68,7 +68,7 @@ fn repeat_empty_frame_util(repeat: usize) -> Vec<u8> {
for _ in 0..repeat {
frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap();
}
frame_writer.flush().unwrap();
frame_writer.persist(PersistAction::Flush).unwrap();
frame_writer.into_writer().into()
}

Expand Down
6 changes: 3 additions & 3 deletions src/frame/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io;

use crate::frame::{FrameType, Header, HEADER_LEN};
use crate::rolling::{Directory, RollingWriter};
use crate::{BlockWrite, BLOCK_NUM_BYTES};
use crate::{BlockWrite, PersistAction, BLOCK_NUM_BYTES};

pub struct FrameWriter<W> {
wrt: W,
Expand Down Expand Up @@ -41,8 +41,8 @@ impl<W: BlockWrite + Unpin> FrameWriter<W> {
/// When writing to a file, this performs a syscall and
/// the OS will be in charge of eventually writing the data
/// to disk, but this is not sufficient to ensure durability.
pub fn flush(&mut self) -> io::Result<()> {
self.wrt.flush()
pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> {
self.wrt.persist(persist_action)
}

/// Returns the maximum amount of bytes that can be written.
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ pub mod error;
mod frame;
mod mem;
mod multi_record_log;
mod persist_policy;
mod record;
mod recordlog;
mod rolling;

pub use multi_record_log::{MultiRecordLog, SyncPolicy};
pub use multi_record_log::MultiRecordLog;
pub(crate) use persist_policy::PersistState;
pub use persist_policy::{PersistAction, PersistPolicy};

#[derive(Debug, PartialEq, Eq)]
pub struct Record<'a> {
Expand Down
93 changes: 22 additions & 71 deletions src/multi_record_log.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::io;
use std::ops::RangeBounds;
use std::path::Path;
use std::time::{Duration, Instant};

use bytes::Buf;
use tracing::{debug, event_enabled, info, warn, Level};
Expand All @@ -13,76 +12,26 @@ use crate::mem::MemQueue;
use crate::record::{MultiPlexedRecord, MultiRecord};
use crate::recordlog::RecordWriter;
use crate::rolling::RollingWriter;
use crate::{mem, Record, ResourceUsage};
use crate::{mem, PersistAction, PersistPolicy, PersistState, Record, ResourceUsage};

pub struct MultiRecordLog {
record_log_writer: crate::recordlog::RecordWriter<RollingWriter>,
in_mem_queues: mem::MemQueues,
next_sync: SyncState,
next_persist: PersistState,
// A simple buffer we reuse to avoid allocation.
multi_record_spare_buffer: Vec<u8>,
}

/// Policy for synchonizing and flushing data
pub enum SyncPolicy {
/// Sync and flush at each operation
OnAppend,
/// Sync and flush regularly. Sync is realized on the first operation after the delay since
/// last sync elapsed. This means if no new operation arrive, some content may not get
/// flushed for a while.
OnDelay(Duration),
}

#[derive(Debug)]
enum SyncState {
OnAppend,
OnDelay {
next_sync: Instant,
interval: Duration,
},
}

impl SyncState {
fn should_sync(&self) -> bool {
match self {
SyncState::OnAppend => true,
SyncState::OnDelay { next_sync, .. } => *next_sync < Instant::now(),
}
}

fn update_synced(&mut self) {
match self {
SyncState::OnAppend => (),
SyncState::OnDelay {
ref mut next_sync,
interval,
} => *next_sync = Instant::now() + *interval,
}
}
}

impl From<SyncPolicy> for SyncState {
fn from(val: SyncPolicy) -> SyncState {
match val {
SyncPolicy::OnAppend => SyncState::OnAppend,
SyncPolicy::OnDelay(dur) => SyncState::OnDelay {
next_sync: Instant::now() + dur,
interval: dur,
},
}
}
}

impl MultiRecordLog {
/// Open the multi record log, syncing after each operation.
/// Open the multi record log, flushing after each operation, but not fsyncing.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
/// Open the multi record log, flushing after each operation, bit not fsyncing.
/// Open the multi record log, flushing after each operation, but not fsyncing.

pub fn open(directory_path: &Path) -> Result<Self, ReadRecordError> {
Self::open_with_prefs(directory_path, SyncPolicy::OnAppend)
Self::open_with_prefs(directory_path, PersistPolicy::Always(PersistAction::Flush))
}

/// Open the multi record log, syncing following the provided policy.
pub fn open_with_prefs(
directory_path: &Path,
sync_policy: SyncPolicy,
persist_policy: PersistPolicy,
) -> Result<Self, ReadRecordError> {
// io errors are non-recoverable
let rolling_reader = crate::rolling::RollingReader::open(directory_path)?;
Expand Down Expand Up @@ -140,7 +89,7 @@ impl MultiRecordLog {
let mut multi_record_log = MultiRecordLog {
record_log_writer,
in_mem_queues,
next_sync: sync_policy.into(),
next_persist: persist_policy.into(),
multi_record_spare_buffer: Vec::new(),
};
multi_record_log.run_gc_if_necessary()?;
Expand All @@ -163,7 +112,7 @@ impl MultiRecordLog {
}
let record = MultiPlexedRecord::RecordPosition { queue, position: 0 };
self.record_log_writer.write_record(record)?;
self.sync()?;
self.persist(PersistAction::FlushAndFsync)?;
self.in_mem_queues.create_queue(queue)?;
Ok(())
}
Expand All @@ -175,7 +124,7 @@ impl MultiRecordLog {
self.record_log_writer.write_record(record)?;
self.in_mem_queues.delete_queue(queue)?;
self.run_gc_if_necessary()?;
self.sync()?;
self.persist(PersistAction::FlushAndFsync)?;
Ok(())
}

Expand Down Expand Up @@ -205,8 +154,8 @@ impl MultiRecordLog {
///
/// This operation is atomic: either all records get stored, or none do.
/// However this function succeeding does not necessarily mean records were stored, be sure
/// to call [`Self::sync`] to make sure changes are persisted if you don't use
/// [`SyncPolicy::OnAppend`] (which is the default).
/// to call [`Self::persist`] to make sure changes are persisted if you don't use
/// [`PersistPolicy::Always`] (which is the default).
pub fn append_records<T: Iterator<Item = impl Buf>>(
&mut self,
queue: &str,
Expand Down Expand Up @@ -240,7 +189,7 @@ impl MultiRecordLog {
records,
};
self.record_log_writer.write_record(record)?;
self.sync_on_policy()?;
self.persist_on_policy()?;

let mem_queue = self.in_mem_queues.get_queue_mut(queue)?;
let mut max_position = position;
Expand All @@ -267,9 +216,9 @@ impl MultiRecordLog {
has_empty_queues = true
}
if has_empty_queues {
// We need to sync here! We are remove files from the FS
// We need to fsync here! We are removing files from the FS
// so we need to make sure our empty queue positions are properly persisted.
self.sync()?;
self.persist(PersistAction::FlushAndFsync)?;
}
Ok(())
}
Expand All @@ -289,7 +238,7 @@ impl MultiRecordLog {
.write_record(MultiPlexedRecord::Truncate { position, queue })?;
let removed_count = self.in_mem_queues.truncate(queue, position).unwrap_or(0);
self.run_gc_if_necessary()?;
self.sync_on_policy()?;
self.persist_on_policy()?;
Ok(removed_count)
}

Expand Down Expand Up @@ -333,16 +282,18 @@ impl MultiRecordLog {
self.in_mem_queues.range(queue, range)
}

fn sync_on_policy(&mut self) -> io::Result<()> {
if self.next_sync.should_sync() {
self.sync()?;
self.next_sync.update_synced();
/// Flush if the policy says it should be done
fn persist_on_policy(&mut self) -> io::Result<()> {
if let Some(persist_action) = self.next_persist.should_persist() {
self.persist(persist_action)?;
self.next_persist.update_persisted();
}
Ok(())
}

pub fn sync(&mut self) -> io::Result<()> {
self.record_log_writer.flush()
/// Flush and optionally fsync data
pub fn persist(&mut self, persist_action: PersistAction) -> io::Result<()> {
self.record_log_writer.persist(persist_action)
}

/// Returns the position of the last record appended to the queue.
Expand Down
104 changes: 104 additions & 0 deletions src/persist_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::time::{Duration, Instant};

/// What a persist request asks of the underlying writer.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PersistAction {
    /// Hand the buffered data over to the OS; it has not necessarily
    /// reached the disk yet.
    Flush,
    /// Hand the buffered data over to the OS, then ask the OS to flush
    /// it to the disk.
    FlushAndFsync,
}

impl PersistAction {
    /// Returns `true` only for [`PersistAction::FlushAndFsync`].
    pub fn is_fsync(self) -> bool {
        matches!(self, PersistAction::FlushAndFsync)
    }
}

/// We have two types of operations on the mrecordlog.
///
/// Critical records are relatively rare and really need to be persisted:
/// - RecordPosition { queue: &'a str, position: u64 },
/// - DeleteQueue.
Comment on lines +20 to +22
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateQueue is a bit of a virtual operation, but I think it's worth putting it in that list

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not that virtual but yeah, it should not be consequential for Quickwit.

///
/// For these operations, we want to always flush and fsync.
///
/// On the other hand,
/// - Truncate
/// - AppendRecords
/// are considered more frequent, and one might want to sacrifice
/// persistence guarantees for performance.
///
/// The `PersistPolicy` defines the trade-off applied for the second kind of
/// operations.
#[derive(Clone, Debug)]
pub enum PersistPolicy {
/// Only ensure data is persisted when critical records are written.
///
/// With this policy, the timing after which the data reaches the disk
/// is up to the OS.
DoNothing,
/// Persist data once every interval, and when critical records are written.
OnDelay {
/// Delay between two policy-driven persists.
interval: Duration,
/// Action to perform once the interval has elapsed.
action: PersistAction,
},
/// Persist data after each operation, using the given action.
Always(PersistAction),
}

/// Runtime counterpart of a `PersistPolicy`: in addition to the policy
/// itself, it tracks when the next delayed persist is due.
#[derive(Debug)]
pub(crate) enum PersistState {
    /// Persist with the wrapped action every time the policy is consulted.
    OnAppend(PersistAction),
    /// Persist with `action` once `next_persist` is in the past.
    OnDelay {
        /// Instant after which the next persist is due.
        next_persist: Instant,
        /// Delay used to schedule the following deadline.
        interval: Duration,
        /// Action to perform when the deadline has passed.
        action: PersistAction,
    },
    /// Never persist on policy.
    NoOp,
}

impl PersistState {
    /// Returns the action to run right now, or `None` if nothing is due.
    pub fn should_persist(&self) -> Option<PersistAction> {
        match *self {
            PersistState::OnAppend(action) => Some(action),
            // The deadline must be strictly in the past for the persist to be due.
            PersistState::OnDelay {
                action,
                next_persist,
                ..
            } if next_persist < Instant::now() => Some(action),
            PersistState::OnDelay { .. } | PersistState::NoOp => None,
        }
    }

    /// Records that a persist just happened, pushing the next deadline one
    /// `interval` into the future for the delayed variant.
    pub fn update_persisted(&mut self) {
        match self {
            PersistState::OnDelay {
                next_persist,
                interval,
                ..
            } => {
                *next_persist = Instant::now() + *interval;
            }
            // Nothing to track for the stateless variants.
            PersistState::OnAppend(_) | PersistState::NoOp => {}
        }
    }
}

impl From<PersistPolicy> for PersistState {
fn from(val: PersistPolicy) -> PersistState {
match val {
PersistPolicy::Always(action) => PersistState::OnAppend(action),
PersistPolicy::OnDelay { interval, action } => PersistState::OnDelay {
next_persist: Instant::now() + interval,
interval,
action,
},
PersistPolicy::DoNothing => PersistState::NoOp,
}
}
}
Loading
Loading