Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3 / 5] Move crypto checks in the approval-distribution #4928

Merged
merged 22 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8dbb088
Make approval-distribution logic runnable on a separate thread
alexggh Jun 17, 2024
14727c5
Move crypto checks in the approval-distribution
alexggh Jun 19, 2024
7175957
Address review feedback
alexggh Jul 26, 2024
469866f
Merge remote-tracking branch 'origin/alexaggh/approval-voting-paralle…
alexggh Jul 26, 2024
d9fdd51
Address review feedback
alexggh Jul 26, 2024
2752d6e
Fail early on oversized claims
alexggh Jul 29, 2024
00d22ee
Add more unittests
alexggh Jul 29, 2024
4f99f65
Merge remote-tracking branch 'origin/master' into alexaggh/approval-v…
alexggh Jul 29, 2024
ead49d2
Review feedback
alexggh Aug 8, 2024
c67d99d
Add checked indirect assignment
alexggh Aug 8, 2024
247161f
Merge remote-tracking branch 'origin/master' into alexaggh/approval-v…
alexggh Aug 12, 2024
0b140cc
Change OurViewChange order
alexggh Aug 13, 2024
0581468
Merge branch 'fix_signal_order' into alexaggh/approval-voting-paralle…
alexggh Aug 13, 2024
08889e1
Merge remote-tracking branch 'origin/master' into alexaggh/approval-v…
alexggh Aug 14, 2024
b0fb6d4
Merge remote-tracking branch 'origin/master' into alexaggh/approval-v…
alexggh Aug 27, 2024
7539184
Remove crate dependency between approval-distribution and approval-vo…
alexggh Aug 27, 2024
10efafa
Update documentation with the new flow
alexggh Sep 1, 2024
241b094
Add PrDoc
alexggh Sep 1, 2024
ecd7186
Merge remote-tracking branch 'origin/master' into alexaggh/approval-v…
alexggh Sep 1, 2024
04f481f
Make error case more robust
alexggh Sep 1, 2024
079f514
Make cargo fmt happy
alexggh Sep 1, 2024
4ac1ea2
Update subsystem-benchmark with the new numbers
alexggh Sep 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions polkadot/node/core/approval-voting/src/criteria.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl<'a> From<&'a SessionInfo> for Config {
}

