From ebc513b53b9684e26cae286f7c6ea4fbeadf43e1 Mon Sep 17 00:00:00 2001
From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
Date: Tue, 14 Jan 2025 16:52:49 +0200
Subject: [PATCH] BACKPORT-CONFLICT

---
 polkadot/node/core/approval-voting/src/lib.rs | 229 +++++++++++++++++-
 .../node/core/approval-voting/src/tests.rs    | 150 ++++++++++++
 .../subsystem-bench/src/lib/approval/mod.rs   |  44 ++++
 prdoc/pr_6807.prdoc                           |  19 ++
 4 files changed, 435 insertions(+), 7 deletions(-)
 create mode 100644 prdoc/pr_6807.prdoc

diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index 5f1e1c2c786f..72ddc552c474 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -21,9 +21,13 @@
 //! of others. It uses this information to determine when candidates and blocks have
 //! been sufficiently approved to finalize.
 
+<<<<<<< HEAD
 use itertools::Itertools;
 use jaeger::{hash_to_trace_identifier, PerLeafSpan};
 use polkadot_node_jaeger as jaeger;
+=======
+use futures_timer::Delay;
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 use polkadot_node_primitives::{
 	approval::{
 		v1::{BlockApprovalMeta, DelayTranche},
@@ -124,6 +128,9 @@ const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
 const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
 const APPROVAL_CACHE_SIZE: u32 = 1024;
 
+/// The maximum number of times we retry to approve a block if it is still needed.
+const MAX_APPROVAL_RETRIES: u32 = 16;
+
 const APPROVAL_DELAY: Tick = 2;
 pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";
 
@@ -165,7 +172,16 @@ pub struct ApprovalVotingSubsystem {
 	db: Arc,
 	mode: Mode,
 	metrics: Metrics,
+<<<<<<< HEAD
 	clock: Box,
+=======
+	clock: Arc,
+	spawner: Arc,
+	/// The maximum number of times we retry to approve a block if it is still needed and PoV fetch failed.
+	max_approval_retries: u32,
+	/// The backoff before we retry the approval.
+	retry_backoff: Duration,
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 }
 
 #[derive(Clone)]
@@ -491,7 +507,14 @@ impl ApprovalVotingSubsystem {
 			keystore,
 			sync_oracle,
 			metrics,
+<<<<<<< HEAD
 			Box::new(SystemClock {}),
+=======
+			Arc::new(SystemClock {}),
+			spawner,
+			MAX_APPROVAL_RETRIES,
+			APPROVAL_CHECKING_TIMEOUT / 2,
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 		)
 	}
 
@@ -502,7 +525,14 @@ impl ApprovalVotingSubsystem {
 		keystore: Arc,
 		sync_oracle: Box,
 		metrics: Metrics,
+<<<<<<< HEAD
 		clock: Box,
+=======
+		clock: Arc,
+		spawner: Arc,
+		max_approval_retries: u32,
+		retry_backoff: Duration,
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 	) -> Self {
 		ApprovalVotingSubsystem {
 			keystore,
@@ -512,6 +542,12 @@ impl ApprovalVotingSubsystem {
 			mode: Mode::Syncing(sync_oracle),
 			metrics,
 			clock,
+<<<<<<< HEAD
+=======
+			spawner,
+			max_approval_retries,
+			retry_backoff,
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 		}
 	}
 
@@ -701,18 +737,53 @@ enum ApprovalOutcome {
 	TimedOut,
 }
 
+#[derive(Clone)]
+struct RetryApprovalInfo {
+	candidate: CandidateReceipt,
+	backing_group: GroupIndex,
+	executor_params: ExecutorParams,
+	core_index: Option,
+	session_index: SessionIndex,
+	attempts_remaining: u32,
+	backoff: Duration,
+}
+
 struct ApprovalState {
 	validator_index: ValidatorIndex,
 	candidate_hash: CandidateHash,
 	approval_outcome: ApprovalOutcome,
+	retry_info: Option,
 }
 
 impl ApprovalState {
 	fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
-		Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Approved }
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Approved,
+			retry_info: None,
+		}
 	}
 	fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
