Skip to content

Commit

Permalink
refactor stream end
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed May 11, 2024
1 parent 994aacf commit 2ba01e4
Showing 1 changed file with 105 additions and 142 deletions.
247 changes: 105 additions & 142 deletions crates/libs/msquic/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,29 @@ use std::{
sync::Mutex,
};

use crate::{
buffer::{QBuffRef, QBufferVec, QOwnedBuffer, QVecBuffer},
conn::QConnection,
info,
sync::{QQueue, QResetChannel, QSignal},
utils::SBox,
QApi,
};
use c2::{
Handle, SendFlags, Stream, StreamEvent, StreamOpenFlags, StreamStartFlags,
Buffer, Handle, SendFlags, Stream, StreamEvent, StreamOpenFlags, StreamStartFlags,
STREAM_EVENT_PEER_RECEIVE_ABORTED, STREAM_EVENT_PEER_SEND_ABORTED,
STREAM_EVENT_PEER_SEND_SHUTDOWN, STREAM_EVENT_RECEIVE, STREAM_EVENT_SEND_COMPLETE,
STREAM_EVENT_SEND_SHUTDOWN_COMPLETE, STREAM_EVENT_SHUTDOWN_COMPLETE,
STREAM_EVENT_START_COMPLETE, STREAM_SHUTDOWN_FLAG_NONE,
};
use tokio::sync::oneshot;

use crate::{
buffer::{QBuffRef, QBufferVec, QOwnedBuffer, QVecBuffer}, conn::QConnection, info, sync::{QQueue, QResetChannel}, utils::SBox, QApi
};

// #[derive(Debug)]
pub struct QStream {
_api: QApi,
inner: SBox<Stream>,
ctx: Box<QStreamCtx>,
ctx: Box<Mutex<QStreamCtx>>,
}

// #[derive(Debug, Clone)]
// enum ShutdownError {
// PeerSendShutdown,
// PeerSendAbort,
// ShutdownComplete,
// }

// #[allow(dead_code)]
// #[derive(Debug)]
// enum ReceivePayload {
// Success(Vec<QVecBuffer>),
// //Stop(ShutdownError),
// }

