Skip to content

Commit

Permalink
feat: add latent queue (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Nov 22, 2024
1 parent bb8157b commit 86b8f46
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 38 deletions.
8 changes: 8 additions & 0 deletions bach/src/environment/default.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{environment::Environment as _, executor, rand, time::scheduler};
use core::task::Poll;
use std::time::Duration;

use super::Macrostep;

Expand Down Expand Up @@ -45,6 +46,13 @@ impl Runtime {
{
self.inner.block_on(f)
}

pub fn elapsed(&mut self) -> Duration {
self.inner
.environment()
.time
.enter(|| crate::time::Instant::now().elapsed_since_start())
}
}

impl Drop for Runtime {
Expand Down
5 changes: 4 additions & 1 deletion bach/src/ext.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use core::time::Duration;

pub use crate::group::GroupExt;
pub use crate::{
group::GroupExt,
sync::queue::{InstantQueueExt, QueueExt},
};

pub trait DurationLiteral {
fn s(self) -> Duration;
Expand Down
126 changes: 105 additions & 21 deletions bach/src/sync/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use event_listener_strategy::{
};
use futures_core::{ready, stream::Stream};
use pin_project_lite::pin_project;
use std::process::abort;
use std::{
process::abort,
task::{RawWaker, RawWakerVTable, Waker},
};

struct Channel<T, Q: ?Sized = dyn 'static + Send + Sync + Queue<T>> {
/// Send operations waiting while the channel is full.
Expand Down Expand Up @@ -59,8 +62,25 @@ impl<T> Channel<T> {
}
}

impl<T, Q> Channel<T, Q> {
fn notify_after_send(&self) {
// Notify a blocked receive operation. If the notified operation gets canceled,
// it will notify another blocked receive operation.
self.recv_ops.notify_additional(1);

// Notify all blocked streams.
self.stream_ops.notify(usize::MAX);
}

fn notify_after_recv(&self) {
// Notify a blocked send operation. If the notified operation gets canceled, it
// will notify another blocked send operation.
self.send_ops.notify_additional(1);
}
}

/// Creates a channel.
pub fn new<Q, T>(queue: Q) -> (Sender<T>, Receiver<T>)
pub fn new<T, Q>(queue: Q) -> (Sender<T>, Receiver<T>)
where
Q: 'static + Send + Sync + Queue<T>,
{
Expand All @@ -76,15 +96,78 @@ where
queue,
});

let s = Sender {
let sender_waker = waker::<_, _, true>(&channel);
let receiver_waker = waker::<_, _, false>(&channel);

let channel: Arc<Channel<T>> = channel;

let sender = Sender {
channel: channel.clone(),
waker: sender_waker,
};
let r = Receiver {

let receiver = Receiver {
channel: channel.clone(),
waker: receiver_waker,
listener: None,
channel,
_pin: PhantomPinned,
};
(s, r)

(sender, receiver)
}

fn waker<Q, T, const IS_SEND: bool>(channel: &Arc<Channel<T, Q>>) -> Waker {
use core::mem::ManuallyDrop;

#[inline(always)]
unsafe fn clone_waker<T, Q, const IS_SEND: bool>(waker: *const ()) -> RawWaker {
unsafe { Arc::increment_strong_count(waker as *const Channel<T, Q>) };
RawWaker::new(
waker,
&RawWakerVTable::new(
clone_waker::<T, Q, IS_SEND>,
wake::<T, Q, IS_SEND>,
wake_by_ref::<T, Q, IS_SEND>,
drop_waker::<T, Q, IS_SEND>,
),
)
}

unsafe fn wake<T, Q, const IS_SEND: bool>(waker: *const ()) {
let channel = unsafe { Arc::from_raw(waker as *const Channel<T, Q>) };
if IS_SEND {
channel.notify_after_send();
} else {
channel.notify_after_recv();
}
}

unsafe fn wake_by_ref<T, Q, const IS_SEND: bool>(waker: *const ()) {
let channel = unsafe { ManuallyDrop::new(Arc::from_raw(waker as *const Channel<T, Q>)) };
if IS_SEND {
channel.notify_after_send();
} else {
channel.notify_after_recv();
}
}

unsafe fn drop_waker<T, Q, const IS_SEND: bool>(waker: *const ()) {
unsafe { Arc::decrement_strong_count(waker as *const Channel<T, Q>) };
}

unsafe {
let ptr = Arc::into_raw(channel.clone()) as *const _;
let raw = RawWaker::new(
ptr,
&RawWakerVTable::new(
clone_waker::<T, Q, IS_SEND>,
wake::<T, Q, IS_SEND>,
wake_by_ref::<T, Q, IS_SEND>,
drop_waker::<T, Q, IS_SEND>,
),
);
Waker::from_raw(raw)
}
}

/// The sending side of a channel.
Expand All @@ -96,21 +179,14 @@ where
pub struct Sender<T> {
/// Inner channel state.
channel: Arc<Channel<T>>,
waker: Waker,
}

impl<T> Sender<T> {
/// Attempts to push a message into the channel.
pub fn try_push(&self, msg: T) -> Result<Option<T>, PushError<T>> {
let prev = self.channel.queue.push(msg)?;

// Notify a blocked receive operation. If the notified operation gets canceled,
// it will notify another blocked receive operation.
self.channel.recv_ops.notify_additional(1);

// Notify all blocked streams.
self.channel.stream_ops.notify(usize::MAX);

Ok(prev)
let mut ctx = core::task::Context::from_waker(&self.waker);
self.channel.queue.push_with_context(msg, &mut ctx)
}

/// Pushes a message into the channel.
Expand Down Expand Up @@ -174,6 +250,7 @@ impl<T> Sender<T> {
pub fn downgrade(&self) -> WeakSender<T> {
WeakSender {
channel: self.channel.clone(),
waker: self.waker.clone(),
}
}

Expand Down Expand Up @@ -209,6 +286,7 @@ impl<T> Clone for Sender<T> {

Sender {
channel: self.channel.clone(),
waker: self.waker.clone(),
}
}
}
Expand All @@ -225,6 +303,7 @@ pin_project! {
pub struct Receiver<T> {
// Inner channel state.
channel: Arc<Channel<T>>,
waker: Waker,

// Listens for a send or close event to unblock this stream.
listener: Option<EventListener>,
Expand All @@ -249,11 +328,8 @@ pin_project! {
impl<T> Receiver<T> {
/// Attempts to pop a message from the channel.
pub fn try_pop(&self) -> Result<T, PopError> {
let msg = self.channel.queue.pop()?;
// Notify a blocked send operation. If the notified operation gets canceled, it
// will notify another blocked send operation.
self.channel.send_ops.notify_additional(1);
Ok(msg)
let mut ctx = core::task::Context::from_waker(&self.waker);
self.channel.queue.pop_with_context(&mut ctx)
}

/// Pops a message from the channel.
Expand Down Expand Up @@ -316,6 +392,7 @@ impl<T> Receiver<T> {
pub fn downgrade(&self) -> WeakReceiver<T> {
WeakReceiver {
channel: self.channel.clone(),
waker: self.waker.clone(),
}
}

Expand All @@ -342,6 +419,7 @@ impl<T> Clone for Receiver<T> {

Receiver {
channel: self.channel.clone(),
waker: self.waker.clone(),
listener: None,
_pin: PhantomPinned,
}
Expand Down Expand Up @@ -405,6 +483,7 @@ impl<T> futures_core::stream::FusedStream for Receiver<T> {
/// to be upgraded into a [`Sender`] through the `upgrade` method.
pub struct WeakSender<T> {
channel: Arc<Channel<T>>,
waker: Waker,
}

impl<T> WeakSender<T> {
Expand All @@ -425,6 +504,7 @@ impl<T> WeakSender<T> {
}
Ok(_) => Some(Sender {
channel: self.channel.clone(),
waker: self.waker.clone(),
}),
}
}
Expand All @@ -435,6 +515,7 @@ impl<T> Clone for WeakSender<T> {
fn clone(&self) -> Self {
WeakSender {
channel: self.channel.clone(),
waker: self.waker.clone(),
}
}
}
Expand All @@ -451,6 +532,7 @@ impl<T> fmt::Debug for WeakSender<T> {
/// to be upgraded into a [`Receiver`] through the `upgrade` method.
pub struct WeakReceiver<T> {
channel: Arc<Channel<T>>,
waker: Waker,
}

impl<T> WeakReceiver<T> {
Expand All @@ -471,6 +553,7 @@ impl<T> WeakReceiver<T> {
}
Ok(_) => Some(Receiver {
channel: self.channel.clone(),
waker: self.waker.clone(),
listener: None,
_pin: PhantomPinned,
}),
Expand All @@ -483,6 +566,7 @@ impl<T> Clone for WeakReceiver<T> {
fn clone(&self) -> Self {
WeakReceiver {
channel: self.channel.clone(),
waker: self.waker.clone(),
}
}
}
Expand Down
43 changes: 43 additions & 0 deletions bach/src/sync/queue.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
use crate::{sync::channel, time::Instant};
use core::fmt;
use std::task::Context;

pub mod latent;
pub mod priority;
pub mod sojourn;
pub mod span;
pub mod vec_deque;

pub trait Queue<T> {
fn push(&self, value: T) -> Result<Option<T>, PushError<T>>;
fn push_with_context(&self, value: T, cx: &mut Context) -> Result<Option<T>, PushError<T>>;

fn pop(&self) -> Result<T, PopError>;
fn pop_with_context(&self, cx: &mut Context) -> Result<T, PopError>;

fn close(&self) -> Result<(), CloseError>;
fn is_closed(&self) -> bool;
fn is_empty(&self) -> bool;
Expand All @@ -16,6 +23,42 @@ pub trait Queue<T> {
fn capacity(&self) -> Option<usize>;
}

pub trait Conditional<T>: Queue<T> {
fn find_pop<F: Fn(&T) -> bool>(&self, check: F) -> Result<T, PopError>;
}

pub trait QueueExt<T>: 'static + Queue<T> + Sized + Send + Sync {
#[inline]
fn span(self, name: &'static str) -> span::Queue<Self> {
span::Queue::new(self, name)
}

#[inline]
fn channel(self) -> (channel::Sender<T>, channel::Receiver<T>) {
channel::new(self)
}
}

impl<Q, T> QueueExt<T> for Q where Q: 'static + Queue<T> + Sized + Send + Sync {}

pub trait InstantQueueExt<T>: 'static + Queue<(Instant, T)> + Sized + Send + Sync {
#[inline]
fn sojourn(self) -> sojourn::Queue<T, Self> {
sojourn::Queue::new(self)
}

#[inline]
fn latent<L>(self, latency: L) -> latent::Queue<T, Self, L>
where
L: latent::Latency<T>,
Self: Conditional<(Instant, T)>,
{
latent::Queue::new(self, latency)
}
}

impl<Q, T> InstantQueueExt<T> for Q where Q: 'static + Queue<(Instant, T)> + Sized + Send + Sync {}

#[derive(Clone, Copy, PartialEq, Eq)]
pub enum PushError<T> {
Full(T),
Expand Down
Loading

0 comments on commit 86b8f46

Please sign in to comment.