Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Commit

Permalink
fix relay and fully-specify errors
Browse files Browse the repository at this point in the history
  • Loading branch information
chunningham committed Apr 5, 2023
1 parent 0299a07 commit e60bd7a
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 75 deletions.
10 changes: 4 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,10 @@ pub async fn app(config: &Figment) -> Result<Rocket<Build>> {

storage::KV::healthcheck(kepler_config.storage.indexes.clone()).await?;

let relay_node = RelayConfig::default()
.launch(
kepler_config.storage.blocks.relay_key_pair().await?,
Both::<MemoryConfig, TcpConfig>::default(),
)
.await?;
let mut relay_node = RelayConfig::default().launch(
kepler_config.storage.blocks.relay_key_pair().await?,
Both::<MemoryConfig, TcpConfig>::default(),
)?;

relay_node
.listen_on([
Expand Down
189 changes: 121 additions & 68 deletions src/p2p/relay.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::p2p::{transport::IntoTransport, IdentifyConfig};
use anyhow::Result;
use crate::p2p::{
transport::{build_transport, IntoTransport},
IdentifyConfig,
};
use futures::{
channel::{mpsc, oneshot},
future::{select, Either},
Expand All @@ -9,14 +11,14 @@ use futures::{
};
use libp2p::{
autonat::{Behaviour as AutoNat, Config as AutoNatConfig},
core::{identity::Keypair, upgrade, Multiaddr, Transport},
core::{identity::Keypair, Multiaddr, Transport},
identify::Behaviour as Identify,
identity::PublicKey,
mplex, noise,
noise,
ping::{Behaviour as Ping, Config as PingConfig},
relay::{Behaviour as Relay, Config as RelayConfig},
swarm::{NetworkBehaviour, SwarmBuilder},
yamux, PeerId,
swarm::{NetworkBehaviour, Swarm, SwarmBuilder},
PeerId,
};

#[derive(Clone, Debug)]
Expand All @@ -33,29 +35,53 @@ pub struct Behaviour {
autonat: AutoNat,
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("failed to listen on multiaddress: {0}")]
Listen(Multiaddr),
#[error("failed to dial multiaddress: {0}")]
Dial(Multiaddr),
#[error("failed to send message: {0}")]
SendError(#[from] mpsc::SendError),
#[error("failed to recieve behaviour response: {0}")]
RecieveError(#[from] oneshot::Canceled),
#[error("failed to open listener: {0}")]
TransportError(#[from] libp2p::TransportError<std::io::Error>),
}

#[derive(Debug)]
enum Message {
GetAddresses(oneshot::Sender<Vec<Multiaddr>>),
ListenOn(Vec<Multiaddr>, oneshot::Sender<Result<()>>),
ListenOn(Vec<Multiaddr>, oneshot::Sender<Result<(), Error>>),
Dial(Multiaddr, oneshot::Sender<Result<(), Error>>),
}

impl RelayNode {
pub fn id(&self) -> &PeerId {
&self.id
}
pub async fn get_addresses(&mut self) -> Result<Vec<Multiaddr>> {
pub async fn get_addresses(&mut self) -> Result<Vec<Multiaddr>, Error> {
let (s, r) = oneshot::channel();
self.sender.send(Message::GetAddresses(s)).await?;
Ok(r.await?)
}

pub async fn listen_on(&mut self, addr: impl IntoIterator<Item = Multiaddr>) -> Result<()> {
pub async fn listen_on(
&mut self,
addr: impl IntoIterator<Item = Multiaddr>,
) -> Result<(), Error> {
let (s, r) = oneshot::channel();
self.sender
.send(Message::ListenOn(addr.into_iter().collect(), s))
.await?;
r.await?
}

pub async fn dial(&mut self, addr: Multiaddr) -> Result<(), Error> {
let (s, r) = oneshot::channel();
self.sender.send(Message::Dial(addr, s)).await?;
r.await?
}
}

#[derive(Debug)]
Expand All @@ -65,6 +91,7 @@ pub struct Config {
relay: RelayConfig,
autonat: AutoNatConfig,
channel_size: usize,
transport_timeout: std::time::Duration,
}

impl Default for Config {
Expand All @@ -75,6 +102,7 @@ impl Default for Config {
relay: RelayConfig::default(),
autonat: AutoNatConfig::default(),
channel_size: 100,
transport_timeout: std::time::Duration::from_secs(20),
}
}
}
Expand All @@ -100,6 +128,10 @@ impl Config {
self.channel_size = i.into();
self
}
pub fn transport_timeout(&mut self, i: impl Into<std::time::Duration>) -> &mut Self {
self.transport_timeout = i.into();
self
}

fn build(self, pubkey: PublicKey) -> Behaviour {
let peer_id = pubkey.to_peer_id();
Expand All @@ -111,7 +143,7 @@ impl Config {
}
}

pub fn launch<T>(self, keypair: Keypair, transport: T) -> Result<RelayNode, T::Error>
pub fn launch<T>(self, keypair: Keypair, transport: T) -> Result<RelayNode, BuildError<T>>
where
T: IntoTransport,
T::T: 'static + Send + Unpin,
Expand All @@ -123,71 +155,93 @@ impl Config {
{
let local_public_key = keypair.public();
let id = local_public_key.to_peer_id();
let b = self.build(local_public_key);
let (sender, mut reciever) = mpsc::channel(100);
let r = RelayNode { id, sender };

let mut swarm = SwarmBuilder::with_tokio_executor(
transport
.into_transport()?
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&keypair).unwrap())
.multiplex(upgrade::SelectUpgrade::new(
yamux::YamuxConfig::default(),
mplex::MplexConfig::default(),
))
.timeout(std::time::Duration::from_secs(20))
.boxed(),
b,
let (sender, reciever) = mpsc::channel(self.channel_size);

let swarm = SwarmBuilder::with_tokio_executor(
build_transport(
transport
.into_transport()
.map_err(BuildError::TransportConfig)?,
self.transport_timeout,
&keypair,
)?,
self.build(local_public_key),
id,
)
.build();

tokio::spawn(async move {
loop {
match select(reciever.next(), swarm.next()).await {
// if the swarm or the channel are closed, close the relay
Either::Right((None, _)) | Either::Left((None, _)) => {
break;
}
// process command
Either::Left((Some(e), _)) => match e {
Message::ListenOn(a, s) => {
// try listen on each given address
match a.into_iter().try_fold(Vec::new(), |mut listeners, addr| {
match swarm.listen_on(addr) {
Ok(l) => {
listeners.push(l);
Ok(listeners)
}
Err(e) => Err((e, listeners)),
}
}) {
Ok(_) => s.send(Ok(())),
// if one fails, roll back all of them
Err((e, listeners)) => {
for l in listeners {
swarm.remove_listener(l);
}
s.send(Err(e.into()))
}
tokio::spawn(poll_swarm(swarm, reciever));

Ok(RelayNode { id, sender })
}
}

#[derive(thiserror::Error, Debug)]
pub enum BuildError<T>
where
T: IntoTransport,
{
#[error(transparent)]
TransportConfig(T::Error),
#[error(transparent)]
Noise(#[from] noise::NoiseError),
}

#[derive(thiserror::Error, Debug)]
pub enum SwarmError {
#[error("failed to send response via oneshot")]
SendError,
#[error("failed to dial multiaddress: {0}")]
DialError(#[from] libp2p::swarm::DialError),
}

async fn poll_swarm(
mut swarm: Swarm<Behaviour>,
mut reciever: mpsc::Receiver<Message>,
) -> Result<(), SwarmError> {
loop {
match select(reciever.next(), swarm.next()).await {
// if the swarm or the channel are closed, close the relay
Either::Right((None, _)) | Either::Left((None, _)) => {
break;
}
// process command
Either::Left((Some(e), _)) => match e {
Message::ListenOn(a, s) => {
// try listen on each given address
match a.into_iter().try_fold(Vec::new(), |mut listeners, addr| {
match swarm.listen_on(addr) {
Ok(l) => {
listeners.push(l);
Ok(listeners)
}
.map_err(|_| anyhow!("failed to return listening result"))?;
Err(e) => Err((e, listeners)),
}
Message::GetAddresses(s) => {
s.send(swarm.listeners().map(|a| a.clone()).collect())
.map_err(|_| anyhow!("failed to return listeners"))?;
}) {
Ok(_) => s.send(Ok(())).map_err(|_| SwarmError::SendError)?,
// if one fails, roll back all of them
Err((e, listeners)) => {
for l in listeners {
swarm.remove_listener(l);
}
s.send(Err(e.into())).map_err(|_| SwarmError::SendError)?
}
},
Either::Right((Some(_), _)) => {
// process swarm event
}
};
}
Message::GetAddresses(s) => s
.send(swarm.listeners().map(|a| a.clone()).collect())
.map_err(|_| SwarmError::SendError)?,
Message::Dial(addr, s) => {
swarm.dial(addr)?;
s.send(Ok(())).map_err(|_| SwarmError::SendError)?
}
},
Either::Right((Some(_), _)) => {
// process swarm event
}
Result::<(), anyhow::Error>::Ok(())
});
Ok(r)
}
}
Result::Ok(())
}

#[cfg(test)]
Expand All @@ -198,11 +252,10 @@ mod test {

#[test]
async fn basic_test() {
let addr = build_multiaddr!(Memory(0));
let addr = build_multiaddr!(Memory(1u8));

let relay = RelayConfig::default()
let mut relay = Config::default()
.launch(Keypair::generate_ed25519(), MemoryConfig)
.await
.unwrap();

relay.listen_on([addr.clone()]).await.unwrap();
Expand Down
33 changes: 32 additions & 1 deletion src/p2p/transport.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,45 @@
use crate::storage::either::EitherError;
use futures::io::{AsyncRead, AsyncWrite};
use libp2p::{
core::transport::{dummy::DummyTransport, MemoryTransport, OrTransport, Transport},
core::{
muxing::StreamMuxerBox,
transport::{dummy::DummyTransport, Boxed, MemoryTransport, OrTransport, Transport},
upgrade,
},
dns::{ResolverConfig, ResolverOpts, TokioDnsConfig as DnsTransport},
identity::Keypair,
mplex,
noise::{NoiseAuthenticated, NoiseError},
tcp::tokio::Transport as TcpTransport,
wasm_ext::ExtTransport,
websocket::{tls::Config as WsTlsConfig, WsConfig as WsTransport},
yamux, PeerId,
};
use std::io::Error as IoError;

pub fn build_transport<T>(
t: T,
timeout: std::time::Duration,
keypair: &Keypair,
) -> Result<Boxed<(PeerId, StreamMuxerBox)>, NoiseError>
where
T: 'static + Transport + Send + Unpin,
T::Output: 'static + AsyncRead + AsyncWrite + Unpin + Send,
T::Dial: Send,
T::Error: 'static + Send + Sync,
T::ListenerUpgrade: Send,
{
Ok(t.upgrade(upgrade::Version::V1)
// TODO replace with AWAKE protcol (or similar)
.authenticate(NoiseAuthenticated::xx(keypair)?)
.multiplex(upgrade::SelectUpgrade::new(
yamux::YamuxConfig::default(),
mplex::MplexConfig::default(),
))
.timeout(timeout)
.boxed())
}

pub trait IntoTransport {
type T: Transport;
type Error: std::error::Error;
Expand Down

0 comments on commit e60bd7a

Please sign in to comment.