Move PVF code and PoV decompression to PVF host workers (#5142)
Closes #5071 

This PR aims to
* Move all the blocking decompression from the candidate validation
subsystem to the PVF host workers;
* Run the candidate validation subsystem on the non-blocking pool again.

Upsides: no blocking operations in the subsystem's main loop, and PVF
throughput is no longer limited by the subsystem's ability to decompress
large blobs. Correctness and homogeneity also improve: artifacts used to
be identified by the hash of the decompressed code, whereas now they are
identified by the hash of the compressed code, which coincides with the
on-chain `ValidationCodeHash`.

Downsides: the PVF code decompression is now accounted for in the PVF
preparation timeout (be it pre-checking or actual preparation). Given
that decompression takes on the order of milliseconds while the
preparation timeout is on the order of seconds, I believe the overhead
is negligible.
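
For illustration, here is a minimal sketch of the bomb-limited decompression that now happens on the PVF host side instead of in the subsystem's main loop. It reuses the `sp_maybe_compressed_blob` API and the bomb-limit constants from the removed subsystem code; the helper names and the `String` error type are illustrative, not the actual worker code.

```rust
use polkadot_node_primitives::{POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT};

// Hypothetical helpers mirroring what the host workers now do.

/// Decompresses validation code inside the preparation job, so the cost is
/// covered by the preparation timeout instead of blocking the subsystem.
fn decompress_code(compressed: &[u8]) -> Result<Vec<u8>, String> {
    sp_maybe_compressed_blob::decompress(compressed, VALIDATION_CODE_BOMB_LIMIT)
        .map(|code| code.into_owned())
        .map_err(|e| format!("code decompression failed: {e:?}"))
}

/// Decompresses the PoV on the execution side; a failure here marks the
/// candidate itself invalid rather than signalling a local error.
fn decompress_pov(compressed: &[u8]) -> Result<Vec<u8>, String> {
    sp_maybe_compressed_blob::decompress(compressed, POV_BOMB_LIMIT)
        .map(|data| data.into_owned())
        .map_err(|e| format!("PoV decompression failed: {e:?}"))
}
```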
s0me0ne-unkn0wn authored Aug 9, 2024
1 parent 380cd21 commit 47c1b4c
Showing 26 changed files with 642 additions and 525 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion polkadot/node/core/candidate-validation/Cargo.toml
@@ -17,7 +17,6 @@ gum = { workspace = true, default-features = true }

sp-keystore = { workspace = true }
sp-application-crypto = { workspace = true }
-sp-maybe-compressed-blob = { workspace = true, default-features = true }
codec = { features = ["bit-vec", "derive"], workspace = true }

polkadot-primitives = { workspace = true, default-features = true }
@@ -36,5 +35,6 @@ sp-keyring = { workspace = true, default-features = true }
futures = { features = ["thread-pool"], workspace = true }
assert_matches = { workspace = true }
polkadot-node-subsystem-test-helpers = { workspace = true }
+sp-maybe-compressed-blob = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
polkadot-primitives-test-helpers = { workspace = true }
134 changes: 47 additions & 87 deletions polkadot/node/core/candidate-validation/src/lib.rs
@@ -27,9 +27,7 @@ use polkadot_node_core_pvf::{
InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
};
-use polkadot_node_primitives::{
-BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
-};
+use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult};
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{
@@ -41,9 +39,7 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_util as util;
use polkadot_overseer::ActiveLeavesUpdate;
-use polkadot_parachain_primitives::primitives::{
-ValidationParams, ValidationResult as WasmValidationResult,
-};
+use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
use polkadot_primitives::{
executor_params::{
DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT,
@@ -504,21 +500,12 @@ where
continue;
};

-let pvf = match sp_maybe_compressed_blob::decompress(
-&validation_code.0,
-VALIDATION_CODE_BOMB_LIMIT,
-) {
-Ok(code) => PvfPrepData::from_code(
-code.into_owned(),
-executor_params.clone(),
-timeout,
-PrepareJobKind::Prechecking,
-),
-Err(e) => {
-gum::debug!(target: LOG_TARGET, err=?e, "cannot decompress validation code");
-continue
-},
-};
+let pvf = PvfPrepData::from_code(
+validation_code.0,
+executor_params.clone(),
+timeout,
+PrepareJobKind::Prechecking,
+);

active_pvfs.push(pvf);
processed_code_hashes.push(code_hash);
@@ -651,21 +638,12 @@ where

let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck);

-let pvf = match sp_maybe_compressed_blob::decompress(
-&validation_code.0,
-VALIDATION_CODE_BOMB_LIMIT,
-) {
-Ok(code) => PvfPrepData::from_code(
-code.into_owned(),
-executor_params,
-timeout,
-PrepareJobKind::Prechecking,
-),
-Err(e) => {
-gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code");
-return PreCheckOutcome::Invalid
-},
-};
+let pvf = PvfPrepData::from_code(
+validation_code.0,
+executor_params,
+timeout,
+PrepareJobKind::Prechecking,
+);

match validation_backend.precheck_pvf(pvf).await {
Ok(_) => PreCheckOutcome::Valid,
@@ -873,49 +851,15 @@ async fn validate_candidate_exhaustive(
return Ok(ValidationResult::Invalid(e))
}

-let raw_validation_code = match sp_maybe_compressed_blob::decompress(
-&validation_code.0,
-VALIDATION_CODE_BOMB_LIMIT,
-) {
-Ok(code) => code,
-Err(e) => {
-gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (validation code)");
-
-// Code already passed pre-checking, if decompression fails now this most likely means
-// some local corruption happened.
-return Err(ValidationFailed("Code decompression failed".to_string()))
-},
-};
-metrics.observe_code_size(raw_validation_code.len());
-
-metrics.observe_pov_size(pov.block_data.0.len(), true);
-let raw_block_data =
-match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
-Ok(block_data) => BlockData(block_data.to_vec()),
-Err(e) => {
-gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (PoV code)");
-
-// If the PoV is invalid, the candidate certainly is.
-return Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure))
-},
-};
-metrics.observe_pov_size(raw_block_data.0.len(), false);
-
-let params = ValidationParams {
-parent_head: persisted_validation_data.parent_head.clone(),
-block_data: raw_block_data,
-relay_parent_number: persisted_validation_data.relay_parent_number,
-relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root,
-};

+let persisted_validation_data = Arc::new(persisted_validation_data);
let result = match exec_kind {
// Retry is disabled to reduce the chance of nondeterministic blocks getting backed and
// honest backers getting slashed.
PvfExecKind::Backing => {
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind);
let pvf = PvfPrepData::from_code(
-raw_validation_code.to_vec(),
+validation_code.0,
executor_params,
prep_timeout,
PrepareJobKind::Compilation,
@@ -925,17 +869,19 @@
.validate_candidate(
pvf,
exec_timeout,
-params.encode(),
+persisted_validation_data.clone(),
+pov,
polkadot_node_core_pvf::Priority::Normal,
)
.await
},
PvfExecKind::Approval =>
validation_backend
.validate_candidate_with_retry(
-raw_validation_code.to_vec(),
+validation_code.0,
pvf_exec_timeout(&executor_params, exec_kind),
-params,
+persisted_validation_data.clone(),
+pov,
executor_params,
PVF_APPROVAL_EXECUTION_RETRY_DELAY,
polkadot_node_core_pvf::Priority::Critical,
Expand All @@ -961,6 +907,8 @@ async fn validate_candidate_exhaustive(
Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))),
+Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) =>
+Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
"ambiguous worker death".to_string(),
@@ -1007,7 +955,7 @@ async fn validate_candidate_exhaustive(
// invalid.
Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))
} else {
-Ok(ValidationResult::Valid(outputs, persisted_validation_data))
+Ok(ValidationResult::Valid(outputs, (*persisted_validation_data).clone()))
}
},
}
@@ -1020,7 +968,8 @@ trait ValidationBackend {
&mut self,
pvf: PvfPrepData,
exec_timeout: Duration,
-encoded_params: Vec<u8>,
+pvd: Arc<PersistedValidationData>,
+pov: Arc<PoV>,
// The priority for the preparation job.
prepare_priority: polkadot_node_core_pvf::Priority,
) -> Result<WasmValidationResult, ValidationError>;
@@ -1035,9 +984,10 @@ trait ValidationBackend {
/// preparation.
async fn validate_candidate_with_retry(
&mut self,
-raw_validation_code: Vec<u8>,
+code: Vec<u8>,
exec_timeout: Duration,
-params: ValidationParams,
+pvd: Arc<PersistedValidationData>,
+pov: Arc<PoV>,
executor_params: ExecutorParams,
retry_delay: Duration,
// The priority for the preparation job.
@@ -1046,7 +996,7 @@ trait ValidationBackend {
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf = PvfPrepData::from_code(
-raw_validation_code,
+code,
executor_params,
prep_timeout,
PrepareJobKind::Compilation,
@@ -1057,7 +1007,13 @@ trait ValidationBackend {

// Use `Priority::Critical` as finality trumps parachain liveliness.
let mut validation_result = self
-.validate_candidate(pvf.clone(), exec_timeout, params.encode(), prepare_priority)
+.validate_candidate(
+pvf.clone(),
+exec_timeout,
+pvd.clone(),
+pov.clone(),
+prepare_priority,
+)
.await;
if validation_result.is_ok() {
return validation_result
@@ -1130,10 +1086,14 @@ trait ValidationBackend {
validation_result
);

-// Encode the params again when re-trying. We expect the retry case to be relatively
-// rare, and we want to avoid unconditionally cloning data.
validation_result = self
-.validate_candidate(pvf.clone(), new_timeout, params.encode(), prepare_priority)
+.validate_candidate(
+pvf.clone(),
+new_timeout,
+pvd.clone(),
+pov.clone(),
+prepare_priority,
+)
.await;
}
}
@@ -1153,13 +1113,13 @@ impl ValidationBackend for ValidationHost {
&mut self,
pvf: PvfPrepData,
exec_timeout: Duration,
-encoded_params: Vec<u8>,
+pvd: Arc<PersistedValidationData>,
+pov: Arc<PoV>,
// The priority for the preparation job.
prepare_priority: polkadot_node_core_pvf::Priority,
) -> Result<WasmValidationResult, ValidationError> {
let (tx, rx) = oneshot::channel();
-if let Err(err) =
-self.execute_pvf(pvf, exec_timeout, encoded_params, prepare_priority, tx).await
+if let Err(err) = self.execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, tx).await
{
return Err(InternalValidationError::HostCommunication(format!(
"cannot send pvf to the validation host, it might have shut down: {:?}",
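
To summarize the interface change scattered across the hunks above: `ValidationBackend::validate_candidate` no longer receives pre-encoded `ValidationParams`, but the `Arc`-wrapped inputs from which the worker builds them, so a retry clones two `Arc`s instead of re-encoding. A condensed sketch of the new shape, assuming the usual polkadot workspace crates and the `async-trait` crate are in scope:

```rust
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use polkadot_node_core_pvf::{PvfPrepData, ValidationError};
use polkadot_node_primitives::PoV;
use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
use polkadot_primitives::PersistedValidationData;

#[async_trait]
trait ValidationBackend {
    async fn validate_candidate(
        &mut self,
        // Built from the *compressed* code; the prepare worker decompresses it.
        pvf: PvfPrepData,
        exec_timeout: Duration,
        // Replaces `encoded_params: Vec<u8>`; cloning an Arc is cheap on retry.
        pvd: Arc<PersistedValidationData>,
        // Still-compressed PoV; the execution worker decompresses it.
        pov: Arc<PoV>,
        // The priority for the preparation job.
        prepare_priority: polkadot_node_core_pvf::Priority,
    ) -> Result<WasmValidationResult, ValidationError>;
}
```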
44 changes: 0 additions & 44 deletions polkadot/node/core/candidate-validation/src/metrics.rs
@@ -23,8 +23,6 @@ pub(crate) struct MetricsInner {
pub(crate) validate_from_chain_state: prometheus::Histogram,
pub(crate) validate_from_exhaustive: prometheus::Histogram,
pub(crate) validate_candidate_exhaustive: prometheus::Histogram,
-pub(crate) pov_size: prometheus::HistogramVec,
-pub(crate) code_size: prometheus::Histogram,
}

/// Candidate validation metrics.
@@ -70,21 +68,6 @@ impl Metrics {
.as_ref()
.map(|metrics| metrics.validate_candidate_exhaustive.start_timer())
}

-pub fn observe_code_size(&self, code_size: usize) {
-if let Some(metrics) = &self.0 {
-metrics.code_size.observe(code_size as f64);
-}
-}
-
-pub fn observe_pov_size(&self, pov_size: usize, compressed: bool) {
-if let Some(metrics) = &self.0 {
-metrics
-.pov_size
-.with_label_values(&[if compressed { "true" } else { "false" }])
-.observe(pov_size as f64);
-}
-}
}

impl metrics::Metrics for Metrics {
@@ -121,33 +104,6 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
-pov_size: prometheus::register(
-prometheus::HistogramVec::new(
-prometheus::HistogramOpts::new(
-"polkadot_parachain_candidate_validation_pov_size",
-"The compressed and decompressed size of the proof of validity of a candidate",
-)
-.buckets(
-prometheus::exponential_buckets(16384.0, 2.0, 10)
-.expect("arguments are always valid; qed"),
-),
-&["compressed"],
-)?,
-registry,
-)?,
-code_size: prometheus::register(
-prometheus::Histogram::with_opts(
-prometheus::HistogramOpts::new(
-"polkadot_parachain_candidate_validation_code_size",
-"The size of the decompressed WASM validation blob used for checking a candidate",
-)
-.buckets(
-prometheus::exponential_buckets(16384.0, 2.0, 10)
-.expect("arguments are always valid; qed"),
-),
-)?,
-registry,
-)?,
};
Ok(Metrics(Some(metrics)))
}
(The remaining changed files are not rendered here.)
