Skip to content

Commit

Permalink
Merge branch 'develop' into fix/5502
Browse files Browse the repository at this point in the history
  • Loading branch information
jcnelson authored Dec 18, 2024
2 parents 5812c4a + ca77e95 commit 7d4499e
Show file tree
Hide file tree
Showing 15 changed files with 709 additions and 355 deletions.
1 change: 1 addition & 0 deletions .github/workflows/bitcoin-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ jobs:
- tests::signer::v0::block_validation_response_timeout
- tests::signer::v0::tenure_extend_after_bad_commit
- tests::signer::v0::block_proposal_max_age_rejections
- tests::signer::v0::global_acceptance_depends_on_block_announcement
- tests::nakamoto_integrations::burn_ops_integration_test
- tests::nakamoto_integrations::check_block_heights
- tests::nakamoto_integrations::clarity_burn_state
Expand Down
184 changes: 82 additions & 102 deletions libsigner/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ pub enum SignerEvent<T: SignerEventTrait> {
/// the time at which this event was received by the signer's event processor
received_time: SystemTime,
},
/// A new processed Stacks block was received from the node with the given block hash
NewBlock {
/// The block header hash for the newly processed stacks block
block_hash: Sha512Trunc256Sum,
/// The block height for the newly processed stacks block
block_height: u64,
},
}

/// Trait to implement a stop-signaler for the event receiver thread.
Expand Down Expand Up @@ -298,29 +305,25 @@ impl<T: SignerEventTrait> EventReceiver<T> for SignerEventReceiver<T> {
&request.method(),
)));
}
debug!("Processing {} event", request.url());
if request.url() == "/stackerdb_chunks" {
process_stackerdb_event(event_receiver.local_addr, request)
.map_err(|e| {
error!("Error processing stackerdb_chunks message"; "err" => ?e);
e
})
process_event::<T, StackerDBChunksEvent>(request)
} else if request.url() == "/proposal_response" {
process_proposal_response(request)
process_event::<T, BlockValidateResponse>(request)
} else if request.url() == "/new_burn_block" {
process_new_burn_block_event(request)
process_event::<T, BurnBlockEvent>(request)
} else if request.url() == "/shutdown" {
event_receiver.stop_signal.store(true, Ordering::SeqCst);
return Err(EventError::Terminated);
Err(EventError::Terminated)
} else if request.url() == "/new_block" {
process_event::<T, BlockEvent>(request)
} else {
let url = request.url().to_string();
// `/new_block` is expected, but not specifically handled. do not log.
if &url != "/new_block" {
debug!(
"[{:?}] next_event got request with unexpected url {}, return OK so other side doesn't keep sending this",
event_receiver.local_addr,
url
);
}
debug!(
"[{:?}] next_event got request with unexpected url {}, return OK so other side doesn't keep sending this",
event_receiver.local_addr,
url
);
ack_dispatcher(request);
Err(EventError::UnrecognizedEvent(url))
}
Expand Down Expand Up @@ -385,12 +388,13 @@ fn ack_dispatcher(request: HttpRequest) {

// TODO: add tests from mutation testing results #4835
#[cfg_attr(test, mutants::skip)]
/// Process a stackerdb event from the node
fn process_stackerdb_event<T: SignerEventTrait>(
local_addr: Option<SocketAddr>,
mut request: HttpRequest,
) -> Result<SignerEvent<T>, EventError> {
fn process_event<T, E>(mut request: HttpRequest) -> Result<SignerEvent<T>, EventError>
where
T: SignerEventTrait,
E: serde::de::DeserializeOwned + TryInto<SignerEvent<T>, Error = EventError>,
{
let mut body = String::new();

if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
ack_dispatcher(request);
Expand All @@ -399,27 +403,12 @@ fn process_stackerdb_event<T: SignerEventTrait>(
&e
)));
}

debug!("Got stackerdb_chunks event"; "chunks_event_body" => %body);
let event: StackerDBChunksEvent = serde_json::from_slice(body.as_bytes())
// Regardless of whether we successfully deserialize, we should ack the dispatcher so they don't keep resending it
ack_dispatcher(request);
let json_event: E = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;

let event_contract_id = event.contract_id.clone();

