Skip to content

Commit

Permalink
use buf for read
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed May 11, 2024
1 parent 8cf313a commit 1d38985
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 47 deletions.
22 changes: 21 additions & 1 deletion crates/libs/msquic/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use bytes::Buf;
use std::slice;

use bytes::{Buf, BufMut, BytesMut};
use c2::Buffer;

use crate::SBox;
Expand Down Expand Up @@ -179,6 +181,24 @@ impl<B: Buf> QBufWrap<B> {
}
}

pub struct QBytesMut(pub BytesMut);

impl QBytesMut {
pub fn from_buffs(b: &[Buffer]) -> Self {
let mut res = BytesMut::new();
b.iter().for_each(|i| {
let s = unsafe { slice::from_raw_parts(i.buffer, i.length.try_into().unwrap()) };
res.put_slice(s);
});
Self(res)
}
}

pub fn debug_buf_to_string(mut b: impl Buf) -> String {
let cp = b.copy_to_bytes(b.remaining());
String::from_utf8_lossy(&cp).into_owned()
}

#[cfg(test)]
mod test {
use core::slice;
Expand Down
24 changes: 9 additions & 15 deletions crates/libs/msquic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ mod tests {
use tokio::sync::oneshot;

use crate::{
buffer::{QBufferVec, QVecBuffer},
buffer::{debug_buf_to_string, QBufferVec, QVecBuffer},
config::QConfiguration,
conn::QConnection,
listener::QListener,
Expand Down Expand Up @@ -166,11 +166,11 @@ mod tests {
rth.spawn(async move {
info!("server accepted stream");
let mut s = s.unwrap();
let mut buff = [0_u8; 99];
info!("server stream receive");
let read = s.receive(buff.as_mut_slice()).await.unwrap();
info!("server received len {}", read);
assert_eq!(read as usize, "world".len());
let read = s.receive().await.unwrap();
let payload = debug_buf_to_string(read);
info!("server received len {}", payload.len());
assert_eq!(payload, "hello");
let args = Bytes::from("hello world");
info!("server stream send");
s.send(args, SEND_FLAG_FIN).await.unwrap();
Expand All @@ -183,7 +183,6 @@ mod tests {
conn.shutdown().await;
info!("server conn shutdown end");
});
//break; // only accept for 1 request
}
info!("server listener stop");
l.stop().await;
Expand Down Expand Up @@ -218,8 +217,6 @@ mod tests {
info!("client conn start");
conn.start(&client_config, "localhost", 4567).await.unwrap();

// thread::sleep(Duration::from_secs(5));
//
info!("client stream open");
let mut st = QStream::open(&conn, STREAM_OPEN_FLAG_NONE);
info!("client stream start");
Expand All @@ -228,21 +225,18 @@ mod tests {
info!("client stream send");
st.send(args, SEND_FLAG_FIN).await.unwrap();

//tokio::time::sleep(Duration::from_millis(1)).await;
let mut buff = [0_u8; 99];
info!("client stream receive");
let read = st.receive(buff.as_mut_slice()).await.unwrap();
info!("client stream receive read :{}", read);
let read = st.receive().await.unwrap();
let payload = debug_buf_to_string(read);
info!("client stream receive read :{}", payload.len());
assert_eq!(payload, "hello world");
info!("client stream drain");
st.drain().await;
info!("client conn shutdown");
conn.shutdown().await;
// shutdown server
//tokio::time::sleep(Duration::from_millis(10)).await;
sht_tx.send(()).unwrap();
});

//thread::sleep(Duration::from_secs(5));
th.join().unwrap();
}
}
36 changes: 5 additions & 31 deletions crates/libs/msquic/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use crate::{
buffer::{QBufWrap, QBuffRef, QOwnedBuffer, QVecBuffer},
buffer::{QBufWrap, QBytesMut},
conn::QConnection,
info,
sync::{QQueue, QResetChannel, QSignal},
Expand Down Expand Up @@ -42,7 +42,7 @@ enum StartPayload {

struct QStreamCtx {
start_sig: QResetChannel<StartPayload>,
receive_ch: QQueue<Vec<QVecBuffer>>,
receive_ch: QQueue<QBytesMut>,
send_ch: QResetChannel<SentPayload>,
send_shtdwn_sig: QSignal,
drain_sig: QSignal,
Expand Down Expand Up @@ -78,10 +78,7 @@ impl QStreamCtx {
}
fn on_receive(&mut self, buffs: &[Buffer]) {
// send to frontend
let v = buffs
.iter()
.map(|b| QVecBuffer::from(&QBuffRef::from(b)))
.collect::<Vec<_>>();
let v = QBytesMut::from_buffs(buffs);
self.receive_ch.insert(v);
}
fn on_peer_send_shutdown(&mut self) {
Expand Down Expand Up @@ -222,7 +219,7 @@ impl QStream {

// receive into this buff
// return num of bytes wrote.
pub async fn receive(&mut self, buff: &mut [u8]) -> Result<u32, Error> {
pub async fn receive(&mut self) -> Result<impl Buf, Error> {
let rx;
{
rx = self.ctx.lock().unwrap().receive_ch.pop();
Expand All @@ -231,30 +228,7 @@ impl QStream {
let v = rx
.await
.map_err(|e: u32| Error::from_raw_os_error(e.try_into().unwrap()))?;
let copied = QStream::copy_vec(&v, buff);
// resume
// self.receive_complete(copied as u64);
Ok(copied)
}

fn copy_vec(src: &Vec<QVecBuffer>, buff: &mut [u8]) -> u32 {
// chain all buff into a single iter
let mut it = buff.iter_mut();
let mut copied = 0_u32;
for b in src {
let buf_ref = b.as_buff_ref();
let mut src_iter = buf_ref.data.iter();
loop {
let dst = it.next();
let src = src_iter.next();
if dst.is_none() || src.is_none() {
break;
}
*dst.unwrap() = *src.unwrap();
copied += 1;
}
}
copied
Ok(v.0)
}

// fn receive_complete(&self, len: u64) {
Expand Down

0 comments on commit 1d38985

Please sign in to comment.