fix: Add max_batch_size to limit the SyncBatch request sizes #152

Merged · 1 commit · Jul 9, 2024
4 changes: 4 additions & 0 deletions node/actors/executor/src/lib.rs
@@ -49,6 +49,9 @@ pub struct Config {
pub public_addr: net::Host,
/// Maximal size of the block payload.
pub max_payload_size: usize,
/// Maximal size of a batch, which includes `max_payload_size` per block in the batch,
/// plus the size of the Merkle proof of the commitment being included on L1 (should be ~1kB).
pub max_batch_size: usize,
/// Key of this node. It uniquely identifies the node.
/// It should match the secret key provided in the `node_key` file.
pub node_key: node::SecretKey,
@@ -107,6 +110,7 @@ impl Executor {
attester_key: self.attester.as_ref().map(|a| a.key.clone()),
ping_timeout: Some(time::Duration::seconds(10)),
max_block_size: self.config.max_payload_size.saturating_add(kB),
max_batch_size: self.config.max_batch_size.saturating_add(kB),
max_block_queue_size: 20,
tcp_accept_rate: limiter::Rate {
burst: 10,
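For orientation, here is how the two executor-level limits relate to the network limits set above: each application limit is padded by about 1 kB via `saturating_add(kB)` before it becomes the wire-level cap on a proto-encoded `validator::FinalBlock` or `attester::SyncBatch`, presumably to leave headroom for encoding overhead. A minimal standalone sketch with simplified stand-in types (`KB` is an assumed value approximating `zksync_protobuf::kB`; the real `Config` carries many more fields):

```rust
// Illustrative sketch only; not the crate's actual types.
const KB: usize = 1 << 10; // assumed value standing in for `zksync_protobuf::kB`

struct ExecutorConfig {
    max_payload_size: usize, // per-block payload limit
    max_batch_size: usize,   // whole-batch limit (payloads + ~1kB Merkle proof)
}

struct NetworkLimits {
    max_block_size: usize, // cap on a proto-encoded `validator::FinalBlock`
    max_batch_size: usize, // cap on a proto-encoded `attester::SyncBatch`
}

fn network_limits(cfg: &ExecutorConfig) -> NetworkLimits {
    NetworkLimits {
        // Pad by ~1kB so encoding overhead never rejects a maximal payload or batch.
        max_block_size: cfg.max_payload_size.saturating_add(KB),
        max_batch_size: cfg.max_batch_size.saturating_add(KB),
    }
}

fn main() {
    let cfg = ExecutorConfig {
        max_payload_size: 1_000_000,
        max_batch_size: 100_000_000,
    };
    let limits = network_limits(&cfg);
    assert_eq!(limits.max_block_size, 1_000_000 + KB);
    assert_eq!(limits.max_batch_size, 100_000_000 + KB);
}
```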
1 change: 1 addition & 0 deletions node/actors/executor/src/tests.rs
@@ -16,6 +16,7 @@ fn config(cfg: &network::Config) -> Config {
server_addr: *cfg.server_addr,
public_addr: cfg.public_addr.clone(),
max_payload_size: usize::MAX,
max_batch_size: usize::MAX,
node_key: cfg.gossip.key.clone(),
gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
gossip_static_inbound: cfg.gossip.static_inbound.clone(),
2 changes: 2 additions & 0 deletions node/actors/network/src/config.rs
@@ -103,6 +103,8 @@ pub struct Config {
pub attester_key: Option<attester::SecretKey>,
/// Maximal size of the proto-encoded `validator::FinalBlock` in bytes.
pub max_block_size: usize,
/// Maximal size of the proto-encoded `attester::SyncBatch` in bytes.
pub max_batch_size: usize,
/// If a peer doesn't respond to a ping message within `ping_timeout`,
/// the connection is dropped.
/// `None` disables sending ping messages (useful for tests).
15 changes: 6 additions & 9 deletions node/actors/network/src/gossip/runner.rs
@@ -79,17 +79,19 @@ impl<'a> PushBlockStoreStateServer<'a> {
/// Represents what we know about the state of available batches on the remote peer.
struct PushBatchStoreStateServer {
state: sync::watch::Sender<BatchStoreState>,
+max_batch_size: usize,
}

impl PushBatchStoreStateServer {
/// Start out not knowing anything about the remote peer.
-fn new() -> Self {
+fn new(max_batch_size: usize) -> Self {
Self {
state: sync::watch::channel(BatchStoreState {
first: BatchNumber(0),
last: None,
})
.0,
+max_batch_size,
}
}
}
@@ -113,10 +115,7 @@ impl rpc::Handler<rpc::push_block_store_state::Rpc> for &PushBlockStoreStateServer {
#[async_trait]
impl rpc::Handler<rpc::push_batch_store_state::Rpc> for &PushBatchStoreStateServer {
fn max_req_size(&self) -> usize {
-// XXX: The request will actually contain a `SyncBatch` which has all the blocks in the batch,
-// so a constant 10kB cannot be the right limit. There is a `max_block_size` in config which
-// should come into play, with some other limit on the batch size.
-10 * kB
+self.max_batch_size.saturating_add(kB)
}
async fn handle(
&self,
@@ -178,7 +177,7 @@ impl Network {
ctx,
self.cfg.rpc.push_batch_store_state_rate,
);
-let push_batch_store_state_server = PushBatchStoreStateServer::new();
+let push_batch_store_state_server = PushBatchStoreStateServer::new(self.cfg.max_batch_size);
scope::run!(ctx, |ctx, s| async {
let mut service = rpc::Service::new()
.add_client(&push_validator_addrs_client)
@@ -356,10 +355,8 @@ impl Network {
let ctx_with_timeout =
self.cfg.rpc.get_batch_timeout.map(|t| ctx.with_timeout(t));
let ctx = ctx_with_timeout.as_ref().unwrap_or(ctx);
-// XXX: `max_block_size` isn't the right limit here as the response
-// will contain all blocks of a batch.
let batch = call
-.call(ctx, &req, self.cfg.max_block_size.saturating_add(kB))
+.call(ctx, &req, self.cfg.max_batch_size.saturating_add(kB))
.await?
.0
.context("empty response")?;
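The change above swaps a hard-coded 10 kB request cap for the configured batch limit, and sizes the `get_batch` response limit the same way further down. A self-contained sketch of the handler pattern, using a simplified stand-in for the crate's `rpc::Handler` trait and an assumed `KB` constant approximating `zksync_protobuf::kB`:

```rust
// Simplified stand-ins; not the crate's actual RPC API.
const KB: usize = 1 << 10; // assumed value standing in for `zksync_protobuf::kB`

trait Handler {
    /// Largest proto-encoded request this handler accepts.
    fn max_req_size(&self) -> usize;
}

struct PushBatchStoreStateServer {
    max_batch_size: usize,
}

impl Handler for PushBatchStoreStateServer {
    fn max_req_size(&self) -> usize {
        // Per the removed XXX comment, the request can carry a full `SyncBatch`,
        // so the cap must scale with the configured batch limit (plus ~1kB of
        // encoding overhead) rather than stay at a constant 10 kB.
        self.max_batch_size.saturating_add(KB)
    }
}

fn main() {
    let server = PushBatchStoreStateServer {
        max_batch_size: 100_000_000,
    };
    assert_eq!(server.max_req_size(), 100_000_000 + KB);
}
```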
2 changes: 2 additions & 0 deletions node/actors/network/src/testonly.rs
@@ -108,6 +108,7 @@
static_outbound: HashMap::default(),
},
max_block_size: usize::MAX,
max_batch_size: usize::MAX,
tcp_accept_rate: limiter::Rate::INF,
rpc: RpcConfig::default(),
max_block_queue_size: 10,
@@ -146,6 +147,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config {
static_outbound: [(peer.gossip.key.public(), peer.public_addr.clone())].into(),
},
max_block_size: usize::MAX,
max_batch_size: usize::MAX,
tcp_accept_rate: limiter::Rate::INF,
rpc: RpcConfig::default(),
max_block_queue_size: 10,
1 change: 1 addition & 0 deletions node/tools/src/bin/deployer.rs
@@ -47,6 +47,7 @@ fn generate_consensus_nodes(nodes: usize, seed_nodes_amount: Option<usize>) -> V
metrics_server_addr: None,
genesis: setup.genesis.clone(),
max_payload_size: 1000000,
max_batch_size: 100000000,
validator_key: Some(validator_keys[i].clone()),
attester_key: Some(attester_keys[i].clone()),
node_key: node_keys[i].clone(),
1 change: 1 addition & 0 deletions node/tools/src/bin/localnet_config.rs
@@ -76,6 +76,7 @@ fn main() -> anyhow::Result<()> {
.map(|port| SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port)),
genesis: setup.genesis.clone(),
max_payload_size: 1000000,
max_batch_size: 100000000,
node_key: node_keys[i].clone(),
validator_key: validator_keys.get(i).cloned(),
attester_key: attester_keys.get(i).cloned(),
20 changes: 16 additions & 4 deletions node/tools/src/config.rs
@@ -17,7 +17,7 @@ use zksync_consensus_network::http;
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner};
use zksync_consensus_utils::debug_page;
-use zksync_protobuf::{read_required, required, ProtoFmt};
+use zksync_protobuf::{kB, read_required, required, ProtoFmt};

fn read_required_secret_text<T: TextFmt>(text: &Option<String>) -> anyhow::Result<T> {
Text::new(
@@ -96,6 +96,7 @@ pub struct AppConfig {

pub genesis: validator::Genesis,
pub max_payload_size: usize,
+pub max_batch_size: usize,
pub validator_key: Option<validator::SecretKey>,
pub attester_key: Option<attester::SecretKey>,

@@ -126,6 +127,16 @@ impl ProtoFmt for AppConfig {
ProtoFmt::read(e).with_context(|| format!("gossip_static_outbound[{i}]"))?;
gossip_static_outbound.insert(node_addr.key, node_addr.addr);
}

+let max_payload_size = required(&r.max_payload_size)
+.and_then(|x| Ok((*x).try_into()?))
+.context("max_payload_size")?;
+
+let max_batch_size = match &r.max_batch_size {
+Some(x) => (*x).try_into().context("max_payload_size")?,
+None => max_payload_size * 100 + kB, // Merkle proof is ~1kB and we have a batch per minute.
+};

Ok(Self {
server_addr: read_required_text(&r.server_addr).context("server_addr")?,
public_addr: net::Host(required(&r.public_addr).context("public_addr")?.clone()),
@@ -134,9 +145,8 @@ impl ProtoFmt for AppConfig {
.context("metrics_server_addr")?,

genesis: read_required(&r.genesis).context("genesis")?,
-max_payload_size: required(&r.max_payload_size)
-.and_then(|x| Ok((*x).try_into()?))
-.context("max_payload_size")?,
+max_payload_size,
+max_batch_size,
// TODO: read secret.
validator_key: read_optional_secret_text(&r.validator_secret_key)
.context("validator_secret_key")?,
@@ -180,6 +190,7 @@ impl ProtoFmt for AppConfig {

genesis: Some(self.genesis.build()),
max_payload_size: Some(self.max_payload_size.try_into().unwrap()),
+max_batch_size: Some(self.max_batch_size.try_into().unwrap()),
validator_secret_key: self.validator_key.as_ref().map(TextFmt::encode),
attester_secret_key: self.attester_key.as_ref().map(TextFmt::encode),

@@ -257,6 +268,7 @@ impl Configs {
gossip_static_inbound: self.app.gossip_static_inbound.clone(),
gossip_static_outbound: self.app.gossip_static_outbound.clone(),
max_payload_size: self.app.max_payload_size,
+max_batch_size: self.app.max_batch_size,
rpc: executor::RpcConfig::default(),
debug_page: self.app.debug_page.as_ref().map(|debug_page_config| {
http::DebugPageConfig {
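When the file config omits `max_batch_size`, the fallback above allows up to 100 maximal block payloads per batch plus ~1 kB for the Merkle proof (the inline comment notes batches arrive roughly once a minute). A standalone sketch of that default, with an illustrative helper name and an assumed `KB` constant approximating `zksync_protobuf::kB`:

```rust
const KB: usize = 1 << 10; // assumed value standing in for `zksync_protobuf::kB`

/// Illustrative helper: the value used when `max_batch_size` is absent from the config.
fn effective_max_batch_size(max_payload_size: usize, configured: Option<usize>) -> usize {
    // Default: 100 block payloads per batch + ~1kB Merkle proof.
    configured.unwrap_or(max_payload_size * 100 + KB)
}

fn main() {
    // With the 1 MB payload limit used by the deployer and localnet configs,
    // the default comes out near their explicit 100000000.
    assert_eq!(effective_max_batch_size(1_000_000, None), 100_000_000 + KB);
    // An explicit value always wins over the default.
    assert_eq!(effective_max_batch_size(1_000_000, Some(64 * KB)), 64 * KB);
}
```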
3 changes: 3 additions & 0 deletions node/tools/src/proto/mod.proto
@@ -80,6 +80,9 @@ message AppConfig {
// Maximal size of the block payload.
optional uint64 max_payload_size = 5; // required; bytes

// Maximal size of the sync batch payload.
optional uint64 max_batch_size = 17; // optional; bytes

// Validator secret key.
optional string validator_secret_key = 10; // optional; ValidatorSecretKey

1 change: 1 addition & 0 deletions node/tools/src/tests.rs
@@ -17,6 +17,7 @@ impl Distribution<AppConfig> for EncodeDist {

genesis: rng.gen(),
max_payload_size: rng.gen(),
max_batch_size: rng.gen(),
validator_key: self.sample_opt(|| rng.gen()),
attester_key: self.sample_opt(|| rng.gen()),
