From 791f29453ac727d8653e824912bb8148d2c3f595 Mon Sep 17 00:00:00 2001 From: Youyuan Wu Date: Sat, 11 May 2024 15:06:45 -0700 Subject: [PATCH] migrate to use bytes crate --- Cargo.lock | 1 + crates/libs/msquic/Cargo.toml | 1 + crates/libs/msquic/src/buffer.rs | 94 +++++++++++++++++++++++++++++++- crates/libs/msquic/src/lib.rs | 33 +++++------ crates/libs/msquic/src/stream.rs | 44 +++------------ 5 files changed, 118 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9c67f62..3365a74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -210,6 +210,7 @@ dependencies = [ name = "msquic" version = "0.1.0" dependencies = [ + "bytes", "c2", "hex", "tokio", diff --git a/crates/libs/msquic/Cargo.toml b/crates/libs/msquic/Cargo.toml index 4695840..f08d21d 100644 --- a/crates/libs/msquic/Cargo.toml +++ b/crates/libs/msquic/Cargo.toml @@ -11,6 +11,7 @@ edition = "2021" # log = "0.4" tokio = {version = "1", features = ["sync"]} tracing = { version = "0.1.40", features = ["log"] } +bytes = "*" [dev-dependencies] # env_logger = "0.10.1" diff --git a/crates/libs/msquic/src/buffer.rs b/crates/libs/msquic/src/buffer.rs index c135b43..3fcac4d 100644 --- a/crates/libs/msquic/src/buffer.rs +++ b/crates/libs/msquic/src/buffer.rs @@ -1,3 +1,6 @@ +use std::slice; + +use bytes::{Buf, BufMut, BytesMut}; use c2::Buffer; use crate::SBox; @@ -141,11 +144,76 @@ impl From<&SBuffer> for Buffer { } } +pub struct QBufWrap { + _inner: Box, // mem owner + v: Vec, +} + +unsafe impl Send for QBufWrap {} + +impl QBufWrap { + pub fn new(buf: B) -> Self { + // make on heap so that no ptr move. + let mut inner = Box::new(buf); + let v = Self::convert_buf(&mut inner); + Self { _inner: inner, v } + } + + fn convert_buf(b: &mut impl Buf) -> Vec { + let mut v = Vec::new(); + // change buf to vecs + while b.has_remaining() { + let ck = b.chunk(); + v.push(Buffer { + length: ck.len() as u32, + buffer: ck.as_ptr() as *mut u8, + }); + b.advance(ck.len()); + } + v + // let bb = + // v.iter().map(|s|{ Buffer{ length: s.len() as u32, buffer: s.as_ptr() as *mut u8 } }).collect::>(); + // bb + } + + pub fn as_buffs(&self) -> &[Buffer] { + &self.v + } +} + +pub struct QBytesMut(pub BytesMut); + +impl QBytesMut { + pub fn from_buffs(b: &[Buffer]) -> Self { + let mut res = BytesMut::new(); + b.iter().for_each(|i| { + let s = unsafe { slice::from_raw_parts(i.buffer, i.length.try_into().unwrap()) }; + res.put_slice(s); + }); + Self(res) + } +} + +pub fn debug_buf_to_string(mut b: impl Buf) -> String { + let cp = b.copy_to_bytes(b.remaining()); + String::from_utf8_lossy(&cp).into_owned() +} + #[cfg(test)] mod test { + use core::slice; + + use bytes::{BufMut, Bytes, BytesMut}; use c2::Buffer; - use super::{QBuffRef, QBufferVec, QVecBuffer}; + use super::{QBufWrap, QBuffRef, QBufferVec, QVecBuffer}; + + fn buf_to_string(b: Buffer) -> String { + let s = String::from_utf8_lossy(unsafe { + slice::from_raw_parts(b.buffer, b.length.try_into().unwrap()) + }); + s.into_owned() + } #[test] fn test_vec_buffer() { @@ -167,4 +235,28 @@ mod test { let arg1 = QVecBuffer::from(&QBuffRef::from(b1)); assert_eq!(args[0], arg1); } + + #[test] + fn test_buf() { + let b = Bytes::from("mydata"); + let wrap = QBufWrap::new(b); + let v = wrap.as_buffs(); + assert_eq!(v.len(), 1); + let b1 = v[0]; + let s = buf_to_string(b1); + assert_eq!(s, "mydata"); + } + + #[test] + fn test_buf2() { + let mut b = BytesMut::with_capacity(5); + b.put(&b"hello"[..]); + b.put(&b"world"[..]); // this will grow + let wrap = QBufWrap::new(b); + let v = wrap.as_buffs(); + assert_eq!(v.len(), 1); + let b1 = v[0]; + let s = buf_to_string(b1); + assert_eq!(s, "helloworld"); + } } diff --git a/crates/libs/msquic/src/lib.rs b/crates/libs/msquic/src/lib.rs index 8c30fee..1c3feaa 100644 --- a/crates/libs/msquic/src/lib.rs +++ b/crates/libs/msquic/src/lib.rs @@ -37,6 +37,7 @@ mod tests { use std::{process::Command, thread, time::Duration}; + use bytes::Bytes; use c2::{ Addr, CertificateHash, CertificateUnion, CredentialConfig, RegistrationConfig, Settings, ADDRESS_FAMILY_UNSPEC, CREDENTIAL_FLAG_CLIENT, CREDENTIAL_FLAG_NO_CERTIFICATE_VALIDATION, @@ -46,7 +47,7 @@ mod tests { use tokio::sync::oneshot; use crate::{ - buffer::{QBufferVec, QVecBuffer}, + buffer::{debug_buf_to_string, QBufferVec, QVecBuffer}, config::QConfiguration, conn::QConnection, listener::QListener, @@ -165,14 +166,14 @@ mod tests { rth.spawn(async move { info!("server accepted stream"); let mut s = s.unwrap(); - let mut buff = [0_u8; 99]; info!("server stream receive"); - let read = s.receive(buff.as_mut_slice()).await.unwrap(); - info!("server received len {}", read); - assert_eq!(read as usize, "world".len()); - let args: [QVecBuffer; 1] = [QVecBuffer::from("hello world")]; + let read = s.receive().await.unwrap(); + let payload = debug_buf_to_string(read); + info!("server received len {}", payload.len()); + assert_eq!(payload, "hello"); + let args = Bytes::from("hello world"); info!("server stream send"); - s.send(args.as_slice(), SEND_FLAG_FIN).await.unwrap(); + s.send(args, SEND_FLAG_FIN).await.unwrap(); info!("server stream drain"); s.drain().await; info!("server stream end"); @@ -182,7 +183,6 @@ mod tests { conn.shutdown().await; info!("server conn shutdown end"); }); - //break; // only accept for 1 request } info!("server listener stop"); l.stop().await; @@ -217,31 +217,26 @@ mod tests { info!("client conn start"); conn.start(&client_config, "localhost", 4567).await.unwrap(); - // thread::sleep(Duration::from_secs(5)); - // info!("client stream open"); let mut st = QStream::open(&conn, STREAM_OPEN_FLAG_NONE); info!("client stream start"); st.start(STREAM_START_FLAG_NONE).await.unwrap(); - let args: [QVecBuffer; 1] = [QVecBuffer::from("hello")]; + let args = Bytes::from("hello"); info!("client stream send"); - st.send(args.as_slice(), SEND_FLAG_FIN).await.unwrap(); + st.send(args, SEND_FLAG_FIN).await.unwrap(); - tokio::time::sleep(Duration::from_millis(1)).await; - let mut buff = [0_u8; 99]; info!("client stream receive"); - let read = st.receive(buff.as_mut_slice()).await.unwrap(); - info!("client stream receive read :{}", read); + let read = st.receive().await.unwrap(); + let payload = debug_buf_to_string(read); + info!("client stream receive read :{}", payload.len()); + assert_eq!(payload, "hello world"); info!("client stream drain"); st.drain().await; info!("client conn shutdown"); conn.shutdown().await; // shutdown server - tokio::time::sleep(Duration::from_millis(10)).await; sht_tx.send(()).unwrap(); }); - - thread::sleep(Duration::from_secs(5)); th.join().unwrap(); } } diff --git a/crates/libs/msquic/src/stream.rs b/crates/libs/msquic/src/stream.rs index a97fc18..fdfd342 100644 --- a/crates/libs/msquic/src/stream.rs +++ b/crates/libs/msquic/src/stream.rs @@ -6,13 +6,14 @@ use std::{ }; use crate::{ - buffer::{QBuffRef, QBufferVec, QOwnedBuffer, QVecBuffer}, + buffer::{QBufWrap, QBytesMut}, conn::QConnection, info, sync::{QQueue, QResetChannel, QSignal}, utils::SBox, QApi, }; +use bytes::Buf; use c2::{ Buffer, Handle, SendFlags, Stream, StreamEvent, StreamOpenFlags, StreamStartFlags, STREAM_EVENT_PEER_RECEIVE_ABORTED, STREAM_EVENT_PEER_SEND_ABORTED, @@ -41,7 +42,7 @@ enum StartPayload { struct QStreamCtx { start_sig: QResetChannel, - receive_ch: QQueue>, + receive_ch: QQueue, send_ch: QResetChannel, send_shtdwn_sig: QSignal, drain_sig: QSignal, @@ -77,10 +78,7 @@ impl QStreamCtx { } fn on_receive(&mut self, buffs: &[Buffer]) { // send to frontend - let v = buffs - .iter() - .map(|b| QVecBuffer::from(&QBuffRef::from(b))) - .collect::>(); + let v = QBytesMut::from_buffs(buffs); self.receive_ch.insert(v); } fn on_peer_send_shutdown(&mut self) { @@ -221,7 +219,7 @@ impl QStream { // receive into this buff // return num of bytes wrote. - pub async fn receive(&mut self, buff: &mut [u8]) -> Result { + pub async fn receive(&mut self) -> Result { let rx; { rx = self.ctx.lock().unwrap().receive_ch.pop(); @@ -230,30 +228,7 @@ impl QStream { let v = rx .await .map_err(|e: u32| Error::from_raw_os_error(e.try_into().unwrap()))?; - let copied = QStream::copy_vec(&v, buff); - // resume - // self.receive_complete(copied as u64); - Ok(copied) - } - - fn copy_vec(src: &Vec, buff: &mut [u8]) -> u32 { - // chain all buff into a single iter - let mut it = buff.iter_mut(); - let mut copied = 0_u32; - for b in src { - let buf_ref = b.as_buff_ref(); - let mut src_iter = buf_ref.data.iter(); - loop { - let dst = it.next(); - let src = src_iter.next(); - if dst.is_none() || src.is_none() { - break; - } - *dst.unwrap() = *src.unwrap(); - copied += 1; - } - } - copied + Ok(v.0) } // fn receive_complete(&self, len: u64) { @@ -261,13 +236,12 @@ impl QStream { // let _ = self.inner.inner.receive_complete(len); // } - pub async fn send(&mut self, buffers: &[QVecBuffer], flags: SendFlags) -> Result<(), Error> { + pub async fn send(&mut self, buffers: impl Buf, flags: SendFlags) -> Result<(), Error> { + let b = QBufWrap::new(buffers); let rx; { + let bb = b.as_buffs(); rx = self.ctx.lock().unwrap().send_ch.reset(); - - let b = QBufferVec::from(buffers); - let bb = b.as_buffers(); self.inner .inner .send(&bb[0], bb.len() as u32, flags, std::ptr::null());