Skip to content

Commit

Permalink
feat: remove invalid header dep tx for reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad committed Oct 19, 2021
1 parent cab368f commit 2a32abd
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 56 deletions.
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(GetRawTxPool),
Box::new(PoolReconcile),
Box::new(PoolResurrect),
Box::new(InvalidHeaderDep),
#[cfg(not(target_os = "windows"))]
Box::new(PoolPersisted),
Box::new(TransactionRelayBasic),
Expand Down
43 changes: 43 additions & 0 deletions test/src/specs/tx_pool/pool_resurrect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,46 @@ impl Spec for PoolResurrect {
node0.assert_tx_pool_size(0, 0);
}
}

pub struct InvalidHeaderDep;

impl Spec for InvalidHeaderDep {
crate::setup!(num_nodes: 2);

fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];
let node1 = &nodes[1];

info!("Generate 1 block on node0");
mine_until_out_bootstrap_period(node0);
mine(node0, 1);

info!("Generate header dep tx on node0");
let hash = node0.generate_transaction();

let tip = node0.get_tip_block();

let tx = node0.new_transaction(hash);

node0.rpc_client().send_transaction(
tx.as_advanced_builder()
.set_header_deps(vec![tip.hash()])
.build()
.data()
.into(),
);

let tx_pool_info = node0.get_tip_tx_pool_info();
assert_eq!(tx_pool_info.pending.value(), 2);

mine_until_out_bootstrap_period(node1);
mine(node1, 2);

info!("Connect node0 to node1, waiting for sync");
node0.connect(node1);
waiting_for_sync(nodes);

info!("invalid header dep tx should be removed");
node0.assert_tx_pool_size(1, 0);
}
}
57 changes: 47 additions & 10 deletions tx-pool/src/component/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ckb_types::{
tx_pool::Reject,
TransactionView,
},
packed::{OutPoint, ProposalShortId},
packed::{Byte32, OutPoint, ProposalShortId},
prelude::*,
};
use ckb_util::{LinkedHashMap, LinkedHashMapEntries};
Expand All @@ -21,6 +21,8 @@ pub(crate) struct PendingQueue {
pub(crate) deps: HashMap<OutPoint, HashSet<ProposalShortId>>,
/// input-txid map represent in-pool tx's inputs
pub(crate) inputs: HashMap<OutPoint, ProposalShortId>,
/// dep-set<txid-headers> map represent in-pool tx's header deps
pub(crate) header_deps: HashMap<ProposalShortId, Vec<Byte32>>,
}

impl PendingQueue {
Expand All @@ -29,6 +31,7 @@ impl PendingQueue {
inner: Default::default(),
deps: Default::default(),
inputs: Default::default(),
header_deps: Default::default(),
}
}

Expand Down Expand Up @@ -56,23 +59,26 @@ impl PendingQueue {
.insert(tx_short_id.clone());
}

// record header_deps
let header_deps = entry.transaction().header_deps();
if !header_deps.is_empty() {
self.header_deps
.insert(tx_short_id.clone(), header_deps.into_iter().collect());
}

self.inner.insert(tx_short_id, entry);
true
}

pub(crate) fn resolve_conflict(
&mut self,
tx: &TransactionView,
) -> (Vec<ConflictEntry>, Vec<ConflictEntry>) {
pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec<ConflictEntry> {
let inputs = tx.input_pts_iter();
let mut input_conflict = Vec::new();
let mut deps_consumed = Vec::new();
let mut conflicts = Vec::new();

for i in inputs {
if let Some(id) = self.inputs.remove(&i) {
if let Some(entry) = self.remove_entry(&id) {
let reject = Reject::Resolve(OutPointError::Dead(i.clone()));
input_conflict.push((entry, reject));
conflicts.push((entry, reject));
}
}

Expand All @@ -81,12 +87,39 @@ impl PendingQueue {
for id in x {
if let Some(entry) = self.remove_entry(&id) {
let reject = Reject::Resolve(OutPointError::Dead(i.clone()));
deps_consumed.push((entry, reject));
conflicts.push((entry, reject));
}
}
}
}
(input_conflict, deps_consumed)

conflicts
}

pub(crate) fn resolve_conflict_header_dep(
&mut self,
headers: &HashSet<Byte32>,
) -> Vec<ConflictEntry> {
let mut conflicts = Vec::new();

// invalid header deps
let mut ids = Vec::new();
for (tx_id, deps) in self.header_deps.iter() {
for hash in deps {
if headers.contains(hash) {
ids.push((hash.clone(), tx_id.clone()));
break;
}
}
}

for (blk_hash, id) in ids {
if let Some(entry) = self.remove_entry(&id) {
let reject = Reject::Resolve(OutPointError::InvalidHeader(blk_hash));
conflicts.push((entry, reject));
}
}
conflicts
}

pub(crate) fn contains_key(&self, id: &ProposalShortId) -> bool {
Expand Down Expand Up @@ -130,6 +163,8 @@ impl PendingQueue {
}
}

self.header_deps.remove(&id);

return Some(entry);
}
None
Expand Down Expand Up @@ -165,6 +200,7 @@ impl PendingQueue {
self.deps.remove(d);
}
}
self.header_deps.remove(&tx_short_id);
}

