Skip to content

Commit

Permalink
stream refactor 1
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed May 11, 2024
1 parent 023ade6 commit 994aacf
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 95 deletions.
153 changes: 65 additions & 88 deletions crates/libs/msquic/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ use c2::{
use tokio::sync::oneshot;

use crate::{
buffer::{QBuffRef, QBufferVec, QOwnedBuffer, QVecBuffer},
conn::QConnection,
info,
utils::SBox,
QApi,
buffer::{QBuffRef, QBufferVec, QOwnedBuffer, QVecBuffer}, conn::QConnection, info, sync::{QQueue, QResetChannel}, utils::SBox, QApi
};

// #[derive(Debug)]
Expand All @@ -29,19 +25,19 @@ pub struct QStream {
ctx: Box<QStreamCtx>,
}

#[derive(Debug, Clone)]
enum ShutdownError {
PeerSendShutdown,
PeerSendAbort,
ShutdownComplete,
}
// #[derive(Debug, Clone)]
// enum ShutdownError {
// PeerSendShutdown,
// PeerSendAbort,
// ShutdownComplete,
// }

#[allow(dead_code)]
#[derive(Debug)]
enum ReceivePayload {
Success(Vec<QVecBuffer>),
Stop(ShutdownError),
}
// #[allow(dead_code)]
// #[derive(Debug)]
// enum ReceivePayload {
// Success(Vec<QVecBuffer>),
// //Stop(ShutdownError),
// }

#[derive(Debug, Clone)]
enum SentPayload {
Expand All @@ -61,16 +57,16 @@ enum State {
Backend,
}

struct ReceiveState {
data: Vec<QVecBuffer>, // Buffers received but not yet given to frontend.
receive_tx: Option<oneshot::Sender<ReceivePayload>>, // frontend pending wait
is_peer_closed: bool,
}
// struct ReceiveState {
// data: Vec<QVecBuffer>, // Buffers received but not yet given to frontend.
// receive_tx: Option<oneshot::Sender<ReceivePayload>>, // frontend pending wait
// is_peer_closed: bool,
// }

struct QStreamCtx {
start_tx: Option<oneshot::Sender<StartPayload>>,
receive_state: ReceiveState,
send_tx: Option<oneshot::Sender<SentPayload>>,
receive_ch: QQueue<Vec<QVecBuffer>>,
send_ch: QResetChannel<SentPayload>,
send_shtdwn_tx: Option<oneshot::Sender<()>>,
drain_tx: Option<oneshot::Sender<()>>,
is_drained: bool,
Expand All @@ -82,12 +78,8 @@ impl QStreamCtx {
Self {
//start_rx: Some(start_rx),
start_tx: None,
receive_state: ReceiveState {
data: Vec::new(),
receive_tx: None,
is_peer_closed: false,
},
send_tx: None,
receive_ch: QQueue::new(),
send_ch: QResetChannel::new(),
send_shtdwn_tx: None,
drain_tx: None,
is_drained: false,
Expand Down Expand Up @@ -116,13 +108,14 @@ extern "C" fn qstream_handler_callback(
STREAM_EVENT_SEND_COMPLETE => {
let raw = unsafe { event.payload.send_complete };
info!("[{:?}] STREAM_EVENT_SEND_COMPLETE", stream);
let tx = ctx.send_tx.take().unwrap();
let payload = if raw.canceled {
SentPayload::Canceled
} else {
SentPayload::Success
};
tx.send(payload).unwrap();
if ctx.send_ch.can_set(){
ctx.send_ch.set(payload);
}
}
STREAM_EVENT_RECEIVE => {
info!("[{:?}] QUIC_STREAM_EVENT_RECEIVE", stream);
Expand All @@ -131,38 +124,23 @@ extern "C" fn qstream_handler_callback(
let curr = raw.buffer;
let buffs = unsafe { slice::from_raw_parts(curr, count.try_into().unwrap()) };
// send to frontend
let mut v = buffs
let v = buffs
.iter()
.map(|b| QVecBuffer::from(&QBuffRef::from(b)))
.collect::<Vec<_>>();
if let Some(tx) = ctx.receive_state.receive_tx.take() {
assert_eq!(ctx.receive_state.data.len(), 0);
// frontent is waiting
tx.send(ReceivePayload::Success(v)).unwrap();
// status = QUIC_STATUS_PENDING;
} else {
// queue the buffer to ctx
ctx.receive_state.data.append(&mut v)
}
ctx.receive_ch.insert(v);
}
// peer can shutdown their direction. But we should receive what is pending.
STREAM_EVENT_PEER_SEND_SHUTDOWN => {
info!("[{:?}] STREAM_EVENT_PEER_SEND_SHUTDOWN", stream);
ctx.receive_state.is_peer_closed = true;
let err = ShutdownError::PeerSendShutdown;
// Peer will no longer send new stuff, so the receive can be dropped.
// if frontend is waiting stop it.
if let Some(tx) = ctx.receive_state.receive_tx.take() {
tx.send(ReceivePayload::Stop(err)).unwrap();
}
ctx.receive_ch.close(0);
}
STREAM_EVENT_PEER_SEND_ABORTED => {
info!("[{:?}] STREAM_EVENT_PEER_SEND_ABORTED", stream);
ctx.receive_state.is_peer_closed = true;
let err = ShutdownError::PeerSendAbort;
if let Some(tx) = ctx.receive_state.receive_tx.take() {
tx.send(ReceivePayload::Stop(err)).unwrap();
}
let raw = unsafe{event.payload.peer_send_aborted};
info!("[{:?}] STREAM_EVENT_PEER_SEND_ABORTED: ec {}", stream, raw.error_code);
ctx.receive_ch.close(0);
}
STREAM_EVENT_SEND_SHUTDOWN_COMPLETE => {
// can ignore for now. This send to peer shutdown?
Expand All @@ -174,11 +152,12 @@ extern "C" fn qstream_handler_callback(
STREAM_EVENT_SHUTDOWN_COMPLETE => {
info!("[{:?}] STREAM_EVENT_SHUTDOWN_COMPLETE", stream);
// close all channels
ctx.receive_state.is_peer_closed = true;
let err = ShutdownError::ShutdownComplete;
if let Some(tx) = ctx.receive_state.receive_tx.take() {
tx.send(ReceivePayload::Stop(err)).unwrap();
}
// ctx.receive_state.is_peer_closed = true;
// let err = ShutdownError::ShutdownComplete;
// if let Some(tx) = ctx.receive_state.receive_tx.take() {
// tx.send(ReceivePayload::Stop(err)).unwrap();
// }
ctx.receive_ch.close(0);
// drain signal
ctx.is_drained = true;
if let Some(tx) = ctx.drain_tx.take() {
Expand Down Expand Up @@ -251,34 +230,31 @@ impl QStream {
// receive into this buff
// return num of bytes wrote.
pub async fn receive(&mut self, buff: &mut [u8]) -> Result<u32, Error> {
let fu;
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;
let r_state = &mut self.ctx.receive_state;
if r_state.data.is_empty() {
if r_state.is_peer_closed {
return Err(Error::from(ErrorKind::BrokenPipe));
}
//
// need to wait for more.
let (receive_tx, receive_rx) = oneshot::channel();
assert!(r_state.receive_tx.is_none());
r_state.receive_tx.replace(receive_tx);
fu = receive_rx;
} else {
let v = &r_state.data;
let copied = QStream::copy_vec(v, buff);
assert_ne!(copied, 0);
return Ok(copied);
}
rx = self.ctx.receive_ch.pop();
// let r_state = &mut self.ctx.receive_state;
// if r_state.data.is_empty() {
// if r_state.is_peer_closed {
// return Err(Error::from(ErrorKind::BrokenPipe));
// }
// //
// // need to wait for more.
// let (receive_tx, receive_rx) = oneshot::channel();
// assert!(r_state.receive_tx.is_none());
// r_state.receive_tx.replace(receive_tx);
// fu = receive_rx;
// } else {
// let v = &r_state.data;
// let copied = QStream::copy_vec(v, buff);
// assert_ne!(copied, 0);
// return Ok(copied);
// }
}

let payload = fu.await.unwrap();
let v = match payload {
ReceivePayload::Success(buff) => Ok(buff),
ReceivePayload::Stop(_) => Err(Error::from(ErrorKind::ConnectionAborted)),
}?;
let v = rx.await.map_err(|e: u32| Error::from_raw_os_error(e.try_into().unwrap()))?;
let copied = QStream::copy_vec(&v, buff);
// resume
// self.receive_complete(copied as u64);
Expand Down Expand Up @@ -311,13 +287,12 @@ impl QStream {
// }

pub async fn send(&mut self, buffers: &[QVecBuffer], flags: SendFlags) -> Result<(), Error> {
let (send_tx, send_rx) = oneshot::channel();
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;

assert!(self.ctx.send_tx.is_none());
self.ctx.send_tx.replace(send_tx);
rx = self.ctx.send_ch.reset();

let b = QBufferVec::from(buffers);
let bb = b.as_buffers();
Expand All @@ -327,10 +302,12 @@ impl QStream {
}

// wait backend
send_rx
.await
.map_or(Err(Error::from(ErrorKind::BrokenPipe)), Ok)?;
Ok(())
let res = rx
.await;
match res {
SentPayload::Success => Ok(()),
SentPayload::Canceled => Err(Error::from(ErrorKind::ConnectionAborted)),
}
}

// do not call this if already indicated shutdown during send.
Expand Down
10 changes: 3 additions & 7 deletions crates/libs/msquic/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,9 @@ impl<T> QQueue<T> {
pub fn close(&mut self, err: QError) {
self.is_closed = true;
self.ec = err;
if self.v.is_empty() {
// if there is wait give out error
if self.channel.can_set() {
self.channel.set(Err(self.ec));
}
} else {
assert!(self.channel.can_set(), "v is empty and channel is waiting");
// if there is wait give out error
if self.channel.can_set() {
self.channel.set(Err(self.ec));
}
}
}

0 comments on commit 994aacf

Please sign in to comment.