From 406d11cea6d42da9c9ba124717d0c55a9c47b085 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 17 Jun 2024 18:33:52 +0300 Subject: [PATCH] Make approval-distribution logic runnable on a separate thread Signed-off-by: Alexandru Gheorghe --- .../network/approval-distribution/src/lib.rs | 356 ++++++++++++------ 1 file changed, 234 insertions(+), 122 deletions(-) diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index d48fb08a311c..26cedf1a2a13 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -320,7 +320,7 @@ enum Resend { /// It tracks metadata about our view of the unfinalized chain, /// which assignments and approvals we have seen, and our peers' views. #[derive(Default)] -struct State { +pub struct State { /// These two fields are used in conjunction to construct a view over the unfinalized chain. blocks_by_number: BTreeMap>, blocks: HashMap, @@ -662,9 +662,13 @@ enum PendingMessage { #[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)] impl State { - async fn handle_network_msg( + async fn handle_network_msg< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, event: NetworkBridgeEvent, rng: &mut (impl CryptoRng + Rng), @@ -689,7 +693,7 @@ impl State { }, NetworkBridgeEvent::NewGossipTopology(topology) => { self.handle_new_session_topology( - ctx, + network_sender, topology.session, topology.topology, topology.local_index, @@ -697,7 +701,7 @@ impl State { .await; }, NetworkBridgeEvent::PeerViewChange(peer_id, view) => { - self.handle_peer_view_change(ctx, metrics, peer_id, view, rng).await; + self.handle_peer_view_change(network_sender, metrics, peer_id, view, rng).await; }, NetworkBridgeEvent::OurViewChange(view) => { gum::trace!(target: LOG_TARGET, ?view, "Own view change"); @@ -720,7 +724,15 @@ impl State { }); }, NetworkBridgeEvent::PeerMessage(peer_id, message) => { - self.process_incoming_peer_message(ctx, metrics, peer_id, message, rng).await; + self.process_incoming_peer_message( + approval_voting_sender, + network_sender, + metrics, + peer_id, + message, + rng, + ) + .await; }, NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids"); @@ -743,7 +755,7 @@ impl State { let view_intersection = View::new(intersection.cloned(), view.finalized_number); Self::unify_with_peer( - ctx.sender(), + network_sender, metrics, &mut self.blocks, &self.topologies, @@ -761,9 +773,13 @@ impl State { } } - async fn handle_new_blocks( + async fn handle_new_blocks< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, metas: Vec, rng: &mut (impl CryptoRng + Rng), @@ -814,12 +830,11 @@ impl State { ); { - let sender = ctx.sender(); for (peer_id, PeerEntry { view, version }) in self.peer_views.iter() { let intersection = view.iter().filter(|h| new_hashes.contains(h)); let view_intersection = View::new(intersection.cloned(), view.finalized_number); Self::unify_with_peer( - sender, + network_sender, metrics, &mut self.blocks, &self.topologies, @@ -866,7 +881,8 @@ impl State { match message { PendingMessage::Assignment(assignment, claimed_indices) => { self.import_and_circulate_assignment( - ctx, + approval_voting_sender, + network_sender, metrics, MessageSource::Peer(peer_id), assignment, @@ -877,7 +893,8 @@ impl State { }, PendingMessage::Approval(approval_vote) => { self.import_and_circulate_approval( - ctx, + approval_voting_sender, + network_sender, metrics, MessageSource::Peer(peer_id), approval_vote, @@ -889,12 +906,12 @@ impl State { } } - self.enable_aggression(ctx, Resend::Yes, metrics).await; + self.enable_aggression(network_sender, Resend::Yes, metrics).await; } - async fn handle_new_session_topology( + async fn handle_new_session_topology>( &mut self, - ctx: &mut Context, + network_sender: &mut N, session: SessionIndex, topology: SessionGridTopology, local_index: Option, @@ -908,7 +925,7 @@ impl State { let topology = self.topologies.get_topology(session).expect("just inserted above; qed"); adjust_required_routing_and_propagate( - ctx, + network_sender, &mut self.blocks, &self.topologies, |block_entry| block_entry.session == session, @@ -926,9 +943,14 @@ impl State { .await; } - async fn process_incoming_assignments( + async fn process_incoming_assignments< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + R, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, peer_id: PeerId, assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>, @@ -956,7 +978,8 @@ impl State { } self.import_and_circulate_assignment( - ctx, + approval_voting_sender, + network_sender, metrics, MessageSource::Peer(peer_id), assignment, @@ -968,9 +991,13 @@ impl State { } // Entry point for processing an approval coming from a peer. - async fn process_incoming_approvals( + async fn process_incoming_approvals< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, peer_id: PeerId, approvals: Vec, @@ -1001,7 +1028,8 @@ impl State { } self.import_and_circulate_approval( - ctx, + approval_voting_sender, + network_sender, metrics, MessageSource::Peer(peer_id), approval_vote, @@ -1010,9 +1038,14 @@ impl State { } } - async fn process_incoming_peer_message( + async fn process_incoming_peer_message< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + R, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, peer_id: PeerId, msg: Versioned< @@ -1033,10 +1066,11 @@ impl State { "Processing assignments from a peer", ); let sanitized_assignments = - self.sanitize_v2_assignments(peer_id, ctx.sender(), assignments).await; + self.sanitize_v2_assignments(peer_id, network_sender, assignments).await; self.process_incoming_assignments( - ctx, + approval_voting_sender, + network_sender, metrics, peer_id, sanitized_assignments, @@ -1054,10 +1088,11 @@ impl State { ); let sanitized_assignments = - self.sanitize_v1_assignments(peer_id, ctx.sender(), assignments).await; + self.sanitize_v1_assignments(peer_id, network_sender, assignments).await; self.process_incoming_assignments( - ctx, + approval_voting_sender, + network_sender, metrics, peer_id, sanitized_assignments, @@ -1067,25 +1102,37 @@ impl State { }, Versioned::V3(protocol_v3::ApprovalDistributionMessage::Approvals(approvals)) => { let sanitized_approvals = - self.sanitize_v2_approvals(peer_id, ctx.sender(), approvals).await; - self.process_incoming_approvals(ctx, metrics, peer_id, sanitized_approvals) - .await; + self.sanitize_v2_approvals(peer_id, network_sender, approvals).await; + self.process_incoming_approvals( + approval_voting_sender, + network_sender, + metrics, + peer_id, + sanitized_approvals, + ) + .await; }, Versioned::V1(protocol_v1::ApprovalDistributionMessage::Approvals(approvals)) | Versioned::V2(protocol_v2::ApprovalDistributionMessage::Approvals(approvals)) => { let sanitized_approvals = - self.sanitize_v1_approvals(peer_id, ctx.sender(), approvals).await; - self.process_incoming_approvals(ctx, metrics, peer_id, sanitized_approvals) - .await; + self.sanitize_v1_approvals(peer_id, network_sender, approvals).await; + self.process_incoming_approvals( + approval_voting_sender, + network_sender, + metrics, + peer_id, + sanitized_approvals, + ) + .await; }, } } // handle a peer view change: requires that the peer is already connected // and has an entry in the `PeerData` struct. - async fn handle_peer_view_change( + async fn handle_peer_view_change, R>( &mut self, - ctx: &mut Context, + network_sender: &mut N, metrics: &Metrics, peer_id: PeerId, view: View, @@ -1132,7 +1179,7 @@ impl State { } Self::unify_with_peer( - ctx.sender(), + network_sender, metrics, &mut self.blocks, &self.topologies, @@ -1146,9 +1193,9 @@ impl State { .await; } - async fn handle_block_finalized( + async fn handle_block_finalized>( &mut self, - ctx: &mut Context, + network_sender: &mut N, metrics: &Metrics, finalized_number: BlockNumber, ) { @@ -1172,12 +1219,17 @@ impl State { // If a block was finalized, this means we may need to move our aggression // forward to the now oldest block(s). - self.enable_aggression(ctx, Resend::No, metrics).await; + self.enable_aggression(network_sender, Resend::No, metrics).await; } - async fn import_and_circulate_assignment( + async fn import_and_circulate_assignment< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + R, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, source: MessageSource, assignment: IndirectAssignmentCertV2, @@ -1218,7 +1270,7 @@ impl State { if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_UNEXPECTED_MESSAGE, ) @@ -1255,7 +1307,7 @@ impl State { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_DUPLICATE_MESSAGE, ) @@ -1283,7 +1335,7 @@ impl State { ); modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_UNEXPECTED_MESSAGE, ) @@ -1296,7 +1348,7 @@ impl State { if entry.knowledge.contains(&message_subject, message_kind) { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, BENEFIT_VALID_MESSAGE, ) @@ -1311,12 +1363,13 @@ impl State { let (tx, rx) = oneshot::channel(); - ctx.send_message(ApprovalVotingMessage::CheckAndImportAssignment( - assignment.clone(), - claimed_candidate_indices.clone(), - tx, - )) - .await; + approval_voting_sender + .send_message(ApprovalVotingMessage::CheckAndImportAssignment( + assignment.clone(), + claimed_candidate_indices.clone(), + tx, + )) + .await; let timer = metrics.time_awaiting_approval_voting(); let result = match rx.await { @@ -1339,7 +1392,7 @@ impl State { AssignmentCheckResult::Accepted => { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, BENEFIT_VALID_MESSAGE_FIRST, ) @@ -1375,7 +1428,7 @@ impl State { ); modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE, ) @@ -1394,7 +1447,7 @@ impl State { ); modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_INVALID_MESSAGE, ) @@ -1526,14 +1579,16 @@ impl State { }) .collect::>(); - send_assignments_batched(ctx.sender(), assignments, &peers).await; + send_assignments_batched(network_sender, assignments, &peers).await; } } // Checks if an approval can be processed. // Returns true if we can continue with processing the approval and false otherwise. - async fn check_approval_can_be_processed( - ctx: &mut Context, + async fn check_approval_can_be_processed< + N: overseer::SubsystemSender, + >( + network_sender: &mut N, assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>, approval_knowledge_key: &(MessageSubject, MessageKind), entry: &mut BlockEntry, @@ -1549,7 +1604,8 @@ impl State { ?message_subject, "Unknown approval assignment", ); - modify_reputation(reputation, ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await; + modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE) + .await; metrics.on_approval_unknown_assignment(); return false } @@ -1573,7 +1629,7 @@ impl State { modify_reputation( reputation, - ctx.sender(), + network_sender, peer_id, COST_DUPLICATE_MESSAGE, ) @@ -1590,7 +1646,8 @@ impl State { ?approval_knowledge_key, "Approval from a peer is out of view", ); - modify_reputation(reputation, ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await; + modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE) + .await; metrics.on_approval_out_of_view(); }, } @@ -1605,16 +1662,20 @@ impl State { // We already processed this approval no need to continue. gum::trace!(target: LOG_TARGET, ?peer_id, ?approval_knowledge_key, "Known approval"); metrics.on_approval_good_known(); - modify_reputation(reputation, ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE).await; + modify_reputation(reputation, network_sender, peer_id, BENEFIT_VALID_MESSAGE).await; false } else { true } } - async fn import_and_circulate_approval( + async fn import_and_circulate_approval< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, source: MessageSource, vote: IndirectSignedApprovalVoteV2, @@ -1652,7 +1713,7 @@ impl State { ); modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_UNEXPECTED_MESSAGE, ) @@ -1672,7 +1733,7 @@ impl State { if let Some(peer_id) = source.peer_id() { if !Self::check_approval_can_be_processed( - ctx, + network_sender, &assignments_knowledge_keys, &approval_knwowledge_key, entry, @@ -1687,7 +1748,8 @@ impl State { let (tx, rx) = oneshot::channel(); - ctx.send_message(ApprovalVotingMessage::CheckAndImportApproval(vote.clone(), tx)) + approval_voting_sender + .send_message(ApprovalVotingMessage::CheckAndImportApproval(vote.clone(), tx)) .await; let timer = metrics.time_awaiting_approval_voting(); @@ -1711,7 +1773,7 @@ impl State { ApprovalCheckResult::Accepted => { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, BENEFIT_VALID_MESSAGE_FIRST, ) @@ -1729,7 +1791,7 @@ impl State { ApprovalCheckResult::Bad(error) => { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_INVALID_MESSAGE, ) @@ -1831,7 +1893,7 @@ impl State { num_peers = peers.len(), "Sending an approval to peers", ); - send_approvals_batched(ctx.sender(), approvals, &peers).await; + send_approvals_batched(network_sender, approvals, &peers).await; } } @@ -1882,7 +1944,7 @@ impl State { } async fn unify_with_peer( - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, metrics: &Metrics, entries: &mut HashMap, topologies: &SessionGridTopologies, @@ -2027,9 +2089,9 @@ impl State { // // In order to switch to using approval lag as a trigger we need a request/response protocol // to fetch votes from validators rather than use gossip. - async fn enable_aggression( + async fn enable_aggression>( &mut self, - ctx: &mut Context, + network_sender: &mut N, resend: Resend, metrics: &Metrics, ) { @@ -2058,7 +2120,7 @@ impl State { gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",); adjust_required_routing_and_propagate( - ctx, + network_sender, &mut self.blocks, &self.topologies, |block_entry| { @@ -2086,7 +2148,7 @@ impl State { .await; adjust_required_routing_and_propagate( - ctx, + network_sender, &mut self.blocks, &self.topologies, |block_entry| { @@ -2137,7 +2199,7 @@ impl State { async fn sanitize_v1_assignments( &mut self, peer_id: PeerId, - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, assignments: Vec<(IndirectAssignmentCert, CandidateIndex)>, ) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> { let mut sanitized_assignments = Vec::new(); @@ -2172,7 +2234,7 @@ impl State { async fn sanitize_v2_assignments( &mut self, peer_id: PeerId, - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>, ) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> { let mut sanitized_assignments = Vec::new(); @@ -2216,7 +2278,7 @@ impl State { async fn sanitize_v1_approvals( &mut self, peer_id: PeerId, - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, approval: Vec, ) -> Vec { let mut sanitized_approvals = Vec::new(); @@ -2243,7 +2305,7 @@ impl State { async fn sanitize_v2_approvals( &mut self, peer_id: PeerId, - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, approval: Vec, ) -> Vec { let mut sanitized_approvals = Vec::new(); @@ -2280,8 +2342,12 @@ impl State { // Note that the required routing of a message can be modified even if the // topology is unknown yet. #[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)] -async fn adjust_required_routing_and_propagate( - ctx: &mut Context, +async fn adjust_required_routing_and_propagate< + N: overseer::SubsystemSender, + BlockFilter, + RoutingModifier, +>( + network_sender: &mut N, blocks: &mut HashMap, topologies: &SessionGridTopologies, block_filter: BlockFilter, @@ -2363,7 +2429,7 @@ async fn adjust_required_routing_and_propagate, peer_id: PeerId, rep: Rep, ) { @@ -2414,7 +2480,6 @@ impl ApprovalDistribution { async fn run(self, ctx: Context) { let mut state = State::default(); - // According to the docs of `rand`, this is a ChaCha12 RNG in practice // and will always be chosen for strong performance and security properties. let mut rng = rand::rngs::StdRng::from_entropy(); @@ -2431,7 +2496,8 @@ impl ApprovalDistribution { ) { let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse(); let mut reputation_delay = new_reputation_delay(); - + let mut approval_voting_sender = ctx.sender().clone(); + let mut network_sender = ctx.sender().clone(); loop { select! { _ = reputation_delay => { @@ -2446,35 +2512,65 @@ impl ApprovalDistribution { return }, }; - match message { - FromOrchestra::Communication { msg } => - Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await, - FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => { - gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)"); - // the relay chain blocks relevant to the approval subsystems - // are those that are available, but not finalized yet - // activated and deactivated heads hence are irrelevant to this subsystem, other than - // for tracing purposes. - if let Some(activated) = update.activated { - let head = activated.hash; - let approval_distribution_span = - jaeger::PerLeafSpan::new(activated.span, "approval-distribution"); - state.spans.insert(head, approval_distribution_span); - } - }, - FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { - gum::trace!(target: LOG_TARGET, number = %number, "finalized signal"); - state.handle_block_finalized(&mut ctx, &self.metrics, number).await; - }, - FromOrchestra::Signal(OverseerSignal::Conclude) => return, - } + + + self.handle_from_orchestra(message, &mut approval_voting_sender, &mut network_sender, state, rng).await; + }, } } } - async fn handle_incoming( - ctx: &mut Context, + /// Handles a from orchestra message received by approval distribution subystem. + pub async fn handle_from_orchestra< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( + &self, + message: FromOrchestra, + approval_voting_sender: &mut A, + network_sender: &mut N, + state: &mut State, + rng: &mut (impl CryptoRng + Rng), + ) { + match message { + FromOrchestra::Communication { msg } => + Self::handle_incoming( + approval_voting_sender, + network_sender, + state, + msg, + &self.metrics, + rng, + ) + .await, + FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => { + gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)"); + // the relay chain blocks relevant to the approval subsystems + // are those that are available, but not finalized yet + // activated and deactivated heads hence are irrelevant to this subsystem, other + // than for tracing purposes. + if let Some(activated) = update.activated { + let head = activated.hash; + let approval_distribution_span = + jaeger::PerLeafSpan::new(activated.span, "approval-distribution"); + state.spans.insert(head, approval_distribution_span); + } + }, + FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { + gum::trace!(target: LOG_TARGET, number = %number, "finalized signal"); + state.handle_block_finalized(network_sender, &self.metrics, number).await; + }, + FromOrchestra::Signal(OverseerSignal::Conclude) => return, + } + } + + async fn handle_incoming< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( + approval_voting_sender: &mut A, + network_sender: &mut N, state: &mut State, msg: ApprovalDistributionMessage, metrics: &Metrics, @@ -2482,10 +2578,14 @@ impl ApprovalDistribution { ) { match msg { ApprovalDistributionMessage::NetworkBridgeUpdate(event) => { - state.handle_network_msg(ctx, metrics, event, rng).await; + state + .handle_network_msg(approval_voting_sender, network_sender, metrics, event, rng) + .await; }, ApprovalDistributionMessage::NewBlocks(metas) => { - state.handle_new_blocks(ctx, metrics, metas, rng).await; + state + .handle_new_blocks(approval_voting_sender, network_sender, metrics, metas, rng) + .await; }, ApprovalDistributionMessage::DistributeAssignment(cert, candidate_indices) => { let _span = state @@ -2506,7 +2606,8 @@ impl ApprovalDistribution { state .import_and_circulate_assignment( - ctx, + approval_voting_sender, + network_sender, &metrics, MessageSource::Local, cert, @@ -2524,7 +2625,13 @@ impl ApprovalDistribution { ); state - .import_and_circulate_approval(ctx, metrics, MessageSource::Local, vote) + .import_and_circulate_approval( + approval_voting_sender, + network_sender, + metrics, + MessageSource::Local, + vote, + ) .await; }, ApprovalDistributionMessage::GetApprovalSignatures(indices, tx) => { @@ -2579,7 +2686,7 @@ pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero( // Low level helper for sending assignments. async fn send_assignments_batched_inner( - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, batch: impl IntoIterator, peers: Vec, peer_version: ValidationVersion, @@ -2634,7 +2741,7 @@ async fn send_assignments_batched_inner( /// destination, such that the subsystem doesn't get stuck for long processing a batch /// of assignments and can `select!` other tasks. pub(crate) async fn send_assignments_batched( - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + network_sender: &mut impl overseer::SubsystemSender, v2_assignments: impl IntoIterator + Clone, peers: &[(PeerId, ProtocolVersion)], ) { @@ -2658,7 +2765,7 @@ pub(crate) async fn send_assignments_batched( let batch: Vec<_> = v1_batches.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect(); if !v1_peers.is_empty() { send_assignments_batched_inner( - sender, + network_sender, batch.clone(), v1_peers.clone(), ValidationVersion::V1, @@ -2668,7 +2775,7 @@ pub(crate) async fn send_assignments_batched( if !v2_peers.is_empty() { send_assignments_batched_inner( - sender, + network_sender, batch, v2_peers.clone(), ValidationVersion::V2, @@ -2683,15 +2790,20 @@ pub(crate) async fn send_assignments_batched( while v3.peek().is_some() { let batch = v3.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect::>(); - send_assignments_batched_inner(sender, batch, v3_peers.clone(), ValidationVersion::V3) - .await; + send_assignments_batched_inner( + network_sender, + batch, + v3_peers.clone(), + ValidationVersion::V3, + ) + .await; } } } /// Send approvals while honoring the `max_notification_size` of the protocol and peer version. pub(crate) async fn send_approvals_batched( - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, approvals: impl IntoIterator + Clone, peers: &[(PeerId, ProtocolVersion)], ) {