refactor: transport handling for ws & quic as fallback
Includes:

- dns fallback update
- CLI removal of `-w` for running workflows; it's now a positional argument (the first one on `run`)
  * related to #489
- Closes #492 with local node bootstrapping
zeeshanlakhani committed Feb 3, 2024
1 parent 02f07f4 commit f0199e4
Showing 15 changed files with 277 additions and 89 deletions.
30 changes: 29 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions homestar-runtime/Cargo.toml
@@ -106,6 +106,8 @@ libp2p = { version = "0.53", default-features = false, features = [
"cbor",
"yamux",
"serde",
"quic",
"websocket",
] }
libsqlite3-sys = { workspace = true }
maplit = "1.0"
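Note: the swarm/transport wiring this commit refactors lives in files not rendered on this page. As a rough sketch only — with the `quic` and `websocket` features enabled above, plus assumed `tokio`, `tcp`, `noise`, and `dns` features — libp2p 0.53's SwarmBuilder can layer QUIC alongside TCP, resolve DNS multiaddrs, and keep WebSocket-over-TCP as a fallback. Names below follow libp2p's public builder API, not homestar's actual code:

use libp2p::{noise, ping, tcp, yamux, SwarmBuilder};

async fn build_swarm() -> Result<(), Box<dyn std::error::Error>> {
    let swarm = SwarmBuilder::with_new_identity()
        .with_tokio()
        // TCP with Noise security and Yamux multiplexing
        .with_tcp(
            tcp::Config::default(),
            noise::Config::new,
            yamux::Config::default,
        )?
        // QUIC listens/dials alongside TCP; peers use whichever they support
        .with_quic()
        // resolve /dns multiaddrs before dialing
        .with_dns()?
        // WebSocket over TCP as a further fallback for /ws multiaddrs
        .with_websocket(noise::Config::new, yamux::Config::default)
        .await?
        // placeholder behaviour for the sketch
        .with_behaviour(|_key| ping::Behaviour::default())?
        .build();
    let _ = swarm;
    Ok(())
}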
4 changes: 4 additions & 0 deletions homestar-runtime/config/defaults.toml
@@ -27,6 +27,10 @@ transport_connection_timeout = 60
max_connected_peers = 32
max_announce_addresses = 10
dial_interval = 30
bootstrap_interval = 30

[node.network.libp2p.quic]
enable = true

[node.network.libp2p.mdns]
enable = true
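A hypothetical operator override (same schema as these defaults; the override file and mechanism are assumed, not shown in this commit) that disables QUIC — forcing the TCP/WebSocket path — and bootstraps more often:

# hypothetical settings override, same keys as defaults.toml
[node.network.libp2p]
bootstrap_interval = 10

[node.network.libp2p.quic]
enable = false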
4 changes: 2 additions & 2 deletions homestar-runtime/src/cli.rs
@@ -134,11 +134,11 @@ pub enum Command {
/// Supported:
/// - JSON (.json).
#[arg(
short='w',
long = "workflow",
value_hint = clap::ValueHint::FilePath,
value_name = "FILE",
value_parser = clap::value_parser!(file::ReadWorkflow),
index = 1,
required = true,
help = r#"IPVM-configured workflow file to run.
Supported:
- JSON (.json)"#
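With `short='w'` dropped and `index = 1, required = true` added, the workflow file becomes the first positional argument of `run`, while the `--workflow` long flag is kept. Assuming the binary is invoked as `homestar`, usage changes roughly like so:

# before this commit: workflow file passed via flag
homestar run -w workflow.json
# after: first positional argument
homestar run workflow.json
# the long flag still works
homestar run --workflow workflow.json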
43 changes: 31 additions & 12 deletions homestar-runtime/src/event_handler.rs
@@ -34,6 +34,17 @@ pub(crate) use event::Event;

type P2PSender = channel::AsyncChannelSender<ResponseEvent>;

struct Quorum {
/// Minimum number of peers required to receive a receipt.
receipt: usize,
/// Minimum number of peers required to receive workflow information.
workflow: usize,
}

struct Bootstrap {
interval: Duration,
}

/// Handler trait for [EventHandler] events.
#[async_trait]
pub(crate) trait Handler<DB>
@@ -52,10 +63,7 @@ where
#[cfg_attr(docsrs, doc(cfg(feature = "websocket-notify")))]
#[allow(missing_debug_implementations, dead_code)]
pub(crate) struct EventHandler<DB: Database> {
/// Minimum number of peers required to receive a receipt.
receipt_quorum: usize,
/// Minimum number of peers required to receive workflow information.
workflow_quorum: usize,
quorum: Quorum,
/// Timeout for p2p workflow info record requests.
p2p_workflow_info_timeout: Duration,
/// Timeout for p2p workflow info record requests from a provider.
@@ -94,16 +102,15 @@ pub(crate) struct EventHandler<DB: Database> {
external_address_limit: u32,
/// Interval for polling the cache for expired entries.
poll_cache_interval: Duration,
/// Bootstrap configuration.
bootstrap: Bootstrap,
}

/// Event loop handler for libp2p network events and commands.
#[cfg(not(feature = "websocket-notify"))]
#[allow(missing_debug_implementations, dead_code)]
pub(crate) struct EventHandler<DB: Database> {
/// Minimum number of peers required to receive a receipt.
receipt_quorum: usize,
/// Minimum number of peers required to receive workflow information.
workflow_quorum: usize,
quorum: Quorum,
/// Timeout for p2p workflow info record requests.
p2p_workflow_info_timeout: Duration,
/// Timeout for p2p workflow info record requests from a provider.
@@ -136,6 +143,8 @@ pub(crate) struct EventHandler<DB: Database> {
external_address_limit: u32,
/// Interval for polling the cache for expired entries.
poll_cache_interval: Duration,
/// Bootstrap configuration.
bootstrap: Bootstrap,
}

/// Rendezvous protocol configurations and state
@@ -179,8 +188,10 @@ where
let (sender, receiver) = Self::setup_channel(settings);
let sender = Arc::new(sender);
Self {
receipt_quorum: settings.libp2p.dht.receipt_quorum,
workflow_quorum: settings.libp2p.dht.workflow_quorum,
quorum: Quorum {
receipt: settings.libp2p.dht.receipt_quorum,
workflow: settings.libp2p.dht.workflow_quorum,
},
p2p_workflow_info_timeout: settings.libp2p.dht.p2p_workflow_info_timeout,
p2p_provider_timeout: settings.libp2p.dht.p2p_provider_timeout,
db,
@@ -208,6 +219,9 @@
announce_addresses: settings.libp2p.announce_addresses.clone(),
external_address_limit: settings.libp2p.max_announce_addresses,
poll_cache_interval: settings.poll_cache_interval,
bootstrap: Bootstrap {
interval: settings.libp2p.bootstrap_interval,
},
}
}

@@ -221,8 +235,10 @@ where
let (sender, receiver) = Self::setup_channel(settings);
let sender = Arc::new(sender);
Self {
receipt_quorum: settings.libp2p.dht.receipt_quorum,
workflow_quorum: settings.libp2p.dht.workflow_quorum,
quorum: Quorum {
receipt: settings.libp2p.dht.receipt_quorum,
workflow: settings.libp2p.dht.workflow_quorum,
},
p2p_workflow_info_timeout: settings.libp2p.dht.p2p_workflow_info_timeout,
p2p_provider_timeout: settings.libp2p.dht.p2p_provider_timeout,
db,
@@ -248,6 +264,9 @@
announce_addresses: settings.libp2p.announce_addresses.clone(),
external_address_limit: settings.libp2p.max_announce_addresses,
poll_cache_interval: settings.poll_cache_interval,
bootstrap: Bootstrap {
interval: settings.libp2p.bootstrap_interval,
},
}
}

69 changes: 43 additions & 26 deletions homestar-runtime/src/event_handler/cache.rs
@@ -3,8 +3,11 @@
use crate::{channel, event_handler::Event};
use libp2p::PeerId;
use moka::{
future::Cache,
notification::RemovalCause::{self, Expired},
future::{Cache, FutureExt},
notification::{
ListenerFuture,
RemovalCause::{self, Expired},
},
Expiry as ExpiryBase,
};
use std::{
@@ -49,6 +52,7 @@ pub(crate) enum CacheData {
/// Events to be dispatched on cache expiration.
#[derive(Clone, Debug)]
pub(crate) enum DispatchEvent {
Bootstrap,
RegisterPeer,
DiscoverPeers,
DialPeer,
@@ -58,38 +62,51 @@
pub(crate) fn setup_cache(
sender: Arc<channel::AsyncChannelSender<Event>>,
) -> Cache<String, CacheValue> {
let eviction_listener = move |_key: Arc<String>, val: CacheValue, cause: RemovalCause| {
let eviction_listener = move |_key: Arc<String>,
val: CacheValue,
cause: RemovalCause|
-> ListenerFuture {
let tx = Arc::clone(&sender);

if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") {
if cause != Expired {
return;
}

match event {
DispatchEvent::RegisterPeer => {
if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node")
{
let _ = tx.send(Event::RegisterPeer(rendezvous_node.to_owned()));
};
}
DispatchEvent::DiscoverPeers => {
if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node")
{
let _ = tx.send(Event::DiscoverPeers(rendezvous_node.to_owned()));
};
}
DispatchEvent::DialPeer => {
if let Some(CacheData::Peer(node)) = val.data.get("node") {
let _ = tx.send(Event::DialPeer(node.to_owned()));
};
async move {
if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") {
if cause == Expired {
match event {
DispatchEvent::Bootstrap => {
let _ = tx.send_async(Event::Bootstrap).await;
}
DispatchEvent::RegisterPeer => {
if let Some(CacheData::Peer(rendezvous_node)) =
val.data.get("rendezvous_node")
{
let _ = tx
.send_async(Event::RegisterPeer(rendezvous_node.to_owned()))
.await;
};
}
DispatchEvent::DiscoverPeers => {
if let Some(CacheData::Peer(rendezvous_node)) =
val.data.get("rendezvous_node")
{
let _ = tx
.send_async(Event::DiscoverPeers(rendezvous_node.to_owned()))
.await;
};
}
DispatchEvent::DialPeer => {
if let Some(CacheData::Peer(node)) = val.data.get("node") {
let _ = tx.send(Event::DialPeer(node.to_owned()));
};
}
}
}
}
}
.boxed()
};

Cache::builder()
.expire_after(Expiry)
.eviction_listener(eviction_listener)
.async_eviction_listener(eviction_listener)
.build()
}
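The switch from `eviction_listener` to `async_eviction_listener` is the heart of this change: the listener now returns a boxed `ListenerFuture`, so expirations can dispatch events over the async channel instead of in a sync callback. A minimal, self-contained sketch of the same pattern — assuming moka ~0.12 with its `future` feature, flume (which homestar's `AsyncChannelSender` appears to wrap, given the `send_async` calls above), and tokio:

use std::{sync::Arc, time::Duration};

use moka::{
    future::{Cache, FutureExt},
    notification::{ListenerFuture, RemovalCause},
};

#[tokio::main]
async fn main() {
    let (tx, rx) = flume::unbounded::<String>();
    let tx = Arc::new(tx);

    // The async listener returns a ListenerFuture (a pinned, boxed future).
    let listener = move |key: Arc<String>, _val: (), cause: RemovalCause| -> ListenerFuture {
        let tx = Arc::clone(&tx);
        async move {
            if cause == RemovalCause::Expired {
                // dispatch work on the async channel when the entry's timer fires
                let _ = tx.send_async(format!("expired: {key}")).await;
            }
        }
        .boxed()
    };

    let cache: Cache<String, ()> = Cache::builder()
        .time_to_live(Duration::from_millis(50))
        .async_eviction_listener(listener)
        .build();

    cache.insert("bootstrap".into(), ()).await;
    tokio::time::sleep(Duration::from_millis(100)).await;
    cache.run_pending_tasks().await; // drive expiry and the listener

    while let Ok(msg) = rx.try_recv() {
        println!("{msg}");
    }
}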
38 changes: 34 additions & 4 deletions homestar-runtime/src/event_handler/event.rs
@@ -134,6 +134,8 @@ pub(crate) enum Event {
GetNodeInfo(AsyncChannelSender<DynamicNodeInfo>),
/// Dial a peer.
DialPeer(PeerId),
/// Bootstrap the node to join the DHT.
Bootstrap,
}

#[allow(unreachable_patterns)]
@@ -302,6 +304,34 @@ impl Event {
.map_err(anyhow::Error::new)?;
}
Event::Bootstrap => {
if event_handler
.swarm
.connected_peers()
.peekable()
.peek()
.is_some()
{
let _ = event_handler
.swarm
.behaviour_mut()
.kademlia
.bootstrap()
.map(|_| {
debug!(
subject = "libp2p.kad.bootstrap",
category = "handle_event",
"bootstrapped kademlia"
)
})
.map_err(|err| {
warn!(subject = "libp2p.kad.bootstrap.err",
category = "handle_event",
err=?err,
"error bootstrapping kademlia")
});
}
}
_ => {}
}
Ok(())
}
@@ -391,14 +421,14 @@ impl Captured {
}
}

let receipt_quorum = if event_handler.receipt_quorum > 0 {
unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.receipt_quorum)) }
let receipt_quorum = if event_handler.quorum.receipt > 0 {
unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.quorum.receipt)) }
} else {
Quorum::One
};

let workflow_quorum = if event_handler.workflow_quorum > 0 {
unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.receipt_quorum)) }
let workflow_quorum = if event_handler.quorum.workflow > 0 {
unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.quorum.workflow)) }
} else {
Quorum::One
};
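For reference, the quorum mapping above could avoid `unsafe` entirely, since `NonZeroUsize::new` already encodes the zero check. A sketch (not part of this commit) against libp2p's `kad::Quorum`:

use std::num::NonZeroUsize;

use libp2p::kad::Quorum;

/// Map a configured quorum of 0 to "any one peer"; otherwise require N peers.
fn to_quorum(configured: usize) -> Quorum {
    match NonZeroUsize::new(configured) {
        Some(n) => Quorum::N(n),
        None => Quorum::One,
    }
}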
(Diffs for the remaining changed files are not rendered here.)
