diff --git a/src/bin/gossip.rs b/src/bin/gossip.rs index f50c201..dd9569a 100644 --- a/src/bin/gossip.rs +++ b/src/bin/gossip.rs @@ -1,25 +1,51 @@ use anyhow::Result; +use clap::Parser; +use ethers::types::H160; use libp2p::identity; use quay::{ configuration::get_configuration, gossip::node::QuayGossipNode, telemetry::{get_subscriber, init_subscriber}, }; -use tracing::error; +use tracing::{error, info}; + +/// The options for running a gossip node. +#[derive(Parser, Debug, Clone)] +#[command(author, version, about, long_about = None)] +struct GossipNodeArgs { + /// The port the node will run from. If this is not assigned, + /// a random port will be used. + #[arg(short, long)] + port: Option, + /// The collection addresses the node will subscribe to + #[arg(short, long)] + collection_addresses: Option>, +} #[tokio::main] async fn main() -> Result<()> { let subscriber = get_subscriber("quay-gossip".into(), "info".into(), std::io::stdout); init_subscriber(subscriber); - let config = get_configuration().expect("Could not read config"); - + let mut config = get_configuration().expect("Could not read config"); let keypair = identity::Keypair::generate_ed25519(); - let node = QuayGossipNode::new(keypair, config.gossip)?; + let args = GossipNodeArgs::parse(); + + if let Some(port) = args.port { + config.gossip.port = port; + } + + if let Some(collection_addresses) = args.collection_addresses { + config.gossip.collection_addresses = Some(collection_addresses); + } + + info!("Starting node on port {}", config.gossip.port); + + let mut node = QuayGossipNode::new(keypair, config.gossip)?; if let Err(e) = node.run().await { - error!("Unhandled node error. Exiting"); + error!("Unhandled node error: {}", e.to_string()); panic!("{}", e); } diff --git a/src/gossip/node.rs b/src/gossip/node.rs index 6f3356b..477605d 100644 --- a/src/gossip/node.rs +++ b/src/gossip/node.rs @@ -5,6 +5,7 @@ use std::{ }; use anyhow::Result; +use ethers::types::H160; use futures::StreamExt; use libp2p::{ core::upgrade, @@ -12,7 +13,7 @@ use libp2p::{ self, error::{PublishError, SubscriptionError}, Gossipsub, GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, - MessageId, ValidationMode, + MessageId, TopicHash, ValidationMode, }, identify, identity::Keypair, @@ -20,10 +21,12 @@ use libp2p::{ swarm::{NetworkBehaviour, SwarmEvent}, tcp, PeerId, Swarm, Transport, }; -use tracing::info; +use tracing::{info, warn}; use crate::configuration::GossipNodeSettings; +use super::types::SeaportGossipsubEvent; + /// The seaport gossip network protocol ID & version. pub const PROTOCOL_ID: &str = "seaport/0.1.0"; @@ -131,7 +134,7 @@ impl QuayGossipNode { } /// Starts the node. - pub async fn run(mut self) -> Result<()> { + pub async fn run(&mut self) -> Result<()> { info!("Starting Quay Gossip Client"); self.swarm.listen_on( @@ -141,8 +144,22 @@ impl QuayGossipNode { self.swarm .behaviour_mut() .subscribe(&Topic::new("gossipsub:message"))?; + + let collection_addresses = self.config.collection_addresses.clone().unwrap_or(vec![]); + info!("Local peer ID: {}", self.local_peer_id); + for address in collection_addresses.iter() { + match self + .swarm + .behaviour_mut() + .subscribe(&Topic::new(hex::encode(address))) + { + Ok(_) => info!("Successfully subscribed to collection/topic {}", address), + Err(e) => warn!("Subscription/topic err: {}", e.to_string()), + } + } + loop { tokio::select! { event = self.swarm.select_next_some() => { @@ -152,11 +169,7 @@ impl QuayGossipNode { info!("Listening on {address:?}"); } SwarmEvent::Behaviour(SeaportGossipBehaviourEvent::Gossipsub(GossipsubEvent::Message{ message, .. })) => { - info!( - "Received: '{:?}' from {:?}", - String::from_utf8_lossy(&message.data), - message.source - ); + self.on_gossipsub_message(message); } SwarmEvent::Behaviour(SeaportGossipBehaviourEvent::Mdns(event)) => { match event { @@ -184,4 +197,44 @@ impl QuayGossipNode { } } } + + fn on_gossipsub_message(&self, message: GossipsubMessage) { + info!( + "Received: '{:?}' from {:?}", + String::from_utf8_lossy(&message.data), + message.source + ); + } + + /// Publishes a seaport gossip event to the P2P network. + pub fn publish_gossipsup_message( + &mut self, + event: SeaportGossipsubEvent, + ) -> Result<(), anyhow::Error> { + let mut addresses: Vec = event + .order + .offer + .iter() + .map(|offer| H160::from_slice(offer.token.as_slice())) + .collect(); + let mut consideration_addresses: Vec = event + .order + .consideration + .iter() + .map(|consideration| H160::from_slice(consideration.offer.token.as_slice())) + .collect(); + addresses.append(&mut consideration_addresses); + addresses.sort(); + addresses.dedup(); + let serialized_event = ssz_rs::serialize(&event)?; + // TODO: Look into parallelizing, probably with rayon + for address in addresses { + self.swarm.behaviour_mut().gossipsub.publish( + TopicHash::from_raw(address.clone().to_string()), + serialized_event.clone(), + )?; + } + + Ok(()) + } }