From d1ac11d9c2e58254e478a4ba7bf41c9e0fbc59b4 Mon Sep 17 00:00:00 2001 From: Yilun Date: Wed, 7 Aug 2019 16:26:59 -0700 Subject: [PATCH] Implement dropping nanopay txn in txpool Signed-off-by: Yilun --- chain/pool/txpool.go | 114 ++++++++++++++++++++++++++----------------- 1 file changed, 69 insertions(+), 45 deletions(-) diff --git a/chain/pool/txpool.go b/chain/pool/txpool.go index c9f529f40..e0adedcfd 100644 --- a/chain/pool/txpool.go +++ b/chain/pool/txpool.go @@ -112,6 +112,11 @@ func (tp *TxnPool) DropTxns() { return true }) + tp.NanoPayTxs.Range(func(_, v interface{}) bool { + dropList = append(dropList, v.(*transaction.Transaction)) + return true + }) + heap.Init((*dropTxnsHeap)(&dropList)) for { @@ -121,50 +126,59 @@ func (tp *TxnPool) DropTxns() { txn := heap.Pop((*dropTxnsHeap)(&dropList)).(*transaction.Transaction) - account, err := txn.GetProgramHashes() - if err != nil { - continue - } - - v, ok := tp.TxLists.Load(account[0]) - if !ok { - continue - } - - list, ok := v.(*NonceSortedTxs) - if !ok { - continue - } - - _, ok, err = list.Drop(txn.Hash()) - if err != nil { - continue - } - - if ok { - tp.deleteTransactionFromMap(txn) - txnsDropped = append(txnsDropped, txn) + switch txn.UnsignedTx.Payload.Type { + case pb.NANO_PAY_TYPE: + if _, ok := tp.NanoPayTxs.Load(txn.Hash()); !ok { + continue + } + tp.NanoPayTxs.Delete(txn.Hash()) + default: + account, err := txn.GetProgramHashes() + if err != nil { + continue + } - atomic.AddInt32(&tp.txnCount, -1) - atomic.AddInt64(&tp.txnSize, -int64(txn.GetSize())) - currentTxnCount-- - currentTxnSize -= int64(txn.GetSize()) + v, ok := tp.TxLists.Load(account[0]) + if !ok { + continue + } - if !isTxPoolFull(currentTxnCount, currentTxnSize) { - break + list, ok := v.(*NonceSortedTxs) + if !ok { + continue } - } - if list.Len() > 0 { - nonce, err := list.GetLatestNonce() + _, dropped, err := list.Drop(txn.Hash()) if err != nil { continue } - nextTxn, err := list.Get(nonce) - if err != nil { + + if list.Len() > 0 { + nonce, err := list.GetLatestNonce() + if err != nil { + continue + } + nextTxn, err := list.Get(nonce) + if err != nil { + continue + } + heap.Push((*dropTxnsHeap)(&dropList), nextTxn) + } + + if !dropped { continue } - heap.Push((*dropTxnsHeap)(&dropList), nextTxn) + } + + txnsDropped = append(txnsDropped, txn) + tp.deleteTransactionFromMap(txn) + atomic.AddInt32(&tp.txnCount, -1) + atomic.AddInt64(&tp.txnSize, -int64(txn.GetSize())) + currentTxnCount-- + currentTxnSize -= int64(txn.GetSize()) + + if !isTxPoolFull(currentTxnCount, currentTxnSize) { + break } } @@ -287,6 +301,8 @@ func (tp *TxnPool) processTx(txn *transaction.Transaction) error { } tp.deleteTransactionFromMap(oldTxn) + atomic.AddInt32(&tp.txnCount, -1) + atomic.AddInt64(&tp.txnSize, -int64(oldTxn.GetSize())) } else if list.Full() { return errors.New("txpool per account list is full") } else { @@ -432,6 +448,7 @@ func (tp *TxnPool) GetAllTransactions() []*transaction.Transaction { } return true }) + tp.NanoPayTxs.Range(func(k, v interface{}) bool { txs = append(txs, v.(*transaction.Transaction)) return true @@ -455,6 +472,7 @@ func (tp *TxnPool) GetAllTransactionLists() map[common.Uint160][]*transaction.Tr return true }) + tp.NanoPayTxs.Range(func(k, v interface{}) bool { tx := v.(*transaction.Transaction) addr, _ := common.ToCodeHash(tx.Programs[0].Code) @@ -470,10 +488,12 @@ func (tp *TxnPool) GetAllTransactionLists() map[common.Uint160][]*transaction.Tr } func (tp *TxnPool) CleanSubmittedTransactions(txns []*transaction.Transaction) error { - txnsInPool := make([]*transaction.Transaction, 0) + txnsRemoved := make([]*transaction.Transaction, 0) // clean submitted txs for _, txn := range txns { + txnsToRemove := make([]*transaction.Transaction, 0) + switch txn.UnsignedTx.Payload.Type { case pb.COINBASE_TYPE: continue @@ -481,6 +501,7 @@ func (tp *TxnPool) CleanSubmittedTransactions(txns []*transaction.Transaction) e continue case pb.NANO_PAY_TYPE: tp.NanoPayTxs.Delete(txn.Hash()) + txnsToRemove = append(txnsToRemove, txn) default: sender, _ := common.ToCodeHash(txn.Programs[0].Code) txNonce := txn.UnsignedTx.Nonce @@ -490,15 +511,23 @@ func (tp *TxnPool) CleanSubmittedTransactions(txns []*transaction.Transaction) e if _, err := list.Get(txNonce); err == nil { nonce := list.getNonce(list.idx[0]) for i := 0; uint64(i) <= txNonce-nonce; i++ { - list.Pop() - atomic.AddInt32(&tp.txnCount, -1) - atomic.AddInt64(&tp.txnSize, -int64(txn.GetSize())) + t, err := list.Pop() + if err == nil { + txnsToRemove = append(txnsToRemove, t) + } } } } } } + for _, t := range txnsToRemove { + txnsRemoved = append(txnsRemoved, t) + tp.deleteTransactionFromMap(t) + atomic.AddInt32(&tp.txnCount, -1) + atomic.AddInt64(&tp.txnSize, -int64(t.GetSize())) + } + tp.TxLists.Range(func(k, v interface{}) bool { listLen := v.(*NonceSortedTxs).Len() if listLen == 0 { @@ -506,16 +535,11 @@ func (tp *TxnPool) CleanSubmittedTransactions(txns []*transaction.Transaction) e } return true }) - - if _, ok := tp.TxMap.Load(txn.Hash()); ok { - txnsInPool = append(txnsInPool, txn) - } - tp.deleteTransactionFromMap(txn) } tp.blockValidationState.Lock() defer tp.blockValidationState.Unlock() - return tp.CleanBlockValidationState(txnsInPool) + return tp.CleanBlockValidationState(txnsRemoved) } func (tp *TxnPool) addTransactionToMap(txn *transaction.Transaction) {