From 7caa1953058103b268343fbcadeb4ab14bfe6769 Mon Sep 17 00:00:00 2001 From: xigua Date: Wed, 18 Apr 2018 18:13:16 +0800 Subject: [PATCH] optimize the verify block method: reduce the lock count of the transaction pool (#6) Signed-off-by: liupan@onchain.com --- txnpool/common/transaction_pool.go | 43 ++++++++++++++++ txnpool/common/transaction_pool_test.go | 16 ++++-- txnpool/common/txnpool_common.go | 9 ++++ txnpool/proc/txnpool_actor.go | 9 +++- txnpool/proc/txnpool_actor_test.go | 67 ++++++++++++++++--------- txnpool/proc/txnpool_server.go | 45 +++++------------ txnpool/proc/txnpool_server_test.go | 47 ++++++++--------- txnpool/proc/txnpool_worker_test.go | 16 +++--- txnpool/test/txnpool_test.go | 53 +++++++++---------- 9 files changed, 181 insertions(+), 124 deletions(-) diff --git a/txnpool/common/transaction_pool.go b/txnpool/common/transaction_pool.go index 2a7fa057..ea8587b0 100644 --- a/txnpool/common/transaction_pool.go +++ b/txnpool/common/transaction_pool.go @@ -193,3 +193,46 @@ func (tp *TXPool) GetTransactionCount() int { defer tp.RUnlock() return len(tp.txList) } + +// GetUnverifiedTxs checks the tx list in the block from consensus, +// and returns verified tx list, unverified tx list, and +// the tx list to be re-verified +func (tp *TXPool) GetUnverifiedTxs(txs []*types.Transaction, + height uint32) *CheckBlkResult { + tp.Lock() + defer tp.Unlock() + res := &CheckBlkResult{ + VerifiedTxs: make([]*VerifyTxResult, 0, len(txs)), + UnverifiedTxs: make([]*types.Transaction, 0), + OldTxs: make([]*types.Transaction, 0), + } + for _, tx := range txs { + txEntry := tp.txList[tx.Hash()] + if txEntry == nil { + res.UnverifiedTxs = append(res.UnverifiedTxs, + tx) + continue + } + + if !tp.compareTxHeight(txEntry, height) { + delete(tp.txList, tx.Hash()) + res.OldTxs = append(res.OldTxs, txEntry.Tx) + continue + } + + for _, v := range txEntry.Attrs { + if v.Type == vt.Statefull { + entry := &VerifyTxResult{ + Tx: tx, + Height: v.Height, + ErrCode: v.ErrCode, + } + res.VerifiedTxs = append(res.VerifiedTxs, + entry) + break + } + } + } + + return res +} diff --git a/txnpool/common/transaction_pool_test.go b/txnpool/common/transaction_pool_test.go index cc18817e..3e6a2df9 100644 --- a/txnpool/common/transaction_pool_test.go +++ b/txnpool/common/transaction_pool_test.go @@ -21,7 +21,6 @@ package common import ( "bytes" "encoding/hex" - "fmt" "testing" "time" @@ -29,6 +28,7 @@ import ( "github.com/ontio/ontology/common/log" "github.com/ontio/ontology/core/payload" "github.com/ontio/ontology/core/types" + "github.com/stretchr/testify/assert" ) var ( @@ -78,9 +78,13 @@ func TestTxPool(t *testing.T) { return } - txList := txPool.GetTxPool(true) + txList, oldTxList := txPool.GetTxPool(true, 0) for _, v := range txList { - fmt.Println(v) + assert.NotNil(t, v) + } + + for _, v := range oldTxList { + assert.NotNil(t, v) } entry := txPool.GetTransaction(txn.Hash()) @@ -89,14 +93,18 @@ func TestTxPool(t *testing.T) { return } + assert.Equal(t, txn.Hash(), entry.Hash()) + status := txPool.GetTxStatus(txn.Hash()) if status == nil { t.Error("failed to get the status") return } + assert.Equal(t, txn.Hash(), status.Hash) + count := txPool.GetTransactionCount() - fmt.Println(count) + assert.Equal(t, count, 1) err := txPool.CleanTransactionList([]*types.Transaction{txn}) if err != nil { diff --git a/txnpool/common/txnpool_common.go b/txnpool/common/txnpool_common.go index a9110e89..012d56c4 100644 --- a/txnpool/common/txnpool_common.go +++ b/txnpool/common/txnpool_common.go @@ -86,6 +86,15 @@ const ( 
MaxStats ) +// CheckBlkResult contains a verifed tx list, +// an unverified tx list and an old tx list +// to be re-verifed +type CheckBlkResult struct { + VerifiedTxs []*VerifyTxResult + UnverifiedTxs []*types.Transaction + OldTxs []*types.Transaction +} + // TxStatus contains the attributes of a transaction type TxStatus struct { Hash common.Uint256 // transaction hash diff --git a/txnpool/proc/txnpool_actor.go b/txnpool/proc/txnpool_actor.go index 5d90eb24..b0901bcd 100644 --- a/txnpool/proc/txnpool_actor.go +++ b/txnpool/proc/txnpool_actor.go @@ -137,8 +137,13 @@ func (ta *TxActor) Receive(context actor.Context) { res := ta.server.getTxStatusReq(msg.Hash) if sender != nil { - sender.Request(&tc.GetTxnStatusRsp{Hash: res.Hash, - TxStatus: res.Attrs}, context.Self()) + if res == nil { + sender.Request(&tc.GetTxnStatusRsp{Hash: msg.Hash, + TxStatus: nil}, context.Self()) + } else { + sender.Request(&tc.GetTxnStatusRsp{Hash: res.Hash, + TxStatus: res.Attrs}, context.Self()) + } } default: diff --git a/txnpool/proc/txnpool_actor_test.go b/txnpool/proc/txnpool_actor_test.go index 57b4ddd6..8befe7f7 100644 --- a/txnpool/proc/txnpool_actor_test.go +++ b/txnpool/proc/txnpool_actor_test.go @@ -19,18 +19,19 @@ package proc import ( - "fmt" "testing" "time" "github.com/ontio/ontology/core/types" + "github.com/ontio/ontology/errors" "github.com/ontio/ontology/events/message" tc "github.com/ontio/ontology/txnpool/common" vt "github.com/ontio/ontology/validator/types" + "github.com/stretchr/testify/assert" ) func TestTxActor(t *testing.T) { - fmt.Println("Starting tx actor test") + t.Log("Starting tx actor test") s := NewTxPoolServer(tc.MAX_WORKER_NUM) if s == nil { t.Error("Test case: new tx pool server failed") @@ -45,26 +46,30 @@ func TestTxActor(t *testing.T) { return } - txReq := &txReq{ + txReq := &tc.TxReq{ Tx: txn, Sender: tc.NilSender, } - txPid.RequestFuture(txn) + txPid.Tell(txReq) + + time.Sleep(1 * time.Second) future := txPid.RequestFuture(&tc.GetTxnReq{Hash: txn.Hash()}, 1*time.Second) result, err := future.Result() - fmt.Println(result, err) + assert.Nil(t, err) + rsp := (result).(*tc.GetTxnRsp) + assert.Nil(t, rsp.Txn) future = txPid.RequestFuture(&tc.GetTxnStats{}, 2*time.Second) result, err = future.Result() - fmt.Println(result, err) + assert.Nil(t, err) future = txPid.RequestFuture(&tc.CheckTxnReq{Hash: txn.Hash()}, 1*time.Second) result, err = future.Result() - fmt.Println(result, err) + assert.Nil(t, err) future = txPid.RequestFuture(&tc.GetTxnStatusReq{Hash: txn.Hash()}, 1*time.Second) result, err = future.Result() - fmt.Println(result, err) + assert.Nil(t, err) // Given the tx in the pool, test again txEntry := &tc.TXEntry{ @@ -73,32 +78,29 @@ func TestTxActor(t *testing.T) { Fee: txn.GetTotalFee(), } s.addTxList(txEntry) - future = txPid.RequestFuture(txn, 5*time.Second) - result, err = future.Result() - fmt.Println(result, err) future = txPid.RequestFuture(&tc.GetTxnReq{Hash: txn.Hash()}, 1*time.Second) result, err = future.Result() - fmt.Println(result, err) + assert.Nil(t, err) future = txPid.RequestFuture(&tc.GetTxnStats{}, 2*time.Second) result, err = future.Result() - fmt.Println(result, err) + assert.Nil(t, err) future = txPid.RequestFuture(&tc.CheckTxnReq{Hash: txn.Hash()}, 1*time.Second) result, err = future.Result() - fmt.Println(result, err) + assert.Nil(t, err) future = txPid.RequestFuture(&tc.GetTxnStatusReq{Hash: txn.Hash()}, 1*time.Second) result, err = future.Result() - fmt.Println(result, err) + assert.Nil(t, err) txPid.Tell("test") s.Stop() - 
fmt.Println("Ending tx actor test") + t.Log("Ending tx actor test") } func TestTxPoolActor(t *testing.T) { - fmt.Println("Starting tx pool actor test") + t.Log("Starting tx pool actor test") s := NewTxPoolServer(tc.MAX_WORKER_NUM) if s == nil { t.Error("Test case: new tx pool server failed") @@ -113,33 +115,47 @@ func TestTxPoolActor(t *testing.T) { return } - txPoolPid.Tell(txn) + txEntry := &tc.TXEntry{ + Tx: txn, + Attrs: []*tc.TXAttr{}, + Fee: txn.GetTotalFee(), + } + + retAttr := &tc.TXAttr{ + Height: 0, + Type: vt.Statefull, + ErrCode: errors.ErrNoError, + } + txEntry.Attrs = append(txEntry.Attrs, retAttr) + s.addTxList(txEntry) future := txPoolPid.RequestFuture(&tc.GetTxnPoolReq{ByCount: false}, 2*time.Second) result, err := future.Result() - fmt.Println(result, err) + assert.Nil(t, err) + rsp := (result).(*tc.GetTxnPoolRsp) + assert.NotNil(t, rsp.TxnPool) future = txPoolPid.RequestFuture(&tc.GetPendingTxnReq{ByCount: false}, 2*time.Second) result, err = future.Result() - fmt.Println(result, err) + assert.Nil(t, err) bk := &tc.VerifyBlockReq{ - Height: 1, + Height: 0, Txs: []*types.Transaction{txn}, } future = txPoolPid.RequestFuture(bk, 10*time.Second) result, err = future.Result() - fmt.Println(result, err) + assert.Nil(t, err) sbc := &message.SaveBlockCompleteMsg{} txPoolPid.Tell(sbc) s.Stop() - fmt.Println("Ending tx pool actor test") + t.Log("Ending tx pool actor test") } func TestVerifyRspActor(t *testing.T) { - fmt.Println("Starting validator response actor test") + t.Log("Starting validator response actor test") s := NewTxPoolServer(tc.MAX_WORKER_NUM) if s == nil { t.Error("Test case: new tx pool server failed") @@ -165,6 +181,7 @@ func TestVerifyRspActor(t *testing.T) { rsp := &vt.CheckResponse{} validatorPid.Tell(rsp) + time.Sleep(1 * time.Second) s.Stop() - fmt.Println("Ending validator response actor test") + t.Log("Ending validator response actor test") } diff --git a/txnpool/proc/txnpool_server.go b/txnpool/proc/txnpool_server.go index 993bf9d9..17310e8f 100644 --- a/txnpool/proc/txnpool_server.go +++ b/txnpool/proc/txnpool_server.go @@ -569,41 +569,20 @@ func (s *TXPoolServer) verifyBlock(req *tc.VerifyBlockReq, sender *actor.PID) { s.pendingBlock.sender = sender s.pendingBlock.height = req.Height - for _, t := range req.Txs { - /* Check if the tx is in the tx pool, if not, send it to - * valdiator to verify and add it to pending block list - */ - ret := s.txPool.GetTxStatus(t.Hash()) - if ret == nil { - s.assignTxToWorker(t, tc.NilSender) - s.pendingBlock.unProcessedTxs[t.Hash()] = t - continue - } + checkBlkResult := s.txPool.GetUnverifiedTxs(req.Txs, req.Height) - /* Check the verified height >= the block height, if yes, - * add it to the response list. 
- */ - ok := false - for _, v := range ret.Attrs { - if v.Type == types.Statefull && - v.Height >= req.Height { - entry := &tc.VerifyTxResult{ - Tx: t, - Height: v.Height, - ErrCode: v.ErrCode, - } - s.pendingBlock.processedTxs[t.Hash()] = entry - ok = true - break - } - } + for _, t := range checkBlkResult.UnverifiedTxs { + s.assignTxToWorker(t, tc.NilSender) + s.pendingBlock.unProcessedTxs[t.Hash()] = t + } - // Re-verify it - if !ok { - s.delTransaction(t) - s.reVerifyStateful(t, tc.NilSender) - s.pendingBlock.unProcessedTxs[t.Hash()] = t - } + for _, t := range checkBlkResult.OldTxs { + s.reVerifyStateful(t, tc.NilSender) + s.pendingBlock.unProcessedTxs[t.Hash()] = t + } + + for _, t := range checkBlkResult.VerifiedTxs { + s.pendingBlock.processedTxs[t.Tx.Hash()] = t } /* If all the txs in the blocks are verified, send response diff --git a/txnpool/proc/txnpool_server_test.go b/txnpool/proc/txnpool_server_test.go index 768a5eb0..f9088382 100644 --- a/txnpool/proc/txnpool_server_test.go +++ b/txnpool/proc/txnpool_server_test.go @@ -21,10 +21,10 @@ package proc import ( "bytes" "encoding/hex" - "fmt" "testing" "time" + "github.com/ontio/ontology-eventbus/actor" "github.com/ontio/ontology/common" "github.com/ontio/ontology/common/log" "github.com/ontio/ontology/core/payload" @@ -33,7 +33,7 @@ import ( tc "github.com/ontio/ontology/txnpool/common" "github.com/ontio/ontology/validator/stateless" vt "github.com/ontio/ontology/validator/types" - "github.com/ontio/ontology-eventbus/actor" + "github.com/stretchr/testify/assert" ) var ( @@ -72,15 +72,12 @@ func startActor(obj interface{}) *actor.PID { }) pid := actor.Spawn(props) - if pid == nil { - fmt.Println("Fail to start actor") - return nil - } + return pid } func TestTxn(t *testing.T) { - fmt.Println("Starting test tx") + t.Log("Starting test tx") var s *TXPoolServer s = NewTxPoolServer(tc.MAX_WORKER_NUM) if s == nil { @@ -90,18 +87,18 @@ func TestTxn(t *testing.T) { defer s.Stop() // Case 1: Send nil txn to the server, server should reject it - s.assginTXN2Worker(nil, sender) + s.assignTxToWorker(nil, sender) /* Case 2: send non-nil txn to the server, server should assign * it to the worker */ - s.assginTXN2Worker(txn, sender) + s.assignTxToWorker(txn, sender) /* Case 3: Duplicate input the tx, server should reject the second * one */ time.Sleep(10 * time.Second) - s.assginTXN2Worker(txn, sender) - s.assginTXN2Worker(txn, sender) + s.assignTxToWorker(txn, sender) + s.assignTxToWorker(txn, sender) /* Case 4: Given the tx is in the tx pool, server can get the tx * with the invalid hash @@ -126,11 +123,11 @@ func TestTxn(t *testing.T) { return } - fmt.Println("Ending test tx") + t.Log("Ending test tx") } func TestAssignRsp2Worker(t *testing.T) { - fmt.Println("Starting assign response to the worker testing") + t.Log("Starting assign response to the worker testing") var s *TXPoolServer s = NewTxPoolServer(tc.MAX_WORKER_NUM) if s == nil { @@ -140,7 +137,7 @@ func TestAssignRsp2Worker(t *testing.T) { defer s.Stop() - s.assignRsp2Worker(nil) + s.assignRspToWorker(nil) statelessRsp := &vt.CheckResponse{ WorkerId: 0, @@ -157,8 +154,8 @@ func TestAssignRsp2Worker(t *testing.T) { Type: vt.Statefull, Height: 0, } - s.assignRsp2Worker(statelessRsp) - s.assignRsp2Worker(statefulRsp) + s.assignRspToWorker(statelessRsp) + s.assignRspToWorker(statefulRsp) statelessRsp = &vt.CheckResponse{ WorkerId: 0, @@ -167,13 +164,13 @@ func TestAssignRsp2Worker(t *testing.T) { Type: vt.Stateless, Height: 0, } - s.assignRsp2Worker(statelessRsp) + 
s.assignRspToWorker(statelessRsp) - fmt.Println("Ending assign response to the worker testing") + t.Log("Ending assign response to the worker testing") } func TestActor(t *testing.T) { - fmt.Println("Starting actor testing") + t.Log("Starting actor testing") var s *TXPoolServer s = NewTxPoolServer(tc.MAX_WORKER_NUM) if s == nil { @@ -237,11 +234,11 @@ func TestActor(t *testing.T) { return } - fmt.Println("Ending actor testing") + t.Log("Ending actor testing") } func TestValidator(t *testing.T) { - fmt.Println("Starting validator testing") + t.Log("Starting validator testing") var s *TXPoolServer s = NewTxPoolServer(tc.MAX_WORKER_NUM) if s == nil { @@ -277,12 +274,12 @@ func TestValidator(t *testing.T) { ret := s.getNextValidatorPIDs() for _, v := range ret { - fmt.Println(v) + assert.NotNil(t, v) } ret = s.getNextValidatorPIDs() for _, v := range ret { - fmt.Println(v) + assert.NotNil(t, v) } statelessV1.UnRegister(rspPid) @@ -292,8 +289,8 @@ func TestValidator(t *testing.T) { ret = s.getNextValidatorPIDs() for _, v := range ret { - fmt.Println(v) + assert.NotNil(t, v) } - fmt.Println("Ending validator testing") + t.Log("Ending validator testing") } diff --git a/txnpool/proc/txnpool_worker_test.go b/txnpool/proc/txnpool_worker_test.go index fff53899..39c86f25 100644 --- a/txnpool/proc/txnpool_worker_test.go +++ b/txnpool/proc/txnpool_worker_test.go @@ -21,7 +21,6 @@ package proc import ( "bytes" "encoding/hex" - "fmt" "testing" "time" @@ -30,10 +29,11 @@ import ( "github.com/ontio/ontology/errors" tc "github.com/ontio/ontology/txnpool/common" vt "github.com/ontio/ontology/validator/types" + "github.com/stretchr/testify/assert" ) func TestWorker(t *testing.T) { - fmt.Println("Starting worker test") + t.Log("Starting worker test") s := NewTxPoolServer(tc.MAX_WORKER_NUM) if s == nil { t.Error("Test case: new tx pool server failed") @@ -74,8 +74,8 @@ func TestWorker(t *testing.T) { time.Sleep(1 * time.Second) ret := worker.server.getTransaction(txn.Hash()) - fmt.Println(ret) - + assert.NotNil(t, ret) + assert.Equal(t, ret.Hash(), txn.Hash()) /* Case 2: Duplicate input tx, worker should reject * it with the log */ @@ -152,7 +152,9 @@ func TestWorker(t *testing.T) { */ time.Sleep(1 * time.Second) txStatus := worker.GetTxStatus(txn.Hash()) - fmt.Println(txStatus) + t.Log(txStatus) + assert.NotNil(t, txStatus) + assert.Equal(t, txStatus.Hash, txn.Hash()) /* Case 8: Given the invalid hash, worker should return nil */ tempStr := "3369930accc1ddd067245e8edadcd9bea207ba5e1753ac18a51df77a343bfe83" @@ -160,9 +162,9 @@ func TestWorker(t *testing.T) { var hash common.Uint256 hash.Deserialize(bytes.NewReader(hex)) txStatus = worker.GetTxStatus(hash) - fmt.Println(txStatus) + assert.Nil(t, txStatus) worker.stop() s.Stop() - fmt.Println("Ending worker test") + t.Log("Ending worker test") } diff --git a/txnpool/test/txnpool_test.go b/txnpool/test/txnpool_test.go index d3249ce3..77b8651a 100644 --- a/txnpool/test/txnpool_test.go +++ b/txnpool/test/txnpool_test.go @@ -21,19 +21,20 @@ package txnpool import ( "bytes" "encoding/hex" - "fmt" "sync" "testing" "time" + "github.com/ontio/ontology-eventbus/actor" "github.com/ontio/ontology/common" "github.com/ontio/ontology/common/log" + "github.com/ontio/ontology/core/ledger" "github.com/ontio/ontology/core/payload" "github.com/ontio/ontology/core/types" tc "github.com/ontio/ontology/txnpool/common" tp "github.com/ontio/ontology/txnpool/proc" + "github.com/ontio/ontology/validator/statefull" "github.com/ontio/ontology/validator/stateless" - 
"github.com/ontio/ontology-eventbus/actor" ) var ( @@ -69,10 +70,6 @@ func startActor(obj interface{}) *actor.PID { }) pid := actor.Spawn(props) - if pid == nil { - fmt.Println("Fail to start actor") - return nil - } return pid } @@ -80,6 +77,8 @@ func Test_RCV(t *testing.T) { var s *tp.TXPoolServer var wg sync.WaitGroup + ledger.DefLedger, _ = ledger.NewLedger() + // Start txnpool server to receive msgs from p2p, consensus and valdiators s = tp.NewTxPoolServer(tc.MAX_WORKER_NUM) @@ -87,7 +86,7 @@ func Test_RCV(t *testing.T) { rspActor := tp.NewVerifyRspActor(s) rspPid := startActor(rspActor) if rspPid == nil { - fmt.Println("Fail to start verify rsp actor") + t.Error("Fail to start verify rsp actor") return } s.RegisterActor(tc.VerifyRspActor, rspPid) @@ -96,7 +95,7 @@ func Test_RCV(t *testing.T) { tpa := tp.NewTxPoolActor(s) txPoolPid := startActor(tpa) if txPoolPid == nil { - fmt.Println("Fail to start txnpool actor") + t.Error("Fail to start txnpool actor") return } s.RegisterActor(tc.TxPoolActor, txPoolPid) @@ -105,7 +104,7 @@ func Test_RCV(t *testing.T) { ta := tp.NewTxActor(s) txPid := startActor(ta) if txPid == nil { - fmt.Println("Fail to start txn actor") + t.Error("Fail to start txn actor") return } s.RegisterActor(tc.TxActor, txPid) @@ -113,38 +112,32 @@ func Test_RCV(t *testing.T) { // Start stateless validator statelessV, err := stateless.NewValidator("stateless") if err != nil { - fmt.Println("failed to new stateless valdiator", err) + t.Errorf("failed to new stateless valdiator", err) return } statelessV.Register(rspPid) statelessV2, err := stateless.NewValidator("stateless2") if err != nil { - fmt.Println("failed to new stateless valdiator", err) + t.Errorf("failed to new stateless valdiator", err) return } statelessV2.Register(rspPid) statelessV3, err := stateless.NewValidator("stateless3") if err != nil { - fmt.Println("failed to new stateless valdiator", err) + t.Errorf("failed to new stateless valdiator", err) return } statelessV3.Register(rspPid) - // Todo: depending on ledger db sync, when ledger db ready, enable it - // Start stateful validator - /*store, err := db.NewStore("temp.db") - if err != nil { - fmt.Println("failed to new store",err) - return - } - - statefulV, err := statefull.NewValidator("stateful", store) - if err != nil { - fmt.Println("failed to new stateful validator", err) - return - } - statefulV.Register(rspPid)*/ + + statefulV, err := statefull.NewValidator("stateful") + if err != nil { + t.Errorf("failed to new stateful valdiator", err) + return + } + statefulV.Register(rspPid) + for i := 0; i < 2; i++ { wg.Add(1) go func() { @@ -152,7 +145,11 @@ func Test_RCV(t *testing.T) { defer wg.Done() for { j++ - txPid.Tell(tx) + txReq := &tc.TxReq{ + Tx: tx, + Sender: tc.NilSender, + } + txPid.Tell(txReq) if j >= 4 { return @@ -170,6 +167,6 @@ func Test_RCV(t *testing.T) { statelessV.UnRegister(rspPid) statelessV2.UnRegister(rspPid) statelessV3.UnRegister(rspPid) - //statefulV.UnRegister(rspPid) + statefulV.UnRegister(rspPid) s.Stop() }