diff --git a/crates/libs/msquic/src/lib.rs b/crates/libs/msquic/src/lib.rs index e4c90f2..1346197 100644 --- a/crates/libs/msquic/src/lib.rs +++ b/crates/libs/msquic/src/lib.rs @@ -12,6 +12,7 @@ pub mod conn; pub mod listener; pub mod reg; pub mod stream; +pub mod sync; mod utils; // Some useful defs diff --git a/crates/libs/msquic/src/listener.rs b/crates/libs/msquic/src/listener.rs index ad6a007..0e963c5 100644 --- a/crates/libs/msquic/src/listener.rs +++ b/crates/libs/msquic/src/listener.rs @@ -3,11 +3,14 @@ use std::{ sync::{Arc, Mutex}, }; -use crate::{conn::QConnection, info}; +use crate::{ + conn::QConnection, + info, + sync::{QQueue, QResetChannel, QSignal}, +}; use c2::{ Addr, Buffer, Configuration, Handle, Listener, ListenerEvent, LISTENER_EVENT_NEW_CONNECTION, }; -use tokio::sync::oneshot; use crate::{config::QConfiguration, reg::QRegistration, utils::SBox, QApi}; @@ -19,17 +22,16 @@ pub struct QListener { struct QListenerCtx { _api: QApi, - tx: Option>, + ch: QQueue, config: Arc>, state: Mutex, // cannot use tokio mutex because the callback maybe invoked sync and block tokio thread - stop_tx: Option>, + sig_stop: QSignal, } -#[derive(Debug)] -enum Payload { - Conn(QConnection), - Stop, -} +// #[derive(Debug)] +// enum Payload { +// Conn(QConnection), +// } #[derive(PartialEq, Debug)] enum State { @@ -61,23 +63,17 @@ extern "C" fn listener_handler( "[{:?}] LISTENER_EVENT_NEW_CONNECTION conn=[{:?}] info={:?}", listener, h, info ); - if let Some(tx) = ctx.tx.take() { - tx.send(Payload::Conn(c)).unwrap(); - } + ctx.ch.insert(c); } QUIC_LISTENER_EVENT_STOP_COMPLETE => { info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE", listener); assert_eq!(*state, State::StopRequested); *state = State::Stopped; // stop the acceptor - if let Some(tx) = ctx.tx.take() { - info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE tx send", listener); - // front end accept might already be stopped. - let _ = tx.send(Payload::Stop); - } + ctx.ch.close(0); // stop the stop action info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE stop_tx", listener); - ctx.stop_tx.take().unwrap().send(()).unwrap(); + ctx.sig_stop.set(()); } _ => { unreachable!() @@ -92,10 +88,10 @@ impl QListener { pub fn open(registration: &QRegistration, configuration: &QConfiguration) -> Self { let context = Box::new(QListenerCtx { _api: registration.api.clone(), - tx: None, + ch: QQueue::new(), config: configuration.inner.clone(), state: Mutex::new(State::Idle), - stop_tx: None, + sig_stop: QResetChannel::new(), }); let l = Listener::new( ®istration.inner.inner, @@ -117,8 +113,7 @@ impl QListener { } pub async fn accept(&mut self) -> Option { - let (tx, rx) = oneshot::channel(); - let fu; + let rx; { let state = self.ctx.state.lock().unwrap(); if *state == State::Stopped { @@ -126,32 +121,29 @@ impl QListener { } // must be started assert_ne!(*state, State::Idle); - assert!(self.ctx.tx.is_none()); - self.ctx.tx.replace(tx); - fu = rx; + rx = self.ctx.ch.pop(); } - let payload = fu.await.unwrap(); - match payload { - Payload::Conn(c) => Some(c), - Payload::Stop => None, + let res = rx.await; + match res { + Ok(c) => Some(c), + Err(_) => None, } } pub async fn stop(&mut self) { - let (stop_tx, stop_rx) = oneshot::channel(); + let rx; { let mut state = self.ctx.state.lock().unwrap(); assert_eq!(*state, State::Started); *state = State::StopRequested; - assert!(self.ctx.stop_tx.is_none()); - self.ctx.stop_tx.replace(stop_tx); + rx = self.ctx.sig_stop.reset(); } // callback may be invoked in the same thread. info!("listner stop requested."); self.inner.inner.stop(); info!("wait for stop_rx signal."); // wait for drain. - stop_rx.await.unwrap(); + rx.await; info!("wait for stop_rx signal ok."); } } diff --git a/crates/libs/msquic/src/sync.rs b/crates/libs/msquic/src/sync.rs new file mode 100644 index 0000000..f09291d --- /dev/null +++ b/crates/libs/msquic/src/sync.rs @@ -0,0 +1,167 @@ +use std::{ + collections::LinkedList, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use tokio::sync::oneshot::{self, Receiver}; + +pub type QError = u32; + +pub struct QReceiver { + rx: oneshot::Receiver, +} + +pub struct QSender { + tx: oneshot::Sender, +} + +pub fn oneshot_channel() -> (QSender, QReceiver) { + let (tx, rx) = oneshot::channel(); + (QSender { tx }, QReceiver { rx }) +} + +impl QSender { + pub fn send(self, data: T) { + // ignore is receiver dropped. + let _ = self.tx.send(data); + } +} + +impl QReceiver { + pub fn blocking_recv(self) -> T { + // sender must send stuff so that there is not error. + self.rx.blocking_recv().unwrap() + } + + // make a receiver that is filled with data + fn make_ready(data: T) -> Self { + let (tx, rx) = oneshot_channel(); + tx.send(data); + rx + } +} + +impl Future for QReceiver { + type Output = T; + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + // Try to receive the value from the sender + let innner = as Future>::poll(Pin::new(&mut self.rx), _cx); + match innner { + Poll::Ready(x) => { + // error only happens when sender is dropped without sending. + // we ignore this error since in sf-rs use this will never happen. + Poll::Ready(x.expect("sender closed without sending a value.")) + } + Poll::Pending => Poll::Pending, + } + } +} + +// signal that can be set and reset and awaited. +pub struct QResetChannel { + tx: Option>, +} + +impl Default for QResetChannel { + fn default() -> Self { + Self::new() + } +} + +impl QResetChannel { + pub fn new() -> Self { + Self { tx: None } + } + + // set and reset are not thread safe. + pub fn reset(&mut self) -> QReceiver { + let (tx, rx) = oneshot_channel(); + assert!(self.tx.is_none()); + self.tx.replace(tx); + rx + } + + // send the data + pub fn set(&mut self, data: T) { + assert!(self.tx.is_some()); + self.tx.take().unwrap().send(data) + } + + // check if the channel can send/set + pub fn can_set(&self) -> bool { + self.tx.is_some() + } +} + +pub type QSignal = QResetChannel<()>; + +// Queue that can insert and awaited. +pub struct QQueue { + v: LinkedList, + channel: QResetChannel>, + is_closed: bool, + ec: u32, +} + +impl Default for QQueue { + fn default() -> Self { + Self::new() + } +} + +impl QQueue { + pub fn new() -> Self { + Self { + v: LinkedList::new(), + channel: QResetChannel::new(), + is_closed: false, + ec: 0, + } + } + + pub fn insert(&mut self, data: T) { + assert!(!self.is_closed); + // if channel is waiting insert to channel. + if self.channel.can_set() { + self.channel.set(Ok(data)); + return; + } + self.v.push_back(data); + } + + // fails if queue is closed and there is no more data + // i.e. no more new data can be extracted. + pub fn pop(&mut self) -> QReceiver> { + // if there is pending in v, return it + if !self.v.is_empty() { + let data = self.v.pop_front().unwrap(); + return QReceiver::make_ready(Ok(data)); + } + + if self.is_closed { + // copy the error code + return QReceiver::make_ready(Err(self.ec)); + } + + // wait for next insert. + assert!(!self.channel.can_set()); // no pending wait + self.channel.reset() + } + + // no more pop can initiate new wait. + // + pub fn close(&mut self, err: QError) { + self.is_closed = true; + self.ec = err; + if self.v.is_empty() { + // if there is wait give out error + if self.channel.can_set() { + self.channel.set(Err(self.ec)); + } + } else { + assert!(self.channel.can_set(), "v is empty and channel is waiting"); + } + } +}