Skip to content

Commit

Permalink
feat: pubsub (#12)
Browse files Browse the repository at this point in the history
* pubsub local

* pubsub remote

* added heatbeat

* feat: rename and pubsub with rpc

* feat: serialize and deserialize

* export more pubsub type

* added pub and sub requester

* add guest user

* added missing guest feedback

* added feedback and publish with object

* added benchmark
  • Loading branch information
giangndm authored Nov 5, 2024
1 parent 0b2d2c3 commit 6b78bd0
Show file tree
Hide file tree
Showing 13 changed files with 1,841 additions and 58 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bincode = { version = "1.3" }
blake3 = { version = "1.3" }
tokio-util = { version = "0.7", features = ["codec"] }
lru = { version = "0.12" }
thiserror = { version = "1.0" }

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
Expand Down
66 changes: 62 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,74 @@ A lightweight peer-to-peer (P2P) network utilizing the atm0s routing mechanism,
- **Asynchronous Communication**: Built with async programming for high performance and scalability.
- **QUIC Protocol**: Utilizes the QUIC protocol for secure and fast data transmission.
- **atm0s Routing**: Implements the atm0s routing mechanism for efficient peer discovery and message routing.

## Architecture

TODO
The architecture of the P2P network is designed to facilitate efficient communication between peers. Key components include:

- **Peer Discovery**: The `PeerDiscovery` module manages the discovery of peers in the network, allowing nodes to find and connect to each other.
- **Routing**: The `RouterTable` manages the routing of messages between peers, ensuring that data is sent through the most efficient paths.
- **Secure Communication**: The `SharedKeyHandshake` protocol ensures that connections between peers are secure, using cryptographic techniques to verify identities and protect data.

## Getting Started

### Usage

TODO
To get started with the P2P network, you need to set up a node. Here’s a basic example of how to create a node:

```rust
let _ = rustls::crypto::ring::default_provider().install_default();

let priv_key: PrivatePkcs8KeyDer<'_> = PrivatePkcs8KeyDer::from(DEFAULT_CLUSTER_KEY.to_vec());
let cert = CertificateDer::from(DEFAULT_CLUSTER_CERT.to_vec());

let peer_id = PeerId::from("127.0.0.1:10000".parse().unwrap());
let network = P2pNetwork::new(P2pNetworkConfig {
peer_id,
listen_addr: addr,
advertise: advertise.then(|| addr.into()),
priv_key,
cert,
tick_ms: 100,
seeds,
secure: DEFAULT_SECURE_KEY.into(),
}).await;
```

### Create a service

```rust
let service = network.create_service(1.into());
```

We can handle event from service

```rust
while let Some(event) = service.recv() {
match event {
P2pServiceEvent::Unicast(from_peer, data) => {
// handle data from other node here
},
P2pServiceEvent::Broadcast(from_peer, data) => {
// handle broadcast data from other node here
},
P2pServiceEvent::Stream(from_peer, meta, stream) => {
// stream is AsyncRead + AsyncWrite, we can tunnel it to other by bicopydirection ...
},
}
}
```

To send messages, you can use the `send_unicast` method for direct communication or `send_broadcast` for broadcasting messages to all connected peers:

```rust
service.send_unicast(dest_peer_id, data).await.expect("should send ok");
```

## Testing

###
The project includes a suite of tests to ensure the functionality of the P2P network. You can run the tests using:

TODO
```bash
cargo test
```
85 changes: 85 additions & 0 deletions examples/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::{
net::UdpSocket,
time::{Duration, Instant},
};

use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};

use atm0s_small_p2p::{P2pNetwork, P2pNetworkConfig, PeerAddress, PeerId, SharedKeyHandshake};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

pub const DEFAULT_CLUSTER_CERT: &[u8] = include_bytes!("../certs/dev.cluster.cert");
pub const DEFAULT_CLUSTER_KEY: &[u8] = include_bytes!("../certs/dev.cluster.key");
pub const DEFAULT_SECURE_KEY: &str = "atm0s";

async fn create_node(advertise: bool, peer_id: u64, seeds: Vec<PeerAddress>) -> (P2pNetwork<SharedKeyHandshake>, PeerAddress) {
let _ = rustls::crypto::ring::default_provider().install_default();

let priv_key: PrivatePkcs8KeyDer<'_> = PrivatePkcs8KeyDer::from(DEFAULT_CLUSTER_KEY.to_vec());
let cert = CertificateDer::from(DEFAULT_CLUSTER_CERT.to_vec());

let addr = {
let socket = UdpSocket::bind("127.0.0.1:0").expect("should bind");
socket.local_addr().expect("should get local")
};
let peer_id = PeerId::from(peer_id);
(
P2pNetwork::new(P2pNetworkConfig {
peer_id,
listen_addr: addr,
advertise: advertise.then(|| addr.into()),
priv_key,
cert,
tick_ms: 100,
seeds,
secure: DEFAULT_SECURE_KEY.into(),
})
.await
.expect("should create network"),
(peer_id, addr.into()).into(),
)
}

#[tokio::main]
async fn main() {
let (mut node1, addr1) = create_node(false, 1, vec![]).await;
let (mut node2, addr2) = create_node(false, 2, vec![addr1.clone()]).await;

let service1 = node1.create_service(0.into());
let mut service2 = node2.create_service(0.into());

tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });

tokio::time::sleep(Duration::from_secs(2)).await;

