Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: event emitter will now emit new_txid in-case a tx replaces another #5381

Closed
wants to merge 10 commits into from
20 changes: 16 additions & 4 deletions stackslib/src/chainstate/stacks/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1427,8 +1427,16 @@ impl<'a> StacksMicroblockBuilder<'a> {
self.runtime.num_mined = num_txs;

mem_pool.drop_txs(&invalidated_txs)?;
event_dispatcher.mempool_txs_dropped(invalidated_txs, MemPoolDropReason::TOO_EXPENSIVE);
event_dispatcher.mempool_txs_dropped(to_drop_and_blacklist, MemPoolDropReason::PROBLEMATIC);
event_dispatcher.mempool_txs_dropped(
invalidated_txs,
None,
MemPoolDropReason::TOO_EXPENSIVE,
);
event_dispatcher.mempool_txs_dropped(
to_drop_and_blacklist,
None,
MemPoolDropReason::PROBLEMATIC,
);

if blocked {
debug!(
Expand Down Expand Up @@ -2543,8 +2551,12 @@ impl StacksBlockBuilder {
mempool.drop_txs(&invalidated_txs)?;

if let Some(observer) = event_observer {
observer.mempool_txs_dropped(invalidated_txs, MemPoolDropReason::TOO_EXPENSIVE);
observer.mempool_txs_dropped(to_drop_and_blacklist, MemPoolDropReason::PROBLEMATIC);
observer.mempool_txs_dropped(invalidated_txs, None, MemPoolDropReason::TOO_EXPENSIVE);
observer.mempool_txs_dropped(
to_drop_and_blacklist,
None,
MemPoolDropReason::PROBLEMATIC,
);
}

if let Err(e) = result {
Expand Down
13 changes: 9 additions & 4 deletions stackslib/src/core/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,12 @@ pub trait ProposalCallbackReceiver: Send {

pub trait MemPoolEventDispatcher {
fn get_proposal_callback_receiver(&self) -> Option<Box<dyn ProposalCallbackReceiver>>;
fn mempool_txs_dropped(&self, txids: Vec<Txid>, reason: MemPoolDropReason);
fn mempool_txs_dropped(
&self,
txids: Vec<Txid>,
new_txid: Option<Txid>,
reason: MemPoolDropReason,
);
fn mined_block_event(
&self,
target_burn_height: u64,
Expand Down Expand Up @@ -2229,7 +2234,7 @@ impl MemPoolDB {

// broadcast drop event if a tx is being replaced
if let (Some(prior_tx), Some(event_observer)) = (prior_tx, event_observer) {
event_observer.mempool_txs_dropped(vec![prior_tx.txid], replace_reason);
event_observer.mempool_txs_dropped(vec![prior_tx.txid], Some(txid), replace_reason);
};

Ok(())
Expand Down Expand Up @@ -2275,7 +2280,7 @@ impl MemPoolDB {
if let Some(event_observer) = event_observer {
let sql = "SELECT txid FROM mempool WHERE accept_time < ?1";
let txids = query_rows(tx, sql, args)?;
event_observer.mempool_txs_dropped(txids, MemPoolDropReason::STALE_COLLECT);
event_observer.mempool_txs_dropped(txids, None, MemPoolDropReason::STALE_COLLECT);
}

let sql = "DELETE FROM mempool WHERE accept_time < ?1";
Expand All @@ -2297,7 +2302,7 @@ impl MemPoolDB {
if let Some(event_observer) = event_observer {
let sql = "SELECT txid FROM mempool WHERE height < ?1";
let txids = query_rows(tx, sql, args)?;
event_observer.mempool_txs_dropped(txids, MemPoolDropReason::STALE_COLLECT);
event_observer.mempool_txs_dropped(txids, None, MemPoolDropReason::STALE_COLLECT);
}

let sql = "DELETE FROM mempool WHERE height < ?1";
Expand Down
8 changes: 7 additions & 1 deletion stackslib/src/net/api/tests/postblock_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,13 @@ impl MemPoolEventDispatcher for ProposalTestObserver {
Some(Box::new(Arc::clone(&self.proposal_observer)))
}

fn mempool_txs_dropped(&self, txids: Vec<Txid>, reason: mempool::MemPoolDropReason) {}
fn mempool_txs_dropped(
&self,
txids: Vec<Txid>,
new_txid: Option<Txid>,
reason: mempool::MemPoolDropReason,
) {
}

fn mined_block_event(
&self,
Expand Down
36 changes: 29 additions & 7 deletions testnet/stacks-node/src/event_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,9 +943,14 @@ impl ProposalCallbackReceiver for ProposalCallbackHandler {
}

impl MemPoolEventDispatcher for EventDispatcher {
fn mempool_txs_dropped(&self, txids: Vec<Txid>, reason: MemPoolDropReason) {
fn mempool_txs_dropped(
&self,
txids: Vec<Txid>,
new_txid: Option<Txid>,
reason: MemPoolDropReason,
) {
if !txids.is_empty() {
self.process_dropped_mempool_txs(txids, reason)
self.process_dropped_mempool_txs(txids, new_txid, reason)
}
}

Expand Down Expand Up @@ -1568,7 +1573,12 @@ impl EventDispatcher {
}
}

pub fn process_dropped_mempool_txs(&self, txs: Vec<Txid>, reason: MemPoolDropReason) {
pub fn process_dropped_mempool_txs(
&self,
txs: Vec<Txid>,
new_txid: Option<Txid>,
reason: MemPoolDropReason,
) {
// lazily assemble payload only if we have observers
let interested_observers = self.filter_observers(&self.mempool_observers_lookup, true);

Expand All @@ -1581,10 +1591,22 @@ impl EventDispatcher {
.map(|tx| serde_json::Value::String(format!("0x{tx}")))
.collect();

let payload = json!({
"dropped_txids": serde_json::Value::Array(dropped_txids),
"reason": reason.to_string(),
});
let payload = match new_txid {
Some(id) => {
json!({
"dropped_txids": serde_json::Value::Array(dropped_txids),
"reason": reason.to_string(),
"new_txid": format!("0x{}", &id),
})
}
None => {
json!({
"dropped_txids": serde_json::Value::Array(dropped_txids),
"reason": reason.to_string(),
"new_txid": null,
})
}
};

for observer in interested_observers.iter() {
observer.send_dropped_mempool_txs(&payload);
Expand Down