diff --git a/consensus/ising/proposercache.go b/consensus/ising/proposercache.go index 6dc9c765a..510fabb66 100644 --- a/consensus/ising/proposercache.go +++ b/consensus/ising/proposercache.go @@ -1,6 +1,8 @@ package ising import ( + "bytes" + "encoding/binary" "sync" "github.com/golang/protobuf/proto" @@ -20,6 +22,7 @@ const ( type ProposerInfo struct { publicKey []byte + chordID uint64 winningHash Uint256 winningHashType ledger.WinningHashType } @@ -55,13 +58,19 @@ func (pc *ProposerCache) Add(height uint32, votingContent voting.VotingContent) sigchain := &por.SigChain{} proto.Unmarshal(payload.SigChain, sigchain) // TODO: get a determinate public key on signature chain - pbk, err := sigchain.GetMiner() + pbk, chordID, err := sigchain.GetMiner() if err != nil { log.Warn("Get last public key error", err) return } + var id uint64 + err = binary.Read(bytes.NewBuffer(chordID[:8]), binary.LittleEndian, &id) + if err != nil { + log.Error(err) + } proposerInfo = &ProposerInfo{ publicKey: pbk, + chordID: id, winningHash: t.Hash(), winningHashType: ledger.WinningTxnHash, } diff --git a/consensus/ising/proposerservice.go b/consensus/ising/proposerservice.go index 5bdcf7a5a..3396fd2ce 100644 --- a/consensus/ising/proposerservice.go +++ b/consensus/ising/proposerservice.go @@ -22,7 +22,7 @@ import ( const ( TxnAmountToBePackaged = 1024 - WaitingForFloodingFinished = time.Second * 1 + WaitingForFloodingFinished = time.Second * 5 WaitingForVotingFinished = time.Second * 8 TimeoutTolerance = time.Second * 2 ) @@ -190,7 +190,7 @@ func (ps *ProposerService) SendNewProposal(votingHeight uint32, vType voting.Vot votingPool.SetMind(votingHeight, ownMindHash) } } - if !current.HasSelfState(ownMindHash, voting.ProposalSent) { + if !current.CheckAndSetOwnState(ownMindHash, voting.ProposalSent) { log.Infof("proposing hash: %s, type: %d", BytesToHexString(ownMindHash.ToArrayReverse()), vType) // create new proposal proposalMsg := NewProposal(&ownMindHash, votingHeight, vType) @@ -198,8 +198,6 @@ func (ps *ProposerService) SendNewProposal(votingHeight uint32, vType voting.Vot nodes := ps.GetReceiverNode(nil) // send proposal to neighbors ps.SendConsensusMsg(proposalMsg, nodes) - // state changed for current hash - current.SetSelfState(ownMindHash, voting.ProposalSent) // set confirming hash current.SetConfirmingHash(ownMindHash) } @@ -262,6 +260,9 @@ func (ps *ProposerService) IsBlockProposer() bool { if !IsEqualBytes(localPublicKey, proposerInfo.publicKey) { return false } + if proposerInfo.chordID != 0 && ps.localNode.GetID() != proposerInfo.chordID { + return false + } return true } @@ -569,12 +570,9 @@ func (ps *ProposerService) HandleBlockFloodingMsg(bfMsg *BlockFlooding, sender u rtime := time.Now().Unix() // returns if receive duplicate block - if current.HasSelfState(blockHash, voting.FloodingFinished) { + if current.CheckAndSetOwnState(blockHash, voting.FloodingFinished) { return } - // set state for flooding block - current.SetSelfState(blockHash, voting.FloodingFinished) - // relay block to neighbors var nodes []protocol.Noder for _, node := range ps.localNode.GetNeighborNoder() { @@ -599,7 +597,7 @@ func (ps *ProposerService) HandleBlockFloodingMsg(bfMsg *BlockFlooding, sender u return } // send vote when the block is verified by local node - if !current.HasSelfState(blockHash, voting.ProposalSent) { + if !current.CheckAndSetOwnState(blockHash, voting.ProposalSent) { err = ledger.HeaderCheck(block.Header, rtime) if err != nil { log.Error("header verification error when voting in sync mode", err) @@ -614,7 +612,6 @@ func (ps *ProposerService) HandleBlockFloodingMsg(bfMsg *BlockFlooding, sender u proposalMsg := NewProposal(&blockHash, votingHeight, voting.BlockVote) nodes := ps.GetReceiverNode(nil) ps.SendConsensusMsg(proposalMsg, nodes) - current.SetSelfState(blockHash, voting.ProposalSent) } return } @@ -627,7 +624,7 @@ func (ps *ProposerService) HandleBlockFloodingMsg(bfMsg *BlockFlooding, sender u } err = current.AddToCache(block, rtime) if err != nil { - log.Error("add received block to local cache error") + log.Error("add received block to local cache error: ", err) return } @@ -659,17 +656,16 @@ func (ps *ProposerService) HandleRequestMsg(req *Request, sender uint64) { log.Warn("requested block doesn't match with local block in process") return } - if !current.HasSelfState(hash, voting.ProposalSent) { + if !current.CheckOwnState(hash, voting.ProposalSent) { log.Warn("receive invalid request for hash: ", BytesToHexString(hash.ToArrayReverse())) return } // returns if receive duplicate request - if current.HasNeighborState(sender, hash, voting.RequestReceived) { + if current.CheckAndSetNeighborState(sender, hash, voting.RequestReceived) { log.Warn("duplicate request received for hash: ", BytesToHexString(hash.ToArrayReverse())) return } - // set state for request - current.SetNeighborState(sender, hash, voting.RequestReceived) + content, err := current.GetVotingContent(hash, height) if err != nil { return @@ -719,12 +715,12 @@ func (ps *ProposerService) HandleResponseMsg(resp *Response, sender uint64) { return } // returns if no request sent before - if !current.HasNeighborState(sender, *hash, voting.RequestSent) { + if !current.CheckNeighborState(sender, *hash, voting.RequestSent) { log.Warn("consensus state error in Response message handler") return } // returns if receive duplicate response - if current.HasNeighborState(sender, *hash, voting.ProposalReceived) { + if current.CheckAndSetNeighborState(sender, *hash, voting.ProposalReceived) { log.Warn("duplicate response received for hash: ", BytesToHexString(hash.ToArrayReverse())) return } @@ -733,7 +729,6 @@ func (ps *ProposerService) HandleResponseMsg(resp *Response, sender uint64) { if err != nil { return } - current.SetNeighborState(sender, *hash, voting.ProposalReceived) currentVotingPool := current.GetVotingPool() neighborWeight, _ := ledger.DefaultLedger.Store.GetVotingWeight(Uint160{}) // Get voting result from voting pool. If votes is not enough then return. @@ -828,18 +823,16 @@ func (ps *ProposerService) HandleProposalMsg(proposal *Proposal, sender uint64) nodes := ps.GetReceiverNode([]uint64{sender}) // send request message ps.SendConsensusMsg(requestMsg, nodes) - current.SetNeighborState(sender, hash, voting.RequestSent) + current.CheckAndSetNeighborState(sender, hash, voting.RequestSent) log.Warnf("doesn't contain hash in local cache, requesting it from neighbor %s", BytesToHexString(hash.ToArrayReverse())) return } // returns if receive duplicated proposal - if current.HasNeighborState(sender, hash, voting.ProposalReceived) { + if current.CheckAndSetNeighborState(sender, hash, voting.ProposalReceived) { log.Warn("duplicate proposal received for hash: ", BytesToHexString(hash.ToArrayReverse())) return } - // set state when receive a proposal from a neighbor - current.SetNeighborState(sender, hash, voting.ProposalReceived) currentVotingPool := current.GetVotingPool() neighborWeight, _ := ledger.DefaultLedger.Store.GetVotingWeight(Uint160{}) // Get voting result from voting pool. If votes is not enough then return. diff --git a/consensus/ising/voting/block.go b/consensus/ising/voting/block.go index 3f19cc0b0..696251ba4 100644 --- a/consensus/ising/voting/block.go +++ b/consensus/ising/voting/block.go @@ -38,17 +38,7 @@ func NewBlockVoting() *BlockVoting { return blockVoting } -func (bv *BlockVoting) SetSelfState(blockhash Uint256, s State) { - bv.Lock() - defer bv.Unlock() - - if _, ok := bv.pstate[blockhash]; !ok { - bv.pstate[blockhash] = new(State) - } - bv.pstate[blockhash].SetBit(s) -} - -func (bv *BlockVoting) HasSelfState(blockhash Uint256, state State) bool { +func (bv *BlockVoting) CheckOwnState(blockhash Uint256, state State) bool { bv.RLock() defer bv.RUnlock() @@ -62,20 +52,25 @@ func (bv *BlockVoting) HasSelfState(blockhash Uint256, state State) bool { } } -func (bv *BlockVoting) SetNeighborState(id uint64, blockhash Uint256, s State) { +func (bv *BlockVoting) CheckAndSetOwnState(blockhash Uint256, state State) bool { bv.Lock() defer bv.Unlock() - if _, ok := bv.vstate[id]; !ok { - bv.vstate[id] = make(map[Uint256]*State) + v, ok := bv.pstate[blockhash] + if !ok || v == nil { + bv.pstate[blockhash] = new(State) + bv.pstate[blockhash].SetBit(state) + return false } - if _, ok := bv.vstate[id][blockhash]; !ok { - bv.vstate[id][blockhash] = new(State) + if !v.HasBit(state) { + bv.pstate[blockhash].SetBit(state) + return false } - bv.vstate[id][blockhash].SetBit(s) + + return true } -func (bv *BlockVoting) HasNeighborState(id uint64, blockhash Uint256, state State) bool { +func (bv *BlockVoting) CheckNeighborState(id uint64, blockhash Uint256, state State) bool { bv.RLock() defer bv.RUnlock() @@ -93,6 +88,24 @@ func (bv *BlockVoting) HasNeighborState(id uint64, blockhash Uint256, state Stat } } +func (bv *BlockVoting) CheckAndSetNeighborState(id uint64, blockhash Uint256, s State) bool { + bv.Lock() + defer bv.Unlock() + + exist := true + if _, ok := bv.vstate[id]; !ok { + bv.vstate[id] = make(map[Uint256]*State) + exist = false + } + if _, ok := bv.vstate[id][blockhash]; !ok { + bv.vstate[id][blockhash] = new(State) + exist = false + } + bv.vstate[id][blockhash].SetBit(s) + + return exist +} + func (bv *BlockVoting) SetVotingHeight(height uint32) { bv.height = height } diff --git a/consensus/ising/voting/sigchain.go b/consensus/ising/voting/sigchain.go index 07a8b8fb0..3c7bb175b 100644 --- a/consensus/ising/voting/sigchain.go +++ b/consensus/ising/voting/sigchain.go @@ -41,16 +41,7 @@ func NewSigChainVoting(txnCollector *transaction.TxnCollector) *SigChainVoting { return sigChainVoting } -func (scv *SigChainVoting) SetSelfState(hash Uint256, s State) { - scv.Lock() - defer scv.Unlock() - if _, ok := scv.pstate[hash]; !ok { - scv.pstate[hash] = new(State) - } - scv.pstate[hash].SetBit(s) -} - -func (scv *SigChainVoting) HasSelfState(hash Uint256, state State) bool { +func (scv *SigChainVoting) CheckOwnState(hash Uint256, state State) bool { scv.RLock() defer scv.RUnlock() @@ -64,20 +55,25 @@ func (scv *SigChainVoting) HasSelfState(hash Uint256, state State) bool { } } -func (scv *SigChainVoting) SetNeighborState(id uint64, hash Uint256, s State) { +func (scv *SigChainVoting) CheckAndSetOwnState(hash Uint256, state State) bool { scv.Lock() defer scv.Unlock() - if _, ok := scv.vstate[id]; !ok { - scv.vstate[id] = make(map[Uint256]*State) + v, ok := scv.pstate[hash] + if !ok || v == nil { + scv.pstate[hash] = new(State) + scv.pstate[hash].SetBit(state) + return false } - if _, ok := scv.vstate[id][hash]; !ok { - scv.vstate[id][hash] = new(State) + if !v.HasBit(state) { + scv.pstate[hash].SetBit(state) + return false } - scv.vstate[id][hash].SetBit(s) + + return true } -func (scv *SigChainVoting) HasNeighborState(id uint64, hash Uint256, state State) bool { +func (scv *SigChainVoting) CheckNeighborState(id uint64, hash Uint256, state State) bool { scv.RLock() defer scv.RUnlock() @@ -95,6 +91,24 @@ func (scv *SigChainVoting) HasNeighborState(id uint64, hash Uint256, state State } } +func (scv *SigChainVoting) CheckAndSetNeighborState(id uint64, hash Uint256, s State) bool { + scv.Lock() + defer scv.Unlock() + + exist := true + if _, ok := scv.vstate[id]; !ok { + scv.vstate[id] = make(map[Uint256]*State) + exist = false + } + if _, ok := scv.vstate[id][hash]; !ok { + scv.vstate[id][hash] = new(State) + exist = false + } + scv.vstate[id][hash].SetBit(s) + + return exist +} + func (scv *SigChainVoting) SetVotingHeight(height uint32) { scv.height = height } diff --git a/consensus/ising/voting/voting.go b/consensus/ising/voting/voting.go index 06bbd7e50..82062caa7 100644 --- a/consensus/ising/voting/voting.go +++ b/consensus/ising/voting/voting.go @@ -24,14 +24,14 @@ type Voting interface { SetConfirmingHash(hash Uint256) // get hash in process GetConfirmingHash() Uint256 - // set self state - SetSelfState(hash Uint256, s State) - // check proposer state - HasSelfState(hash Uint256, s State) bool - // set voter state - SetNeighborState(nid uint64, hash Uint256, s State) - // check voter state - HasNeighborState(nid uint64, hash Uint256, s State) bool + // check own state + CheckOwnState(hash Uint256, s State) bool + // check and set own state + CheckAndSetOwnState(hash Uint256, s State) bool + // check neighbor state + CheckNeighborState(nid uint64, hash Uint256, s State) bool + // check and set neighbor state + CheckAndSetNeighborState(nid uint64, hash Uint256, s State) bool // get current voting height GetVotingHeight() uint32 // get best voting content diff --git a/core/ledger/validator.go b/core/ledger/validator.go index 1fe563dbd..274e16f77 100644 --- a/core/ledger/validator.go +++ b/core/ledger/validator.go @@ -136,7 +136,7 @@ func HeaderCheck(header *Header, receiveTime int64) error { } sigchain := &por.SigChain{} proto.Unmarshal(payload.SigChain, sigchain) - miner, err = sigchain.GetMiner() + miner, _, err = sigchain.GetMiner() if err != nil { return err } diff --git a/por/sigchain.go b/por/sigchain.go index 1890680db..19318a962 100644 --- a/por/sigchain.go +++ b/por/sigchain.go @@ -372,14 +372,14 @@ func (sc *SigChain) GetLastPubkey() ([]byte, error) { return e.NextPubkey, nil } -func (sc *SigChain) GetMiner() ([]byte, error) { +func (sc *SigChain) GetMiner() ([]byte, []byte, error) { if !sc.IsFinal() { - return nil, errors.New("not final") + return nil, nil, errors.New("not final") } n := sc.Length() if n < 3 { - return nil, errors.New("not enough elements") + return nil, nil, errors.New("not enough elements") } type SigChainElemInfo struct { @@ -400,7 +400,7 @@ func (sc *SigChain) GetMiner() ([]byte, error) { if elemLen == 0 { err := errors.New("invalid signature chain for block proposer selection") log.Error(err) - return nil, err + return nil, nil, err } newIndex := big.NewInt(0) x := big.NewInt(0) @@ -410,10 +410,10 @@ func (sc *SigChain) GetMiner() ([]byte, error) { originalIndex := minerElems[newIndex.Int64()].index if originalIndex == 0 { - return sc.GetSrcPubkey(), nil + return sc.GetSrcPubkey(), sc.Elems[0].Addr, nil } - return sc.Elems[originalIndex-1].NextPubkey, nil + return sc.Elems[originalIndex-1].NextPubkey, sc.Elems[originalIndex].Addr, nil } func (sc *SigChain) nextSigner() ([]byte, error) {