diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs index 3cd2e735..1b41bba6 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/actors/network/src/consensus/mod.rs @@ -175,8 +175,19 @@ impl Network { ping_client.ping_loop(ctx, *ping_timeout).await }); } + // If this is a loopback connection, announce periodically the address of this + // validator to the network. + // Note that this is executed only for outbound end of the loopback connection. + // Inbound end doesn't know the public address of itself. if peer == &self.key.public() { - + let mut sub = self.gossip.validator_addrs.subscribe(); + s.spawn(async { + while ctx.is_active() { + self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await; + let _ = sync::wait_for(&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL), &mut sub, |got| got.get(peer).map(|x| x.msg.addr) != addr).await; + } + Ok(()) + }); } service.run(ctx, stream).await?; Ok(()) @@ -223,50 +234,4 @@ impl Network { } } } - - /// Periodically announces this validator's public IP over gossip network, - /// so that other validators can discover and connect to this validator. - pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) { - let key = self.key.public(); - let mut outbound = self.outbound.subscribe(); - let mut addrs = self.gossip.validator_addrs.subscribe(); - // Current address of this node. - let mut my_addr = None; - loop { - // Wait for one of the following: - let _ : ctx::OrCanceled<()> = scope::run!(ctx,|ctx,s| async { - // loopback connection was established to a different address (this node's address has changed) - s.spawn_bg::<()>(async { - sync::wait_for(ctx, &mut outbound, |x| x.current().get(&key).map(|x|x.addr) != my_addr).await?; - tracing::info!("loopback conn addr"); - s.cancel(); - Ok(()) - }); - // an announcement from the node's previous execution has been received which - // overrides our announcement (reannouncement is needed). - s.spawn_bg::<()>(async { - sync::wait_for(ctx, &mut addrs, |got| got.get(&key).map(|x| x.msg.addr) != my_addr).await?; - tracing::info!("reannouncement"); - s.cancel(); - Ok(()) - }); - // timeout has passed. - ctx.sleep(ADDRESS_ANNOUNCER_INTERVAL).await?; - tracing::info!("timeout announcement"); - Ok(()) - }).await; - if !ctx.is_active() { - return; - } - // If a loopback connection exists, update the current address. - if let Some(conn) = outbound.borrow().current().get(&key) { - my_addr = Some(conn.addr); - } - // If address of this node is known, announce it. - if let Some(addr) = my_addr { - tracing::debug!("announcing validator address {addr}"); - self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await; - } - } - } }