Skip to content

Commit

Permalink
Implement customizable sleep duration in nanoseconds used by writer d…
Browse files Browse the repository at this point in the history
…uring lock acquisition (default 1s).
  • Loading branch information
bocharov committed Aug 13, 2024
1 parent d0afe4e commit dab1275
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mmap-sync"
version = "1.0.3"
version = "1.0.4"
edition = "2021"
authors = ["Alex Bocharov <[email protected]>"]
description = "A Rust package allowing sharing of data between processes in a wait-free and zero-copy fashion from mapped memory."
Expand Down
11 changes: 7 additions & 4 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ use crate::synchronizer::SynchronizerError;
use crate::synchronizer::SynchronizerError::*;

const STATE_SIZE: usize = mem::size_of::<State>();
const SLEEP_DURATION: Duration = Duration::from_secs(1);

/// State stored in memory for synchronization using atomics
#[repr(C)]
pub(crate) struct State {
pub(crate) struct State<const SD: usize = 1_000_000_000> {
/// Current data instance version
version: AtomicU64,
/// Current number of readers for each data instance
Expand Down Expand Up @@ -48,7 +47,11 @@ impl State {

/// Acquire next `idx` of the state for writing
#[inline]
pub(crate) fn acquire_next_idx(&self, grace_duration: Duration) -> (usize, bool) {
pub(crate) fn acquire_next_idx(
&self,
grace_duration: Duration,
sleep_duration: Duration,
) -> (usize, bool) {
// calculate `next_idx` to acquire, in case of uninitialized version use 0
let next_idx = match InstanceVersion::try_from(self.version.load(Ordering::SeqCst)) {
Ok(version) => (version.idx() + 1) % 2,
Expand All @@ -69,7 +72,7 @@ impl State {
reset = true;
break;
} else {
thread::sleep(SLEEP_DURATION);
thread::sleep(sleep_duration);
}
}

Expand Down
19 changes: 15 additions & 4 deletions src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,16 @@ use crate::synchronizer::SynchronizerError::*;
/// `Synchronizer` is a concurrency primitive that manages data access between a single writer process and multiple reader processes.
///
/// It coordinates the access to two data files that store the shared data. A state file, also memory-mapped, stores the index of the current data file and the number of active readers for each index, updated via atomic instructions.
pub struct Synchronizer<H: Hasher + Default = WyHash, const N: usize = 1024> {
///
/// Template parameters:
/// - `H` - hasher used for checksum calculation
/// - `N` - serializer scratch space size
/// - `SD` - sleep duration in nanoseconds used by writer during lock acquisition (default 1s)
pub struct Synchronizer<
H: Hasher + Default = WyHash,
const N: usize = 1024,
const SD: u64 = 1_000_000_000,
> {
/// Container storing state mmap
state_container: StateContainer,
/// Container storing data mmap
Expand Down Expand Up @@ -70,7 +79,7 @@ impl Synchronizer {
}
}

impl<H: Hasher + Default, const N: usize> Synchronizer<H, N> {
impl<H: Hasher + Default, const N: usize, const SD: u64> Synchronizer<H, N, SD> {
/// Create new instance of `Synchronizer` using given `path_prefix` and template parameters
pub fn with_params(path_prefix: &OsStr) -> Self {
Synchronizer {
Expand Down Expand Up @@ -133,7 +142,8 @@ impl<H: Hasher + Default, const N: usize> Synchronizer<H, N> {
let checksum = hasher.finish();

// acquire next available data file idx and write data to it
let (new_idx, reset) = state.acquire_next_idx(grace_duration);
let acquire_sleep_duration = Duration::from_nanos(SD);
let (new_idx, reset) = state.acquire_next_idx(grace_duration, acquire_sleep_duration);
let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?;
let size = self.data_container.write(&data, new_version)?;

Expand Down Expand Up @@ -167,7 +177,8 @@ impl<H: Hasher + Default, const N: usize> Synchronizer<H, N> {
let checksum = hasher.finish();

// acquire next available data file idx and write data to it
let (new_idx, reset) = state.acquire_next_idx(grace_duration);
let acquire_sleep_duration = Duration::from_nanos(SD);
let (new_idx, reset) = state.acquire_next_idx(grace_duration, acquire_sleep_duration);
let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?;
let size = self.data_container.write(data, new_version)?;

Expand Down

0 comments on commit dab1275

Please sign in to comment.