Skip to content

Commit

Permalink
Check chord ID for block proposer and fix lock
Browse files Browse the repository at this point in the history
Signed-off-by: oscar <[email protected]>
  • Loading branch information
oscar authored and yilunzhang committed Sep 14, 2018
1 parent 33436c1 commit 634edc0
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 73 deletions.
11 changes: 10 additions & 1 deletion consensus/ising/proposercache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ising

import (
"bytes"
"encoding/binary"
"sync"

"github.com/golang/protobuf/proto"
Expand All @@ -20,6 +22,7 @@ const (

type ProposerInfo struct {
publicKey []byte
chordID uint64
winningHash Uint256
winningHashType ledger.WinningHashType
}
Expand Down Expand Up @@ -55,13 +58,19 @@ func (pc *ProposerCache) Add(height uint32, votingContent voting.VotingContent)
sigchain := &por.SigChain{}
proto.Unmarshal(payload.SigChain, sigchain)
// TODO: get a determinate public key on signature chain
pbk, err := sigchain.GetMiner()
pbk, chordID, err := sigchain.GetMiner()
if err != nil {
log.Warn("Get last public key error", err)
return
}
var id uint64
err = binary.Read(bytes.NewBuffer(chordID[:8]), binary.LittleEndian, &id)
if err != nil {
log.Error(err)
}
proposerInfo = &ProposerInfo{
publicKey: pbk,
chordID: id,
winningHash: t.Hash(),
winningHashType: ledger.WinningTxnHash,
}
Expand Down
37 changes: 15 additions & 22 deletions consensus/ising/proposerservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

const (
TxnAmountToBePackaged = 1024
WaitingForFloodingFinished = time.Second * 1
WaitingForFloodingFinished = time.Second * 5
WaitingForVotingFinished = time.Second * 8
TimeoutTolerance = time.Second * 2
)
Expand Down Expand Up @@ -190,16 +190,14 @@ func (ps *ProposerService) SendNewProposal(votingHeight uint32, vType voting.Vot
votingPool.SetMind(votingHeight, ownMindHash)
}
}
if !current.HasSelfState(ownMindHash, voting.ProposalSent) {
if !current.CheckAndSetOwnState(ownMindHash, voting.ProposalSent) {
log.Infof("proposing hash: %s, type: %d", BytesToHexString(ownMindHash.ToArrayReverse()), vType)
// create new proposal
proposalMsg := NewProposal(&ownMindHash, votingHeight, vType)
// get nodes which should receive proposal message
nodes := ps.GetReceiverNode(nil)
// send proposal to neighbors
ps.SendConsensusMsg(proposalMsg, nodes)
// state changed for current hash
current.SetSelfState(ownMindHash, voting.ProposalSent)
// set confirming hash
current.SetConfirmingHash(ownMindHash)
}
Expand Down Expand Up @@ -262,6 +260,9 @@ func (ps *ProposerService) IsBlockProposer() bool {
if !IsEqualBytes(localPublicKey, proposerInfo.publicKey) {
return false
}
if proposerInfo.chordID != 0 && ps.localNode.GetID() != proposerInfo.chordID {
return false
}

return true
}
Expand Down Expand Up @@ -569,12 +570,9 @@ func (ps *ProposerService) HandleBlockFloodingMsg(bfMsg *BlockFlooding, sender u
rtime := time.Now().Unix()

// returns if receive duplicate block
if current.HasSelfState(blockHash, voting.FloodingFinished) {
if current.CheckAndSetOwnState(blockHash, voting.FloodingFinished) {
return
}
// set state for flooding block
current.SetSelfState(blockHash, voting.FloodingFinished)

// relay block to neighbors
var nodes []protocol.Noder
for _, node := range ps.localNode.GetNeighborNoder() {
Expand All @@ -599,7 +597,7 @@ func (ps *ProposerService) HandleBlockFloodingMsg(bfMsg *BlockFlooding, sender u
return
}
// send vote when the block is verified by local node
if !current.HasSelfState(blockHash, voting.ProposalSent) {
if !current.CheckAndSetOwnState(blockHash, voting.ProposalSent) {
err = ledger.HeaderCheck(block.Header, rtime)
if err != nil {
log.Error("header verification error when voting in sync mode", err)
Expand All @@ -614,7 +612,6 @@ func (ps *ProposerService) HandleBlockFloodingMsg(bfMsg *BlockFlooding, sender u
proposalMsg := NewProposal(&blockHash, votingHeight, voting.BlockVote)
nodes := ps.GetReceiverNode(nil)
ps.SendConsensusMsg(proposalMsg, nodes)
current.SetSelfState(blockHash, voting.ProposalSent)
}
return
}
Expand All @@ -627,7 +624,7 @@ func (ps *ProposerService) HandleBlockFloodingMsg(bfMsg *BlockFlooding, sender u
}
err = current.AddToCache(block, rtime)
if err != nil {
log.Error("add received block to local cache error")
log.Error("add received block to local cache error: ", err)
return
}

Expand Down Expand Up @@ -659,17 +656,16 @@ func (ps *ProposerService) HandleRequestMsg(req *Request, sender uint64) {
log.Warn("requested block doesn't match with local block in process")
return
}
if !current.HasSelfState(hash, voting.ProposalSent) {
if !current.CheckOwnState(hash, voting.ProposalSent) {
log.Warn("receive invalid request for hash: ", BytesToHexString(hash.ToArrayReverse()))
return
}
// returns if receive duplicate request
if current.HasNeighborState(sender, hash, voting.RequestReceived) {
if current.CheckAndSetNeighborState(sender, hash, voting.RequestReceived) {
log.Warn("duplicate request received for hash: ", BytesToHexString(hash.ToArrayReverse()))
return
}
// set state for request
current.SetNeighborState(sender, hash, voting.RequestReceived)

content, err := current.GetVotingContent(hash, height)
if err != nil {
return
Expand Down Expand Up @@ -719,12 +715,12 @@ func (ps *ProposerService) HandleResponseMsg(resp *Response, sender uint64) {
return
}
// returns if no request sent before
if !current.HasNeighborState(sender, *hash, voting.RequestSent) {
if !current.CheckNeighborState(sender, *hash, voting.RequestSent) {
log.Warn("consensus state error in Response message handler")
return
}
// returns if receive duplicate response
if current.HasNeighborState(sender, *hash, voting.ProposalReceived) {
if current.CheckAndSetNeighborState(sender, *hash, voting.ProposalReceived) {
log.Warn("duplicate response received for hash: ", BytesToHexString(hash.ToArrayReverse()))
return
}
Expand All @@ -733,7 +729,6 @@ func (ps *ProposerService) HandleResponseMsg(resp *Response, sender uint64) {
if err != nil {
return
}
current.SetNeighborState(sender, *hash, voting.ProposalReceived)
currentVotingPool := current.GetVotingPool()
neighborWeight, _ := ledger.DefaultLedger.Store.GetVotingWeight(Uint160{})
// Get voting result from voting pool. If votes is not enough then return.
Expand Down Expand Up @@ -828,18 +823,16 @@ func (ps *ProposerService) HandleProposalMsg(proposal *Proposal, sender uint64)
nodes := ps.GetReceiverNode([]uint64{sender})
// send request message
ps.SendConsensusMsg(requestMsg, nodes)
current.SetNeighborState(sender, hash, voting.RequestSent)
current.CheckAndSetNeighborState(sender, hash, voting.RequestSent)
log.Warnf("doesn't contain hash in local cache, requesting it from neighbor %s",
BytesToHexString(hash.ToArrayReverse()))
return
}
// returns if receive duplicated proposal
if current.HasNeighborState(sender, hash, voting.ProposalReceived) {
if current.CheckAndSetNeighborState(sender, hash, voting.ProposalReceived) {
log.Warn("duplicate proposal received for hash: ", BytesToHexString(hash.ToArrayReverse()))
return
}
// set state when receive a proposal from a neighbor
current.SetNeighborState(sender, hash, voting.ProposalReceived)
currentVotingPool := current.GetVotingPool()
neighborWeight, _ := ledger.DefaultLedger.Store.GetVotingWeight(Uint160{})
// Get voting result from voting pool. If votes is not enough then return.
Expand Down
49 changes: 31 additions & 18 deletions consensus/ising/voting/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,7 @@ func NewBlockVoting() *BlockVoting {
return blockVoting
}

func (bv *BlockVoting) SetSelfState(blockhash Uint256, s State) {
bv.Lock()
defer bv.Unlock()

if _, ok := bv.pstate[blockhash]; !ok {
bv.pstate[blockhash] = new(State)
}
bv.pstate[blockhash].SetBit(s)
}

func (bv *BlockVoting) HasSelfState(blockhash Uint256, state State) bool {
func (bv *BlockVoting) CheckOwnState(blockhash Uint256, state State) bool {
bv.RLock()
defer bv.RUnlock()

Expand All @@ -62,20 +52,25 @@ func (bv *BlockVoting) HasSelfState(blockhash Uint256, state State) bool {
}
}

func (bv *BlockVoting) SetNeighborState(id uint64, blockhash Uint256, s State) {
func (bv *BlockVoting) CheckAndSetOwnState(blockhash Uint256, state State) bool {
bv.Lock()
defer bv.Unlock()

if _, ok := bv.vstate[id]; !ok {
bv.vstate[id] = make(map[Uint256]*State)
v, ok := bv.pstate[blockhash]
if !ok || v == nil {
bv.pstate[blockhash] = new(State)
bv.pstate[blockhash].SetBit(state)
return false
}
if _, ok := bv.vstate[id][blockhash]; !ok {
bv.vstate[id][blockhash] = new(State)
if !v.HasBit(state) {
bv.pstate[blockhash].SetBit(state)
return false
}
bv.vstate[id][blockhash].SetBit(s)

return true
}

func (bv *BlockVoting) HasNeighborState(id uint64, blockhash Uint256, state State) bool {
func (bv *BlockVoting) CheckNeighborState(id uint64, blockhash Uint256, state State) bool {
bv.RLock()
defer bv.RUnlock()

Expand All @@ -93,6 +88,24 @@ func (bv *BlockVoting) HasNeighborState(id uint64, blockhash Uint256, state Stat
}
}

func (bv *BlockVoting) CheckAndSetNeighborState(id uint64, blockhash Uint256, s State) bool {
bv.Lock()
defer bv.Unlock()

exist := true
if _, ok := bv.vstate[id]; !ok {
bv.vstate[id] = make(map[Uint256]*State)
exist = false
}
if _, ok := bv.vstate[id][blockhash]; !ok {
bv.vstate[id][blockhash] = new(State)
exist = false
}
bv.vstate[id][blockhash].SetBit(s)

return exist
}

func (bv *BlockVoting) SetVotingHeight(height uint32) {
bv.height = height
}
Expand Down
48 changes: 31 additions & 17 deletions consensus/ising/voting/sigchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,7 @@ func NewSigChainVoting(txnCollector *transaction.TxnCollector) *SigChainVoting {
return sigChainVoting
}

func (scv *SigChainVoting) SetSelfState(hash Uint256, s State) {
scv.Lock()
defer scv.Unlock()
if _, ok := scv.pstate[hash]; !ok {
scv.pstate[hash] = new(State)
}
scv.pstate[hash].SetBit(s)
}

func (scv *SigChainVoting) HasSelfState(hash Uint256, state State) bool {
func (scv *SigChainVoting) CheckOwnState(hash Uint256, state State) bool {
scv.RLock()
defer scv.RUnlock()

Expand All @@ -64,20 +55,25 @@ func (scv *SigChainVoting) HasSelfState(hash Uint256, state State) bool {
}
}

func (scv *SigChainVoting) SetNeighborState(id uint64, hash Uint256, s State) {
func (scv *SigChainVoting) CheckAndSetOwnState(hash Uint256, state State) bool {
scv.Lock()
defer scv.Unlock()

if _, ok := scv.vstate[id]; !ok {
scv.vstate[id] = make(map[Uint256]*State)
v, ok := scv.pstate[hash]
if !ok || v == nil {
scv.pstate[hash] = new(State)
scv.pstate[hash].SetBit(state)
return false
}
if _, ok := scv.vstate[id][hash]; !ok {
scv.vstate[id][hash] = new(State)
if !v.HasBit(state) {
scv.pstate[hash].SetBit(state)
return false
}
scv.vstate[id][hash].SetBit(s)

return true
}

func (scv *SigChainVoting) HasNeighborState(id uint64, hash Uint256, state State) bool {
func (scv *SigChainVoting) CheckNeighborState(id uint64, hash Uint256, state State) bool {
scv.RLock()
defer scv.RUnlock()

Expand All @@ -95,6 +91,24 @@ func (scv *SigChainVoting) HasNeighborState(id uint64, hash Uint256, state State
}
}

func (scv *SigChainVoting) CheckAndSetNeighborState(id uint64, hash Uint256, s State) bool {
scv.Lock()
defer scv.Unlock()

exist := true
if _, ok := scv.vstate[id]; !ok {
scv.vstate[id] = make(map[Uint256]*State)
exist = false
}
if _, ok := scv.vstate[id][hash]; !ok {
scv.vstate[id][hash] = new(State)
exist = false
}
scv.vstate[id][hash].SetBit(s)

return exist
}

func (scv *SigChainVoting) SetVotingHeight(height uint32) {
scv.height = height
}
Expand Down
16 changes: 8 additions & 8 deletions consensus/ising/voting/voting.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ type Voting interface {
SetConfirmingHash(hash Uint256)
// get hash in process
GetConfirmingHash() Uint256
// set self state
SetSelfState(hash Uint256, s State)
// check proposer state
HasSelfState(hash Uint256, s State) bool
// set voter state
SetNeighborState(nid uint64, hash Uint256, s State)
// check voter state
HasNeighborState(nid uint64, hash Uint256, s State) bool
// check own state
CheckOwnState(hash Uint256, s State) bool
// check and set own state
CheckAndSetOwnState(hash Uint256, s State) bool
// check neighbor state
CheckNeighborState(nid uint64, hash Uint256, s State) bool
// check and set neighbor state
CheckAndSetNeighborState(nid uint64, hash Uint256, s State) bool
// get current voting height
GetVotingHeight() uint32
// get best voting content
Expand Down
2 changes: 1 addition & 1 deletion core/ledger/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func HeaderCheck(header *Header, receiveTime int64) error {
}
sigchain := &por.SigChain{}
proto.Unmarshal(payload.SigChain, sigchain)
miner, err = sigchain.GetMiner()
miner, _, err = sigchain.GetMiner()
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 634edc0

Please sign in to comment.