Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: prevent multiple block proposal evals #5453

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions libsigner/src/v0/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,14 @@ impl BlockResponse {
) -> Self {
Self::Rejected(BlockRejection::new(hash, reject_code, private_key, mainnet))
}

/// Return the signer signature hash stored in this block response.
///
/// Both the `Accepted` and the `Rejected` variant carry a
/// `signer_signature_hash` field; this accessor returns it so callers do
/// not have to match on the variant themselves.
pub fn signer_signature_hash(&self) -> Sha512Trunc256Sum {
    match self {
        BlockResponse::Accepted(response) => response.signer_signature_hash,
        BlockResponse::Rejected(response) => response.signer_signature_hash,
    }
}
}

impl StacksMessageCodec for BlockResponse {
Expand Down
78 changes: 75 additions & 3 deletions stacks-signer/src/signerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ static CREATE_INDEXES_3: &str = r#"
CREATE INDEX IF NOT EXISTS block_rejection_signer_addrs_on_block_signature_hash ON block_rejection_signer_addrs(signer_signature_hash);
"#;

/// Index created as part of schema version 4 (listed in `SCHEMA_4`).
/// It covers `block_validations_pending.added_time`, the column that
/// `get_pending_block_validation` orders by.
static CREATE_INDEXES_4: &str = r#"
CREATE INDEX IF NOT EXISTS block_validations_pending_on_added_time ON block_validations_pending(added_time);
"#;

static CREATE_SIGNER_STATE_TABLE: &str = "
CREATE TABLE IF NOT EXISTS signer_states (
reward_cycle INTEGER PRIMARY KEY,
Expand Down Expand Up @@ -369,6 +373,14 @@ CREATE TABLE IF NOT EXISTS block_rejection_signer_addrs (
PRIMARY KEY (signer_addr)
) STRICT;"#;

/// Table created as part of schema version 4 (listed in `SCHEMA_4`).
/// Each row holds the signer signature hash of a block whose validation is
/// pending, together with the time the row was added. The hash is the primary
/// key, so a given hash can appear at most once.
static CREATE_BLOCK_VALIDATION_PENDING_TABLE: &str = r#"
CREATE TABLE IF NOT EXISTS block_validations_pending (
signer_signature_hash TEXT NOT NULL,
-- the time at which the block was added to the pending table
added_time INTEGER NOT NULL,
PRIMARY KEY (signer_signature_hash)
) STRICT;"#;

static SCHEMA_1: &[&str] = &[
DROP_SCHEMA_0,
CREATE_DB_CONFIG,
Expand Down Expand Up @@ -405,9 +417,15 @@ static SCHEMA_3: &[&str] = &[
"INSERT INTO db_config (version) VALUES (3);",
];

/// Statements executed, in order, by `schema_4_migration`: create the pending
/// block validations table, create its index, then record schema version 4 in
/// `db_config`.
static SCHEMA_4: &[&str] = &[
CREATE_BLOCK_VALIDATION_PENDING_TABLE,
CREATE_INDEXES_4,
"INSERT OR REPLACE INTO db_config (version) VALUES (4);",
];

impl SignerDb {
/// The current schema version used in this build of the signer binary.
pub const SCHEMA_VERSION: u32 = 3;
pub const SCHEMA_VERSION: u32 = 4;

/// Create a new `SignerState` instance.
/// This will create a new SQLite database at the given path
Expand All @@ -427,7 +445,7 @@ impl SignerDb {
return Ok(0);
}
let result = conn
.query_row("SELECT version FROM db_config LIMIT 1", [], |row| {
.query_row("SELECT MAX(version) FROM db_config LIMIT 1", [], |row| {
row.get(0)
})
.optional();
Expand Down Expand Up @@ -479,6 +497,20 @@ impl SignerDb {
Ok(())
}

/// Upgrade a schema 3 database to schema 4.
///
/// Does nothing when the database already reports version 4 or later.
/// Otherwise every statement in `SCHEMA_4` is executed, in order, on `tx`;
/// the first failing statement aborts the migration with its error.
fn schema_4_migration(tx: &Transaction) -> Result<(), DBError> {
    let needs_migration = Self::get_schema_version(tx)? < 4;
    if needs_migration {
        for sql in SCHEMA_4.iter() {
            tx.execute_batch(sql)?;
        }
    }
    Ok(())
}

/// Either instantiate a new database, or migrate an existing one
/// If the detected version of the existing database is 0 (i.e., a pre-migration
/// logic DB, the DB will be dropped).
Expand All @@ -490,7 +522,8 @@ impl SignerDb {
0 => Self::schema_1_migration(&sql_tx)?,
1 => Self::schema_2_migration(&sql_tx)?,
2 => Self::schema_3_migration(&sql_tx)?,
3 => break,
3 => Self::schema_4_migration(&sql_tx)?,
4 => break,
x => return Err(DBError::Other(format!(
"Database schema is newer than supported by this binary. Expected version = {}, Database version = {x}",
Self::SCHEMA_VERSION,
Expand Down Expand Up @@ -809,6 +842,45 @@ impl SignerDb {
BlockState::try_from(state.as_str()).map_err(|_| DBError::Corruption)?,
))
}

/// Pop the oldest pending block validation.
///
/// Looks up the row of `block_validations_pending` with the smallest
/// `added_time`, removes it from the table, and returns its signer
/// signature hash. Returns `Ok(None)` when the table is empty.
///
/// # Errors
/// Returns `DBError::Corruption` when the stored hash cannot be parsed by
/// `Sha512Trunc256Sum::from_hex`, and propagates any error from the lookup
/// or from the removal.
pub fn get_pending_block_validation(&self) -> Result<Option<Sha512Trunc256Sum>, DBError> {
    // Only the oldest entry is consumed, so ask the database for exactly one row
    // instead of sorting and returning the whole table.
    let qry =
        "SELECT signer_signature_hash FROM block_validations_pending ORDER BY added_time ASC LIMIT 1";
    let sighash_opt: Option<String> = query_row(&self.db, qry, params![])?;
    let Some(sighash_hex) = sighash_opt else {
        // Nothing is pending.
        return Ok(None);
    };
    let sighash = Sha512Trunc256Sum::from_hex(&sighash_hex).map_err(|_| DBError::Corruption)?;
    // The entry is handed to the caller, so it must no longer be listed as pending.
    self.remove_pending_block_validation(&sighash)?;
    Ok(Some(sighash))
}

/// Delete the `block_validations_pending` row whose signer signature hash
/// equals `sighash`.
///
/// Any error reported by the database is propagated to the caller.
pub fn remove_pending_block_validation(
    &self,
    sighash: &Sha512Trunc256Sum,
) -> Result<(), DBError> {
    let sighash_hex = sighash.to_string();
    self.db.execute(
        "DELETE FROM block_validations_pending WHERE signer_signature_hash = ?1",
        params![sighash_hex],
    )?;
    Ok(())
}

/// Insert a pending block validation
pub fn insert_pending_block_validation(
&self,
sighash: &Sha512Trunc256Sum,
ts: u64,
) -> Result<(), DBError> {
self.db.execute(
"INSERT INTO block_validations_pending (signer_signature_hash, added_time) VALUES (?1, ?2)",
params![sighash.to_string(), u64_to_sql(ts)?],
)?;
Ok(())
}
}

fn try_deserialize<T>(s: Option<String>) -> Result<Option<T>, DBError>
Expand Down
111 changes: 80 additions & 31 deletions stacks-signer/src/v0/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::time::{Duration, Instant};

use blockstack_lib::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockHeader};
use blockstack_lib::net::api::postblock_proposal::{
BlockValidateOk, BlockValidateReject, BlockValidateResponse,
BlockValidateOk, BlockValidateReject, BlockValidateResponse, TOO_MANY_REQUESTS_STATUS,
};
use clarity::types::chainstate::StacksPrivateKey;
use clarity::types::{PrivateKey, StacksEpochId};
Expand All @@ -33,11 +33,12 @@ use libsigner::{BlockProposal, SignerEvent};
use slog::{slog_debug, slog_error, slog_info, slog_warn};
use stacks_common::types::chainstate::StacksAddress;
use stacks_common::util::get_epoch_time_secs;
use stacks_common::util::hash::Sha512Trunc256Sum;
use stacks_common::util::secp256k1::MessageSignature;
use stacks_common::{debug, error, info, warn};

use crate::chainstate::{ProposalEvalConfig, SortitionsView};
use crate::client::{SignerSlotID, StackerDB, StacksClient};
use crate::client::{ClientError, SignerSlotID, StackerDB, StacksClient};
use crate::config::SignerConfig;
use crate::runloop::SignerResult;
use crate::signerdb::{BlockInfo, BlockState, SignerDb};
Expand Down Expand Up @@ -90,7 +91,7 @@ pub struct Signer {
/// marking a submitted block as invalid
pub block_proposal_validation_timeout: Duration,
/// The current submitted block proposal and its submission time
pub submitted_block_proposal: Option<(BlockProposal, Instant)>,
pub submitted_block_proposal: Option<(Sha512Trunc256Sum, Instant)>,
hstove marked this conversation as resolved.
Show resolved Hide resolved
}

impl std::fmt::Display for Signer {
Expand Down Expand Up @@ -476,15 +477,8 @@ impl Signer {
"block_height" => block_proposal.block.header.chain_length,
"burn_height" => block_proposal.burn_height,
);
match stacks_client.submit_block_for_validation(block_info.block.clone()) {
Ok(_) => {
self.submitted_block_proposal =
Some((block_proposal.clone(), Instant::now()));
}
Err(e) => {
warn!("{self}: Failed to submit block for validation: {e:?}");
}
};

self.submit_block_for_validation(stacks_client, block_proposal.block.clone());
} else {
// Still store the block but log we can't submit it for validation. We may receive enough signatures/rejections
// from other signers to push the proposed block into a global rejection/acceptance regardless of our participation.
Expand All @@ -509,12 +503,44 @@ impl Signer {
match block_response {
BlockResponse::Accepted(accepted) => {
self.handle_block_signature(stacks_client, accepted);
accepted.signer_signature_hash
}
BlockResponse::Rejected(block_rejection) => {
self.handle_block_rejection(block_rejection);
block_rejection.signer_signature_hash
}
};

// Remove this block validation from the pending table
let signer_sig_hash = block_response.signer_signature_hash();
self.signer_db
.remove_pending_block_validation(&signer_sig_hash)
Copy link
Collaborator

@jferrant jferrant Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could maybe have mark_locally or mark_globally etc. functions to just auto update these pending lists because if we have come to a consensus or we are calling these functions, it doesn't matter what the block response is/we should no longer consider these as pending block proposals. Does that make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, good call. That somewhat contrasts the ticket for always sending BlockResponses, but if the block already reached consensus, there's no point in adding load to our node with a new block response. Your suggestion would probably also help mitigate the issue that this PR tries to solve.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think neither of us noticed this was in handle_block_response, not handle_block_validation_response 🤦. I've moved it to handle_block_validation_response.

Regarding your point, though, I think we should remove the pending validation when a block is marked globally accepted/rejected. If the validation response comes in after it's marked as globally handled (the "typical" scenario), we'll still respond with a block response, which we want. But if our signer is "catching up", and the pending block validation is 1 or more blocks behind the tip, we should just drop it.

.unwrap_or_else(|e| warn!("{self}: Failed to remove pending block validation: {e:?}"));

match self.signer_db.get_pending_block_validation() {
Ok(Some(signer_sig_hash)) => {
info!("{self}: Found a pending block validation: {signer_sig_hash:?}");
match self
.signer_db
.block_lookup(self.reward_cycle, &signer_sig_hash)
{
Ok(Some(block_info)) => {
self.submit_block_for_validation(stacks_client, block_info.block);
}
Ok(None) => {
// This should never happen
error!(
"{self}: Pending block validation not found in DB: {signer_sig_hash:?}"
);
}
Err(e) => error!("{self}: Failed to get block info: {e:?}"),
}
}
Ok(None) => {}
Err(e) => warn!("{self}: Failed to get pending block validation: {e:?}"),
}
}

/// Handle the block validate ok response. Returns our block response if we have one
fn handle_block_validate_ok(
&mut self,
Expand All @@ -525,10 +551,7 @@ impl Signer {
let signer_signature_hash = block_validate_ok.signer_signature_hash;
if self
.submitted_block_proposal
.as_ref()
.map(|(proposal, _)| {
proposal.block.header.signer_signature_hash() == signer_signature_hash
})
.map(|(proposal_hash, _)| proposal_hash == signer_signature_hash)
.unwrap_or(false)
{
self.submitted_block_proposal = None;
Expand Down Expand Up @@ -584,10 +607,7 @@ impl Signer {
let signer_signature_hash = block_validate_reject.signer_signature_hash;
if self
.submitted_block_proposal
.as_ref()
.map(|(proposal, _)| {
proposal.block.header.signer_signature_hash() == signer_signature_hash
})
.map(|(proposal_hash, _)| proposal_hash == signer_signature_hash)
.unwrap_or(false)
{
self.submitted_block_proposal = None;
Expand Down Expand Up @@ -670,20 +690,21 @@ impl Signer {
/// Check the current tracked submitted block proposal to see if it has timed out.
/// Broadcasts a rejection and marks the block locally rejected if it has.
fn check_submitted_block_proposal(&mut self) {
let Some((block_proposal, block_submission)) = self.submitted_block_proposal.take() else {
let Some((proposal_signer_sighash, block_submission)) =
self.submitted_block_proposal.take()
else {
// Nothing to check.
return;
};
if block_submission.elapsed() < self.block_proposal_validation_timeout {
// Not expired yet. Put it back!
self.submitted_block_proposal = Some((block_proposal, block_submission));
self.submitted_block_proposal = Some((proposal_signer_sighash, block_submission));
return;
}
let signature_sighash = block_proposal.block.header.signer_signature_hash();
// For mutability reasons, we need to take the block_info out of the map and add it back after processing
let mut block_info = match self
.signer_db
.block_lookup(self.reward_cycle, &signature_sighash)
.block_lookup(self.reward_cycle, &proposal_signer_sighash)
{
Ok(Some(block_info)) => {
if block_info.state == BlockState::GloballyRejected
Expand All @@ -698,8 +719,7 @@ impl Signer {
// This is weird. If this is reached, its probably an error in code logic or the db was flushed.
// Why are we tracking a block submission for a block we have never seen / stored before.
error!("{self}: tracking an unknown block validation submission.";
"signer_sighash" => %signature_sighash,
"block_id" => %block_proposal.block.block_id(),
"signer_sighash" => %proposal_signer_sighash,
);
return;
}
Expand All @@ -712,11 +732,10 @@ impl Signer {
// Reject it so we aren't holding up the network because of our inaction.
warn!(
"{self}: Failed to receive block validation response within {} ms. Rejecting block.", self.block_proposal_validation_timeout.as_millis();
"signer_sighash" => %signature_sighash,
"block_id" => %block_proposal.block.block_id(),
"signer_sighash" => %proposal_signer_sighash,
);
let rejection = BlockResponse::rejected(
block_proposal.block.header.signer_signature_hash(),
proposal_signer_sighash,
RejectCode::ConnectivityIssues,
&self.private_key,
self.mainnet,
Expand Down Expand Up @@ -851,7 +870,7 @@ impl Signer {
if self
.submitted_block_proposal
.as_ref()
.map(|(proposal, _)| &proposal.block.header.signer_signature_hash() == block_hash)
.map(|(proposal_signer_sighash, _)| proposal_signer_sighash == block_hash)
.unwrap_or(false)
{
// Consensus reached! No longer bother tracking its validation submission to the node as we are too late to participate in the decision anyway.
Expand Down Expand Up @@ -1002,7 +1021,7 @@ impl Signer {
if self
.submitted_block_proposal
.as_ref()
.map(|(proposal, _)| &proposal.block.header.signer_signature_hash() == block_hash)
.map(|(proposal_hash, _)| proposal_hash == block_hash)
.unwrap_or(false)
{
// Consensus reached! No longer bother tracking its validation submission to the node as we are too late to participate in the decision anyway.
Expand Down Expand Up @@ -1046,6 +1065,36 @@ impl Signer {
}
}

/// Submit a block to the stacks node for validation.
///
/// On success, the block's signer signature hash and the submission time are
/// stored in `submitted_block_proposal`. If the node answers with
/// `TOO_MANY_REQUESTS_STATUS` (HTTP 429), the hash is instead inserted into the
/// pending block validations table so the submission can be retried later.
/// Every other failure is only logged; nothing is returned to the caller.
fn submit_block_for_validation(&mut self, stacks_client: &StacksClient, block: NakamotoBlock) {
    // Compute the hash first so the owned block can be moved into the request
    // without cloning it.
    let signer_signature_hash = block.header.signer_signature_hash();
    match stacks_client.submit_block_for_validation(block) {
        Ok(_) => {
            self.submitted_block_proposal = Some((signer_signature_hash, Instant::now()));
        }
        Err(ClientError::RequestFailure(status)) => {
            if status.as_u16() == TOO_MANY_REQUESTS_STATUS {
                info!("{self}: Received 429 from stacks node. Inserting pending block validation...";
                    "signer_signature_hash" => %signer_signature_hash,
                );
                // Best effort: a failed insert is logged rather than propagated.
                self.signer_db
                    .insert_pending_block_validation(
                        &signer_signature_hash,
                        get_epoch_time_secs(),
                    )
                    .unwrap_or_else(|e| {
                        warn!("{self}: Failed to insert pending block validation: {e:?}")
                    });
            } else {
                warn!("{self}: Received non-429 status from stacks node: {status}");
            }
        }
        Err(e) => {
            warn!("{self}: Failed to submit block for validation: {e:?}");
        }
    }
}

#[cfg(any(test, feature = "testing"))]
fn test_skip_block_broadcast(&self, block: &NakamotoBlock) -> bool {
if *TEST_SKIP_BLOCK_BROADCAST.lock().unwrap() == Some(true) {
Expand Down
6 changes: 4 additions & 2 deletions stackslib/src/net/api/postblock_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ define_u8_enum![ValidateRejectCode {
NoSuchTenure = 6
}];

/// HTTP status code (429) returned by the block proposal endpoint when a
/// proposal cannot be accepted for evaluation right now.
///
/// Declared as a `const` rather than a `static`: it is a plain value with no
/// need for a fixed address, and a `const` can also be used in `match` patterns.
pub const TOO_MANY_REQUESTS_STATUS: u16 = 429;

impl TryFrom<u8> for ValidateRejectCode {
type Error = CodecError;
fn try_from(value: u8) -> Result<Self, Self::Error> {
Expand Down Expand Up @@ -687,7 +689,7 @@ impl RPCRequestHandler for RPCBlockProposalRequestHandler {
let res = node.with_node_state(|network, sortdb, chainstate, _mempool, rpc_args| {
if network.is_proposal_thread_running() {
return Err((
429,
TOO_MANY_REQUESTS_STATUS,
NetError::SendError("Proposal currently being evaluated".into()),
));
}
Expand All @@ -708,7 +710,7 @@ impl RPCRequestHandler for RPCBlockProposalRequestHandler {
.spawn_validation_thread(sortdb, chainstate, receiver)
.map_err(|_e| {
(
429,
TOO_MANY_REQUESTS_STATUS,
NetError::SendError(
"IO error while spawning proposal callback thread".into(),
),
Expand Down
Loading