diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 72ddc552c474..9bd3df3cf1dc 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -21,13 +21,10 @@ //! of others. It uses this information to determine when candidates and blocks have //! been sufficiently approved to finalize. -<<<<<<< HEAD +use futures_timer::Delay; use itertools::Itertools; use jaeger::{hash_to_trace_identifier, PerLeafSpan}; use polkadot_node_jaeger as jaeger; -======= -use futures_timer::Delay; ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) use polkadot_node_primitives::{ approval::{ v1::{BlockApprovalMeta, DelayTranche}, @@ -172,16 +169,11 @@ pub struct ApprovalVotingSubsystem { db: Arc, mode: Mode, metrics: Metrics, -<<<<<<< HEAD clock: Box, -======= - clock: Arc, - spawner: Arc, /// The maximum time we retry to approve a block if it is still needed and PoV fetch failed. max_approval_retries: u32, /// The backoff before we retry the approval. retry_backoff: Duration, ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) } #[derive(Clone)] @@ -507,14 +499,9 @@ impl ApprovalVotingSubsystem { keystore, sync_oracle, metrics, -<<<<<<< HEAD Box::new(SystemClock {}), -======= - Arc::new(SystemClock {}), - spawner, MAX_APPROVAL_RETRIES, APPROVAL_CHECKING_TIMEOUT / 2, ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) ) } @@ -525,14 +512,9 @@ impl ApprovalVotingSubsystem { keystore: Arc, sync_oracle: Box, metrics: Metrics, -<<<<<<< HEAD clock: Box, -======= - clock: Arc, - spawner: Arc, max_approval_retries: u32, retry_backoff: Duration, ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) ) -> Self { ApprovalVotingSubsystem { keystore, @@ -542,12 +524,8 @@ impl ApprovalVotingSubsystem { mode: Mode::Syncing(sync_oracle), metrics, clock, -<<<<<<< HEAD -======= - spawner, max_approval_retries, retry_backoff, ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) } } @@ -1331,13 +1309,19 @@ where if let Some(retry_info) = retry_info { for block_hash in relay_block_hashes { if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) { - let sender = to_other_subsystems.clone(); - let spawn_handle = subsystem.spawner.clone(); + let ctx = &mut ctx; let metrics = subsystem.metrics.clone(); let retry_info = retry_info.clone(); let executor_params = retry_info.executor_params.clone(); let candidate = retry_info.candidate.clone(); - + let launch_approval_span = state + .spans + .get(&block_hash) + .map(|span| span.child("launch-approval")) + .unwrap_or_else(|| jaeger::Span::new(candidate_hash, "launch-approval")) + .with_trace_id(candidate_hash) + .with_candidate(candidate_hash) + .with_stage(jaeger::Stage::ApprovalChecking); currently_checking_set .insert_relay_block_hash( candidate_hash, @@ -1345,8 +1329,7 @@ where block_hash, async move { launch_approval( - sender, - spawn_handle, + ctx, metrics, retry_info.session_index, candidate, @@ -1355,6 +1338,7 @@ where retry_info.backing_group, executor_params, retry_info.core_index, + &launch_approval_span, retry_info, ) .await @@ -1430,68 +1414,6 @@ where Ok(()) } -<<<<<<< HEAD -======= -// Starts a worker thread that runs the approval voting subsystem. -pub async fn start_approval_worker< - WorkProvider: ApprovalVotingWorkProvider + Send + 'static, - Sender: SubsystemSender - + SubsystemSender - + SubsystemSender - + SubsystemSender - + SubsystemSender - + SubsystemSender - + Clone, - ADSender: SubsystemSender, ->( - work_provider: WorkProvider, - to_other_subsystems: Sender, - to_approval_distr: ADSender, - config: Config, - db: Arc, - keystore: Arc, - sync_oracle: Box, - metrics: Metrics, - spawner: Arc, - task_name: &'static str, - group_name: &'static str, - clock: Arc, -) -> SubsystemResult<()> { - let approval_voting = ApprovalVotingSubsystem::with_config_and_clock( - config, - db.clone(), - keystore, - sync_oracle, - metrics, - clock, - spawner, - MAX_APPROVAL_RETRIES, - APPROVAL_CHECKING_TIMEOUT / 2, - ); - let backend = DbBackend::new(db.clone(), approval_voting.db_config); - let spawner = approval_voting.spawner.clone(); - spawner.spawn_blocking( - task_name, - Some(group_name), - Box::pin(async move { - if let Err(err) = run( - work_provider, - to_other_subsystems, - to_approval_distr, - approval_voting, - Box::new(RealAssignmentCriteria), - backend, - ) - .await - { - gum::error!(target: LOG_TARGET, ?err, "Approval voting worker stopped processing messages"); - }; - }), - ); - Ok(()) -} - ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) // Handle actions is a function that accepts a set of instructions // and subsequently updates the underlying approvals_db in accordance // with the linear set of instructions passed in. Therefore, actions @@ -1644,11 +1566,8 @@ async fn handle_actions( backing_group, executor_params, core_index, -<<<<<<< HEAD &launch_approval_span, -======= retry, ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) ) .await }, @@ -2612,7 +2531,12 @@ fn schedule_wakeup_action( last_assignment_tick.map(|l| l + APPROVAL_DELAY).filter(|t| t > &tick_now), next_no_show, ) - .map(|tick| Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick }) + .map(|tick| Action::ScheduleWakeup { + block_hash, + block_number, + candidate_hash, + tick, + }) }, RequiredTranches::Pending { considered, next_no_show, clock_drift, .. } => { // select the minimum of `next_no_show`, or the tick of the next non-empty tranche @@ -3510,11 +3434,8 @@ async fn launch_approval( backing_group: GroupIndex, executor_params: ExecutorParams, core_index: Option, -<<<<<<< HEAD span: &jaeger::Span, -======= retry: RetryApprovalInfo, ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) ) -> SubsystemResult> { let (a_tx, a_rx) = oneshot::channel(); let (code_tx, code_rx) = oneshot::channel(); @@ -3545,12 +3466,8 @@ async fn launch_approval( } let candidate_hash = candidate.hash(); -<<<<<<< HEAD let para_id = candidate.descriptor.para_id; -======= - let para_id = candidate.descriptor.para_id(); let mut next_retry = None; ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data."); let request_validation_data_span = span diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index d076243c377b..65334b71580a 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -266,7 +266,8 @@ where _relay_vrf_story: polkadot_node_primitives::approval::v1::RelayVRFStory, _assignment: &polkadot_node_primitives::approval::v2::AssignmentCertV2, _backing_groups: Vec, - ) -> Result { + ) -> Result + { self.1(validator_index) } } @@ -578,12 +579,8 @@ fn test_harness>( sync_oracle, Metrics::default(), clock.clone(), -<<<<<<< HEAD -======= - Arc::new(SpawnGlue(pool)), NUM_APPROVAL_RETRIES, RETRY_BACKOFF, ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) ), assignment_criteria, backend, diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs index c23d51914c01..f05d061f3fde 100644 --- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs @@ -852,52 +852,8 @@ fn build_overseer( .replace_availability_recovery(|_| MockAvailabilityRecovery::new()) .replace_candidate_validation(|_| MockCandidateValidation::new()); -<<<<<<< HEAD let (overseer, raw_handle) = dummy.build_with_connector(overseer_connector).expect("Should not fail"); -======= - let (overseer, raw_handle) = if state.options.approval_voting_parallel_enabled { - let approval_voting_parallel = ApprovalVotingParallelSubsystem::with_config_and_clock( - TEST_CONFIG, - db.clone(), - keystore.clone(), - Box::new(TestSyncOracle {}), - state.approval_voting_parallel_metrics.clone(), - Arc::new(system_clock.clone()), - SpawnGlue(spawn_task_handle.clone()), - None, - ); - dummy - .replace_approval_voting_parallel(|_| approval_voting_parallel) - .build_with_connector(overseer_connector) - .expect("Should not fail") - } else { - let approval_voting = ApprovalVotingSubsystem::with_config_and_clock( - TEST_CONFIG, - db.clone(), - keystore.clone(), - Box::new(TestSyncOracle {}), - state.approval_voting_parallel_metrics.approval_voting_metrics(), - Arc::new(system_clock.clone()), - Arc::new(SpawnGlue(spawn_task_handle.clone())), - 1, - Duration::from_secs(1), - ); - - let approval_distribution = ApprovalDistribution::new_with_clock( - state.approval_voting_parallel_metrics.approval_distribution_metrics(), - TEST_CONFIG.slot_duration_millis, - Arc::new(system_clock.clone()), - Arc::new(RealAssignmentCriteria {}), - ); - - dummy - .replace_approval_voting(|_| approval_voting) - .replace_approval_distribution(|_| approval_distribution) - .build_with_connector(overseer_connector) - .expect("Should not fail") - }; ->>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807)) let overseer_handle = OverseerHandleReal::new(raw_handle); (overseer, overseer_handle)