Skip to content

Commit

Permalink
handle node restart
Browse files Browse the repository at this point in the history
Signed-off-by: oscar <[email protected]>
  • Loading branch information
oscar committed Sep 7, 2018
1 parent b583987 commit 506016c
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 107 deletions.
38 changes: 4 additions & 34 deletions consensus/ising/proposercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (pc *ProposerCache) Add(height uint32, votingContent voting.VotingContent)
pc.cache[height] = proposerInfo
}

func (pc *ProposerCache) Get(height uint32) (*ProposerInfo, error) {
func (pc *ProposerCache) Get(height uint32) *ProposerInfo {
pc.RLock()
defer pc.RUnlock()

Expand All @@ -84,42 +84,12 @@ func (pc *ProposerCache) Get(height uint32) (*ProposerInfo, error) {
publicKey: proposer,
winningHash: EmptyUint256,
winningHashType: ledger.GenesisHash,
}, nil
}
}

if _, ok := pc.cache[height]; ok {
return pc.cache[height], nil
}

var proposer []byte
previousBlockHash := ledger.DefaultLedger.Store.GetCurrentBlockHash()
previousBlock, err := ledger.DefaultLedger.Store.GetBlock(previousBlockHash)
if err != nil {
return nil, err
}
switch previousBlock.Header.WinningHashType {
case ledger.WinningTxnHash:
txn, err := ledger.DefaultLedger.GetTransactionWithHash(previousBlock.Header.WinningHash)
if err != nil {
return nil, err
}
payload := txn.Payload.(*payload.Commit)
sigchain := &por.SigChain{}
proto.Unmarshal(payload.SigChain, sigchain)
proposer, err = sigchain.GetMiner()
if err != nil {
return nil, err
}
case ledger.WinningNilHash:
proposer, err = previousBlock.GetSigner()
if err != nil {
return nil, err
}
return pc.cache[height]
}