#[derive(Debug, Clone)]
enum SentPayload {
Success,
Expand All @@ -50,40 +39,71 @@ enum StartPayload {
Success,
}

// dummy state lock
enum State {
Idle,
Frontend,
Backend,
}

// struct ReceiveState {
// data: Vec<QVecBuffer>, // Buffers received but not yet given to frontend.
// receive_tx: Option<oneshot::Sender<ReceivePayload>>, // frontend pending wait
// is_peer_closed: bool,
// }

struct QStreamCtx {
start_tx: Option<oneshot::Sender<StartPayload>>,
start_sig: QResetChannel<StartPayload>,
receive_ch: QQueue<Vec<QVecBuffer>>,
send_ch: QResetChannel<SentPayload>,
send_shtdwn_tx: Option<oneshot::Sender<()>>,
drain_tx: Option<oneshot::Sender<()>>,
send_shtdwn_sig: QSignal,
drain_sig: QSignal,
is_drained: bool,
state: Mutex<State>,
}

impl QStreamCtx {
fn new() -> Self {
Self {
//start_rx: Some(start_rx),
start_tx: None,
start_sig: QResetChannel::new(),
receive_ch: QQueue::new(),
send_ch: QResetChannel::new(),
send_shtdwn_tx: None,
drain_tx: None,
send_shtdwn_sig: QSignal::new(),
drain_sig: QSignal::new(),
is_drained: false,
state: Mutex::new(State::Idle),
}
}

fn on_start_complete(&mut self) {
if self.start_sig.can_set() {
self.start_sig.set(StartPayload::Success);
}
}
fn on_send_complete(&mut self, cancelled: bool) {
let payload = if cancelled {
SentPayload::Canceled
} else {
SentPayload::Success
};
if self.send_ch.can_set() {
self.send_ch.set(payload);
}
}
fn on_receive(&mut self, buffs: &[Buffer]) {
// send to frontend
let v = buffs
.iter()
.map(|b| QVecBuffer::from(&QBuffRef::from(b)))
.collect::<Vec<_>>();
self.receive_ch.insert(v);
}
fn on_peer_send_shutdown(&mut self) {
// peer can shutdown their direction. But we should receive what is pending.
// Peer will no longer send new stuff, so the receive can be dropped.
// if frontend is waiting stop it.
self.receive_ch.close(0);
}
fn on_peer_send_abort(&mut self, _ec: u64) {
self.receive_ch.close(0);
}
fn on_send_shutdown_complete(&mut self) {
if self.send_shtdwn_sig.can_set() {
self.send_shtdwn_sig.set(());
}
}
fn on_shutdown_complete(&mut self) {
// close all channels
self.receive_ch.close(0);
// drain signal
self.is_drained = true;
if self.drain_sig.can_set() {
self.drain_sig.set(())
}
}
}
Expand All @@ -94,75 +114,51 @@ extern "C" fn qstream_handler_callback(
event: &StreamEvent,
) -> u32 {
assert!(!context.is_null());
let ctx = unsafe { (context as *mut QStreamCtx).as_mut().unwrap() };
let ctx = unsafe { (context as *mut Mutex<QStreamCtx>).as_mut().unwrap() };
#[allow(clippy::mut_mutex_lock)]
let mut ctx = ctx.lock().unwrap();
let status = 0;
let mut state = ctx.state.lock().unwrap();
*state = State::Backend;

match event.event_type {
STREAM_EVENT_START_COMPLETE => {
info!("[{:?}] STREAM_EVENT_START_COMPLETE", stream);
let tx = ctx.start_tx.take().unwrap();
tx.send(StartPayload::Success).unwrap();
ctx.on_start_complete();
}
STREAM_EVENT_SEND_COMPLETE => {
let raw = unsafe { event.payload.send_complete };
info!("[{:?}] STREAM_EVENT_SEND_COMPLETE", stream);
let payload = if raw.canceled {
SentPayload::Canceled
} else {
SentPayload::Success
};
if ctx.send_ch.can_set(){
ctx.send_ch.set(payload);
}
info!(
"[{:?}] STREAM_EVENT_SEND_COMPLETE cancel {}",
stream, raw.canceled
);
ctx.on_send_complete(raw.canceled);
}
STREAM_EVENT_RECEIVE => {
info!("[{:?}] QUIC_STREAM_EVENT_RECEIVE", stream);
let raw = unsafe { event.payload.receive };
let count = raw.buffer_count;
let curr = raw.buffer;
let buffs = unsafe { slice::from_raw_parts(curr, count.try_into().unwrap()) };
// send to frontend
let v = buffs
.iter()
.map(|b| QVecBuffer::from(&QBuffRef::from(b)))
.collect::<Vec<_>>();
ctx.receive_ch.insert(v);
ctx.on_receive(buffs);
}
// peer can shutdown their direction. But we should receive what is pending.
STREAM_EVENT_PEER_SEND_SHUTDOWN => {
info!("[{:?}] STREAM_EVENT_PEER_SEND_SHUTDOWN", stream);
// Peer will no longer send new stuff, so the receive can be dropped.
// if frontend is waiting stop it.
ctx.receive_ch.close(0);
ctx.on_peer_send_shutdown();
}
STREAM_EVENT_PEER_SEND_ABORTED => {
let raw = unsafe{event.payload.peer_send_aborted};
info!("[{:?}] STREAM_EVENT_PEER_SEND_ABORTED: ec {}", stream, raw.error_code);
ctx.receive_ch.close(0);
let raw = unsafe { event.payload.peer_send_aborted };
info!(
"[{:?}] STREAM_EVENT_PEER_SEND_ABORTED: ec {}",
stream, raw.error_code
);
ctx.on_peer_send_abort(raw.error_code);
}
STREAM_EVENT_SEND_SHUTDOWN_COMPLETE => {
// can ignore for now. This send to peer shutdown?
info!("[{:?}] STREAM_EVENT_SEND_SHUTDOWN_COMPLETE", stream);
if let Some(tx) = ctx.send_shtdwn_tx.take() {
tx.send(()).unwrap();
}
ctx.on_send_shutdown_complete();
}
STREAM_EVENT_SHUTDOWN_COMPLETE => {
info!("[{:?}] STREAM_EVENT_SHUTDOWN_COMPLETE", stream);
// close all channels
// ctx.receive_state.is_peer_closed = true;
// let err = ShutdownError::ShutdownComplete;
// if let Some(tx) = ctx.receive_state.receive_tx.take() {
// tx.send(ReceivePayload::Stop(err)).unwrap();
// }
ctx.receive_ch.close(0);
// drain signal
ctx.is_drained = true;
if let Some(tx) = ctx.drain_tx.take() {
tx.send(()).unwrap();
}
ctx.on_shutdown_complete();
}
STREAM_EVENT_PEER_RECEIVE_ABORTED => {
// can ignore for now
Expand All @@ -178,10 +174,10 @@ extern "C" fn qstream_handler_callback(
impl QStream {
pub fn attach(api: QApi, h: Handle) -> Self {
let s = Stream::from_parts(h, &api.inner.inner);
let ctx = Box::new(QStreamCtx::new());
let ctx = Box::new(Mutex::new(QStreamCtx::new()));
s.set_callback_handler(
qstream_handler_callback,
&*ctx as *const QStreamCtx as *const c_void,
&*ctx as *const Mutex<QStreamCtx> as *const c_void,
);

Self {
Expand All @@ -194,12 +190,12 @@ impl QStream {
// open client stream
pub fn open(connection: &QConnection, flags: StreamOpenFlags) -> Self {
let s = Stream::new(&connection._api.inner.inner);
let ctx = Box::new(QStreamCtx::new());
let ctx = Box::new(Mutex::new(QStreamCtx::new()));
s.open(
&connection.inner.inner,
flags,
qstream_handler_callback,
&*ctx as *const QStreamCtx as *const c_void,
&*ctx as *const Mutex<QStreamCtx> as *const c_void,
);
Self {
_api: connection._api.clone(),
Expand All @@ -211,18 +207,14 @@ impl QStream {
// start stream for client
pub async fn start(&mut self, flags: StreamStartFlags) -> Result<(), Error> {
// regardless of start success of fail, there is a QUIC_STREAM_EVENT_START_COMPLETE callback.
let (start_tx, start_rx) = oneshot::channel();
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;

// prepare the channel.
assert!(self.ctx.start_tx.is_none());
self.ctx.start_tx.replace(start_tx);
rx = self.ctx.lock().unwrap().start_sig.reset();
self.inner.inner.start(flags);
}
// wait for backend
match start_rx.await.unwrap() {
match rx.await {
StartPayload::Success => Ok(()),
}
}
Expand All @@ -232,29 +224,12 @@ impl QStream {
pub async fn receive(&mut self, buff: &mut [u8]) -> Result<u32, Error> {
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;
rx = self.ctx.receive_ch.pop();
// let r_state = &mut self.ctx.receive_state;
// if r_state.data.is_empty() {
// if r_state.is_peer_closed {
// return Err(Error::from(ErrorKind::BrokenPipe));
// }
// //
// // need to wait for more.
// let (receive_tx, receive_rx) = oneshot::channel();
// assert!(r_state.receive_tx.is_none());
// r_state.receive_tx.replace(receive_tx);
// fu = receive_rx;
// } else {
// let v = &r_state.data;
// let copied = QStream::copy_vec(v, buff);
// assert_ne!(copied, 0);
// return Ok(copied);
// }
rx = self.ctx.lock().unwrap().receive_ch.pop();
}

let v = rx.await.map_err(|e: u32| Error::from_raw_os_error(e.try_into().unwrap()))?;
let v = rx
.await
.map_err(|e: u32| Error::from_raw_os_error(e.try_into().unwrap()))?;
let copied = QStream::copy_vec(&v, buff);
// resume
// self.receive_complete(copied as u64);
Expand Down Expand Up @@ -289,10 +264,7 @@ impl QStream {
pub async fn send(&mut self, buffers: &[QVecBuffer], flags: SendFlags) -> Result<(), Error> {
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;

rx = self.ctx.send_ch.reset();
rx = self.ctx.lock().unwrap().send_ch.reset();

let b = QBufferVec::from(buffers);
let bb = b.as_buffers();
Expand All @@ -302,43 +274,34 @@ impl QStream {
}

// wait backend
let res = rx
.await;
let res = rx.await;
match res {
SentPayload::Success => Ok(()),
SentPayload::Success => Ok(()),
SentPayload::Canceled => Err(Error::from(ErrorKind::ConnectionAborted)),
}
}

// send shutdown signal to peer.
// do not call this if already indicated shutdown during send.
pub async fn shutdown(&mut self) {
let fu;
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;
let (send_shtdwn_tx, send_shtdwn_rx) = oneshot::channel();
assert!(self.ctx.send_shtdwn_tx.is_none());
self.ctx.send_shtdwn_tx.replace(send_shtdwn_tx);
fu = send_shtdwn_rx;
rx = self.ctx.lock().unwrap().send_shtdwn_sig.reset();
self.inner.inner.shutdown(STREAM_SHUTDOWN_FLAG_NONE, 0);
}
fu.await.unwrap();
rx.await;
}

// wait for the complete shutdown event. before close handle.
pub async fn drain(&mut self) {
let fu;
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;
if self.ctx.is_drained {
let mut lk = self.ctx.lock().unwrap();
if lk.is_drained {
return;
}
let (drain_tx, drain_rx) = oneshot::channel();
assert!(self.ctx.drain_tx.is_none());
self.ctx.drain_tx.replace(drain_tx);
fu = drain_rx;
rx = lk.drain_sig.reset();
}
fu.await.unwrap();
rx.await;
}
}

0 comments on commit 2ba01e4

Please sign in to comment.