From 2a32abd172095a3d388ad5af1440e7f6f8101c54 Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Thu, 14 Oct 2021 22:22:36 +0800 Subject: [PATCH] feat: remove invalid header dep tx for reorg --- test/src/main.rs | 1 + test/src/specs/tx_pool/pool_resurrect.rs | 43 ++++++++++++++++ tx-pool/src/component/pending.rs | 57 ++++++++++++++++++---- tx-pool/src/component/proposed.rs | 62 +++++++++++++++++++----- tx-pool/src/component/tests/pending.rs | 55 +++++++++++++++------ tx-pool/src/component/tests/proposed.rs | 30 +++++++++++- tx-pool/src/component/tests/util.rs | 21 ++++++++ tx-pool/src/pool.rs | 45 ++++++++++------- tx-pool/src/process.rs | 8 ++- 9 files changed, 266 insertions(+), 56 deletions(-) diff --git a/test/src/main.rs b/test/src/main.rs index 9a1269ceb2..4e0e4feadd 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -396,6 +396,7 @@ fn all_specs() -> Vec> { Box::new(GetRawTxPool), Box::new(PoolReconcile), Box::new(PoolResurrect), + Box::new(InvalidHeaderDep), #[cfg(not(target_os = "windows"))] Box::new(PoolPersisted), Box::new(TransactionRelayBasic), diff --git a/test/src/specs/tx_pool/pool_resurrect.rs b/test/src/specs/tx_pool/pool_resurrect.rs index 4ebc1ee36b..effdb3d0dc 100644 --- a/test/src/specs/tx_pool/pool_resurrect.rs +++ b/test/src/specs/tx_pool/pool_resurrect.rs @@ -52,3 +52,46 @@ impl Spec for PoolResurrect { node0.assert_tx_pool_size(0, 0); } } + +pub struct InvalidHeaderDep; + +impl Spec for InvalidHeaderDep { + crate::setup!(num_nodes: 2); + + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + let node1 = &nodes[1]; + + info!("Generate 1 block on node0"); + mine_until_out_bootstrap_period(node0); + mine(node0, 1); + + info!("Generate header dep tx on node0"); + let hash = node0.generate_transaction(); + + let tip = node0.get_tip_block(); + + let tx = node0.new_transaction(hash); + + node0.rpc_client().send_transaction( + tx.as_advanced_builder() + .set_header_deps(vec![tip.hash()]) + .build() + .data() + .into(), + ); + + let tx_pool_info = node0.get_tip_tx_pool_info(); + assert_eq!(tx_pool_info.pending.value(), 2); + + mine_until_out_bootstrap_period(node1); + mine(node1, 2); + + info!("Connect node0 to node1, waiting for sync"); + node0.connect(node1); + waiting_for_sync(nodes); + + info!("invalid header dep tx should be removed"); + node0.assert_tx_pool_size(1, 0); + } +} diff --git a/tx-pool/src/component/pending.rs b/tx-pool/src/component/pending.rs index eeb5345bac..5558922174 100644 --- a/tx-pool/src/component/pending.rs +++ b/tx-pool/src/component/pending.rs @@ -6,7 +6,7 @@ use ckb_types::{ tx_pool::Reject, TransactionView, }, - packed::{OutPoint, ProposalShortId}, + packed::{Byte32, OutPoint, ProposalShortId}, prelude::*, }; use ckb_util::{LinkedHashMap, LinkedHashMapEntries}; @@ -21,6 +21,8 @@ pub(crate) struct PendingQueue { pub(crate) deps: HashMap>, /// input-txid map represent in-pool tx's inputs pub(crate) inputs: HashMap, + /// dep-set map represent in-pool tx's header deps + pub(crate) header_deps: HashMap>, } impl PendingQueue { @@ -29,6 +31,7 @@ impl PendingQueue { inner: Default::default(), deps: Default::default(), inputs: Default::default(), + header_deps: Default::default(), } } @@ -56,23 +59,26 @@ impl PendingQueue { .insert(tx_short_id.clone()); } + // record header_deps + let header_deps = entry.transaction().header_deps(); + if !header_deps.is_empty() { + self.header_deps + .insert(tx_short_id.clone(), header_deps.into_iter().collect()); + } + self.inner.insert(tx_short_id, entry); true } - pub(crate) fn resolve_conflict( - &mut self, - tx: &TransactionView, - ) -> (Vec, Vec) { + pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec { let inputs = tx.input_pts_iter(); - let mut input_conflict = Vec::new(); - let mut deps_consumed = Vec::new(); + let mut conflicts = Vec::new(); for i in inputs { if let Some(id) = self.inputs.remove(&i) { if let Some(entry) = self.remove_entry(&id) { let reject = Reject::Resolve(OutPointError::Dead(i.clone())); - input_conflict.push((entry, reject)); + conflicts.push((entry, reject)); } } @@ -81,12 +87,39 @@ impl PendingQueue { for id in x { if let Some(entry) = self.remove_entry(&id) { let reject = Reject::Resolve(OutPointError::Dead(i.clone())); - deps_consumed.push((entry, reject)); + conflicts.push((entry, reject)); } } } } - (input_conflict, deps_consumed) + + conflicts + } + + pub(crate) fn resolve_conflict_header_dep( + &mut self, + headers: &HashSet, + ) -> Vec { + let mut conflicts = Vec::new(); + + // invalid header deps + let mut ids = Vec::new(); + for (tx_id, deps) in self.header_deps.iter() { + for hash in deps { + if headers.contains(hash) { + ids.push((hash.clone(), tx_id.clone())); + break; + } + } + } + + for (blk_hash, id) in ids { + if let Some(entry) = self.remove_entry(&id) { + let reject = Reject::Resolve(OutPointError::InvalidHeader(blk_hash)); + conflicts.push((entry, reject)); + } + } + conflicts } pub(crate) fn contains_key(&self, id: &ProposalShortId) -> bool { @@ -130,6 +163,8 @@ impl PendingQueue { } } + self.header_deps.remove(&id); + return Some(entry); } None @@ -165,6 +200,7 @@ impl PendingQueue { self.deps.remove(d); } } + self.header_deps.remove(&tx_short_id); } pub(crate) fn remove_entries_by_filter bool>( @@ -215,6 +251,7 @@ impl PendingQueue { self.inner.clear(); self.deps.clear(); self.inputs.clear(); + self.header_deps.clear(); txs } } diff --git a/tx-pool/src/component/proposed.rs b/tx-pool/src/component/proposed.rs index 6741db61f8..59503789eb 100644 --- a/tx-pool/src/component/proposed.rs +++ b/tx-pool/src/component/proposed.rs @@ -8,7 +8,7 @@ use ckb_types::{ error::OutPointError, TransactionView, }, - packed::{CellOutput, OutPoint, ProposalShortId}, + packed::{Byte32, CellOutput, OutPoint, ProposalShortId}, prelude::*, }; use std::collections::{HashMap, HashSet}; @@ -24,6 +24,8 @@ pub(crate) struct Edges { pub(crate) inputs: HashMap, /// dep-set map represent in-pool tx's deps pub(crate) deps: HashMap>, + /// dep-set map represent in-pool tx's header deps + pub(crate) header_deps: HashMap>, } impl Edges { @@ -93,6 +95,7 @@ impl Edges { self.outputs.clear(); self.inputs.clear(); self.deps.clear(); + self.header_deps.clear(); } } @@ -199,6 +202,8 @@ impl ProposedPool { self.edges.remove_output(&o); // self.edges.remove_deps(&o); } + + self.edges.header_deps.remove(&entry.proposal_short_id()); } removed_entries } @@ -210,7 +215,6 @@ impl ProposedPool { ) -> Option { let outputs = tx.output_pts(); let inputs = tx.input_pts_iter(); - // TODO: handle header deps let id = tx.proposal_short_id(); if let Some(entry) = self.inner.remove_entry(&id) { @@ -232,6 +236,8 @@ impl ProposedPool { self.edges.delete_txid_by_dep(&d, &id); } + self.edges.header_deps.remove(&id); + return Some(entry); } None @@ -267,16 +273,20 @@ impl ProposedPool { self.edges.insert_output(o); } + // record header_deps + let header_deps = entry.transaction().header_deps(); + if !header_deps.is_empty() { + self.edges + .header_deps + .insert(tx_short_id, header_deps.into_iter().collect()); + } + self.inner.add_entry(entry) } - pub(crate) fn resolve_conflict( - &mut self, - tx: &TransactionView, - ) -> (Vec, Vec) { + pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec { let inputs = tx.input_pts_iter(); - let mut input_conflict = Vec::new(); - let mut deps_consumed = Vec::new(); + let mut conflicts = Vec::new(); for i in inputs { if let Some(id) = self.edges.remove_input(&i) { @@ -284,7 +294,7 @@ impl ProposedPool { if !entries.is_empty() { let reject = Reject::Resolve(OutPointError::Dead(i.clone())); let rejects = iter::repeat(reject).take(entries.len()); - input_conflict.extend(entries.into_iter().zip(rejects)); + conflicts.extend(entries.into_iter().zip(rejects)); } } @@ -295,12 +305,42 @@ impl ProposedPool { if !entries.is_empty() { let reject = Reject::Resolve(OutPointError::Dead(i.clone())); let rejects = iter::repeat(reject).take(entries.len()); - deps_consumed.extend(entries.into_iter().zip(rejects)); + conflicts.extend(entries.into_iter().zip(rejects)); } } } } - (input_conflict, deps_consumed) + + conflicts + } + + pub(crate) fn resolve_conflict_header_dep( + &mut self, + headers: &HashSet, + ) -> Vec { + let mut conflicts = Vec::new(); + + // invalid header deps + let mut invalid_header_ids = Vec::new(); + for (tx_id, deps) in self.edges.header_deps.iter() { + for hash in deps { + if headers.contains(hash) { + invalid_header_ids.push((hash.clone(), tx_id.clone())); + break; + } + } + } + + for (blk_hash, id) in invalid_header_ids { + let entries = self.remove_entry_and_descendants(&id); + if !entries.is_empty() { + let reject = Reject::Resolve(OutPointError::InvalidHeader(blk_hash)); + let rejects = iter::repeat(reject).take(entries.len()); + conflicts.extend(entries.into_iter().zip(rejects)); + } + } + + conflicts } /// sorted by ancestor score from higher to lower diff --git a/tx-pool/src/component/tests/pending.rs b/tx-pool/src/component/tests/pending.rs index f6d32332b7..32ffaa5990 100644 --- a/tx-pool/src/component/tests/pending.rs +++ b/tx-pool/src/component/tests/pending.rs @@ -1,5 +1,5 @@ use crate::component::tests::util::{ - build_tx, build_tx_with_dep, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE, + build_tx, build_tx_with_dep, build_tx_with_header_dep, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE, }; use crate::component::{entry::TxEntry, pending::PendingQueue}; use ckb_types::{h256, packed::Byte32, prelude::*}; @@ -58,41 +58,66 @@ fn test_resolve_conflict() { assert!(queue.add_entry(entry2.clone())); assert!(queue.add_entry(entry3.clone())); - let (input_conflict, deps_consumed) = queue.resolve_conflict(&tx4); - assert!(deps_consumed.is_empty()); + let conflicts = queue.resolve_conflict(&tx4); assert_eq!( - input_conflict - .into_iter() - .map(|i| i.0) - .collect::>(), + conflicts.into_iter().map(|i| i.0).collect::>(), HashSet::from_iter(vec![entry1, entry2]) ); - let (input_conflict, deps_consumed) = queue.resolve_conflict(&tx5); - assert!(input_conflict.is_empty()); + let conflicts = queue.resolve_conflict(&tx5); assert_eq!( - deps_consumed - .into_iter() - .map(|i| i.0) - .collect::>(), + conflicts.into_iter().map(|i| i.0).collect::>(), HashSet::from_iter(vec![entry3]) ); } +#[test] +fn test_resolve_conflict_header_dep() { + let mut queue = PendingQueue::new(); + + let header: Byte32 = h256!("0x1").pack(); + let tx = build_tx_with_header_dep( + vec![(&Byte32::zero(), 1), (&h256!("0x1").pack(), 1)], + vec![header.clone()], + 1, + ); + + let entry = TxEntry::dummy_resolve(tx, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE); + assert!(queue.add_entry(entry.clone())); + + let mut headers = HashSet::new(); + headers.insert(header); + + let conflicts = queue.resolve_conflict_header_dep(&headers); + assert_eq!( + conflicts.into_iter().map(|i| i.0).collect::>(), + HashSet::from_iter(vec![entry]) + ); +} + #[test] fn test_remove_committed_tx() { let mut queue = PendingQueue::new(); let tx1 = build_tx(vec![(&Byte32::zero(), 1), (&h256!("0x1").pack(), 1)], 1); + let header: Byte32 = h256!("0x1").pack(); + let tx2 = build_tx_with_header_dep(vec![(&h256!("0x2").pack(), 1)], vec![header], 1); + let entry1 = TxEntry::dummy_resolve(tx1.clone(), MOCK_CYCLES, MOCK_FEE, MOCK_SIZE); + let entry2 = TxEntry::dummy_resolve(tx2.clone(), MOCK_CYCLES, MOCK_FEE, MOCK_SIZE); assert!(queue.add_entry(entry1.clone())); + assert!(queue.add_entry(entry2.clone())); - let related_dep: Vec<_> = entry1.related_dep_out_points().cloned().collect(); + let related_dep1: Vec<_> = entry1.related_dep_out_points().cloned().collect(); + let related_dep2: Vec<_> = entry2.related_dep_out_points().cloned().collect(); - let removed = queue.remove_committed_tx(&tx1, &related_dep); + let removed = queue.remove_committed_tx(&tx1, &related_dep1); assert_eq!(removed, Some(entry1)); + let removed = queue.remove_committed_tx(&tx2, &related_dep2); + assert_eq!(removed, Some(entry2)); assert!(queue.inner.is_empty()); assert!(queue.deps.is_empty()); assert!(queue.inputs.is_empty()); + assert!(queue.header_deps.is_empty()); } #[test] diff --git a/tx-pool/src/component/tests/proposed.rs b/tx-pool/src/component/tests/proposed.rs index 527b3c8637..9cee0449ba 100644 --- a/tx-pool/src/component/tests/proposed.rs +++ b/tx-pool/src/component/tests/proposed.rs @@ -1,5 +1,6 @@ use crate::component::tests::util::{ - build_tx, DEFAULT_MAX_ANCESTORS_SIZE, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE, + build_tx, build_tx_with_header_dep, DEFAULT_MAX_ANCESTORS_SIZE, MOCK_CYCLES, MOCK_FEE, + MOCK_SIZE, }; use crate::component::{entry::TxEntry, proposed::ProposedPool}; use ckb_types::{ @@ -12,6 +13,8 @@ use ckb_types::{ packed::{Byte32, CellDep, CellInput, CellOutput, OutPoint}, prelude::*, }; +use std::collections::HashSet; +use std::iter::FromIterator; fn dummy_resolve Option>( tx: TransactionView, @@ -471,3 +474,28 @@ fn test_dep_group() { assert_eq!(get_deps_len(&pool, &tx2_out_point), 0); assert_eq!(get_deps_len(&pool, &tx3_out_point), 0); } + +#[test] +fn test_resolve_conflict_header_dep() { + let mut pool = ProposedPool::new(DEFAULT_MAX_ANCESTORS_SIZE); + + let header: Byte32 = h256!("0x1").pack(); + let tx = build_tx_with_header_dep( + vec![(&Byte32::zero(), 1), (&h256!("0x1").pack(), 1)], + vec![header.clone()], + 1, + ); + + let entry = TxEntry::dummy_resolve(tx, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE); + + assert!(pool.add_entry(entry.clone()).is_ok()); + + let mut headers = HashSet::new(); + headers.insert(header); + + let conflicts = pool.resolve_conflict_header_dep(&headers); + assert_eq!( + conflicts.into_iter().map(|i| i.0).collect::>(), + HashSet::from_iter(vec![entry]) + ); +} diff --git a/tx-pool/src/component/tests/util.rs b/tx-pool/src/component/tests/util.rs index 3476d1a2e0..753d5ebec7 100644 --- a/tx-pool/src/component/tests/util.rs +++ b/tx-pool/src/component/tests/util.rs @@ -50,3 +50,24 @@ pub(crate) fn build_tx_with_dep( .outputs_data((0..outputs_len).map(|_| Bytes::new().pack())) .build() } + +pub(crate) fn build_tx_with_header_dep( + inputs: Vec<(&Byte32, u32)>, + header_deps: Vec, + outputs_len: usize, +) -> TransactionView { + TransactionBuilder::default() + .inputs( + inputs + .into_iter() + .map(|(txid, index)| CellInput::new(OutPoint::new(txid.to_owned(), index), 0)), + ) + .set_header_deps(header_deps) + .outputs((0..outputs_len).map(|i| { + CellOutput::new_builder() + .capacity(Capacity::bytes(i + 1).unwrap().pack()) + .build() + })) + .outputs_data((0..outputs_len).map(|_| Bytes::new().pack())) + .build() +} diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index b92dedeac8..b361322275 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -270,6 +270,7 @@ impl TxPool { &mut self, txs: impl Iterator)>, callbacks: &Callbacks, + detached_headers: &HashSet, ) { for (tx, related_out_points) in txs { self.remove_committed_tx(tx, &related_out_points, callbacks); @@ -277,6 +278,26 @@ impl TxPool { self.committed_txs_hash_cache .put(tx.proposal_short_id(), tx.hash()); } + + if !detached_headers.is_empty() { + self.resolve_conflict_header_dep(detached_headers, callbacks) + } + } + + pub(crate) fn resolve_conflict_header_dep( + &mut self, + detached_headers: &HashSet, + callbacks: &Callbacks, + ) { + for (entry, reject) in self.proposed.resolve_conflict_header_dep(detached_headers) { + callbacks.call_reject(self, &entry, reject); + } + for (entry, reject) in self.gap.resolve_conflict_header_dep(detached_headers) { + callbacks.call_reject(self, &entry, reject); + } + for (entry, reject) in self.pending.resolve_conflict_header_dep(detached_headers) { + callbacks.call_reject(self, &entry, reject); + } } pub(crate) fn remove_committed_tx( @@ -292,13 +313,9 @@ impl TxPool { if let Some(entry) = self.proposed.remove_committed_tx(tx, related_out_points) { callbacks.call_committed(self, &entry) } else { - let (input_conflict, deps_consumed) = self.proposed.resolve_conflict(tx); + let conflicts = self.proposed.resolve_conflict(tx); - for (entry, reject) in input_conflict { - callbacks.call_reject(self, &entry, reject); - } - - for (entry, reject) in deps_consumed { + for (entry, reject) in conflicts { callbacks.call_reject(self, &entry, reject); } } @@ -308,13 +325,9 @@ impl TxPool { callbacks.call_committed(self, &entry) } { - let (input_conflict, deps_consumed) = self.gap.resolve_conflict(tx); + let conflicts = self.gap.resolve_conflict(tx); - for (entry, reject) in input_conflict { - callbacks.call_reject(self, &entry, reject); - } - - for (entry, reject) in deps_consumed { + for (entry, reject) in conflicts { callbacks.call_reject(self, &entry, reject); } } @@ -323,13 +336,9 @@ impl TxPool { callbacks.call_committed(self, &entry) } { - let (input_conflict, deps_consumed) = self.pending.resolve_conflict(tx); - - for (entry, reject) in input_conflict { - callbacks.call_reject(self, &entry, reject); - } + let conflicts = self.pending.resolve_conflict(tx); - for (entry, reject) in deps_consumed { + for (entry, reject) in conflicts { callbacks.call_reject(self, &entry, reject); } } diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 6e1a850b1b..d5349581e2 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -1016,6 +1016,10 @@ impl TxPoolService { .tip_header() .epoch() .minimum_epoch_number_after_n_blocks(1); + let detached_headers: HashSet = detached_blocks + .iter() + .map(|blk| blk.header().hash()) + .collect(); for blk in detached_blocks { detached.extend(blk.transactions().into_iter().skip(1)) @@ -1059,6 +1063,7 @@ impl TxPoolService { _update_tx_pool_for_reorg( &mut tx_pool, &attached, + &detached_headers, detached_proposal_id, snapshot, &self.callbacks, @@ -1269,6 +1274,7 @@ fn _submit_entry( fn _update_tx_pool_for_reorg( tx_pool: &mut TxPool, attached: &LinkedHashSet, + detached_headers: &HashSet, detached_proposal_id: HashSet, snapshot: Arc, callbacks: &Callbacks, @@ -1290,7 +1296,7 @@ fn _update_tx_pool_for_reorg( // which is both expired and committed at the one time(commit at its end of commit-window), // we should treat it as a committed and not re-put into pending-pool. So we should ensure // that involves `remove_committed_txs` before `remove_expired`. - tx_pool.remove_committed_txs(txs_iter, callbacks); + tx_pool.remove_committed_txs(txs_iter, callbacks, detached_headers); tx_pool.remove_expired(detached_proposal_id.iter()); let mut entries = Vec::new();