Skip to content

Commit

Permalink
Improved readability and reduced loops in pop
Browse files Browse the repository at this point in the history
The `pop` was a bit more complicated than it needs to be.
  • Loading branch information
Amjad50 committed Feb 21, 2024
1 parent 90578ce commit aad2972
Showing 1 changed file with 28 additions and 40 deletions.
68 changes: 28 additions & 40 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,25 @@ use alloc::sync::Arc;
#[cfg(not(loom))]
use core::sync::atomic::{AtomicUsize, Ordering};

const DATA_EMPTY: usize = 0;
const DATA_AVAILABLE: usize = 1;
const DATA_WRITING: usize = 2;
const STATE_EMPTY: usize = 0;
const STATE_AVAILABLE: usize = 1;
const STATE_WRITING: usize = 2;
// reading can be done by multiple readers
// so we use value that is power of 2
// we add this value to the state, which means there is a reader
const DATA_READING: usize = 4;
const READING_MASK: usize = usize::MAX & !(DATA_READING - 1);
const STATE_READING: usize = 4;
const READING_MASK: usize = usize::MAX & !(STATE_READING - 1);

fn is_read_state(state: usize) -> bool {
#[inline]
fn is_reading(state: usize) -> bool {
state & READING_MASK != 0
}

#[inline]
fn is_readable(state: usize) -> bool {
state == STATE_AVAILABLE || is_reading(state)
}

struct Node<T> {
data: UnsafeCell<MaybeUninit<T>>,
state: AtomicUsize,
Expand All @@ -46,7 +52,7 @@ impl<T> Default for Node<T> {
fn default() -> Self {
Self {
data: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(DATA_EMPTY),
state: AtomicUsize::new(STATE_EMPTY),
}
}
}
Expand Down Expand Up @@ -113,29 +119,29 @@ impl<T: Clone, const N: usize> InnerChannel<T, N> {
loop {
let mut state = node.state.load(Ordering::Acquire);

while is_read_state(state) {
while is_reading(state) {
core::hint::spin_loop();
// wait until the reader is done
state = node.state.load(Ordering::Acquire);
}

match state {
DATA_EMPTY | DATA_AVAILABLE => {
STATE_EMPTY | STATE_AVAILABLE => {
if node
.state
.compare_exchange_weak(
state,
DATA_WRITING,
STATE_WRITING,
Ordering::Release,
Ordering::Relaxed,
)
.is_ok()
{
should_drop = state == DATA_AVAILABLE;
should_drop = state == STATE_AVAILABLE;
break;
}
}
DATA_WRITING => unreachable!("There should be no writer writing"),
STATE_WRITING => unreachable!("There should be no writer writing"),
s => unreachable!("Invalid state: {}", s),
}
}
Expand All @@ -153,7 +159,7 @@ impl<T: Clone, const N: usize> InnerChannel<T, N> {
}

// publish the value
node.state.store(DATA_AVAILABLE, Ordering::Release);
node.state.store(STATE_AVAILABLE, Ordering::Release);
}

fn pop(&self, reader: &mut ReaderData) -> Option<T> {
Expand Down Expand Up @@ -195,41 +201,23 @@ impl<T: Clone, const N: usize> InnerChannel<T, N> {
loop {
let state = node.state.load(Ordering::Acquire);

if is_read_state(state) {
let old = node.state.fetch_add(DATA_READING, Ordering::AcqRel);
if is_readable(state) {
let old = node.state.fetch_add(STATE_READING, Ordering::Release);

if old == state {
if is_readable(old) {
break;
} else {
match old {
DATA_AVAILABLE => {
break;
}
_ => {
// something happened, rollback
// subtract one reader
node.state.fetch_sub(DATA_READING, Ordering::AcqRel);
}
}
}
// something happened, rollback
node.state.fetch_sub(STATE_READING, Ordering::Release);
continue;
}

match state {
DATA_AVAILABLE => {
let old = node.state.fetch_add(DATA_READING, Ordering::AcqRel);

if old == state || is_read_state(old) {
break;
} else {
// subtract one reader
node.state.fetch_sub(DATA_READING, Ordering::AcqRel);
}
}
DATA_WRITING => {
STATE_WRITING => {
reader_index += 1;
node = &self.buffer[reader_index % self.buffer.len()];
}
DATA_EMPTY => unreachable!("There should be some data at least"),
STATE_EMPTY => unreachable!("There should be some data at least"),
s => unreachable!("Invalid state: {}", s),
}
}
Expand All @@ -241,7 +229,7 @@ impl<T: Clone, const N: usize> InnerChannel<T, N> {
reader.lap += 1;
}

node.state.fetch_sub(DATA_READING, Ordering::Release);
node.state.fetch_sub(STATE_READING, Ordering::Release);

Some(data)
}
Expand Down

0 comments on commit aad2972

Please sign in to comment.