Skip to content

Commit

Permalink
migrate to use bytes crate
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed May 11, 2024
1 parent f1a2bcc commit 791f294
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 55 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/libs/msquic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ edition = "2021"
# log = "0.4"
tokio = {version = "1", features = ["sync"]}
tracing = { version = "0.1.40", features = ["log"] }
bytes = "*"

[dev-dependencies]
# env_logger = "0.10.1"
Expand Down
94 changes: 93 additions & 1 deletion crates/libs/msquic/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::slice;

use bytes::{Buf, BufMut, BytesMut};
use c2::Buffer;

use crate::SBox;
Expand Down Expand Up @@ -141,11 +144,76 @@ impl From<&SBuffer> for Buffer {
}
}

pub struct QBufWrap<B: Buf> {
_inner: Box<B>, // mem owner
v: Vec<Buffer>,
}

unsafe impl<B: Buf> Send for QBufWrap<B> {}

impl<B: Buf> QBufWrap<B> {
pub fn new(buf: B) -> Self {
// make on heap so that no ptr move.
let mut inner = Box::new(buf);
let v = Self::convert_buf(&mut inner);
Self { _inner: inner, v }
}

fn convert_buf(b: &mut impl Buf) -> Vec<Buffer> {
let mut v = Vec::new();
// change buf to vecs
while b.has_remaining() {
let ck = b.chunk();
v.push(Buffer {
length: ck.len() as u32,
buffer: ck.as_ptr() as *mut u8,
});
b.advance(ck.len());
}
v
// let bb =
// v.iter().map(|s|{ Buffer{ length: s.len() as u32, buffer: s.as_ptr() as *mut u8 } }).collect::<Vec<_>>();
// bb
}

pub fn as_buffs(&self) -> &[Buffer] {
&self.v
}
}

pub struct QBytesMut(pub BytesMut);

impl QBytesMut {
pub fn from_buffs(b: &[Buffer]) -> Self {
let mut res = BytesMut::new();
b.iter().for_each(|i| {
let s = unsafe { slice::from_raw_parts(i.buffer, i.length.try_into().unwrap()) };
res.put_slice(s);
});
Self(res)
}
}

pub fn debug_buf_to_string(mut b: impl Buf) -> String {
let cp = b.copy_to_bytes(b.remaining());
String::from_utf8_lossy(&cp).into_owned()
}

