Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added build_version label to gossip handshake (BFT-502) #185

Merged
merged 10 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub struct Validator {
/// Config of the node executor.
#[derive(Debug)]
pub struct Config {
/// Label identifying the build version of the binary that this node is running.
/// It is purely informational: no specific semantics are assigned to it.
pub build_version: Option<String>,
/// IP:port to listen on, for incoming TCP connections.
/// Use `0.0.0.0:<port>` to listen on all network interfaces (i.e. on all IPs exposed by this VM).
pub server_addr: std::net::SocketAddr,
Expand Down Expand Up @@ -99,6 +102,7 @@ impl Executor {
/// Extracts a network crate config.
fn network_config(&self) -> network::Config {
network::Config {
build_version: self.config.build_version.clone(),
server_addr: net::tcp::ListenerAddr::new(self.config.server_addr),
public_addr: self.config.public_addr.clone(),
gossip: self.config.gossip(),
Expand Down
1 change: 1 addition & 0 deletions node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use zksync_consensus_storage::{

fn config(cfg: &network::Config) -> Config {
Config {
build_version: None,
server_addr: *cfg.server_addr,
public_addr: cfg.public_addr.clone(),
max_payload_size: usize::MAX,
Expand Down
3 changes: 3 additions & 0 deletions node/actors/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ pub struct GossipConfig {
/// Network actor config.
#[derive(Debug, Clone)]
pub struct Config {
/// Label identifying the build version of the binary that this node is running.
/// It is purely informational: no specific semantics are assigned to it.
pub build_version: Option<String>,
/// TCP socket address to listen for inbound connections at.
pub server_addr: net::tcp::ListenerAddr,
/// Public TCP address that other nodes are expected to connect to.
Expand Down
8 changes: 2 additions & 6 deletions node/actors/network/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,7 @@ impl Network {
) -> anyhow::Result<()> {
let peer =
handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?;
self.inbound
.insert(peer.clone(), stream.get_values())
.await?;
self.inbound.insert(peer.clone(), stream.stats()).await?;
tracing::info!("peer = {peer:?}");
let res = scope::run!(ctx, |ctx, s| async {
let mut service = rpc::Service::new()
Expand Down Expand Up @@ -211,9 +209,7 @@ impl Network {
peer,
)
.await?;
self.outbound
.insert(peer.clone(), stream.get_values())
.await?;
self.outbound.insert(peer.clone(), stream.stats()).await?;
tracing::info!("peer = {peer:?}");
let consensus_cli =
rpc::Client::<rpc::consensus::Rpc>::new(ctx, self.gossip.cfg.rpc.consensus_rate);
Expand Down
40 changes: 29 additions & 11 deletions node/actors/network/src/gossip/handshake/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::{frame, noise, proto::gossip as proto, GossipConfig};
use super::Connection;
use crate::{frame, noise, proto::gossip as proto, Config};
use anyhow::Context as _;
use std::sync::Arc;
use zksync_concurrency::{ctx, error::Wrap as _, time};
use zksync_consensus_crypto::ByteFmt;
use zksync_consensus_roles::{node, validator};
Expand Down Expand Up @@ -28,6 +30,9 @@ pub(crate) struct Handshake {
/// Information whether the peer treats this connection as static.
/// It is informational only, it doesn't affect the logic of the node.
pub(crate) is_static: bool,
/// Build version of the peer's binary.
/// It is self-declared by the peer (i.e. not verified in any way).
pub(crate) build_version: Option<String>,
}

impl ProtoFmt for Handshake {
Expand All @@ -37,13 +42,15 @@ impl ProtoFmt for Handshake {
session_id: read_required(&r.session_id).context("session_id")?,
genesis: read_required(&r.genesis).context("genesis")?,
is_static: *required(&r.is_static).context("is_static")?,
build_version: r.build_version.clone(),
})
}
fn build(&self) -> Self::Proto {
Self::Proto {
session_id: Some(self.session_id.build()),
genesis: Some(self.genesis.build()),
is_static: Some(self.is_static),
build_version: self.build_version.clone(),
}
}
}
Expand All @@ -65,20 +72,21 @@ pub(super) enum Error {

pub(super) async fn outbound(
ctx: &ctx::Ctx,
cfg: &GossipConfig,
cfg: &Config,
genesis: validator::GenesisHash,
stream: &mut noise::Stream,
peer: &node::PublicKey,
) -> Result<(), Error> {
) -> Result<Connection, Error> {
let ctx = &ctx.with_timeout(TIMEOUT);
let session_id = node::SessionId(stream.id().encode());
frame::send_proto(
ctx,
stream,
&Handshake {
session_id: cfg.key.sign_msg(session_id.clone()),
session_id: cfg.gossip.key.sign_msg(session_id.clone()),
genesis,
is_static: cfg.static_outbound.contains_key(peer),
is_static: cfg.gossip.static_outbound.contains_key(peer),
build_version: cfg.build_version.clone(),
},
)
.await
Expand All @@ -96,15 +104,19 @@ pub(super) async fn outbound(
return Err(Error::PeerMismatch);
}
h.session_id.verify()?;
Ok(())
Ok(Connection {
key: h.session_id.key,
build_version: h.build_version,
stats: stream.stats(),
})
}

pub(super) async fn inbound(
ctx: &ctx::Ctx,
cfg: &GossipConfig,
cfg: &Config,
genesis: validator::GenesisHash,
stream: &mut noise::Stream,
) -> Result<node::PublicKey, Error> {
) -> Result<Arc<Connection>, Error> {
let ctx = &ctx.with_timeout(TIMEOUT);
let session_id = node::SessionId(stream.id().encode());
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
Expand All @@ -121,12 +133,18 @@ pub(super) async fn inbound(
ctx,
stream,
&Handshake {
session_id: cfg.key.sign_msg(session_id.clone()),
build_version: cfg.build_version.clone(),
session_id: cfg.gossip.key.sign_msg(session_id.clone()),
genesis,
is_static: cfg.static_inbound.contains(&h.session_id.key),
is_static: cfg.gossip.static_inbound.contains(&h.session_id.key),
},
)
.await
.wrap("send_proto()")?;
Ok(h.session_id.key)
Ok(Connection {
key: h.session_id.key,
build_version: h.build_version,
stats: stream.stats(),
}
.into())
}
3 changes: 2 additions & 1 deletion node/actors/network/src/gossip/handshake/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! if tests require stricter properties of the generated data.
use super::Handshake;
use rand::{
distributions::{Distribution, Standard},
distributions::{Alphanumeric, DistString, Distribution, Standard},
Rng,
};
use zksync_consensus_roles::node;
Expand All @@ -17,6 +17,7 @@ impl Distribution<Handshake> for Standard {
session_id: key.sign_msg(session_id),
genesis: rng.gen(),
is_static: rng.gen(),
build_version: Some(Alphanumeric.sample_string(rng, 10)),
}
}
}
73 changes: 40 additions & 33 deletions node/actors/network/src/gossip/handshake/tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::*;
use crate::{frame, noise, testonly, GossipConfig};
use crate::{frame, noise, testonly};
use assert_matches::assert_matches;
use rand::Rng;
use std::collections::{HashMap, HashSet};
use zksync_concurrency::{ctx, io, scope, testonly::abort_on_panic};
use zksync_consensus_roles::node;
use zksync_protobuf::testonly::test_encode_random;
Expand All @@ -13,23 +12,14 @@ fn test_schema_encode_decode() {
test_encode_random::<Handshake>(rng);
}

fn make_cfg<R: Rng>(rng: &mut R) -> GossipConfig {
GossipConfig {
key: rng.gen(),
dynamic_inbound_limit: 0,
static_inbound: HashSet::default(),
static_outbound: HashMap::default(),
}
}

#[tokio::test]
async fn test_session_id_mismatch() {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();

let cfg0 = make_cfg(rng);
let cfg1 = make_cfg(rng);
let cfg0 = testonly::make_config(rng.gen());
let cfg1 = testonly::make_config(rng.gen());
let genesis: validator::GenesisHash = rng.gen();

// MitM attempt detected on the inbound end.
Expand All @@ -55,7 +45,7 @@ async fn test_session_id_mismatch() {
});
s.spawn(async {
let mut s1 = s1;
match outbound(ctx, &cfg1, genesis, &mut s1, &cfg0.key.public()).await {
match outbound(ctx, &cfg1, genesis, &mut s1, &cfg0.gossip.key.public()).await {
Err(Error::Stream(..)) => Ok(()),
res => panic!("unexpected res: {res:?}"),
}
Expand All @@ -75,15 +65,16 @@ async fn test_session_id_mismatch() {
ctx,
&mut s2,
&Handshake {
session_id: cfg1.key.sign_msg(rng.gen::<node::SessionId>()),
session_id: cfg1.gossip.key.sign_msg(rng.gen::<node::SessionId>()),
genesis,
is_static: false,
build_version: None,
},
)
.await?;
Ok(())
});
match outbound(ctx, &cfg0, genesis, &mut s1, &cfg1.key.public()).await {
match outbound(ctx, &cfg0, genesis, &mut s1, &cfg1.gossip.key.public()).await {
Err(Error::SessionIdMismatch) => anyhow::Ok(()),
res => panic!("unexpected res: {res:?}"),
}
Expand All @@ -98,9 +89,9 @@ async fn test_peer_mismatch() {
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();

let cfg0 = make_cfg(rng);
let cfg1 = make_cfg(rng);
let cfg2 = make_cfg(rng);
let cfg0 = testonly::make_config(rng.gen());
let cfg1 = testonly::make_config(rng.gen());
let cfg2 = testonly::make_config(rng.gen());

let genesis: validator::GenesisHash = rng.gen();

Expand All @@ -109,14 +100,14 @@ async fn test_peer_mismatch() {
s.spawn(async {
let mut s0 = s0;
assert_eq!(
cfg1.key.public(),
inbound(ctx, &cfg0, genesis, &mut s0).await?
cfg1.gossip.key.public(),
inbound(ctx, &cfg0, genesis, &mut s0).await?.key
);
Ok(())
});
s.spawn(async {
let mut s1 = s1;
match outbound(ctx, &cfg1, genesis, &mut s1, &cfg2.key.public()).await {
match outbound(ctx, &cfg1, genesis, &mut s1, &cfg2.gossip.key.public()).await {
Err(Error::PeerMismatch) => Ok(()),
res => panic!("unexpected res: {res:?}"),
}
Expand All @@ -133,15 +124,22 @@ async fn test_genesis_mismatch() {
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();

let cfg0 = make_cfg(rng);
let cfg1 = make_cfg(rng);
let cfg0 = testonly::make_config(rng.gen());
let cfg1 = testonly::make_config(rng.gen());

tracing::info!("test that inbound handshake rejects mismatching genesis");
scope::run!(ctx, |ctx, s| async {
let (s0, mut s1) = noise::testonly::pipe(ctx).await;
s.spawn(async {
let mut s0 = s0;
let res = outbound(ctx, &cfg0, ctx.rng().gen(), &mut s0, &cfg1.key.public()).await;
let res = outbound(
ctx,
&cfg0,
ctx.rng().gen(),
&mut s0,
&cfg1.gossip.key.public(),
)
.await;
assert_matches!(res, Err(Error::Stream(_)));
Ok(())
});
Expand All @@ -157,7 +155,14 @@ async fn test_genesis_mismatch() {
let (s0, mut s1) = noise::testonly::pipe(ctx).await;
s.spawn(async {
let mut s0 = s0;
let res = outbound(ctx, &cfg0, ctx.rng().gen(), &mut s0, &cfg1.key.public()).await;
let res = outbound(
ctx,
&cfg0,
ctx.rng().gen(),
&mut s0,
&cfg1.gossip.key.public(),
)
.await;
assert_matches!(res, Err(Error::GenesisMismatch));
Ok(())
});
Expand All @@ -167,9 +172,10 @@ async fn test_genesis_mismatch() {
ctx,
&mut s1,
&Handshake {
session_id: cfg1.key.sign_msg(session_id),
session_id: cfg1.gossip.key.sign_msg(session_id),
genesis: rng.gen(),
is_static: false,
build_version: None,
},
)
.await
Expand All @@ -186,8 +192,8 @@ async fn test_invalid_signature() {
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();

let cfg0 = make_cfg(rng);
let cfg1 = make_cfg(rng);
let cfg0 = testonly::make_config(rng.gen());
let cfg1 = testonly::make_config(rng.gen());

let genesis: validator::GenesisHash = rng.gen();

Expand All @@ -197,11 +203,11 @@ async fn test_invalid_signature() {
s.spawn_bg(async {
let mut s1 = s1;
let mut h: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await?;
h.session_id.key = cfg1.key.public();
h.session_id.key = cfg1.gossip.key.public();
frame::send_proto(ctx, &mut s1, &h).await?;
Ok(())
});
match outbound(ctx, &cfg0, genesis, &mut s0, &cfg1.key.public()).await {
match outbound(ctx, &cfg0, genesis, &mut s0, &cfg1.gossip.key.public()).await {
Err(Error::Signature(..)) => anyhow::Ok(()),
res => panic!("unexpected res: {res:?}"),
}
Expand All @@ -215,11 +221,12 @@ async fn test_invalid_signature() {
s.spawn_bg(async {
let mut s1 = s1;
let mut h = Handshake {
session_id: cfg0.key.sign_msg(node::SessionId(s1.id().encode())),
session_id: cfg0.gossip.key.sign_msg(node::SessionId(s1.id().encode())),
genesis,
is_static: true,
build_version: None,
};
h.session_id.key = cfg1.key.public();
h.session_id.key = cfg1.gossip.key.public();
frame::send_proto(ctx, &mut s1, &h).await
});
match inbound(ctx, &cfg0, genesis, &mut s0).await {
Expand Down
9 changes: 2 additions & 7 deletions node/actors/network/src/gossip/loadtest/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Loadtest of the gossip endpoint of a node.
use crate::{gossip, mux, noise, preface, rpc, GossipConfig};
use crate::{gossip, mux, noise, preface, rpc, testonly};
use anyhow::Context as _;
use async_trait::async_trait;
use rand::Rng;
Expand Down Expand Up @@ -91,12 +91,7 @@ impl Loadtest {
let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet)
.await
.context("connect()")?;
let cfg = GossipConfig {
key: node::SecretKey::generate(),
dynamic_inbound_limit: 0,
static_inbound: [].into(),
static_outbound: [].into(),
};
let cfg = testonly::make_config(ctx.rng().gen());
gossip::handshake::outbound(ctx, &cfg, self.genesis.hash(), &mut stream, &self.peer)
.await
.context("handshake")?;
Expand Down
Loading
Loading