diff --git a/crates/libs/msquic/src/stream.rs b/crates/libs/msquic/src/stream.rs index cc8f3fc..133e807 100644 --- a/crates/libs/msquic/src/stream.rs +++ b/crates/libs/msquic/src/stream.rs @@ -15,11 +15,7 @@ use c2::{ use tokio::sync::oneshot; use crate::{ - buffer::{QBuffRef, QBufferVec, QOwnedBuffer, QVecBuffer}, - conn::QConnection, - info, - utils::SBox, - QApi, + buffer::{QBuffRef, QBufferVec, QOwnedBuffer, QVecBuffer}, conn::QConnection, info, sync::{QQueue, QResetChannel}, utils::SBox, QApi }; // #[derive(Debug)] @@ -29,19 +25,19 @@ pub struct QStream { ctx: Box, } -#[derive(Debug, Clone)] -enum ShutdownError { - PeerSendShutdown, - PeerSendAbort, - ShutdownComplete, -} +// #[derive(Debug, Clone)] +// enum ShutdownError { +// PeerSendShutdown, +// PeerSendAbort, +// ShutdownComplete, +// } -#[allow(dead_code)] -#[derive(Debug)] -enum ReceivePayload { - Success(Vec), - Stop(ShutdownError), -} +// #[allow(dead_code)] +// #[derive(Debug)] +// enum ReceivePayload { +// Success(Vec), +// //Stop(ShutdownError), +// } #[derive(Debug, Clone)] enum SentPayload { @@ -61,16 +57,16 @@ enum State { Backend, } -struct ReceiveState { - data: Vec, // Buffers received but not yet given to frontend. - receive_tx: Option>, // frontend pending wait - is_peer_closed: bool, -} +// struct ReceiveState { +// data: Vec, // Buffers received but not yet given to frontend. +// receive_tx: Option>, // frontend pending wait +// is_peer_closed: bool, +// } struct QStreamCtx { start_tx: Option>, - receive_state: ReceiveState, - send_tx: Option>, + receive_ch: QQueue>, + send_ch: QResetChannel, send_shtdwn_tx: Option>, drain_tx: Option>, is_drained: bool, @@ -82,12 +78,8 @@ impl QStreamCtx { Self { //start_rx: Some(start_rx), start_tx: None, - receive_state: ReceiveState { - data: Vec::new(), - receive_tx: None, - is_peer_closed: false, - }, - send_tx: None, + receive_ch: QQueue::new(), + send_ch: QResetChannel::new(), send_shtdwn_tx: None, drain_tx: None, is_drained: false, @@ -116,13 +108,14 @@ extern "C" fn qstream_handler_callback( STREAM_EVENT_SEND_COMPLETE => { let raw = unsafe { event.payload.send_complete }; info!("[{:?}] STREAM_EVENT_SEND_COMPLETE", stream); - let tx = ctx.send_tx.take().unwrap(); let payload = if raw.canceled { SentPayload::Canceled } else { SentPayload::Success }; - tx.send(payload).unwrap(); + if ctx.send_ch.can_set(){ + ctx.send_ch.set(payload); + } } STREAM_EVENT_RECEIVE => { info!("[{:?}] QUIC_STREAM_EVENT_RECEIVE", stream); @@ -131,38 +124,23 @@ extern "C" fn qstream_handler_callback( let curr = raw.buffer; let buffs = unsafe { slice::from_raw_parts(curr, count.try_into().unwrap()) }; // send to frontend - let mut v = buffs + let v = buffs .iter() .map(|b| QVecBuffer::from(&QBuffRef::from(b))) .collect::>(); - if let Some(tx) = ctx.receive_state.receive_tx.take() { - assert_eq!(ctx.receive_state.data.len(), 0); - // frontent is waiting - tx.send(ReceivePayload::Success(v)).unwrap(); - // status = QUIC_STATUS_PENDING; - } else { - // queue the buffer to ctx - ctx.receive_state.data.append(&mut v) - } + ctx.receive_ch.insert(v); } // peer can shutdown their direction. But we should receive what is pending. STREAM_EVENT_PEER_SEND_SHUTDOWN => { info!("[{:?}] STREAM_EVENT_PEER_SEND_SHUTDOWN", stream); - ctx.receive_state.is_peer_closed = true; - let err = ShutdownError::PeerSendShutdown; // Peer will no longer send new stuff, so the receive can be dropped. // if frontend is waiting stop it. - if let Some(tx) = ctx.receive_state.receive_tx.take() { - tx.send(ReceivePayload::Stop(err)).unwrap(); - } + ctx.receive_ch.close(0); } STREAM_EVENT_PEER_SEND_ABORTED => { - info!("[{:?}] STREAM_EVENT_PEER_SEND_ABORTED", stream); - ctx.receive_state.is_peer_closed = true; - let err = ShutdownError::PeerSendAbort; - if let Some(tx) = ctx.receive_state.receive_tx.take() { - tx.send(ReceivePayload::Stop(err)).unwrap(); - } + let raw = unsafe{event.payload.peer_send_aborted}; + info!("[{:?}] STREAM_EVENT_PEER_SEND_ABORTED: ec {}", stream, raw.error_code); + ctx.receive_ch.close(0); } STREAM_EVENT_SEND_SHUTDOWN_COMPLETE => { // can ignore for now. This send to peer shutdown? @@ -174,11 +152,12 @@ extern "C" fn qstream_handler_callback( STREAM_EVENT_SHUTDOWN_COMPLETE => { info!("[{:?}] STREAM_EVENT_SHUTDOWN_COMPLETE", stream); // close all channels - ctx.receive_state.is_peer_closed = true; - let err = ShutdownError::ShutdownComplete; - if let Some(tx) = ctx.receive_state.receive_tx.take() { - tx.send(ReceivePayload::Stop(err)).unwrap(); - } + // ctx.receive_state.is_peer_closed = true; + // let err = ShutdownError::ShutdownComplete; + // if let Some(tx) = ctx.receive_state.receive_tx.take() { + // tx.send(ReceivePayload::Stop(err)).unwrap(); + // } + ctx.receive_ch.close(0); // drain signal ctx.is_drained = true; if let Some(tx) = ctx.drain_tx.take() { @@ -251,34 +230,31 @@ impl QStream { // receive into this buff // return num of bytes wrote. pub async fn receive(&mut self, buff: &mut [u8]) -> Result { - let fu; + let rx; { let mut state = self.ctx.state.lock().unwrap(); *state = State::Frontend; - let r_state = &mut self.ctx.receive_state; - if r_state.data.is_empty() { - if r_state.is_peer_closed { - return Err(Error::from(ErrorKind::BrokenPipe)); - } - // - // need to wait for more. - let (receive_tx, receive_rx) = oneshot::channel(); - assert!(r_state.receive_tx.is_none()); - r_state.receive_tx.replace(receive_tx); - fu = receive_rx; - } else { - let v = &r_state.data; - let copied = QStream::copy_vec(v, buff); - assert_ne!(copied, 0); - return Ok(copied); - } + rx = self.ctx.receive_ch.pop(); + // let r_state = &mut self.ctx.receive_state; + // if r_state.data.is_empty() { + // if r_state.is_peer_closed { + // return Err(Error::from(ErrorKind::BrokenPipe)); + // } + // // + // // need to wait for more. + // let (receive_tx, receive_rx) = oneshot::channel(); + // assert!(r_state.receive_tx.is_none()); + // r_state.receive_tx.replace(receive_tx); + // fu = receive_rx; + // } else { + // let v = &r_state.data; + // let copied = QStream::copy_vec(v, buff); + // assert_ne!(copied, 0); + // return Ok(copied); + // } } - let payload = fu.await.unwrap(); - let v = match payload { - ReceivePayload::Success(buff) => Ok(buff), - ReceivePayload::Stop(_) => Err(Error::from(ErrorKind::ConnectionAborted)), - }?; + let v = rx.await.map_err(|e: u32| Error::from_raw_os_error(e.try_into().unwrap()))?; let copied = QStream::copy_vec(&v, buff); // resume // self.receive_complete(copied as u64); @@ -311,13 +287,12 @@ impl QStream { // } pub async fn send(&mut self, buffers: &[QVecBuffer], flags: SendFlags) -> Result<(), Error> { - let (send_tx, send_rx) = oneshot::channel(); + let rx; { let mut state = self.ctx.state.lock().unwrap(); *state = State::Frontend; - assert!(self.ctx.send_tx.is_none()); - self.ctx.send_tx.replace(send_tx); + rx = self.ctx.send_ch.reset(); let b = QBufferVec::from(buffers); let bb = b.as_buffers(); @@ -327,10 +302,12 @@ impl QStream { } // wait backend - send_rx - .await - .map_or(Err(Error::from(ErrorKind::BrokenPipe)), Ok)?; - Ok(()) + let res = rx + .await; + match res { + SentPayload::Success => Ok(()), + SentPayload::Canceled => Err(Error::from(ErrorKind::ConnectionAborted)), + } } // do not call this if already indicated shutdown during send. diff --git a/crates/libs/msquic/src/sync.rs b/crates/libs/msquic/src/sync.rs index f09291d..40ee959 100644 --- a/crates/libs/msquic/src/sync.rs +++ b/crates/libs/msquic/src/sync.rs @@ -155,13 +155,9 @@ impl QQueue { pub fn close(&mut self, err: QError) { self.is_closed = true; self.ec = err; - if self.v.is_empty() { - // if there is wait give out error - if self.channel.can_set() { - self.channel.set(Err(self.ec)); - } - } else { - assert!(self.channel.can_set(), "v is empty and channel is waiting"); + // if there is wait give out error + if self.channel.can_set() { + self.channel.set(Err(self.ec)); } } }