Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor fixes to the network actor #85

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
fmt,
sync::Arc,
};
use zksync_concurrency::{ctx, net, scope, time};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network as network;
use zksync_consensus_roles::{node, validator};
Expand Down Expand Up @@ -95,6 +95,10 @@ impl Executor {
validator_key: self.validator.as_ref().map(|v| v.key.clone()),
ping_timeout: Some(time::Duration::seconds(10)),
max_block_size: self.config.max_payload_size.saturating_add(kB),
tcp_accept_rate: limiter::Rate {
burst: 10,
refresh: time::Duration::milliseconds(100),
},
rpc: network::RpcConfig::default(),
}
}
Expand Down
2 changes: 2 additions & 0 deletions node/actors/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub struct Config {
/// the connection is dropped.
/// `None` disables sending ping messages (useful for tests).
pub ping_timeout: Option<time::Duration>,
/// Max rate at which inbound TCP connections should be accepted.
pub tcp_accept_rate: limiter::Rate,
/// Rate limiting config for RPCs.
pub rpc: RpcConfig,
}
9 changes: 6 additions & 3 deletions node/actors/network/src/consensus/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Context as _;
use zksync_concurrency::{ctx, time};
use zksync_consensus_crypto::ByteFmt;
use zksync_consensus_roles::{node, validator};
use zksync_protobuf::{read_required, ProtoFmt};
use zksync_protobuf::{kB, read_required, ProtoFmt};

#[cfg(test)]
mod testonly;
Expand All @@ -13,6 +13,9 @@ mod tests;
/// Timeout on performing a handshake.
const TIMEOUT: time::Duration = time::Duration::seconds(5);

/// Max size of a handshake frame.
const MAX_FRAME: usize = 10 * kB;

