Skip to content

Commit

Permalink
optimize the verify block method: reduce the lock count of the transa…
Browse files Browse the repository at this point in the history
…ction pool (DNAProject#6)

Signed-off-by: [email protected] <[email protected]>
  • Loading branch information
lightshine001 authored and laizy committed Apr 18, 2018
1 parent 4083876 commit 7caa195
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 124 deletions.
43 changes: 43 additions & 0 deletions txnpool/common/transaction_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,46 @@ func (tp *TXPool) GetTransactionCount() int {
defer tp.RUnlock()
return len(tp.txList)
}

// GetUnverifiedTxs checks the tx list in the block from consensus,
// and returns verified tx list, unverified tx list, and
// the tx list to be re-verified
func (tp *TXPool) GetUnverifiedTxs(txs []*types.Transaction,
height uint32) *CheckBlkResult {
tp.Lock()
defer tp.Unlock()
res := &CheckBlkResult{
VerifiedTxs: make([]*VerifyTxResult, 0, len(txs)),
UnverifiedTxs: make([]*types.Transaction, 0),
OldTxs: make([]*types.Transaction, 0),
}
for _, tx := range txs {
txEntry := tp.txList[tx.Hash()]
if txEntry == nil {
res.UnverifiedTxs = append(res.UnverifiedTxs,
tx)
continue
}

if !tp.compareTxHeight(txEntry, height) {
delete(tp.txList, tx.Hash())
res.OldTxs = append(res.OldTxs, txEntry.Tx)
continue
}

for _, v := range txEntry.Attrs {
if v.Type == vt.Statefull {
entry := &VerifyTxResult{
Tx: tx,
Height: v.Height,
ErrCode: v.ErrCode,
}
res.VerifiedTxs = append(res.VerifiedTxs,
entry)
break
}
}
}

return res
}
16 changes: 12 additions & 4 deletions txnpool/common/transaction_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ package common
import (
"bytes"
"encoding/hex"
"fmt"
"testing"
"time"

"github.com/ontio/ontology/common"
"github.com/ontio/ontology/common/log"
"github.com/ontio/ontology/core/payload"
"github.com/ontio/ontology/core/types"
"github.com/stretchr/testify/assert"
)

var (
Expand Down Expand Up @@ -78,9 +78,13 @@ func TestTxPool(t *testing.T) {
return
}

txList := txPool.GetTxPool(true)
txList, oldTxList := txPool.GetTxPool(true, 0)
for _, v := range txList {
fmt.Println(v)
assert.NotNil(t, v)
}

for _, v := range oldTxList {
assert.NotNil(t, v)
}

entry := txPool.GetTransaction(txn.Hash())
Expand All @@ -89,14 +93,18 @@ func TestTxPool(t *testing.T) {
return
}

assert.Equal(t, txn.Hash(), entry.Hash())

status := txPool.GetTxStatus(txn.Hash())
if status == nil {
t.Error("failed to get the status")
return
}

assert.Equal(t, txn.Hash(), status.Hash)

count := txPool.GetTransactionCount()
fmt.Println(count)
assert.Equal(t, count, 1)

err := txPool.CleanTransactionList([]*types.Transaction{txn})
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions txnpool/common/txnpool_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ const (
MaxStats
)

// CheckBlkResult contains a verifed tx list,
// an unverified tx list and an old tx list
// to be re-verifed
type CheckBlkResult struct {
VerifiedTxs []*VerifyTxResult
UnverifiedTxs []*types.Transaction
OldTxs []*types.Transaction
}

// TxStatus contains the attributes of a transaction
type TxStatus struct {
Hash common.Uint256 // transaction hash
Expand Down
9 changes: 7 additions & 2 deletions txnpool/proc/txnpool_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,13 @@ func (ta *TxActor) Receive(context actor.Context) {

res := ta.server.getTxStatusReq(msg.Hash)
if sender != nil {
sender.Request(&tc.GetTxnStatusRsp{Hash: res.Hash,
TxStatus: res.Attrs}, context.Self())
if res == nil {
sender.Request(&tc.GetTxnStatusRsp{Hash: msg.Hash,
TxStatus: nil}, context.Self())
} else {
sender.Request(&tc.GetTxnStatusRsp{Hash: res.Hash,
TxStatus: res.Attrs}, context.Self())
}
}

default:
Expand Down
67 changes: 42 additions & 25 deletions txnpool/proc/txnpool_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@
package proc

import (
"fmt"
"testing"
"time"

"github.com/ontio/ontology/core/types"
"github.com/ontio/ontology/errors"
"github.com/ontio/ontology/events/message"
tc "github.com/ontio/ontology/txnpool/common"
vt "github.com/ontio/ontology/validator/types"
"github.com/stretchr/testify/assert"
)

func TestTxActor(t *testing.T) {
fmt.Println("Starting tx actor test")
t.Log("Starting tx actor test")
s := NewTxPoolServer(tc.MAX_WORKER_NUM)
if s == nil {
t.Error("Test case: new tx pool server failed")
Expand All @@ -45,26 +46,30 @@ func TestTxActor(t *testing.T) {
return
}

txReq := &txReq{
txReq := &tc.TxReq{
Tx: txn,
Sender: tc.NilSender,
}
txPid.RequestFuture(txn)
txPid.Tell(txReq)

time.Sleep(1 * time.Second)

future := txPid.RequestFuture(&tc.GetTxnReq{Hash: txn.Hash()}, 1*time.Second)
result, err := future.Result()
fmt.Println(result, err)
assert.Nil(t, err)
rsp := (result).(*tc.GetTxnRsp)
assert.Nil(t, rsp.Txn)

future = txPid.RequestFuture(&tc.GetTxnStats{}, 2*time.Second)
result, err = future.Result()
fmt.Println(result, err)
assert.Nil(t, err)
future = txPid.RequestFuture(&tc.CheckTxnReq{Hash: txn.Hash()}, 1*time.Second)
result, err = future.Result()
fmt.Println(result, err)
assert.Nil(t, err)

future = txPid.RequestFuture(&tc.GetTxnStatusReq{Hash: txn.Hash()}, 1*time.Second)
result, err = future.Result()
fmt.Println(result, err)
assert.Nil(t, err)

// Given the tx in the pool, test again
txEntry := &tc.TXEntry{
Expand All @@ -73,32 +78,29 @@ func TestTxActor(t *testing.T) {
Fee: txn.GetTotalFee(),
}
s.addTxList(txEntry)
future = txPid.RequestFuture(txn, 5*time.Second)
result, err = future.Result()
fmt.Println(result, err)

future = txPid.RequestFuture(&tc.GetTxnReq{Hash: txn.Hash()}, 1*time.Second)
result, err = future.Result()
fmt.Println(result, err)
assert.Nil(t, err)

future = txPid.RequestFuture(&tc.GetTxnStats{}, 2*time.Second)
result, err = future.Result()
fmt.Println(result, err)
assert.Nil(t, err)
future = txPid.RequestFuture(&tc.CheckTxnReq{Hash: txn.Hash()}, 1*time.Second)
result, err = future.Result()
fmt.Println(result, err)
assert.Nil(t, err)

future = txPid.RequestFuture(&tc.GetTxnStatusReq{Hash: txn.Hash()}, 1*time.Second)
result, err = future.Result()
fmt.Println(result, err)
assert.Nil(t, err)

txPid.Tell("test")
s.Stop()
fmt.Println("Ending tx actor test")
t.Log("Ending tx actor test")
}

func TestTxPoolActor(t *testing.T) {
fmt.Println("Starting tx pool actor test")
t.Log("Starting tx pool actor test")
s := NewTxPoolServer(tc.MAX_WORKER_NUM)
if s == nil {
t.Error("Test case: new tx pool server failed")
Expand All @@ -113,33 +115,47 @@ func TestTxPoolActor(t *testing.T) {
return
}

txPoolPid.Tell(txn)
txEntry := &tc.TXEntry{
Tx: txn,
Attrs: []*tc.TXAttr{},
Fee: txn.GetTotalFee(),
}

retAttr := &tc.TXAttr{
Height: 0,
Type: vt.Statefull,
ErrCode: errors.ErrNoError,
}
txEntry.Attrs = append(txEntry.Attrs, retAttr)
s.addTxList(txEntry)

future := txPoolPid.RequestFuture(&tc.GetTxnPoolReq{ByCount: false}, 2*time.Second)
result, err := future.Result()
fmt.Println(result, err)
assert.Nil(t, err)
rsp := (result).(*tc.GetTxnPoolRsp)
assert.NotNil(t, rsp.TxnPool)

future = txPoolPid.RequestFuture(&tc.GetPendingTxnReq{ByCount: false}, 2*time.Second)
result, err = future.Result()
fmt.Println(result, err)
assert.Nil(t, err)

bk := &tc.VerifyBlockReq{
Height: 1,
Height: 0,
Txs: []*types.Transaction{txn},
}
future = txPoolPid.RequestFuture(bk, 10*time.Second)
result, err = future.Result()
fmt.Println(result, err)
assert.Nil(t, err)

sbc := &message.SaveBlockCompleteMsg{}
txPoolPid.Tell(sbc)

s.Stop()
fmt.Println("Ending tx pool actor test")
t.Log("Ending tx pool actor test")
}

func TestVerifyRspActor(t *testing.T) {
fmt.Println("Starting validator response actor test")
t.Log("Starting validator response actor test")
s := NewTxPoolServer(tc.MAX_WORKER_NUM)
if s == nil {
t.Error("Test case: new tx pool server failed")
Expand All @@ -165,6 +181,7 @@ func TestVerifyRspActor(t *testing.T) {
rsp := &vt.CheckResponse{}
validatorPid.Tell(rsp)

time.Sleep(1 * time.Second)
s.Stop()
fmt.Println("Ending validator response actor test")
t.Log("Ending validator response actor test")
}
45 changes: 12 additions & 33 deletions txnpool/proc/txnpool_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,41 +569,20 @@ func (s *TXPoolServer) verifyBlock(req *tc.VerifyBlockReq, sender *actor.PID) {
s.pendingBlock.sender = sender
s.pendingBlock.height = req.Height

for _, t := range req.Txs {
/* Check if the tx is in the tx pool, if not, send it to
* valdiator to verify and add it to pending block list
*/
ret := s.txPool.GetTxStatus(t.Hash())
if ret == nil {
s.assignTxToWorker(t, tc.NilSender)
s.pendingBlock.unProcessedTxs[t.Hash()] = t
continue
}
checkBlkResult := s.txPool.GetUnverifiedTxs(req.Txs, req.Height)

/* Check the verified height >= the block height, if yes,
* add it to the response list.
*/
ok := false
for _, v := range ret.Attrs {
if v.Type == types.Statefull &&
v.Height >= req.Height {
entry := &tc.VerifyTxResult{
Tx: t,
Height: v.Height,
ErrCode: v.ErrCode,
}
s.pendingBlock.processedTxs[t.Hash()] = entry
ok = true
break
}
}
for _, t := range checkBlkResult.UnverifiedTxs {
s.assignTxToWorker(t, tc.NilSender)
s.pendingBlock.unProcessedTxs[t.Hash()] = t
}

// Re-verify it
if !ok {
s.delTransaction(t)
s.reVerifyStateful(t, tc.NilSender)
s.pendingBlock.unProcessedTxs[t.Hash()] = t
}
for _, t := range checkBlkResult.OldTxs {
s.reVerifyStateful(t, tc.NilSender)
s.pendingBlock.unProcessedTxs[t.Hash()] = t
}

for _, t := range checkBlkResult.VerifiedTxs {
s.pendingBlock.processedTxs[t.Tx.Hash()] = t
}

/* If all the txs in the blocks are verified, send response
Expand Down
Loading

0 comments on commit 7caa195

Please sign in to comment.