diff --git a/node/components/bft/src/testonly/run.rs b/node/components/bft/src/testonly/run.rs index a240fc18..0be99713 100644 --- a/node/components/bft/src/testonly/run.rs +++ b/node/components/bft/src/testonly/run.rs @@ -199,12 +199,16 @@ async fn run_nodes_real(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async { let mut nodes = vec![]; for (i, spec) in specs.iter().enumerate() { - let (node, runner) = network::testonly::Instance::new_with_filters( - spec.net.clone(), - spec.block_store.clone(), + let (send, recv) = sync::prunable_mpsc::channel( crate::inbound_filter_predicate, crate::inbound_selection_function, ); + let (node, runner) = network::testonly::Instance::new_with_channel( + spec.net.clone(), + spec.block_store.clone(), + send, + recv, + ); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); nodes.push(node); } diff --git a/node/components/network/src/gossip/tests/mod.rs b/node/components/network/src/gossip/tests/mod.rs index fafb3341..ff5d0b48 100644 --- a/node/components/network/src/gossip/tests/mod.rs +++ b/node/components/network/src/gossip/tests/mod.rs @@ -519,6 +519,7 @@ async fn test_batch_votes_propagation() { for (i, mut cfg) in cfgs.into_iter().enumerate() { cfg.rpc.push_batch_votes_rate = limiter::Rate::INF; cfg.validator_key = None; + let (con_send, con_recv) = sync::prunable_mpsc::unpruned_channel(); let (node, runner) = testonly::Instance::new_from_config( testonly::InstanceConfig { cfg: cfg.clone(), @@ -526,8 +527,8 @@ async fn test_batch_votes_propagation() { attestation: attestation::Controller::new(Some(setup.attester_keys[i].clone())) .into(), }, - /*filter_predicate*/ |_| true, - /*selection_function*/ |_, _| SelectionFunctionResult::Keep, + con_send, + con_recv, ); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); // Task going through the schedule, waiting for ANY node to collect the certificate diff --git a/node/components/network/src/testonly.rs b/node/components/network/src/testonly.rs index 86469ec4..9fff6c27 100644 --- a/node/components/network/src/testonly.rs +++ b/node/components/network/src/testonly.rs @@ -195,27 +195,25 @@ pub struct InstanceConfig { impl Instance { /// Constructs a new instance. pub fn new(cfg: Config, block_store: Arc) -> (Self, InstanceRunner) { + let (con_send, con_recv) = sync::prunable_mpsc::unpruned_channel(); Self::new_from_config( InstanceConfig { cfg, block_store, attestation: attestation::Controller::new(None).into(), }, - /*filter_predicate*/ |_| true, - /*selection_function*/ |_, _| SelectionFunctionResult::Keep, + con_send, + con_recv, ) } - /// Constructs a new instance with custom filter and selection functions for consensus + /// Constructs a new instance with a custom channel for consensus /// component. - pub fn new_with_filters( + pub fn new_with_channel( cfg: Config, block_store: Arc, - con_filter_predicate: impl 'static + Sync + Send + Fn(&ConsensusReq) -> bool, - con_selection_function: impl 'static - + Sync - + Send - + Fn(&ConsensusReq, &ConsensusReq) -> SelectionFunctionResult, + con_send: sync::prunable_mpsc::Sender, + con_recv: sync::prunable_mpsc::Receiver, ) -> (Self, InstanceRunner) { Self::new_from_config( InstanceConfig { @@ -223,22 +221,17 @@ impl Instance { block_store, attestation: attestation::Controller::new(None).into(), }, - con_filter_predicate, - con_selection_function, + con_send, + con_recv, ) } /// Construct an instance for a given config. pub fn new_from_config( cfg: InstanceConfig, - con_filter_predicate: impl 'static + Sync + Send + Fn(&ConsensusReq) -> bool, - con_selection_function: impl 'static - + Sync - + Send - + Fn(&ConsensusReq, &ConsensusReq) -> SelectionFunctionResult, + net_to_con_send: sync::prunable_mpsc::Sender, + net_to_con_recv: sync::prunable_mpsc::Receiver, ) -> (Self, InstanceRunner) { - let (net_to_con_send, net_to_con_recv) = - sync::prunable_mpsc::channel(con_filter_predicate, con_selection_function); let (con_to_net_send, con_to_net_recv) = channel::unbounded(); let (terminate_send, terminate_recv) = channel::bounded(1); diff --git a/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs b/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs index ba8f0bd6..4676082b 100644 --- a/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs +++ b/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs @@ -56,6 +56,11 @@ pub fn channel( (send, recv) } +/// Creates a channel without any pruning or filtering, and returns the [`Sender`] and [`Receiver`] handles. +pub fn unpruned_channel() -> (Sender, Receiver) { + channel(|_| true, |_, _| SelectionFunctionResult::Keep) +} + struct Shared { send: watch::Sender>, }