diff --git a/src/lib.rs b/src/lib.rs index 16386e8..2b6b135 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,19 +17,25 @@ use alloc::sync::Arc; #[cfg(not(loom))] use core::sync::atomic::{AtomicUsize, Ordering}; -const DATA_EMPTY: usize = 0; -const DATA_AVAILABLE: usize = 1; -const DATA_WRITING: usize = 2; +const STATE_EMPTY: usize = 0; +const STATE_AVAILABLE: usize = 1; +const STATE_WRITING: usize = 2; // reading can be done by multiple readers // so we use value that is power of 2 // we add this value to the state, which means there is a reader -const DATA_READING: usize = 4; -const READING_MASK: usize = usize::MAX & !(DATA_READING - 1); +const STATE_READING: usize = 4; +const READING_MASK: usize = usize::MAX & !(STATE_READING - 1); -fn is_read_state(state: usize) -> bool { +#[inline] +fn is_reading(state: usize) -> bool { state & READING_MASK != 0 } +#[inline] +fn is_readable(state: usize) -> bool { + state == STATE_AVAILABLE || is_reading(state) +} + struct Node { data: UnsafeCell>, state: AtomicUsize, @@ -46,7 +52,7 @@ impl Default for Node { fn default() -> Self { Self { data: UnsafeCell::new(MaybeUninit::uninit()), - state: AtomicUsize::new(DATA_EMPTY), + state: AtomicUsize::new(STATE_EMPTY), } } } @@ -113,29 +119,29 @@ impl InnerChannel { loop { let mut state = node.state.load(Ordering::Acquire); - while is_read_state(state) { + while is_reading(state) { core::hint::spin_loop(); // wait until the reader is done state = node.state.load(Ordering::Acquire); } match state { - DATA_EMPTY | DATA_AVAILABLE => { + STATE_EMPTY | STATE_AVAILABLE => { if node .state .compare_exchange_weak( state, - DATA_WRITING, + STATE_WRITING, Ordering::Release, Ordering::Relaxed, ) .is_ok() { - should_drop = state == DATA_AVAILABLE; + should_drop = state == STATE_AVAILABLE; break; } } - DATA_WRITING => unreachable!("There should be no writer writing"), + STATE_WRITING => unreachable!("There should be no writer writing"), s => unreachable!("Invalid state: {}", s), } } @@ -153,7 +159,7 @@ impl InnerChannel { } // publish the value - node.state.store(DATA_AVAILABLE, Ordering::Release); + node.state.store(STATE_AVAILABLE, Ordering::Release); } fn pop(&self, reader: &mut ReaderData) -> Option { @@ -195,41 +201,23 @@ impl InnerChannel { loop { let state = node.state.load(Ordering::Acquire); - if is_read_state(state) { - let old = node.state.fetch_add(DATA_READING, Ordering::AcqRel); + if is_readable(state) { + let old = node.state.fetch_add(STATE_READING, Ordering::Release); - if old == state { + if is_readable(old) { break; - } else { - match old { - DATA_AVAILABLE => { - break; - } - _ => { - // something happened, rollback - // subtract one reader - node.state.fetch_sub(DATA_READING, Ordering::AcqRel); - } - } } + // something happened, rollback + node.state.fetch_sub(STATE_READING, Ordering::Release); + continue; } match state { - DATA_AVAILABLE => { - let old = node.state.fetch_add(DATA_READING, Ordering::AcqRel); - - if old == state || is_read_state(old) { - break; - } else { - // subtract one reader - node.state.fetch_sub(DATA_READING, Ordering::AcqRel); - } - } - DATA_WRITING => { + STATE_WRITING => { reader_index += 1; node = &self.buffer[reader_index % self.buffer.len()]; } - DATA_EMPTY => unreachable!("There should be some data at least"), + STATE_EMPTY => unreachable!("There should be some data at least"), s => unreachable!("Invalid state: {}", s), } } @@ -241,7 +229,7 @@ impl InnerChannel { reader.lap += 1; } - node.state.fetch_sub(DATA_READING, Ordering::Release); + node.state.fetch_sub(STATE_READING, Ordering::Release); Some(data) }