tokio::spawn(async move {
let mut stream = service1.open_stream(addr2.peer_id(), vec![]).await.expect("should open stream");
let data = [0; 65000];
loop {
let _ = stream.write_all(&data).await;
}
});

while let Some(event) = service2.recv().await {
match event {
atm0s_small_p2p::P2pServiceEvent::Unicast(..) => {}
atm0s_small_p2p::P2pServiceEvent::Broadcast(..) => {}
atm0s_small_p2p::P2pServiceEvent::Stream(.., mut p2p_quic_stream) => {
tokio::spawn(async move {
let mut buf = [0; 65000];
let mut recv_count = 0;
let mut recv_at = Instant::now();
while let Ok(size) = p2p_quic_stream.read(&mut buf).await {
recv_count += size;
if recv_at.elapsed() > Duration::from_secs(1) {
println!("Speed {} kbps", recv_count * 8 / recv_at.elapsed().as_millis() as usize);
recv_at = Instant::now();
recv_count = 0;
}
}
});
}
}
}
}
7 changes: 2 additions & 5 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,8 @@ async fn run_connection<SECURE: HandshakeProtocol>(
ctx.register_conn(conn_id, alias);
internal_tx.send(InternalEvent::PeerConnected(conn_id, to_id, rtt_ms)).await.expect("should send to main");
log::info!("[PeerConnection {conn_id}] run loop for {remote}");
loop {
if let Err(e) = internal.recv_complex().await {
log::error!("[PeerConnection {conn_id}] {remote} error {e}");
break;
}
if let Err(e) = internal.run_loop().await {
log::error!("[PeerConnection {conn_id}] {remote} error {e}");
}
internal_tx.send(InternalEvent::PeerDisconnected(conn_id, to_id)).await.expect("should send to main");
log::info!("[PeerConnection {conn_id}] end loop for {remote}");
Expand Down
40 changes: 20 additions & 20 deletions src/peer/peer_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,26 @@ impl PeerConnectionInternal {
}
}

pub async fn recv_complex(&mut self) -> anyhow::Result<()> {
select! {
_ = self.ticker.tick() => {
let rtt_ms = self.connection.rtt().as_millis().min(u16::MAX as u128) as u16;
self.ctx.router().set_direct(self.conn_id, self.to_id, rtt_ms);
Ok(())
},
open = self.connection.accept_bi() => {
let (send, recv) = open?;
self.on_accept_bi(send, recv).await?;
Ok(())
},
frame = self.framed.next() => {
let msg = frame.ok_or(anyhow!("peer main stream ended"))??;
self.on_msg(msg).await
},
control = self.control_rx.recv() => {
let control = control.ok_or(anyhow!("peer control channel ended"))?;
self.on_control(control).await
},
pub async fn run_loop(&mut self) -> anyhow::Result<()> {
loop {
select! {
_ = self.ticker.tick() => {
let rtt_ms = self.connection.rtt().as_millis().min(u16::MAX as u128) as u16;
self.ctx.router().set_direct(self.conn_id, self.to_id, rtt_ms);
},
open = self.connection.accept_bi() => {
let (send, recv) = open?;
self.on_accept_bi(send, recv).await?;
},
frame = self.framed.next() => {
let msg = frame.ok_or(anyhow!("peer main stream ended"))??;
self.on_msg(msg).await?;
},
control = self.control_rx.recv() => {
let control = control.ok_or(anyhow!("peer control channel ended"))?;
self.on_control(control).await?;
},
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::{ctx::SharedCtx, msg::P2pServiceId, router::SharedRouterTable, stream::P2pQuicStream, PeerId};

pub mod alias_service;
pub mod pubsub_service;
pub mod visualization_service;

const SERVICE_CHANNEL_SIZE: usize = 10;
Expand Down
40 changes: 19 additions & 21 deletions src/service/alias_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,30 +165,28 @@ impl AliasService {
AliasServiceRequester { tx: self.tx.clone() }
}

pub async fn recv(&mut self) -> anyhow::Result<()> {
select! {
_ = self.interval.tick() => {
self.on_tick().await;
Ok(())
},
event = self.service.recv() => match event.expect("service channel should work") {
P2pServiceEvent::Unicast(from, data) => {
if let Ok(msg) = bincode::deserialize::<AliasMessage>(&data) {
self.on_msg(from, msg).await;
pub async fn run_loop(&mut self) -> anyhow::Result<()> {
loop {
select! {
_ = self.interval.tick() => {
self.on_tick().await;
},
event = self.service.recv() => match event.expect("service channel should work") {
P2pServiceEvent::Unicast(from, data) => {
if let Ok(msg) = bincode::deserialize::<AliasMessage>(&data) {
self.on_msg(from, msg).await;
}
}
Ok(())
}
P2pServiceEvent::Broadcast(from, data) => {
if let Ok(msg) = bincode::deserialize::<AliasMessage>(&data) {
self.on_msg(from, msg).await;
P2pServiceEvent::Broadcast(from, data) => {
if let Ok(msg) = bincode::deserialize::<AliasMessage>(&data) {
self.on_msg(from, msg).await;
}
}
Ok(())
P2pServiceEvent::Stream(..) => {},
},
control = self.rx.recv() => {
self.on_control(control.expect("service channel should work")).await;
}
P2pServiceEvent::Stream(..) => Ok(()),
},
control = self.rx.recv() => {
self.on_control(control.expect("service channel should work")).await;
Ok(())
}
}
}
Expand Down
Loading

0 comments on commit 6b78bd0

Please sign in to comment.