/// First message exchanged by nodes after establishing e2e encryption.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Handshake {
Expand Down Expand Up @@ -74,7 +77,7 @@ pub(super) async fn outbound(
)
.await
.map_err(Error::Stream)?;
let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size())
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
if h.genesis != genesis {
Expand All @@ -98,7 +101,7 @@ pub(super) async fn inbound(
) -> Result<validator::PublicKey, Error> {
let ctx = &ctx.with_timeout(TIMEOUT);
let session_id = node::SessionId(stream.id().encode());
let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size())
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
if h.genesis != genesis {
Expand Down
8 changes: 3 additions & 5 deletions node/actors/network/src/consensus/handshake/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn test_session_id_mismatch() {
let (mut s1, s2) = noise::testonly::pipe(ctx).await;
s.spawn_bg(async {
let mut s2 = s2;
let _: Handshake = frame::recv_proto(ctx, &mut s2, Handshake::max_size()).await?;
let _: Handshake = frame::recv_proto(ctx, &mut s2, MAX_FRAME).await?;
frame::send_proto(
ctx,
&mut s2,
Expand Down Expand Up @@ -149,9 +149,7 @@ async fn test_genesis_mismatch() {
Ok(())
});
let session_id = node::SessionId(s1.id().encode());
let _: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size())
.await
.unwrap();
let _: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await.unwrap();
frame::send_proto(
ctx,
&mut s1,
Expand Down Expand Up @@ -184,7 +182,7 @@ async fn test_invalid_signature() {
let (mut s0, s1) = noise::testonly::pipe(ctx).await;
s.spawn_bg(async {
let mut s1 = s1;
let mut h: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size()).await?;
let mut h: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await?;
h.session_id.key = key1.public();
frame::send_proto(ctx, &mut s1, &h).await?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions node/actors/network/src/consensus/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ async fn test_genesis_mismatch() {
.unwrap();

tracing::info!("Accept a connection with mismatching genesis.");
let stream = metrics::MeteredStream::listen(ctx, &mut listener)
let stream = metrics::MeteredStream::accept(ctx, &mut listener)
.await?
.context("listen()")?;
.context("accept()")?;
let (mut stream, endpoint) = preface::accept(ctx, stream)
.await
.context("preface::accept()")?;
Expand Down
9 changes: 6 additions & 3 deletions node/actors/network/src/gossip/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Context as _;
use zksync_concurrency::{ctx, time};
use zksync_consensus_crypto::ByteFmt;
use zksync_consensus_roles::{node, validator};
use zksync_protobuf::{read_required, required, ProtoFmt};
use zksync_protobuf::{kB, read_required, required, ProtoFmt};

#[cfg(test)]
mod testonly;
Expand All @@ -13,6 +13,9 @@ mod tests;
/// Timeout on performing a handshake.
const TIMEOUT: time::Duration = time::Duration::seconds(5);

/// Max size of a handshake frame.
const MAX_FRAME: usize = 10 * kB;

/// First message exchanged by nodes after establishing e2e encryption.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Handshake {
Expand Down Expand Up @@ -80,7 +83,7 @@ pub(super) async fn outbound(
)
.await
.map_err(Error::Stream)?;
let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size())
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
if h.genesis != genesis {
Expand All @@ -104,7 +107,7 @@ pub(super) async fn inbound(
) -> Result<node::PublicKey, Error> {
let ctx = &ctx.with_timeout(TIMEOUT);
let session_id = node::SessionId(stream.id().encode());
let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size())
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
if h.session_id.msg != session_id {
Expand Down
8 changes: 3 additions & 5 deletions node/actors/network/src/gossip/handshake/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn test_session_id_mismatch() {
let (mut s1, s2) = noise::testonly::pipe(ctx).await;
s.spawn_bg(async {
let mut s2 = s2;
let _: Handshake = frame::recv_proto(ctx, &mut s2, Handshake::max_size()).await?;
let _: Handshake = frame::recv_proto(ctx, &mut s2, MAX_FRAME).await?;
frame::send_proto(
ctx,
&mut s2,
Expand Down Expand Up @@ -162,9 +162,7 @@ async fn test_genesis_mismatch() {
Ok(())
});
let session_id = node::SessionId(s1.id().encode());
let _: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size())
.await
.unwrap();
let _: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await.unwrap();
frame::send_proto(
ctx,
&mut s1,
Expand Down Expand Up @@ -198,7 +196,7 @@ async fn test_invalid_signature() {
let (mut s0, s1) = noise::testonly::pipe(ctx).await;
s.spawn_bg(async {
let mut s1 = s1;
let mut h: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size()).await?;
let mut h: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await?;
h.session_id.key = cfg1.key.public();
frame::send_proto(ctx, &mut s1, &h).await?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions node/actors/network/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ async fn test_genesis_mismatch() {
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));

tracing::info!("Accept a connection with mismatching genesis.");
let stream = metrics::MeteredStream::listen(ctx, &mut listener)
let stream = metrics::MeteredStream::accept(ctx, &mut listener)
.await?
.context("listen()")?;
.context("accept()")?;
let (mut stream, endpoint) = preface::accept(ctx, stream)
.await
.context("preface::accept()")?;
Expand Down
34 changes: 20 additions & 14 deletions node/actors/network/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Network actor maintaining a pool of outbound and inbound connections to other nodes.
use anyhow::Context as _;
use std::sync::Arc;
use zksync_concurrency::{ctx, ctx::channel, scope, time};
use zksync_concurrency::{ctx, ctx::channel, limiter, scope, time};
use zksync_consensus_storage::BlockStore;
use zksync_consensus_utils::pipe::ActorPipe;

Expand Down Expand Up @@ -109,15 +109,15 @@ impl Network {
impl Runner {
/// Runs the network actor.
pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let mut listener = self
.net
.gossip
.cfg
.server_addr
.bind()
.context("server_addr.bind()")?;
let res: ctx::Result<()> = scope::run!(ctx, |ctx, s| async {
let mut listener = self
.net
.gossip
.cfg
.server_addr
.bind()
.context("server_addr.bind()")?;

scope::run!(ctx, |ctx, s| async {
// Handle incoming messages.
s.spawn(async {
// We don't propagate cancellation errors
Expand Down Expand Up @@ -165,9 +165,12 @@ impl Runner {
}
}

// TODO(gprusak): add rate limit and inflight limit for inbound handshakes.
while let Ok(stream) = metrics::MeteredStream::listen(ctx, &mut listener).await {
let stream = stream.context("listener.accept()")?;
let accept_limiter = limiter::Limiter::new(ctx, self.net.gossip.cfg.tcp_accept_rate);
loop {
accept_limiter.acquire(ctx, 1).await?;
let stream = metrics::MeteredStream::accept(ctx, &mut listener)
.await?
.context("accept()")?;
s.spawn(async {
let res = async {
let (stream, endpoint) = preface::accept(ctx, stream)
Expand Down Expand Up @@ -198,8 +201,11 @@ impl Runner {
Ok(())
});
}
Ok(())
})
.await
.await;
match res {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
}
}
}
2 changes: 1 addition & 1 deletion node/actors/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl MeteredStream {
}

/// Accepts an inbound connection and returns a metered stream.
pub(crate) async fn listen(
pub(crate) async fn accept(
ctx: &ctx::Ctx,
listener: &mut net::tcp::Listener,
) -> ctx::OrCanceled<io::Result<Self>> {
Expand Down
9 changes: 4 additions & 5 deletions node/actors/network/src/mux/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use super::CapabilityId;
use crate::proto::mux as proto;
use anyhow::Context as _;
use std::collections::HashMap;
use zksync_protobuf::required;
use zksync_protobuf::{kB, required};

/// Max size of a handshake frame.
pub(super) const MAX_FRAME: usize = 10 * kB;

pub(super) struct Handshake {
/// Maximal supported number of the accept streams per capability.
Expand Down Expand Up @@ -40,10 +43,6 @@ fn build_capabilities(
impl zksync_protobuf::ProtoFmt for Handshake {
type Proto = proto::Handshake;

fn max_size() -> usize {
zksync_protobuf::kB
}

fn read(r: &Self::Proto) -> anyhow::Result<Self> {
Ok(Self {
accept_max_streams: read_max_streams(&r.accept).context("accept")?,
Expand Down
3 changes: 1 addition & 2 deletions node/actors/network/src/mux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ use crate::{frame, noise::bytes};
use anyhow::Context as _;
use std::{collections::BTreeMap, sync::Arc};
use zksync_concurrency::{ctx, ctx::channel, io, scope, sync};
use zksync_protobuf::ProtoFmt as _;

mod config;
mod handshake;
Expand Down Expand Up @@ -279,7 +278,7 @@ impl Mux {
let h = self.handshake();
frame::send_proto(ctx, &mut write, &h).await
});
frame::recv_proto(ctx, &mut read, Handshake::max_size()).await
frame::recv_proto(ctx, &mut read, handshake::MAX_FRAME).await
})
.await
.map_err(RunError::Protocol)?;
Expand Down
6 changes: 2 additions & 4 deletions node/actors/network/src/mux/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{
},
};
use zksync_concurrency::{ctx, scope, testonly::abort_on_panic};
use zksync_protobuf::ProtoFmt as _;

mod proto;

Expand Down Expand Up @@ -122,7 +121,7 @@ async fn run_server(
s.spawn(async {
let mut stream = stream;
while let Ok((req, _)) =
frame::mux_recv_proto::<Req>(ctx, &mut stream.read, Req::max_size()).await
frame::mux_recv_proto::<Req>(ctx, &mut stream.read, usize::MAX).await
{
let resp = Resp {
output: rpc_handler(req.0),
Expand Down Expand Up @@ -164,8 +163,7 @@ async fn run_client(
frame::mux_send_proto(ctx, &mut stream.write, &Req(req.clone())).await?;
stream.write.flush(ctx).await?;
let (resp, _) =
frame::mux_recv_proto::<Resp>(ctx, &mut stream.read, Resp::max_size())
.await?;
frame::mux_recv_proto::<Resp>(ctx, &mut stream.read, usize::MAX).await?;
assert_eq!(resp.output, rpc_handler(req));
assert_eq!(resp.capability_id, cap);
}
Expand Down
18 changes: 9 additions & 9 deletions node/actors/network/src/preface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
//! and multiplex between multiple endpoints available on the same TCP port.
use crate::{frame, metrics, noise, proto::preface as proto};
use zksync_concurrency::{ctx, time};
use zksync_protobuf::{required, ProtoFmt};
use zksync_protobuf::{kB, required, ProtoFmt};

/// Timeout on executing the preface protocol.
const TIMEOUT: time::Duration = time::Duration::seconds(5);

/// Max size of the frames exchanged during preface.
const MAX_FRAME: usize = 10 * kB;

/// E2E encryption protocol to use on a TCP connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Encryption {
Expand All @@ -32,9 +35,6 @@ pub(crate) enum Endpoint {

impl ProtoFmt for Encryption {
type Proto = proto::Encryption;
fn max_size() -> usize {
10 * zksync_protobuf::kB
}
fn read(r: &Self::Proto) -> anyhow::Result<Self> {
use proto::encryption::T;
Ok(match required(&r.t)? {
Expand All @@ -52,9 +52,6 @@ impl ProtoFmt for Encryption {

impl ProtoFmt for Endpoint {
type Proto = proto::Endpoint;
fn max_size() -> usize {
10 * zksync_protobuf::kB
}
fn read(r: &Self::Proto) -> anyhow::Result<Self> {
use proto::endpoint::T;
Ok(match required(&r.t)? {
Expand Down Expand Up @@ -92,8 +89,11 @@ pub(crate) async fn accept(
mut stream: metrics::MeteredStream,
) -> anyhow::Result<(noise::Stream, Endpoint)> {
let ctx = &ctx.with_timeout(TIMEOUT);
let _: Encryption = frame::recv_proto(ctx, &mut stream, Encryption::max_size()).await?;
let encryption: Encryption = frame::recv_proto(ctx, &mut stream, MAX_FRAME).await?;
if encryption != Encryption::NoiseNN {
anyhow::bail!("unsupported encryption protocol: {encryption:?}");
}
let mut stream = noise::Stream::server_handshake(ctx, stream).await?;
let endpoint = frame::recv_proto(ctx, &mut stream, Encryption::max_size()).await?;
let endpoint = frame::recv_proto(ctx, &mut stream, MAX_FRAME).await?;
Ok((stream, endpoint))
}
4 changes: 3 additions & 1 deletion node/actors/network/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use zksync_concurrency::{ctx, ctx::channel, io, net, scope, sync};
use zksync_concurrency::{ctx, ctx::channel, io, limiter, net, scope, sync};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::BlockStore;
use zksync_consensus_utils::pipe;
Expand Down Expand Up @@ -69,6 +69,7 @@ pub fn new_configs(
static_outbound: HashMap::default(),
},
max_block_size: usize::MAX,
tcp_accept_rate: limiter::Rate::INF,
rpc: RpcConfig::default(),
}
});
Expand Down Expand Up @@ -104,6 +105,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config {
static_outbound: [(peer.gossip.key.public(), peer.public_addr.clone())].into(),
},
max_block_size: usize::MAX,
tcp_accept_rate: limiter::Rate::INF,
rpc: RpcConfig::default(),
}
}
Expand Down
Loading
Loading