Skip to content

Commit

Permalink
add sync primitives and use it in listener
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed May 7, 2024
1 parent f45be59 commit 5f71447
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 33 deletions.
1 change: 1 addition & 0 deletions crates/libs/msquic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod conn;
pub mod listener;
pub mod reg;
pub mod stream;
pub mod sync;
mod utils;

// Some useful defs
Expand Down
58 changes: 25 additions & 33 deletions crates/libs/msquic/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ use std::{
sync::{Arc, Mutex},
};

use crate::{conn::QConnection, info};
use crate::{
conn::QConnection,
info,
sync::{QQueue, QResetChannel, QSignal},
};
use c2::{
Addr, Buffer, Configuration, Handle, Listener, ListenerEvent, LISTENER_EVENT_NEW_CONNECTION,
};
use tokio::sync::oneshot;

use crate::{config::QConfiguration, reg::QRegistration, utils::SBox, QApi};

Expand All @@ -19,17 +22,16 @@ pub struct QListener {

struct QListenerCtx {
_api: QApi,
tx: Option<oneshot::Sender<Payload>>,
ch: QQueue<QConnection>,
config: Arc<SBox<Configuration>>,
state: Mutex<State>, // cannot use tokio mutex because the callback maybe invoked sync and block tokio thread
stop_tx: Option<oneshot::Sender<()>>,
sig_stop: QSignal,
}

#[derive(Debug)]
enum Payload {
Conn(QConnection),
Stop,
}
// #[derive(Debug)]
// enum Payload {
// Conn(QConnection),
// }

#[derive(PartialEq, Debug)]
enum State {
Expand Down Expand Up @@ -61,23 +63,17 @@ extern "C" fn listener_handler(
"[{:?}] LISTENER_EVENT_NEW_CONNECTION conn=[{:?}] info={:?}",
listener, h, info
);
if let Some(tx) = ctx.tx.take() {
tx.send(Payload::Conn(c)).unwrap();
}
ctx.ch.insert(c);
}
QUIC_LISTENER_EVENT_STOP_COMPLETE => {
info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE", listener);
assert_eq!(*state, State::StopRequested);
*state = State::Stopped;
// stop the acceptor
if let Some(tx) = ctx.tx.take() {
info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE tx send", listener);
// front end accept might already be stopped.
let _ = tx.send(Payload::Stop);
}
ctx.ch.close(0);
// stop the stop action
info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE stop_tx", listener);
ctx.stop_tx.take().unwrap().send(()).unwrap();
ctx.sig_stop.set(());
}
_ => {
unreachable!()
Expand All @@ -92,10 +88,10 @@ impl QListener {
pub fn open(registration: &QRegistration, configuration: &QConfiguration) -> Self {
let context = Box::new(QListenerCtx {
_api: registration.api.clone(),
tx: None,
ch: QQueue::new(),
config: configuration.inner.clone(),
state: Mutex::new(State::Idle),
stop_tx: None,
sig_stop: QResetChannel::new(),
});
let l = Listener::new(
&registration.inner.inner,
Expand All @@ -117,41 +113,37 @@ impl QListener {
}

pub async fn accept(&mut self) -> Option<QConnection> {
let (tx, rx) = oneshot::channel();
let fu;
let rx;
{
let state = self.ctx.state.lock().unwrap();
if *state == State::Stopped {
return None;
}
// must be started
assert_ne!(*state, State::Idle);
assert!(self.ctx.tx.is_none());
self.ctx.tx.replace(tx);
fu = rx;
rx = self.ctx.ch.pop();
}
let payload = fu.await.unwrap();
match payload {
Payload::Conn(c) => Some(c),
Payload::Stop => None,
let res = rx.await;
match res {
Ok(c) => Some(c),
Err(_) => None,
}
}

pub async fn stop(&mut self) {
let (stop_tx, stop_rx) = oneshot::channel();
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
assert_eq!(*state, State::Started);
*state = State::StopRequested;
assert!(self.ctx.stop_tx.is_none());
self.ctx.stop_tx.replace(stop_tx);
rx = self.ctx.sig_stop.reset();
}
// callback may be invoked in the same thread.
info!("listner stop requested.");
self.inner.inner.stop();
info!("wait for stop_rx signal.");
// wait for drain.
stop_rx.await.unwrap();
rx.await;
info!("wait for stop_rx signal ok.");
}
}
Expand Down
167 changes: 167 additions & 0 deletions crates/libs/msquic/src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use std::{
collections::LinkedList,
future::Future,
pin::Pin,
task::{Context, Poll},
};

use tokio::sync::oneshot::{self, Receiver};

pub type QError = u32;

pub struct QReceiver<T> {
rx: oneshot::Receiver<T>,
}

pub struct QSender<T> {
tx: oneshot::Sender<T>,
}

pub fn oneshot_channel<T>() -> (QSender<T>, QReceiver<T>) {
let (tx, rx) = oneshot::channel();
(QSender { tx }, QReceiver { rx })
}

impl<T> QSender<T> {
pub fn send(self, data: T) {
// ignore is receiver dropped.
let _ = self.tx.send(data);
}
}

impl<T> QReceiver<T> {
pub fn blocking_recv(self) -> T {
// sender must send stuff so that there is not error.
self.rx.blocking_recv().unwrap()
}

// make a receiver that is filled with data
fn make_ready(data: T) -> Self {
let (tx, rx) = oneshot_channel();
tx.send(data);
rx
}
}

impl<T> Future for QReceiver<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
// Try to receive the value from the sender
let innner = <Receiver<T> as Future>::poll(Pin::new(&mut self.rx), _cx);
match innner {
Poll::Ready(x) => {
// error only happens when sender is dropped without sending.
// we ignore this error since in sf-rs use this will never happen.
Poll::Ready(x.expect("sender closed without sending a value."))
}
Poll::Pending => Poll::Pending,
}
}
}

// signal that can be set and reset and awaited.
pub struct QResetChannel<T> {
tx: Option<QSender<T>>,
}

impl<T> Default for QResetChannel<T> {
fn default() -> Self {
Self::new()
}
}

impl<T> QResetChannel<T> {
pub fn new() -> Self {
Self { tx: None }
}

// set and reset are not thread safe.
pub fn reset(&mut self) -> QReceiver<T> {
let (tx, rx) = oneshot_channel();
assert!(self.tx.is_none());
self.tx.replace(tx);
rx
}

// send the data
pub fn set(&mut self, data: T) {
assert!(self.tx.is_some());
self.tx.take().unwrap().send(data)
}

// check if the channel can send/set
pub fn can_set(&self) -> bool {
self.tx.is_some()
}
}

pub type QSignal = QResetChannel<()>;

// Queue that can insert and awaited.
pub struct QQueue<T> {
v: LinkedList<T>,
channel: QResetChannel<Result<T, QError>>,
is_closed: bool,
ec: u32,
}

impl<T> Default for QQueue<T> {
fn default() -> Self {
Self::new()
}
}

impl<T> QQueue<T> {
pub fn new() -> Self {
Self {
v: LinkedList::new(),
channel: QResetChannel::new(),
is_closed: false,
ec: 0,
}
}

pub fn insert(&mut self, data: T) {
assert!(!self.is_closed);
// if channel is waiting insert to channel.
if self.channel.can_set() {
self.channel.set(Ok(data));
return;
}
self.v.push_back(data);
}

// fails if queue is closed and there is no more data
// i.e. no more new data can be extracted.
pub fn pop(&mut self) -> QReceiver<Result<T, QError>> {
// if there is pending in v, return it
if !self.v.is_empty() {
let data = self.v.pop_front().unwrap();
return QReceiver::make_ready(Ok(data));
}

if self.is_closed {
// copy the error code
return QReceiver::make_ready(Err(self.ec));
}

// wait for next insert.
assert!(!self.channel.can_set()); // no pending wait
self.channel.reset()
}

// no more pop can initiate new wait.
//
pub fn close(&mut self, err: QError) {
self.is_closed = true;
self.ec = err;
if self.v.is_empty() {
// if there is wait give out error
if self.channel.can_set() {
self.channel.set(Err(self.ec));
}
} else {
assert!(self.channel.can_set(), "v is empty and channel is waiting");
}
}
}

0 comments on commit 5f71447

Please sign in to comment.