-		Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Failed }
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Failed,
+			retry_info: None,
+		}
+	}
+
+	fn failed_with_retry(
+		validator_index: ValidatorIndex,
+		candidate_hash: CandidateHash,
+		retry_info: Option,
+	) -> Self {
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Failed,
+			retry_info,
+		}
 	}
 }
 
@@ -752,6 +823,7 @@ impl CurrentlyCheckingSet {
 							candidate_hash,
 							validator_index,
 							approval_outcome: ApprovalOutcome::TimedOut,
+							retry_info: None,
 						},
 						Some(approval_state) => approval_state,
 					}
@@ -1236,18 +1308,19 @@ where
 							validator_index,
 							candidate_hash,
 							approval_outcome,
+							retry_info,
 						}
 					) = approval_state;
 
 					if matches!(approval_outcome, ApprovalOutcome::Approved) {
 						let mut approvals: Vec = relay_block_hashes
-							.into_iter()
+							.iter()
 							.map(|block_hash| Action::IssueApproval(
 								candidate_hash,
 								ApprovalVoteRequest {
 									validator_index,
-									block_hash,
+									block_hash: *block_hash,
 								},
 							)
 						)
@@ -1255,6 +1328,43 @@ where
 						actions.append(&mut approvals);
 					}
 
+					if let Some(retry_info) = retry_info {
+						for block_hash in relay_block_hashes {
+							if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
+								let sender = to_other_subsystems.clone();
+								let spawn_handle = subsystem.spawner.clone();
+								let metrics = subsystem.metrics.clone();
+								let retry_info = retry_info.clone();
+								let executor_params = retry_info.executor_params.clone();
+								let candidate = retry_info.candidate.clone();
+
+								currently_checking_set
+									.insert_relay_block_hash(
+										candidate_hash,
+										validator_index,
+										block_hash,
+										async move {
+											launch_approval(
+												sender,
+												spawn_handle,
+												metrics,
+												retry_info.session_index,
+												candidate,
+												validator_index,
+												block_hash,
+												retry_info.backing_group,
+												executor_params,
+												retry_info.core_index,
+												retry_info,
+											)
+											.await
+										},
+									)
+									.await?;
+							}
+						}
+					}
+
 					actions
 				},
 				(block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
@@ -1302,6 +1412,8 @@ where
 						&mut approvals_cache,
 						&mut subsystem.mode,
 						actions,
+						subsystem.max_approval_retries,
+						subsystem.retry_backoff,
 					)
 					.await?
 					{
@@ -1318,6 +1430,68 @@ where
 	Ok(())
 }
 
+<<<<<<< HEAD
+=======
+// Starts a worker thread that runs the approval voting subsystem.
+pub async fn start_approval_worker<
+	WorkProvider: ApprovalVotingWorkProvider + Send + 'static,
+	Sender: SubsystemSender
+		+ SubsystemSender
+		+ SubsystemSender
+		+ SubsystemSender
+		+ SubsystemSender
+		+ SubsystemSender
+		+ Clone,
+	ADSender: SubsystemSender,
+>(
+	work_provider: WorkProvider,
+	to_other_subsystems: Sender,
+	to_approval_distr: ADSender,
+	config: Config,
+	db: Arc,
+	keystore: Arc,
+	sync_oracle: Box,
+	metrics: Metrics,
+	spawner: Arc,
+	task_name: &'static str,
+	group_name: &'static str,
+	clock: Arc,
+) -> SubsystemResult<()> {
+	let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
+		config,
+		db.clone(),
+		keystore,
+		sync_oracle,
+		metrics,
+		clock,
+		spawner,
+		MAX_APPROVAL_RETRIES,
+		APPROVAL_CHECKING_TIMEOUT / 2,
+	);
+	let backend = DbBackend::new(db.clone(), approval_voting.db_config);
+	let spawner = approval_voting.spawner.clone();
+	spawner.spawn_blocking(
+		task_name,
+		Some(group_name),
+		Box::pin(async move {
+			if let Err(err) = run(
+				work_provider,
+				to_other_subsystems,
+				to_approval_distr,
+				approval_voting,
+				Box::new(RealAssignmentCriteria),
+				backend,
+			)
+			.await
+			{
+				gum::error!(target: LOG_TARGET, ?err, "Approval voting worker stopped processing messages");
+			};
+		}),
+	);
+	Ok(())
+}
+
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 // Handle actions is a function that accepts a set of instructions
 // and subsequently updates the underlying approvals_db in accordance
 // with the linear set of instructions passed in. Therefore, actions