pub(crate) fn remove_entries_by_filter<P: FnMut(&ProposalShortId, &TxEntry) -> bool>(
Expand Down Expand Up @@ -215,6 +251,7 @@ impl PendingQueue {
self.inner.clear();
self.deps.clear();
self.inputs.clear();
self.header_deps.clear();
txs
}
}
Expand Down
62 changes: 51 additions & 11 deletions tx-pool/src/component/proposed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ckb_types::{
error::OutPointError,
TransactionView,
},
packed::{CellOutput, OutPoint, ProposalShortId},
packed::{Byte32, CellOutput, OutPoint, ProposalShortId},
prelude::*,
};
use std::collections::{HashMap, HashSet};
Expand All @@ -24,6 +24,8 @@ pub(crate) struct Edges {
pub(crate) inputs: HashMap<OutPoint, ProposalShortId>,
/// dep-set<txid> map represent in-pool tx's deps
pub(crate) deps: HashMap<OutPoint, HashSet<ProposalShortId>>,
/// dep-set<txid-headers> map represent in-pool tx's header deps
pub(crate) header_deps: HashMap<ProposalShortId, Vec<Byte32>>,
}

impl Edges {
Expand Down Expand Up @@ -93,6 +95,7 @@ impl Edges {
self.outputs.clear();
self.inputs.clear();
self.deps.clear();
self.header_deps.clear();
}
}

Expand Down Expand Up @@ -199,6 +202,8 @@ impl ProposedPool {
self.edges.remove_output(&o);
// self.edges.remove_deps(&o);
}

self.edges.header_deps.remove(&entry.proposal_short_id());
}
removed_entries
}
Expand All @@ -210,7 +215,6 @@ impl ProposedPool {
) -> Option<TxEntry> {
let outputs = tx.output_pts();
let inputs = tx.input_pts_iter();
// TODO: handle header deps
let id = tx.proposal_short_id();

if let Some(entry) = self.inner.remove_entry(&id) {
Expand All @@ -232,6 +236,8 @@ impl ProposedPool {
self.edges.delete_txid_by_dep(&d, &id);
}

self.edges.header_deps.remove(&id);

return Some(entry);
}
None
Expand Down Expand Up @@ -267,24 +273,28 @@ impl ProposedPool {
self.edges.insert_output(o);
}

// record header_deps
let header_deps = entry.transaction().header_deps();
if !header_deps.is_empty() {
self.edges
.header_deps
.insert(tx_short_id, header_deps.into_iter().collect());
}

self.inner.add_entry(entry)
}

pub(crate) fn resolve_conflict(
&mut self,
tx: &TransactionView,
) -> (Vec<ConflictEntry>, Vec<ConflictEntry>) {
pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec<ConflictEntry> {
let inputs = tx.input_pts_iter();
let mut input_conflict = Vec::new();
let mut deps_consumed = Vec::new();
let mut conflicts = Vec::new();

for i in inputs {
if let Some(id) = self.edges.remove_input(&i) {
let entries = self.remove_entry_and_descendants(&id);
if !entries.is_empty() {
let reject = Reject::Resolve(OutPointError::Dead(i.clone()));
let rejects = iter::repeat(reject).take(entries.len());
input_conflict.extend(entries.into_iter().zip(rejects));
conflicts.extend(entries.into_iter().zip(rejects));
}
}

Expand All @@ -295,12 +305,42 @@ impl ProposedPool {
if !entries.is_empty() {
let reject = Reject::Resolve(OutPointError::Dead(i.clone()));
let rejects = iter::repeat(reject).take(entries.len());
deps_consumed.extend(entries.into_iter().zip(rejects));
conflicts.extend(entries.into_iter().zip(rejects));
}
}
}
}
(input_conflict, deps_consumed)

conflicts
}

pub(crate) fn resolve_conflict_header_dep(
&mut self,
headers: &HashSet<Byte32>,
) -> Vec<ConflictEntry> {
let mut conflicts = Vec::new();

// invalid header deps
let mut invalid_header_ids = Vec::new();
for (tx_id, deps) in self.edges.header_deps.iter() {
for hash in deps {
if headers.contains(hash) {
invalid_header_ids.push((hash.clone(), tx_id.clone()));
break;
}
}
}

for (blk_hash, id) in invalid_header_ids {
let entries = self.remove_entry_and_descendants(&id);
if !entries.is_empty() {
let reject = Reject::Resolve(OutPointError::InvalidHeader(blk_hash));
let rejects = iter::repeat(reject).take(entries.len());
conflicts.extend(entries.into_iter().zip(rejects));
}
}

conflicts
}

/// sorted by ancestor score from higher to lower
Expand Down
Loading

0 comments on commit 2a32abd

Please sign in to comment.