From b751a4b09bc59f7fae53f013246e0d142fb9b551 Mon Sep 17 00:00:00 2001 From: Youyuan Wu Date: Thu, 9 May 2024 22:58:34 -0700 Subject: [PATCH] conn refactor start --- crates/libs/msquic/src/config.rs | 1 + crates/libs/msquic/src/conn.rs | 218 +++++++++++++---------------- crates/libs/msquic/src/lib.rs | 4 +- crates/libs/msquic/src/listener.rs | 2 +- 4 files changed, 99 insertions(+), 126 deletions(-) diff --git a/crates/libs/msquic/src/config.rs b/crates/libs/msquic/src/config.rs index 1e5c55c..3faa246 100644 --- a/crates/libs/msquic/src/config.rs +++ b/crates/libs/msquic/src/config.rs @@ -4,6 +4,7 @@ use c2::{Buffer, Configuration, CredentialConfig, Settings}; use crate::{reg::QRegistration, utils::SBox, QApi}; +#[derive(Clone)] pub struct QConfiguration { pub _api: QApi, pub inner: Arc>, diff --git a/crates/libs/msquic/src/conn.rs b/crates/libs/msquic/src/conn.rs index 2e53494..12aa56c 100644 --- a/crates/libs/msquic/src/conn.rs +++ b/crates/libs/msquic/src/conn.rs @@ -1,11 +1,11 @@ -use crate::{config::QConfiguration, info, reg::QRegistration, stream::QStream}; -use std::{ - borrow::BorrowMut, - ffi::c_void, - fmt::Debug, - io::{Error, ErrorKind}, - sync::Mutex, +use crate::{ + config::QConfiguration, + info, + reg::QRegistration, + stream::QStream, + sync::{QQueue, QReceiver, QResetChannel, QSignal}, }; +use std::{ffi::c_void, fmt::Debug, io::Error, sync::Mutex}; use c2::{ Configuration, Connection, ConnectionEvent, Handle, SendResumptionFlags, @@ -14,14 +14,13 @@ use c2::{ CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER, CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT, CONNECTION_SHUTDOWN_FLAG_NONE, }; -use tokio::sync::{mpsc, oneshot}; use crate::{utils::SBox, QApi}; pub struct QConnection { pub _api: QApi, pub inner: SBox, - ctx: Box, + ctx: Box>, } impl Debug for QConnection { @@ -35,7 +34,7 @@ impl Debug for QConnection { } #[allow(dead_code)] -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] enum ShutdownError { Ok, // no error Transport((u64, u32)), // ec and status @@ -43,28 +42,25 @@ enum ShutdownError { Complete, } +#[derive(Debug, Clone, PartialEq)] +enum ConnStatus { + Ok, // connection success + Transport((u64, u32)), // app ec and status +} + #[allow(dead_code)] enum StreamPayload { Stream(QStream), Stop(ShutdownError), } -#[derive(Debug, PartialEq)] -enum State { - Idle, - Frontend, - Backend, -} - struct QConnectionCtx { _api: QApi, - strm_tx: Option>, - strm_rx: mpsc::Receiver, - conn_tx: Option>, - conn_rx: Option>, - shtdwn_tx: Option>, - is_shutdown: bool, - state: Mutex, + strm_ch: QQueue, + shtdwn_sig: QSignal, + //state: Mutex, + conn_ch: QResetChannel, // handle connect success or transport close + proceed_rx: Option>, // used for server wait conn } extern "C" fn qconnection_callback_handler( @@ -73,31 +69,30 @@ extern "C" fn qconnection_callback_handler( event: &ConnectionEvent, ) -> u32 { assert!(!context.is_null()); - let ctx = unsafe { (context as *mut QConnectionCtx).as_mut().unwrap() }; + let ctx_mtx = unsafe { (context as *mut Mutex).as_mut().unwrap() }; + #[allow(clippy::mut_mutex_lock)] + let mut ctx = ctx_mtx.lock().unwrap(); let status = 0; - let mut state = ctx.state.lock().unwrap(); - *state = State::Backend; match event.event_type { CONNECTION_EVENT_CONNECTED => { info!("[{:?}] CONNECTION_EVENT_CONNECTED", connection); // server xor client connected - if let Some(tx) = ctx.conn_tx.take() { - tx.send(ShutdownError::Ok).unwrap(); - } + ctx.on_connected(None); } + // Not defined + // CONNECTION_EVENT_CLOSED => { + // } CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT => { let raw = unsafe { event.payload.shutdown_initiated_by_transport }; info!( "[{:?}] CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT ec=0x{:x} status=0x{:x}", connection, raw.error_code, raw.status ); - let err = ShutdownError::Transport((raw.error_code, raw.status)); - if let Some(tx) = ctx.conn_tx.take() { - tx.send(err.clone()).unwrap(); - } - if let Some(tx) = ctx.strm_tx.take() { - tx.blocking_send(StreamPayload::Stop(err)).unwrap(); + if ctx.conn_ch.can_set() { + ctx.conn_ch + .set(ConnStatus::Transport((raw.error_code, raw.status))); } + ctx.strm_ch.close(raw.status); } // Peer application called connection shutdown. // Error code is application defined. @@ -107,33 +102,15 @@ extern "C" fn qconnection_callback_handler( "[{:?}] CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: ec {}", connection, raw.error_code, ); - let err = ShutdownError::Peer(raw.error_code); - if let Some(tx) = ctx.conn_tx.take() { - tx.send(err.clone()).unwrap(); - } - if let Some(tx) = ctx.strm_tx.take() { - tx.blocking_send(StreamPayload::Stop(err)).unwrap(); - } - ctx.is_shutdown = true; - if let Some(tx) = ctx.shtdwn_tx.take() { - tx.send(()).unwrap(); - } + ctx.strm_ch.close(0); } // only invoked after user calls connection shutdown. // This can happend without peer or transport. CONNECTION_EVENT_SHUTDOWN_COMPLETE => { info!("[{:?}] CONNECTION_EVENT_SHUTDOWN_COMPLETE", connection,); - let err = ShutdownError::Complete; - if let Some(tx) = ctx.conn_tx.take() { - tx.send(err.clone()).unwrap(); - } - // drop stream - if let Some(tx) = ctx.strm_tx.take() { - tx.blocking_send(StreamPayload::Stop(err)).unwrap(); - } - ctx.is_shutdown = true; - if let Some(tx) = ctx.shtdwn_tx.take() { - tx.send(()).unwrap(); + ctx.strm_ch.close(0); + if ctx.shtdwn_sig.can_set() { + ctx.shtdwn_sig.set(()); } } CONNECTION_EVENT_PEER_STREAM_STARTED => { @@ -145,9 +122,7 @@ extern "C" fn qconnection_callback_handler( connection, h ); let s = QStream::attach(ctx._api.clone(), h); - if let Some(tx) = ctx.strm_tx.borrow_mut() { - tx.blocking_send(StreamPayload::Stream(s)).unwrap(); - } + ctx.strm_ch.insert(s); } CONNECTION_EVENT_RESUMED => { info!("[{:?}] CONNECTION_EVENT_RESUMED", connection,); @@ -170,38 +145,65 @@ extern "C" fn qconnection_callback_handler( } impl QConnectionCtx { - fn new(api: &QApi, state: State) -> Self { - let (strm_tx, strm_rx) = mpsc::channel(2); - let (conn_tx, conn_rx) = oneshot::channel(); + fn new(api: &QApi) -> Self { Self { _api: api.clone(), - strm_tx: Some(strm_tx), - strm_rx, - conn_tx: Some(conn_tx), - conn_rx: Some(conn_rx), - shtdwn_tx: None, - is_shutdown: false, - state: Mutex::new(state), + strm_ch: QQueue::new(), + shtdwn_sig: QSignal::new(), + conn_ch: QResetChannel::new(), + proceed_rx: None, } } + + // prepare connected event + fn prepare_connect(&mut self) -> QReceiver { + self.conn_ch.reset() + } + + fn on_connected(&mut self, ec: Option<(u64, u32)>) { + let status = match ec { + Some((ec, status)) => ConnStatus::Transport((ec, status)), + None => ConnStatus::Ok, + }; + self.conn_ch.set(status) + } } impl QConnection { // this is for attaching accepted connections pub fn attach(api: QApi, h: Handle, config: &Configuration) -> Self { let c = Connection::from_parts(h, &api.inner.inner); - let context = Box::new(QConnectionCtx::new(&api, State::Idle)); - + let context = Box::new(Mutex::new(QConnectionCtx::new(&api))); c.set_callback_handler( qconnection_callback_handler, - (&*context) as *const QConnectionCtx as *const c_void, + (&*context) as *const Mutex as *const c_void, ); // set config c.set_configuration(config); + + { + // init the proceed channel + let mut ctx = context.lock().unwrap(); + let rx = ctx.prepare_connect(); + ctx.proceed_rx = Some(rx); + } Self::new(c, api, context) } - fn new(c: Connection, api: QApi, ctx: Box) -> Self { + // server proceed, wait for connect event + pub async fn proceed(&mut self) -> Result<(), Error> { + let rx; + { + rx = self.ctx.lock().unwrap().proceed_rx.take().unwrap(); + } + let status = rx.await; + match status { + ConnStatus::Ok => Ok(()), + ConnStatus::Transport((_, s)) => Err(Error::from_raw_os_error(s as i32)), + } + } + + fn new(c: Connection, api: QApi, ctx: Box>) -> Self { Self { _api: api, inner: SBox::new(c), @@ -212,50 +214,30 @@ impl QConnection { // open a client pub fn open(registration: &QRegistration) -> Self { let c = Connection::new(®istration.inner.inner); - let context = Box::new(QConnectionCtx::new(®istration.api, State::Idle)); + let context = Box::new(Mutex::new(QConnectionCtx::new(®istration.api))); c.open( ®istration.inner.inner, qconnection_callback_handler, - (&*context) as *const QConnectionCtx as *const c_void, + (&*context) as *const Mutex as *const c_void, ); //info!("[{:#?}] Connection front end open", c); Self::new(c, registration.api.clone(), context) } - // server wait for connect callback. - pub async fn connect(&mut self) -> Result<(), Error> { - { - let mut state = self.ctx.state.lock().unwrap(); - *state = State::Frontend; - } - let payload = self.ctx.conn_rx.take().unwrap().await.unwrap(); - match payload { - ShutdownError::Ok => Ok(()), - ShutdownError::Transport((_, status)) => Err(Error::from_raw_os_error(status as i32)), - ShutdownError::Peer(_) => Err(Error::from(ErrorKind::ConnectionAborted)), - ShutdownError::Complete => Err(Error::from(ErrorKind::ConnectionAborted)), - } - } - pub fn send_resumption_ticket(&self, flags: SendResumptionFlags) { self.inner.inner.send_resumption_ticket(flags) } // accept stream pub async fn accept(&mut self) -> Option { - let fu; + let rx; { - let mut state = self.ctx.state.lock().unwrap(); - *state = State::Frontend; - fu = self.ctx.strm_rx.recv(); + rx = self.ctx.lock().unwrap().strm_ch.pop(); } - let payload = fu.await; - match payload { - Some(s) => match s { - StreamPayload::Stream(s) => Some(s), - StreamPayload::Stop(_) => None, - }, - None => None, // channel closed + let res = rx.await; + match res { + Ok(s) => Some(s), + Err(_) => None, } } @@ -266,40 +248,30 @@ impl QConnection { server_name: &str, server_port: u16, ) -> Result<(), Error> { + let rx; { - let mut state = self.ctx.state.lock().unwrap(); - *state = State::Frontend; + rx = self.ctx.lock().unwrap().prepare_connect(); self.inner .inner .start(&configuration.inner.inner, server_name, server_port); } - self.ctx - .conn_rx - .take() - .unwrap() - .await - .map_or(Err(Error::from(ErrorKind::NotConnected)), Ok)?; - Ok(()) + let status = rx.await; + match status { + ConnStatus::Ok => Ok(()), + ConnStatus::Transport((_, s)) => Err(Error::from_raw_os_error(s as i32)), + } } pub async fn shutdown(&mut self) { - let (shdwn_tx, shdwn_rx) = oneshot::channel(); + let rx; { - let mut state = self.ctx.state.lock().unwrap(); - *state = State::Frontend; - if self.ctx.is_shutdown { - info!("conn ctx.is_shutdown already"); - return; - } else { - assert!(self.ctx.shtdwn_tx.is_none()); - self.ctx.shtdwn_tx.replace(shdwn_tx); - } + rx = self.ctx.lock().unwrap().shtdwn_sig.reset(); } info!("conn invoke shutdown"); // callback maybe sync self.inner.inner.shutdown(CONNECTION_SHUTDOWN_FLAG_NONE, 0); // ec info!("conn wait for shutdown evnet"); - shdwn_rx.await.unwrap(); + rx.await; info!("conn wait for shutdown evnet end"); } } diff --git a/crates/libs/msquic/src/lib.rs b/crates/libs/msquic/src/lib.rs index 1346197..8c30fee 100644 --- a/crates/libs/msquic/src/lib.rs +++ b/crates/libs/msquic/src/lib.rs @@ -127,7 +127,7 @@ mod tests { l = QListener::open(&q_req_copy, &q_config); info!("Start listener."); let alpn = QBufferVec::from(args.as_slice()); - l.start(alpn.as_buffers(), &local_address).await; + l.start(alpn.as_buffers(), &local_address); } let mut i = 0; loop { @@ -151,7 +151,7 @@ mod tests { let mut conn = conn.unwrap(); info!("server accepted conn id={}", conn_id); info!("server conn connect"); - conn.connect().await.unwrap(); + conn.proceed().await.unwrap(); tokio::time::sleep(Duration::from_millis(1)).await; conn.send_resumption_ticket(SEND_RESUMPTION_FLAG_NONE); tokio::time::sleep(Duration::from_millis(1)).await; diff --git a/crates/libs/msquic/src/listener.rs b/crates/libs/msquic/src/listener.rs index 7af4e44..7fac3e5 100644 --- a/crates/libs/msquic/src/listener.rs +++ b/crates/libs/msquic/src/listener.rs @@ -98,7 +98,7 @@ impl QListener { } } - pub async fn start(&self, alpn: &[Buffer], local_address: &Addr) { + pub fn start(&self, alpn: &[Buffer], local_address: &Addr) { { let mut lk = self.ctx.lock().unwrap(); assert_eq!(lk.state, State::Idle);