#[cfg(test)]
mod test {
use core::slice;

use bytes::{BufMut, Bytes, BytesMut};
use c2::Buffer;

use super::{QBuffRef, QBufferVec, QVecBuffer};
use super::{QBufWrap, QBuffRef, QBufferVec, QVecBuffer};

fn buf_to_string(b: Buffer) -> String {
let s = String::from_utf8_lossy(unsafe {
slice::from_raw_parts(b.buffer, b.length.try_into().unwrap())
});
s.into_owned()
}

#[test]
fn test_vec_buffer() {
Expand All @@ -167,4 +235,28 @@ mod test {
let arg1 = QVecBuffer::from(&QBuffRef::from(b1));
assert_eq!(args[0], arg1);
}

#[test]
fn test_buf() {
let b = Bytes::from("mydata");
let wrap = QBufWrap::new(b);
let v = wrap.as_buffs();
assert_eq!(v.len(), 1);
let b1 = v[0];
let s = buf_to_string(b1);
assert_eq!(s, "mydata");
}

#[test]
fn test_buf2() {
let mut b = BytesMut::with_capacity(5);
b.put(&b"hello"[..]);
b.put(&b"world"[..]); // this will grow
let wrap = QBufWrap::new(b);
let v = wrap.as_buffs();
assert_eq!(v.len(), 1);
let b1 = v[0];
let s = buf_to_string(b1);
assert_eq!(s, "helloworld");
}
}
33 changes: 14 additions & 19 deletions crates/libs/msquic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod tests {

use std::{process::Command, thread, time::Duration};

use bytes::Bytes;
use c2::{
Addr, CertificateHash, CertificateUnion, CredentialConfig, RegistrationConfig, Settings,
ADDRESS_FAMILY_UNSPEC, CREDENTIAL_FLAG_CLIENT, CREDENTIAL_FLAG_NO_CERTIFICATE_VALIDATION,
Expand All @@ -46,7 +47,7 @@ mod tests {
use tokio::sync::oneshot;

use crate::{
buffer::{QBufferVec, QVecBuffer},
buffer::{debug_buf_to_string, QBufferVec, QVecBuffer},
config::QConfiguration,
conn::QConnection,
listener::QListener,
Expand Down Expand Up @@ -165,14 +166,14 @@ mod tests {
rth.spawn(async move {
info!("server accepted stream");
let mut s = s.unwrap();
let mut buff = [0_u8; 99];
info!("server stream receive");
let read = s.receive(buff.as_mut_slice()).await.unwrap();
info!("server received len {}", read);
assert_eq!(read as usize, "world".len());
let args: [QVecBuffer; 1] = [QVecBuffer::from("hello world")];
let read = s.receive().await.unwrap();
let payload = debug_buf_to_string(read);
info!("server received len {}", payload.len());
assert_eq!(payload, "hello");
let args = Bytes::from("hello world");
info!("server stream send");
s.send(args.as_slice(), SEND_FLAG_FIN).await.unwrap();
s.send(args, SEND_FLAG_FIN).await.unwrap();
info!("server stream drain");
s.drain().await;
info!("server stream end");
Expand All @@ -182,7 +183,6 @@ mod tests {
conn.shutdown().await;
info!("server conn shutdown end");
});
//break; // only accept for 1 request
}
info!("server listener stop");
l.stop().await;
Expand Down Expand Up @@ -217,31 +217,26 @@ mod tests {
info!("client conn start");
conn.start(&client_config, "localhost", 4567).await.unwrap();

// thread::sleep(Duration::from_secs(5));
//
info!("client stream open");
let mut st = QStream::open(&conn, STREAM_OPEN_FLAG_NONE);
info!("client stream start");
st.start(STREAM_START_FLAG_NONE).await.unwrap();
let args: [QVecBuffer; 1] = [QVecBuffer::from("hello")];
let args = Bytes::from("hello");
info!("client stream send");
st.send(args.as_slice(), SEND_FLAG_FIN).await.unwrap();
st.send(args, SEND_FLAG_FIN).await.unwrap();

tokio::time::sleep(Duration::from_millis(1)).await;
let mut buff = [0_u8; 99];
info!("client stream receive");
let read = st.receive(buff.as_mut_slice()).await.unwrap();
info!("client stream receive read :{}", read);
let read = st.receive().await.unwrap();
let payload = debug_buf_to_string(read);
info!("client stream receive read :{}", payload.len());
assert_eq!(payload, "hello world");
info!("client stream drain");
st.drain().await;
info!("client conn shutdown");
conn.shutdown().await;
// shutdown server
tokio::time::sleep(Duration::from_millis(10)).await;
sht_tx.send(()).unwrap();
});

thread::sleep(Duration::from_secs(5));
th.join().unwrap();
}
}
44 changes: 9 additions & 35 deletions crates/libs/msquic/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use std::{
};

use crate::{
buffer::{QBuffRef, QBufferVec, QOwnedBuffer, QVecBuffer},
buffer::{QBufWrap, QBytesMut},
conn::QConnection,
info,
sync::{QQueue, QResetChannel, QSignal},
utils::SBox,
QApi,
};
use bytes::Buf;
use c2::{
Buffer, Handle, SendFlags, Stream, StreamEvent, StreamOpenFlags, StreamStartFlags,
STREAM_EVENT_PEER_RECEIVE_ABORTED, STREAM_EVENT_PEER_SEND_ABORTED,
Expand Down Expand Up @@ -41,7 +42,7 @@ enum StartPayload {

struct QStreamCtx {
start_sig: QResetChannel<StartPayload>,
receive_ch: QQueue<Vec<QVecBuffer>>,
receive_ch: QQueue<QBytesMut>,
send_ch: QResetChannel<SentPayload>,
send_shtdwn_sig: QSignal,
drain_sig: QSignal,
Expand Down Expand Up @@ -77,10 +78,7 @@ impl QStreamCtx {
}
fn on_receive(&mut self, buffs: &[Buffer]) {
// send to frontend
let v = buffs
.iter()
.map(|b| QVecBuffer::from(&QBuffRef::from(b)))
.collect::<Vec<_>>();
let v = QBytesMut::from_buffs(buffs);
self.receive_ch.insert(v);
}
fn on_peer_send_shutdown(&mut self) {
Expand Down Expand Up @@ -221,7 +219,7 @@ impl QStream {

// receive into this buff
// return num of bytes wrote.
pub async fn receive(&mut self, buff: &mut [u8]) -> Result<u32, Error> {
pub async fn receive(&mut self) -> Result<impl Buf, Error> {
let rx;
{
rx = self.ctx.lock().unwrap().receive_ch.pop();
Expand All @@ -230,44 +228,20 @@ impl QStream {
let v = rx
.await
.map_err(|e: u32| Error::from_raw_os_error(e.try_into().unwrap()))?;
let copied = QStream::copy_vec(&v, buff);
// resume
// self.receive_complete(copied as u64);
Ok(copied)
}

fn copy_vec(src: &Vec<QVecBuffer>, buff: &mut [u8]) -> u32 {
// chain all buff into a single iter
let mut it = buff.iter_mut();
let mut copied = 0_u32;
for b in src {
let buf_ref = b.as_buff_ref();
let mut src_iter = buf_ref.data.iter();
loop {
let dst = it.next();
let src = src_iter.next();
if dst.is_none() || src.is_none() {
break;
}
*dst.unwrap() = *src.unwrap();
copied += 1;
}
}
copied
Ok(v.0)
}

// fn receive_complete(&self, len: u64) {
// // TODO: handle error
// let _ = self.inner.inner.receive_complete(len);
// }

pub async fn send(&mut self, buffers: &[QVecBuffer], flags: SendFlags) -> Result<(), Error> {
pub async fn send(&mut self, buffers: impl Buf, flags: SendFlags) -> Result<(), Error> {
let b = QBufWrap::new(buffers);
let rx;
{
let bb = b.as_buffs();
rx = self.ctx.lock().unwrap().send_ch.reset();

let b = QBufferVec::from(buffers);
let bb = b.as_buffers();
self.inner
.inner
.send(&bb[0], bb.len() as u32, flags, std::ptr::null());
Expand Down

0 comments on commit 791f294

Please sign in to comment.