Skip to content

Commit

Permalink
Fix race condition in send/recv resulting in recv reading out of order
Browse files Browse the repository at this point in the history
Currently, there is something like a `lock` on each `Node`.
So, a node can be in these states:
- empty (only in first lap)
- available (data is available but no reader/writer)
- writing (a single writer)
- reading (single/multiple readers)

Once the recv takes the `read` lock it can read the value safely.
But doesn't mean its correct.

This commit fixes the issues below, imagine:

- channel size = 4, data = [0, 0, 0, 0] (0 denote empty)
- send 4 elements [1, 2, 3, 4] => data = [1, 2, 3, 4].
  `head` is `0`, i.e. next time we will overwrite the `1`.
- `recv` starts receiving, the index is valid and is `0`.
- at the same time, `send` is starting to send some data, and it will
  overwrite the `0`th element.

At this point, several things can happen:
- writer takes the lock, overwrite the data (for example with `5`)
  before the reader. Then the reader will read the value `5` and thinks
  its the current value. After that it will read `2`, `3`, `4`, which is
  invalid order. BAD
- reader takes the lock, which is the normal correct behavior. It will
  read `1`, and the writer will have to wait for the reader to finish.

So, we want to fix the first case.

We have fixed it by adding the `lap` number to the node, so once the
reader takes the lock of the node, it will check the `lap`, and this is
safe since we know no writer can overwrite this `lap` value.

If the value is not what we expect (the reader lap), then start the read
process from the very top, which is the slowest case. But shouldn't
happen very often.
  • Loading branch information
Amjad50 committed Feb 21, 2024
1 parent fe5f439 commit f31c466
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 8 deletions.
18 changes: 17 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ mod tests;
extern crate alloc;
use alloc::{boxed::Box, vec::Vec};

use core::{cell::UnsafeCell, cmp, mem::MaybeUninit};
use core::{
cell::{Cell, UnsafeCell},
cmp,
mem::MaybeUninit,
};

#[cfg(loom)]
use loom::sync::{
Expand Down Expand Up @@ -123,13 +127,15 @@ fn is_readable(state: usize) -> bool {
struct Node<T> {
data: UnsafeCell<MaybeUninit<T>>,
state: AtomicUsize,
lap: Cell<usize>,
}

impl<T> Default for Node<T> {
fn default() -> Self {
Self {
data: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(STATE_EMPTY),
lap: Cell::new(0),
}
}
}
Expand Down Expand Up @@ -186,6 +192,7 @@ impl<T: Clone, const N: usize> InnerChannel<T, N> {
}
}

let current_lap = self.producer_lap.load(Ordering::Relaxed);
if next_head == 0 {
self.producer_lap.fetch_add(1, Ordering::Release);
}
Expand Down Expand Up @@ -234,6 +241,7 @@ impl<T: Clone, const N: usize> InnerChannel<T, N> {
unsafe {
node.data.get().write(MaybeUninit::new(value));
}
node.lap.set(current_lap);

// publish the value
node.state.store(STATE_AVAILABLE, Ordering::Release);
Expand Down Expand Up @@ -299,6 +307,14 @@ impl<T: Clone, const N: usize> InnerChannel<T, N> {
}
}

// if the node contain a different lap number, then the writer
// has overwritten the data and finished writing before we got the lock
// retry the whole thing (slow)
if node.lap.get() != reader.lap {
node.state.fetch_sub(STATE_READING, Ordering::Release);
return self.pop(reader);
}

let data = unsafe { node.data.get().read().assume_init_ref().clone() };

reader.index = (reader_index + 1) % self.buffer.len();
Expand Down
56 changes: 49 additions & 7 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,61 @@ fn test_overflow() {
}

#[test]
// FIXME: spin panic on loom
#[cfg(not(loom))]
fn test_always_overflow() {
loom!({
let (sender, mut receiver) = channel::<i32, 4>();
let (sender, mut receiver) = channel::<i32, 4>();

for i in 0..100 {
sender.send(i);
}

for i in 100 - 4..100 {
assert_eq!(receiver.recv(), Some(i));
}
assert_eq!(receiver.recv(), None);
}

#[test]
// FIXME: spin panic on loom
#[cfg(not(loom))]
fn test_sender_receiver_conflict() {
let (sender, receiver) = channel::<i32, 4>();

let barrier = Arc::new(std::sync::Barrier::new(2));

for i in 0..100 {
for _ in 0..10000 {
// setup
// fill the channel
for i in 0..4 {
sender.send(i);
}

for i in 100 - 4..100 {
assert_eq!(receiver.recv(), Some(i));
// send and receive exactly at the same time
let s_handle;
let r_handle;
{
let barrier = barrier.clone();
let mut receiver = receiver.clone();
r_handle = std::thread::spawn(move || {
barrier.wait();
let v = receiver.recv();
assert!(v == Some(0) || v == Some(1), "v = {v:?}");
});
}
assert_eq!(receiver.recv(), None);
});
{
let barrier = barrier.clone();
let sender = sender.clone();
s_handle = std::thread::spawn(move || {
barrier.wait();
sender.send(5);
});
}

// wait for the threads to finish
s_handle.join().unwrap();
r_handle.join().unwrap();
}
}

#[test]
Expand Down

0 comments on commit f31c466

Please sign in to comment.