diff --git a/Cargo.toml b/Cargo.toml index 1307377..02d1523 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,3 +23,5 @@ lru = { version = "0.12" } [dev-dependencies] tokio = { version = "1", features = ["full"] } test-log = { version = "0.2" } +clap = { version = "4.4", features = ["derive", "env", "color"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] } \ No newline at end of file diff --git a/examples/simple.rs b/examples/simple.rs new file mode 100644 index 0000000..62b16ce --- /dev/null +++ b/examples/simple.rs @@ -0,0 +1,69 @@ +use std::{net::SocketAddr, time::Duration}; + +use atm0s_small_p2p::{P2pNetwork, P2pNetworkConfig}; +use clap::Parser; +use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; +use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +pub const DEFAULT_CLUSTER_CERT: &[u8] = include_bytes!("../certs/dev.cluster.cert"); +pub const DEFAULT_CLUSTER_KEY: &[u8] = include_bytes!("../certs/dev.cluster.key"); + +/// A Relayer node which can connect to each-other to build a high-available relay system +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// UDP/TCP port for serving QUIC/TCP connection for SDN network + #[arg(env, long, default_value = "127.0.0.1:11111")] + sdn_listener: SocketAddr, + + /// Seeds + #[arg(env, long, value_delimiter = ',')] + sdn_seeds: Vec, + + /// Allow it broadcast address to other peers + /// This allows other peer can active connect to this node + /// This option is useful with high performance relay node + #[arg(env, long)] + sdn_advertise_address: Option, +} + +#[tokio::main] +async fn main() { + rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); + + if std::env::var_os("RUST_LOG").is_none() { + std::env::set_var("RUST_LOG", "info"); + } + if std::env::var_os("RUST_BACKTRACE").is_none() { + std::env::set_var("RUST_BACKTRACE", "1"); + } + let args: Args = Args::parse(); + tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init(); + + let key = PrivatePkcs8KeyDer::from(DEFAULT_CLUSTER_KEY.to_vec()); + let cert = CertificateDer::from(DEFAULT_CLUSTER_CERT.to_vec()); + + let mut p2p = P2pNetwork::new(P2pNetworkConfig { + addr: args.sdn_listener, + advertise: args.sdn_advertise_address, + priv_key: key, + cert: cert, + tick_ms: 100, + }) + .await + .expect("should create network"); + + let requester = p2p.requester(); + tokio::spawn(async move { + loop { + for seed in &args.sdn_seeds { + requester.try_connect((*seed).into()); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + + loop { + p2p.recv().await.expect("should ok"); + } +} diff --git a/src/stream.rs b/src/stream.rs index dcec941..3bfc548 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -7,10 +7,8 @@ use std::{ use anyhow::anyhow; use serde::{de::DeserializeOwned, Serialize}; -use tokio_util::{ - bytes::{Buf, BufMut}, - codec::{Decoder, Encoder}, -}; +use tokio_util::codec::LengthDelimitedCodec; +use tokio_util::codec::{Decoder, Encoder}; use quinn::{RecvStream, SendStream}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; @@ -57,49 +55,54 @@ impl AsyncWrite for P2pQuicStream { } pub struct BincodeCodec { + length_decode: LengthDelimitedCodec, _tmp: PhantomData, } impl Default for BincodeCodec { fn default() -> Self { - Self { _tmp: Default::default() } + Self { + length_decode: LengthDelimitedCodec::default(), + _tmp: Default::default(), + } } } impl Encoder for BincodeCodec { - type Error = bincode::Error; + type Error = std::io::Error; fn encode(&mut self, item: Item, dst: &mut tokio_util::bytes::BytesMut) -> Result<(), Self::Error> { - let res = bincode::serialize_into(dst.writer(), &item); - res + let data: Vec = bincode::serialize(&item).map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "bincode serialize failure"))?; + self.length_decode.encode(data.into(), dst) } } impl Decoder for BincodeCodec { - type Error = bincode::Error; + type Error = std::io::Error; type Item = Item; fn decode(&mut self, src: &mut tokio_util::bytes::BytesMut) -> Result, Self::Error> { - if src.is_empty() { - return Result::<_, Self::Error>::Ok(Option::::None); + match self.length_decode.decode(src)? { + Some(buf) => Ok(Some( + bincode::deserialize(&buf).map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "bincode deserialize failure"))?, + )), + None => Ok(None), } - let res = bincode::deserialize_from::<_, Item>(src.reader()).map(|o| Some(o)); - res } } pub async fn wait_object(reader: &mut R) -> anyhow::Result { let mut len_buf = [0; 2]; let mut data_buf = [0; MAX_SIZE]; - reader.read_exact(&mut len_buf).await?; + reader.read_exact(&mut len_buf).await.unwrap(); let handshake_len = u16::from_be_bytes([len_buf[0], len_buf[1]]) as usize; if handshake_len > data_buf.len() { return Err(anyhow!("packet to big {} vs {MAX_SIZE}", data_buf.len())); } - reader.read_exact(&mut data_buf[0..handshake_len]).await?; + reader.read_exact(&mut data_buf[0..handshake_len]).await.unwrap(); - Ok(bincode::deserialize(&data_buf[0..handshake_len])?) + Ok(bincode::deserialize(&data_buf[0..handshake_len]).unwrap()) } pub async fn write_object(writer: &mut W, object: &O) -> anyhow::Result<()> { @@ -109,7 +112,7 @@ pub async fn write_object