Skip to content

Commit

Permalink
conn refactor start
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed May 10, 2024
1 parent 5d071c5 commit b751a4b
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 126 deletions.
1 change: 1 addition & 0 deletions crates/libs/msquic/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use c2::{Buffer, Configuration, CredentialConfig, Settings};

use crate::{reg::QRegistration, utils::SBox, QApi};

#[derive(Clone)]
pub struct QConfiguration {
pub _api: QApi,
pub inner: Arc<SBox<Configuration>>,
Expand Down
218 changes: 95 additions & 123 deletions crates/libs/msquic/src/conn.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::{config::QConfiguration, info, reg::QRegistration, stream::QStream};
use std::{
borrow::BorrowMut,
ffi::c_void,
fmt::Debug,
io::{Error, ErrorKind},
sync::Mutex,
use crate::{
config::QConfiguration,
info,
reg::QRegistration,
stream::QStream,
sync::{QQueue, QReceiver, QResetChannel, QSignal},
};
use std::{ffi::c_void, fmt::Debug, io::Error, sync::Mutex};

use c2::{
Configuration, Connection, ConnectionEvent, Handle, SendResumptionFlags,
Expand All @@ -14,14 +14,13 @@ use c2::{
CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER, CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT,
CONNECTION_SHUTDOWN_FLAG_NONE,
};
use tokio::sync::{mpsc, oneshot};

use crate::{utils::SBox, QApi};

pub struct QConnection {
pub _api: QApi,
pub inner: SBox<Connection>,
ctx: Box<QConnectionCtx>,
ctx: Box<Mutex<QConnectionCtx>>,
}

impl Debug for QConnection {
Expand All @@ -35,36 +34,33 @@ impl Debug for QConnection {
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
enum ShutdownError {
Ok, // no error
Transport((u64, u32)), // ec and status
Peer(u64), // ec
Complete,
}

#[derive(Debug, Clone, PartialEq)]
enum ConnStatus {
Ok, // connection success
Transport((u64, u32)), // app ec and status
}

#[allow(dead_code)]
enum StreamPayload {
Stream(QStream),
Stop(ShutdownError),
}

#[derive(Debug, PartialEq)]
enum State {
Idle,
Frontend,
Backend,
}

struct QConnectionCtx {
_api: QApi,
strm_tx: Option<mpsc::Sender<StreamPayload>>,
strm_rx: mpsc::Receiver<StreamPayload>,
conn_tx: Option<oneshot::Sender<ShutdownError>>,
conn_rx: Option<oneshot::Receiver<ShutdownError>>,
shtdwn_tx: Option<oneshot::Sender<()>>,
is_shutdown: bool,
state: Mutex<State>,
strm_ch: QQueue<QStream>,
shtdwn_sig: QSignal,
//state: Mutex<State>,
conn_ch: QResetChannel<ConnStatus>, // handle connect success or transport close
proceed_rx: Option<QReceiver<ConnStatus>>, // used for server wait conn
}

extern "C" fn qconnection_callback_handler(
Expand All @@ -73,31 +69,30 @@ extern "C" fn qconnection_callback_handler(
event: &ConnectionEvent,
) -> u32 {
assert!(!context.is_null());
let ctx = unsafe { (context as *mut QConnectionCtx).as_mut().unwrap() };
let ctx_mtx = unsafe { (context as *mut Mutex<QConnectionCtx>).as_mut().unwrap() };
#[allow(clippy::mut_mutex_lock)]
let mut ctx = ctx_mtx.lock().unwrap();
let status = 0;
let mut state = ctx.state.lock().unwrap();
*state = State::Backend;
match event.event_type {
CONNECTION_EVENT_CONNECTED => {
info!("[{:?}] CONNECTION_EVENT_CONNECTED", connection);
// server xor client connected
if let Some(tx) = ctx.conn_tx.take() {
tx.send(ShutdownError::Ok).unwrap();
}
ctx.on_connected(None);
}
// Not defined
// CONNECTION_EVENT_CLOSED => {
// }
CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT => {
let raw = unsafe { event.payload.shutdown_initiated_by_transport };
info!(
"[{:?}] CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT ec=0x{:x} status=0x{:x}",
connection, raw.error_code, raw.status
);
let err = ShutdownError::Transport((raw.error_code, raw.status));
if let Some(tx) = ctx.conn_tx.take() {
tx.send(err.clone()).unwrap();
}
if let Some(tx) = ctx.strm_tx.take() {
tx.blocking_send(StreamPayload::Stop(err)).unwrap();
if ctx.conn_ch.can_set() {
ctx.conn_ch
.set(ConnStatus::Transport((raw.error_code, raw.status)));
}
ctx.strm_ch.close(raw.status);
}
// Peer application called connection shutdown.
// Error code is application defined.
Expand All @@ -107,33 +102,15 @@ extern "C" fn qconnection_callback_handler(
"[{:?}] CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: ec {}",
connection, raw.error_code,
);
let err = ShutdownError::Peer(raw.error_code);
if let Some(tx) = ctx.conn_tx.take() {
tx.send(err.clone()).unwrap();
}
if let Some(tx) = ctx.strm_tx.take() {
tx.blocking_send(StreamPayload::Stop(err)).unwrap();
}
ctx.is_shutdown = true;
if let Some(tx) = ctx.shtdwn_tx.take() {
tx.send(()).unwrap();
}
ctx.strm_ch.close(0);
}
// only invoked after user calls connection shutdown.
// This can happend without peer or transport.
CONNECTION_EVENT_SHUTDOWN_COMPLETE => {
info!("[{:?}] CONNECTION_EVENT_SHUTDOWN_COMPLETE", connection,);
let err = ShutdownError::Complete;
if let Some(tx) = ctx.conn_tx.take() {
tx.send(err.clone()).unwrap();
}
// drop stream
if let Some(tx) = ctx.strm_tx.take() {
tx.blocking_send(StreamPayload::Stop(err)).unwrap();
}
ctx.is_shutdown = true;
if let Some(tx) = ctx.shtdwn_tx.take() {
tx.send(()).unwrap();
ctx.strm_ch.close(0);
if ctx.shtdwn_sig.can_set() {
ctx.shtdwn_sig.set(());
}
}
CONNECTION_EVENT_PEER_STREAM_STARTED => {
Expand All @@ -145,9 +122,7 @@ extern "C" fn qconnection_callback_handler(
connection, h
);
let s = QStream::attach(ctx._api.clone(), h);
if let Some(tx) = ctx.strm_tx.borrow_mut() {
tx.blocking_send(StreamPayload::Stream(s)).unwrap();
}
ctx.strm_ch.insert(s);
}
CONNECTION_EVENT_RESUMED => {
info!("[{:?}] CONNECTION_EVENT_RESUMED", connection,);
Expand All @@ -170,38 +145,65 @@ extern "C" fn qconnection_callback_handler(
}

impl QConnectionCtx {
fn new(api: &QApi, state: State) -> Self {
let (strm_tx, strm_rx) = mpsc::channel(2);
let (conn_tx, conn_rx) = oneshot::channel();
fn new(api: &QApi) -> Self {
Self {
_api: api.clone(),
strm_tx: Some(strm_tx),
strm_rx,
conn_tx: Some(conn_tx),
conn_rx: Some(conn_rx),
shtdwn_tx: None,
is_shutdown: false,
state: Mutex::new(state),
strm_ch: QQueue::new(),
shtdwn_sig: QSignal::new(),
conn_ch: QResetChannel::new(),
proceed_rx: None,
}
}

// prepare connected event
fn prepare_connect(&mut self) -> QReceiver<ConnStatus> {
self.conn_ch.reset()
}

fn on_connected(&mut self, ec: Option<(u64, u32)>) {
let status = match ec {
Some((ec, status)) => ConnStatus::Transport((ec, status)),
None => ConnStatus::Ok,
};
self.conn_ch.set(status)
}
}

impl QConnection {
// this is for attaching accepted connections
pub fn attach(api: QApi, h: Handle, config: &Configuration) -> Self {
let c = Connection::from_parts(h, &api.inner.inner);
let context = Box::new(QConnectionCtx::new(&api, State::Idle));

let context = Box::new(Mutex::new(QConnectionCtx::new(&api)));
c.set_callback_handler(
qconnection_callback_handler,
(&*context) as *const QConnectionCtx as *const c_void,
(&*context) as *const Mutex<QConnectionCtx> as *const c_void,
);
// set config
c.set_configuration(config);

{
// init the proceed channel
let mut ctx = context.lock().unwrap();
let rx = ctx.prepare_connect();
ctx.proceed_rx = Some(rx);
}
Self::new(c, api, context)
}

fn new(c: Connection, api: QApi, ctx: Box<QConnectionCtx>) -> Self {
// server proceed, wait for connect event
pub async fn proceed(&mut self) -> Result<(), Error> {
let rx;
{
rx = self.ctx.lock().unwrap().proceed_rx.take().unwrap();
}
let status = rx.await;
match status {
ConnStatus::Ok => Ok(()),
ConnStatus::Transport((_, s)) => Err(Error::from_raw_os_error(s as i32)),
}
}

fn new(c: Connection, api: QApi, ctx: Box<Mutex<QConnectionCtx>>) -> Self {
Self {
_api: api,
inner: SBox::new(c),
Expand All @@ -212,50 +214,30 @@ impl QConnection {
// open a client
pub fn open(registration: &QRegistration) -> Self {
let c = Connection::new(&registration.inner.inner);
let context = Box::new(QConnectionCtx::new(&registration.api, State::Idle));
let context = Box::new(Mutex::new(QConnectionCtx::new(&registration.api)));
c.open(
&registration.inner.inner,
qconnection_callback_handler,
(&*context) as *const QConnectionCtx as *const c_void,
(&*context) as *const Mutex<QConnectionCtx> as *const c_void,
);
//info!("[{:#?}] Connection front end open", c);
Self::new(c, registration.api.clone(), context)
}

// server wait for connect callback.
pub async fn connect(&mut self) -> Result<(), Error> {
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;
}
let payload = self.ctx.conn_rx.take().unwrap().await.unwrap();
match payload {
ShutdownError::Ok => Ok(()),
ShutdownError::Transport((_, status)) => Err(Error::from_raw_os_error(status as i32)),
ShutdownError::Peer(_) => Err(Error::from(ErrorKind::ConnectionAborted)),
ShutdownError::Complete => Err(Error::from(ErrorKind::ConnectionAborted)),
}
}

pub fn send_resumption_ticket(&self, flags: SendResumptionFlags) {
self.inner.inner.send_resumption_ticket(flags)
}

// accept stream
pub async fn accept(&mut self) -> Option<QStream> {
let fu;
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;
fu = self.ctx.strm_rx.recv();
rx = self.ctx.lock().unwrap().strm_ch.pop();
}
let payload = fu.await;
match payload {
Some(s) => match s {
StreamPayload::Stream(s) => Some(s),
StreamPayload::Stop(_) => None,
},
None => None, // channel closed
let res = rx.await;
match res {
Ok(s) => Some(s),
Err(_) => None,
}
}

Expand All @@ -266,40 +248,30 @@ impl QConnection {
server_name: &str,
server_port: u16,
) -> Result<(), Error> {
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;
rx = self.ctx.lock().unwrap().prepare_connect();
self.inner
.inner
.start(&configuration.inner.inner, server_name, server_port);
}
self.ctx
.conn_rx
.take()
.unwrap()
.await
.map_or(Err(Error::from(ErrorKind::NotConnected)), Ok)?;
Ok(())
let status = rx.await;
match status {
ConnStatus::Ok => Ok(()),
ConnStatus::Transport((_, s)) => Err(Error::from_raw_os_error(s as i32)),
}
}

pub async fn shutdown(&mut self) {
let (shdwn_tx, shdwn_rx) = oneshot::channel();
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
*state = State::Frontend;
if self.ctx.is_shutdown {
info!("conn ctx.is_shutdown already");
return;
} else {
assert!(self.ctx.shtdwn_tx.is_none());
self.ctx.shtdwn_tx.replace(shdwn_tx);
}
rx = self.ctx.lock().unwrap().shtdwn_sig.reset();
}
info!("conn invoke shutdown");
// callback maybe sync
self.inner.inner.shutdown(CONNECTION_SHUTDOWN_FLAG_NONE, 0); // ec
info!("conn wait for shutdown evnet");
shdwn_rx.await.unwrap();
rx.await;
info!("conn wait for shutdown evnet end");
}
}
4 changes: 2 additions & 2 deletions crates/libs/msquic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ mod tests {
l = QListener::open(&q_req_copy, &q_config);
info!("Start listener.");
let alpn = QBufferVec::from(args.as_slice());
l.start(alpn.as_buffers(), &local_address).await;
l.start(alpn.as_buffers(), &local_address);
}
let mut i = 0;
loop {
Expand All @@ -151,7 +151,7 @@ mod tests {
let mut conn = conn.unwrap();
info!("server accepted conn id={}", conn_id);
info!("server conn connect");
conn.connect().await.unwrap();
conn.proceed().await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
conn.send_resumption_ticket(SEND_RESUMPTION_FLAG_NONE);
tokio::time::sleep(Duration::from_millis(1)).await;
Expand Down
Loading

0 comments on commit b751a4b

Please sign in to comment.