Skip to content

Commit

Permalink
Merge pull request #14 from cloudflare/ab/custom-sleep-duration
Browse files Browse the repository at this point in the history
Implement customizable sleep duration in nanoseconds used by writer during lock acquisition (default 1s).
  • Loading branch information
bocharov authored Aug 13, 2024
2 parents d0afe4e + dab1275 commit 1756d86
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mmap-sync"
version = "1.0.3"
version = "1.0.4"
edition = "2021"
authors = ["Alex Bocharov <[email protected]>"]
description = "A Rust package allowing sharing of data between processes in a wait-free and zero-copy fashion from mapped memory."
Expand Down
11 changes: 7 additions & 4 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ use crate::synchronizer::SynchronizerError;
use crate::synchronizer::SynchronizerError::*;

const STATE_SIZE: usize = mem::size_of::<State>();
const SLEEP_DURATION: Duration = Duration::from_secs(1);

/// State stored in memory for synchronization using atomics
#[repr(C)]
pub(crate) struct State {
pub(crate) struct State<const SD: usize = 1_000_000_000> {
/// Current data instance version
version: AtomicU64,
/// Current number of readers for each data instance
Expand Down Expand Up @@ -48,7 +47,11 @@ impl State {

/// Acquire next `idx` of the state for writing
#[inline]
pub(crate) fn acquire_next_idx(&self, grace_duration: Duration) -> (usize, bool) {
pub(crate) fn acquire_next_idx(
&self,
grace_duration: Duration,
sleep_duration: Duration,
) -> (usize, bool) {
// calculate `next_idx` to acquire, in case of uninitialized version use 0
let next_idx = match InstanceVersion::try_from(self.version.load(Ordering::SeqCst)) {
Ok(version) => (version.idx() + 1) % 2,
Expand All @@ -69,7 +72,7 @@ impl State {
reset = true;
break;
} else {
thread::sleep(SLEEP_DURATION);
thread::sleep(sleep_duration);
}
}

Expand Down
19 changes: 15 additions & 4 deletions src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,16 @@ use crate::synchronizer::SynchronizerError::*;
/// `Synchronizer` is a concurrency primitive that manages data access between a single writer process and multiple reader processes.
///
/// It coordinates the access to two data files that store the shared data. A state file, also memory-mapped, stores the index of the current data file and the number of active readers for each index, updated via atomic instructions.
pub struct Synchronizer<H: Hasher + Default = WyHash, const N: usize = 1024> {
///
/// Template parameters:
/// - `H` - hasher used for checksum calculation
/// - `N` - serializer scratch space size
/// - `SD` - sleep duration in nanoseconds used by writer during lock acquisition (default 1s)
pub struct Synchronizer<
H: Hasher + Default = WyHash,
const N: usize = 1024,
const SD: u64 = 1_000_000_000,
> {
/// Container storing state mmap
state_container: StateContainer,
/// Container storing data mmap
Expand Down Expand Up @@ -70,7 +79,7 @@ impl Synchronizer {
}
}

impl<H: Hasher + Default, const N: usize> Synchronizer<H, N> {
impl<H: Hasher + Default, const N: usize, const SD: u64> Synchronizer<H, N, SD> {
/// Create new instance of `Synchronizer` using given `path_prefix` and template parameters
pub fn with_params(path_prefix: &OsStr) -> Self {
Synchronizer {
Expand Down Expand Up @@ -133,7 +142,8 @@ impl<H: Hasher + Default, const N: usize> Synchronizer<H, N> {
let checksum = hasher.finish();

// acquire next available data file idx and write data to it
let (new_idx, reset) = state.acquire_next_idx(grace_duration);
let acquire_sleep_duration = Duration::from_nanos(SD);
let (new_idx, reset) = state.acquire_next_idx(grace_duration, acquire_sleep_duration);
let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?;
let size = self.data_container.write(&data, new_version)?;

Expand Down Expand Up @@ -167,7 +177,8 @@ impl<H: Hasher + Default, const N: usize> Synchronizer<H, N> {
let checksum = hasher.finish();

// acquire next available data file idx and write data to it
let (new_idx, reset) = state.acquire_next_idx(grace_duration);
let acquire_sleep_duration = Duration::from_nanos(SD);
let (new_idx, reset) = state.acquire_next_idx(grace_duration, acquire_sleep_duration);
let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?;
let size = self.data_container.write(data, new_version)?;

Expand Down

0 comments on commit 1756d86

Please sign in to comment.