let signer_event = match SignerEvent::try_from(event) {
Err(e) => {
info!(
"[{:?}] next_event got event from an unexpected contract id {}, return OK so other side doesn't keep sending this",
local_addr,
event_contract_id
);
ack_dispatcher(request);
return Err(e);
}
Ok(x) => x,
};

ack_dispatcher(request);
let signer_event: SignerEvent<T> = json_event.try_into()?;

Ok(signer_event)
}
Expand Down Expand Up @@ -466,78 +455,69 @@ impl<T: SignerEventTrait> TryFrom<StackerDBChunksEvent> for SignerEvent<T> {
}
}

/// Process a proposal response from the node
fn process_proposal_response<T: SignerEventTrait>(
mut request: HttpRequest,
) -> Result<SignerEvent<T>, EventError> {
debug!("Got proposal_response event");
let mut body = String::new();
if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
impl<T: SignerEventTrait> TryFrom<BlockValidateResponse> for SignerEvent<T> {
type Error = EventError;

if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
}
return Err(EventError::MalformedRequest(format!(
"Failed to read body: {:?}",
&e
)));
fn try_from(block_validate_response: BlockValidateResponse) -> Result<Self, Self::Error> {
Ok(SignerEvent::BlockValidationResponse(
block_validate_response,
))
}
}

let event: BlockValidateResponse = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;
#[derive(Debug, Deserialize)]
struct BurnBlockEvent {
burn_block_hash: String,
burn_block_height: u64,
reward_recipients: Vec<serde_json::Value>,
reward_slot_holders: Vec<String>,
burn_amount: u64,
}

if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
impl<T: SignerEventTrait> TryFrom<BurnBlockEvent> for SignerEvent<T> {
type Error = EventError;

fn try_from(burn_block_event: BurnBlockEvent) -> Result<Self, Self::Error> {
let burn_header_hash = burn_block_event
.burn_block_hash
.get(2..)
.ok_or_else(|| EventError::Deserialize("Hex string should be 0x prefixed".into()))
.and_then(|hex| {
BurnchainHeaderHash::from_hex(hex)
.map_err(|e| EventError::Deserialize(format!("Invalid hex string: {e}")))
})?;

Ok(SignerEvent::NewBurnBlock {
burn_height: burn_block_event.burn_block_height,
received_time: SystemTime::now(),
burn_header_hash,
})
}
}

Ok(SignerEvent::BlockValidationResponse(event))
#[derive(Debug, Deserialize)]
struct BlockEvent {
block_hash: String,
block_height: u64,
}

/// Process a new burn block event from the node
fn process_new_burn_block_event<T: SignerEventTrait>(
mut request: HttpRequest,
) -> Result<SignerEvent<T>, EventError> {
debug!("Got burn_block event");
let mut body = String::new();
if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
impl<T: SignerEventTrait> TryFrom<BlockEvent> for SignerEvent<T> {
type Error = EventError;

if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
}
return Err(EventError::MalformedRequest(format!(
"Failed to read body: {:?}",
&e
)));
}
#[derive(Debug, Deserialize)]
struct TempBurnBlockEvent {
burn_block_hash: String,
burn_block_height: u64,
reward_recipients: Vec<serde_json::Value>,
reward_slot_holders: Vec<String>,
burn_amount: u64,
}
let temp: TempBurnBlockEvent = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;
let burn_header_hash = temp
.burn_block_hash
.get(2..)
.ok_or_else(|| EventError::Deserialize("Hex string should be 0x prefixed".into()))
.and_then(|hex| {
BurnchainHeaderHash::from_hex(hex)
.map_err(|e| EventError::Deserialize(format!("Invalid hex string: {e}")))
})?;
let event = SignerEvent::NewBurnBlock {
burn_height: temp.burn_block_height,
received_time: SystemTime::now(),
burn_header_hash,
};
if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
fn try_from(block_event: BlockEvent) -> Result<Self, Self::Error> {
let block_hash: Sha512Trunc256Sum = block_event
.block_hash
.get(2..)
.ok_or_else(|| EventError::Deserialize("Hex string should be 0x prefixed".into()))
.and_then(|hex| {
Sha512Trunc256Sum::from_hex(hex)
.map_err(|e| EventError::Deserialize(format!("Invalid hex string: {e}")))
})?;
Ok(SignerEvent::NewBlock {
block_hash,
block_height: block_event.block_height,
})
}
Ok(event)
}