/// A trait for producing and checking assignments. Used to mock.
pub(crate) trait AssignmentCriteria {
pub trait AssignmentCriteria {
fn compute_assignments(
&self,
keystore: &LocalKeystore,
Expand All @@ -276,7 +276,7 @@ pub(crate) trait AssignmentCriteria {
) -> Result<DelayTranche, InvalidAssignment>;
}

pub(crate) struct RealAssignmentCriteria;
pub struct RealAssignmentCriteria;

impl AssignmentCriteria for RealAssignmentCriteria {
fn compute_assignments(
Expand Down Expand Up @@ -614,7 +614,7 @@ fn compute_relay_vrf_delay_assignments(

/// Assignment invalid.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InvalidAssignment(pub(crate) InvalidAssignmentReason);
pub struct InvalidAssignment(pub InvalidAssignmentReason);

impl std::fmt::Display for InvalidAssignment {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
Expand All @@ -626,7 +626,7 @@ impl std::error::Error for InvalidAssignment {}

/// Failure conditions when checking an assignment cert.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum InvalidAssignmentReason {
pub enum InvalidAssignmentReason {
ValidatorIndexOutOfBounds,
SampleOutOfBounds,
CoreIndexOutOfBounds,
Expand Down
6 changes: 5 additions & 1 deletion polkadot/node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,13 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
hash: block_hash,
number: block_header.number,
parent_hash: block_header.parent_hash,
candidates: included_candidates.iter().map(|(hash, _, _, _)| *hash).collect(),
candidates: included_candidates
.iter()
.map(|(hash, _, core_index, group_index)| (*hash, *core_index, *group_index))
.collect(),
slot,
session: session_index,
vrf_story: relay_vrf_story,
});

imported_candidates.push(BlockImportedCandidates {
Expand Down
182 changes: 56 additions & 126 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::{
ApprovalVoteMultipleCandidates, ApprovalVotingParams, BlockNumber, CandidateHash,
CandidateIndex, CandidateReceipt, CoreIndex, DisputeStatement, ExecutorParams, GroupIndex,
Hash, PvfExecKind, SessionIndex, SessionInfo, ValidDisputeStatementKind, ValidatorId,
ValidatorIndex, ValidatorPair, ValidatorSignature,
CandidateIndex, CandidateReceipt, CoreIndex, ExecutorParams, GroupIndex, Hash, PvfExecKind,
SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
};
use sc_keystore::LocalKeystore;
use sp_application_crypto::Pair;
Expand Down Expand Up @@ -91,7 +90,7 @@ use schnellru::{ByLength, LruMap};

use approval_checking::RequiredTranches;
use bitvec::{order::Lsb0, vec::BitVec};
use criteria::{AssignmentCriteria, RealAssignmentCriteria};
pub use criteria::{AssignmentCriteria, Config as AssignmentConfig, RealAssignmentCriteria};
use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry};
use time::{slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClock, Tick};

Expand Down Expand Up @@ -123,7 +122,7 @@ const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
const APPROVAL_CACHE_SIZE: u32 = 1024;

const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds.
pub const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds.
const APPROVAL_DELAY: Tick = 2;
pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";

Expand Down Expand Up @@ -1607,9 +1606,21 @@ async fn distribution_messages_for_activation<Context>(
hash: block_hash,
number: block_entry.block_number(),
parent_hash: block_entry.parent_hash(),
candidates: block_entry.candidates().iter().map(|(_, c_hash)| *c_hash).collect(),
candidates: block_entry
.candidates()
.iter()
.flat_map(|(core_index, c_hash)| {
let candidate = db.load_candidate_entry(c_hash).ok().flatten();
candidate
.and_then(|entry| {
entry.approval_entry(&block_hash).map(|entry| entry.backing_group())
})
.and_then(|group_index| Some((*c_hash, *core_index, group_index)))
})
.collect(),
slot: block_entry.slot(),
session: block_entry.session(),
vrf_story: block_entry.relay_vrf_story(),
});
let mut signatures_queued = HashSet::new();
for (core_index, candidate_hash) in block_entry.candidates() {
Expand Down Expand Up @@ -1872,35 +1883,47 @@ async fn handle_from_overseer<Context>(
vec![Action::Conclude]
},
FromOrchestra::Communication { msg } => match msg {
ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_cores, res) => {
ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_cores, tranche, tx) => {
let (check_outcome, actions) = check_and_import_assignment(
ctx.sender(),
state,
db,
session_info_provider,
a,
claimed_cores,
tranche,
)
.await?;
let _ = res.send(check_outcome);

// approval-distribution already sanity checks the assignment is valid and expected,
// so this import should never fail if it does it means there is a bug in the
// code.
if let AssignmentCheckResult::Bad(ref err) = check_outcome {
gum::error!(target: LOG_TARGET, ?err, "Unexpected fail to check and import assignment");
}
let _ = tx.map(|tx| tx.send(check_outcome));
actions
},
ApprovalVotingMessage::CheckAndImportApproval(a, res) =>
check_and_import_approval(
ApprovalVotingMessage::CheckAndImportApproval(a, tx) => {
let result = check_and_import_approval(
ctx.sender(),
state,
db,
session_info_provider,
metrics,
a,
|r| {
let _ = res.send(r);
},
&wakeups,
)
.await?
.0,
.await?;
// approval-distribution already sanity checks the vote is valid and expected,
// so this import should never fail if it does it means there is a bug in the
// code.
if let ApprovalCheckResult::Bad(ref err) = result.1 {
eskimor marked this conversation as resolved.
Show resolved Hide resolved
gum::error!(target: LOG_TARGET, ?err, "Unexpected fail to check and import approval");
}
let _ = tx.map(|tx| tx.send(result.1));

result.0
},
ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => {
let mut approved_ancestor_span = state
.spans
Expand Down Expand Up @@ -2384,7 +2407,12 @@ fn schedule_wakeup_action(
last_assignment_tick.map(|l| l + APPROVAL_DELAY).filter(|t| t > &tick_now),
next_no_show,
)
.map(|tick| Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick })
.map(|tick| Action::ScheduleWakeup {
block_hash,
block_number,
candidate_hash,
tick,
})
},
RequiredTranches::Pending { considered, next_no_show, clock_drift, .. } => {
// select the minimum of `next_no_show`, or the tick of the next non-empty tranche
Expand Down Expand Up @@ -2446,6 +2474,7 @@ async fn check_and_import_assignment<Sender>(
session_info_provider: &mut RuntimeInfo,
assignment: IndirectAssignmentCertV2,
candidate_indices: CandidateBitfield,
tranche: DelayTranche,
) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)>
where
Sender: SubsystemSender<RuntimeApiMessage>,
Expand Down Expand Up @@ -2514,8 +2543,6 @@ where
))
}

// The Compact VRF modulo assignment cert has multiple core assignments.
let mut backing_groups = Vec::new();
let mut claimed_core_indices = Vec::new();
let mut assigned_candidate_hashes = Vec::new();

Expand Down Expand Up @@ -2551,7 +2578,7 @@ where
format!("{:?}", jaeger::hash_to_trace_identifier(assigned_candidate_hash.0)),
);

let approval_entry = match candidate_entry.approval_entry_mut(&assignment.block_hash) {
let _approval_entry = match candidate_entry.approval_entry_mut(&assignment.block_hash) {
alexggh marked this conversation as resolved.
Show resolved Hide resolved
Some(a) => a,
None =>
return Ok((
Expand All @@ -2563,7 +2590,6 @@ where
)),
};

backing_groups.push(approval_entry.backing_group());
claimed_core_indices.push(claimed_core_index);
assigned_candidate_hashes.push(assigned_candidate_hash);
}
Expand All @@ -2579,42 +2605,6 @@ where
))
}

