Skip to content

Commit

Permalink
Implement dropping nanopay txn in txpool
Browse files Browse the repository at this point in the history
Signed-off-by: Yilun <[email protected]>
  • Loading branch information
yilunzhang committed Aug 8, 2019
1 parent 4c8f5c4 commit d1ac11d
Showing 1 changed file with 69 additions and 45 deletions.
114 changes: 69 additions & 45 deletions chain/pool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (tp *TxnPool) DropTxns() {
return true
})

tp.NanoPayTxs.Range(func(_, v interface{}) bool {
dropList = append(dropList, v.(*transaction.Transaction))
return true
})

heap.Init((*dropTxnsHeap)(&dropList))

for {
Expand All @@ -121,50 +126,59 @@ func (tp *TxnPool) DropTxns() {

txn := heap.Pop((*dropTxnsHeap)(&dropList)).(*transaction.Transaction)

account, err := txn.GetProgramHashes()
if err != nil {
continue
}

v, ok := tp.TxLists.Load(account[0])
if !ok {
continue
}

list, ok := v.(*NonceSortedTxs)
if !ok {
continue
}

_, ok, err = list.Drop(txn.Hash())
if err != nil {
continue
}

if ok {
tp.deleteTransactionFromMap(txn)
txnsDropped = append(txnsDropped, txn)
switch txn.UnsignedTx.Payload.Type {
case pb.NANO_PAY_TYPE:
if _, ok := tp.NanoPayTxs.Load(txn.Hash()); !ok {
continue
}
tp.NanoPayTxs.Delete(txn.Hash())
default:
account, err := txn.GetProgramHashes()
if err != nil {
continue
}

atomic.AddInt32(&tp.txnCount, -1)
atomic.AddInt64(&tp.txnSize, -int64(txn.GetSize()))
currentTxnCount--
currentTxnSize -= int64(txn.GetSize())
v, ok := tp.TxLists.Load(account[0])
if !ok {
continue
}

if !isTxPoolFull(currentTxnCount, currentTxnSize) {
break
list, ok := v.(*NonceSortedTxs)
if !ok {
continue
}
}

if list.Len() > 0 {
nonce, err := list.GetLatestNonce()
_, dropped, err := list.Drop(txn.Hash())
if err != nil {
continue
}
nextTxn, err := list.Get(nonce)
if err != nil {

if list.Len() > 0 {
nonce, err := list.GetLatestNonce()
if err != nil {
continue
}
nextTxn, err := list.Get(nonce)
if err != nil {
continue
}
heap.Push((*dropTxnsHeap)(&dropList), nextTxn)
}

if !dropped {
continue
}
heap.Push((*dropTxnsHeap)(&dropList), nextTxn)
}

txnsDropped = append(txnsDropped, txn)
tp.deleteTransactionFromMap(txn)
atomic.AddInt32(&tp.txnCount, -1)
atomic.AddInt64(&tp.txnSize, -int64(txn.GetSize()))
currentTxnCount--
currentTxnSize -= int64(txn.GetSize())

if !isTxPoolFull(currentTxnCount, currentTxnSize) {
break
}
}

Expand Down Expand Up @@ -287,6 +301,8 @@ func (tp *TxnPool) processTx(txn *transaction.Transaction) error {
}

tp.deleteTransactionFromMap(oldTxn)
atomic.AddInt32(&tp.txnCount, -1)
atomic.AddInt64(&tp.txnSize, -int64(oldTxn.GetSize()))
} else if list.Full() {
return errors.New("txpool per account list is full")
} else {
Expand Down Expand Up @@ -432,6 +448,7 @@ func (tp *TxnPool) GetAllTransactions() []*transaction.Transaction {
}
return true
})

tp.NanoPayTxs.Range(func(k, v interface{}) bool {
txs = append(txs, v.(*transaction.Transaction))
return true
Expand All @@ -455,6 +472,7 @@ func (tp *TxnPool) GetAllTransactionLists() map[common.Uint160][]*transaction.Tr

return true
})

tp.NanoPayTxs.Range(func(k, v interface{}) bool {
tx := v.(*transaction.Transaction)
addr, _ := common.ToCodeHash(tx.Programs[0].Code)
Expand All @@ -470,17 +488,20 @@ func (tp *TxnPool) GetAllTransactionLists() map[common.Uint160][]*transaction.Tr
}

func (tp *TxnPool) CleanSubmittedTransactions(txns []*transaction.Transaction) error {
txnsInPool := make([]*transaction.Transaction, 0)
txnsRemoved := make([]*transaction.Transaction, 0)

// clean submitted txs
for _, txn := range txns {
txnsToRemove := make([]*transaction.Transaction, 0)

switch txn.UnsignedTx.Payload.Type {
case pb.COINBASE_TYPE:
continue
case pb.SIG_CHAIN_TXN_TYPE:
continue
case pb.NANO_PAY_TYPE:
tp.NanoPayTxs.Delete(txn.Hash())
txnsToRemove = append(txnsToRemove, txn)
default:
sender, _ := common.ToCodeHash(txn.Programs[0].Code)
txNonce := txn.UnsignedTx.Nonce
Expand All @@ -490,32 +511,35 @@ func (tp *TxnPool) CleanSubmittedTransactions(txns []*transaction.Transaction) e
if _, err := list.Get(txNonce); err == nil {
nonce := list.getNonce(list.idx[0])
for i := 0; uint64(i) <= txNonce-nonce; i++ {
list.Pop()
atomic.AddInt32(&tp.txnCount, -1)
atomic.AddInt64(&tp.txnSize, -int64(txn.GetSize()))
t, err := list.Pop()
if err == nil {
txnsToRemove = append(txnsToRemove, t)
}
}
}
}
}
}

for _, t := range txnsToRemove {
txnsRemoved = append(txnsRemoved, t)
tp.deleteTransactionFromMap(t)
atomic.AddInt32(&tp.txnCount, -1)
atomic.AddInt64(&tp.txnSize, -int64(t.GetSize()))
}

tp.TxLists.Range(func(k, v interface{}) bool {
listLen := v.(*NonceSortedTxs).Len()
if listLen == 0 {
tp.TxLists.Delete(k)
}
return true
})

if _, ok := tp.TxMap.Load(txn.Hash()); ok {
txnsInPool = append(txnsInPool, txn)
}
tp.deleteTransactionFromMap(txn)
}

tp.blockValidationState.Lock()
defer tp.blockValidationState.Unlock()
return tp.CleanBlockValidationState(txnsInPool)
return tp.CleanBlockValidationState(txnsRemoved)
}

func (tp *TxnPool) addTransactionToMap(txn *transaction.Transaction) {
Expand Down

0 comments on commit d1ac11d

Please sign in to comment.