Skip to content

Commit

Permalink
Fixed network actor. All tests pass now.
Browse files Browse the repository at this point in the history
  • Loading branch information
brunoffranca committed Oct 22, 2024
1 parent 7f881f5 commit 2b0b30c
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 43 deletions.
17 changes: 5 additions & 12 deletions node/actors/network/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ impl MsgPool {
// an implementation detail of the bft crate. Consider moving
// this logic there.
match (&v.message.msg, &msg.message.msg) {
(M::ReplicaPrepare(_), M::ReplicaPrepare(_)) => {}
(M::LeaderProposal(_), M::LeaderProposal(_)) => {}
(M::ReplicaCommit(_), M::ReplicaCommit(_)) => {}
(M::LeaderPrepare(_), M::LeaderPrepare(_)) => {}
(M::LeaderCommit(_), M::LeaderCommit(_)) => {}
(M::ReplicaNewView(_), M::ReplicaNewView(_)) => {}
(M::ReplicaTimeout(_), M::ReplicaTimeout(_)) => {}
_ => return true,
}
// If pool contains a message of the same type which is newer,
Expand Down Expand Up @@ -229,15 +229,8 @@ impl Network {
let mut sub = self.msg_pool.subscribe();
loop {
let call = consensus_cli.reserve(ctx).await?;
let msg = loop {
let msg = sub.recv(ctx).await?;
match &msg.recipient {
io::Target::Broadcast => {}
io::Target::Validator(recipient) if recipient == peer => {}
_ => continue,
}
break msg.message.clone();
};
let msg = sub.recv(ctx).await?.message.clone();

s.spawn(async {
let req = rpc::consensus::Req(msg);
let res = call.call(ctx, &req, RESP_MAX_SIZE).await;
Expand Down
11 changes: 3 additions & 8 deletions node/actors/network/src/consensus/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ async fn test_msg_pool() {
// We keep them sorted by type and view, so that it is easy to
// compute the expected state of the pool after insertions.
let msgs = [
gen(&mut || M::ReplicaPrepare(rng.gen())),
gen(&mut || M::LeaderProposal(rng.gen())),
gen(&mut || M::ReplicaCommit(rng.gen())),
gen(&mut || M::LeaderPrepare(rng.gen())),
gen(&mut || M::LeaderCommit(rng.gen())),
gen(&mut || M::ReplicaNewView(rng.gen())),
gen(&mut || M::ReplicaTimeout(rng.gen())),
];

// Insert messages at random.
Expand All @@ -42,7 +42,6 @@ async fn test_msg_pool() {
want[i] = Some(want[i].unwrap_or(0).max(j));
pool.send(Arc::new(io::ConsensusInputMessage {
message: msgs[i][j].clone(),
recipient: io::Target::Broadcast,
}));
// Here we compare the internal state of the pool to the expected state.
// Note that we compare sets of crypto hashes of messages, because the messages themselves do not
Expand Down Expand Up @@ -310,9 +309,6 @@ async fn test_transmission() {
let want: validator::Signed<validator::ConsensusMsg> = want.cast().unwrap();
let in_message = io::ConsensusInputMessage {
message: want.clone(),
recipient: io::Target::Validator(
nodes[1].cfg().validator_key.as_ref().unwrap().public(),
),
};
nodes[0].pipe.send(in_message.into());

Expand Down Expand Up @@ -355,7 +351,6 @@ async fn test_retransmission() {
node0.pipe.send(
io::ConsensusInputMessage {
message: want.clone(),
recipient: io::Target::Broadcast,
}
.into(),
);
Expand Down
7 changes: 0 additions & 7 deletions node/actors/network/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub enum InputMessage {
#[derive(Debug, PartialEq)]
pub struct ConsensusInputMessage {
pub message: validator::Signed<validator::ConsensusMsg>,
pub recipient: Target,
}

impl From<ConsensusInputMessage> for InputMessage {
Expand All @@ -39,9 +38,3 @@ pub enum OutputMessage {
/// Message to the Consensus actor.
Consensus(ConsensusReq),
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Target {
Validator(validator::PublicKey),
Broadcast,
}
19 changes: 3 additions & 16 deletions node/actors/network/src/testonly.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Testonly utilities.
#![allow(dead_code)]
use crate::{
gossip::attestation,
io::{ConsensusInputMessage, Target},
Config, GossipConfig, Network, RpcConfig, Runner,
gossip::attestation, io::ConsensusInputMessage, Config, GossipConfig, Network, RpcConfig,
Runner,
};
use rand::{
distributions::{Distribution, Standard},
Expand All @@ -21,21 +20,9 @@ use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::BlockStore;
use zksync_consensus_utils::pipe;

impl Distribution<Target> for Standard {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Target {
match rng.gen_range(0..2) {
0 => Target::Broadcast,
_ => Target::Validator(rng.gen()),
}
}
}

impl Distribution<ConsensusInputMessage> for Standard {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> ConsensusInputMessage {
ConsensusInputMessage {
message: rng.gen(),
recipient: rng.gen(),
}
ConsensusInputMessage { message: rng.gen() }
}
}

Expand Down

0 comments on commit 2b0b30c

Please sign in to comment.