diff --git a/Cargo.lock b/Cargo.lock index 32351e5..520ccfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,9 +181,9 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "h3" -version = "0.0.4" +version = "0.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c8886b9e6e93e7ed93d9433f3779e8d07e3ff96bc67b977d14c7b20c849411" +checksum = "d5069de1c2ac82d9e361b07f2b8a2c582ec071750e063530fc7f3b5197e24805" dependencies = [ "bytes", "fastrand", diff --git a/crates/libs/msquic/Cargo.toml b/crates/libs/msquic/Cargo.toml index ef8408d..643b2fb 100644 --- a/crates/libs/msquic/Cargo.toml +++ b/crates/libs/msquic/Cargo.toml @@ -8,13 +8,11 @@ edition = "2021" [dependencies] "c2" = {path = "../c2"} "hex" = "0.4" -# log = "0.4" tokio = {version = "1", features = ["sync"]} tracing = { version = "0.1.40", features = ["log"] } bytes = "*" -h3 = "*" +h3 = "0.0.5" [dev-dependencies] -# env_logger = "0.10.1" tokio = {version = "1", features = ["full"]} tracing-subscriber = { version = "0.3.18", default-features = false, features = ["fmt"]} \ No newline at end of file diff --git a/crates/libs/msquic/src/buffer.rs b/crates/libs/msquic/src/buffer.rs index 2b25d9f..d7c8ffd 100644 --- a/crates/libs/msquic/src/buffer.rs +++ b/crates/libs/msquic/src/buffer.rs @@ -144,35 +144,40 @@ impl From<&SBuffer> for Buffer { } } +// Currently we copy the buf into vec for ease of code pub struct QBufWrap { - _inner: Box, // mem owner + _inner: Vec>, // mem owner v: Vec, } unsafe impl Send for QBufWrap {} impl QBufWrap { - pub fn new(mut buf: Box) -> Self { + pub fn new(buf: impl Buf) -> Self { // make on heap so that no ptr move. - let v = Self::convert_buf(&mut buf); - Self { _inner: buf, v } + let (vcs, vbs) = Self::convert_buf(buf); + Self { + _inner: vcs, + v: vbs, + } } - fn convert_buf(b: &mut Box) -> Vec { - let mut v = Vec::new(); + fn convert_buf(mut b: impl Buf) -> (Vec>, Vec) { + let mut vcs = Vec::new(); + let mut vbs = Vec::new(); // change buf to vecs while b.has_remaining() { + // copy let ck = b.chunk(); - v.push(Buffer { - length: ck.len() as u32, - buffer: ck.as_ptr() as *mut u8, + let vc = Vec::from(ck); + vbs.push(Buffer { + length: vc.len() as u32, + buffer: vc.as_ptr() as *mut u8, }); + vcs.push(vc); b.advance(ck.len()); } - v - // let bb = - // v.iter().map(|s|{ Buffer{ length: s.len() as u32, buffer: s.as_ptr() as *mut u8 } }).collect::>(); - // bb + (vcs, vbs) } pub fn as_buffs(&self) -> &[Buffer] { diff --git a/crates/libs/msquic/src/lib.rs b/crates/libs/msquic/src/lib.rs index dedf422..9f85279 100644 --- a/crates/libs/msquic/src/lib.rs +++ b/crates/libs/msquic/src/lib.rs @@ -15,7 +15,7 @@ pub mod stream; pub mod sync; mod utils; -//pub mod msh3; +pub mod msh3; // Some useful defs pub const QUIC_STATUS_PENDING: u32 = 0x703e5; diff --git a/crates/libs/msquic/src/msh3/mod.rs b/crates/libs/msquic/src/msh3/mod.rs index 674a39f..58d3ad5 100644 --- a/crates/libs/msquic/src/msh3/mod.rs +++ b/crates/libs/msquic/src/msh3/mod.rs @@ -1,12 +1,6 @@ // h3 wrappings for msquic -use std::{ - fmt::Display, - future::Future, - pin::{self, pin}, - sync::Arc, - task::Poll, -}; +use std::fmt::Display; use bytes::{Buf, BytesMut}; use c2::SEND_FLAG_NONE; @@ -20,9 +14,12 @@ pub struct H3Error { error_code: Option, } -impl H3Error{ - pub fn new(status: std::io::Error, ec: Option) -> Self{ - Self { status, error_code: ec } +impl H3Error { + pub fn new(status: std::io::Error, ec: Option) -> Self { + Self { + status, + error_code: ec, + } } } @@ -39,13 +36,13 @@ impl h3::quic::Error for H3Error { impl std::error::Error for H3Error {} impl Display for H3Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { todo!() } } pub struct H3Conn { - inner: QConnection, + _inner: QConnection, } impl OpenStreams for H3Conn { @@ -59,19 +56,19 @@ impl OpenStreams for H3Conn { fn poll_open_bidi( &mut self, - cx: &mut std::task::Context<'_>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { todo!() } fn poll_open_send( &mut self, - cx: &mut std::task::Context<'_>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { todo!() } - fn close(&mut self, code: h3::error::Code, reason: &[u8]) { + fn close(&mut self, _code: h3::error::Code, _reason: &[u8]) { todo!() } } @@ -89,28 +86,28 @@ impl Connection for H3Conn { fn poll_accept_recv( &mut self, - cx: &mut std::task::Context<'_>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll, Self::Error>> { todo!() } fn poll_accept_bidi( &mut self, - cx: &mut std::task::Context<'_>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll, Self::Error>> { todo!() } fn poll_open_bidi( &mut self, - cx: &mut std::task::Context<'_>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { todo!() } fn poll_open_send( &mut self, - cx: &mut std::task::Context<'_>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { todo!() } @@ -119,7 +116,7 @@ impl Connection for H3Conn { todo!() } - fn close(&mut self, code: h3::error::Code, reason: &[u8]) { + fn close(&mut self, _code: h3::error::Code, _reason: &[u8]) { todo!() } } @@ -127,12 +124,16 @@ impl Connection for H3Conn { pub struct H3Stream { inner: QStream, id: h3::quic::StreamId, - //read: + shutdown: bool, } impl H3Stream { fn new(s: QStream, id: StreamId) -> Self { - Self { inner: s, id } + Self { + inner: s, + id, + shutdown: false, + } } } @@ -141,12 +142,11 @@ impl SendStream for H3Stream { fn poll_ready( &mut self, - _cx: &mut std::task::Context<'_>, + cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - // always ready to send? - // convert this to open or start? - // if send is in progress? - Poll::Ready(Ok(())) + self.inner + .poll_ready_send(cx) + .map_err(|e| H3Error::new(e, None)) } fn send_data>>(&mut self, data: T) -> Result<(), Self::Error> { @@ -155,11 +155,19 @@ impl SendStream for H3Stream { Ok(()) } + // send shutdown signal to peer. fn poll_finish( &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.inner.poll_send(cx).map_err(|e|{H3Error::new(e, None)}) + // close the stream + if !self.shutdown { + self.inner.shutdown_only(); + self.shutdown = true; + } + self.inner + .poll_shutdown(cx) + .map_err(|e| H3Error::new(e, None)) } fn reset(&mut self, _reset_code: u64) { @@ -178,7 +186,7 @@ impl RecvStream for H3Stream { fn poll_data( &mut self, - cx: &mut std::task::Context<'_>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll, Self::Error>> { // let fu = self.inner.receive(); // let innner = as Future>::poll(Pin::new(&mut self.rx), _cx); @@ -204,7 +212,7 @@ impl BidiStream for H3Stream { fn split(self) -> (Self::SendStream, Self::RecvStream) { let cp = self.inner.clone(); - let id = self.id.clone(); + let id = self.id; (self, H3Stream::new(cp, id)) } } diff --git a/crates/libs/msquic/src/stream.rs b/crates/libs/msquic/src/stream.rs index 6a3f7d0..d5a3629 100644 --- a/crates/libs/msquic/src/stream.rs +++ b/crates/libs/msquic/src/stream.rs @@ -8,7 +8,7 @@ use std::{ }; use crate::{ - buffer::{debug_buf_to_string, debug_raw_buf_to_string, QBufWrap, QBytesMut}, + buffer::{QBufWrap, QBytesMut}, conn::QConnection, info, sync::{QSignal, QWakableQueue, QWakableSig}, @@ -46,7 +46,7 @@ struct QStreamCtx { start_sig: QWakableQueue, receive_ch: QWakableQueue, send_sig: QWakableSig, - send_shtdwn_sig: QSignal, + send_shtdwn_sig: QWakableSig<()>, drain_sig: QSignal, is_drained: bool, pending_buf: Option, // because msquic copies buffers in background we need to hold the buffer temporarily @@ -58,7 +58,7 @@ impl QStreamCtx { start_sig: QWakableQueue::default(), receive_ch: QWakableQueue::default(), send_sig: QWakableSig::default(), - send_shtdwn_sig: QSignal::new(), + send_shtdwn_sig: QWakableSig::default(), drain_sig: QSignal::new(), is_drained: false, pending_buf: None, @@ -81,15 +81,15 @@ impl QStreamCtx { fn on_receive(&mut self, buffs: &[Buffer]) { // send to frontend let v = QBytesMut::from_buffs(buffs); - let s = debug_buf_to_string(v.0.clone()); - let original = debug_raw_buf_to_string(buffs[0]); - info!( - "debug: receive bytes: {} len:{}, original {}, len: {}", - s, - s.len(), - original, - original.len() - ); + // let s = debug_buf_to_string(v.0.clone()); + // let original = debug_raw_buf_to_string(buffs[0]); + // info!( + // "debug: receive bytes: {} len:{}, original {}, len: {}", + // s, + // s.len(), + // original, + // original.len() + // ); self.receive_ch.insert(v); } fn on_peer_send_shutdown(&mut self) { @@ -102,9 +102,7 @@ impl QStreamCtx { self.receive_ch.close(); } fn on_send_shutdown_complete(&mut self) { - if self.send_shtdwn_sig.can_set() { - self.send_shtdwn_sig.set(()); - } + self.send_shtdwn_sig.set(()); } fn on_shutdown_complete(&mut self) { // close all channels @@ -269,7 +267,7 @@ impl QStream { // // TODO: handle error // let _ = self.inner.inner.receive_complete(len); // } - pub fn send_only(&mut self, buffers: impl Buf + 'static, flags: SendFlags) { + pub fn send_only(&mut self, buffers: impl Buf, flags: SendFlags) { let mut lk = self.ctx.lock().unwrap(); lk.send_sig.set_frontend_pending(); let b = QBufWrap::new(Box::new(buffers)); @@ -306,11 +304,7 @@ impl QStream { } } - pub async fn send( - &mut self, - buffers: impl Buf + 'static, - flags: SendFlags, - ) -> Result<(), Error> { + pub async fn send(&mut self, buffers: impl Buf, flags: SendFlags) -> Result<(), Error> { self.send_only(buffers, flags); let fu = poll_fn(|cx| self.poll_send(cx)); fu.await @@ -326,15 +320,27 @@ impl QStream { Self::poll_send_inner(&mut lk, cx) } + pub fn shutdown_only(&mut self) { + let mut lk = self.ctx.lock().unwrap(); + lk.send_shtdwn_sig.reset(); + self.inner.inner.shutdown(STREAM_SHUTDOWN_FLAG_NONE, 0); + } + + pub fn poll_shutdown(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + let mut lk = self.ctx.lock().unwrap(); + let p = lk.send_shtdwn_sig.poll(cx); + match p { + Poll::Ready(_) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending, + } + } + // send shutdown signal to peer. // do not call this if already indicated shutdown during send. - pub async fn shutdown(&mut self) { - let rx; - { - rx = self.ctx.lock().unwrap().send_shtdwn_sig.reset(); - self.inner.inner.shutdown(STREAM_SHUTDOWN_FLAG_NONE, 0); - } - rx.await; + pub async fn shutdown(&mut self) -> Result<(), Error> { + self.shutdown_only(); + let fu = poll_fn(|cx| self.poll_shutdown(cx)); + fu.await } // this is for h3 where the interface does not wait