diff --git a/.github/workflows/bitcoin-tests.yml b/.github/workflows/bitcoin-tests.yml index 888bf120ca..23d2d9d6b8 100644 --- a/.github/workflows/bitcoin-tests.yml +++ b/.github/workflows/bitcoin-tests.yml @@ -132,6 +132,8 @@ jobs: - tests::signer::v0::block_commit_delay - tests::signer::v0::continue_after_fast_block_no_sortition - tests::signer::v0::block_validation_response_timeout + - tests::signer::v0::block_validation_pending_table + - tests::signer::v0::new_tenure_while_validating_previous_scenario - tests::signer::v0::tenure_extend_after_bad_commit - tests::signer::v0::block_proposal_max_age_rejections - tests::signer::v0::global_acceptance_depends_on_block_announcement diff --git a/libsigner/src/v0/messages.rs b/libsigner/src/v0/messages.rs index 0ef3b904d2..7565b3bd7e 100644 --- a/libsigner/src/v0/messages.rs +++ b/libsigner/src/v0/messages.rs @@ -688,6 +688,14 @@ impl BlockResponse { } } + /// The signer signature hash for the block response + pub fn signer_signature_hash(&self) -> Sha512Trunc256Sum { + match self { + BlockResponse::Accepted(accepted) => accepted.signer_signature_hash, + BlockResponse::Rejected(rejection) => rejection.signer_signature_hash, + } + } + /// Get the block accept data from the block response pub fn as_block_accepted(&self) -> Option<&BlockAccepted> { match self { diff --git a/stacks-signer/CHANGELOG.md b/stacks-signer/CHANGELOG.md index 04d2d76a7a..1887952256 100644 --- a/stacks-signer/CHANGELOG.md +++ b/stacks-signer/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE ## Added - Introduced the `block_proposal_max_age_secs` configuration option for signers, enabling them to automatically ignore block proposals that exceed the specified age in seconds. 
+- When a new block proposal is received while the signer is waiting for an existing proposal to be validated, the signer will wait until the existing block is done validating before submitting the new one for validation. ([#5453](https://github.com/stacks-network/stacks-core/pull/5453)) ## Changed - Improvements to the stale signer cleanup logic: deletes the prior signer if it has no remaining unprocessed blocks in its database diff --git a/stacks-signer/src/chainstate.rs b/stacks-signer/src/chainstate.rs index 31454c96b6..95ce12c98d 100644 --- a/stacks-signer/src/chainstate.rs +++ b/stacks-signer/src/chainstate.rs @@ -589,8 +589,8 @@ impl SortitionsView { signer_db.block_lookup(&nakamoto_tip.signer_signature_hash()) { if block_info.state != BlockState::GloballyAccepted { - if let Err(e) = block_info.mark_globally_accepted() { - warn!("Failed to update block info in db: {e}"); + if let Err(e) = signer_db.mark_block_globally_accepted(&mut block_info) { + warn!("Failed to mark block as globally accepted: {e}"); } else if let Err(e) = signer_db.insert_block(&block_info) { warn!("Failed to update block info in db: {e}"); } diff --git a/stacks-signer/src/signerdb.rs b/stacks-signer/src/signerdb.rs index 67321c7218..671e4617f9 100644 --- a/stacks-signer/src/signerdb.rs +++ b/stacks-signer/src/signerdb.rs @@ -24,6 +24,8 @@ use blockstack_lib::util_lib::db::{ query_row, query_rows, sqlite_open, table_exists, tx_begin_immediate, u64_to_sql, Error as DBError, }; +#[cfg(any(test, feature = "testing"))] +use blockstack_lib::util_lib::db::{FromColumn, FromRow}; use clarity::types::chainstate::{BurnchainHeaderHash, StacksAddress}; use libsigner::BlockProposal; use rusqlite::functions::FunctionFlags; @@ -209,7 +211,7 @@ impl BlockInfo { /// Mark this block as valid, signed over, and records a group timestamp in the block info if it wasn't /// already set.
- pub fn mark_globally_accepted(&mut self) -> Result<(), String> { + fn mark_globally_accepted(&mut self) -> Result<(), String> { self.move_to(BlockState::GloballyAccepted)?; self.valid = Some(true); self.signed_over = true; @@ -225,7 +227,7 @@ impl BlockInfo { } /// Mark the block as globally rejected and invalid - pub fn mark_globally_rejected(&mut self) -> Result<(), String> { + fn mark_globally_rejected(&mut self) -> Result<(), String> { self.move_to(BlockState::GloballyRejected)?; self.valid = Some(false); Ok(()) @@ -342,6 +344,10 @@ CREATE INDEX IF NOT EXISTS blocks_state ON blocks (state); CREATE INDEX IF NOT EXISTS blocks_signed_group ON blocks (signed_group); "#; +static CREATE_INDEXES_6: &str = r#" +CREATE INDEX IF NOT EXISTS block_validations_pending_on_added_time ON block_validations_pending(added_time ASC); +"#; + static CREATE_SIGNER_STATE_TABLE: &str = " CREATE TABLE IF NOT EXISTS signer_states ( reward_cycle INTEGER PRIMARY KEY, @@ -436,15 +442,15 @@ INSERT INTO temp_blocks ( broadcasted, stacks_height, burn_block_height, - valid, + valid, state, - signed_group, + signed_group, signed_self, proposed_time, validation_time_ms, tenure_change ) -SELECT +SELECT signer_signature_hash, reward_cycle, block_info, @@ -452,7 +458,7 @@ SELECT signed_over, broadcasted, stacks_height, - burn_block_height, + burn_block_height, json_extract(block_info, '$.valid') AS valid, json_extract(block_info, '$.state') AS state, json_extract(block_info, '$.signed_group') AS signed_group, @@ -466,6 +472,14 @@ DROP TABLE blocks; ALTER TABLE temp_blocks RENAME TO blocks;"#; +static CREATE_BLOCK_VALIDATION_PENDING_TABLE: &str = r#" +CREATE TABLE IF NOT EXISTS block_validations_pending ( + signer_signature_hash TEXT NOT NULL, + -- the time at which the block was added to the pending table + added_time INTEGER NOT NULL, + PRIMARY KEY (signer_signature_hash) +) STRICT;"#; + static SCHEMA_1: &[&str] = &[ DROP_SCHEMA_0, CREATE_DB_CONFIG, @@ -514,9 +528,15 @@ static SCHEMA_5: &[&str] = 
&[ "INSERT INTO db_config (version) VALUES (5);", ]; +static SCHEMA_6: &[&str] = &[ + CREATE_BLOCK_VALIDATION_PENDING_TABLE, + CREATE_INDEXES_6, + "INSERT OR REPLACE INTO db_config (version) VALUES (6);", +]; + impl SignerDb { /// The current schema version used in this build of the signer binary. - pub const SCHEMA_VERSION: u32 = 5; + pub const SCHEMA_VERSION: u32 = 6; /// Create a new `SignerState` instance. /// This will create a new SQLite database at the given path @@ -616,6 +636,20 @@ impl SignerDb { Ok(()) } + /// Migrate from schema 5 to schema 6 + fn schema_6_migration(tx: &Transaction) -> Result<(), DBError> { + if Self::get_schema_version(tx)? >= 6 { + // no migration necessary + return Ok(()); + } + + for statement in SCHEMA_6.iter() { + tx.execute_batch(statement)?; + } + + Ok(()) + } + /// Register custom scalar functions used by the database fn register_scalar_functions(&self) -> Result<(), DBError> { // Register helper function for determining if a block is a tenure change transaction @@ -654,7 +688,8 @@ impl SignerDb { 2 => Self::schema_3_migration(&sql_tx)?, 3 => Self::schema_4_migration(&sql_tx)?, 4 => Self::schema_5_migration(&sql_tx)?, - 5 => break, + 5 => Self::schema_6_migration(&sql_tx)?, + 6 => break, x => return Err(DBError::Other(format!( "Database schema is newer than supported by this binary. Expected version = {}, Database version = {x}", Self::SCHEMA_VERSION, @@ -960,6 +995,43 @@ impl SignerDb { Ok(Some(broadcasted)) } + /// Get a pending block validation, sorted by the time at which it was added to the pending table. + /// If found, remove it from the pending table. 
+ pub fn get_and_remove_pending_block_validation( + &self, + ) -> Result, DBError> { + let qry = "DELETE FROM block_validations_pending WHERE signer_signature_hash = (SELECT signer_signature_hash FROM block_validations_pending ORDER BY added_time ASC LIMIT 1) RETURNING signer_signature_hash"; + let args = params![]; + let mut stmt = self.db.prepare(qry)?; + let sighash: Option = stmt.query_row(args, |row| row.get(0)).optional()?; + Ok(sighash.and_then(|sighash| Sha512Trunc256Sum::from_hex(&sighash).ok())) + } + + /// Remove a pending block validation + pub fn remove_pending_block_validation( + &self, + sighash: &Sha512Trunc256Sum, + ) -> Result<(), DBError> { + self.db.execute( + "DELETE FROM block_validations_pending WHERE signer_signature_hash = ?1", + params![sighash.to_string()], + )?; + Ok(()) + } + + /// Insert a pending block validation + pub fn insert_pending_block_validation( + &self, + sighash: &Sha512Trunc256Sum, + ts: u64, + ) -> Result<(), DBError> { + self.db.execute( + "INSERT INTO block_validations_pending (signer_signature_hash, added_time) VALUES (?1, ?2)", + params![sighash.to_string(), u64_to_sql(ts)?], + )?; + Ok(()) + } + /// Return the start time (epoch time in seconds) and the processing time in milliseconds of the tenure (idenfitied by consensus_hash). fn get_tenure_times(&self, tenure: &ConsensusHash) -> Result<(u64, u64), DBError> { let query = "SELECT tenure_change, proposed_time, validation_time_ms FROM blocks WHERE consensus_hash = ?1 AND state = ?2 ORDER BY stacks_height DESC"; @@ -1022,6 +1094,26 @@ impl SignerDb { ); tenure_extend_timestamp } + + /// Mark a block as globally accepted. This removes the block from the pending + /// validations table. This does **not** update the block's state in SignerDb. 
+ pub fn mark_block_globally_accepted(&self, block_info: &mut BlockInfo) -> Result<(), DBError> { + block_info + .mark_globally_accepted() + .map_err(DBError::Other)?; + self.remove_pending_block_validation(&block_info.signer_signature_hash())?; + Ok(()) + } + + /// Mark a block as globally rejected. This removes the block from the pending + /// validations table. This does **not** update the block's state in SignerDb. + pub fn mark_block_globally_rejected(&self, block_info: &mut BlockInfo) -> Result<(), DBError> { + block_info + .mark_globally_rejected() + .map_err(DBError::Other)?; + self.remove_pending_block_validation(&block_info.signer_signature_hash())?; + Ok(()) + } } fn try_deserialize(s: Option) -> Result, DBError> @@ -1034,6 +1126,50 @@ where .map_err(DBError::SerializationError) } +/// For tests, a struct to represent a pending block validation +#[cfg(any(test, feature = "testing"))] +pub struct PendingBlockValidation { + /// The signer signature hash of the block + pub signer_signature_hash: Sha512Trunc256Sum, + /// The time at which the block was added to the pending table + pub added_time: u64, +} + +#[cfg(any(test, feature = "testing"))] +impl FromRow for PendingBlockValidation { + fn from_row(row: &rusqlite::Row) -> Result { + let signer_signature_hash = Sha512Trunc256Sum::from_column(row, "signer_signature_hash")?; + let added_time = row.get_unwrap(1); + Ok(PendingBlockValidation { + signer_signature_hash, + added_time, + }) + } +} + +#[cfg(any(test, feature = "testing"))] +impl SignerDb { + /// For tests, fetch all pending block validations + pub fn get_all_pending_block_validations( + &self, + ) -> Result, DBError> { + let qry = "SELECT signer_signature_hash, added_time FROM block_validations_pending ORDER BY added_time ASC"; + query_rows(&self.db, qry, params![]) + } + + /// For tests, check if a pending block validation exists + pub fn has_pending_block_validation( + &self, + sighash: &Sha512Trunc256Sum, + ) -> Result { + let qry = "SELECT 
signer_signature_hash FROM block_validations_pending WHERE signer_signature_hash = ?1"; + let args = params![sighash.to_string()]; + let sighash_opt: Option = query_row(&self.db, qry, args)?; + Ok(sighash_opt.is_some()) + } +} + +/// Tests for SignerDb #[cfg(test)] mod tests { use std::fs; @@ -1734,4 +1870,38 @@ mod tests { < block_infos[0].proposed_time ); } + + #[test] + fn test_get_and_remove_pending_block_validation() { + let db_path = tmp_db_path(); + let db = SignerDb::new(db_path).expect("Failed to create signer db"); + + let pending_hash = db.get_and_remove_pending_block_validation().unwrap(); + assert!(pending_hash.is_none()); + + db.insert_pending_block_validation(&Sha512Trunc256Sum([0x01; 32]), 1000) + .unwrap(); + db.insert_pending_block_validation(&Sha512Trunc256Sum([0x02; 32]), 2000) + .unwrap(); + db.insert_pending_block_validation(&Sha512Trunc256Sum([0x03; 32]), 3000) + .unwrap(); + + let pending_hash = db.get_and_remove_pending_block_validation().unwrap(); + assert_eq!(pending_hash, Some(Sha512Trunc256Sum([0x01; 32]))); + + let pendings = db.get_all_pending_block_validations().unwrap(); + assert_eq!(pendings.len(), 2); + + let pending_hash = db.get_and_remove_pending_block_validation().unwrap(); + assert_eq!(pending_hash, Some(Sha512Trunc256Sum([0x02; 32]))); + + let pendings = db.get_all_pending_block_validations().unwrap(); + assert_eq!(pendings.len(), 1); + + let pending_hash = db.get_and_remove_pending_block_validation().unwrap(); + assert_eq!(pending_hash, Some(Sha512Trunc256Sum([0x03; 32]))); + + let pendings = db.get_all_pending_block_validations().unwrap(); + assert_eq!(pendings.len(), 0); + } } diff --git a/stacks-signer/src/v0/signer.rs b/stacks-signer/src/v0/signer.rs index fb52394771..4ab63eda59 100644 --- a/stacks-signer/src/v0/signer.rs +++ b/stacks-signer/src/v0/signer.rs @@ -19,7 +19,7 @@ use std::time::{Duration, Instant}; use blockstack_lib::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockHeader}; use 
blockstack_lib::net::api::postblock_proposal::{ - BlockValidateOk, BlockValidateReject, BlockValidateResponse, + BlockValidateOk, BlockValidateReject, BlockValidateResponse, TOO_MANY_REQUESTS_STATUS, }; use blockstack_lib::util_lib::db::Error as DBError; use clarity::types::chainstate::StacksPrivateKey; @@ -38,7 +38,7 @@ use stacks_common::util::secp256k1::MessageSignature; use stacks_common::{debug, error, info, warn}; use crate::chainstate::{ProposalEvalConfig, SortitionsView}; -use crate::client::{SignerSlotID, StackerDB, StacksClient}; +use crate::client::{ClientError, SignerSlotID, StackerDB, StacksClient}; use crate::config::SignerConfig; use crate::runloop::SignerResult; use crate::signerdb::{BlockInfo, BlockState, SignerDb}; @@ -75,7 +75,7 @@ pub struct Signer { /// marking a submitted block as invalid pub block_proposal_validation_timeout: Duration, /// The current submitted block proposal and its submission time - pub submitted_block_proposal: Option<(BlockProposal, Instant)>, + pub submitted_block_proposal: Option<(Sha512Trunc256Sum, Instant)>, /// Maximum age of a block proposal in seconds before it is dropped without processing pub block_proposal_max_age_secs: u64, } @@ -249,7 +249,7 @@ impl SignerTrait for Signer { // We have already globally accepted this block. Do nothing. 
return; } - if let Err(e) = block_info.mark_globally_accepted() { + if let Err(e) = self.signer_db.mark_block_globally_accepted(&mut block_info) { warn!("{self}: Failed to mark block as globally accepted: {e:?}"); return; } @@ -547,23 +547,23 @@ impl Signer { "block_height" => block_proposal.block.header.chain_length, "burn_height" => block_proposal.burn_height, ); + #[cfg(any(test, feature = "testing"))] self.test_stall_block_validation_submission(); - match stacks_client.submit_block_for_validation(block_info.block.clone()) { - Ok(_) => { - self.submitted_block_proposal = - Some((block_proposal.clone(), Instant::now())); - } - Err(e) => { - warn!("{self}: Failed to submit block for validation: {e:?}"); - } - }; + self.submit_block_for_validation(stacks_client, &block_proposal.block); } else { // Still store the block but log we can't submit it for validation. We may receive enough signatures/rejections // from other signers to push the proposed block into a global rejection/acceptance regardless of our participation. // However, we will not be able to participate beyond this until our block submission times out or we receive a response // from our node. - warn!("{self}: cannot submit block proposal for validation as we are already waiting for a response for a prior submission") + warn!("{self}: cannot submit block proposal for validation as we are already waiting for a response for a prior submission. Inserting pending proposal."; + "signer_signature_hash" => signer_signature_hash.to_string(), + ); + self.signer_db + .insert_pending_block_validation(&signer_signature_hash, get_epoch_time_secs()) + .unwrap_or_else(|e| { + warn!("{self}: Failed to insert pending block validation: {e:?}") + }); } // Do not store KNOWN invalid blocks as this could DOS the signer. We only store blocks that are valid or unknown. 
@@ -586,7 +586,7 @@ impl Signer { BlockResponse::Rejected(block_rejection) => { self.handle_block_rejection(block_rejection); } - } + }; } /// WARNING: This is an incomplete check. Do NOT call this function PRIOR to check_proposal or block_proposal validation succeeds. @@ -675,10 +675,7 @@ impl Signer { let signer_signature_hash = block_validate_ok.signer_signature_hash; if self .submitted_block_proposal - .as_ref() - .map(|(proposal, _)| { - proposal.block.header.signer_signature_hash() == signer_signature_hash - }) + .map(|(proposal_hash, _)| proposal_hash == signer_signature_hash) .unwrap_or(false) { self.submitted_block_proposal = None; @@ -754,10 +751,7 @@ impl Signer { let signer_signature_hash = block_validate_reject.signer_signature_hash; if self .submitted_block_proposal - .as_ref() - .map(|(proposal, _)| { - proposal.block.header.signer_signature_hash() == signer_signature_hash - }) + .map(|(proposal_hash, _)| proposal_hash == signer_signature_hash) .unwrap_or(false) { self.submitted_block_proposal = None; @@ -809,6 +803,12 @@ impl Signer { self.handle_block_validate_reject(block_validate_reject) } }; + // Remove this block validation from the pending table + let signer_sig_hash = block_validate_response.signer_signature_hash(); + self.signer_db + .remove_pending_block_validation(&signer_sig_hash) + .unwrap_or_else(|e| warn!("{self}: Failed to remove pending block validation: {e:?}")); + let Some(response) = block_response else { return; }; @@ -828,23 +828,45 @@ impl Signer { warn!("{self}: Failed to send block rejection to stacker-db: {e:?}",); } } + + // Check if there is a pending block validation that we need to submit to the node + match self.signer_db.get_and_remove_pending_block_validation() { + Ok(Some(signer_sig_hash)) => { + info!("{self}: Found a pending block validation: {signer_sig_hash:?}"); + match self.signer_db.block_lookup(&signer_sig_hash) { + Ok(Some(block_info)) => { + self.submit_block_for_validation(stacks_client, 
&block_info.block); + } + Ok(None) => { + // This should never happen + error!( + "{self}: Pending block validation not found in DB: {signer_sig_hash:?}" + ); + } + Err(e) => error!("{self}: Failed to get block info: {e:?}"), + } + } + Ok(None) => {} + Err(e) => warn!("{self}: Failed to get pending block validation: {e:?}"), + } } /// Check the current tracked submitted block proposal to see if it has timed out. /// Broadcasts a rejection and marks the block locally rejected if it has. fn check_submitted_block_proposal(&mut self) { - let Some((block_proposal, block_submission)) = self.submitted_block_proposal.take() else { + let Some((proposal_signer_sighash, block_submission)) = + self.submitted_block_proposal.take() + else { // Nothing to check. return; }; if block_submission.elapsed() < self.block_proposal_validation_timeout { // Not expired yet. Put it back! - self.submitted_block_proposal = Some((block_proposal, block_submission)); + self.submitted_block_proposal = Some((proposal_signer_sighash, block_submission)); return; } - let signature_sighash = block_proposal.block.header.signer_signature_hash(); // For mutability reasons, we need to take the block_info out of the map and add it back after processing - let mut block_info = match self.signer_db.block_lookup(&signature_sighash) { + let mut block_info = match self.signer_db.block_lookup(&proposal_signer_sighash) { Ok(Some(block_info)) => { if block_info.has_reached_consensus() { // The block has already reached consensus. @@ -856,8 +878,7 @@ impl Signer { // This is weird. If this is reached, its probably an error in code logic or the db was flushed. // Why are we tracking a block submission for a block we have never seen / stored before. 
error!("{self}: tracking an unknown block validation submission."; - "signer_sighash" => %signature_sighash, - "block_id" => %block_proposal.block.block_id(), + "signer_sighash" => %proposal_signer_sighash, ); return; } @@ -870,11 +891,10 @@ impl Signer { // Reject it so we aren't holding up the network because of our inaction. warn!( "{self}: Failed to receive block validation response within {} ms. Rejecting block.", self.block_proposal_validation_timeout.as_millis(); - "signer_sighash" => %signature_sighash, - "block_id" => %block_proposal.block.block_id(), + "signer_sighash" => %proposal_signer_sighash, ); let rejection = - self.create_block_rejection(RejectCode::ConnectivityIssues, &block_proposal.block); + self.create_block_rejection(RejectCode::ConnectivityIssues, &block_info.block); if let Err(e) = block_info.mark_locally_rejected() { if !block_info.has_reached_consensus() { warn!("{self}: Failed to mark block as locally rejected: {e:?}"); @@ -988,7 +1008,7 @@ impl Signer { return; } debug!("{self}: {total_reject_weight}/{total_weight} signers voted to reject the block {block_hash}"); - if let Err(e) = block_info.mark_globally_rejected() { + if let Err(e) = self.signer_db.mark_block_globally_rejected(&mut block_info) { warn!("{self}: Failed to mark block as globally rejected: {e:?}",); } if let Err(e) = self.signer_db.insert_block(&block_info) { @@ -998,7 +1018,7 @@ impl Signer { if self .submitted_block_proposal .as_ref() - .map(|(proposal, _)| &proposal.block.header.signer_signature_hash() == block_hash) + .map(|(proposal_signer_sighash, _)| proposal_signer_sighash == block_hash) .unwrap_or(false) { // Consensus reached! No longer bother tracking its validation submission to the node as we are too late to participate in the decision anyway. 
@@ -1114,7 +1134,7 @@ impl Signer { if self .submitted_block_proposal .as_ref() - .map(|(proposal, _)| &proposal.block.header.signer_signature_hash() == block_hash) + .map(|(proposal_hash, _)| proposal_hash == block_hash) .unwrap_or(false) { // Consensus reached! No longer bother tracking its validation submission to the node as we are too late to participate in the decision anyway. @@ -1157,6 +1177,37 @@ impl Signer { } } + /// Submit a block for validation, and mark it as pending if the node + /// is busy with a previous request. + fn submit_block_for_validation(&mut self, stacks_client: &StacksClient, block: &NakamotoBlock) { + let signer_signature_hash = block.header.signer_signature_hash(); + match stacks_client.submit_block_for_validation(block.clone()) { + Ok(_) => { + self.submitted_block_proposal = Some((signer_signature_hash, Instant::now())); + } + Err(ClientError::RequestFailure(status)) => { + if status.as_u16() == TOO_MANY_REQUESTS_STATUS { + info!("{self}: Received 429 from stacks node for block validation request. 
Inserting pending block validation..."; + "signer_signature_hash" => %signer_signature_hash, + ); + self.signer_db + .insert_pending_block_validation( + &signer_signature_hash, + get_epoch_time_secs(), + ) + .unwrap_or_else(|e| { + warn!("{self}: Failed to insert pending block validation: {e:?}") + }); + } else { + warn!("{self}: Received non-429 status from stacks node: {status}"); + } + } + Err(e) => { + warn!("{self}: Failed to submit block for validation: {e:?}"); + } + } + } + /// Send a mock signature to stackerdb to prove we are still alive fn mock_sign(&mut self, mock_proposal: MockProposal) { info!("{self}: Mock signing mock proposal: {mock_proposal:?}"); diff --git a/stackslib/src/net/api/postblock_proposal.rs b/stackslib/src/net/api/postblock_proposal.rs index 8a8b138d69..78daab031b 100644 --- a/stackslib/src/net/api/postblock_proposal.rs +++ b/stackslib/src/net/api/postblock_proposal.rs @@ -15,6 +15,8 @@ // along with this program. If not, see . use std::io::{Read, Write}; +#[cfg(any(test, feature = "testing"))] +use std::sync::LazyLock; use std::thread::{self, JoinHandle, Thread}; #[cfg(any(test, feature = "testing"))] use std::time::Duration; @@ -35,6 +37,8 @@ use stacks_common::types::net::PeerHost; use stacks_common::types::StacksPublicKeyBuffer; use stacks_common::util::hash::{hex_bytes, to_hex, Hash160, Sha256Sum, Sha512Trunc256Sum}; use stacks_common::util::retry::BoundReader; +#[cfg(any(test, feature = "testing"))] +use stacks_common::util::tests::TestFlag; use stacks_common::util::{get_epoch_time_ms, get_epoch_time_secs}; use crate::burnchains::affirmation::AffirmationMap; @@ -67,11 +71,11 @@ use crate::net::{ use crate::util_lib::db::Error as DBError; #[cfg(any(test, feature = "testing"))] -pub static TEST_VALIDATE_STALL: std::sync::Mutex> = std::sync::Mutex::new(None); +pub static TEST_VALIDATE_STALL: LazyLock> = LazyLock::new(TestFlag::default); #[cfg(any(test, feature = "testing"))] /// Artificial delay to add to block validation. 
-pub static TEST_VALIDATE_DELAY_DURATION_SECS: std::sync::Mutex> = - std::sync::Mutex::new(None); +pub static TEST_VALIDATE_DELAY_DURATION_SECS: LazyLock> = + LazyLock::new(TestFlag::default); // This enum is used to supply a `reason_code` for validation // rejection responses. This is serialized as an enum with string @@ -86,6 +90,8 @@ define_u8_enum![ValidateRejectCode { NoSuchTenure = 6 }]; +pub static TOO_MANY_REQUESTS_STATUS: u16 = 429; + impl TryFrom for ValidateRejectCode { type Error = CodecError; fn try_from(value: u8) -> Result { @@ -173,6 +179,23 @@ impl From> for BlockValidateRespons } } +impl BlockValidateResponse { + /// Get the signer signature hash from the response + pub fn signer_signature_hash(&self) -> Sha512Trunc256Sum { + match self { + BlockValidateResponse::Ok(o) => o.signer_signature_hash, + BlockValidateResponse::Reject(r) => r.signer_signature_hash, + } + } +} + +#[cfg(any(test, feature = "testing"))] +fn inject_validation_delay() { + let delay = TEST_VALIDATE_DELAY_DURATION_SECS.get(); + warn!("Sleeping for {} seconds to simulate slow processing", delay); + thread::sleep(Duration::from_secs(delay)); +} + /// Represents a block proposed to the `v3/block_proposal` endpoint for validation #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct NakamotoBlockProposal { @@ -353,10 +376,10 @@ impl NakamotoBlockProposal { ) -> Result { #[cfg(any(test, feature = "testing"))] { - if *TEST_VALIDATE_STALL.lock().unwrap() == Some(true) { + if TEST_VALIDATE_STALL.get() { // Do an extra check just so we don't log EVERY time. 
warn!("Block validation is stalled due to testing directive."); - while *TEST_VALIDATE_STALL.lock().unwrap() == Some(true) { + while TEST_VALIDATE_STALL.get() { std::thread::sleep(std::time::Duration::from_millis(10)); } info!( @@ -367,12 +390,7 @@ impl NakamotoBlockProposal { let start = Instant::now(); #[cfg(any(test, feature = "testing"))] - { - if let Some(delay) = *TEST_VALIDATE_DELAY_DURATION_SECS.lock().unwrap() { - warn!("Sleeping for {} seconds to simulate slow processing", delay); - thread::sleep(Duration::from_secs(delay)); - } - } + inject_validation_delay(); let mainnet = self.chain_id == CHAIN_ID_MAINNET; if self.chain_id != chainstate.chain_id || mainnet != chainstate.mainnet { @@ -743,7 +761,7 @@ impl RPCRequestHandler for RPCBlockProposalRequestHandler { let res = node.with_node_state(|network, sortdb, chainstate, _mempool, rpc_args| { if network.is_proposal_thread_running() { return Err(( - 429, + TOO_MANY_REQUESTS_STATUS, NetError::SendError("Proposal currently being evaluated".into()), )); } @@ -778,7 +796,7 @@ impl RPCRequestHandler for RPCBlockProposalRequestHandler { .spawn_validation_thread(sortdb, chainstate, receiver) .map_err(|_e| { ( - 429, + TOO_MANY_REQUESTS_STATUS, NetError::SendError( "IO error while spawning proposal callback thread".into(), ), diff --git a/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs b/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs index 70c9aab190..f3df78c66b 100644 --- a/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs +++ b/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs @@ -107,7 +107,7 @@ impl SignerCoordinator { // Spawn the signer DB listener thread let listener_thread = std::thread::Builder::new() - .name("stackerdb_listener".to_string()) + .name(format!("stackerdb_listener_{}", burn_tip.block_height)) .spawn(move || { if let Err(e) = listener.run() { error!("StackerDBListener: exited with error: {e:?}"); diff --git 
a/testnet/stacks-node/src/tests/signer/mod.rs b/testnet/stacks-node/src/tests/signer/mod.rs index ebb0990411..ed4560c70a 100644 --- a/testnet/stacks-node/src/tests/signer/mod.rs +++ b/testnet/stacks-node/src/tests/signer/mod.rs @@ -39,7 +39,7 @@ use clarity::vm::types::PrincipalData; use libsigner::v0::messages::{ BlockAccepted, BlockResponse, MessageSlotID, PeerInfo, SignerMessage, }; -use libsigner::{SignerEntries, SignerEventTrait}; +use libsigner::{BlockProposal, SignerEntries, SignerEventTrait}; use stacks::chainstate::coordinator::comm::CoordinatorChannels; use stacks::chainstate::nakamoto::signer_set::NakamotoSigners; use stacks::chainstate::nakamoto::NakamotoBlock; @@ -725,6 +725,25 @@ impl + Send + 'static, T: SignerEventTrait + 'static> SignerTest Vec { + let proposals: Vec<_> = test_observer::get_stackerdb_chunks() + .into_iter() + .flat_map(|chunk| chunk.modified_slots) + .filter_map(|chunk| { + let Ok(message) = SignerMessage::consensus_deserialize(&mut chunk.data.as_slice()) + else { + return None; + }; + match message { + SignerMessage::BlockProposal(proposal) => Some(proposal), + _ => None, + } + }) + .collect(); + proposals + } + /// Get /v2/info from the node pub fn get_peer_info(&self) -> PeerInfo { self.stacks_client diff --git a/testnet/stacks-node/src/tests/signer/v0.rs b/testnet/stacks-node/src/tests/signer/v0.rs index 86002e6c3a..190145279f 100644 --- a/testnet/stacks-node/src/tests/signer/v0.rs +++ b/testnet/stacks-node/src/tests/signer/v0.rs @@ -61,6 +61,7 @@ use stacks_common::util::sleep_ms; use stacks_signer::chainstate::{ProposalEvalConfig, SortitionsView}; use stacks_signer::client::{SignerSlotID, StackerDB}; use stacks_signer::config::{build_signer_config_tomls, GlobalConfig as SignerConfig, Network}; +use stacks_signer::signerdb::SignerDb; use stacks_signer::v0::tests::{ TEST_IGNORE_ALL_BLOCK_PROPOSALS, TEST_PAUSE_BLOCK_BROADCAST, TEST_REJECT_ALL_BLOCK_PROPOSAL, TEST_SKIP_BLOCK_BROADCAST, TEST_SKIP_SIGNER_CLEANUP, 
TEST_STALL_BLOCK_VALIDATION_SUBMISSION, @@ -2263,7 +2264,7 @@ fn end_of_tenure() { ); info!("------------------------- Test Block Validation Stalled -------------------------"); - TEST_VALIDATE_STALL.lock().unwrap().replace(true); + TEST_VALIDATE_STALL.set(true); let proposals_before = signer_test .running_nodes @@ -2335,7 +2336,7 @@ fn end_of_tenure() { info!("Unpausing block validation and waiting for block to be processed"); // Disable the stall and wait for the block to be processed - TEST_VALIDATE_STALL.lock().unwrap().replace(false); + TEST_VALIDATE_STALL.set(false); wait_for(short_timeout.as_secs(), || { let processed_now = get_chain_info(&signer_test.running_nodes.conf).stacks_tip_height; Ok(processed_now > blocks_before) @@ -2831,7 +2832,7 @@ fn stx_transfers_dont_effect_idle_timeout() { signer_test.boot_to_epoch_3(); // Add a delay to the block validation process - TEST_VALIDATE_DELAY_DURATION_SECS.lock().unwrap().replace(5); + TEST_VALIDATE_DELAY_DURATION_SECS.set(5); let info_before = signer_test.get_peer_info(); let blocks_before = signer_test.running_nodes.nakamoto_blocks_mined.get(); @@ -2978,7 +2979,7 @@ fn idle_tenure_extend_active_mining() { signer_test.boot_to_epoch_3(); // Add a delay to the block validation process - TEST_VALIDATE_DELAY_DURATION_SECS.lock().unwrap().replace(3); + TEST_VALIDATE_DELAY_DURATION_SECS.set(3); signer_test.mine_nakamoto_block(Duration::from_secs(60), true); @@ -5571,7 +5572,7 @@ fn locally_rejected_blocks_overriden_by_global_acceptance() { sender_nonce += 1; info!("Submitted tx {tx} in to mine block N+1"); - wait_for(30, || { + wait_for(45, || { Ok(mined_blocks.load(Ordering::SeqCst) > blocks_before && signer_test .stacks_client @@ -5628,7 +5629,7 @@ fn locally_rejected_blocks_overriden_by_global_acceptance() { ); let tx = submit_tx(&http_origin, &transfer_tx); info!("Submitted tx {tx} in to mine block N+2"); - wait_for(30, || { + wait_for(45, || { Ok(mined_blocks.load(Ordering::SeqCst) > blocks_before && signer_test 
.stacks_client @@ -7601,7 +7602,7 @@ fn block_validation_response_timeout() { info!("------------------------- Test Mine and Verify Confirmed Nakamoto Block -------------------------"); signer_test.mine_and_verify_confirmed_naka_block(timeout, num_signers, true); info!("------------------------- Test Block Validation Stalled -------------------------"); - TEST_VALIDATE_STALL.lock().unwrap().replace(true); + TEST_VALIDATE_STALL.set(true); let validation_stall_start = Instant::now(); let proposals_before = signer_test @@ -7703,7 +7704,7 @@ fn block_validation_response_timeout() { let info_before = info_after; info!("Unpausing block validation"); // Disable the stall and wait for the block to be processed successfully - TEST_VALIDATE_STALL.lock().unwrap().replace(false); + TEST_VALIDATE_STALL.set(false); wait_for(30, || { let info = get_chain_info(&signer_test.running_nodes.conf); Ok(info.stacks_tip_height > info_before.stacks_tip_height) @@ -7732,6 +7733,336 @@ fn block_validation_response_timeout() { ); } +/// Test scenario: +/// +/// - when a signer submits a block validation request and +/// gets a 429, +/// - the signer stores the pending request +/// - and submits it again after the current block validation +/// request finishes. 
+#[test] +#[ignore] +fn block_validation_pending_table() { + if env::var("BITCOIND_TEST") != Ok("1".into()) { + return; + } + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + info!("------------------------- Test Setup -------------------------"); + let num_signers = 5; + let timeout = Duration::from_secs(30); + let sender_sk = Secp256k1PrivateKey::new(); + let sender_addr = tests::to_addr(&sender_sk); + let send_amt = 100; + let send_fee = 180; + let recipient = PrincipalData::from(StacksAddress::burn_address(false)); + let short_timeout = Duration::from_secs(20); + + let mut signer_test: SignerTest = SignerTest::new_with_config_modifications( + num_signers, + vec![(sender_addr, send_amt + send_fee)], + |_| {}, + |_| {}, + None, + None, + ); + let db_path = signer_test.signer_configs[0].db_path.clone(); + let http_origin = format!("http://{}", &signer_test.running_nodes.conf.node.rpc_bind); + signer_test.boot_to_epoch_3(); + + info!("----- Starting test -----"; + "db_path" => db_path.clone().to_str(), + ); + signer_test.mine_and_verify_confirmed_naka_block(timeout, num_signers, true); + TEST_VALIDATE_DELAY_DURATION_SECS.set(30); + + let signer_db = SignerDb::new(db_path).unwrap(); + + let proposals_before = signer_test.get_miner_proposal_messages().len(); + + let peer_info = signer_test.get_peer_info(); + + // submit a tx so that the miner will attempt to mine an extra block + let sender_nonce = 0; + let transfer_tx = make_stacks_transfer( + &sender_sk, + sender_nonce, + send_fee, + signer_test.running_nodes.conf.burnchain.chain_id, + &recipient, + send_amt, + ); + submit_tx(&http_origin, &transfer_tx); + + info!("----- Waiting for miner to propose a block -----"); + + // Wait for the miner to propose a block + wait_for(30, || { + Ok(signer_test.get_miner_proposal_messages().len() > proposals_before) + }) + .expect("Timed out waiting for miner to propose a block"); + + info!("----- Proposing a concurrent 
block -----"); + let proposal_conf = ProposalEvalConfig { + first_proposal_burn_block_timing: Duration::from_secs(0), + block_proposal_timeout: Duration::from_secs(100), + tenure_last_block_proposal_timeout: Duration::from_secs(30), + tenure_idle_timeout: Duration::from_secs(300), + }; + let mut block = NakamotoBlock { + header: NakamotoBlockHeader::empty(), + txs: vec![], + }; + block.header.timestamp = get_epoch_time_secs(); + + let view = SortitionsView::fetch_view(proposal_conf, &signer_test.stacks_client).unwrap(); + block.header.pox_treatment = BitVec::ones(1).unwrap(); + block.header.consensus_hash = view.cur_sortition.consensus_hash; + block.header.chain_length = peer_info.stacks_tip_height + 1; + let block_signer_signature_hash = block.header.signer_signature_hash(); + signer_test.propose_block(block.clone(), short_timeout); + + info!( + "----- Waiting for a pending block proposal in SignerDb -----"; + "signer_signature_hash" => block_signer_signature_hash.to_hex(), + ); + let mut last_log = Instant::now(); + last_log -= Duration::from_secs(5); + wait_for(120, || { + let is_pending = signer_db + .has_pending_block_validation(&block_signer_signature_hash) + .expect("Unexpected DBError"); + if last_log.elapsed() > Duration::from_secs(5) && !is_pending { + let pending_block_validations = signer_db + .get_all_pending_block_validations() + .expect("Failed to get pending block validations"); + info!( + "----- Waiting for pending block proposal in SignerDB -----"; + "proposed_signer_signature_hash" => block_signer_signature_hash.to_hex(), + "pending_block_validations_len" => pending_block_validations.len(), + "pending_block_validations" => pending_block_validations.iter() + .map(|p| p.signer_signature_hash.to_hex()) + .collect::>() + .join(", "), + ); + last_log = Instant::now(); + } + Ok(is_pending) + }) + .expect("Timed out waiting for pending block proposal"); + + info!("----- Waiting for pending block validation to be submitted -----"); + + // Set the delay 
to 0 so that the block validation finishes quickly + TEST_VALIDATE_DELAY_DURATION_SECS.set(0); + + wait_for(30, || { + let proposal_responses = test_observer::get_proposal_responses(); + let found_proposal = proposal_responses + .iter() + .any(|p| p.signer_signature_hash() == block_signer_signature_hash); + Ok(found_proposal) + }) + .expect("Timed out waiting for pending block validation to be submitted"); + + info!("----- Waiting for pending block validation to be removed -----"); + wait_for(30, || { + let is_pending = signer_db + .has_pending_block_validation(&block_signer_signature_hash) + .expect("Unexpected DBError"); + Ok(!is_pending) + }) + .expect("Timed out waiting for pending block validation to be removed"); + + // for test cleanup we need to wait for block rejections + let signer_keys = signer_test + .signer_configs + .iter() + .map(|c| StacksPublicKey::from_private(&c.stacks_private_key)) + .collect::>(); + signer_test + .wait_for_block_rejections(30, &signer_keys) + .expect("Timed out waiting for block rejections"); + + info!("------------------------- Shutdown -------------------------"); + signer_test.shutdown(); +} + +/// Test scenario: +/// +/// - Miner A proposes a block in tenure A +/// - While that block is pending validation, +/// Miner B proposes a new block in tenure B +/// - After A's block is validated, Miner B's block is +/// rejected (because it's a sister block) +/// - Miner B retries and successfully mines a block +#[test] +#[ignore] +fn new_tenure_while_validating_previous_scenario() { + if env::var("BITCOIND_TEST") != Ok("1".into()) { + return; + } + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + info!("------------------------- Test Setup -------------------------"); + let num_signers = 5; + let timeout = Duration::from_secs(30); + let sender_sk = Secp256k1PrivateKey::new(); + let sender_addr = tests::to_addr(&sender_sk); + let send_amt = 100; + let send_fee = 180; + let 
recipient = PrincipalData::from(StacksAddress::burn_address(false)); + + let mut signer_test: SignerTest = SignerTest::new_with_config_modifications( + num_signers, + vec![(sender_addr, send_amt + send_fee)], + |_| {}, + |_| {}, + None, + None, + ); + let db_path = signer_test.signer_configs[0].db_path.clone(); + let http_origin = format!("http://{}", &signer_test.running_nodes.conf.node.rpc_bind); + signer_test.boot_to_epoch_3(); + + info!("----- Starting test -----"; + "db_path" => db_path.clone().to_str(), + ); + signer_test.mine_and_verify_confirmed_naka_block(timeout, num_signers, true); + TEST_VALIDATE_DELAY_DURATION_SECS.set(30); + + let proposals_before = signer_test.get_miner_proposal_messages().len(); + + let peer_info_before_stall = signer_test.get_peer_info(); + let burn_height_before_stall = peer_info_before_stall.burn_block_height; + let stacks_height_before_stall = peer_info_before_stall.stacks_tip_height; + + // STEP 1: Miner A proposes a block in tenure A + + // submit a tx so that the miner will attempt to mine an extra block + let sender_nonce = 0; + let transfer_tx = make_stacks_transfer( + &sender_sk, + sender_nonce, + send_fee, + signer_test.running_nodes.conf.burnchain.chain_id, + &recipient, + send_amt, + ); + submit_tx(&http_origin, &transfer_tx); + + info!("----- Waiting for miner to propose a block -----"); + + // Wait for the miner to propose a block + wait_for(30, || { + Ok(signer_test.get_miner_proposal_messages().len() > proposals_before) + }) + .expect("Timed out waiting for miner to propose a block"); + + let proposals_before = signer_test.get_miner_proposal_messages().len(); + let info_before = signer_test.get_peer_info(); + + // STEP 2: Miner B proposes a block in tenure B, while A's block is pending validation + + info!("----- Mining a new BTC block -----"); + signer_test + .running_nodes + .btc_regtest_controller + .build_next_block(1); + + let mut last_log = Instant::now(); + last_log -= Duration::from_secs(5); + let mut 
new_block_hash = None; + wait_for(120, || { + let proposals = signer_test.get_miner_proposal_messages(); + let new_proposal = proposals.iter().find(|p| { + p.burn_height > burn_height_before_stall + && p.block.header.chain_length == info_before.stacks_tip_height + 1 + }); + + let has_new_proposal = new_proposal.is_some() && proposals.len() > proposals_before; + if last_log.elapsed() > Duration::from_secs(5) && !has_new_proposal { + info!( + "----- Waiting for a new proposal -----"; + "proposals_len" => proposals.len(), + "burn_height_before" => info_before.burn_block_height, + ); + last_log = Instant::now(); + } + if let Some(proposal) = new_proposal { + new_block_hash = Some(proposal.block.header.signer_signature_hash()); + } + Ok(has_new_proposal) + }) + .expect("Timed out waiting for pending block proposal"); + + info!("----- Waiting for pending block validation to be submitted -----"); + let new_block_hash = new_block_hash.unwrap(); + + // Set the delay to 0 so that the block validation finishes quickly + TEST_VALIDATE_DELAY_DURATION_SECS.set(0); + + wait_for(30, || { + let proposal_responses = test_observer::get_proposal_responses(); + let found_proposal = proposal_responses + .iter() + .any(|p| p.signer_signature_hash() == new_block_hash); + Ok(found_proposal) + }) + .expect("Timed out waiting for pending block validation to be submitted"); + + // STEP 3: Miner B is rejected, retries, and mines a block + + // Now, wait for miner B to propose a new block + let mut last_log = Instant::now(); + last_log -= Duration::from_secs(5); + wait_for(30, || { + let proposals = signer_test.get_miner_proposal_messages(); + let new_proposal = proposals.iter().find(|p| { + p.burn_height > burn_height_before_stall + && p.block.header.chain_length == stacks_height_before_stall + 2 + }); + if last_log.elapsed() > Duration::from_secs(5) && !new_proposal.is_some() { + let last_proposal = proposals.last().unwrap(); + info!( + "----- Waiting for a new proposal -----"; + 
"proposals_len" => proposals.len(), + "burn_height_before" => burn_height_before_stall, + "stacks_height_before" => stacks_height_before_stall, + "last_proposal_burn_height" => last_proposal.burn_height, + "last_proposal_stacks_height" => last_proposal.block.header.chain_length, + ); + last_log = Instant::now(); + } + Ok(new_proposal.is_some()) + }) + .expect("Timed out waiting for miner to try a new block proposal"); + + // Wait for the new block to be mined + wait_for(30, || { + let peer_info = signer_test.get_peer_info(); + Ok( + peer_info.stacks_tip_height == stacks_height_before_stall + 2 + && peer_info.burn_block_height == burn_height_before_stall + 1, + ) + }) + .expect("Timed out waiting for new block to be mined"); + + // Ensure that we didn't tenure extend + verify_last_block_contains_tenure_change_tx(TenureChangeCause::BlockFound); + + info!("------------------------- Shutdown -------------------------"); + signer_test.shutdown(); +} + #[test] #[ignore] /// Test that a miner will extend its tenure after the succeding miner fails to mine a block. 
@@ -9486,7 +9817,7 @@ fn global_acceptance_depends_on_block_announcement() { .expect("Failed to get peer info"); let mut sister_block = None; let start_time = Instant::now(); - while sister_block.is_none() && start_time.elapsed() < Duration::from_secs(30) { + while sister_block.is_none() && start_time.elapsed() < Duration::from_secs(45) { sister_block = test_observer::get_stackerdb_chunks() .into_iter() .flat_map(|chunk| chunk.modified_slots) @@ -9779,7 +10110,7 @@ fn no_reorg_due_to_successive_block_validation_ok() { debug!("Miner 1 mined block N: {block_n_signature_hash}"); info!("------------------------- Pause Block Validation Response of N+1 -------------------------"); - TEST_VALIDATE_STALL.lock().unwrap().replace(true); + TEST_VALIDATE_STALL.set(true); let proposals_before_2 = rl2_proposals.load(Ordering::SeqCst); let rejections_before_2 = rl2_rejections.load(Ordering::SeqCst); let blocks_before = test_observer::get_blocks().len(); @@ -9914,7 +10245,7 @@ fn no_reorg_due_to_successive_block_validation_ok() { info!("------------------------- Unpause Block Validation Response of N+1 -------------------------"); - TEST_VALIDATE_STALL.lock().unwrap().replace(false); + TEST_VALIDATE_STALL.set(false); // Verify that the node accepted the proposed N+1, sending back a validate ok response wait_for(30, || {