Skip to content

Commit

Permalink
Merge pull request #2570 from blockstack/fix/book-deadlock
Browse files Browse the repository at this point in the history
Boot looping
  • Loading branch information
lgalabru authored Apr 6, 2021
2 parents 7160d48 + dceca5f commit 11f9820
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 12 deletions.
18 changes: 16 additions & 2 deletions testnet/stacks-node/src/event_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::thread::sleep;
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};

use async_h1::client;
Expand Down Expand Up @@ -36,6 +39,7 @@ use super::node::ChainTip;
#[derive(Debug, Clone)]
struct EventObserver {
endpoint: String,
should_keep_running: Arc<AtomicBool>,
}

const STATUS_RESP_TRUE: &str = "success";
Expand Down Expand Up @@ -73,6 +77,11 @@ impl EventObserver {
let backoff = Duration::from_millis((1.0 * 1_000.0) as u64);

loop {
if !self.should_keep_running.load(Ordering::SeqCst) {
info!("Terminating event observer");
return;
}

let body = body.clone();
let mut req = Request::new(Method::Post, url.clone());
req.append_header("Content-Type", "application/json")
Expand Down Expand Up @@ -627,11 +636,16 @@ impl EventDispatcher {
}
}

pub fn register_observer(&mut self, conf: &EventObserverConfig) {
pub fn register_observer(
&mut self,
conf: &EventObserverConfig,
should_keep_running: Arc<AtomicBool>,
) {
// let event_observer = EventObserver::new(&conf.address, conf.port);
info!("Registering event observer at: {}", conf.endpoint);
let event_observer = EventObserver {
endpoint: conf.endpoint.clone(),
should_keep_running,
};

let observer_index = self.registered_observers.len() as u16;
Expand Down
18 changes: 9 additions & 9 deletions testnet/stacks-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use stacks::chainstate::stacks::db::{
use stacks::chainstate::stacks::events::{
StacksTransactionEvent, StacksTransactionReceipt, TransactionOrigin,
};
use stacks::chainstate::stacks::index::TrieHash;
use stacks::chainstate::stacks::{
CoinbasePayload, StacksAddress, StacksBlock, StacksBlockHeader, StacksMicroblock,
StacksTransaction, StacksTransactionSigner, TransactionAnchorMode, TransactionPayload,
Expand All @@ -33,6 +34,11 @@ use stacks::net::{
rpc::RPCHandlerArgs,
Error as NetError, PeerAddress,
};
use stacks::util::get_epoch_time_secs;
use stacks::util::hash::Sha256Sum;
use stacks::util::secp256k1::Secp256k1PrivateKey;
use stacks::util::strings::UrlString;
use stacks::util::vrf::VRFPublicKey;
use stacks::{
burnchains::{Burnchain, BurnchainHeaderHash, Txid},
chainstate::stacks::db::{
Expand All @@ -41,13 +47,7 @@ use stacks::{
},
};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

use stacks::chainstate::stacks::index::TrieHash;
use stacks::util::get_epoch_time_secs;
use stacks::util::hash::Sha256Sum;
use stacks::util::secp256k1::Secp256k1PrivateKey;
use stacks::util::strings::UrlString;
use stacks::util::vrf::VRFPublicKey;
use std::sync::{atomic::AtomicBool, Arc};

#[derive(Debug, Clone)]
pub struct ChainTip {
Expand Down Expand Up @@ -311,7 +311,7 @@ impl Node {
let mut event_dispatcher = EventDispatcher::new();

for observer in &config.events_observers {
event_dispatcher.register_observer(observer);
event_dispatcher.register_observer(observer, Arc::new(AtomicBool::new(true)));
}

event_dispatcher.process_boot_receipts(receipts);
Expand Down Expand Up @@ -342,7 +342,7 @@ impl Node {
let mut event_dispatcher = EventDispatcher::new();

for observer in &config.events_observers {
event_dispatcher.register_observer(observer);
event_dispatcher.register_observer(observer, Arc::new(AtomicBool::new(true)));
}

let chainstate_path = config.get_chainstate_path_str();
Expand Down
7 changes: 6 additions & 1 deletion testnet/stacks-node/src/run_loop/neon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl RunLoop {
// setup dispatcher
let mut event_dispatcher = EventDispatcher::new();
for observer in self.config.events_observers.iter() {
event_dispatcher.register_observer(observer);
event_dispatcher.register_observer(observer, should_keep_running.clone());
}

let use_test_genesis_data = use_test_genesis_chainstate(&self.config);
Expand Down Expand Up @@ -270,6 +270,11 @@ impl RunLoop {
})
.unwrap();

// We announce a new burn block so that the chains coordinator
// can resume prior work and handle eventual unprocessed sortitions
// stored during a previous session.
coordinator_senders.announce_new_burn_block();

let mut burnchain_tip = burnchain
.wait_for_sortitions(None)
.expect("Unable to get burnchain tip");
Expand Down

0 comments on commit 11f9820

Please sign in to comment.