Skip to content

Commit

Permalink
feat: handle gossipsub, make node startup configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Evalir committed Jan 24, 2023
1 parent 6977c0e commit ac3b74f
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 13 deletions.
36 changes: 31 additions & 5 deletions src/bin/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,51 @@
use anyhow::Result;
use clap::Parser;
use ethers::types::H160;
use libp2p::identity;
use quay::{
configuration::get_configuration,
gossip::node::QuayGossipNode,
telemetry::{get_subscriber, init_subscriber},
};
use tracing::error;
use tracing::{error, info};

/// The options for running a gossip node.
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
struct GossipNodeArgs {
/// The port the node will run from. If this is not assigned,
/// a random port will be used.
#[arg(short, long)]
port: Option<u16>,
/// The collection addresses the node will subscribe to
#[arg(short, long)]
collection_addresses: Option<Vec<H160>>,
}

#[tokio::main]
async fn main() -> Result<()> {
let subscriber = get_subscriber("quay-gossip".into(), "info".into(), std::io::stdout);
init_subscriber(subscriber);

let config = get_configuration().expect("Could not read config");

let mut config = get_configuration().expect("Could not read config");
let keypair = identity::Keypair::generate_ed25519();

let node = QuayGossipNode::new(keypair, config.gossip)?;
let args = GossipNodeArgs::parse();

if let Some(port) = args.port {
config.gossip.port = port;
}

if let Some(collection_addresses) = args.collection_addresses {
config.gossip.collection_addresses = Some(collection_addresses);
}

info!("Starting node on port {}", config.gossip.port);

let mut node = QuayGossipNode::new(keypair, config.gossip)?;

if let Err(e) = node.run().await {
error!("Unhandled node error. Exiting");
error!("Unhandled node error: {}", e.to_string());
panic!("{}", e);
}

Expand Down
69 changes: 61 additions & 8 deletions src/gossip/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,28 @@ use std::{
};

use anyhow::Result;
use ethers::types::H160;
use futures::StreamExt;
use libp2p::{
core::upgrade,
gossipsub::{
self,
error::{PublishError, SubscriptionError},
Gossipsub, GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity,
MessageId, ValidationMode,
MessageId, TopicHash, ValidationMode,
},
identify,
identity::Keypair,
mdns, mplex, noise, ping,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, PeerId, Swarm, Transport,
};
use tracing::info;
use tracing::{info, warn};

use crate::configuration::GossipNodeSettings;

use super::types::SeaportGossipsubEvent;

/// The seaport gossip network protocol ID & version.
pub const PROTOCOL_ID: &str = "seaport/0.1.0";

Expand Down Expand Up @@ -131,7 +134,7 @@ impl QuayGossipNode {
}

/// Starts the node.
pub async fn run(mut self) -> Result<()> {
pub async fn run(&mut self) -> Result<()> {
info!("Starting Quay Gossip Client");

self.swarm.listen_on(
Expand All @@ -141,8 +144,22 @@ impl QuayGossipNode {
self.swarm
.behaviour_mut()
.subscribe(&Topic::new("gossipsub:message"))?;

let collection_addresses = self.config.collection_addresses.clone().unwrap_or(vec![]);

info!("Local peer ID: {}", self.local_peer_id);

for address in collection_addresses.iter() {
match self
.swarm
.behaviour_mut()
.subscribe(&Topic::new(hex::encode(address)))
{
Ok(_) => info!("Successfully subscribed to collection/topic {}", address),
Err(e) => warn!("Subscription/topic err: {}", e.to_string()),
}
}

loop {
tokio::select! {
event = self.swarm.select_next_some() => {
Expand All @@ -152,11 +169,7 @@ impl QuayGossipNode {
info!("Listening on {address:?}");
}
SwarmEvent::Behaviour(SeaportGossipBehaviourEvent::Gossipsub(GossipsubEvent::Message{ message, .. })) => {
info!(
"Received: '{:?}' from {:?}",
String::from_utf8_lossy(&message.data),
message.source
);
self.on_gossipsub_message(message);
}
SwarmEvent::Behaviour(SeaportGossipBehaviourEvent::Mdns(event)) => {
match event {
Expand Down Expand Up @@ -184,4 +197,44 @@ impl QuayGossipNode {
}
}
}

fn on_gossipsub_message(&self, message: GossipsubMessage) {
info!(
"Received: '{:?}' from {:?}",
String::from_utf8_lossy(&message.data),
message.source
);
}

/// Publishes a seaport gossip event to the P2P network.
pub fn publish_gossipsup_message(
&mut self,
event: SeaportGossipsubEvent,
) -> Result<(), anyhow::Error> {
let mut addresses: Vec<H160> = event
.order
.offer
.iter()
.map(|offer| H160::from_slice(offer.token.as_slice()))
.collect();
let mut consideration_addresses: Vec<H160> = event
.order
.consideration
.iter()
.map(|consideration| H160::from_slice(consideration.offer.token.as_slice()))
.collect();
addresses.append(&mut consideration_addresses);
addresses.sort();
addresses.dedup();
let serialized_event = ssz_rs::serialize(&event)?;
// TODO: Look into parallelizing, probably with rayon
for address in addresses {
self.swarm.behaviour_mut().gossipsub.publish(
TopicHash::from_raw(address.clone().to_string()),
serialized_event.clone(),
)?;
}

Ok(())
}
}

0 comments on commit ac3b74f

Please sign in to comment.