@@ -1350,6 +1524,8 @@ async fn handle_actions(
 	approvals_cache: &mut LruMap,
 	mode: &mut Mode,
 	actions: Vec,
+	max_approval_retries: u32,
+	retry_backoff: Duration,
 ) -> SubsystemResult {
 	let mut conclude = false;
 	let mut actions_iter = actions.into_iter();
@@ -1442,6 +1618,16 @@ async fn handle_actions(
 				None => {
 					let ctx = &mut *ctx;
 
+					let retry = RetryApprovalInfo {
+						candidate: candidate.clone(),
+						backing_group,
+						executor_params: executor_params.clone(),
+						core_index,
+						session_index: session,
+						attempts_remaining: max_approval_retries,
+						backoff: retry_backoff,
+					};
+
 					currently_checking_set
 						.insert_relay_block_hash(
 							candidate_hash,
@@ -1458,7 +1644,11 @@ async fn handle_actions(
 								backing_group,
 								executor_params,
 								core_index,
+<<<<<<< HEAD
 								&launch_approval_span,
+=======
+								retry,
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 							)
 							.await
 						},
@@ -3320,7 +3510,11 @@ async fn launch_approval(
 	backing_group: GroupIndex,
 	executor_params: ExecutorParams,
 	core_index: Option,
+<<<<<<< HEAD
 	span: &jaeger::Span,
+=======
+	retry: RetryApprovalInfo,
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 ) -> SubsystemResult> {
 	let (a_tx, a_rx) = oneshot::channel();
 	let (code_tx, code_rx) = oneshot::channel();
@@ -3351,7 +3545,12 @@ async fn launch_approval(
 	}
 
 	let candidate_hash = candidate.hash();
+<<<<<<< HEAD
 	let para_id = candidate.descriptor.para_id;
+=======
+	let para_id = candidate.descriptor.para_id();
+	let mut next_retry = None;
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 	gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");
 
 	let request_validation_data_span = span
@@ -3390,7 +3589,6 @@ async fn launch_approval(
 	let background = async move {
 		// Force the move of the timer into the background task.
 		let _timer = timer;
-
 		let available_data = match a_rx.await {
 			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
 			Ok(Ok(a)) => a,
@@ -3401,10 +3599,27 @@ async fn launch_approval(
 						target: LOG_TARGET,
 						?para_id,
 						?candidate_hash,
+						attempts_remaining = retry.attempts_remaining,
 						"Data unavailable for candidate {:?}",
 						(candidate_hash, candidate.descriptor.para_id),
 					);
-					// do nothing. we'll just be a no-show and that'll cause others to rise up.
+					// Availability could fail if we did not discover much of the network, so
+					// let's back off and order the subsystem to retry at a later point if the
+					// approval is still needed, because no-show wasn't covered yet.
+					if retry.attempts_remaining > 0 {
+						Delay::new(retry.backoff).await;
+						next_retry = Some(RetryApprovalInfo {
+							candidate,
+							backing_group,
+							executor_params,
+							core_index,
+							session_index,
+							attempts_remaining: retry.attempts_remaining - 1,
+							backoff: retry.backoff,
+						});
+					} else {
+						next_retry = None;
+					}
 					metrics_guard.take().on_approval_unavailable();
 				},
 				&RecoveryError::ChannelClosed => {
@@ -3435,7 +3650,7 @@ async fn launch_approval(
 					metrics_guard.take().on_approval_invalid();
 				},
 			}
-			return ApprovalState::failed(validator_index, candidate_hash)
+			return ApprovalState::failed_with_retry(validator_index, candidate_hash, next_retry)
 		},
 	};
 	drop(request_validation_data_span);
diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs
index 561f5fbbdf2c..d076243c377b 100644
--- a/polkadot/node/core/approval-voting/src/tests.rs
+++ b/polkadot/node/core/approval-voting/src/tests.rs
@@ -75,6 +75,9 @@ const SLOT_DURATION_MILLIS: u64 = 5000;
 
 const TIMEOUT: Duration = Duration::from_millis(2000);
 
+const NUM_APPROVAL_RETRIES: u32 = 3;
+const RETRY_BACKOFF: Duration = Duration::from_millis(300);
+
 #[derive(Clone)]
 struct TestSyncOracle {
 	flag: Arc,
@@ -575,6 +578,12 @@ fn test_harness>(
 			sync_oracle,
 			Metrics::default(),
 			clock.clone(),
+<<<<<<< HEAD
+=======
+			Arc::new(SpawnGlue(pool)),
+			NUM_APPROVAL_RETRIES,
+			RETRY_BACKOFF,
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 		),
 		assignment_criteria,
 		backend,
@@ -3202,6 +3211,20 @@ async fn recover_available_data(virtual_overseer: &mut VirtualOverseer) {
 	);
 }
 
+async fn recover_available_data_failure(virtual_overseer: &mut VirtualOverseer) {
+	let available_data = RecoveryError::Unavailable;
+
+	assert_matches!(
+		virtual_overseer.recv().await,
+		AllMessages::AvailabilityRecovery(
+			AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, _, tx)
+		) => {
+			tx.send(Err(available_data)).unwrap();
+		},
+		"overseer did not receive recover available data message",
+	);
+}
+
 struct TriggersAssignmentConfig {
	our_assigned_tranche: DelayTranche,
 	assign_validator_tranche: F1,
@@ -4791,6 +4814,133 @@ fn subsystem_relaunches_approval_work_on_restart() {
 	});
 }
 
+/// Test that we retry the approval of a candidate on availability failure, up to max retries.
+#[test]
+fn subsystem_relaunches_approval_work_on_availability_failure() {
+	let assignment_criteria = Box::new(MockAssignmentCriteria(
+		|| {
+			let mut assignments = HashMap::new();
+
+			let _ = assignments.insert(
+				CoreIndex(0),
+				approval_db::v2::OurAssignment {
+					cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact {
+						core_bitfield: vec![CoreIndex(0), CoreIndex(2)].try_into().unwrap(),
+					}),
+					tranche: 0,
+					validator_index: ValidatorIndex(0),
+					triggered: false,
+				}
+				.into(),
+			);
+
+			let _ = assignments.insert(
+				CoreIndex(1),
+				approval_db::v2::OurAssignment {
+					cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay {
+						core_index: CoreIndex(1),
+					}),
+					tranche: 0,
+					validator_index: ValidatorIndex(0),
+					triggered: false,
+				}
+				.into(),
+			);
+			assignments
+		},
+		|_| Ok(0),
+	));
+	let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
+	let store = config.backend();
+
+	test_harness(config, |test_harness| async move {
+		let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;
+
+		setup_overseer_with_blocks_with_two_assignments_triggered(
+			&mut virtual_overseer,
+			store,
+			&clock,
+			sync_oracle_handle,
+		)
+		.await;
+
+		// We have two candidates: for one we are going to fail the availability recovery up to
+		// max_retries, and for the other we are going to succeed on the last retry, so we should
+		// see the approval being distributed.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
+				_,
+				_,
+			)) => {
+			}
+		);
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
+				_,
+				_
+			)) => {
+			}
+		);
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
+				exec_kind,
+				response_sender,
+				..
+			}) if exec_kind == PvfExecKind::Approval => {
+				response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
+					.unwrap();
+			}
+		);
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
+				let _ = sender.send(Ok(ApprovalVotingParams {
+					max_approval_coalesce_count: 1,
+				}));
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
+		);
+
+		// Assert that there are no more messages being sent by the subsystem
+		assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
+
+		virtual_overseer
+	});
+}
+
 // Test that cached approvals, which are candidates that we approved but we didn't issue
 // the signature yet because we want to coalesce it with more candidate are sent after restart.
 #[test]
diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
index f05d061f3fde..c23d51914c01 100644
--- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
+++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
@@ -852,8 +852,52 @@ fn build_overseer(
 		.replace_availability_recovery(|_| MockAvailabilityRecovery::new())
 		.replace_candidate_validation(|_| MockCandidateValidation::new());
 
+<<<<<<< HEAD
 	let (overseer, raw_handle) =
 		dummy.build_with_connector(overseer_connector).expect("Should not fail");
+=======
+	let (overseer, raw_handle) = if state.options.approval_voting_parallel_enabled {
+		let approval_voting_parallel = ApprovalVotingParallelSubsystem::with_config_and_clock(
+			TEST_CONFIG,
+			db.clone(),
+			keystore.clone(),
+			Box::new(TestSyncOracle {}),
+			state.approval_voting_parallel_metrics.clone(),
+			Arc::new(system_clock.clone()),
+			SpawnGlue(spawn_task_handle.clone()),
+			None,
+		);
+		dummy
+			.replace_approval_voting_parallel(|_| approval_voting_parallel)
+			.build_with_connector(overseer_connector)
+			.expect("Should not fail")
+	} else {
+		let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
+			TEST_CONFIG,
+			db.clone(),
+			keystore.clone(),
+			Box::new(TestSyncOracle {}),
+			state.approval_voting_parallel_metrics.approval_voting_metrics(),
+			Arc::new(system_clock.clone()),
+			Arc::new(SpawnGlue(spawn_task_handle.clone())),
+			1,
+			Duration::from_secs(1),
+		);
+
+		let approval_distribution = ApprovalDistribution::new_with_clock(
+			state.approval_voting_parallel_metrics.approval_distribution_metrics(),
+			TEST_CONFIG.slot_duration_millis,
+			Arc::new(system_clock.clone()),
+			Arc::new(RealAssignmentCriteria {}),
+		);
+
+		dummy
+			.replace_approval_voting(|_| approval_voting)
+			.replace_approval_distribution(|_| approval_distribution)
+			.build_with_connector(overseer_connector)
+			.expect("Should not fail")
+	};
+>>>>>>> 6878ba1f (Retry approval on availability failure if the check is still needed (#6807))
 
 	let overseer_handle = OverseerHandleReal::new(raw_handle);
 	(overseer, overseer_handle)
diff --git a/prdoc/pr_6807.prdoc b/prdoc/pr_6807.prdoc
new file mode 100644
index 000000000000..b9564dfb2fe2
--- /dev/null
+++ b/prdoc/pr_6807.prdoc
@@ -0,0 +1,19 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Retry approval on availability failure if the check is still needed
+
+doc:
+  - audience: Node Dev
+    description: |
+      Recovering the PoV can fail in situations where the node has just restarted and the DHT
+      topology hasn't been fully discovered yet, so the node cannot connect to most of its peers.
+      This is bad because gossiping the assignment only requires being connected to a few peers,
+      so the assignment does get distributed, but because we cannot approve the candidate, other
+      nodes will see us as a no-show. Fix it by retrying to approve the candidate for a fixed
+      number of attempts if the block is still needed.
+
+
+crates:
+  - name: polkadot-node-core-approval-voting
+    bump: minor