return &ProposerInfo{
publicKey: proposer,
winningHash: EmptyUint256,
winningHashType: ledger.WinningNilHash,
}, nil
return nil
}
119 changes: 61 additions & 58 deletions consensus/ising/proposerservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type ProposerService struct {
timer *time.Timer // timer for proposer node
timeout *time.Timer // timeout for next round consensus
proposerChangeTimer *time.Timer // timer for proposer change
proposerChangeIndex uint32 // block index for proposer change
localNode protocol.Noder // local node
txnCollector *transaction.TxnCollector // collect transaction from where
mining Mining // built-in mining
Expand All @@ -53,7 +52,6 @@ func NewProposerService(account *vault.Account, node protocol.Noder) *ProposerSe
timer: time.NewTimer(config.ConsensusTime),
timeout: time.NewTimer(config.ConsensusTime + TimeoutTolerance),
proposerChangeTimer: time.NewTimer(config.ProposerChangeTime),
proposerChangeIndex: 0,
account: account,
localNode: node,
txnCollector: txnCollector,
Expand All @@ -72,9 +70,6 @@ func NewProposerService(account *vault.Account, node protocol.Noder) *ProposerSe
if !service.timeout.Stop() {
<-service.timeout.C
}
if !service.proposerChangeTimer.Stop() {
<-service.proposerChangeTimer.C
}

return service
}
Expand Down Expand Up @@ -216,10 +211,12 @@ func (ps *ProposerService) ProduceNewBlock() {
current := ps.CurrentVoting(voting.BlockVote)
votingPool := current.GetVotingPool()
votingHeight := current.GetVotingHeight()
proposerInfo, err := ps.proposerCache.Get(votingHeight + 1)
if err != nil {
log.Error("get proposer info for producing new block error: ", err)
return
proposerInfo := ps.proposerCache.Get(votingHeight + 1)
if proposerInfo == nil {
proposerInfo = &ProposerInfo{
winningHash: EmptyUint256,
winningHashType: ledger.WinningBlockHash,
}
}
// build new block to be proposed
block, err := ps.mining.BuildBlock(votingHeight, proposerInfo.winningHash, proposerInfo.winningHashType)
Expand Down Expand Up @@ -258,8 +255,8 @@ func (ps *ProposerService) IsBlockProposer() bool {
}
current := ps.CurrentVoting(voting.BlockVote)
votingHeight := current.GetVotingHeight()
proposerInfo, err := ps.proposerCache.Get(votingHeight)
if err != nil {
proposerInfo := ps.proposerCache.Get(votingHeight)
if proposerInfo == nil {
return false
}
if !IsEqualBytes(localPublicKey, proposerInfo.publicKey) {
Expand All @@ -275,8 +272,10 @@ func (ps *ProposerService) ProposerRoutine() {
case <-ps.timer.C:
if ps.IsBlockProposer() {
log.Info("-> I am Block Proposer")
if ps.localNode.GetSyncState() != protocol.PersistFinished {
ps.localNode.StopSyncBlock()
}
ps.ProduceNewBlock()
time.Sleep(time.Second)
for _, v := range ps.voting {
go ps.ConsensusRoutine(v.VotingType(), true)
}
Expand Down Expand Up @@ -318,42 +317,52 @@ func (ps *ProposerService) ProbeRoutine() {
}

func (ps *ProposerService) BlockPersistCompleted(v interface{}) {
if ps.localNode.GetSyncState() != protocol.PersistFinished {
return
}
if block, ok := v.(*ledger.Block); ok {
// record time when persist block
ledger.DefaultLedger.Blockchain.AddBlockTime(block.Hash(), time.Now().Unix())
ps.txnCollector.Cleanup(block.Transactions)
// reset index when block persisted
ps.proposerChangeIndex = 0
// reset timer when block persisted
ps.proposerChangeTimer.Stop()
ps.proposerChangeTimer.Reset(config.ProposerChangeTime)
// reset timer when block persisted
ps.proposerChangeTimer.Stop()
ps.proposerChangeTimer.Reset(config.ProposerChangeTime)

if ps.localNode.GetSyncState() == protocol.PersistFinished {
if block, ok := v.(*ledger.Block); ok {
// record time when persist block
ledger.DefaultLedger.Blockchain.AddBlockTime(block.Hash(), time.Now().Unix())
ps.txnCollector.Cleanup(block.Transactions)
}
}
}

func (ps *ProposerService) ChangeProposer() {
func (ps *ProposerService) ChangeProposerRoutine() {
for {
select {
case <-ps.proposerChangeTimer.C:
height := ledger.DefaultLedger.Store.GetHeight() - ps.proposerChangeIndex
hash, err := ledger.DefaultLedger.Store.GetBlockHash(height)
if err != nil {
log.Error("get block hash error when change proposer: ", err)
}
block, err := ledger.DefaultLedger.Store.GetBlock(hash)
if err != nil {
log.Error("get block error when change proposer: ", err)
now := time.Now().Unix()
currentHeight := ledger.DefaultLedger.Store.GetHeight()
var block *ledger.Block
var err error
if currentHeight < InitialBlockHeight {
block, err = ledger.DefaultLedger.Store.GetBlockByHeight(0)
if err != nil {
log.Error("get genesis block error when change proposer")
}
} else {
currentBlock, err := ledger.DefaultLedger.Store.GetBlockByHeight(currentHeight)
if err != nil {
log.Errorf("get latest block %d error when change proposer", currentHeight)
}
timestamp := currentBlock.Header.Timestamp
index := (now - timestamp) / 60
var height uint32 = 0
if int64(currentHeight) > index {
height = uint32(int64(currentHeight) - index)
}
block, err = ledger.DefaultLedger.Store.GetBlockByHeight(height)
if err != nil {
log.Errorf("get block %d error when change proposer", currentHeight)
}
}
nextBlockHeight := ledger.DefaultLedger.Store.GetHeight() + 1
ps.proposerCache.Add(nextBlockHeight, block)
ps.proposerCache.Add(currentHeight+1, block)
ps.timer.Stop()
ps.timer.Reset(0)
ps.proposerChangeTimer.Reset(config.ProposerChangeTime)
if height > 1 {
ps.proposerChangeIndex++
}
}
}
}
Expand Down Expand Up @@ -396,6 +405,10 @@ func (ps *ProposerService) BlockSyncingFinished(v interface{}) {
ps.localNode.SetSyncState(protocol.PersistFinished)
}
func (ps *ProposerService) SyncBlock(isProposer bool) {
if ps.localNode.GetSyncState() == protocol.PersistFinished ||
ps.localNode.GetSyncState() == protocol.SyncFinished {
return
}
var wg sync.WaitGroup
wg.Add(1)
// start block syncing
Expand All @@ -412,21 +425,6 @@ func (ps *ProposerService) SyncBlock(isProposer bool) {
wg.Wait()
}

func (ps *ProposerService) StartConsensus(isProposer bool) {
// start block proposer routine
go ps.ProposerRoutine()
// start timeout routine
go ps.TimeoutRoutine()
// change proposer
go ps.ChangeProposer()
// trigger block proposer routine
if isProposer {
ps.timer.Reset(0)
}
// start probe routine
go ps.ProbeRoutine()
}

func (ps *ProposerService) Start() error {
// register consensus message
ps.consensusMsgReceived = ps.localNode.GetEvent("consensus").Subscribe(events.EventConsensusMsgReceived,
Expand All @@ -438,11 +436,16 @@ func (ps *ProposerService) Start() error {
ps.syncFinished = ps.localNode.GetEvent("sync").Subscribe(events.EventBlockSyncingFinished,
ps.BlockSyncingFinished)

isProposer := ps.IsBlockProposer()
// start block syncing
ps.SyncBlock(isProposer)
// start consensus
ps.StartConsensus(isProposer)
// start block proposer routine
go ps.ProposerRoutine()
// start change proposer routine
go ps.ChangeProposerRoutine()
// start timeout routine
go ps.TimeoutRoutine()

ps.SyncBlock(false)
// start probe routine
go ps.ProbeRoutine()

return nil
}
Expand Down
4 changes: 1 addition & 3 deletions core/ledger/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ const (
GenesisHash WinningHashType = 0
// WinningTxnHash means next Block proposer is a node on signature chain.
WinningTxnHash WinningHashType = 1
// WinningNilHash means next Block proposer is the signer of last Block.
WinningNilHash WinningHashType = 2
// WinningBlockHash means next Block proposer is signer of historical Block.
WinningBlockHash WinningHashType = 3
WinningBlockHash WinningHashType = 2
)

type Header struct {
Expand Down
1 change: 1 addition & 0 deletions core/ledger/ledgerStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type ILedgerStore interface {
SaveBlock(b *Block, ledger *Ledger) error
GetBlock(hash Uint256) (*Block, error)
GetBlockByHeight(height uint32) (*Block, error)
BlockInCache(hash Uint256) bool
GetBlockHash(height uint32) (Uint256, error)
GetBlockHistory(startHeight, blockNum uint32) map[uint32]Uint256
Expand Down
6 changes: 1 addition & 5 deletions core/ledger/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func HeaderCheck(header *Header, receiveTime int64) error {
// get miner who will sign next block
var miner []byte
timeSlot := int64(config.ProposerChangeTime / time.Second)
if timeDiff > timeSlot {
if timeDiff >= timeSlot {
index := timeDiff / timeSlot
proposerBlockHeight := int64(DefaultLedger.Store.GetHeight()) - index
if proposerBlockHeight < 0 {
Expand Down Expand Up @@ -143,10 +143,6 @@ func HeaderCheck(header *Header, receiveTime int64) error {
txnHash := txn.Hash()
log.Infof("verification: block signer should be %s which is got in sigchain transaction %s",
common.BytesToHexString(miner), common.BytesToHexString(txnHash.ToArrayReverse()))
case WinningNilHash:
miner = prevHeader.Signer
log.Infof("verification: block signer should be: %s which is the signer of previous block %d",
common.BytesToHexString(miner), prevHeader.Height)
}
}

Expand Down
23 changes: 23 additions & 0 deletions db/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,29 @@ func (cs *ChainStore) GetBlockHash(height uint32) (Uint256, error) {
return blockHash256, nil
}

func (cs *ChainStore) GetBlockByHeight(height uint32) (*Block, error) {
key := bytes.NewBuffer(nil)
key.WriteByte(byte(DATA_BlockHash))
err := serialization.WriteUint32(key, height)
if err != nil {
return nil, err
}
value, err := cs.st.Get(key.Bytes())
if err != nil {
return nil, err
}
hash, err := Uint256ParseFromBytes(value)
if err != nil {
return nil, err
}
block, err := cs.GetBlock(hash)
if err != nil {
return nil, err
}

return block, nil
}

func (cs *ChainStore) GetCurrentBlockHash() Uint256 {
cs.mu.RLock()
defer cs.mu.RUnlock()
Expand Down
1 change: 0 additions & 1 deletion net/message/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func (msg dataReq) Handle(node Noder) error {
func NewBlockFromHash(hash common.Uint256) (*ledger.Block, error) {
bk, err := ledger.DefaultLedger.Store.GetBlock(hash)
if err != nil {
log.Errorf("Get Block error: %s, block hash: %x", err.Error(), hash)
return nil, err
}
return bk, nil
Expand Down
1 change: 0 additions & 1 deletion net/message/blockHdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ func GetHeadersFromHash(startHash Uint256, stopHash Uint256) ([]ledger.Header, u
hash, err := ledger.DefaultLedger.Store.GetBlockHash(startHeight + i)
hd, err := ledger.DefaultLedger.Store.GetHeader(hash)
if err != nil {
log.Error("GetBlockWithHeight failed ", err.Error())
return nil, 0, err
}
headers = append(headers, *hd)
Expand Down
15 changes: 10 additions & 5 deletions net/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func (node *node) GetChordRing() *chord.Ring {
}

func (node *node) blockHeaderSyncing(stopHash Uint256) {
noders := node.local.GetSyncFinishedNeighbors()
noders := node.local.GetNeighborNoder()
if len(noders) == 0 {
return
}
Expand Down Expand Up @@ -599,7 +599,7 @@ func (node *node) blockSyncing() {
var dValue int32
var reqCnt uint32
var i uint32
noders := node.local.GetSyncFinishedNeighbors()
noders := node.local.GetNeighborNoder()

for _, n := range noders {
if uint32(n.GetHeight()) <= currentBlkHeight {
Expand Down Expand Up @@ -807,15 +807,20 @@ func (node *node) SyncBlock(isProposer bool) {
}
}

func (node *node) StopSyncBlock() {
// switch syncing state
node.SetSyncState(SyncFinished)
// stop block syncing
node.quit <- struct{}{}
}

func (node *node) SyncBlockMonitor(isProposer bool) {
// wait for header syncing finished
node.WaitForSyncHeaderFinish(isProposer)
// wait for block syncing finished
node.WaitForSyncBlkFinish()
// switch syncing state
node.SetSyncState(SyncFinished)
// stop block syncing
node.quit <- struct{}{}
node.StopSyncBlock()
}

func (node *node) SendRelayPacketsInBuffer(clientId []byte) error {
Expand Down
1 change: 1 addition & 0 deletions net/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Noder interface {
SetSyncStopHash(hash Uint256, height uint32)
SyncBlock(bool)
SyncBlockMonitor(bool)
StopSyncBlock()
GetRelay() bool
GetPubKey() *crypto.PubKey
CompareAndSetState(old, new uint32) bool
Expand Down

0 comments on commit 506016c

Please sign in to comment.