From 506016c9b147be468f10f8b27115bcebe6c90be3 Mon Sep 17 00:00:00 2001 From: oscar Date: Wed, 5 Sep 2018 12:05:48 +0800 Subject: [PATCH] handle node restart Signed-off-by: oscar --- consensus/ising/proposercache.go | 38 +-------- consensus/ising/proposerservice.go | 119 +++++++++++++++-------------- core/ledger/header.go | 4 +- core/ledger/ledgerStore.go | 1 + core/ledger/validator.go | 6 +- db/store.go | 23 ++++++ net/message/block.go | 1 - net/message/blockHdr.go | 1 - net/node/node.go | 15 ++-- net/protocol/protocol.go | 1 + 10 files changed, 102 insertions(+), 107 deletions(-) diff --git a/consensus/ising/proposercache.go b/consensus/ising/proposercache.go index fd6eee4aa..6dc9c765a 100644 --- a/consensus/ising/proposercache.go +++ b/consensus/ising/proposercache.go @@ -73,7 +73,7 @@ func (pc *ProposerCache) Add(height uint32, votingContent voting.VotingContent) pc.cache[height] = proposerInfo } -func (pc *ProposerCache) Get(height uint32) (*ProposerInfo, error) { +func (pc *ProposerCache) Get(height uint32) *ProposerInfo { pc.RLock() defer pc.RUnlock() @@ -84,42 +84,12 @@ func (pc *ProposerCache) Get(height uint32) (*ProposerInfo, error) { publicKey: proposer, winningHash: EmptyUint256, winningHashType: ledger.GenesisHash, - }, nil + } } if _, ok := pc.cache[height]; ok { - return pc.cache[height], nil - } - - var proposer []byte - previousBlockHash := ledger.DefaultLedger.Store.GetCurrentBlockHash() - previousBlock, err := ledger.DefaultLedger.Store.GetBlock(previousBlockHash) - if err != nil { - return nil, err - } - switch previousBlock.Header.WinningHashType { - case ledger.WinningTxnHash: - txn, err := ledger.DefaultLedger.GetTransactionWithHash(previousBlock.Header.WinningHash) - if err != nil { - return nil, err - } - payload := txn.Payload.(*payload.Commit) - sigchain := &por.SigChain{} - proto.Unmarshal(payload.SigChain, sigchain) - proposer, err = sigchain.GetMiner() - if err != nil { - return nil, err - } - case ledger.WinningNilHash: - proposer, err = previousBlock.GetSigner() - if err != nil { - return nil, err - } + return pc.cache[height] } - return &ProposerInfo{ - publicKey: proposer, - winningHash: EmptyUint256, - winningHashType: ledger.WinningNilHash, - }, nil + return nil } diff --git a/consensus/ising/proposerservice.go b/consensus/ising/proposerservice.go index 456af57f8..62b6db20b 100644 --- a/consensus/ising/proposerservice.go +++ b/consensus/ising/proposerservice.go @@ -33,7 +33,6 @@ type ProposerService struct { timer *time.Timer // timer for proposer node timeout *time.Timer // timeout for next round consensus proposerChangeTimer *time.Timer // timer for proposer change - proposerChangeIndex uint32 // block index for proposer change localNode protocol.Noder // local node txnCollector *transaction.TxnCollector // collect transaction from where mining Mining // built-in mining @@ -53,7 +52,6 @@ func NewProposerService(account *vault.Account, node protocol.Noder) *ProposerSe timer: time.NewTimer(config.ConsensusTime), timeout: time.NewTimer(config.ConsensusTime + TimeoutTolerance), proposerChangeTimer: time.NewTimer(config.ProposerChangeTime), - proposerChangeIndex: 0, account: account, localNode: node, txnCollector: txnCollector, @@ -72,9 +70,6 @@ func NewProposerService(account *vault.Account, node protocol.Noder) *ProposerSe if !service.timeout.Stop() { <-service.timeout.C } - if !service.proposerChangeTimer.Stop() { - <-service.proposerChangeTimer.C - } return service } @@ -216,10 +211,12 @@ func (ps *ProposerService) ProduceNewBlock() { current := ps.CurrentVoting(voting.BlockVote) votingPool := current.GetVotingPool() votingHeight := current.GetVotingHeight() - proposerInfo, err := ps.proposerCache.Get(votingHeight + 1) - if err != nil { - log.Error("get proposer info for producing new block error: ", err) - return + proposerInfo := ps.proposerCache.Get(votingHeight + 1) + if proposerInfo == nil { + proposerInfo = &ProposerInfo{ + winningHash: EmptyUint256, + winningHashType: ledger.WinningBlockHash, + } } // build new block to be proposed block, err := ps.mining.BuildBlock(votingHeight, proposerInfo.winningHash, proposerInfo.winningHashType) @@ -258,8 +255,8 @@ func (ps *ProposerService) IsBlockProposer() bool { } current := ps.CurrentVoting(voting.BlockVote) votingHeight := current.GetVotingHeight() - proposerInfo, err := ps.proposerCache.Get(votingHeight) - if err != nil { + proposerInfo := ps.proposerCache.Get(votingHeight) + if proposerInfo == nil { return false } if !IsEqualBytes(localPublicKey, proposerInfo.publicKey) { @@ -275,8 +272,10 @@ func (ps *ProposerService) ProposerRoutine() { case <-ps.timer.C: if ps.IsBlockProposer() { log.Info("-> I am Block Proposer") + if ps.localNode.GetSyncState() != protocol.PersistFinished { + ps.localNode.StopSyncBlock() + } ps.ProduceNewBlock() - time.Sleep(time.Second) for _, v := range ps.voting { go ps.ConsensusRoutine(v.VotingType(), true) } @@ -318,42 +317,52 @@ func (ps *ProposerService) ProbeRoutine() { } func (ps *ProposerService) BlockPersistCompleted(v interface{}) { - if ps.localNode.GetSyncState() != protocol.PersistFinished { - return - } - if block, ok := v.(*ledger.Block); ok { - // record time when persist block - ledger.DefaultLedger.Blockchain.AddBlockTime(block.Hash(), time.Now().Unix()) - ps.txnCollector.Cleanup(block.Transactions) - // reset index when block persisted - ps.proposerChangeIndex = 0 - // reset timer when block persisted - ps.proposerChangeTimer.Stop() - ps.proposerChangeTimer.Reset(config.ProposerChangeTime) + // reset timer when block persisted + ps.proposerChangeTimer.Stop() + ps.proposerChangeTimer.Reset(config.ProposerChangeTime) + + if ps.localNode.GetSyncState() == protocol.PersistFinished { + if block, ok := v.(*ledger.Block); ok { + // record time when persist block + ledger.DefaultLedger.Blockchain.AddBlockTime(block.Hash(), time.Now().Unix()) + ps.txnCollector.Cleanup(block.Transactions) + } } } -func (ps *ProposerService) ChangeProposer() { +func (ps *ProposerService) ChangeProposerRoutine() { for { select { case <-ps.proposerChangeTimer.C: - height := ledger.DefaultLedger.Store.GetHeight() - ps.proposerChangeIndex - hash, err := ledger.DefaultLedger.Store.GetBlockHash(height) - if err != nil { - log.Error("get block hash error when change proposer: ", err) - } - block, err := ledger.DefaultLedger.Store.GetBlock(hash) - if err != nil { - log.Error("get block error when change proposer: ", err) + now := time.Now().Unix() + currentHeight := ledger.DefaultLedger.Store.GetHeight() + var block *ledger.Block + var err error + if currentHeight < InitialBlockHeight { + block, err = ledger.DefaultLedger.Store.GetBlockByHeight(0) + if err != nil { + log.Error("get genesis block error when change proposer") + } + } else { + currentBlock, err := ledger.DefaultLedger.Store.GetBlockByHeight(currentHeight) + if err != nil { + log.Errorf("get latest block %d error when change proposer", currentHeight) + } + timestamp := currentBlock.Header.Timestamp + index := (now - timestamp) / 60 + var height uint32 = 0 + if int64(currentHeight) > index { + height = uint32(int64(currentHeight) - index) + } + block, err = ledger.DefaultLedger.Store.GetBlockByHeight(height) + if err != nil { + log.Errorf("get block %d error when change proposer", currentHeight) + } } - nextBlockHeight := ledger.DefaultLedger.Store.GetHeight() + 1 - ps.proposerCache.Add(nextBlockHeight, block) + ps.proposerCache.Add(currentHeight+1, block) ps.timer.Stop() ps.timer.Reset(0) ps.proposerChangeTimer.Reset(config.ProposerChangeTime) - if height > 1 { - ps.proposerChangeIndex++ - } } } } @@ -396,6 +405,10 @@ func (ps *ProposerService) BlockSyncingFinished(v interface{}) { ps.localNode.SetSyncState(protocol.PersistFinished) } func (ps *ProposerService) SyncBlock(isProposer bool) { + if ps.localNode.GetSyncState() == protocol.PersistFinished || + ps.localNode.GetSyncState() == protocol.SyncFinished { + return + } var wg sync.WaitGroup wg.Add(1) // start block syncing @@ -412,21 +425,6 @@ func (ps *ProposerService) SyncBlock(isProposer bool) { wg.Wait() } -func (ps *ProposerService) StartConsensus(isProposer bool) { - // start block proposer routine - go ps.ProposerRoutine() - // start timeout routine - go ps.TimeoutRoutine() - // change proposer - go ps.ChangeProposer() - // trigger block proposer routine - if isProposer { - ps.timer.Reset(0) - } - // start probe routine - go ps.ProbeRoutine() -} - func (ps *ProposerService) Start() error { // register consensus message ps.consensusMsgReceived = ps.localNode.GetEvent("consensus").Subscribe(events.EventConsensusMsgReceived, @@ -438,11 +436,16 @@ func (ps *ProposerService) Start() error { ps.syncFinished = ps.localNode.GetEvent("sync").Subscribe(events.EventBlockSyncingFinished, ps.BlockSyncingFinished) - isProposer := ps.IsBlockProposer() - // start block syncing - ps.SyncBlock(isProposer) - // start consensus - ps.StartConsensus(isProposer) + // start block proposer routine + go ps.ProposerRoutine() + // start change proposer routine + go ps.ChangeProposerRoutine() + // start timeout routine + go ps.TimeoutRoutine() + + ps.SyncBlock(false) + // start probe routine + go ps.ProbeRoutine() return nil } diff --git a/core/ledger/header.go b/core/ledger/header.go index 41957c668..b011e9304 100644 --- a/core/ledger/header.go +++ b/core/ledger/header.go @@ -22,10 +22,8 @@ const ( GenesisHash WinningHashType = 0 // WinningTxnHash means next Block proposer is a node on signature chain. WinningTxnHash WinningHashType = 1 - // WinningNilHash means next Block proposer is the signer of last Block. - WinningNilHash WinningHashType = 2 // WinningBlockHash means next Block proposer is signer of historical Block. - WinningBlockHash WinningHashType = 3 + WinningBlockHash WinningHashType = 2 ) type Header struct { diff --git a/core/ledger/ledgerStore.go b/core/ledger/ledgerStore.go index 107fd8c19..fb41ee60d 100644 --- a/core/ledger/ledgerStore.go +++ b/core/ledger/ledgerStore.go @@ -12,6 +12,7 @@ import ( type ILedgerStore interface { SaveBlock(b *Block, ledger *Ledger) error GetBlock(hash Uint256) (*Block, error) + GetBlockByHeight(height uint32) (*Block, error) BlockInCache(hash Uint256) bool GetBlockHash(height uint32) (Uint256, error) GetBlockHistory(startHeight, blockNum uint32) map[uint32]Uint256 diff --git a/core/ledger/validator.go b/core/ledger/validator.go index f2734798a..1fe563dbd 100644 --- a/core/ledger/validator.go +++ b/core/ledger/validator.go @@ -94,7 +94,7 @@ func HeaderCheck(header *Header, receiveTime int64) error { // get miner who will sign next block var miner []byte timeSlot := int64(config.ProposerChangeTime / time.Second) - if timeDiff > timeSlot { + if timeDiff >= timeSlot { index := timeDiff / timeSlot proposerBlockHeight := int64(DefaultLedger.Store.GetHeight()) - index if proposerBlockHeight < 0 { @@ -143,10 +143,6 @@ func HeaderCheck(header *Header, receiveTime int64) error { txnHash := txn.Hash() log.Infof("verification: block signer should be %s which is got in sigchain transaction %s", common.BytesToHexString(miner), common.BytesToHexString(txnHash.ToArrayReverse())) - case WinningNilHash: - miner = prevHeader.Signer - log.Infof("verification: block signer should be: %s which is the signer of previous block %d", - common.BytesToHexString(miner), prevHeader.Height) } } diff --git a/db/store.go b/db/store.go index 092a0d99b..892db88ac 100644 --- a/db/store.go +++ b/db/store.go @@ -333,6 +333,29 @@ func (cs *ChainStore) GetBlockHash(height uint32) (Uint256, error) { return blockHash256, nil } +func (cs *ChainStore) GetBlockByHeight(height uint32) (*Block, error) { + key := bytes.NewBuffer(nil) + key.WriteByte(byte(DATA_BlockHash)) + err := serialization.WriteUint32(key, height) + if err != nil { + return nil, err + } + value, err := cs.st.Get(key.Bytes()) + if err != nil { + return nil, err + } + hash, err := Uint256ParseFromBytes(value) + if err != nil { + return nil, err + } + block, err := cs.GetBlock(hash) + if err != nil { + return nil, err + } + + return block, nil +} + func (cs *ChainStore) GetCurrentBlockHash() Uint256 { cs.mu.RLock() defer cs.mu.RUnlock() diff --git a/net/message/block.go b/net/message/block.go index 1248c984d..1586a1254 100644 --- a/net/message/block.go +++ b/net/message/block.go @@ -67,7 +67,6 @@ func (msg dataReq) Handle(node Noder) error { func NewBlockFromHash(hash common.Uint256) (*ledger.Block, error) { bk, err := ledger.DefaultLedger.Store.GetBlock(hash) if err != nil { - log.Errorf("Get Block error: %s, block hash: %x", err.Error(), hash) return nil, err } return bk, nil diff --git a/net/message/blockHdr.go b/net/message/blockHdr.go index 21c92e5dc..83f9777b5 100644 --- a/net/message/blockHdr.go +++ b/net/message/blockHdr.go @@ -226,7 +226,6 @@ func GetHeadersFromHash(startHash Uint256, stopHash Uint256) ([]ledger.Header, u hash, err := ledger.DefaultLedger.Store.GetBlockHash(startHeight + i) hd, err := ledger.DefaultLedger.Store.GetHeader(hash) if err != nil { - log.Error("GetBlockWithHeight failed ", err.Error()) return nil, 0, err } headers = append(headers, *hd) diff --git a/net/node/node.go b/net/node/node.go index 960f2e752..c4d18fc09 100644 --- a/net/node/node.go +++ b/net/node/node.go @@ -571,7 +571,7 @@ func (node *node) GetChordRing() *chord.Ring { } func (node *node) blockHeaderSyncing(stopHash Uint256) { - noders := node.local.GetSyncFinishedNeighbors() + noders := node.local.GetNeighborNoder() if len(noders) == 0 { return } @@ -599,7 +599,7 @@ func (node *node) blockSyncing() { var dValue int32 var reqCnt uint32 var i uint32 - noders := node.local.GetSyncFinishedNeighbors() + noders := node.local.GetNeighborNoder() for _, n := range noders { if uint32(n.GetHeight()) <= currentBlkHeight { @@ -807,15 +807,20 @@ func (node *node) SyncBlock(isProposer bool) { } } +func (node *node) StopSyncBlock() { + // switch syncing state + node.SetSyncState(SyncFinished) + // stop block syncing + node.quit <- struct{}{} +} + func (node *node) SyncBlockMonitor(isProposer bool) { // wait for header syncing finished node.WaitForSyncHeaderFinish(isProposer) // wait for block syncing finished node.WaitForSyncBlkFinish() - // switch syncing state - node.SetSyncState(SyncFinished) // stop block syncing - node.quit <- struct{}{} + node.StopSyncBlock() } func (node *node) SendRelayPacketsInBuffer(clientId []byte) error { diff --git a/net/protocol/protocol.go b/net/protocol/protocol.go index 74f09dcf9..bea5b6a88 100644 --- a/net/protocol/protocol.go +++ b/net/protocol/protocol.go @@ -60,6 +60,7 @@ type Noder interface { SetSyncStopHash(hash Uint256, height uint32) SyncBlock(bool) SyncBlockMonitor(bool) + StopSyncBlock() GetRelay() bool GetPubKey() *crypto.PubKey CompareAndSetState(old, new uint32) bool