Skip to content

Commit

Permalink
Relay the TX, Consensus and block hash
Browse files Browse the repository at this point in the history
MSG should be relayed since not all node are connected with each other.

Signed-off-by: Xiang Fu <[email protected]>
  • Loading branch information
Xiang Fu committed Sep 22, 2017
1 parent 1cfb6cd commit 792eb0e
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 31 deletions.
5 changes: 3 additions & 2 deletions core/store/ChainStore/ChainStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,8 +1099,9 @@ func (self *ChainStore) SaveBlock(b *Block, ledger *Ledger) error {
}

if b.Blockdata.Height > headerHeight {
return errors.New(fmt.Sprintf("Info: [SaveBlock] block height - headerIndex.count >= 1, block height:%d, headerIndex.count:%d",
b.Blockdata.Height, headerHeight))
log.Infof("Info: [SaveBlock] block height - headerIndex.count >= 1, block height:%d, headerIndex.count:%d",
b.Blockdata.Height, headerHeight)
return nil
}

if b.Blockdata.Height == headerHeight {
Expand Down
13 changes: 12 additions & 1 deletion net/message/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type block struct {
func (msg block) Handle(node Noder) error {
log.Debug("RX block message")
hash := msg.blk.Hash()
isSync := false
if ledger.DefaultLedger.BlockInLedger(hash) {
ReceiveDuplicateBlockCnt++
log.Debug("Receive ", ReceiveDuplicateBlockCnt, " duplicated block.")
Expand All @@ -36,7 +37,17 @@ func (msg block) Handle(node Noder) error {
log.Warn("Block add failed: ", err, " ,block hash is ", hash)
return err
}
node.RemoveFlightHeight(msg.blk.Blockdata.Height)
for _, n := range node.LocalNode().GetNeighborNoder() {
if n.ExistFlightHeight(msg.blk.Blockdata.Height) {
//sync block
n.RemoveFlightHeight(msg.blk.Blockdata.Height)
isSync = true
}
}
if !isSync {
//haven`t require this block ,relay hash
node.LocalNode().Relay(node, hash)
}
node.LocalNode().GetEvent("block").Notify(events.EventNewInventory, &msg.blk)
return nil
}
Expand Down
12 changes: 10 additions & 2 deletions net/message/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ type consensus struct {
}

func (cp *ConsensusPayload) Hash() common.Uint256 {
return common.Uint256{}
d := sig.GetHashData(cp)
temp := sha256.Sum256([]byte(d))
cp.hash = common.Uint256(sha256.Sum256(temp[:]))
return cp.hash
}

func (cp *ConsensusPayload) Verify() error {
Expand Down Expand Up @@ -106,7 +109,12 @@ func (cp *ConsensusPayload) GetMessage() []byte {

func (msg consensus) Handle(node Noder) error {
log.Debug()
node.LocalNode().GetEvent("consensus").Notify(events.EventNewInventory, &msg.cons)
cp := &msg.cons
if !node.LocalNode().ExistedID(cp.Hash()) {
node.LocalNode().GetEvent("consensus").Notify(events.EventNewInventory, cp)
node.LocalNode().Relay(node, cp)
log.Info("Relay consensus message")
}
return nil
}

Expand Down
16 changes: 8 additions & 8 deletions net/message/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"io"
)

var LastInvHash Uint256

type blocksReq struct {
msgHdr
p struct {
Expand Down Expand Up @@ -136,12 +134,14 @@ func (msg Inv) Handle(node Noder) error {
id.Deserialize(bytes.NewReader(msg.P.Blk[HASHLEN*i:]))
// TODO check the ID queue
if !ledger.DefaultLedger.Store.BlockInCache(id) &&
!ledger.DefaultLedger.BlockInLedger(id) &&
LastInvHash != id {
LastInvHash = id
// send the block request
log.Infof("inv request block hash: %x", id)
ReqBlkData(node, id)
!ledger.DefaultLedger.BlockInLedger(id) {
node.CacheHash(id) //cached hash would not relayed
if !node.LocalNode().ExistedID(id) {
// send the block request
log.Infof("inv request block hash: %x", id)
ReqBlkData(node, id)
}

}

}
Expand Down
2 changes: 2 additions & 0 deletions net/message/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func (msg trn) Handle(node Noder) error {
if errCode := node.LocalNode().AppendTxnPool(&(msg.txn)); errCode != ErrNoError {
return errors.New("[message] VerifyTransaction failed when AppendTxnPool.")
}
node.LocalNode().Relay(node, tx)
log.Info("Relay transaction")
node.LocalNode().IncRxTxnCnt()
log.Debug("RX Transaction message hash", msg.txn.Hash())
log.Debug("RX Transaction message type", msg.txn.TxType)
Expand Down
31 changes: 26 additions & 5 deletions net/node/idCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,45 @@ package node

import (
"DNA/common"
"DNA/net/protocol"
"sync"
)

type idCache struct {
sync.RWMutex
list map[common.Uint256]bool
lastid common.Uint256
index int
idarray []common.Uint256
idmaplsit map[common.Uint256]int
}

func (c *idCache) init() {
c.index = 0
c.idmaplsit = make(map[common.Uint256]int, protocol.MAXIDCACHED)
c.idarray = make([]common.Uint256, protocol.MAXIDCACHED)
}

func (c *idCache) add() {
}
func (c *idCache) add(id common.Uint256) {
oldid := c.idarray[c.index]
delete(c.idmaplsit, oldid)
c.idarray[c.index] = id
c.idmaplsit[id] = c.index
c.index++
c.lastid = id
c.index = c.index % protocol.MAXIDCACHED

func (c *idCache) del() {
}

func (c *idCache) ExistedID(id common.Uint256) bool {
// TODO
c.Lock()
defer c.Unlock()
if id == c.lastid {
return true
}
if _, ok := c.idmaplsit[id]; ok {
return true
} else {
c.add(id)
}
return false
}
123 changes: 110 additions & 13 deletions net/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (

type node struct {
//sync.RWMutex //The Lock not be used as expected to use function channel instead of lock
state uint32 // node state
id uint64 // The nodes's id
state uint32 // node state
id uint64 // The nodes's id
cap [32]byte // The node capability set
version uint32 // The network protocol the node used
services uint64 // The services the node supplied
relay bool // The relay capability of the node (merge into capbility flag)
height uint64 // The node latest block height
txnCnt uint64 // The transactions be transmit by this node
rxTxnCnt uint64 // The transaction received by this node
version uint32 // The network protocol the node used
services uint64 // The services the node supplied
relay bool // The relay capability of the node (merge into capbility flag)
height uint64 // The node latest block height
txnCnt uint64 // The transactions be transmit by this node
rxTxnCnt uint64 // The transaction received by this node
publicKey *crypto.PubKey
// TODO does this channel should be a buffer channel
chF chan func() error // Channel used to operate the node without lock
Expand All @@ -48,9 +48,12 @@ type node struct {
* |--|--|--|--|--|--|isSyncFailed|isSyncHeaders|
*/
flightHeights []uint32
cachelock sync.RWMutex
flightlock sync.RWMutex
lastContact time.Time
nodeDisconnectSubscriber events.Subscriber
tryTimes uint32
cachedHashes []Uint256
ConnectingNodes
RetryConnAddrs
}
Expand Down Expand Up @@ -173,6 +176,8 @@ func InitNode(pubKey *crypto.PubKey) Noder {
n.publicKey = pubKey
n.TXNPool.init()
n.eventQueue.init()
n.idCache.init()
n.cachedHashes = make([]Uint256, 0)
n.nodeDisconnectSubscriber = n.eventQueue.GetEvent("disconnect").Subscribe(events.EventNodeDisconnect, n.NodeDisconnect)
go n.initConnection()
go n.updateConnection()
Expand Down Expand Up @@ -216,24 +221,24 @@ func (node *node) GetPort() uint16 {
return node.port
}

func (node *node) GetHttpInfoPort() (int) {
func (node *node) GetHttpInfoPort() int {
return int(node.httpInfoPort)
}

func (node *node) SetHttpInfoPort(nodeInfoPort uint16) {
node.httpInfoPort = nodeInfoPort
}

func (node *node) GetHttpInfoState() bool{
func (node *node) GetHttpInfoState() bool {
if node.cap[HTTPINFOFLAG] == 0x01 {
return true
} else {
return false
}
}

func (node *node) SetHttpInfoState(nodeInfo bool){
if nodeInfo{
func (node *node) SetHttpInfoState(nodeInfo bool) {
if nodeInfo {
node.cap[HTTPINFOFLAG] = 0x01
} else {
node.cap[HTTPINFOFLAG] = 0x00
Expand Down Expand Up @@ -268,7 +273,7 @@ func (node *node) SetState(state uint32) {
atomic.StoreUint32(&(node.state), state)
}

func (node *node) GetPubKey() *crypto.PubKey{
func (node *node) GetPubKey() *crypto.PubKey {
return node.publicKey
}

Expand Down Expand Up @@ -423,6 +428,8 @@ func (node *node) WaitForFourPeersStart() {
}

func (node *node) StoreFlightHeight(height uint32) {
node.flightlock.Lock()
defer node.flightlock.Unlock()
node.flightHeights = append(node.flightHeights, height)
}

Expand All @@ -434,6 +441,8 @@ func (node *node) GetFlightHeights() []uint32 {
}

func (node *node) RemoveFlightHeightLessThan(h uint32) {
node.flightlock.Lock()
defer node.flightlock.Unlock()
heights := node.flightHeights
p := len(heights)
i := 0
Expand All @@ -450,6 +459,8 @@ func (node *node) RemoveFlightHeightLessThan(h uint32) {
}

func (node *node) RemoveFlightHeight(height uint32) {
node.flightlock.Lock()
defer node.flightlock.Unlock()
log.Debug("height is ", height)
for _, h := range node.flightHeights {
log.Debug("flight height ", h)
Expand Down Expand Up @@ -490,3 +501,89 @@ func (node *node) RemoveFromRetryList(addr string) {
}

}

func (node *node) Relay(frmnode Noder, message interface{}) error {
log.Debug()
var buffer []byte
var err error
isHash := false
switch message.(type) {
case *transaction.Transaction:
log.Debug("TX transaction message")
txn := message.(*transaction.Transaction)
buffer, err = NewTxn(txn)
if err != nil {
log.Error("Error New Tx message: ", err)
return err
}
node.txnCnt++
case *ConsensusPayload:
log.Debug("TX consensus message")
consensusPayload := message.(*ConsensusPayload)
buffer, err = NewConsensus(consensusPayload)
if err != nil {
log.Error("Error New consensus message: ", err)
return err
}
case Uint256:
log.Debug("TX block hash message")
hash := message.(Uint256)
isHash = true
buf := bytes.NewBuffer([]byte{})
hash.Serialize(buf)
// construct inv message
invPayload := NewInvPayload(BLOCK, 1, buf.Bytes())
buffer, err = NewInv(invPayload)
if err != nil {
log.Error("Error New inv message")
return err
}
default:
log.Warn("Unknown Relay message type")
return errors.New("Unknown Relay message type")
}

node.nbrNodes.RLock()
for _, n := range node.nbrNodes.List {
if n.state == ESTABLISH && n.relay == true &&
n.id != frmnode.GetID() {
if isHash && n.ExistHash(message.(Uint256)) {
continue
}
n.Tx(buffer)
}
}
node.nbrNodes.RUnlock()
return nil
}

func (node *node) CacheHash(hash Uint256) {
node.cachelock.Lock()
defer node.cachelock.Unlock()
node.cachedHashes = append(node.cachedHashes, hash)
if len(node.cachedHashes) > MAXCACHEHASH {
node.cachedHashes = append(node.cachedHashes[:0], node.cachedHashes[1:]...)
}
}

func (node *node) ExistHash(hash Uint256) bool {
node.cachelock.Lock()
defer node.cachelock.Unlock()
for _, v := range node.cachedHashes {
if v == hash {
return true
}
}
return false
}

func (node *node) ExistFlightHeight(height uint32) bool {
node.flightlock.Lock()
defer node.flightlock.Unlock()
for _, v := range node.flightHeights {
if v == height {
return true
}
}
return false
}
6 changes: 6 additions & 0 deletions net/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
DIVHASHLEN = 5
MINCONNCNT = 3
MAXREQBLKONCE = 16
MAXCACHEHASH = 16
)

const (
Expand All @@ -58,6 +59,7 @@ const (
CONNMONITOR = 6
CONNMAXBACK = 4000
MAXRETRYCOUNT = 3
MAXIDCACHED = 5000
)

// The node state
Expand Down Expand Up @@ -137,6 +139,10 @@ type Noder interface {
RemoveAddrInConnectingList(addr string)
AddInRetryList(addr string)
RemoveFromRetryList(addr string)
Relay(Noder, interface{}) error
ExistHash(hash common.Uint256) bool
CacheHash(hash common.Uint256)
ExistFlightHeight(height uint32) bool
}

func (msg *NodeAddr) Deserialization(p []byte) error {
Expand Down

0 comments on commit 792eb0e

Please sign in to comment.