diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index cec596d8..9c4e4688 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -6,7 +6,7 @@ use std::{ fmt, sync::Arc, }; -use zksync_concurrency::{ctx, net, scope, time}; +use zksync_concurrency::{ctx, limiter, net, scope, time}; use zksync_consensus_bft as bft; use zksync_consensus_network as network; use zksync_consensus_roles::{node, validator}; @@ -95,6 +95,10 @@ impl Executor { validator_key: self.validator.as_ref().map(|v| v.key.clone()), ping_timeout: Some(time::Duration::seconds(10)), max_block_size: self.config.max_payload_size.saturating_add(kB), + tcp_accept_rate: limiter::Rate { + burst: 10, + refresh: time::Duration::milliseconds(100), + }, rpc: network::RpcConfig::default(), } } diff --git a/node/actors/network/src/config.rs b/node/actors/network/src/config.rs index 66e56f51..c9b46f82 100644 --- a/node/actors/network/src/config.rs +++ b/node/actors/network/src/config.rs @@ -80,6 +80,8 @@ pub struct Config { /// the connection is dropped. /// `None` disables sending ping messages (useful for tests). pub ping_timeout: Option, + /// Max rate at which inbound TCP connections should be accepted. + pub tcp_accept_rate: limiter::Rate, /// Rate limiting config for RPCs. pub rpc: RpcConfig, } diff --git a/node/actors/network/src/consensus/handshake/mod.rs b/node/actors/network/src/consensus/handshake/mod.rs index 5c96c74d..a023fc6c 100644 --- a/node/actors/network/src/consensus/handshake/mod.rs +++ b/node/actors/network/src/consensus/handshake/mod.rs @@ -3,7 +3,7 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, time}; use zksync_consensus_crypto::ByteFmt; use zksync_consensus_roles::{node, validator}; -use zksync_protobuf::{read_required, ProtoFmt}; +use zksync_protobuf::{kB, read_required, ProtoFmt}; #[cfg(test)] mod testonly; @@ -13,6 +13,9 @@ mod tests; /// Timeout on performing a handshake. const TIMEOUT: time::Duration = time::Duration::seconds(5); +/// Max size of a handshake frame. +const MAX_FRAME: usize = 10 * kB; + /// First message exchanged by nodes after establishing e2e encryption. #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct Handshake { @@ -74,7 +77,7 @@ pub(super) async fn outbound( ) .await .map_err(Error::Stream)?; - let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size()) + let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME) .await .map_err(Error::Stream)?; if h.genesis != genesis { @@ -98,7 +101,7 @@ pub(super) async fn inbound( ) -> Result { let ctx = &ctx.with_timeout(TIMEOUT); let session_id = node::SessionId(stream.id().encode()); - let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size()) + let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME) .await .map_err(Error::Stream)?; if h.genesis != genesis { diff --git a/node/actors/network/src/consensus/handshake/tests.rs b/node/actors/network/src/consensus/handshake/tests.rs index ce69744b..22209d6f 100644 --- a/node/actors/network/src/consensus/handshake/tests.rs +++ b/node/actors/network/src/consensus/handshake/tests.rs @@ -61,7 +61,7 @@ async fn test_session_id_mismatch() { let (mut s1, s2) = noise::testonly::pipe(ctx).await; s.spawn_bg(async { let mut s2 = s2; - let _: Handshake = frame::recv_proto(ctx, &mut s2, Handshake::max_size()).await?; + let _: Handshake = frame::recv_proto(ctx, &mut s2, MAX_FRAME).await?; frame::send_proto( ctx, &mut s2, @@ -149,9 +149,7 @@ async fn test_genesis_mismatch() { Ok(()) }); let session_id = node::SessionId(s1.id().encode()); - let _: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size()) - .await - .unwrap(); + let _: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await.unwrap(); frame::send_proto( ctx, &mut s1, @@ -184,7 +182,7 @@ async fn test_invalid_signature() { let (mut s0, s1) = noise::testonly::pipe(ctx).await; s.spawn_bg(async { let mut s1 = s1; - let mut h: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size()).await?; + let mut h: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await?; h.session_id.key = key1.public(); frame::send_proto(ctx, &mut s1, &h).await?; Ok(()) diff --git a/node/actors/network/src/consensus/tests.rs b/node/actors/network/src/consensus/tests.rs index 6e23a5a3..1c752378 100644 --- a/node/actors/network/src/consensus/tests.rs +++ b/node/actors/network/src/consensus/tests.rs @@ -96,9 +96,9 @@ async fn test_genesis_mismatch() { .unwrap(); tracing::info!("Accept a connection with mismatching genesis."); - let stream = metrics::MeteredStream::listen(ctx, &mut listener) + let stream = metrics::MeteredStream::accept(ctx, &mut listener) .await? - .context("listen()")?; + .context("accept()")?; let (mut stream, endpoint) = preface::accept(ctx, stream) .await .context("preface::accept()")?; diff --git a/node/actors/network/src/gossip/handshake/mod.rs b/node/actors/network/src/gossip/handshake/mod.rs index de412474..1c86b72c 100644 --- a/node/actors/network/src/gossip/handshake/mod.rs +++ b/node/actors/network/src/gossip/handshake/mod.rs @@ -3,7 +3,7 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, time}; use zksync_consensus_crypto::ByteFmt; use zksync_consensus_roles::{node, validator}; -use zksync_protobuf::{read_required, required, ProtoFmt}; +use zksync_protobuf::{kB, read_required, required, ProtoFmt}; #[cfg(test)] mod testonly; @@ -13,6 +13,9 @@ mod tests; /// Timeout on performing a handshake. const TIMEOUT: time::Duration = time::Duration::seconds(5); +/// Max size of a handshake frame. +const MAX_FRAME: usize = 10 * kB; + /// First message exchanged by nodes after establishing e2e encryption. #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct Handshake { @@ -80,7 +83,7 @@ pub(super) async fn outbound( ) .await .map_err(Error::Stream)?; - let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size()) + let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME) .await .map_err(Error::Stream)?; if h.genesis != genesis { @@ -104,7 +107,7 @@ pub(super) async fn inbound( ) -> Result { let ctx = &ctx.with_timeout(TIMEOUT); let session_id = node::SessionId(stream.id().encode()); - let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size()) + let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME) .await .map_err(Error::Stream)?; if h.session_id.msg != session_id { diff --git a/node/actors/network/src/gossip/handshake/tests.rs b/node/actors/network/src/gossip/handshake/tests.rs index efb3471a..2e4e275d 100644 --- a/node/actors/network/src/gossip/handshake/tests.rs +++ b/node/actors/network/src/gossip/handshake/tests.rs @@ -70,7 +70,7 @@ async fn test_session_id_mismatch() { let (mut s1, s2) = noise::testonly::pipe(ctx).await; s.spawn_bg(async { let mut s2 = s2; - let _: Handshake = frame::recv_proto(ctx, &mut s2, Handshake::max_size()).await?; + let _: Handshake = frame::recv_proto(ctx, &mut s2, MAX_FRAME).await?; frame::send_proto( ctx, &mut s2, @@ -162,9 +162,7 @@ async fn test_genesis_mismatch() { Ok(()) }); let session_id = node::SessionId(s1.id().encode()); - let _: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size()) - .await - .unwrap(); + let _: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await.unwrap(); frame::send_proto( ctx, &mut s1, @@ -198,7 +196,7 @@ async fn test_invalid_signature() { let (mut s0, s1) = noise::testonly::pipe(ctx).await; s.spawn_bg(async { let mut s1 = s1; - let mut h: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size()).await?; + let mut h: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await?; h.session_id.key = cfg1.key.public(); frame::send_proto(ctx, &mut s1, &h).await?; Ok(()) diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs index 85e4d0d1..40a50875 100644 --- a/node/actors/network/src/gossip/tests.rs +++ b/node/actors/network/src/gossip/tests.rs @@ -278,9 +278,9 @@ async fn test_genesis_mismatch() { s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node"))); tracing::info!("Accept a connection with mismatching genesis."); - let stream = metrics::MeteredStream::listen(ctx, &mut listener) + let stream = metrics::MeteredStream::accept(ctx, &mut listener) .await? - .context("listen()")?; + .context("accept()")?; let (mut stream, endpoint) = preface::accept(ctx, stream) .await .context("preface::accept()")?; diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index c8d3e70d..8f6acf1b 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -1,7 +1,7 @@ //! Network actor maintaining a pool of outbound and inbound connections to other nodes. use anyhow::Context as _; use std::sync::Arc; -use zksync_concurrency::{ctx, ctx::channel, scope, time}; +use zksync_concurrency::{ctx, ctx::channel, limiter, scope, time}; use zksync_consensus_storage::BlockStore; use zksync_consensus_utils::pipe::ActorPipe; @@ -109,15 +109,15 @@ impl Network { impl Runner { /// Runs the network actor. pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let mut listener = self - .net - .gossip - .cfg - .server_addr - .bind() - .context("server_addr.bind()")?; + let res: ctx::Result<()> = scope::run!(ctx, |ctx, s| async { + let mut listener = self + .net + .gossip + .cfg + .server_addr + .bind() + .context("server_addr.bind()")?; - scope::run!(ctx, |ctx, s| async { // Handle incoming messages. s.spawn(async { // We don't propagate cancellation errors @@ -165,9 +165,12 @@ impl Runner { } } - // TODO(gprusak): add rate limit and inflight limit for inbound handshakes. - while let Ok(stream) = metrics::MeteredStream::listen(ctx, &mut listener).await { - let stream = stream.context("listener.accept()")?; + let accept_limiter = limiter::Limiter::new(ctx, self.net.gossip.cfg.tcp_accept_rate); + loop { + accept_limiter.acquire(ctx, 1).await?; + let stream = metrics::MeteredStream::accept(ctx, &mut listener) + .await? + .context("accept()")?; s.spawn(async { let res = async { let (stream, endpoint) = preface::accept(ctx, stream) @@ -198,8 +201,11 @@ impl Runner { Ok(()) }); } - Ok(()) }) - .await + .await; + match res { + Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), + Err(ctx::Error::Internal(err)) => Err(err), + } } } diff --git a/node/actors/network/src/metrics.rs b/node/actors/network/src/metrics.rs index 5cf1c52f..33359805 100644 --- a/node/actors/network/src/metrics.rs +++ b/node/actors/network/src/metrics.rs @@ -31,7 +31,7 @@ impl MeteredStream { } /// Accepts an inbound connection and returns a metered stream. - pub(crate) async fn listen( + pub(crate) async fn accept( ctx: &ctx::Ctx, listener: &mut net::tcp::Listener, ) -> ctx::OrCanceled> { diff --git a/node/actors/network/src/mux/handshake.rs b/node/actors/network/src/mux/handshake.rs index 1105e079..3c84c0a1 100644 --- a/node/actors/network/src/mux/handshake.rs +++ b/node/actors/network/src/mux/handshake.rs @@ -2,7 +2,10 @@ use super::CapabilityId; use crate::proto::mux as proto; use anyhow::Context as _; use std::collections::HashMap; -use zksync_protobuf::required; +use zksync_protobuf::{kB, required}; + +/// Max size of a handshkake frame. +pub(super) const MAX_FRAME: usize = 10 * kB; pub(super) struct Handshake { /// Maximal supported number of the accept streams per capability. @@ -40,10 +43,6 @@ fn build_capabilities( impl zksync_protobuf::ProtoFmt for Handshake { type Proto = proto::Handshake; - fn max_size() -> usize { - zksync_protobuf::kB - } - fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { accept_max_streams: read_max_streams(&r.accept).context("accept")?, diff --git a/node/actors/network/src/mux/mod.rs b/node/actors/network/src/mux/mod.rs index 6cbd9e22..b8b4d0c3 100644 --- a/node/actors/network/src/mux/mod.rs +++ b/node/actors/network/src/mux/mod.rs @@ -81,7 +81,6 @@ use crate::{frame, noise::bytes}; use anyhow::Context as _; use std::{collections::BTreeMap, sync::Arc}; use zksync_concurrency::{ctx, ctx::channel, io, scope, sync}; -use zksync_protobuf::ProtoFmt as _; mod config; mod handshake; @@ -279,7 +278,7 @@ impl Mux { let h = self.handshake(); frame::send_proto(ctx, &mut write, &h).await }); - frame::recv_proto(ctx, &mut read, Handshake::max_size()).await + frame::recv_proto(ctx, &mut read, handshake::MAX_FRAME).await }) .await .map_err(RunError::Protocol)?; diff --git a/node/actors/network/src/mux/tests/mod.rs b/node/actors/network/src/mux/tests/mod.rs index 0c43d0ea..3106c047 100644 --- a/node/actors/network/src/mux/tests/mod.rs +++ b/node/actors/network/src/mux/tests/mod.rs @@ -9,7 +9,6 @@ use std::{ }, }; use zksync_concurrency::{ctx, scope, testonly::abort_on_panic}; -use zksync_protobuf::ProtoFmt as _; mod proto; @@ -122,7 +121,7 @@ async fn run_server( s.spawn(async { let mut stream = stream; while let Ok((req, _)) = - frame::mux_recv_proto::(ctx, &mut stream.read, Req::max_size()).await + frame::mux_recv_proto::(ctx, &mut stream.read, usize::MAX).await { let resp = Resp { output: rpc_handler(req.0), @@ -164,8 +163,7 @@ async fn run_client( frame::mux_send_proto(ctx, &mut stream.write, &Req(req.clone())).await?; stream.write.flush(ctx).await?; let (resp, _) = - frame::mux_recv_proto::(ctx, &mut stream.read, Resp::max_size()) - .await?; + frame::mux_recv_proto::(ctx, &mut stream.read, usize::MAX).await?; assert_eq!(resp.output, rpc_handler(req)); assert_eq!(resp.capability_id, cap); } diff --git a/node/actors/network/src/preface.rs b/node/actors/network/src/preface.rs index 78f04ea7..ff37ec98 100644 --- a/node/actors/network/src/preface.rs +++ b/node/actors/network/src/preface.rs @@ -9,11 +9,14 @@ //! and multiplex between multiple endpoints available on the same TCP port. use crate::{frame, metrics, noise, proto::preface as proto}; use zksync_concurrency::{ctx, time}; -use zksync_protobuf::{required, ProtoFmt}; +use zksync_protobuf::{kB, required, ProtoFmt}; /// Timeout on executing the preface protocol. const TIMEOUT: time::Duration = time::Duration::seconds(5); +/// Max size of the frames exchanged during preface. +const MAX_FRAME: usize = 10 * kB; + /// E2E encryption protocol to use on a TCP connection. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum Encryption { @@ -32,9 +35,6 @@ pub(crate) enum Endpoint { impl ProtoFmt for Encryption { type Proto = proto::Encryption; - fn max_size() -> usize { - 10 * zksync_protobuf::kB - } fn read(r: &Self::Proto) -> anyhow::Result { use proto::encryption::T; Ok(match required(&r.t)? { @@ -52,9 +52,6 @@ impl ProtoFmt for Encryption { impl ProtoFmt for Endpoint { type Proto = proto::Endpoint; - fn max_size() -> usize { - 10 * zksync_protobuf::kB - } fn read(r: &Self::Proto) -> anyhow::Result { use proto::endpoint::T; Ok(match required(&r.t)? { @@ -92,8 +89,11 @@ pub(crate) async fn accept( mut stream: metrics::MeteredStream, ) -> anyhow::Result<(noise::Stream, Endpoint)> { let ctx = &ctx.with_timeout(TIMEOUT); - let _: Encryption = frame::recv_proto(ctx, &mut stream, Encryption::max_size()).await?; + let encryption: Encryption = frame::recv_proto(ctx, &mut stream, MAX_FRAME).await?; + if encryption != Encryption::NoiseNN { + anyhow::bail!("unsupported encryption protocol: {encryption:?}"); + } let mut stream = noise::Stream::server_handshake(ctx, stream).await?; - let endpoint = frame::recv_proto(ctx, &mut stream, Encryption::max_size()).await?; + let endpoint = frame::recv_proto(ctx, &mut stream, MAX_FRAME).await?; Ok((stream, endpoint)) } diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 3a55c324..0241ed8e 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -6,7 +6,7 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use zksync_concurrency::{ctx, ctx::channel, io, net, scope, sync}; +use zksync_concurrency::{ctx, ctx::channel, io, limiter, net, scope, sync}; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::BlockStore; use zksync_consensus_utils::pipe; @@ -69,6 +69,7 @@ pub fn new_configs( static_outbound: HashMap::default(), }, max_block_size: usize::MAX, + tcp_accept_rate: limiter::Rate::INF, rpc: RpcConfig::default(), } }); @@ -104,6 +105,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { static_outbound: [(peer.gossip.key.public(), peer.public_addr.clone())].into(), }, max_block_size: usize::MAX, + tcp_accept_rate: limiter::Rate::INF, rpc: RpcConfig::default(), } } diff --git a/node/libs/concurrency/src/limiter/mod.rs b/node/libs/concurrency/src/limiter/mod.rs index 9f609b26..6772a64c 100644 --- a/node/libs/concurrency/src/limiter/mod.rs +++ b/node/libs/concurrency/src/limiter/mod.rs @@ -18,6 +18,14 @@ pub struct Rate { pub refresh: time::Duration, } +impl Rate { + /// Infinite refresh rate. + pub const INF: Rate = Rate { + burst: usize::MAX, + refresh: time::Duration::ZERO, + }; +} + /// Representation of Duration in nanoseconds. /// Duration is equivalent to {seconds:i64,nanos:i32}, /// so Nanos is a strictly bigger type. diff --git a/node/libs/protobuf/src/proto_fmt.rs b/node/libs/protobuf/src/proto_fmt.rs index db8b67e7..5a4f4a49 100644 --- a/node/libs/protobuf/src/proto_fmt.rs +++ b/node/libs/protobuf/src/proto_fmt.rs @@ -273,10 +273,6 @@ pub trait ProtoFmt: Sized { fn read(r: &Self::Proto) -> anyhow::Result; /// Converts Self to Proto. fn build(&self) -> Self::Proto; - /// Maximal allowed message size (in bytes). - fn max_size() -> usize { - usize::MAX - } } /// Parses a required proto field.