pub fn get_signers_db_signer_set_message_id(name: &str) -> Option<(u32, u32)> {
Expand Down
3 changes: 3 additions & 0 deletions stacks-common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{error, fmt, thread, time};

#[cfg(any(test, feature = "testing"))]
pub mod tests;

pub fn get_epoch_time_secs() -> u64 {
let start = SystemTime::now();
let since_the_epoch = start
Expand Down
99 changes: 99 additions & 0 deletions stacks-common/src/util/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (C) 2020-2024 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::sync::{Arc, Mutex};
/// `TestFlag` is a thread-safe utility designed for managing shared state in testing scenarios. It wraps
/// a value of type `T` inside an `Arc<Mutex<Option<T>>>`, allowing you to set and retrieve a value
/// across different parts of your codebase while ensuring thread safety.
///
/// This structure is particularly useful when:
/// - You need a global or static variable in tests.
/// - You want to control the execution of custom test code paths by setting and checking a shared value.
///
/// # Type Parameter
/// - `T`: The type of the value managed by the `TestFlag`. It must implement the `Default` and `Clone` traits.
///
/// # Examples
///
/// ```rust
/// use stacks_common::util::tests::TestFlag;
/// use std::sync::{Arc, Mutex};
///
/// // Create a TestFlag instance
/// let test_flag = TestFlag::default();
///
/// // Set a value in the test flag
/// test_flag.set("test_value".to_string());
///
/// // Retrieve the value
/// assert_eq!(test_flag.get(), "test_value".to_string());
///
/// // Reset the value to default
/// test_flag.set("".to_string());
/// assert_eq!(test_flag.get(), "".to_string());
/// ```
#[derive(Clone)]
pub struct TestFlag<T>(pub Arc<Mutex<Option<T>>>);

impl<T: Default + Clone> Default for TestFlag<T> {
fn default() -> Self {
Self(Arc::new(Mutex::new(None)))
}
}

impl<T: Default + Clone> TestFlag<T> {
/// Sets the value of the test flag.
///
/// This method updates the value stored inside the `TestFlag`, replacing any existing value.
///
/// # Arguments
/// - `value`: The new value to set for the `TestFlag`.
///
/// # Examples
///
/// ```rust
/// let test_flag = TestFlag::default();
/// test_flag.set(42);
/// assert_eq!(test_flag.get(), 42);
/// ```
pub fn set(&self, value: T) {
*self.0.lock().unwrap() = Some(value);
}

/// Retrieves the current value of the test flag.
///
/// If no value has been set, this method returns the default value for the type `T`.
///
/// # Returns
/// - The current value of the test flag, or the default value of `T` if none has been set.
///
/// # Examples
///
/// ```rust
/// let test_flag = TestFlag::default();
///
/// // Get the default value
/// assert_eq!(test_flag.get(), 0); // For T = i32, default is 0
///
/// // Set a value
/// test_flag.set(123);
///
/// // Get the updated value
/// assert_eq!(test_flag.get(), 123);
/// ```
pub fn get(&self) -> T {
self.0.lock().unwrap().clone().unwrap_or_default().clone()
}
}
1 change: 1 addition & 0 deletions stacks-signer/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE

## Changed
- Improvements to the stale signer cleanup logic: deletes the prior signer if it has no remaining unprocessed blocks in its database
- Signers now listen to new block events from the stacks node to determine whether a block has been successfully appended to the chain tip

## [3.1.0.0.1.0]

Expand Down
2 changes: 1 addition & 1 deletion stacks-signer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ serde_stacker = "0.1"
slog = { version = "2.5.2", features = [ "max_level_trace" ] }
slog-json = { version = "2.3.0", optional = true }
slog-term = "2.6.0"
stacks-common = { path = "../stacks-common" }
stacks-common = { path = "../stacks-common", features = ["testing"] }
stackslib = { path = "../stackslib" }
thiserror = { workspace = true }
tiny_http = { version = "0.12", optional = true }
Expand Down
Loading

0 comments on commit 7d4499e

Please sign in to comment.