From 0636ffdc3dfea52e90102403527ff99d2f2d6e7c Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Mon, 29 Jul 2024 13:09:21 +0300 Subject: [PATCH] [2 / 5] Make approval-distribution logic runnable on a separate thread (#4845) This is part of the work to further optimize the approval subsystems, if you want to understand the full context start with reading https://github.com/paritytech/polkadot-sdk/pull/4849#issue-2364261568, # Description This PR contain changes to make possible the run of multiple instances of approval-distribution, so that we can parallelise the work. This does not contain any functional changes it just decouples the subsystem from the subsystem Context and introduces more specific trait dependencies for each function instead of all of them requiring a context. It does not have any dependency of the follow PRs, so it can be merged independently of them. --------- Signed-off-by: Alexandru Gheorghe --- .../network/approval-distribution/src/lib.rs | 350 ++++++++++++------ prdoc/pr_4845.prdoc | 13 + 2 files changed, 241 insertions(+), 122 deletions(-) create mode 100644 prdoc/pr_4845.prdoc diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index d48fb08a311c..3462aaef1f69 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -320,7 +320,7 @@ enum Resend { /// It tracks metadata about our view of the unfinalized chain, /// which assignments and approvals we have seen, and our peers' views. #[derive(Default)] -struct State { +pub struct State { /// These two fields are used in conjunction to construct a view over the unfinalized chain. blocks_by_number: BTreeMap>, blocks: HashMap, @@ -662,9 +662,13 @@ enum PendingMessage { #[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)] impl State { - async fn handle_network_msg( + async fn handle_network_msg< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, event: NetworkBridgeEvent, rng: &mut (impl CryptoRng + Rng), @@ -689,7 +693,7 @@ impl State { }, NetworkBridgeEvent::NewGossipTopology(topology) => { self.handle_new_session_topology( - ctx, + network_sender, topology.session, topology.topology, topology.local_index, @@ -697,7 +701,7 @@ impl State { .await; }, NetworkBridgeEvent::PeerViewChange(peer_id, view) => { - self.handle_peer_view_change(ctx, metrics, peer_id, view, rng).await; + self.handle_peer_view_change(network_sender, metrics, peer_id, view, rng).await; }, NetworkBridgeEvent::OurViewChange(view) => { gum::trace!(target: LOG_TARGET, ?view, "Own view change"); @@ -720,7 +724,15 @@ impl State { }); }, NetworkBridgeEvent::PeerMessage(peer_id, message) => { - self.process_incoming_peer_message(ctx, metrics, peer_id, message, rng).await; + self.process_incoming_peer_message( + approval_voting_sender, + network_sender, + metrics, + peer_id, + message, + rng, + ) + .await; }, NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids"); @@ -743,7 +755,7 @@ impl State { let view_intersection = View::new(intersection.cloned(), view.finalized_number); Self::unify_with_peer( - ctx.sender(), + network_sender, metrics, &mut self.blocks, &self.topologies, @@ -761,9 +773,13 @@ impl State { } } - async fn handle_new_blocks( + async fn handle_new_blocks< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, metas: Vec, rng: &mut (impl CryptoRng + Rng), @@ -814,12 +830,11 @@ impl State { ); { - let sender = ctx.sender(); for (peer_id, PeerEntry { view, version }) in self.peer_views.iter() { let intersection = view.iter().filter(|h| new_hashes.contains(h)); let view_intersection = View::new(intersection.cloned(), view.finalized_number); Self::unify_with_peer( - sender, + network_sender, metrics, &mut self.blocks, &self.topologies, @@ -866,7 +881,8 @@ impl State { match message { PendingMessage::Assignment(assignment, claimed_indices) => { self.import_and_circulate_assignment( - ctx, + approval_voting_sender, + network_sender, metrics, MessageSource::Peer(peer_id), assignment, @@ -877,7 +893,8 @@ impl State { }, PendingMessage::Approval(approval_vote) => { self.import_and_circulate_approval( - ctx, + approval_voting_sender, + network_sender, metrics, MessageSource::Peer(peer_id), approval_vote, @@ -889,12 +906,12 @@ impl State { } } - self.enable_aggression(ctx, Resend::Yes, metrics).await; + self.enable_aggression(network_sender, Resend::Yes, metrics).await; } - async fn handle_new_session_topology( + async fn handle_new_session_topology>( &mut self, - ctx: &mut Context, + network_sender: &mut N, session: SessionIndex, topology: SessionGridTopology, local_index: Option, @@ -908,7 +925,7 @@ impl State { let topology = self.topologies.get_topology(session).expect("just inserted above; qed"); adjust_required_routing_and_propagate( - ctx, + network_sender, &mut self.blocks, &self.topologies, |block_entry| block_entry.session == session, @@ -926,14 +943,17 @@ impl State { .await; } - async fn process_incoming_assignments( + async fn process_incoming_assignments( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, peer_id: PeerId, assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>, rng: &mut R, ) where + A: overseer::SubsystemSender, + N: overseer::SubsystemSender, R: CryptoRng + Rng, { for (assignment, claimed_indices) in assignments { @@ -956,7 +976,8 @@ impl State { } self.import_and_circulate_assignment( - ctx, + approval_voting_sender, + network_sender, metrics, MessageSource::Peer(peer_id), assignment, @@ -968,9 +989,13 @@ impl State { } // Entry point for processing an approval coming from a peer. - async fn process_incoming_approvals( + async fn process_incoming_approvals< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, peer_id: PeerId, approvals: Vec, @@ -1001,7 +1026,8 @@ impl State { } self.import_and_circulate_approval( - ctx, + approval_voting_sender, + network_sender, metrics, MessageSource::Peer(peer_id), approval_vote, @@ -1010,9 +1036,10 @@ impl State { } } - async fn process_incoming_peer_message( + async fn process_incoming_peer_message( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, peer_id: PeerId, msg: Versioned< @@ -1022,6 +1049,8 @@ impl State { >, rng: &mut R, ) where + A: overseer::SubsystemSender, + N: overseer::SubsystemSender, R: CryptoRng + Rng, { match msg { @@ -1033,10 +1062,11 @@ impl State { "Processing assignments from a peer", ); let sanitized_assignments = - self.sanitize_v2_assignments(peer_id, ctx.sender(), assignments).await; + self.sanitize_v2_assignments(peer_id, network_sender, assignments).await; self.process_incoming_assignments( - ctx, + approval_voting_sender, + network_sender, metrics, peer_id, sanitized_assignments, @@ -1054,10 +1084,11 @@ impl State { ); let sanitized_assignments = - self.sanitize_v1_assignments(peer_id, ctx.sender(), assignments).await; + self.sanitize_v1_assignments(peer_id, network_sender, assignments).await; self.process_incoming_assignments( - ctx, + approval_voting_sender, + network_sender, metrics, peer_id, sanitized_assignments, @@ -1067,25 +1098,37 @@ impl State { }, Versioned::V3(protocol_v3::ApprovalDistributionMessage::Approvals(approvals)) => { let sanitized_approvals = - self.sanitize_v2_approvals(peer_id, ctx.sender(), approvals).await; - self.process_incoming_approvals(ctx, metrics, peer_id, sanitized_approvals) - .await; + self.sanitize_v2_approvals(peer_id, network_sender, approvals).await; + self.process_incoming_approvals( + approval_voting_sender, + network_sender, + metrics, + peer_id, + sanitized_approvals, + ) + .await; }, Versioned::V1(protocol_v1::ApprovalDistributionMessage::Approvals(approvals)) | Versioned::V2(protocol_v2::ApprovalDistributionMessage::Approvals(approvals)) => { let sanitized_approvals = - self.sanitize_v1_approvals(peer_id, ctx.sender(), approvals).await; - self.process_incoming_approvals(ctx, metrics, peer_id, sanitized_approvals) - .await; + self.sanitize_v1_approvals(peer_id, network_sender, approvals).await; + self.process_incoming_approvals( + approval_voting_sender, + network_sender, + metrics, + peer_id, + sanitized_approvals, + ) + .await; }, } } // handle a peer view change: requires that the peer is already connected // and has an entry in the `PeerData` struct. - async fn handle_peer_view_change( + async fn handle_peer_view_change, R>( &mut self, - ctx: &mut Context, + network_sender: &mut N, metrics: &Metrics, peer_id: PeerId, view: View, @@ -1132,7 +1175,7 @@ impl State { } Self::unify_with_peer( - ctx.sender(), + network_sender, metrics, &mut self.blocks, &self.topologies, @@ -1146,9 +1189,9 @@ impl State { .await; } - async fn handle_block_finalized( + async fn handle_block_finalized>( &mut self, - ctx: &mut Context, + network_sender: &mut N, metrics: &Metrics, finalized_number: BlockNumber, ) { @@ -1172,18 +1215,21 @@ impl State { // If a block was finalized, this means we may need to move our aggression // forward to the now oldest block(s). - self.enable_aggression(ctx, Resend::No, metrics).await; + self.enable_aggression(network_sender, Resend::No, metrics).await; } - async fn import_and_circulate_assignment( + async fn import_and_circulate_assignment( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, source: MessageSource, assignment: IndirectAssignmentCertV2, claimed_candidate_indices: CandidateBitfield, rng: &mut R, ) where + A: overseer::SubsystemSender, + N: overseer::SubsystemSender, R: CryptoRng + Rng, { let _span = self @@ -1218,7 +1264,7 @@ impl State { if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_UNEXPECTED_MESSAGE, ) @@ -1255,7 +1301,7 @@ impl State { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_DUPLICATE_MESSAGE, ) @@ -1283,7 +1329,7 @@ impl State { ); modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_UNEXPECTED_MESSAGE, ) @@ -1296,7 +1342,7 @@ impl State { if entry.knowledge.contains(&message_subject, message_kind) { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, BENEFIT_VALID_MESSAGE, ) @@ -1311,12 +1357,13 @@ impl State { let (tx, rx) = oneshot::channel(); - ctx.send_message(ApprovalVotingMessage::CheckAndImportAssignment( - assignment.clone(), - claimed_candidate_indices.clone(), - tx, - )) - .await; + approval_voting_sender + .send_message(ApprovalVotingMessage::CheckAndImportAssignment( + assignment.clone(), + claimed_candidate_indices.clone(), + tx, + )) + .await; let timer = metrics.time_awaiting_approval_voting(); let result = match rx.await { @@ -1339,7 +1386,7 @@ impl State { AssignmentCheckResult::Accepted => { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, BENEFIT_VALID_MESSAGE_FIRST, ) @@ -1375,7 +1422,7 @@ impl State { ); modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE, ) @@ -1394,7 +1441,7 @@ impl State { ); modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_INVALID_MESSAGE, ) @@ -1526,14 +1573,16 @@ impl State { }) .collect::>(); - send_assignments_batched(ctx.sender(), assignments, &peers).await; + send_assignments_batched(network_sender, assignments, &peers).await; } } // Checks if an approval can be processed. // Returns true if we can continue with processing the approval and false otherwise. - async fn check_approval_can_be_processed( - ctx: &mut Context, + async fn check_approval_can_be_processed< + N: overseer::SubsystemSender, + >( + network_sender: &mut N, assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>, approval_knowledge_key: &(MessageSubject, MessageKind), entry: &mut BlockEntry, @@ -1549,7 +1598,8 @@ impl State { ?message_subject, "Unknown approval assignment", ); - modify_reputation(reputation, ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await; + modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE) + .await; metrics.on_approval_unknown_assignment(); return false } @@ -1573,7 +1623,7 @@ impl State { modify_reputation( reputation, - ctx.sender(), + network_sender, peer_id, COST_DUPLICATE_MESSAGE, ) @@ -1590,7 +1640,8 @@ impl State { ?approval_knowledge_key, "Approval from a peer is out of view", ); - modify_reputation(reputation, ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await; + modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE) + .await; metrics.on_approval_out_of_view(); }, } @@ -1605,16 +1656,20 @@ impl State { // We already processed this approval no need to continue. gum::trace!(target: LOG_TARGET, ?peer_id, ?approval_knowledge_key, "Known approval"); metrics.on_approval_good_known(); - modify_reputation(reputation, ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE).await; + modify_reputation(reputation, network_sender, peer_id, BENEFIT_VALID_MESSAGE).await; false } else { true } } - async fn import_and_circulate_approval( + async fn import_and_circulate_approval< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( &mut self, - ctx: &mut Context, + approval_voting_sender: &mut A, + network_sender: &mut N, metrics: &Metrics, source: MessageSource, vote: IndirectSignedApprovalVoteV2, @@ -1652,7 +1707,7 @@ impl State { ); modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_UNEXPECTED_MESSAGE, ) @@ -1672,7 +1727,7 @@ impl State { if let Some(peer_id) = source.peer_id() { if !Self::check_approval_can_be_processed( - ctx, + network_sender, &assignments_knowledge_keys, &approval_knwowledge_key, entry, @@ -1687,7 +1742,8 @@ impl State { let (tx, rx) = oneshot::channel(); - ctx.send_message(ApprovalVotingMessage::CheckAndImportApproval(vote.clone(), tx)) + approval_voting_sender + .send_message(ApprovalVotingMessage::CheckAndImportApproval(vote.clone(), tx)) .await; let timer = metrics.time_awaiting_approval_voting(); @@ -1711,7 +1767,7 @@ impl State { ApprovalCheckResult::Accepted => { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, BENEFIT_VALID_MESSAGE_FIRST, ) @@ -1729,7 +1785,7 @@ impl State { ApprovalCheckResult::Bad(error) => { modify_reputation( &mut self.reputation, - ctx.sender(), + network_sender, peer_id, COST_INVALID_MESSAGE, ) @@ -1831,7 +1887,7 @@ impl State { num_peers = peers.len(), "Sending an approval to peers", ); - send_approvals_batched(ctx.sender(), approvals, &peers).await; + send_approvals_batched(network_sender, approvals, &peers).await; } } @@ -1882,7 +1938,7 @@ impl State { } async fn unify_with_peer( - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, metrics: &Metrics, entries: &mut HashMap, topologies: &SessionGridTopologies, @@ -2027,9 +2083,9 @@ impl State { // // In order to switch to using approval lag as a trigger we need a request/response protocol // to fetch votes from validators rather than use gossip. - async fn enable_aggression( + async fn enable_aggression>( &mut self, - ctx: &mut Context, + network_sender: &mut N, resend: Resend, metrics: &Metrics, ) { @@ -2058,7 +2114,7 @@ impl State { gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",); adjust_required_routing_and_propagate( - ctx, + network_sender, &mut self.blocks, &self.topologies, |block_entry| { @@ -2086,7 +2142,7 @@ impl State { .await; adjust_required_routing_and_propagate( - ctx, + network_sender, &mut self.blocks, &self.topologies, |block_entry| { @@ -2137,7 +2193,7 @@ impl State { async fn sanitize_v1_assignments( &mut self, peer_id: PeerId, - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, assignments: Vec<(IndirectAssignmentCert, CandidateIndex)>, ) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> { let mut sanitized_assignments = Vec::new(); @@ -2172,7 +2228,7 @@ impl State { async fn sanitize_v2_assignments( &mut self, peer_id: PeerId, - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>, ) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> { let mut sanitized_assignments = Vec::new(); @@ -2216,7 +2272,7 @@ impl State { async fn sanitize_v1_approvals( &mut self, peer_id: PeerId, - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, approval: Vec, ) -> Vec { let mut sanitized_approvals = Vec::new(); @@ -2243,7 +2299,7 @@ impl State { async fn sanitize_v2_approvals( &mut self, peer_id: PeerId, - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, approval: Vec, ) -> Vec { let mut sanitized_approvals = Vec::new(); @@ -2280,8 +2336,12 @@ impl State { // Note that the required routing of a message can be modified even if the // topology is unknown yet. #[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)] -async fn adjust_required_routing_and_propagate( - ctx: &mut Context, +async fn adjust_required_routing_and_propagate< + N: overseer::SubsystemSender, + BlockFilter, + RoutingModifier, +>( + network_sender: &mut N, blocks: &mut HashMap, topologies: &SessionGridTopologies, block_filter: BlockFilter, @@ -2363,7 +2423,7 @@ async fn adjust_required_routing_and_propagate, peer_id: PeerId, rep: Rep, ) { @@ -2414,7 +2474,6 @@ impl ApprovalDistribution { async fn run(self, ctx: Context) { let mut state = State::default(); - // According to the docs of `rand`, this is a ChaCha12 RNG in practice // and will always be chosen for strong performance and security properties. let mut rng = rand::rngs::StdRng::from_entropy(); @@ -2431,7 +2490,8 @@ impl ApprovalDistribution { ) { let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse(); let mut reputation_delay = new_reputation_delay(); - + let mut approval_voting_sender = ctx.sender().clone(); + let mut network_sender = ctx.sender().clone(); loop { select! { _ = reputation_delay => { @@ -2446,35 +2506,65 @@ impl ApprovalDistribution { return }, }; - match message { - FromOrchestra::Communication { msg } => - Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await, - FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => { - gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)"); - // the relay chain blocks relevant to the approval subsystems - // are those that are available, but not finalized yet - // activated and deactivated heads hence are irrelevant to this subsystem, other than - // for tracing purposes. - if let Some(activated) = update.activated { - let head = activated.hash; - let approval_distribution_span = - jaeger::PerLeafSpan::new(activated.span, "approval-distribution"); - state.spans.insert(head, approval_distribution_span); - } - }, - FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { - gum::trace!(target: LOG_TARGET, number = %number, "finalized signal"); - state.handle_block_finalized(&mut ctx, &self.metrics, number).await; - }, - FromOrchestra::Signal(OverseerSignal::Conclude) => return, - } + + + self.handle_from_orchestra(message, &mut approval_voting_sender, &mut network_sender, state, rng).await; + }, } } } - async fn handle_incoming( - ctx: &mut Context, + /// Handles a from orchestra message received by approval distribution subystem. + pub async fn handle_from_orchestra< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( + &self, + message: FromOrchestra, + approval_voting_sender: &mut A, + network_sender: &mut N, + state: &mut State, + rng: &mut (impl CryptoRng + Rng), + ) { + match message { + FromOrchestra::Communication { msg } => + Self::handle_incoming( + approval_voting_sender, + network_sender, + state, + msg, + &self.metrics, + rng, + ) + .await, + FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => { + gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)"); + // the relay chain blocks relevant to the approval subsystems + // are those that are available, but not finalized yet + // activated and deactivated heads hence are irrelevant to this subsystem, other + // than for tracing purposes. + if let Some(activated) = update.activated { + let head = activated.hash; + let approval_distribution_span = + jaeger::PerLeafSpan::new(activated.span, "approval-distribution"); + state.spans.insert(head, approval_distribution_span); + } + }, + FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { + gum::trace!(target: LOG_TARGET, number = %number, "finalized signal"); + state.handle_block_finalized(network_sender, &self.metrics, number).await; + }, + FromOrchestra::Signal(OverseerSignal::Conclude) => return, + } + } + + async fn handle_incoming< + N: overseer::SubsystemSender, + A: overseer::SubsystemSender, + >( + approval_voting_sender: &mut A, + network_sender: &mut N, state: &mut State, msg: ApprovalDistributionMessage, metrics: &Metrics, @@ -2482,10 +2572,14 @@ impl ApprovalDistribution { ) { match msg { ApprovalDistributionMessage::NetworkBridgeUpdate(event) => { - state.handle_network_msg(ctx, metrics, event, rng).await; + state + .handle_network_msg(approval_voting_sender, network_sender, metrics, event, rng) + .await; }, ApprovalDistributionMessage::NewBlocks(metas) => { - state.handle_new_blocks(ctx, metrics, metas, rng).await; + state + .handle_new_blocks(approval_voting_sender, network_sender, metrics, metas, rng) + .await; }, ApprovalDistributionMessage::DistributeAssignment(cert, candidate_indices) => { let _span = state @@ -2506,7 +2600,8 @@ impl ApprovalDistribution { state .import_and_circulate_assignment( - ctx, + approval_voting_sender, + network_sender, &metrics, MessageSource::Local, cert, @@ -2524,7 +2619,13 @@ impl ApprovalDistribution { ); state - .import_and_circulate_approval(ctx, metrics, MessageSource::Local, vote) + .import_and_circulate_approval( + approval_voting_sender, + network_sender, + metrics, + MessageSource::Local, + vote, + ) .await; }, ApprovalDistributionMessage::GetApprovalSignatures(indices, tx) => { @@ -2579,7 +2680,7 @@ pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero( // Low level helper for sending assignments. async fn send_assignments_batched_inner( - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, batch: impl IntoIterator, peers: Vec, peer_version: ValidationVersion, @@ -2634,7 +2735,7 @@ async fn send_assignments_batched_inner( /// destination, such that the subsystem doesn't get stuck for long processing a batch /// of assignments and can `select!` other tasks. pub(crate) async fn send_assignments_batched( - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + network_sender: &mut impl overseer::SubsystemSender, v2_assignments: impl IntoIterator + Clone, peers: &[(PeerId, ProtocolVersion)], ) { @@ -2658,7 +2759,7 @@ pub(crate) async fn send_assignments_batched( let batch: Vec<_> = v1_batches.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect(); if !v1_peers.is_empty() { send_assignments_batched_inner( - sender, + network_sender, batch.clone(), v1_peers.clone(), ValidationVersion::V1, @@ -2668,7 +2769,7 @@ pub(crate) async fn send_assignments_batched( if !v2_peers.is_empty() { send_assignments_batched_inner( - sender, + network_sender, batch, v2_peers.clone(), ValidationVersion::V2, @@ -2683,15 +2784,20 @@ pub(crate) async fn send_assignments_batched( while v3.peek().is_some() { let batch = v3.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect::>(); - send_assignments_batched_inner(sender, batch, v3_peers.clone(), ValidationVersion::V3) - .await; + send_assignments_batched_inner( + network_sender, + batch, + v3_peers.clone(), + ValidationVersion::V3, + ) + .await; } } } /// Send approvals while honoring the `max_notification_size` of the protocol and peer version. pub(crate) async fn send_approvals_batched( - sender: &mut impl overseer::ApprovalDistributionSenderTrait, + sender: &mut impl overseer::SubsystemSender, approvals: impl IntoIterator + Clone, peers: &[(PeerId, ProtocolVersion)], ) { diff --git a/prdoc/pr_4845.prdoc b/prdoc/pr_4845.prdoc new file mode 100644 index 000000000000..012d34ef090e --- /dev/null +++ b/prdoc/pr_4845.prdoc @@ -0,0 +1,13 @@ +title: Make approval-distribution logic runnable on a separate thread + +doc: + - audience: Node Dev + description: | + Pass SubsystemSender trait inside approval-distribution instead of passing SubsystemContext everywhere. + + This allows us in the future to be able to run multiple approval-distribution instances on different workers. + + +crates: +- name: polkadot-approval-distribution + bump: minor