Skip to content

Commit

Permalink
fixed: quic connection closed with error failed to fill whole buffer (#4
Browse files Browse the repository at this point in the history
)
  • Loading branch information
giangndm authored Oct 8, 2024
1 parent 9d5bfe9 commit b15b120
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 18 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ lru = { version = "0.12" }
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
test-log = { version = "0.2" }
clap = { version = "4.4", features = ["derive", "env", "color"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
69 changes: 69 additions & 0 deletions examples/simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::{net::SocketAddr, time::Duration};

use atm0s_small_p2p::{P2pNetwork, P2pNetworkConfig};
use clap::Parser;
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

pub const DEFAULT_CLUSTER_CERT: &[u8] = include_bytes!("../certs/dev.cluster.cert");
pub const DEFAULT_CLUSTER_KEY: &[u8] = include_bytes!("../certs/dev.cluster.key");

/// A Relayer node which can connect to each-other to build a high-available relay system
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// UDP/TCP port for serving QUIC/TCP connection for SDN network
#[arg(env, long, default_value = "127.0.0.1:11111")]
sdn_listener: SocketAddr,

/// Seeds
#[arg(env, long, value_delimiter = ',')]
sdn_seeds: Vec<SocketAddr>,

/// Allow it broadcast address to other peers
/// This allows other peer can active connect to this node
/// This option is useful with high performance relay node
#[arg(env, long)]
sdn_advertise_address: Option<SocketAddr>,
}

#[tokio::main]
async fn main() {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

if std::env::var_os("RUST_LOG").is_none() {
std::env::set_var("RUST_LOG", "info");
}
if std::env::var_os("RUST_BACKTRACE").is_none() {
std::env::set_var("RUST_BACKTRACE", "1");
}
let args: Args = Args::parse();
tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init();

let key = PrivatePkcs8KeyDer::from(DEFAULT_CLUSTER_KEY.to_vec());
let cert = CertificateDer::from(DEFAULT_CLUSTER_CERT.to_vec());

let mut p2p = P2pNetwork::new(P2pNetworkConfig {
addr: args.sdn_listener,
advertise: args.sdn_advertise_address,
priv_key: key,
cert: cert,
tick_ms: 100,
})
.await
.expect("should create network");

let requester = p2p.requester();
tokio::spawn(async move {
loop {
for seed in &args.sdn_seeds {
requester.try_connect((*seed).into());
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});

loop {
p2p.recv().await.expect("should ok");
}
}
39 changes: 21 additions & 18 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ use std::{

use anyhow::anyhow;
use serde::{de::DeserializeOwned, Serialize};
use tokio_util::{
bytes::{Buf, BufMut},
codec::{Decoder, Encoder},
};
use tokio_util::codec::LengthDelimitedCodec;
use tokio_util::codec::{Decoder, Encoder};

use quinn::{RecvStream, SendStream};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
Expand Down Expand Up @@ -57,49 +55,54 @@ impl AsyncWrite for P2pQuicStream {
}

pub struct BincodeCodec<Item> {
length_decode: LengthDelimitedCodec,
_tmp: PhantomData<Item>,
}

impl<Item> Default for BincodeCodec<Item> {
fn default() -> Self {
Self { _tmp: Default::default() }
Self {
length_decode: LengthDelimitedCodec::default(),
_tmp: Default::default(),
}
}
}

impl<Item: Serialize> Encoder<Item> for BincodeCodec<Item> {
type Error = bincode::Error;
type Error = std::io::Error;

fn encode(&mut self, item: Item, dst: &mut tokio_util::bytes::BytesMut) -> Result<(), Self::Error> {
let res = bincode::serialize_into(dst.writer(), &item);
res
let data: Vec<u8> = bincode::serialize(&item).map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "bincode serialize failure"))?;
self.length_decode.encode(data.into(), dst)
}
}

impl<Item: DeserializeOwned + Debug> Decoder for BincodeCodec<Item> {
type Error = bincode::Error;
type Error = std::io::Error;
type Item = Item;

fn decode(&mut self, src: &mut tokio_util::bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
return Result::<_, Self::Error>::Ok(Option::<Self::Item>::None);
match self.length_decode.decode(src)? {
Some(buf) => Ok(Some(
bincode::deserialize(&buf).map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "bincode deserialize failure"))?,
)),
None => Ok(None),
}
let res = bincode::deserialize_from::<_, Item>(src.reader()).map(|o| Some(o));
res
}
}

pub async fn wait_object<R: AsyncRead + Unpin, O: DeserializeOwned, const MAX_SIZE: usize>(reader: &mut R) -> anyhow::Result<O> {
let mut len_buf = [0; 2];
let mut data_buf = [0; MAX_SIZE];
reader.read_exact(&mut len_buf).await?;
reader.read_exact(&mut len_buf).await.unwrap();
let handshake_len = u16::from_be_bytes([len_buf[0], len_buf[1]]) as usize;
if handshake_len > data_buf.len() {
return Err(anyhow!("packet to big {} vs {MAX_SIZE}", data_buf.len()));
}

reader.read_exact(&mut data_buf[0..handshake_len]).await?;
reader.read_exact(&mut data_buf[0..handshake_len]).await.unwrap();

Ok(bincode::deserialize(&data_buf[0..handshake_len])?)
Ok(bincode::deserialize(&data_buf[0..handshake_len]).unwrap())
}

pub async fn write_object<W: AsyncWrite + Send + Unpin, O: Serialize, const MAX_SIZE: usize>(writer: &mut W, object: &O) -> anyhow::Result<()> {
Expand All @@ -109,7 +112,7 @@ pub async fn write_object<W: AsyncWrite + Send + Unpin, O: Serialize, const MAX_
}
let len_buf = (data_buf.len() as u16).to_be_bytes();

writer.write_all(&len_buf).await?;
writer.write_all(&data_buf).await?;
writer.write_all(&len_buf).await.unwrap();
writer.write_all(&data_buf).await.unwrap();
Ok(())
}

0 comments on commit b15b120

Please sign in to comment.