From 47c1b4cd8dd673d25c37b802f44da30a23ec9b9e Mon Sep 17 00:00:00 2001 From: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com> Date: Fri, 9 Aug 2024 17:59:56 +0200 Subject: [PATCH] Move PVF code and PoV decompression to PVF host workers (#5142) Closes #5071 This PR aims to * Move all the blocking decompression from the candidate validation subsystem to the PVF host workers; * Run the candidate validation subsystem on the non-blocking pool again. Upsides: no blocking operations remain in the subsystem's main loop, and PVF throughput is no longer limited by the subsystem's ability to decompress large blobs. Correctness and homogeneity improve: artifacts used to be identified by the hash of the decompressed code, and they are now identified by the hash of the compressed code, which coincides with the on-chain `ValidationCodeHash`. Downsides: the PVF code decompression is now accounted for in the PVF preparation timeout (be it pre-checking or actual preparation). Given that decompression takes on the order of milliseconds while the preparation timeout is on the order of seconds, I believe this downside is negligible. --- Cargo.lock | 3 + .../node/core/candidate-validation/Cargo.toml | 2 +- .../node/core/candidate-validation/src/lib.rs | 134 +++++------- .../core/candidate-validation/src/metrics.rs | 44 ---- .../core/candidate-validation/src/tests.rs | 183 +---------------- .../benches/host_prepare_rococo_runtime.rs | 2 +- polkadot/node/core/pvf/common/src/error.rs | 10 +- polkadot/node/core/pvf/common/src/execute.rs | 4 + polkadot/node/core/pvf/common/src/prepare.rs | 2 + polkadot/node/core/pvf/common/src/pvf.rs | 20 +- .../node/core/pvf/execute-worker/Cargo.toml | 3 + .../node/core/pvf/execute-worker/src/lib.rs | 65 +++++- .../node/core/pvf/prepare-worker/Cargo.toml | 2 + .../benches/prepare_rococo_runtime.rs | 6 +- .../node/core/pvf/prepare-worker/src/lib.rs | 50 ++++- polkadot/node/core/pvf/src/error.rs | 3 + polkadot/node/core/pvf/src/execute/queue.rs | 52 ++++- .../core/pvf/src/execute/worker_interface.rs | 36 ++-- polkadot/node/core/pvf/src/host.rs | 106 +++++++--- polkadot/node/core/pvf/src/metrics.rs | 46 +++++ .../core/pvf/src/prepare/worker_interface.rs | 10 +- polkadot/node/core/pvf/tests/it/adder.rs | 84 ++++---- polkadot/node/core/pvf/tests/it/main.rs | 192 ++++++++++++------ polkadot/node/core/pvf/tests/it/process.rs | 80 +++++--- polkadot/node/overseer/src/lib.rs | 2 +- prdoc/pr_5142.prdoc | 26 +++ 26 files changed, 642 insertions(+), 525 deletions(-) create mode 100644 prdoc/pr_5142.prdoc diff --git a/Cargo.lock b/Cargo.lock index 54a01f12f35c..4db38311fe66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13589,8 +13589,10 @@ dependencies = [ "nix 0.28.0", "parity-scale-codec", "polkadot-node-core-pvf-common", + "polkadot-node-primitives", "polkadot-parachain-primitives", "polkadot-primitives", + "sp-maybe-compressed-blob", "tracing-gum", ] @@ -13605,6 +13607,7 @@ dependencies = [ "nix 0.28.0", "parity-scale-codec", "polkadot-node-core-pvf-common", + "polkadot-node-primitives", "polkadot-primitives", "rayon", "rococo-runtime", diff --git a/polkadot/node/core/candidate-validation/Cargo.toml b/polkadot/node/core/candidate-validation/Cargo.toml index 13ab3e3fba50..fcacc38cae65 100644 --- a/polkadot/node/core/candidate-validation/Cargo.toml +++ b/polkadot/node/core/candidate-validation/Cargo.toml @@ -17,7 +17,6 @@ gum = { workspace = true, default-features = true } sp-keystore = { workspace = true } sp-application-crypto = { workspace = true }
-sp-maybe-compressed-blob = { workspace = true, default-features = true } codec = { features = ["bit-vec", "derive"], workspace = true } polkadot-primitives = { workspace = true, default-features = true } @@ -36,5 +35,6 @@ sp-keyring = { workspace = true, default-features = true } futures = { features = ["thread-pool"], workspace = true } assert_matches = { workspace = true } polkadot-node-subsystem-test-helpers = { workspace = true } +sp-maybe-compressed-blob = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } polkadot-primitives-test-helpers = { workspace = true } diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 1985964ebc51..103d29e8d269 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -27,9 +27,7 @@ use polkadot_node_core_pvf::{ InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, }; -use polkadot_node_primitives::{ - BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT, -}; +use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult}; use polkadot_node_subsystem::{ errors::RuntimeApiError, messages::{ @@ -41,9 +39,7 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_util as util; use polkadot_overseer::ActiveLeavesUpdate; -use polkadot_parachain_primitives::primitives::{ - ValidationParams, ValidationResult as WasmValidationResult, -}; +use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult; use polkadot_primitives::{ executor_params::{ DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT, @@ -504,21 +500,12 @@ where continue; }; - let pvf = match sp_maybe_compressed_blob::decompress( - &validation_code.0, - VALIDATION_CODE_BOMB_LIMIT, - ) { - Ok(code) => PvfPrepData::from_code( - code.into_owned(), - executor_params.clone(), - timeout, - PrepareJobKind::Prechecking, - ), - Err(e) => { - gum::debug!(target: LOG_TARGET, err=?e, "cannot decompress validation code"); - continue - }, - }; + let pvf = PvfPrepData::from_code( + validation_code.0, + executor_params.clone(), + timeout, + PrepareJobKind::Prechecking, + ); active_pvfs.push(pvf); processed_code_hashes.push(code_hash); @@ -651,21 +638,12 @@ where let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck); - let pvf = match sp_maybe_compressed_blob::decompress( - &validation_code.0, - VALIDATION_CODE_BOMB_LIMIT, - ) { - Ok(code) => PvfPrepData::from_code( - code.into_owned(), - executor_params, - timeout, - PrepareJobKind::Prechecking, - ), - Err(e) => { - gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code"); - return PreCheckOutcome::Invalid - }, - }; + let pvf = PvfPrepData::from_code( + validation_code.0, + executor_params, + timeout, + PrepareJobKind::Prechecking, + ); match validation_backend.precheck_pvf(pvf).await { Ok(_) => PreCheckOutcome::Valid, @@ -873,41 +851,7 @@ async fn validate_candidate_exhaustive( return Ok(ValidationResult::Invalid(e)) } - let raw_validation_code = match sp_maybe_compressed_blob::decompress( - &validation_code.0, - VALIDATION_CODE_BOMB_LIMIT, - ) { - Ok(code) => code, - Err(e) => { - gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (validation code)"); - - // Code already passed pre-checking, if 
decompression fails now this most likely means - // some local corruption happened. - return Err(ValidationFailed("Code decompression failed".to_string())) - }, - }; - metrics.observe_code_size(raw_validation_code.len()); - - metrics.observe_pov_size(pov.block_data.0.len(), true); - let raw_block_data = - match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) { - Ok(block_data) => BlockData(block_data.to_vec()), - Err(e) => { - gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (PoV code)"); - - // If the PoV is invalid, the candidate certainly is. - return Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)) - }, - }; - metrics.observe_pov_size(raw_block_data.0.len(), false); - - let params = ValidationParams { - parent_head: persisted_validation_data.parent_head.clone(), - block_data: raw_block_data, - relay_parent_number: persisted_validation_data.relay_parent_number, - relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root, - }; - + let persisted_validation_data = Arc::new(persisted_validation_data); let result = match exec_kind { // Retry is disabled to reduce the chance of nondeterministic blocks getting backed and // honest backers getting slashed. @@ -915,7 +859,7 @@ async fn validate_candidate_exhaustive( let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare); let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind); let pvf = PvfPrepData::from_code( - raw_validation_code.to_vec(), + validation_code.0, executor_params, prep_timeout, PrepareJobKind::Compilation, @@ -925,7 +869,8 @@ async fn validate_candidate_exhaustive( .validate_candidate( pvf, exec_timeout, - params.encode(), + persisted_validation_data.clone(), + pov, polkadot_node_core_pvf::Priority::Normal, ) .await }, PvfExecKind::Approval => validation_backend .validate_candidate_with_retry( - raw_validation_code.to_vec(), + validation_code.0, pvf_exec_timeout(&executor_params, exec_kind), - params, + persisted_validation_data.clone(), + pov, executor_params, PVF_APPROVAL_EXECUTION_RETRY_DELAY, polkadot_node_core_pvf::Priority::Critical, @@ -961,6 +907,8 @@ Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)), Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))), + Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) => + Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)), Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError( "ambiguous worker death".to_string(), @@ -1007,7 +955,7 @@ // invalid. Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch)) } else { - Ok(ValidationResult::Valid(outputs, persisted_validation_data)) + Ok(ValidationResult::Valid(outputs, (*persisted_validation_data).clone())) } }, } @@ -1020,7 +968,8 @@ trait ValidationBackend { &mut self, pvf: PvfPrepData, exec_timeout: Duration, - encoded_params: Vec<u8>, + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result<WasmValidationResult, ValidationError>; @@ -1035,9 +984,10 @@ trait ValidationBackend { /// preparation. async fn validate_candidate_with_retry( &mut self, - raw_validation_code: Vec<u8>, + code: Vec<u8>, exec_timeout: Duration, - params: ValidationParams, + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, executor_params: ExecutorParams, retry_delay: Duration, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result<WasmValidationResult, ValidationError> { let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare); // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. let pvf = PvfPrepData::from_code( - raw_validation_code, + code, executor_params, prep_timeout, PrepareJobKind::Compilation, @@ -1057,7 +1007,13 @@ trait ValidationBackend { // Use `Priority::Critical` as finality trumps parachain liveliness. let mut validation_result = self - .validate_candidate(pvf.clone(), exec_timeout, params.encode(), prepare_priority) + .validate_candidate( + pvf.clone(), + exec_timeout, + pvd.clone(), + pov.clone(), + prepare_priority, + ) .await; if validation_result.is_ok() { return validation_result @@ -1130,10 +1086,14 @@ trait ValidationBackend { validation_result ); - // Encode the params again when re-trying. We expect the retry case to be relatively - // rare, and we want to avoid unconditionally cloning data. validation_result = self - .validate_candidate(pvf.clone(), new_timeout, params.encode(), prepare_priority) + .validate_candidate( + pvf.clone(), + new_timeout, + pvd.clone(), + pov.clone(), + prepare_priority, + ) .await; } } @@ -1153,13 +1113,13 @@ impl ValidationBackend for ValidationHost { &mut self, pvf: PvfPrepData, exec_timeout: Duration, - encoded_params: Vec<u8>, + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result<WasmValidationResult, ValidationError> { let (tx, rx) = oneshot::channel(); - if let Err(err) = - self.execute_pvf(pvf, exec_timeout, encoded_params, prepare_priority, tx).await + if let Err(err) = self.execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, tx).await { return Err(InternalValidationError::HostCommunication(format!( "cannot send pvf to the validation host, it might have shut down: {:?}", diff --git a/polkadot/node/core/candidate-validation/src/metrics.rs b/polkadot/node/core/candidate-validation/src/metrics.rs index 28fc957ddb1a..1459907aa599 100644 --- a/polkadot/node/core/candidate-validation/src/metrics.rs +++ b/polkadot/node/core/candidate-validation/src/metrics.rs @@ -23,8 +23,6 @@ pub(crate) struct MetricsInner { pub(crate) validate_from_chain_state: prometheus::Histogram, pub(crate) validate_from_exhaustive: prometheus::Histogram, pub(crate) validate_candidate_exhaustive: prometheus::Histogram, - pub(crate) pov_size: prometheus::HistogramVec, - pub(crate) code_size: prometheus::Histogram, } /// Candidate validation metrics.
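To make the new backend API concrete, here is a minimal sketch of the retry pattern it enables (illustrative only; `backend`, `priority`, and `retry_timeout` stand in for the real arguments above and are not names from this patch). Because the validation inputs now sit behind `Arc`s, a retry clones two pointers instead of re-encoding a `ValidationParams` blob, which is why the old "encode the params again when re-trying" comment could be dropped:

    // Sketch: `Arc::clone` is O(1), so a retry shares the same PVD/PoV buffers.
    let pvd = Arc::new(persisted_validation_data);
    let pov = Arc::new(pov);
    let mut result = backend
        .validate_candidate(pvf.clone(), exec_timeout, pvd.clone(), pov.clone(), priority)
        .await;
    if result.is_err() {
        // Only the reference counts are bumped here; no buffers are copied.
        result = backend
            .validate_candidate(pvf.clone(), retry_timeout, pvd.clone(), pov.clone(), priority)
            .await;
    }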
@@ -70,21 +68,6 @@ impl Metrics { .as_ref() .map(|metrics| metrics.validate_candidate_exhaustive.start_timer()) } - - pub fn observe_code_size(&self, code_size: usize) { - if let Some(metrics) = &self.0 { - metrics.code_size.observe(code_size as f64); - } - } - - pub fn observe_pov_size(&self, pov_size: usize, compressed: bool) { - if let Some(metrics) = &self.0 { - metrics - .pov_size - .with_label_values(&[if compressed { "true" } else { "false" }]) - .observe(pov_size as f64); - } - } } impl metrics::Metrics for Metrics { @@ -121,33 +104,6 @@ impl metrics::Metrics for Metrics { ))?, registry, )?, - pov_size: prometheus::register( - prometheus::HistogramVec::new( - prometheus::HistogramOpts::new( - "polkadot_parachain_candidate_validation_pov_size", - "The compressed and decompressed size of the proof of validity of a candidate", - ) - .buckets( - prometheus::exponential_buckets(16384.0, 2.0, 10) - .expect("arguments are always valid; qed"), - ), - &["compressed"], - )?, - registry, - )?, - code_size: prometheus::register( - prometheus::Histogram::with_opts( - prometheus::HistogramOpts::new( - "polkadot_parachain_candidate_validation_code_size", - "The size of the decompressed WASM validation blob used for checking a candidate", - ) - .buckets( - prometheus::exponential_buckets(16384.0, 2.0, 10) - .expect("arguments are always valid; qed"), - ), - )?, - registry, - )?, }; Ok(Metrics(Some(metrics))) } diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index 86d855f78b45..55282fdf4ee1 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -20,6 +20,7 @@ use super::*; use assert_matches::assert_matches; use futures::executor; use polkadot_node_core_pvf::PrepareError; +use polkadot_node_primitives::{BlockData, VALIDATION_CODE_BOMB_LIMIT}; use polkadot_node_subsystem::messages::AllMessages; use polkadot_node_subsystem_util::reexports::SubsystemContext; use polkadot_overseer::ActivatedLeaf; @@ -385,7 +386,8 @@ impl ValidationBackend for MockValidateCandidateBackend { &mut self, _pvf: PvfPrepData, _timeout: Duration, - _encoded_params: Vec, + _pvd: Arc, + _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result { // This is expected to panic if called more times than expected, indicating an error in the @@ -950,115 +952,6 @@ fn compressed_code_works() { assert_matches!(v, Ok(ValidationResult::Valid(_, _))); } -#[test] -fn code_decompression_failure_is_error() { - let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; - let pov = PoV { block_data: BlockData(vec![1; 32]) }; - let head_data = HeadData(vec![1, 1, 1]); - - let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; - let validation_code = - sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1) - .map(ValidationCode) - .unwrap(); - - let descriptor = make_valid_candidate_descriptor( - ParaId::from(1_u32), - dummy_hash(), - validation_data.hash(), - pov.hash(), - validation_code.hash(), - head_data.hash(), - dummy_hash(), - Sr25519Keyring::Alice, - ); - - let validation_result = WasmValidationResult { - head_data, - new_validation_code: None, - upward_messages: Default::default(), - horizontal_messages: Default::default(), - processed_downward_messages: 0, - hrmp_watermark: 0, - }; - - let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - - let pool = TaskExecutor::new(); - let 
(_ctx, _ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::< - AllMessages, - _, - >(pool.clone()); - - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - ExecutorParams::default(), - PvfExecKind::Backing, - &Default::default(), - )); - - assert_matches!(v, Err(_)); -} - -#[test] -fn pov_decompression_failure_is_invalid() { - let validation_data = - PersistedValidationData { max_pov_size: POV_BOMB_LIMIT as u32, ..Default::default() }; - let head_data = HeadData(vec![1, 1, 1]); - - let raw_block_data = vec![2u8; POV_BOMB_LIMIT + 1]; - let pov = sp_maybe_compressed_blob::compress(&raw_block_data, POV_BOMB_LIMIT + 1) - .map(|raw| PoV { block_data: BlockData(raw) }) - .unwrap(); - - let validation_code = ValidationCode(vec![2; 16]); - - let descriptor = make_valid_candidate_descriptor( - ParaId::from(1_u32), - dummy_hash(), - validation_data.hash(), - pov.hash(), - validation_code.hash(), - head_data.hash(), - dummy_hash(), - Sr25519Keyring::Alice, - ); - - let validation_result = WasmValidationResult { - head_data, - new_validation_code: None, - upward_messages: Default::default(), - horizontal_messages: Default::default(), - processed_downward_messages: 0, - hrmp_watermark: 0, - }; - - let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - - let pool = TaskExecutor::new(); - let (_ctx, _ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::< - AllMessages, - _, - >(pool.clone()); - - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - ExecutorParams::default(), - PvfExecKind::Backing, - &Default::default(), - )); - - assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure))); -} - struct MockPreCheckBackend { result: Result<(), PrepareError>, } @@ -1075,7 +968,8 @@ impl ValidationBackend for MockPreCheckBackend { &mut self, _pvf: PvfPrepData, _timeout: Duration, - _encoded_params: Vec, + _pvd: Arc, + _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result { unreachable!() @@ -1149,70 +1043,6 @@ fn precheck_works() { executor::block_on(test_fut); } -#[test] -fn precheck_invalid_pvf_blob_compression() { - let relay_parent = [3; 32].into(); - - let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; - let validation_code = - sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1) - .map(ValidationCode) - .unwrap(); - let validation_code_hash = validation_code.hash(); - - let pool = TaskExecutor::new(); - let (mut ctx, mut ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::< - AllMessages, - _, - >(pool.clone()); - - let (check_fut, check_result) = precheck_pvf( - ctx.sender(), - MockPreCheckBackend::with_hardcoded_result(Ok(())), - relay_parent, - validation_code_hash, - ) - .remote_handle(); - - let test_fut = async move { - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - rp, - RuntimeApiRequest::ValidationCodeByHash( - vch, - tx - ), - )) => { - assert_eq!(vch, validation_code_hash); - assert_eq!(rp, relay_parent); - - let _ = tx.send(Ok(Some(validation_code.clone()))); - } - ); - assert_matches!( - ctx_handle.recv().await, - 
AllMessages::RuntimeApi( - RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) - ) => { - tx.send(Ok(1u32.into())).unwrap(); - } - ); - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx)) - ) => { - tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); - } - ); - assert_matches!(check_result.await, PreCheckOutcome::Invalid); - }; - - let test_fut = future::join(test_fut, check_fut); - executor::block_on(test_fut); -} - #[test] fn precheck_properly_classifies_outcomes() { let inner = |prepare_result, precheck_outcome| { @@ -1292,7 +1122,8 @@ impl ValidationBackend for MockHeadsUp { &mut self, _pvf: PvfPrepData, _timeout: Duration, - _encoded_params: Vec, + _pvd: Arc, + _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result { unreachable!() diff --git a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs index 97a03e6596d1..342128b7cca2 100644 --- a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs +++ b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs @@ -116,7 +116,7 @@ fn host_prepare_rococo_runtime(c: &mut Criterion) { cfg.prepare_workers_hard_max_num = 1; }) .await, - pvf.clone().code(), + pvf.clone().maybe_compressed_code(), ) }, |result| async move { diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs index 7ee05448d3c5..b0cdba9501db 100644 --- a/polkadot/node/core/pvf/common/src/error.rs +++ b/polkadot/node/core/pvf/common/src/error.rs @@ -94,6 +94,10 @@ pub enum PrepareError { #[codec(index = 11)] #[error("prepare: error interfacing with the kernel: {0}")] Kernel(String), + /// Code blob failed to decompress + #[codec(index = 12)] + #[error("prepare: could not decompress code blob: {0}")] + CouldNotDecompressCodeBlob(String), } impl PrepareError { @@ -106,7 +110,11 @@ impl PrepareError { pub fn is_deterministic(&self) -> bool { use PrepareError::*; match self { - Prevalidation(_) | Preparation(_) | JobError(_) | OutOfMemory => true, + Prevalidation(_) | + Preparation(_) | + JobError(_) | + OutOfMemory | + CouldNotDecompressCodeBlob(_) => true, IoErr(_) | JobDied { .. } | CreateTmpFile(_) | diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs index 46862f9f80b6..cff3f3b86e95 100644 --- a/polkadot/node/core/pvf/common/src/execute.rs +++ b/polkadot/node/core/pvf/common/src/execute.rs @@ -35,6 +35,8 @@ pub struct WorkerResponse { pub job_response: JobResponse, /// The amount of CPU time taken by the job. pub duration: Duration, + /// The uncompressed PoV size. + pub pov_size: u32, } /// An error occurred in the worker process. @@ -77,6 +79,8 @@ pub enum JobResponse { RuntimeConstruction(String), /// The candidate is invalid. InvalidCandidate(String), + /// PoV decompression failed + PoVDecompressionFailure, } impl JobResponse { diff --git a/polkadot/node/core/pvf/common/src/prepare.rs b/polkadot/node/core/pvf/common/src/prepare.rs index 81e165a7b8a4..4cd1beb30991 100644 --- a/polkadot/node/core/pvf/common/src/prepare.rs +++ b/polkadot/node/core/pvf/common/src/prepare.rs @@ -44,6 +44,8 @@ pub struct PrepareStats { pub cpu_time_elapsed: std::time::Duration, /// The observed memory statistics for the preparation job. pub memory_stats: MemoryStats, + /// The decompressed Wasm code length observed during the preparation. 
+ pub observed_wasm_code_len: u32, } /// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if diff --git a/polkadot/node/core/pvf/common/src/pvf.rs b/polkadot/node/core/pvf/common/src/pvf.rs index e2ac36a2406a..4019a8d8b0d0 100644 --- a/polkadot/node/core/pvf/common/src/pvf.rs +++ b/polkadot/node/core/pvf/common/src/pvf.rs @@ -26,9 +26,9 @@ use std::{fmt, sync::Arc, time::Duration}; /// Should be cheap to clone. #[derive(Clone, Encode, Decode)] pub struct PvfPrepData { - /// Wasm code (uncompressed) - code: Arc<Vec<u8>>, - /// Wasm code hash + /// Wasm code (maybe compressed) + maybe_compressed_code: Arc<Vec<u8>>, + /// Wasm code hash. code_hash: ValidationCodeHash, /// Executor environment parameters for the session for which artifact is prepared executor_params: Arc<ExecutorParams>, @@ -46,20 +46,20 @@ impl PvfPrepData { prep_timeout: Duration, prep_kind: PrepareJobKind, ) -> Self { - let code = Arc::new(code); - let code_hash = sp_crypto_hashing::blake2_256(&code).into(); + let maybe_compressed_code = Arc::new(code); + let code_hash = sp_crypto_hashing::blake2_256(&maybe_compressed_code).into(); let executor_params = Arc::new(executor_params); - Self { code, code_hash, executor_params, prep_timeout, prep_kind } + Self { maybe_compressed_code, code_hash, executor_params, prep_timeout, prep_kind } } - /// Returns validation code hash for the PVF + /// Returns validation code hash pub fn code_hash(&self) -> ValidationCodeHash { self.code_hash } - /// Returns PVF code - pub fn code(&self) -> Arc<Vec<u8>> { - self.code.clone() + /// Returns PVF code blob + pub fn maybe_compressed_code(&self) -> Arc<Vec<u8>> { + self.maybe_compressed_code.clone() } /// Returns executor params diff --git a/polkadot/node/core/pvf/execute-worker/Cargo.toml b/polkadot/node/core/pvf/execute-worker/Cargo.toml index f24b66dc4a0e..6ad340d25612 100644 --- a/polkadot/node/core/pvf/execute-worker/Cargo.toml +++ b/polkadot/node/core/pvf/execute-worker/Cargo.toml @@ -19,8 +19,11 @@ libc = { workspace = true } codec = { features = ["derive"], workspace = true } polkadot-node-core-pvf-common = { workspace = true, default-features = true } +polkadot-node-primitives = { workspace = true, default-features = true } polkadot-parachain-primitives = { workspace = true, default-features = true } polkadot-primitives = { workspace = true, default-features = true } +sp-maybe-compressed-blob = { workspace = true, default-features = true } + [features] builder = [] diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs index 35858ab36cec..4b7c167cc9ec 100644 --- a/polkadot/node/core/pvf/execute-worker/src/lib.rs +++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs @@ -22,6 +22,7 @@ pub use polkadot_node_core_pvf_common::{ error::ExecuteError, executor_interface::execute_artifact, }; +use polkadot_parachain_primitives::primitives::ValidationParams; // NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are // separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`.
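The identity change described in the commit message can be illustrated with a short sketch (the snippet is illustrative and not part of the patch; `compressed_blob` is an assumed input, and `blake2_256` is the same hasher `ValidationCode::hash()` is based on):

    // Because `PvfPrepData` now hashes the compressed blob, the artifact id
    // coincides with the on-chain `ValidationCodeHash` of the same code.
    let validation_code = polkadot_primitives::ValidationCode(compressed_blob);
    let code_hash: ValidationCodeHash =
        sp_crypto_hashing::blake2_256(&validation_code.0).into();
    assert_eq!(code_hash, validation_code.hash());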
@@ -50,8 +51,9 @@ use polkadot_node_core_pvf_common::{ }, worker_dir, }; +use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT}; use polkadot_parachain_primitives::primitives::ValidationResult; -use polkadot_primitives::ExecutorParams; +use polkadot_primitives::{ExecutorParams, PersistedValidationData}; use std::{ io::{self, Read}, os::{ @@ -85,8 +87,23 @@ fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result { Ok(handshake) } -fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, Duration)> { - let params = framed_recv_blocking(stream)?; +fn recv_request(stream: &mut UnixStream) -> io::Result<(PersistedValidationData, PoV, Duration)> { + let pvd = framed_recv_blocking(stream)?; + let pvd = PersistedValidationData::decode(&mut &pvd[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_request: failed to decode persisted validation data".to_string(), + ) + })?; + + let pov = framed_recv_blocking(stream)?; + let pov = PoV::decode(&mut &pov[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_request: failed to decode PoV".to_string(), + ) + })?; + let execution_timeout = framed_recv_blocking(stream)?; let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| { io::Error::new( @@ -94,7 +111,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, Duration)> { "execute pvf recv_request: failed to decode duration".to_string(), ) })?; - Ok((params, execution_timeout)) + Ok((pvd, pov, execution_timeout)) } /// Sends an error to the host and returns the original error wrapped in `io::Error`. @@ -149,7 +166,7 @@ pub fn worker_entrypoint( let execute_thread_stack_size = max_stack_size(&executor_params); loop { - let (params, execution_timeout) = recv_request(&mut stream).map_err(|e| { + let (pvd, pov, execution_timeout) = recv_request(&mut stream).map_err(|e| { map_and_send_err!( e, InternalValidationError::HostCommunication, @@ -197,7 +214,33 @@ pub fn worker_entrypoint( let stream_fd = stream.as_raw_fd(); let compiled_artifact_blob = Arc::new(compiled_artifact_blob); - let params = Arc::new(params); + + let raw_block_data = + match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) { + Ok(data) => data, + Err(_) => { + send_result::( + &mut stream, + Ok(WorkerResponse { + job_response: JobResponse::PoVDecompressionFailure, + duration: Duration::ZERO, + pov_size: 0, + }), + worker_info, + )?; + continue; + }, + }; + + let pov_size = raw_block_data.len() as u32; + + let params = ValidationParams { + parent_head: pvd.parent_head.clone(), + block_data: BlockData(raw_block_data.to_vec()), + relay_parent_number: pvd.relay_parent_number, + relay_parent_storage_root: pvd.relay_parent_storage_root, + }; + let params = Arc::new(params.encode()); cfg_if::cfg_if! { if #[cfg(target_os = "linux")] { @@ -214,6 +257,7 @@ pub fn worker_entrypoint( worker_info, security_status.can_unshare_user_namespace_and_change_root, usage_before, + pov_size, )? } else { // Fall back to using fork. @@ -228,6 +272,7 @@ pub fn worker_entrypoint( execute_thread_stack_size, worker_info, usage_before, + pov_size, )? 
}; } else { @@ -242,6 +287,7 @@ pub fn worker_entrypoint( execute_thread_stack_size, worker_info, usage_before, + pov_size, )?; } } @@ -300,6 +346,7 @@ fn handle_clone( worker_info: &WorkerInfo, have_unshare_newuser: bool, usage_before: Usage, + pov_size: u32, ) -> io::Result> { use polkadot_node_core_pvf_common::worker::security; @@ -329,6 +376,7 @@ fn handle_clone( worker_info, child, usage_before, + pov_size, execution_timeout, ), Err(security::clone::Error::Clone(errno)) => @@ -347,6 +395,7 @@ fn handle_fork( execute_worker_stack_size: usize, worker_info: &WorkerInfo, usage_before: Usage, + pov_size: u32, ) -> io::Result> { // SAFETY: new process is spawned within a single threaded process. This invariant // is enforced by tests. @@ -367,6 +416,7 @@ fn handle_fork( worker_info, child, usage_before, + pov_size, execution_timeout, ), Err(errno) => Ok(Err(internal_error_from_errno("fork", errno))), @@ -513,6 +563,7 @@ fn handle_parent_process( worker_info: &WorkerInfo, job_pid: Pid, usage_before: Usage, + pov_size: u32, timeout: Duration, ) -> io::Result> { // the read end will wait until all write ends have been closed, @@ -578,7 +629,7 @@ fn handle_parent_process( )))); } - Ok(Ok(WorkerResponse { job_response, duration: cpu_tv })) + Ok(Ok(WorkerResponse { job_response, pov_size, duration: cpu_tv })) }, Err(job_error) => { gum::warn!( diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml index 9e0d01fc438b..56235bd82192 100644 --- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml +++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml @@ -23,10 +23,12 @@ nix = { features = ["process", "resource", "sched"], workspace = true } codec = { features = ["derive"], workspace = true } polkadot-node-core-pvf-common = { workspace = true, default-features = true } +polkadot-node-primitives = { workspace = true, default-features = true } polkadot-primitives = { workspace = true, default-features = true } sc-executor-common = { workspace = true, default-features = true } sc-executor-wasmtime = { workspace = true, default-features = true } +sp-maybe-compressed-blob = { workspace = true, default-features = true } [target.'cfg(target_os = "linux")'.dependencies] tikv-jemallocator = "0.5.0" diff --git a/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs b/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs index d531c90b64b5..49b30dc33ceb 100644 --- a/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs +++ b/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs @@ -24,7 +24,11 @@ use polkadot_primitives::ExecutorParams; use std::time::Duration; fn do_prepare_runtime(pvf: PvfPrepData) { - let blob = match prevalidate(&pvf.code()) { + let maybe_compressed_code = pvf.maybe_compressed_code(); + let raw_validation_code = + sp_maybe_compressed_blob::decompress(&maybe_compressed_code, usize::MAX).unwrap(); + + let blob = match prevalidate(&raw_validation_code) { Err(err) => panic!("{:?}", err), Ok(b) => b, }; diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index ef33d11720eb..f8ebb6effcec 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -38,6 +38,7 @@ use polkadot_node_core_pvf_common::{ executor_interface::{prepare, prevalidate}, worker::{pipe2_cloexec, PipeFd, WorkerInfo}, }; +use polkadot_node_primitives::VALIDATION_CODE_BOMB_LIMIT; use 
codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ @@ -105,6 +106,12 @@ impl AsRef<[u8]> for CompiledArtifact { } } +#[derive(Encode, Decode)] +pub struct PrepareOutcome { + pub compiled_artifact: CompiledArtifact, + pub observed_wasm_code_len: u32, +} + /// Get a worker request. fn recv_request(stream: &mut UnixStream) -> io::Result { let pvf = framed_recv_blocking(stream)?; @@ -294,14 +301,23 @@ pub fn worker_entrypoint( ); } -fn prepare_artifact(pvf: PvfPrepData) -> Result { - let blob = match prevalidate(&pvf.code()) { +fn prepare_artifact(pvf: PvfPrepData) -> Result { + let maybe_compressed_code = pvf.maybe_compressed_code(); + let raw_validation_code = + sp_maybe_compressed_blob::decompress(&maybe_compressed_code, VALIDATION_CODE_BOMB_LIMIT) + .map_err(|e| PrepareError::CouldNotDecompressCodeBlob(e.to_string()))?; + let observed_wasm_code_len = raw_validation_code.len() as u32; + + let blob = match prevalidate(&raw_validation_code) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), Ok(b) => b, }; match prepare(blob, &pvf.executor_params()) { - Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), + Ok(compiled_artifact) => Ok(PrepareOutcome { + compiled_artifact: CompiledArtifact::new(compiled_artifact), + observed_wasm_code_len, + }), Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), } } @@ -322,6 +338,7 @@ fn runtime_construction_check( struct JobResponse { artifact: CompiledArtifact, memory_stats: MemoryStats, + observed_wasm_code_len: u32, } #[cfg(target_os = "linux")] @@ -500,11 +517,11 @@ fn handle_child_process( "prepare worker", move || { #[allow(unused_mut)] - let mut result = prepare_artifact(pvf); + let mut result = prepare_artifact(pvf).map(|o| (o,)); // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. #[cfg(target_os = "linux")] - let mut result = result.map(|artifact| (artifact, get_max_rss_thread())); + let mut result = result.map(|outcome| (outcome.0, get_max_rss_thread())); // If we are pre-checking, check for runtime construction errors. // @@ -513,7 +530,10 @@ fn handle_child_process( // anyway. if let PrepareJobKind::Prechecking = prepare_job_kind { result = result.and_then(|output| { - runtime_construction_check(output.0.as_ref(), &executor_params)?; + runtime_construction_check( + output.0.compiled_artifact.as_ref(), + &executor_params, + )?; Ok(output) }); } @@ -553,9 +573,9 @@ fn handle_child_process( Ok(ok) => { cfg_if::cfg_if! { if #[cfg(target_os = "linux")] { - let (artifact, max_rss) = ok; + let (PrepareOutcome { compiled_artifact, observed_wasm_code_len }, max_rss) = ok; } else { - let artifact = ok; + let (PrepareOutcome { compiled_artifact, observed_wasm_code_len },) = ok; } } @@ -574,7 +594,11 @@ fn handle_child_process( peak_tracked_alloc: if peak_alloc > 0 { peak_alloc as u64 } else { 0u64 }, }; - Ok(JobResponse { artifact, memory_stats }) + Ok(JobResponse { + artifact: compiled_artifact, + observed_wasm_code_len, + memory_stats, + }) }, } }, @@ -665,7 +689,7 @@ fn handle_parent_process( match result { Err(err) => Err(err), - Ok(JobResponse { artifact, memory_stats }) => { + Ok(JobResponse { artifact, memory_stats, observed_wasm_code_len }) => { // The exit status should have been zero if no error occurred. 
if exit_status != 0 { return Err(PrepareError::JobError(format!( @@ -696,7 +720,11 @@ fn handle_parent_process( let checksum = blake3::hash(&artifact.as_ref()).to_hex().to_string(); Ok(PrepareWorkerSuccess { checksum, - stats: PrepareStats { memory_stats, cpu_time_elapsed: cpu_tv }, + stats: PrepareStats { + memory_stats, + cpu_time_elapsed: cpu_tv, + observed_wasm_code_len, + }, }) }, } diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs index 8dc96305eadb..a0634106052d 100644 --- a/polkadot/node/core/pvf/src/error.rs +++ b/polkadot/node/core/pvf/src/error.rs @@ -52,6 +52,9 @@ pub enum InvalidCandidate { /// PVF execution (compilation is not included) took more time than was allotted. #[error("invalid: hard timeout")] HardTimeout, + /// Proof-of-validity failed to decompress correctly + #[error("invalid: PoV failed to decompress")] + PoVDecompressionFailure, } /// Possibly transient issue that may resolve after retries. diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index bb00a5a652d6..11031bf1074a 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -34,12 +34,14 @@ use polkadot_node_core_pvf_common::{ execute::{JobResponse, WorkerError, WorkerResponse}, SecurityStatus, }; -use polkadot_primitives::{ExecutorParams, ExecutorParamsHash}; +use polkadot_node_primitives::PoV; +use polkadot_primitives::{ExecutorParams, ExecutorParamsHash, PersistedValidationData}; use slotmap::HopSlotMap; use std::{ collections::VecDeque, fmt, path::PathBuf, + sync::Arc, time::{Duration, Instant}, }; @@ -68,7 +70,8 @@ pub enum FromQueue { #[derive(Debug)] pub struct PendingExecutionRequest { pub exec_timeout: Duration, - pub params: Vec, + pub pvd: Arc, + pub pov: Arc, pub executor_params: ExecutorParams, pub result_tx: ResultSender, } @@ -76,7 +79,8 @@ pub struct PendingExecutionRequest { struct ExecuteJob { artifact: ArtifactPathId, exec_timeout: Duration, - params: Vec, + pvd: Arc, + pov: Arc, executor_params: ExecutorParams, result_tx: ResultSender, waiting_since: Instant, @@ -293,18 +297,20 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue; - let PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } = + let PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx } = pending_execution_request; gum::debug!( target: LOG_TARGET, validation_code_hash = ?artifact.id.code_hash, "enqueueing an artifact for execution", ); + queue.metrics.observe_pov_size(pov.block_data.0.len(), true); queue.metrics.execute_enqueued(); let job = ExecuteJob { artifact, exec_timeout, - params, + pvd, + pov, executor_params, result_tx, waiting_since: Instant::now(), @@ -352,15 +358,19 @@ async fn handle_job_finish( artifact_id: ArtifactId, result_tx: ResultSender, ) { - let (idle_worker, result, duration, sync_channel) = match worker_result { + let (idle_worker, result, duration, sync_channel, pov_size) = match worker_result { Ok(WorkerInterfaceResponse { worker_response: - WorkerResponse { job_response: JobResponse::Ok { result_descriptor }, duration }, + WorkerResponse { + job_response: JobResponse::Ok { result_descriptor }, + duration, + pov_size, + }, idle_worker, }) => { // TODO: propagate the soft timeout - (Some(idle_worker), Ok(result_descriptor), Some(duration), None) + (Some(idle_worker), 
Ok(result_descriptor), Some(duration), None, Some(pov_size)) }, Ok(WorkerInterfaceResponse { worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. }, @@ -370,6 +380,18 @@ async fn handle_job_finish( Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))), None, None, + None, + ), + Ok(WorkerInterfaceResponse { + worker_response: + WorkerResponse { job_response: JobResponse::PoVDecompressionFailure, .. }, + idle_worker, + }) => ( + Some(idle_worker), + Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)), + None, + None, + None, ), Ok(WorkerInterfaceResponse { worker_response: @@ -393,39 +415,46 @@ async fn handle_job_finish( ))), None, Some(result_rx), + None, ) }, Err(WorkerInterfaceError::InternalError(err)) | Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) => - (None, Err(ValidationError::Internal(err)), None, None), + (None, Err(ValidationError::Internal(err)), None, None, None), // Either the worker or the job timed out. Kill the worker in either case. Treated as // definitely-invalid, because if we timed out, there's no time left for a retry. Err(WorkerInterfaceError::HardTimeout) | Err(WorkerInterfaceError::WorkerError(WorkerError::JobTimedOut)) => - (None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None), + (None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None, None), // "Maybe invalid" errors (will retry). Err(WorkerInterfaceError::CommunicationErr(_err)) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)), None, None, + None, ), Err(WorkerInterfaceError::WorkerError(WorkerError::JobDied { err, .. })) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))), None, None, + None, ), Err(WorkerInterfaceError::WorkerError(WorkerError::JobError(err))) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err.to_string()))), None, None, + None, ), }; queue.metrics.execute_finished(); + if let Some(pov_size) = pov_size { + queue.metrics.observe_pov_size(pov_size as usize, false) + } if let Err(ref err) = result { gum::warn!( target: LOG_TARGET, @@ -573,7 +602,8 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { idle, job.artifact.clone(), job.exec_timeout, - job.params, + job.pvd, + job.pov, ) .await; QueueEvent::StartWork(worker, result, job.artifact.id, job.result_tx) diff --git a/polkadot/node/core/pvf/src/execute/worker_interface.rs b/polkadot/node/core/pvf/src/execute/worker_interface.rs index d15d7c15426e..77bd6bedd75c 100644 --- a/polkadot/node/core/pvf/src/execute/worker_interface.rs +++ b/polkadot/node/core/pvf/src/execute/worker_interface.rs @@ -32,8 +32,9 @@ use polkadot_node_core_pvf_common::{ execute::{Handshake, WorkerError, WorkerResponse}, worker_dir, SecurityStatus, }; -use polkadot_primitives::ExecutorParams; -use std::{path::Path, time::Duration}; +use polkadot_node_primitives::PoV; +use polkadot_primitives::{ExecutorParams, PersistedValidationData}; +use std::{path::Path, sync::Arc, time::Duration}; use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. 
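The host-to-worker request is now three SCALE-encoded, length-prefixed frames instead of a single pre-encoded `ValidationParams` blob. A condensed sketch of both ends of the exchange (mirroring `send_request` below and `recv_request` in the execute worker; error handling elided):

    // Host side (async, over the worker's `UnixStream`):
    framed_send(stream, &pvd.encode()).await?;
    framed_send(stream, &pov.encode()).await?;
    framed_send(stream, &execution_timeout.encode()).await?;

    // Worker side (blocking), decoding the frames in the same order:
    let pvd = PersistedValidationData::decode(&mut &framed_recv_blocking(stream)?[..])?;
    let pov = PoV::decode(&mut &framed_recv_blocking(stream)?[..])?;
    let timeout = Duration::decode(&mut &framed_recv_blocking(stream)?[..])?;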
@@ -123,7 +124,8 @@ pub async fn start_work( worker: IdleWorker, artifact: ArtifactPathId, execution_timeout: Duration, - validation_params: Vec, + pvd: Arc, + pov: Arc, ) -> Result { let IdleWorker { mut stream, pid, worker_dir } = worker; @@ -137,18 +139,16 @@ pub async fn start_work( ); with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move { - send_request(&mut stream, &validation_params, execution_timeout).await.map_err( - |error| { - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - validation_code_hash = ?artifact.id.code_hash, - "failed to send an execute request: {}", - error, - ); - Error::InternalError(InternalValidationError::HostCommunication(error.to_string())) - }, - )?; + send_request(&mut stream, pvd, pov, execution_timeout).await.map_err(|error| { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + validation_code_hash = ?artifact.id.code_hash, + "failed to send an execute request: {}", + error, + ); + Error::InternalError(InternalValidationError::HostCommunication(error.to_string())) + })?; // We use a generous timeout here. This is in addition to the one in the child process, in // case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout @@ -288,10 +288,12 @@ async fn send_execute_handshake(stream: &mut UnixStream, handshake: Handshake) - async fn send_request( stream: &mut UnixStream, - validation_params: &[u8], + pvd: Arc, + pov: Arc, execution_timeout: Duration, ) -> io::Result<()> { - framed_send(stream, validation_params).await?; + framed_send(stream, &pvd.encode()).await?; + framed_send(stream, &pov.encode()).await?; framed_send(stream, &execution_timeout.encode()).await } diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 462631d33b52..44a4cba2fbf8 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -36,11 +36,14 @@ use polkadot_node_core_pvf_common::{ prepare::PrepareSuccess, pvf::PvfPrepData, }; +use polkadot_node_primitives::PoV; use polkadot_node_subsystem::{SubsystemError, SubsystemResult}; use polkadot_parachain_primitives::primitives::ValidationResult; +use polkadot_primitives::PersistedValidationData; use std::{ collections::HashMap, path::PathBuf, + sync::Arc, time::{Duration, SystemTime}, }; @@ -108,7 +111,8 @@ impl ValidationHost { &mut self, pvf: PvfPrepData, exec_timeout: Duration, - params: Vec, + pvd: Arc, + pov: Arc, priority: Priority, result_tx: ResultSender, ) -> Result<(), String> { @@ -116,7 +120,8 @@ impl ValidationHost { .send(ToHost::ExecutePvf(ExecutePvfInputs { pvf, exec_timeout, - params, + pvd, + pov, priority, result_tx, })) @@ -147,7 +152,8 @@ enum ToHost { struct ExecutePvfInputs { pvf: PvfPrepData, exec_timeout: Duration, - params: Vec, + pvd: Arc, + pov: Arc, priority: Priority, result_tx: ResultSender, } @@ -539,7 +545,7 @@ async fn handle_execute_pvf( awaiting_prepare: &mut AwaitingPrepare, inputs: ExecutePvfInputs, ) -> Result<(), Fatal> { - let ExecutePvfInputs { pvf, exec_timeout, params, priority, result_tx } = inputs; + let ExecutePvfInputs { pvf, exec_timeout, pvd, pov, priority, result_tx } = inputs; let artifact_id = ArtifactId::from_pvf_prep_data(&pvf); let executor_params = (*pvf.executor_params()).clone(); @@ -558,7 +564,8 @@ async fn handle_execute_pvf( artifact: ArtifactPathId::new(artifact_id, path), pending_execution_request: PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -587,7 +594,8 @@ async fn handle_execute_pvf( 
artifact_id, PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -598,7 +606,7 @@ async fn handle_execute_pvf( ArtifactState::Preparing { .. } => { awaiting_prepare.add( artifact_id, - PendingExecutionRequest { exec_timeout, params, executor_params, result_tx }, + PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx }, ); }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { @@ -627,7 +635,8 @@ async fn handle_execute_pvf( artifact_id, PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -648,7 +657,7 @@ async fn handle_execute_pvf( pvf, priority, artifact_id, - PendingExecutionRequest { exec_timeout, params, executor_params, result_tx }, + PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx }, ) .await?; } @@ -770,7 +779,7 @@ async fn handle_prepare_done( // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. let pending_requests = awaiting_prepare.take(&artifact_id); - for PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } in + for PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx } in pending_requests { if result_tx.is_canceled() { @@ -793,7 +802,8 @@ async fn handle_prepare_done( artifact: ArtifactPathId::new(artifact_id.clone(), &path), pending_execution_request: PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -967,6 +977,8 @@ pub(crate) mod tests { use assert_matches::assert_matches; use futures::future::BoxFuture; use polkadot_node_core_pvf_common::prepare::PrepareStats; + use polkadot_node_primitives::BlockData; + use sp_core::H256; const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30); @@ -1223,12 +1235,21 @@ pub(crate) mod tests { async fn execute_pvf_requests() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov1 = Arc::new(PoV { block_data: BlockData(b"pov1".to_vec()) }); + let pov2 = Arc::new(PoV { block_data: BlockData(b"pov2".to_vec()) }); let (result_tx, result_rx_pvf_1_1) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf1".to_vec(), + pvd.clone(), + pov1.clone(), Priority::Normal, result_tx, ) @@ -1239,7 +1260,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf1".to_vec(), + pvd.clone(), + pov1, Priority::Critical, result_tx, ) @@ -1250,7 +1272,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(2), TEST_EXECUTION_TIMEOUT, - b"pvf2".to_vec(), + pvd, + pov2, Priority::Normal, result_tx, ) @@ -1382,6 +1405,13 @@ pub(crate) mod tests { async fn test_prepare_done() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); // Test mixed cases of receiving execute and precheck requests // for the same PVF. 
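As a usage sketch of the updated host entry point (mirroring the surrounding test setup; the match arms are illustrative and elide the remaining error variants):

    let (result_tx, result_rx) = oneshot::channel();
    host.execute_pvf(pvf, exec_timeout, pvd.clone(), pov.clone(), Priority::Normal, result_tx)
        .await?;
    match result_rx.await {
        // The worker decompressed the PoV, executed the PVF, and returned a result.
        Ok(Ok(validation_result)) => { /* ... */ },
        // New with this patch: a PoV that fails to decompress inside the worker
        // surfaces as a deterministic invalidity rather than an internal error.
        Ok(Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure))) => { /* ... */ },
        Ok(Err(other_error)) => { /* ... */ },
        Err(_) => { /* the host has shut down */ },
    }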
@@ -1391,7 +1421,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf2".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx, ) @@ -1438,7 +1469,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(2), TEST_EXECUTION_TIMEOUT, - b"pvf2".to_vec(), + pvd, + pov, Priority::Critical, result_tx, ) @@ -1534,13 +1566,21 @@ pub(crate) mod tests { async fn test_execute_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); // Submit a execute request that fails. let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx, ) @@ -1570,7 +1610,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_2, ) @@ -1592,7 +1633,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_3, ) @@ -1636,13 +1678,21 @@ pub(crate) mod tests { async fn test_execute_prepare_no_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); // Submit an execute request that fails. 
let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx, ) @@ -1672,7 +1722,8 @@ host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_2, ) @@ -1694,7 +1745,8 @@ host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_3, ) @@ -1755,12 +1807,20 @@ async fn cancellation() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf1".to_vec(), + pvd, + pov, Priority::Normal, result_tx, ) diff --git a/polkadot/node/core/pvf/src/metrics.rs b/polkadot/node/core/pvf/src/metrics.rs index bc8d300037fe..c59cab464180 100644 --- a/polkadot/node/core/pvf/src/metrics.rs +++ b/polkadot/node/core/pvf/src/metrics.rs @@ -105,6 +105,21 @@ impl Metrics { .observe((memory_stats.peak_tracked_alloc / 1024) as f64); } } + + pub(crate) fn observe_code_size(&self, code_size: usize) { + if let Some(metrics) = &self.0 { + metrics.code_size.observe(code_size as f64); + } + } + + pub(crate) fn observe_pov_size(&self, pov_size: usize, compressed: bool) { + if let Some(metrics) = &self.0 { + metrics + .pov_size + .with_label_values(&[if compressed { "true" } else { "false" }]) + .observe(pov_size as f64); + } + } } #[derive(Clone)] @@ -129,6 +144,8 @@ struct MetricsInner { preparation_max_resident: prometheus::Histogram, // Peak allocation value, tracked by tracking-allocator preparation_peak_tracked_allocation: prometheus::Histogram, + pov_size: prometheus::HistogramVec, + code_size: prometheus::Histogram, } impl metrics::Metrics for Metrics { @@ -323,6 +340,35 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + // The following metrics were moved here from the candidate validation subsystem. + // Names are kept to avoid breaking existing dashboards.
+ pov_size: prometheus::register( + prometheus::HistogramVec::new( + prometheus::HistogramOpts::new( + "polkadot_parachain_candidate_validation_pov_size", + "The compressed and decompressed size of the proof of validity of a candidate", + ) + .buckets( + prometheus::exponential_buckets(16384.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), + &["compressed"], + )?, + registry, + )?, + code_size: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_parachain_candidate_validation_code_size", + "The size of the decompressed WASM validation blob used for checking a candidate", + ) + .buckets( + prometheus::exponential_buckets(16384.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), + )?, + registry, + )?, }; Ok(Metrics(Some(inner))) } diff --git a/polkadot/node/core/pvf/src/prepare/worker_interface.rs b/polkadot/node/core/pvf/src/prepare/worker_interface.rs index 22ee93319d84..d29d2717c4b6 100644 --- a/polkadot/node/core/pvf/src/prepare/worker_interface.rs +++ b/polkadot/node/core/pvf/src/prepare/worker_interface.rs @@ -211,7 +211,7 @@ async fn handle_response( // https://github.com/paritytech/polkadot-sdk/issues/2399 let PrepareWorkerSuccess { checksum: _, - stats: PrepareStats { cpu_time_elapsed, memory_stats }, + stats: PrepareStats { cpu_time_elapsed, memory_stats, observed_wasm_code_len }, } = match result.clone() { Ok(result) => result, // Timed out on the child. This should already be logged by the child. @@ -221,6 +221,8 @@ async fn handle_response( Err(err) => return Outcome::Concluded { worker, result: Err(err) }, }; + metrics.observe_code_size(observed_wasm_code_len as usize); + if cpu_time_elapsed > preparation_timeout { // The job didn't complete within the timeout. 
gum::warn!( @@ -267,7 +269,11 @@ async fn handle_response( result: Ok(PrepareSuccess { path: artifact_path, size, - stats: PrepareStats { cpu_time_elapsed, memory_stats: memory_stats.clone() }, + stats: PrepareStats { + cpu_time_elapsed, + memory_stats: memory_stats.clone(), + observed_wasm_code_len, + }, }), }, Err(err) => { diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs index 455e8c36c88d..1a95a28fe077 100644 --- a/polkadot/node/core/pvf/tests/it/adder.rs +++ b/polkadot/node/core/pvf/tests/it/adder.rs @@ -18,29 +18,33 @@ use super::TestHost; use codec::{Decode, Encode}; +use polkadot_node_primitives::PoV; use polkadot_parachain_primitives::primitives::{ - BlockData as GenericBlockData, HeadData as GenericHeadData, RelayChainBlockNumber, - ValidationParams, + BlockData as GenericBlockData, HeadData as GenericHeadData, }; +use polkadot_primitives::PersistedValidationData; +use sp_core::H256; use test_parachain_adder::{hash_state, BlockData, HeadData}; #[tokio::test] async fn execute_good_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let block_data = BlockData { state: 0, add: 512 }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let host = TestHost::new().await; let ret = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await @@ -63,18 +67,20 @@ async fn execute_good_chain_on_parent() { for (number, add) in (0..10).enumerate() { let parent_head = HeadData { number: number as u64, parent_hash, post_state: hash_state(last_state) }; let block_data = BlockData { state: last_state, add }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let ret = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: number as RelayChainBlockNumber + 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await @@ -94,23 +100,25 @@ async fn execute_good_chain_on_parent() { #[tokio::test] async fn execute_bad_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let block_data = BlockData { state: 256, // start state is wrong.
add: 256, }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let host = TestHost::new().await; let _err = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await @@ -124,15 +132,18 @@ async fn stress_spawn() { async fn execute(host: std::sync::Arc<TestHost>) { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let block_data = BlockData { state: 0, add: 512 }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let ret = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await @@ -161,15 +172,18 @@ async fn execute_can_run_serially() { async fn execute(host: std::sync::Arc<TestHost>) { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let block_data = BlockData { state: 0, add: 512 }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let ret = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index 9ad486657512..a4a085318957 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -17,7 +17,6 @@ //! General PVF host integration tests checking the functionality of the PVF host itself.
use assert_matches::assert_matches; -use codec::Encode as _; #[cfg(all(feature = "ci-only-tests", target_os = "linux"))] use polkadot_node_core_pvf::SecurityStatus; use polkadot_node_core_pvf::{ @@ -25,10 +24,14 @@ use polkadot_node_core_pvf::{ PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; -use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult}; -use polkadot_primitives::{ExecutorParam, ExecutorParams, PvfExecKind, PvfPrepKind}; +use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT}; +use polkadot_parachain_primitives::primitives::{BlockData, ValidationResult}; +use polkadot_primitives::{ + ExecutorParam, ExecutorParams, PersistedValidationData, PvfExecKind, PvfPrepKind, +}; +use sp_core::H256; -use std::{io::Write, time::Duration}; +use std::{io::Write, sync::Arc, time::Duration}; use tokio::sync::Mutex; mod adder; @@ -80,9 +83,6 @@ impl TestHost { ) -> Result<(), PrepareError> { let (result_tx, result_rx) = futures::channel::oneshot::channel(); - let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024) - .expect("Compression works"); - self.host .lock() .await @@ -103,14 +103,12 @@ impl TestHost { async fn validate_candidate( &self, code: &[u8], - params: ValidationParams, + pvd: PersistedValidationData, + pov: PoV, executor_params: ExecutorParams, ) -> Result<ValidationResult, ValidationError> { let (result_tx, result_rx) = futures::channel::oneshot::channel(); - let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024) - .expect("Compression works"); - self.host .lock() .await @@ -122,7 +120,8 @@ impl TestHost { PrepareJobKind::Compilation, ), TEST_EXECUTION_TIMEOUT, - params.encode(), + Arc::new(pvd), + Arc::new(pov), polkadot_node_core_pvf::Priority::Normal, result_tx, ) @@ -159,19 +158,17 @@ async fn prepare_job_terminates_on_timeout() { #[tokio::test] async fn execute_job_terminates_on_timeout() { let host = TestHost::new().await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; let start = std::time::Instant::now(); let result = host - .validate_candidate( - test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, - Default::default(), - ) + .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) .await; match result { @@ -189,24 +186,23 @@ async fn ensure_parallel_execution() { // Run some jobs that do not complete, thus timing out.
let host = TestHost::new().await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; let execute_pvf_future_1 = host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd.clone(), + pov.clone(), Default::default(), ); let execute_pvf_future_2 = host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ); @@ -237,6 +233,13 @@ async fn execute_queue_doesnt_stall_if_workers_died() { cfg.execute_workers_max_num = 5; }) .await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; // Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The // first five jobs should time out and the workers get killed. For the next 3 jobs a new batch of @@ -245,12 +248,8 @@ futures::future::join_all((0u8..=8).map(|_| { host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd.clone(), + pov.clone(), Default::default(), ) })) @@ -275,6 +274,13 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() { cfg.execute_workers_max_num = 2; }) .await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; let executor_params_1 = ExecutorParams::default(); let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]); @@ -288,12 +294,8 @@ futures::future::join_all((0u8..6).map(|i| { host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd.clone(), + pov.clone(), match i % 3 { 0 => executor_params_1.clone(), _ => executor_params_2.clone(), @@ -324,6 +326,13 @@ async fn deleting_prepared_artifact_does_not_dispute() { let host = TestHost::new().await; let cache_dir = host.cache_dir.path(); + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) @@ -347,16 +356,7 @@ // Try to validate, artifact should get recreated.
let result = host - .validate_candidate( - test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, - Default::default(), - ) + .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) .await; assert_matches!(result, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout))); @@ -367,6 +367,13 @@ async fn deleting_prepared_artifact_does_not_dispute() { async fn corrupted_prepared_artifact_does_not_dispute() { let host = TestHost::new().await; let cache_dir = host.cache_dir.path(); + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) @@ -400,16 +407,7 @@ async fn corrupted_prepared_artifact_does_not_dispute() { // Try to validate, artifact should get removed because of the corruption. let result = host - .validate_candidate( - test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, - Default::default(), - ) + .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) .await; assert_matches!( @@ -652,3 +650,65 @@ async fn artifact_does_reprepare_on_meaningful_exec_parameter_change() { assert_eq!(cache_dir_contents.len(), 3); // new artifact has been added } + +// Checks that we cannot prepare oversized compressed code +#[tokio::test] +async fn invalid_compressed_code_fails_prechecking() { + let host = TestHost::new().await; + let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; + let validation_code = + sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap(); + + let res = host.precheck_pvf(&validation_code, Default::default()).await; + + assert_matches!(res, Err(PrepareError::CouldNotDecompressCodeBlob(_))); +} + +// Checks that we cannot validate with oversized compressed code +#[tokio::test] +async fn invalid_compressed_code_fails_validation() { + let host = TestHost::new().await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; + + let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; + let validation_code = + sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap(); + + let result = host.validate_candidate(&validation_code, pvd, pov, Default::default()).await; + + assert_matches!( + result, + Err(ValidationError::Preparation(PrepareError::CouldNotDecompressCodeBlob(_))) + ); +} + +// Checks that we cannot validate with an oversized PoV +#[tokio::test] +async fn invalid_compressed_pov_fails_validation() { + let host = TestHost::new().await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let raw_block_data = vec![1u8; POV_BOMB_LIMIT + 1]; + let block_data = + sp_maybe_compressed_blob::compress(&raw_block_data, POV_BOMB_LIMIT + 1).unwrap(); + 
let pov = PoV { block_data: BlockData(block_data) }; + + let result = host + .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) + .await; + + assert_matches!( + result, + Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)) + ); +} diff --git a/polkadot/node/core/pvf/tests/it/process.rs b/polkadot/node/core/pvf/tests/it/process.rs index b8fd2cdce0ce..b3023c8a45c3 100644 --- a/polkadot/node/core/pvf/tests/it/process.rs +++ b/polkadot/node/core/pvf/tests/it/process.rs @@ -23,11 +23,14 @@ use codec::Encode; use polkadot_node_core_pvf::{ InvalidCandidate, PossiblyInvalidError, PrepareError, ValidationError, }; +use polkadot_node_primitives::PoV; use polkadot_parachain_primitives::primitives::{ - BlockData as GenericBlockData, HeadData as GenericHeadData, ValidationParams, + BlockData as GenericBlockData, HeadData as GenericHeadData, }; +use polkadot_primitives::PersistedValidationData; use procfs::process; use rusty_fork::rusty_fork_test; +use sp_core::H256; use std::{future::Future, sync::Arc, time::Duration}; use test_parachain_adder::{hash_state, BlockData, HeadData}; @@ -125,15 +128,18 @@ rusty_fork_test! { test_wrapper(|host, _sid| async move { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let block_data = BlockData { state: 0, add: 512 }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await @@ -166,17 +172,20 @@ rusty_fork_test! { // Prepare the artifact ahead of time. let binary = test_parachain_halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); + let pvd = PersistedValidationData { + parent_head: GenericHeadData(HeadData::default().encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(Vec::new()) }; let (result, _) = futures::join!( // Choose a job that would normally take the entire timeout. host.validate_candidate( binary, - ValidationParams { - block_data: GenericBlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ), // Send a stop signal to pause the worker. @@ -218,17 +227,20 @@ rusty_fork_test! { // Prepare the artifact ahead of time. let binary = test_parachain_halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); + let pvd = PersistedValidationData { + parent_head: GenericHeadData(HeadData::default().encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(Vec::new()) }; let (result, _) = futures::join!( // Choose a job that would normally take the entire timeout.
host.validate_candidate( binary, - ValidationParams { - block_data: GenericBlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ), // Run a future that kills the job while it's running. @@ -274,17 +286,20 @@ rusty_fork_test! { // Prepare the artifact ahead of time. let binary = test_parachain_halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); + let pvd = PersistedValidationData { + parent_head: GenericHeadData(HeadData::default().encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(Vec::new()) }; let (result, _) = futures::join!( // Choose a job that would normally take the entire timeout. host.validate_candidate( binary, - ValidationParams { - block_data: GenericBlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ), // Run a future that kills the job while it's running. @@ -342,17 +357,20 @@ rusty_fork_test! { // Prepare the artifact ahead of time. let binary = test_parachain_halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); + let pvd = PersistedValidationData { + parent_head: GenericHeadData(HeadData::default().encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(Vec::new()) }; let _ = futures::join!( // Choose a job that would normally take the entire timeout. host.validate_candidate( binary, - ValidationParams { - block_data: GenericBlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ), // Run a future that tests the thread count while the worker is running. diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 4e13d5eda76f..baaff9c7c9f6 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -466,7 +466,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand

message_capacity=2048, )] pub struct Overseer { - #[subsystem(blocking, CandidateValidationMessage, sends: [ + #[subsystem(CandidateValidationMessage, sends: [ RuntimeApiMessage, ])] candidate_validation: CandidateValidation, diff --git a/prdoc/pr_5142.prdoc b/prdoc/pr_5142.prdoc new file mode 100644 index 000000000000..4083e5bf53cd --- /dev/null +++ b/prdoc/pr_5142.prdoc @@ -0,0 +1,26 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: "Move decompression to worker processes" + +doc: + - audience: Node Dev + description: | + The candidate validation subsystem used to perform both the PVF code decompression and the + PoV decompression itself, which could affect the performance of the subsystem's main loop + and required the subsystem to run on the blocking thread pool. This change moves the + decompression to the PVF host workers, which run synchronously in separate processes. + +crates: + - name: polkadot-node-core-candidate-validation + bump: patch + - name: polkadot-overseer + bump: patch + - name: polkadot-node-core-pvf + bump: major + - name: polkadot-node-core-pvf-common + bump: major + - name: polkadot-node-core-pvf-execute-worker + bump: patch + - name: polkadot-node-core-pvf-prepare-worker + bump: patch
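For illustration, a minimal sketch of the bomb-limit decompression pattern this patch moves into the PVF host workers. `sp_maybe_compressed_blob::decompress`, `VALIDATION_CODE_BOMB_LIMIT` and `POV_BOMB_LIMIT` are the APIs actually used above; the helper names and the `String` error mapping are hypothetical stand-ins, not the real worker code, which surfaces failures as `PrepareError::CouldNotDecompressCodeBlob` and `InvalidCandidate::PoVDecompressionFailure`, as asserted by the new integration tests.

    // Sketch only: reject a blob whose decompressed size would exceed the bomb
    // limit instead of inflating it ("decompression bomb" guard).
    use std::borrow::Cow;

    use polkadot_node_primitives::{POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT};

    /// Prepare-worker side: decompress the validation code under its bomb limit,
    /// so the cost is accounted to the preparation job, not the subsystem loop.
    fn decompress_code(compressed_code: &[u8]) -> Result<Vec<u8>, String> {
        sp_maybe_compressed_blob::decompress(compressed_code, VALIDATION_CODE_BOMB_LIMIT)
            .map(Cow::into_owned)
            .map_err(|e| format!("could not decompress code blob: {e:?}"))
    }

    /// Execute-worker side: the same guard for the PoV, under POV_BOMB_LIMIT.
    fn decompress_pov(compressed_pov: &[u8]) -> Result<Vec<u8>, String> {
        sp_maybe_compressed_blob::decompress(compressed_pov, POV_BOMB_LIMIT)
            .map(Cow::into_owned)
            .map_err(|e| format!("could not decompress PoV: {e:?}"))
    }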