Skip to content

Commit

Permalink
snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Mar 27, 2024
1 parent 03f3137 commit dc6ec39
Showing 1 changed file with 12 additions and 47 deletions.
59 changes: 12 additions & 47 deletions node/actors/network/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,19 @@ impl Network {
ping_client.ping_loop(ctx, *ping_timeout).await
});
}
// If this is a loopback connection, announce periodically the address of this
// validator to the network.
// Note that this is executed only for outbound end of the loopback connection.
// Inbound end doesn't know the public address of itself.
if peer == &self.key.public() {

let mut sub = self.gossip.validator_addrs.subscribe();
s.spawn(async {
while ctx.is_active() {
self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await;
let _ = sync::wait_for(&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL), &mut sub, |got| got.get(peer).map(|x| x.msg.addr) != addr).await;
}
Ok(())
});
}
service.run(ctx, stream).await?;
Ok(())
Expand Down Expand Up @@ -223,50 +234,4 @@ impl Network {
}
}
}

/// Periodically announces this validator's public IP over gossip network,
/// so that other validators can discover and connect to this validator.
pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) {
let key = self.key.public();
let mut outbound = self.outbound.subscribe();
let mut addrs = self.gossip.validator_addrs.subscribe();
// Current address of this node.
let mut my_addr = None;
loop {
// Wait for one of the following:
let _ : ctx::OrCanceled<()> = scope::run!(ctx,|ctx,s| async {
// loopback connection was established to a different address (this node's address has changed)
s.spawn_bg::<()>(async {
sync::wait_for(ctx, &mut outbound, |x| x.current().get(&key).map(|x|x.addr) != my_addr).await?;
tracing::info!("loopback conn addr");
s.cancel();
Ok(())
});
// an announcement from the node's previous execution has been received which
// overrides our announcement (reannouncement is needed).
s.spawn_bg::<()>(async {
sync::wait_for(ctx, &mut addrs, |got| got.get(&key).map(|x| x.msg.addr) != my_addr).await?;
tracing::info!("reannouncement");
s.cancel();
Ok(())
});
// timeout has passed.
ctx.sleep(ADDRESS_ANNOUNCER_INTERVAL).await?;
tracing::info!("timeout announcement");
Ok(())
}).await;
if !ctx.is_active() {
return;
}
// If a loopback connection exists, update the current address.
if let Some(conn) = outbound.borrow().current().get(&key) {
my_addr = Some(conn.addr);
}
// If address of this node is known, announce it.
if let Some(addr) = my_addr {
tracing::debug!("announcing validator address {addr}");
self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await;
}
}
}
}

0 comments on commit dc6ec39

Please sign in to comment.