// Check the assignment certificate.
let res = state.assignment_criteria.check_assignment_cert(
claimed_core_indices
.clone()
.try_into()
.expect("Checked for null assignment above; qed"),
assignment.validator,
&criteria::Config::from(session_info),
block_entry.relay_vrf_story(),
&assignment.cert,
backing_groups,
);

let tranche = match res {
Err(crate::criteria::InvalidAssignment(reason)) =>
return Ok((
AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(
assignment.validator,
format!("{:?}", reason),
)),
Vec::new(),
)),
Ok(tranche) => {
let current_tranche =
state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());

let too_far_in_future = current_tranche + TICK_TOO_FAR_IN_FUTURE as DelayTranche;

if tranche >= too_far_in_future {
return Ok((AssignmentCheckResult::TooFarInFuture, Vec::new()))
}

tranche
},
};

let mut actions = Vec::new();
let res = {
let mut is_duplicate = true;
Expand Down Expand Up @@ -2704,23 +2694,21 @@ where
Ok((res, actions))
}

async fn check_and_import_approval<T, Sender>(
async fn check_and_import_approval<Sender>(
sender: &mut Sender,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
metrics: &Metrics,
approval: IndirectSignedApprovalVoteV2,
with_response: impl FnOnce(ApprovalCheckResult) -> T,
wakeups: &Wakeups,
) -> SubsystemResult<(Vec<Action>, T)>
) -> SubsystemResult<(Vec<Action>, ApprovalCheckResult)>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
macro_rules! respond_early {
($e: expr) => {{
let t = with_response($e);
return Ok((Vec::new(), t))
return Ok((Vec::new(), $e))
}};
}
let mut span = state
Expand Down Expand Up @@ -2774,67 +2762,11 @@ where
),
);

{
let session_info = match get_session_info(
session_info_provider,
sender,
approval.block_hash,
block_entry.session(),
)
.await
{
Some(s) => s,
None => {
respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownSessionIndex(
block_entry.session()
),))
},
};

let pubkey = match session_info.validators.get(approval.validator) {
Some(k) => k,
None => respond_early!(ApprovalCheckResult::Bad(
ApprovalCheckError::InvalidValidatorIndex(approval.validator),
)),
};

gum::trace!(
target: LOG_TARGET,
"Received approval for num_candidates {:}",
approval.candidate_indices.count_ones()
);

let candidate_hashes: Vec<CandidateHash> =
approved_candidates_info.iter().map(|candidate| candidate.1).collect();
// Signature check:
match DisputeStatement::Valid(
ValidDisputeStatementKind::ApprovalCheckingMultipleCandidates(candidate_hashes.clone()),
)
.check_signature(
&pubkey,
if let Some(candidate_hash) = candidate_hashes.first() {
*candidate_hash
} else {
respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::InvalidValidatorIndex(
approval.validator
),))
},
block_entry.session(),
&approval.signature,
) {
Err(_) => {
gum::error!(
target: LOG_TARGET,
"Error while checking signature {:}",
approval.candidate_indices.count_ones()
);
respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::InvalidSignature(
approval.validator
),))
},
Ok(()) => {},
};
}
gum::trace!(
target: LOG_TARGET,
"Received approval for num_candidates {:}",
approval.candidate_indices.count_ones()
);

let mut actions = Vec::new();
for (approval_candidate_index, approved_candidate_hash) in approved_candidates_info {
Expand Down Expand Up @@ -2898,9 +2830,7 @@ where
}

// importing the approval can be heavy as it may trigger acceptance for a series of blocks.
let t = with_response(ApprovalCheckResult::Accepted);

Ok((actions, t))
Ok((actions, ApprovalCheckResult::Accepted))
}

#[derive(Debug)]
Expand Down
Loading
Loading