Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iox2-51 integrate posix condition variable for mac os #52

Merged
34 changes: 23 additions & 11 deletions iceoryx2-pal/concurrency-sync/src/condition_variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
use core::sync::atomic::{AtomicU32, Ordering};

pub use crate::mutex::Mutex;
use crate::{semaphore::Semaphore, WaitAction, WaitResult};

pub struct ConditionVariable {
counter: AtomicU32,
number_of_waiters: AtomicU32,
semaphore: Semaphore,
}

impl Default for ConditionVariable {
fn default() -> Self {
Self {
counter: AtomicU32::new(0),
semaphore: Semaphore::new(0),
number_of_waiters: AtomicU32::new(0),
}
}
}
Expand All @@ -31,28 +34,37 @@ impl ConditionVariable {
Self::default()
}

pub fn notify<WakeOneOrAll: Fn(&AtomicU32)>(&self, wake_one_or_all: WakeOneOrAll) {
self.counter.fetch_add(1, Ordering::Relaxed);
wake_one_or_all(&self.counter);
pub fn notify_one<WakeOne: Fn(&AtomicU32)>(&self, wake_one: WakeOne) {
self.semaphore.post(
wake_one,
1.min(self.number_of_waiters.load(Ordering::Relaxed)),
);
}

pub fn notify_all<WakeAll: Fn(&AtomicU32)>(&self, wake_all: WakeAll) {
self.semaphore
.post(wake_all, self.number_of_waiters.load(Ordering::Relaxed));
}

pub fn wait<
WakeOne: Fn(&AtomicU32),
Wait: Fn(&AtomicU32, &u32) -> bool,
MtxWait: Fn(&AtomicU32, &u32) -> bool,
Wait: Fn(&AtomicU32, &u32) -> WaitAction,
MtxWait: Fn(&AtomicU32, &u32) -> WaitAction,
>(
&self,
mtx: &Mutex,
mtx_wake_one: WakeOne,
wait: Wait,
mtx_wait: MtxWait,
) -> bool {
let counter_value = self.counter.load(Ordering::Relaxed);
) -> WaitResult {
self.number_of_waiters.fetch_add(1, Ordering::Relaxed);
mtx.unlock(mtx_wake_one);

if !wait(&self.counter, &counter_value) {
return false;
if self.semaphore.wait(wait) == WaitResult::Interrupted {
self.number_of_waiters.fetch_sub(1, Ordering::Relaxed);
return WaitResult::Interrupted;
}
self.number_of_waiters.fetch_sub(1, Ordering::Relaxed);

// this maybe problematic when the wait has a timeout. it is possible that
// the timeout is nearly doubled when wait is waken up at the end of the timeout
Expand Down
12 changes: 12 additions & 0 deletions iceoryx2-pal/concurrency-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,15 @@ pub mod condition_variable;
pub mod mutex;
pub mod rwlock;
pub mod semaphore;

#[derive(Debug, PartialEq, Eq)]
pub enum WaitAction {
Continue,
Abort,
}

#[derive(Debug, PartialEq, Eq)]
pub enum WaitResult {
Interrupted,
Success,
}
36 changes: 22 additions & 14 deletions iceoryx2-pal/concurrency-sync/src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use core::{
sync::atomic::{AtomicU32, Ordering},
};

use crate::{WaitAction, WaitResult};

pub struct Mutex {
// we use an AtomicU32 since it should be supported on nearly every platform
state: AtomicU32,
Expand All @@ -33,24 +35,24 @@ impl Mutex {
}
}

pub fn lock<Wait: Fn(&AtomicU32, &u32) -> bool>(&self, wait: Wait) -> bool {
if self.uncontested_lock(crate::SPIN_REPETITIONS) {
return true;
pub fn lock<Wait: Fn(&AtomicU32, &u32) -> WaitAction>(&self, wait: Wait) -> WaitResult {
if self.uncontested_lock(crate::SPIN_REPETITIONS) == WaitResult::Success {
return WaitResult::Success;
}

loop {
let keep_running = wait(&self.state, &1);
let action = wait(&self.state, &1);

if self
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
return true;
return WaitResult::Success;
}

if !keep_running {
return false;
if action == WaitAction::Abort {
return WaitResult::Interrupted;
}
}
}
Expand All @@ -60,15 +62,21 @@ impl Mutex {
wake_one(&self.state);
}

pub fn try_lock(&self) -> bool {
self.state
pub fn try_lock(&self) -> WaitResult {
if self
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
WaitResult::Success
} else {
WaitResult::Interrupted
}
}

fn uncontested_lock(&self, retry_limit: u64) -> bool {
if self.try_lock() {
return true;
fn uncontested_lock(&self, retry_limit: u64) -> WaitResult {
if self.try_lock() == WaitResult::Success {
return WaitResult::Success;
}

let mut retry_counter = 0;
Expand All @@ -81,10 +89,10 @@ impl Mutex {
retry_counter += 1;

if retry_limit == retry_counter {
return false;
return WaitResult::Interrupted;
}
}

true
WaitResult::Success
}
}
82 changes: 53 additions & 29 deletions iceoryx2-pal/concurrency-sync/src/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use core::{
sync::atomic::{AtomicU32, Ordering},
};

use crate::SPIN_REPETITIONS;
use crate::{WaitAction, WaitResult, SPIN_REPETITIONS};

const WRITE_LOCKED: u32 = u32::MAX;
const UNLOCKED: u32 = 0;
Expand All @@ -37,24 +37,30 @@ impl RwLockReaderPreference {
Self::default()
}

pub fn try_read_lock(&self) -> bool {
pub fn try_read_lock(&self) -> WaitResult {
let reader_count = self.reader_count.load(Ordering::Relaxed);

if reader_count == WRITE_LOCKED {
return false;
return WaitResult::Interrupted;
}

self.reader_count
if self
.reader_count
.compare_exchange(
reader_count,
reader_count + 1,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
WaitResult::Success
} else {
WaitResult::Interrupted
}
}

pub fn read_lock<F: Fn(&AtomicU32, &u32) -> bool>(&self, wait: F) -> bool {
pub fn read_lock<F: Fn(&AtomicU32, &u32) -> WaitAction>(&self, wait: F) -> WaitResult {
let mut reader_count = self.reader_count.load(Ordering::Relaxed);
let mut retry_counter = 0;

Expand All @@ -66,13 +72,13 @@ impl RwLockReaderPreference {
}

if !keep_running {
return false;
return WaitResult::Interrupted;
}

if retry_counter < SPIN_REPETITIONS {
retry_counter += 1;
spin_loop();
} else if !wait(&self.reader_count, &reader_count) {
} else if wait(&self.reader_count, &reader_count) == WaitAction::Abort {
keep_running = false;
}

Expand All @@ -85,7 +91,7 @@ impl RwLockReaderPreference {
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return true,
Ok(_) => return WaitResult::Success,
Err(v) => {
reader_count = v;
}
Expand All @@ -103,13 +109,19 @@ impl RwLockReaderPreference {
wake_one(&self.reader_count);
}

pub fn try_write_lock(&self) -> bool {
self.reader_count
pub fn try_write_lock(&self) -> WaitResult {
if self
.reader_count
.compare_exchange(UNLOCKED, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
WaitResult::Success
} else {
WaitResult::Interrupted
}
}

pub fn write_lock<F: Fn(&AtomicU32, &u32) -> bool>(&self, wait: F) -> bool {
pub fn write_lock<F: Fn(&AtomicU32, &u32) -> WaitAction>(&self, wait: F) -> WaitResult {
let mut retry_counter = 0;
let mut reader_count;

Expand All @@ -122,17 +134,17 @@ impl RwLockReaderPreference {
Ordering::Relaxed,
) {
Err(v) => reader_count = v,
Ok(_) => return true,
Ok(_) => return WaitResult::Success,
};

if !keep_running {
return false;
return WaitResult::Interrupted;
}

if retry_counter < SPIN_REPETITIONS {
retry_counter += 1;
spin_loop();
} else if !wait(&self.reader_count, &reader_count) {
} else if wait(&self.reader_count, &reader_count) == WaitAction::Abort {
keep_running = false;
}
}
Expand All @@ -158,32 +170,38 @@ impl RwLockWriterPreference {
Self::default()
}

pub fn try_read_lock(&self) -> bool {
pub fn try_read_lock(&self) -> WaitResult {
let state = self.state.load(Ordering::Relaxed);
if state % 2 == 1 {
return false;
return WaitResult::Interrupted;
}

self.state
if self
.state
.compare_exchange(state, state + 2, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
WaitResult::Success
} else {
WaitResult::Interrupted
}
}

pub fn read_lock<F: Fn(&AtomicU32, &u32) -> bool>(&self, wait: F) -> bool {
pub fn read_lock<F: Fn(&AtomicU32, &u32) -> WaitAction>(&self, wait: F) -> WaitResult {
let mut state = self.state.load(Ordering::Relaxed);

let mut retry_counter = 0;
let mut keep_running = true;
loop {
if state % 2 == 1 {
if !keep_running {
return false;
return WaitResult::Interrupted;
}

if retry_counter < SPIN_REPETITIONS {
retry_counter += 1;
spin_loop();
} else if !wait(&self.state, &state) {
} else if wait(&self.state, &state) == WaitAction::Abort {
keep_running = false;
}
state = self.state.load(Ordering::Relaxed);
Expand All @@ -194,7 +212,7 @@ impl RwLockWriterPreference {
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return true,
Ok(_) => return WaitResult::Success,
Err(v) => state = v,
}
}
Expand All @@ -218,22 +236,28 @@ impl RwLockWriterPreference {
}
}

pub fn try_write_lock(&self) -> bool {
self.state
pub fn try_write_lock(&self) -> WaitResult {
if self
.state
.compare_exchange(0, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
WaitResult::Success
} else {
WaitResult::Interrupted
}
}

pub fn write_lock<
Wait: Fn(&AtomicU32, &u32) -> bool,
Wait: Fn(&AtomicU32, &u32) -> WaitAction,
WakeOne: Fn(&AtomicU32),
WakeAll: Fn(&AtomicU32),
>(
&self,
wait: Wait,
wake_one: WakeOne,
wake_all: WakeAll,
) -> bool {
) -> WaitResult {
let mut state = self.state.load(Ordering::Relaxed);

let mut keep_running = true;
Expand All @@ -246,7 +270,7 @@ impl RwLockWriterPreference {
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return true,
Ok(_) => return WaitResult::Success,
Err(v) => state = v,
}
}
Expand All @@ -264,12 +288,12 @@ impl RwLockWriterPreference {
self.writer_wake_counter.fetch_add(1, Ordering::Relaxed);
wake_one(&self.writer_wake_counter);
wake_all(&self.state);
return false;
return WaitResult::Interrupted;
}
Err(v) => state = v,
}
} else {
return false;
return WaitResult::Interrupted;
}
}
}
Expand All @@ -288,7 +312,7 @@ impl RwLockWriterPreference {
retry_counter += 1;
} else {
let writer_wake_counter = self.writer_wake_counter.load(Ordering::Relaxed);
if !wait(&self.writer_wake_counter, &writer_wake_counter) {
if wait(&self.writer_wake_counter, &writer_wake_counter) == WaitAction::Abort {
keep_running = false;
}
}
Expand Down
Loading