Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Network Peer actor #231

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
node-version: 20

- name: Cache node modules
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: "**/node_modules"
key: ${{ runner.os }}-modules-${{ hashFiles('**/yarn.lock') }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
override: true

- name: Cache node modules
uses: actions/cache@v2
uses: actions/cache@v4
hmzakhalid marked this conversation as resolved.
Show resolved Hide resolved
with:
path: "**/node_modules"
key: ${{ runner.os }}-modules-${{ hashFiles('**/yarn.lock') }}
Expand Down Expand Up @@ -104,7 +104,7 @@ jobs:
override: true

- name: Cache node modules
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: "**/node_modules"
key: ${{ runner.os }}-modules-${{ hashFiles('**/yarn.lock') }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/rust-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
node-version: 20

- name: Cache node modules
uses: actions/cache@v2
uses: actions/cache@v4
hmzakhalid marked this conversation as resolved.
Show resolved Hide resolved
with:
path: "**/node_modules"
key: ${{ runner.os }}-modules-${{ hashFiles('**/yarn.lock') }}
Expand Down
1 change: 0 additions & 1 deletion deploy/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ peers:
- "/dns4/cn1/udp/9091/quic-v1"
- "/dns4/cn2/udp/9092/quic-v1"
- "/dns4/cn3/udp/9093/quic-v1"
- "/dns4/aggregator/udp/9094/quic-v1"
hmzakhalid marked this conversation as resolved.
Show resolved Hide resolved
chains:
- name: "sepolia"
rpc_url: "${RPC_URL}"
Expand Down
7 changes: 3 additions & 4 deletions deploy/cn1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ address: "${ADDRESS}"
quic_port: ${QUIC_PORT}
enable_mdns: false
peers:
- "/dns4/cn1/udp/9091/quic-v1"
- "/dns4/cn1/udp/9092/quic-v1"
- "/dns4/cn1/udp/9093/quic-v1"
- "/dns4/cn1/udp/9094/quic-v1"
- "/dns4/cn2/udp/9092/quic-v1"
- "/dns4/cn3/udp/9093/quic-v1"
- "/dns4/aggregator/udp/9094/quic-v1"
hmzakhalid marked this conversation as resolved.
Show resolved Hide resolved
chains:
- name: "sepolia"
rpc_url: "${RPC_URL}"
Expand Down
1 change: 0 additions & 1 deletion deploy/cn2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ quic_port: ${QUIC_PORT}
enable_mdns: false
peers:
- "/dns4/cn1/udp/9091/quic-v1"
- "/dns4/cn2/udp/9092/quic-v1"
- "/dns4/cn3/udp/9093/quic-v1"
- "/dns4/aggregator/udp/9094/quic-v1"
chains:
Expand Down
1 change: 0 additions & 1 deletion deploy/cn3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ enable_mdns: false
peers:
- "/dns4/cn1/udp/9091/quic-v1"
- "/dns4/cn2/udp/9092/quic-v1"
- "/dns4/cn3/udp/9093/quic-v1"
- "/dns4/aggregator/udp/9094/quic-v1"
chains:
- name: "sepolia"
Expand Down
7 changes: 3 additions & 4 deletions packages/ciphernode/enclave_core/src/aggregator_start.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::helpers::datastore::setup_datastore;
use actix::{Actor, Addr};
use aggregator::ext::{PlaintextAggregatorExtension, PublicKeyAggregatorExtension};
use anyhow::Result;
Expand All @@ -22,8 +23,6 @@ use std::sync::{Arc, Mutex};
use test_helpers::{PlaintextWriter, PublicKeyWriter};
use tokio::task::JoinHandle;

use crate::helpers::datastore::setup_datastore;

pub async fn execute(
config: AppConfig,
pubkey_write_path: Option<&str>,
Expand Down Expand Up @@ -83,7 +82,7 @@ pub async fn execute(
.build()
.await?;

let (_, join_handle, peer_id) = NetworkManager::setup_with_peer(
let (_, handle, peer_id) = NetworkManager::setup_with_peer(
bus.clone(),
config.peers(),
&cipher,
Expand All @@ -103,5 +102,5 @@ pub async fn execute(

SimpleLogger::<EnclaveEvent>::attach("AGG", bus.clone());

Ok((bus, join_handle, peer_id))
Ok((bus, handle, peer_id))
}
7 changes: 3 additions & 4 deletions packages/ciphernode/enclave_core/src/start.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::helpers::datastore::setup_datastore;
use actix::{Actor, Addr};
use alloy::primitives::Address;
use anyhow::Result;
Expand All @@ -23,8 +24,6 @@ use std::sync::{Arc, Mutex};
use tokio::task::JoinHandle;
use tracing::instrument;

use crate::helpers::datastore::setup_datastore;

#[instrument(name = "app", skip_all)]
pub async fn execute(
config: AppConfig,
Expand Down Expand Up @@ -80,7 +79,7 @@ pub async fn execute(
.build()
.await?;

let (_, join_handle, peer_id) = NetworkManager::setup_with_peer(
let (_, handle, peer_id) = NetworkManager::setup_with_peer(
bus.clone(),
config.peers(),
&cipher,
Expand All @@ -93,5 +92,5 @@ pub async fn execute(
let nm = format!("CIPHER({})", &address.to_string()[0..5]);
SimpleLogger::<EnclaveEvent>::attach(&nm, bus.clone());

Ok((bus, join_handle, peer_id))
Ok((bus, handle, peer_id))
}
7 changes: 0 additions & 7 deletions packages/ciphernode/events/src/enclave_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,3 @@ fn extract_enclave_event_name(s: &str) -> &str {
}
s
}

impl EnclaveEvent {
pub fn event_type(&self) -> String {
let s = format!("{:?}", self);
extract_enclave_event_name(&s).to_string()
}
}
4 changes: 2 additions & 2 deletions packages/ciphernode/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ repository = "https://github.com/gnosisguild/enclave/packages/ciphernode"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = { workspace = true }
actix = { workspace = true }
async-std = { workspace = true, features = ["attributes"] }
async-trait = { workspace = true }
futures = { workspace = true }
Expand All @@ -29,6 +31,4 @@ tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
events = { workspace = true }
anyhow = { workspace = true }
actix = { workspace = true }
zeroize = { workspace = true }
75 changes: 46 additions & 29 deletions packages/ciphernode/net/src/bin/p2p_test.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use actix::prelude::*;
use anyhow::Result;
use events::{EventBus, EventBusConfig, GetHistory};
use libp2p::gossipsub;
use net::correlation_id::CorrelationId;
use net::events::{NetworkPeerCommand, NetworkPeerEvent};
use net::DialerActor;
use net::NetworkPeer;
use std::time::Duration;
use std::{collections::HashSet, env, process};
use tokio::sync::mpsc;
use tokio::time::{sleep, timeout};
use tracing_subscriber::{prelude::*, EnvFilter};

Expand All @@ -14,7 +19,7 @@ use tracing_subscriber::{prelude::*, EnvFilter};
// We have a docker test harness that runs the nodes and blocks things like mdns ports to ensure
// that basic discovery is working

#[tokio::main]
#[actix::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
Expand All @@ -40,26 +45,36 @@ async fn main() -> Result<()> {
let peers: Vec<String> = dial_to.iter().cloned().collect();

let id = libp2p::identity::Keypair::generate_ed25519();
let mut peer = NetworkPeer::new(&id, peers, udp_port, "test-topic", enable_mdns)?;

// Extract input and outputs
let tx = peer.tx();
let mut rx = peer.rx();

let router_task = tokio::spawn({
let name = name.clone();
async move {
println!("{} starting router task", name);
if let Err(e) = peer.start().await {
println!("{} router task failed: {}", name, e);
}
println!("{} router task finished", name);
let (tx, rx) = mpsc::channel(100);

let net_bus = EventBus::<NetworkPeerEvent>::new(EventBusConfig {
capture_history: true,
deduplicate: false,
})
.start();

let mut peer = NetworkPeer::new(&id, enable_mdns, net_bus.clone(), rx)?;
let topic_id = gossipsub::IdentTopic::new(topic);
peer.subscribe(&topic_id)?;
peer.listen_on(udp_port.unwrap_or(0))?;

let name_clone = name.clone();
let swarm_handle = tokio::spawn(async move {
println!("{} starting swarm", name_clone);
if let Err(e) = peer.start().await {
println!("{} swarm failed: {}", name_clone, e);
hmzakhalid marked this conversation as resolved.
Show resolved Hide resolved
}
println!("{} swarm finished", name_clone);
hmzakhalid marked this conversation as resolved.
Show resolved Hide resolved
});

// Give network time to initialize
sleep(Duration::from_secs(3)).await;

// Set up dialer for peers
for peer in peers {
DialerActor::dial_peer(peer, net_bus.clone(), tx.clone());
}

// Send our message first
println!("{} sending message", name);
tx.send(NetworkPeerCommand::GossipPublish {
Expand All @@ -78,26 +93,28 @@ async fn main() -> Result<()> {
.into_iter()
.filter(|n| *n != name)
.collect();

println!("{} waiting for messages from: {:?}", name, expected);

// Then wait to receive from others with a timeout
let mut received = HashSet::new();

// Wrap the message receiving loop in a timeout
let receive_result = timeout(Duration::from_secs(10), async {
while received != expected {
match rx.recv().await? {
NetworkPeerEvent::GossipData(msg) => match String::from_utf8(msg) {
Ok(msg) => {
if !received.contains(&msg) {
println!("{} received '{}'", name, msg);
received.insert(msg);
}
let history = net_bus.send(GetHistory::<NetworkPeerEvent>::new()).await?;
for event in history.clone() {
match event {
NetworkPeerEvent::GossipData(msg) => {
println!(
"{} received '{}'",
name,
String::from_utf8(msg.clone()).unwrap()
);
received.insert(String::from_utf8(msg).unwrap());
}
Err(e) => println!("{} received invalid UTF8: {}", name, e),
},
_ => (),
_ => (),
}
hmzakhalid marked this conversation as resolved.
Show resolved Hide resolved
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
Ok::<(), anyhow::Error>(())
})
Expand All @@ -121,8 +138,8 @@ async fn main() -> Result<()> {
}

// Make sure router task is still running
if router_task.is_finished() {
println!("{} warning: router task finished early", name);
if swarm_handle.is_finished() {
println!("{} warning: swarm task finished early", name);
}

// Give some time for final message propagation
Expand Down
Loading
Loading