diff --git a/crates/libs/msquic/src/stream.rs b/crates/libs/msquic/src/stream.rs index 133e807..a97fc18 100644 --- a/crates/libs/msquic/src/stream.rs +++ b/crates/libs/msquic/src/stream.rs @@ -5,40 +5,29 @@ use std::{ sync::Mutex, }; +use crate::{ + buffer::{QBuffRef, QBufferVec, QOwnedBuffer, QVecBuffer}, + conn::QConnection, + info, + sync::{QQueue, QResetChannel, QSignal}, + utils::SBox, + QApi, +}; use c2::{ - Handle, SendFlags, Stream, StreamEvent, StreamOpenFlags, StreamStartFlags, + Buffer, Handle, SendFlags, Stream, StreamEvent, StreamOpenFlags, StreamStartFlags, STREAM_EVENT_PEER_RECEIVE_ABORTED, STREAM_EVENT_PEER_SEND_ABORTED, STREAM_EVENT_PEER_SEND_SHUTDOWN, STREAM_EVENT_RECEIVE, STREAM_EVENT_SEND_COMPLETE, STREAM_EVENT_SEND_SHUTDOWN_COMPLETE, STREAM_EVENT_SHUTDOWN_COMPLETE, STREAM_EVENT_START_COMPLETE, STREAM_SHUTDOWN_FLAG_NONE, }; -use tokio::sync::oneshot; - -use crate::{ - buffer::{QBuffRef, QBufferVec, QOwnedBuffer, QVecBuffer}, conn::QConnection, info, sync::{QQueue, QResetChannel}, utils::SBox, QApi -}; // #[derive(Debug)] pub struct QStream { _api: QApi, inner: SBox, - ctx: Box, + ctx: Box>, } -// #[derive(Debug, Clone)] -// enum ShutdownError { -// PeerSendShutdown, -// PeerSendAbort, -// ShutdownComplete, -// } - -// #[allow(dead_code)] -// #[derive(Debug)] -// enum ReceivePayload { -// Success(Vec), -// //Stop(ShutdownError), -// } - #[derive(Debug, Clone)] enum SentPayload { Success, @@ -50,40 +39,71 @@ enum StartPayload { Success, } -// dummy state lock -enum State { - Idle, - Frontend, - Backend, -} - -// struct ReceiveState { -// data: Vec, // Buffers received but not yet given to frontend. -// receive_tx: Option>, // frontend pending wait -// is_peer_closed: bool, -// } - struct QStreamCtx { - start_tx: Option>, + start_sig: QResetChannel, receive_ch: QQueue>, send_ch: QResetChannel, - send_shtdwn_tx: Option>, - drain_tx: Option>, + send_shtdwn_sig: QSignal, + drain_sig: QSignal, is_drained: bool, - state: Mutex, } impl QStreamCtx { fn new() -> Self { Self { - //start_rx: Some(start_rx), - start_tx: None, + start_sig: QResetChannel::new(), receive_ch: QQueue::new(), send_ch: QResetChannel::new(), - send_shtdwn_tx: None, - drain_tx: None, + send_shtdwn_sig: QSignal::new(), + drain_sig: QSignal::new(), is_drained: false, - state: Mutex::new(State::Idle), + } + } + + fn on_start_complete(&mut self) { + if self.start_sig.can_set() { + self.start_sig.set(StartPayload::Success); + } + } + fn on_send_complete(&mut self, cancelled: bool) { + let payload = if cancelled { + SentPayload::Canceled + } else { + SentPayload::Success + }; + if self.send_ch.can_set() { + self.send_ch.set(payload); + } + } + fn on_receive(&mut self, buffs: &[Buffer]) { + // send to frontend + let v = buffs + .iter() + .map(|b| QVecBuffer::from(&QBuffRef::from(b))) + .collect::>(); + self.receive_ch.insert(v); + } + fn on_peer_send_shutdown(&mut self) { + // peer can shutdown their direction. But we should receive what is pending. + // Peer will no longer send new stuff, so the receive can be dropped. + // if frontend is waiting stop it. + self.receive_ch.close(0); + } + fn on_peer_send_abort(&mut self, _ec: u64) { + self.receive_ch.close(0); + } + fn on_send_shutdown_complete(&mut self) { + if self.send_shtdwn_sig.can_set() { + self.send_shtdwn_sig.set(()); + } + } + fn on_shutdown_complete(&mut self) { + // close all channels + self.receive_ch.close(0); + // drain signal + self.is_drained = true; + if self.drain_sig.can_set() { + self.drain_sig.set(()) } } } @@ -94,28 +114,23 @@ extern "C" fn qstream_handler_callback( event: &StreamEvent, ) -> u32 { assert!(!context.is_null()); - let ctx = unsafe { (context as *mut QStreamCtx).as_mut().unwrap() }; + let ctx = unsafe { (context as *mut Mutex).as_mut().unwrap() }; + #[allow(clippy::mut_mutex_lock)] + let mut ctx = ctx.lock().unwrap(); let status = 0; - let mut state = ctx.state.lock().unwrap(); - *state = State::Backend; match event.event_type { STREAM_EVENT_START_COMPLETE => { info!("[{:?}] STREAM_EVENT_START_COMPLETE", stream); - let tx = ctx.start_tx.take().unwrap(); - tx.send(StartPayload::Success).unwrap(); + ctx.on_start_complete(); } STREAM_EVENT_SEND_COMPLETE => { let raw = unsafe { event.payload.send_complete }; - info!("[{:?}] STREAM_EVENT_SEND_COMPLETE", stream); - let payload = if raw.canceled { - SentPayload::Canceled - } else { - SentPayload::Success - }; - if ctx.send_ch.can_set(){ - ctx.send_ch.set(payload); - } + info!( + "[{:?}] STREAM_EVENT_SEND_COMPLETE cancel {}", + stream, raw.canceled + ); + ctx.on_send_complete(raw.canceled); } STREAM_EVENT_RECEIVE => { info!("[{:?}] QUIC_STREAM_EVENT_RECEIVE", stream); @@ -123,46 +138,27 @@ extern "C" fn qstream_handler_callback( let count = raw.buffer_count; let curr = raw.buffer; let buffs = unsafe { slice::from_raw_parts(curr, count.try_into().unwrap()) }; - // send to frontend - let v = buffs - .iter() - .map(|b| QVecBuffer::from(&QBuffRef::from(b))) - .collect::>(); - ctx.receive_ch.insert(v); + ctx.on_receive(buffs); } - // peer can shutdown their direction. But we should receive what is pending. STREAM_EVENT_PEER_SEND_SHUTDOWN => { info!("[{:?}] STREAM_EVENT_PEER_SEND_SHUTDOWN", stream); - // Peer will no longer send new stuff, so the receive can be dropped. - // if frontend is waiting stop it. - ctx.receive_ch.close(0); + ctx.on_peer_send_shutdown(); } STREAM_EVENT_PEER_SEND_ABORTED => { - let raw = unsafe{event.payload.peer_send_aborted}; - info!("[{:?}] STREAM_EVENT_PEER_SEND_ABORTED: ec {}", stream, raw.error_code); - ctx.receive_ch.close(0); + let raw = unsafe { event.payload.peer_send_aborted }; + info!( + "[{:?}] STREAM_EVENT_PEER_SEND_ABORTED: ec {}", + stream, raw.error_code + ); + ctx.on_peer_send_abort(raw.error_code); } STREAM_EVENT_SEND_SHUTDOWN_COMPLETE => { - // can ignore for now. This send to peer shutdown? info!("[{:?}] STREAM_EVENT_SEND_SHUTDOWN_COMPLETE", stream); - if let Some(tx) = ctx.send_shtdwn_tx.take() { - tx.send(()).unwrap(); - } + ctx.on_send_shutdown_complete(); } STREAM_EVENT_SHUTDOWN_COMPLETE => { info!("[{:?}] STREAM_EVENT_SHUTDOWN_COMPLETE", stream); - // close all channels - // ctx.receive_state.is_peer_closed = true; - // let err = ShutdownError::ShutdownComplete; - // if let Some(tx) = ctx.receive_state.receive_tx.take() { - // tx.send(ReceivePayload::Stop(err)).unwrap(); - // } - ctx.receive_ch.close(0); - // drain signal - ctx.is_drained = true; - if let Some(tx) = ctx.drain_tx.take() { - tx.send(()).unwrap(); - } + ctx.on_shutdown_complete(); } STREAM_EVENT_PEER_RECEIVE_ABORTED => { // can ignore for now @@ -178,10 +174,10 @@ extern "C" fn qstream_handler_callback( impl QStream { pub fn attach(api: QApi, h: Handle) -> Self { let s = Stream::from_parts(h, &api.inner.inner); - let ctx = Box::new(QStreamCtx::new()); + let ctx = Box::new(Mutex::new(QStreamCtx::new())); s.set_callback_handler( qstream_handler_callback, - &*ctx as *const QStreamCtx as *const c_void, + &*ctx as *const Mutex as *const c_void, ); Self { @@ -194,12 +190,12 @@ impl QStream { // open client stream pub fn open(connection: &QConnection, flags: StreamOpenFlags) -> Self { let s = Stream::new(&connection._api.inner.inner); - let ctx = Box::new(QStreamCtx::new()); + let ctx = Box::new(Mutex::new(QStreamCtx::new())); s.open( &connection.inner.inner, flags, qstream_handler_callback, - &*ctx as *const QStreamCtx as *const c_void, + &*ctx as *const Mutex as *const c_void, ); Self { _api: connection._api.clone(), @@ -211,18 +207,14 @@ impl QStream { // start stream for client pub async fn start(&mut self, flags: StreamStartFlags) -> Result<(), Error> { // regardless of start success of fail, there is a QUIC_STREAM_EVENT_START_COMPLETE callback. - let (start_tx, start_rx) = oneshot::channel(); + let rx; { - let mut state = self.ctx.state.lock().unwrap(); - *state = State::Frontend; - // prepare the channel. - assert!(self.ctx.start_tx.is_none()); - self.ctx.start_tx.replace(start_tx); + rx = self.ctx.lock().unwrap().start_sig.reset(); self.inner.inner.start(flags); } // wait for backend - match start_rx.await.unwrap() { + match rx.await { StartPayload::Success => Ok(()), } } @@ -232,29 +224,12 @@ impl QStream { pub async fn receive(&mut self, buff: &mut [u8]) -> Result { let rx; { - let mut state = self.ctx.state.lock().unwrap(); - *state = State::Frontend; - rx = self.ctx.receive_ch.pop(); - // let r_state = &mut self.ctx.receive_state; - // if r_state.data.is_empty() { - // if r_state.is_peer_closed { - // return Err(Error::from(ErrorKind::BrokenPipe)); - // } - // // - // // need to wait for more. - // let (receive_tx, receive_rx) = oneshot::channel(); - // assert!(r_state.receive_tx.is_none()); - // r_state.receive_tx.replace(receive_tx); - // fu = receive_rx; - // } else { - // let v = &r_state.data; - // let copied = QStream::copy_vec(v, buff); - // assert_ne!(copied, 0); - // return Ok(copied); - // } + rx = self.ctx.lock().unwrap().receive_ch.pop(); } - let v = rx.await.map_err(|e: u32| Error::from_raw_os_error(e.try_into().unwrap()))?; + let v = rx + .await + .map_err(|e: u32| Error::from_raw_os_error(e.try_into().unwrap()))?; let copied = QStream::copy_vec(&v, buff); // resume // self.receive_complete(copied as u64); @@ -289,10 +264,7 @@ impl QStream { pub async fn send(&mut self, buffers: &[QVecBuffer], flags: SendFlags) -> Result<(), Error> { let rx; { - let mut state = self.ctx.state.lock().unwrap(); - *state = State::Frontend; - - rx = self.ctx.send_ch.reset(); + rx = self.ctx.lock().unwrap().send_ch.reset(); let b = QBufferVec::from(buffers); let bb = b.as_buffers(); @@ -302,43 +274,34 @@ impl QStream { } // wait backend - let res = rx - .await; + let res = rx.await; match res { - SentPayload::Success => Ok(()), + SentPayload::Success => Ok(()), SentPayload::Canceled => Err(Error::from(ErrorKind::ConnectionAborted)), } } + // send shutdown signal to peer. // do not call this if already indicated shutdown during send. pub async fn shutdown(&mut self) { - let fu; + let rx; { - let mut state = self.ctx.state.lock().unwrap(); - *state = State::Frontend; - let (send_shtdwn_tx, send_shtdwn_rx) = oneshot::channel(); - assert!(self.ctx.send_shtdwn_tx.is_none()); - self.ctx.send_shtdwn_tx.replace(send_shtdwn_tx); - fu = send_shtdwn_rx; + rx = self.ctx.lock().unwrap().send_shtdwn_sig.reset(); self.inner.inner.shutdown(STREAM_SHUTDOWN_FLAG_NONE, 0); } - fu.await.unwrap(); + rx.await; } // wait for the complete shutdown event. before close handle. pub async fn drain(&mut self) { - let fu; + let rx; { - let mut state = self.ctx.state.lock().unwrap(); - *state = State::Frontend; - if self.ctx.is_drained { + let mut lk = self.ctx.lock().unwrap(); + if lk.is_drained { return; } - let (drain_tx, drain_rx) = oneshot::channel(); - assert!(self.ctx.drain_tx.is_none()); - self.ctx.drain_tx.replace(drain_tx); - fu = drain_rx; + rx = lk.drain_sig.reset(); } - fu.await.unwrap(); + rx.await; } }