Skip to content

Commit

Permalink
tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Mar 27, 2024
1 parent dc6ec39 commit aec5777
Show file tree
Hide file tree
Showing 15 changed files with 140 additions and 90 deletions.
47 changes: 34 additions & 13 deletions node/actors/network/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
//! BFT consensus messages are exchanged over this network.
use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc};
use anyhow::Context as _;
use std::{
collections::{HashSet},
sync::Arc,
};
use rand::seq::SliceRandom;
use std::{collections::HashSet, sync::Arc};
use zksync_concurrency::{ctx, oneshot, scope, sync, time};
use zksync_consensus_roles::validator;
use zksync_protobuf::kB;
Expand All @@ -22,7 +19,11 @@ const RESP_MAX_SIZE: usize = kB;
/// is down.
const ADDRESS_ANNOUNCER_INTERVAL: time::Duration = time::Duration::minutes(10);

/// Outbound connection state.
pub(crate) struct Connection {
/// Peer's address.
/// This is not used for now, but will be required for the debug page.
#[allow(dead_code)]
addr: std::net::SocketAddr,
consensus: rpc::Client<rpc::consensus::Rpc>,
}
Expand Down Expand Up @@ -106,7 +107,9 @@ impl Network {
msg: validator::Signed<validator::ConsensusMsg>,
) -> anyhow::Result<()> {
let outbound = self.outbound.current();
outbound.get(key).context("not an active validator")?
outbound
.get(key)
.context("not an active validator")?
.consensus
.call(ctx, &rpc::consensus::Req(msg), RESP_MAX_SIZE)
.await?;
Expand All @@ -122,7 +125,7 @@ impl Network {
) -> anyhow::Result<()> {
let peer =
handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?;
self.inbound.insert(peer.clone(),()).await?;
self.inbound.insert(peer.clone(), ()).await?;
let res = scope::run!(ctx, |ctx, s| async {
let mut service = rpc::Service::new()
.add_server(rpc::ping::Server, rpc::ping::RATE)
Expand Down Expand Up @@ -162,7 +165,7 @@ impl Network {
addr,
consensus: rpc::Client::new(ctx, self.gossip.cfg.rpc.consensus_rate),
});
self.outbound.insert(peer.clone(),conn.clone()).await?;
self.outbound.insert(peer.clone(), conn.clone()).await?;
let res = scope::run!(ctx, |ctx, s| async {
let mut service = rpc::Service::new()
.add_server(rpc::ping::Server, rpc::ping::RATE)
Expand All @@ -180,11 +183,19 @@ impl Network {
// Note that this is executed only for outbound end of the loopback connection.
// Inbound end doesn't know the public address of itself.
if peer == &self.key.public() {
let mut sub = self.gossip.validator_addrs.subscribe();
s.spawn(async {
let mut sub = self.gossip.validator_addrs.subscribe();
while ctx.is_active() {
self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await;
let _ = sync::wait_for(&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL), &mut sub, |got| got.get(peer).map(|x| x.msg.addr) != addr).await;
self.gossip
.validator_addrs
.announce(&self.key, addr, ctx.now_utc())
.await;
let _ = sync::wait_for(
&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL),
&mut sub,
|got| got.get(peer).map(|x| x.msg.addr) != Some(addr),
)
.await;
}
Ok(())
});
Expand All @@ -198,9 +209,19 @@ impl Network {
}

async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let addr = *self.gossip.cfg.public_addr.resolve(ctx).await?.context("resolve()")?
.choose(&mut ctx.rng()).with_context(||format!("{:?} resolved to no addresses",self.gossip.cfg.public_addr))?;
self.run_outbound_stream(ctx, &self.key.public(), addr).await
let addr = *self
.gossip
.cfg
.public_addr
.resolve(ctx)
.await?
.context("resolve()")?
.choose(&mut ctx.rng())
.with_context(|| {
format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr)
})?;
self.run_outbound_stream(ctx, &self.key.public(), addr)
.await
}

/// Maintains a connection to the given validator.
Expand Down
23 changes: 13 additions & 10 deletions node/actors/network/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@
//! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to
//! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
//! network graph (minimize its diameter, increase connectedness).
use crate::{
gossip::{ValidatorAddrsWatch},
io,
pool::PoolWatch,
rpc, Config,
};
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, rpc, Config};
use anyhow::Context as _;
use std::sync::{atomic::AtomicUsize, Arc};

Expand All @@ -33,7 +28,9 @@ use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::BlockStore;
use zksync_protobuf::kB;

/// State of the gossip connection.
pub(crate) struct Connection {
/// `get_block` rpc client.
pub(crate) get_block: rpc::Client<rpc::get_block::Rpc>,
}

Expand All @@ -42,9 +39,9 @@ pub(crate) struct Network {
/// Gossip network configuration.
pub(crate) cfg: Config,
/// Currently open inbound connections.
pub(crate) inbound: PoolWatch<node::PublicKey,Arc<Connection>>,
pub(crate) inbound: PoolWatch<node::PublicKey, Arc<Connection>>,
/// Currently open outbound connections.
pub(crate) outbound: PoolWatch<node::PublicKey,Arc<Connection>>,
pub(crate) outbound: PoolWatch<node::PublicKey, Arc<Connection>>,
/// Current state of knowledge about validators' endpoints.
pub(crate) validator_addrs: ValidatorAddrsWatch,
/// Block store to serve `get_block` requests from.
Expand Down Expand Up @@ -90,10 +87,16 @@ impl Network {
) -> anyhow::Result<Option<validator::FinalBlock>> {
let outbound = self.outbound.current();
let inbound = self.inbound.current();
Ok(outbound.get(recipient).or(inbound.get(recipient))
Ok(outbound
.get(recipient)
.or(inbound.get(recipient))
.context("recipient is unreachable")?
.get_block
.call(ctx, &rpc::get_block::Req(number), self.cfg.max_block_size.saturating_add(kB))
.call(
ctx,
&rpc::get_block::Req(number),
self.cfg.max_block_size.saturating_add(kB),
)
.await?
.0)
}
Expand Down
27 changes: 16 additions & 11 deletions node/actors/network/src/gossip/runner.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use super::{handshake, Network, ValidatorAddrs, Connection};
use super::{handshake, Connection, Network, ValidatorAddrs};
use crate::{io, noise, preface, rpc};
use anyhow::Context as _;
use async_trait::async_trait;
use rand::seq::SliceRandom;
use std::sync::{atomic::Ordering, Arc};
use zksync_concurrency::{net, ctx, oneshot, scope, sync};
use zksync_concurrency::{ctx, net, oneshot, scope, sync};
use zksync_consensus_roles::node;
use zksync_consensus_storage::BlockStore;
use zksync_protobuf::kB;
use rand::seq::SliceRandom;
use anyhow::Context as _;

struct PushValidatorAddrsServer<'a>(&'a Network);

Expand Down Expand Up @@ -159,13 +159,14 @@ impl Network {
ctx: &ctx::Ctx,
mut stream: noise::Stream,
) -> anyhow::Result<()> {
let peer = handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?;
let peer =
handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?;
tracing::Span::current().record("peer", tracing::field::debug(&peer));
let conn = Arc::new(Connection {
get_block: rpc::Client::<rpc::get_block::Rpc>::new(ctx, self.cfg.rpc.get_block_rate),
});
self.inbound.insert(peer.clone(),conn.clone()).await?;
let res = self.run_stream(ctx, &peer, stream, &*conn).await;
self.inbound.insert(peer.clone(), conn.clone()).await?;
let res = self.run_stream(ctx, &peer, stream, &conn).await;
self.inbound.remove(&peer).await;
res
}
Expand All @@ -177,8 +178,12 @@ impl Network {
peer: &node::PublicKey,
addr: net::Host,
) -> anyhow::Result<()> {
let addr = *addr.resolve(ctx).await?.context("resolve()")?
.choose(&mut ctx.rng()).with_context(||"{addr:?} resolved to empty address set")?;
let addr = *addr
.resolve(ctx)
.await?
.context("resolve()")?
.choose(&mut ctx.rng())
.with_context(|| "{addr:?} resolved to empty address set")?;
let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet).await?;
handshake::outbound(
ctx,
Expand All @@ -191,8 +196,8 @@ impl Network {
let conn = Arc::new(Connection {
get_block: rpc::Client::<rpc::get_block::Rpc>::new(ctx, self.cfg.rpc.get_block_rate),
});
self.outbound.insert(peer.clone(),conn.clone()).await?;
let res = self.run_stream(ctx, peer, stream, &*conn).await;
self.outbound.insert(peer.clone(), conn.clone()).await?;
let res = self.run_stream(ctx, peer, stream, &conn).await;
self.outbound.remove(peer).await;
res
}
Expand Down
5 changes: 4 additions & 1 deletion node/actors/network/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,10 @@ async fn validator_node_restart() {
tracing::info!("wait for the update to arrive to node1");
let sub = &mut node1.net.gossip.validator_addrs.subscribe();
let want = Some(*cfgs[0].server_addr);
sync::wait_for(ctx, sub, |got| got.get(&setup.keys[0].public()).map(|x|x.msg.addr) == want).await?;
sync::wait_for(ctx, sub, |got| {
got.get(&setup.keys[0].public()).map(|x| x.msg.addr) == want
})
.await?;
Ok(())
})
.await?;
Expand Down
8 changes: 6 additions & 2 deletions node/actors/network/src/gossip/validator_addrs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Global state distributed by active validators, observed by all the nodes in the network.
use crate::watch::Watch;
use std::{collections::HashSet, sync::Arc};
use zksync_concurrency::{time,sync};
use zksync_concurrency::{sync, time};
use zksync_consensus_roles::validator;

/// Mapping from validator::PublicKey to a signed validator::NetAddress.
Expand Down Expand Up @@ -90,6 +90,7 @@ impl ValidatorAddrsWatch {
self.0.subscribe()
}

/// Inserts a new version of the announcement signed with the given key.
pub(crate) async fn announce(
&self,
key: &validator::SecretKey,
Expand All @@ -98,7 +99,10 @@ impl ValidatorAddrsWatch {
) {
let this = self.0.lock().await;
let mut validator_addrs = this.borrow().clone();
let version = validator_addrs.get(&key.public()).map(|x| x.msg.version + 1).unwrap_or(0);
let version = validator_addrs
.get(&key.public())
.map(|x| x.msg.version + 1)
.unwrap_or(0);
let d = Arc::new(key.sign_msg(validator::NetAddress {
addr,
version,
Expand Down
12 changes: 5 additions & 7 deletions node/actors/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,11 @@ impl Runner {
for (peer, addr) in &self.net.gossip.cfg.gossip.static_outbound {
s.spawn(async {
loop {
let run_result =
self.net.gossip.run_outbound_stream(ctx, peer, addr.clone()).await;
let run_result = self
.net
.gossip
.run_outbound_stream(ctx, peer, addr.clone())
.await;
if let Err(err) = run_result {
tracing::info!("gossip.run_outbound_stream(): {err:#}");
}
Expand All @@ -159,11 +162,6 @@ impl Runner {
Ok(())
});
}
// Announce IP periodically.
s.spawn(async {
c.run_address_announcer(ctx).await;
Ok(())
});
}
}

Expand Down
9 changes: 4 additions & 5 deletions node/actors/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,14 @@ impl NetworkGauges {
let register_result = COLLECTOR.before_scrape(move || {
state_ref.upgrade().map(|state| {
let gauges = NetworkGauges::default();
let len = state.gossip.inbound.subscribe().borrow().current().len();
let len = state.gossip.inbound.current().len();
gauges.gossip_inbound_connections.set(len);
let len = state.gossip.outbound.subscribe().borrow().current().len();
let len = state.gossip.outbound.current().len();
gauges.gossip_outbound_connections.set(len);
if let Some(consensus_state) = &state.consensus {
let len = consensus_state.inbound.subscribe().borrow().current().len();
let len = consensus_state.inbound.current().len();
gauges.consensus_inbound_connections.set(len);
let subscriber = consensus_state.outbound.subscribe();
let len = subscriber.borrow().current().len();
let len = consensus_state.outbound.current().len();
gauges.consensus_outbound_connections.set(len);
}
gauges
Expand Down
25 changes: 13 additions & 12 deletions node/actors/network/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
//! An abstraction for a set of "connections" which constraints
//! which peers are allowed to connect.
use crate::watch::Watch;
use std::collections::{HashSet};
use std::collections::HashSet;
use zksync_concurrency::sync;

/// Map with restrictions on the allowed keys.
/// This set consists of an arbitrary subset of `allowed` + up to `extra_limit` elements outside of
/// `allowed`.
#[derive(Clone)]
pub(crate) struct Pool<K,V> {
pub(crate) struct Pool<K, V> {
extra_limit: usize,
extra_count: usize,
allowed: HashSet<K>,
current: im::HashMap<K,V>,
current: im::HashMap<K, V>,
}

impl<K,V> Pool<K,V> {
pub(crate) fn current(&self) -> &im::HashMap<K,V> {
impl<K, V> Pool<K, V> {
/// Current pool state.
pub(crate) fn current(&self) -> &im::HashMap<K, V> {
&self.current
}
}

/// Watch wrapper of the Pool.
/// Supports subscribing to the Pool membership changes.
pub(crate) struct PoolWatch<K,V>(Watch<Pool<K,V>>);
pub(crate) struct PoolWatch<K, V>(Watch<Pool<K, V>>);

impl<K: std::hash::Hash + Eq + Clone, V:Clone> PoolWatch<K,V> {
impl<K: std::hash::Hash + Eq + Clone, V: Clone> PoolWatch<K, V> {
/// Constructs a new pool.
pub(crate) fn new(allowed: HashSet<K>, extra_limit: usize) -> Self {
Self(Watch::new(Pool {
Expand All @@ -40,7 +41,7 @@ impl<K: std::hash::Hash + Eq + Clone, V:Clone> PoolWatch<K,V> {
/// Returns an error if
/// * `v` is already in the set
/// * `v` cannot be added due to size restrictions
pub(crate) async fn insert(&self, k:K,v:V) -> anyhow::Result<()> {
pub(crate) async fn insert(&self, k: K, v: V) -> anyhow::Result<()> {
self.0
.send_if_ok(|pool| {
if pool.current.contains_key(&k) {
Expand All @@ -52,7 +53,7 @@ impl<K: std::hash::Hash + Eq + Clone, V:Clone> PoolWatch<K,V> {
}
pool.extra_count += 1;
}
pool.current.insert(k,v);
pool.current.insert(k, v);
Ok(())
})
.await
Expand All @@ -71,13 +72,13 @@ impl<K: std::hash::Hash + Eq + Clone, V:Clone> PoolWatch<K,V> {
});
}

pub(crate) fn current(&self) -> im::HashMap<K,V> {
/// Copy of the current pool state.
pub(crate) fn current(&self) -> im::HashMap<K, V> {
self.0.subscribe().borrow().current.clone()
}

/// Subscribes to the set changes.
#[allow(dead_code)]
pub(crate) fn subscribe(&self) -> sync::watch::Receiver<Pool<K,V>> {
pub(crate) fn subscribe(&self) -> sync::watch::Receiver<Pool<K, V>> {
self.0.subscribe()
}
}
2 changes: 2 additions & 0 deletions node/actors/network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ pub(crate) struct Client<R: Rpc> {

impl<R: Rpc> Client<R> {
/// Constructs a new client.
// TODO(gprusak): at this point we don't need the clients to be reusable,
// so perhaps they should be constructed by `Servive::add_client` instead?
pub(crate) fn new(ctx: &ctx::Ctx, rate: limiter::Rate) -> Self {
Client {
limiter: limiter::Limiter::new(ctx, rate),
Expand Down
Loading

0 comments on commit aec5777

Please sign in to comment.