Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…ol-ledger into fastsync
  • Loading branch information
jieyilong committed Apr 9, 2021
2 parents 2b3a0d7 + a6609a4 commit dd1f9f9
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 44 deletions.
48 changes: 48 additions & 0 deletions consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ConsensusEngine struct {

incoming chan interface{}
finalizedBlocks chan *core.Block
hasSynced bool

// Life cycle
wg *sync.WaitGroup
Expand Down Expand Up @@ -146,6 +147,8 @@ func (e *ConsensusEngine) Start(ctx context.Context) {
e.resetGuardianTimer()
e.guardian.Start(e.ctx)

e.checkSyncStatus()

e.wg.Add(1)
go e.mainLoop()
}
Expand Down Expand Up @@ -330,6 +333,31 @@ func (e *ConsensusEngine) processMessage(msg interface{}) (endEpoch bool) {
return false
}

func (e *ConsensusEngine) checkSyncStatus() error {
maxVoteHeight := uint64(0)
epochVotes, err := e.State().GetEpochVotes()
if err != nil {
return err
}
if epochVotes != nil {
for _, v := range epochVotes.Votes() {
if v.Height > maxVoteHeight {
maxVoteHeight = v.Height
}
}
}
// current finalized height is at most maxVoteHeight-1
currentHeight := uint64(maxVoteHeight - 1)

e.hasSynced = !isSyncing(e.GetLastFinalizedBlock(), currentHeight)

return nil
}

func (e *ConsensusEngine) HasSynced() bool {
return e.hasSynced
}

func (e *ConsensusEngine) validateBlock(block *core.Block, parent *core.ExtendedBlock) result.Result {
// Ignore old blocks.
if lfh := e.state.GetLastFinalizedBlock().Height; block.Height <= lfh {
Expand Down Expand Up @@ -831,6 +859,8 @@ func (e *ConsensusEngine) handleVote(vote core.Vote) (endEpoch bool) {
"expectedProposer": expectedProposer.ID().Hex(),
}).Debug("Majority votes for current epoch. Moving to new epoch")
e.state.SetEpoch(nextEpoch)

e.checkSyncStatus()
}
}
return
Expand Down Expand Up @@ -993,6 +1023,8 @@ func (e *ConsensusEngine) finalizeBlock(block *core.ExtendedBlock) error {
e.state.SetLastFinalizedBlock(block)
e.ledger.FinalizeState(block.Height, block.StateHash)

e.checkSyncStatus()

// Mark block and its ancestors as finalized.
if err := e.chain.FinalizePreviousBlocks(block.Hash()); err != nil {
return err
Expand Down Expand Up @@ -1193,3 +1225,19 @@ func (e *ConsensusEngine) resetGuardianTimer() {
}
e.guardianTimer = time.NewTicker(time.Duration(viper.GetInt(common.CfgGuardianRoundLength)) * time.Second)
}

func isSyncing(lastestFinalizedBlock *core.ExtendedBlock, currentHeight uint64) bool {
if lastestFinalizedBlock == nil {
return true
}
currentTime := big.NewInt(time.Now().Unix())
maxDiff := new(big.Int).SetUint64(30) // thirty seconds, about 5 blocks
threshold := new(big.Int).Sub(currentTime, maxDiff)
isSyncing := lastestFinalizedBlock.Timestamp.Cmp(threshold) < 0

if isSyncing { // sometimes the validator node clock is off, so here we also compare the block heights
isSyncing = (currentHeight - lastestFinalizedBlock.Height) > 5
}

return isSyncing
}
2 changes: 1 addition & 1 deletion ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (ledger *Ledger) ApplyBlockTxs(block *core.Block) result.Result {
txProcessTime = append(txProcessTime, time.Since(start))
}

logger.Debugf("ApplyBlockTxs: Finish applying block transactions, block.height = %v", block.Height, txProcessTime)
logger.Debugf("ApplyBlockTxs: Finish applying block transactions, block.height=%v, txProcessTime=%v", block.Height, txProcessTime)

start := time.Now()
ledger.handleDelayedStateUpdates(view)
Expand Down
4 changes: 2 additions & 2 deletions ledger/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func newExecSim(chainID string, db database.Database, snapshot mockSnapshot, val
consensus := consensus.NewConsensusEngine(valPrivAcc.PrivKey, store, chain, dispatcher, valMgr)
valMgr.SetConsensusEngine(consensus)

mempool := mp.CreateMempool(dispatcher)
mempool := mp.CreateMempool(dispatcher, consensus)

ledgerState := st.NewLedgerState(chainID, db)
//ledgerState.ResetState(initHeight, snapshot.block.StateHash)
Expand Down Expand Up @@ -242,7 +242,7 @@ func newTesetValidatorManager(consensus core.ConsensusEngine) core.ValidatorMana

func newTestMempool(peerID string, messenger p2p.Network, messengerL p2pl.Network) *mp.Mempool {
dispatcher := dp.NewDispatcher(messenger, nil)
mempool := mp.CreateMempool(dispatcher)
mempool := mp.CreateMempool(dispatcher, nil)
txMsgHandler := mp.CreateMempoolMessageHandler(mempool)
messenger.RegisterMessageHandler(txMsgHandler)
return mempool
Expand Down
61 changes: 38 additions & 23 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/thetatoken/theta/common/clist"
"github.com/thetatoken/theta/common/math"
"github.com/thetatoken/theta/common/pqueue"
"github.com/thetatoken/theta/common/result"
"github.com/thetatoken/theta/consensus"
"github.com/thetatoken/theta/core"
dp "github.com/thetatoken/theta/dispatcher"
)
Expand Down Expand Up @@ -135,6 +137,7 @@ func createMempoolTransactionGroup(rawTx common.Bytes, txInfo *core.TxInfo) *mem
type Mempool struct {
mutex *sync.Mutex

consensus *consensus.ConsensusEngine
ledger core.Ledger
dispatcher *dp.Dispatcher

Expand All @@ -153,9 +156,10 @@ type Mempool struct {
}

// CreateMempool creates an instance of Mempool
func CreateMempool(dispatcher *dp.Dispatcher) *Mempool {
func CreateMempool(dispatcher *dp.Dispatcher, engine *consensus.ConsensusEngine) *Mempool {
return &Mempool{
mutex: &sync.Mutex{},
consensus: engine,
dispatcher: dispatcher,
newTxs: clist.New(),
candidateTxs: pqueue.CreatePriorityQueue(),
Expand All @@ -181,31 +185,42 @@ func (mp *Mempool) InsertTransaction(rawTx common.Bytes) error {
return DuplicateTxError
}

txInfo, checkTxRes := mp.ledger.ScreenTx(rawTx)
if !checkTxRes.IsOK() {
logger.Debugf("Transaction screening failed, tx: %v, error: %v", hex.EncodeToString(rawTx), checkTxRes.Message)
return errors.New(checkTxRes.Message)
}
var txInfo *core.TxInfo
var checkTxRes result.Result

logger.Infof("Insert tx, tx.hash: 0x%v", getTransactionHash(rawTx))
logger.Debugf("rawTx: %v, txInfo: %v", hex.EncodeToString(rawTx), txInfo)

// only record the transactions that passed the screening. This is because that
// an invalid transaction could becoume valid later on. For example, assume expected
// sequence for an account is 6. The account accidentally submits txA (seq = 7), got rejected.
// He then submit txB(seq = 6), and then txA(seq = 7) again. For the second submission, txA
// should not be rejected even though it has been submitted earlier.
mp.txBookeepper.record(rawTx)

txGroup, ok := mp.addressToTxGroup[txInfo.Address]
if ok {
txGroup.AddTx(rawTx, txInfo)
mp.candidateTxs.Remove(txGroup.index) // Need to re-insert txGroup into queue since its priority could change.
// Delay tx verification when in fast sync
if mp.consensus.HasSynced() {
txInfo, checkTxRes = mp.ledger.ScreenTx(rawTx)
if !checkTxRes.IsOK() {
logger.Debugf("Transaction screening failed, tx: %v, error: %v", hex.EncodeToString(rawTx), checkTxRes.Message)
return errors.New(checkTxRes.Message)
}

// only record the transactions that passed the screening. This is because that
// an invalid transaction could becoume valid later on. For example, assume expected
// sequence for an account is 6. The account accidentally submits txA (seq = 7), got rejected.
// He then submit txB(seq = 6), and then txA(seq = 7) again. For the second submission, txA
// should not be rejected even though it has been submitted earlier.
mp.txBookeepper.record(rawTx)

txGroup, ok := mp.addressToTxGroup[txInfo.Address]
if ok {
txGroup.AddTx(rawTx, txInfo)
mp.candidateTxs.Remove(txGroup.index) // Need to re-insert txGroup into queue since its priority could change.
} else {
txGroup = createMempoolTransactionGroup(rawTx, txInfo)
mp.addressToTxGroup[txInfo.Address] = txGroup
}
mp.candidateTxs.Push(txGroup)
logger.Debugf("rawTx: %v, txInfo: %v", hex.EncodeToString(rawTx), txInfo)
} else {
txGroup = createMempoolTransactionGroup(rawTx, txInfo)
mp.addressToTxGroup[txInfo.Address] = txGroup
// Record tx during sync for gossiping purpose
mp.txBookeepper.record(rawTx)

logger.Debug("Skipping tx vefification during sync")
}
mp.candidateTxs.Push(txGroup)

logger.Infof("Insert tx, tx.hash: 0x%v", getTransactionHash(rawTx))

mp.newTxs.PushBack(rawTx)
mp.size++
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewNode(params *Params) *Node {

// TODO: check if this is a guardian node
syncMgr := netsync.NewSyncManager(chain, consensus, params.NetworkOld, params.Network, dispatcher, consensus, reporter)
mempool := mp.CreateMempool(dispatcher)
mempool := mp.CreateMempool(dispatcher, consensus)
ledger := ld.NewLedger(params.ChainID, params.DB, chain, consensus, validatorManager, mempool)

validatorManager.SetConsensusEngine(consensus)
Expand Down
18 changes: 1 addition & 17 deletions rpc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (t *ThetaRPCService) GetStatus(args *GetStatusArgs, result *GetStatusResult
result.CurrentHeight = common.JSONUint64(maxVoteHeight - 1) // current finalized height is at most maxVoteHeight-1
}

result.Syncing = isSyncing(latestFinalizedBlock, uint64(result.CurrentHeight))
result.Syncing = !t.consensus.HasSynced()

return
}
Expand Down Expand Up @@ -668,19 +668,3 @@ func getTxType(tx types.Tx) byte {

return t
}

func isSyncing(lastestFinalizedBlock *core.ExtendedBlock, currentHeight uint64) bool {
if lastestFinalizedBlock == nil {
return true
}
currentTime := big.NewInt(time.Now().Unix())
maxDiff := new(big.Int).SetUint64(30) // thirty seconds, about 5 blocks
threshold := new(big.Int).Sub(currentTime, maxDiff)
isSyncing := lastestFinalizedBlock.Timestamp.Cmp(threshold) < 0

if isSyncing { // sometimes the validator node clock is off, so here we also compare the block heights
isSyncing = (currentHeight - lastestFinalizedBlock.Height) > 5
}

return isSyncing
}

0 comments on commit dd1f9f9

Please sign in to comment.