diff --git a/core/store/ChainStore/ChainStore.go b/core/store/ChainStore/ChainStore.go index 7fc00587..ee0ad672 100644 --- a/core/store/ChainStore/ChainStore.go +++ b/core/store/ChainStore/ChainStore.go @@ -1099,8 +1099,9 @@ func (self *ChainStore) SaveBlock(b *Block, ledger *Ledger) error { } if b.Blockdata.Height > headerHeight { - return errors.New(fmt.Sprintf("Info: [SaveBlock] block height - headerIndex.count >= 1, block height:%d, headerIndex.count:%d", - b.Blockdata.Height, headerHeight)) + log.Infof("Info: [SaveBlock] block height - headerIndex.count >= 1, block height:%d, headerIndex.count:%d", + b.Blockdata.Height, headerHeight) + return nil } if b.Blockdata.Height == headerHeight { diff --git a/net/message/block.go b/net/message/block.go index 2c35ab8e..9ab1a037 100644 --- a/net/message/block.go +++ b/net/message/block.go @@ -27,6 +27,7 @@ type block struct { func (msg block) Handle(node Noder) error { log.Debug("RX block message") hash := msg.blk.Hash() + isSync := false if ledger.DefaultLedger.BlockInLedger(hash) { ReceiveDuplicateBlockCnt++ log.Debug("Receive ", ReceiveDuplicateBlockCnt, " duplicated block.") @@ -36,7 +37,17 @@ func (msg block) Handle(node Noder) error { log.Warn("Block add failed: ", err, " ,block hash is ", hash) return err } - node.RemoveFlightHeight(msg.blk.Blockdata.Height) + for _, n := range node.LocalNode().GetNeighborNoder() { + if n.ExistFlightHeight(msg.blk.Blockdata.Height) { + //sync block + n.RemoveFlightHeight(msg.blk.Blockdata.Height) + isSync = true + } + } + if !isSync { + //haven`t require this block ,relay hash + node.LocalNode().Relay(node, hash) + } node.LocalNode().GetEvent("block").Notify(events.EventNewInventory, &msg.blk) return nil } diff --git a/net/message/consensus.go b/net/message/consensus.go index e1f24bfa..5e67b776 100644 --- a/net/message/consensus.go +++ b/net/message/consensus.go @@ -40,7 +40,10 @@ type consensus struct { } func (cp *ConsensusPayload) Hash() common.Uint256 { - return common.Uint256{} + d := sig.GetHashData(cp) + temp := sha256.Sum256([]byte(d)) + cp.hash = common.Uint256(sha256.Sum256(temp[:])) + return cp.hash } func (cp *ConsensusPayload) Verify() error { @@ -106,7 +109,12 @@ func (cp *ConsensusPayload) GetMessage() []byte { func (msg consensus) Handle(node Noder) error { log.Debug() - node.LocalNode().GetEvent("consensus").Notify(events.EventNewInventory, &msg.cons) + cp := &msg.cons + if !node.LocalNode().ExistedID(cp.Hash()) { + node.LocalNode().GetEvent("consensus").Notify(events.EventNewInventory, cp) + node.LocalNode().Relay(node, cp) + log.Info("Relay consensus message") + } return nil } diff --git a/net/message/inventory.go b/net/message/inventory.go index 1b5c1f31..2e9d0672 100644 --- a/net/message/inventory.go +++ b/net/message/inventory.go @@ -14,8 +14,6 @@ import ( "io" ) -var LastInvHash Uint256 - type blocksReq struct { msgHdr p struct { @@ -136,12 +134,14 @@ func (msg Inv) Handle(node Noder) error { id.Deserialize(bytes.NewReader(msg.P.Blk[HASHLEN*i:])) // TODO check the ID queue if !ledger.DefaultLedger.Store.BlockInCache(id) && - !ledger.DefaultLedger.BlockInLedger(id) && - LastInvHash != id { - LastInvHash = id - // send the block request - log.Infof("inv request block hash: %x", id) - ReqBlkData(node, id) + !ledger.DefaultLedger.BlockInLedger(id) { + node.CacheHash(id) //cached hash would not relayed + if !node.LocalNode().ExistedID(id) { + // send the block request + log.Infof("inv request block hash: %x", id) + ReqBlkData(node, id) + } + } } diff --git a/net/message/transaction.go b/net/message/transaction.go index 5190222b..3f50b77b 100644 --- a/net/message/transaction.go +++ b/net/message/transaction.go @@ -36,6 +36,8 @@ func (msg trn) Handle(node Noder) error { if errCode := node.LocalNode().AppendTxnPool(&(msg.txn)); errCode != ErrNoError { return errors.New("[message] VerifyTransaction failed when AppendTxnPool.") } + node.LocalNode().Relay(node, tx) + log.Info("Relay transaction") node.LocalNode().IncRxTxnCnt() log.Debug("RX Transaction message hash", msg.txn.Hash()) log.Debug("RX Transaction message type", msg.txn.TxType) diff --git a/net/node/idCache.go b/net/node/idCache.go index b4d321fc..8ba1d8ff 100644 --- a/net/node/idCache.go +++ b/net/node/idCache.go @@ -2,24 +2,45 @@ package node import ( "DNA/common" + "DNA/net/protocol" "sync" ) type idCache struct { sync.RWMutex - list map[common.Uint256]bool + lastid common.Uint256 + index int + idarray []common.Uint256 + idmaplsit map[common.Uint256]int } func (c *idCache) init() { + c.index = 0 + c.idmaplsit = make(map[common.Uint256]int, protocol.MAXIDCACHED) + c.idarray = make([]common.Uint256, protocol.MAXIDCACHED) } -func (c *idCache) add() { -} +func (c *idCache) add(id common.Uint256) { + oldid := c.idarray[c.index] + delete(c.idmaplsit, oldid) + c.idarray[c.index] = id + c.idmaplsit[id] = c.index + c.index++ + c.lastid = id + c.index = c.index % protocol.MAXIDCACHED -func (c *idCache) del() { } func (c *idCache) ExistedID(id common.Uint256) bool { - // TODO + c.Lock() + defer c.Unlock() + if id == c.lastid { + return true + } + if _, ok := c.idmaplsit[id]; ok { + return true + } else { + c.add(id) + } return false } diff --git a/net/node/node.go b/net/node/node.go index afc18daf..843a5354 100644 --- a/net/node/node.go +++ b/net/node/node.go @@ -26,15 +26,15 @@ import ( type node struct { //sync.RWMutex //The Lock not be used as expected to use function channel instead of lock - state uint32 // node state - id uint64 // The nodes's id + state uint32 // node state + id uint64 // The nodes's id cap [32]byte // The node capability set - version uint32 // The network protocol the node used - services uint64 // The services the node supplied - relay bool // The relay capability of the node (merge into capbility flag) - height uint64 // The node latest block height - txnCnt uint64 // The transactions be transmit by this node - rxTxnCnt uint64 // The transaction received by this node + version uint32 // The network protocol the node used + services uint64 // The services the node supplied + relay bool // The relay capability of the node (merge into capbility flag) + height uint64 // The node latest block height + txnCnt uint64 // The transactions be transmit by this node + rxTxnCnt uint64 // The transaction received by this node publicKey *crypto.PubKey // TODO does this channel should be a buffer channel chF chan func() error // Channel used to operate the node without lock @@ -48,9 +48,12 @@ type node struct { * |--|--|--|--|--|--|isSyncFailed|isSyncHeaders| */ flightHeights []uint32 + cachelock sync.RWMutex + flightlock sync.RWMutex lastContact time.Time nodeDisconnectSubscriber events.Subscriber tryTimes uint32 + cachedHashes []Uint256 ConnectingNodes RetryConnAddrs } @@ -173,6 +176,8 @@ func InitNode(pubKey *crypto.PubKey) Noder { n.publicKey = pubKey n.TXNPool.init() n.eventQueue.init() + n.idCache.init() + n.cachedHashes = make([]Uint256, 0) n.nodeDisconnectSubscriber = n.eventQueue.GetEvent("disconnect").Subscribe(events.EventNodeDisconnect, n.NodeDisconnect) go n.initConnection() go n.updateConnection() @@ -216,7 +221,7 @@ func (node *node) GetPort() uint16 { return node.port } -func (node *node) GetHttpInfoPort() (int) { +func (node *node) GetHttpInfoPort() int { return int(node.httpInfoPort) } @@ -224,7 +229,7 @@ func (node *node) SetHttpInfoPort(nodeInfoPort uint16) { node.httpInfoPort = nodeInfoPort } -func (node *node) GetHttpInfoState() bool{ +func (node *node) GetHttpInfoState() bool { if node.cap[HTTPINFOFLAG] == 0x01 { return true } else { @@ -232,8 +237,8 @@ func (node *node) GetHttpInfoState() bool{ } } -func (node *node) SetHttpInfoState(nodeInfo bool){ - if nodeInfo{ +func (node *node) SetHttpInfoState(nodeInfo bool) { + if nodeInfo { node.cap[HTTPINFOFLAG] = 0x01 } else { node.cap[HTTPINFOFLAG] = 0x00 @@ -268,7 +273,7 @@ func (node *node) SetState(state uint32) { atomic.StoreUint32(&(node.state), state) } -func (node *node) GetPubKey() *crypto.PubKey{ +func (node *node) GetPubKey() *crypto.PubKey { return node.publicKey } @@ -423,6 +428,8 @@ func (node *node) WaitForFourPeersStart() { } func (node *node) StoreFlightHeight(height uint32) { + node.flightlock.Lock() + defer node.flightlock.Unlock() node.flightHeights = append(node.flightHeights, height) } @@ -434,6 +441,8 @@ func (node *node) GetFlightHeights() []uint32 { } func (node *node) RemoveFlightHeightLessThan(h uint32) { + node.flightlock.Lock() + defer node.flightlock.Unlock() heights := node.flightHeights p := len(heights) i := 0 @@ -450,6 +459,8 @@ func (node *node) RemoveFlightHeightLessThan(h uint32) { } func (node *node) RemoveFlightHeight(height uint32) { + node.flightlock.Lock() + defer node.flightlock.Unlock() log.Debug("height is ", height) for _, h := range node.flightHeights { log.Debug("flight height ", h) @@ -490,3 +501,89 @@ func (node *node) RemoveFromRetryList(addr string) { } } + +func (node *node) Relay(frmnode Noder, message interface{}) error { + log.Debug() + var buffer []byte + var err error + isHash := false + switch message.(type) { + case *transaction.Transaction: + log.Debug("TX transaction message") + txn := message.(*transaction.Transaction) + buffer, err = NewTxn(txn) + if err != nil { + log.Error("Error New Tx message: ", err) + return err + } + node.txnCnt++ + case *ConsensusPayload: + log.Debug("TX consensus message") + consensusPayload := message.(*ConsensusPayload) + buffer, err = NewConsensus(consensusPayload) + if err != nil { + log.Error("Error New consensus message: ", err) + return err + } + case Uint256: + log.Debug("TX block hash message") + hash := message.(Uint256) + isHash = true + buf := bytes.NewBuffer([]byte{}) + hash.Serialize(buf) + // construct inv message + invPayload := NewInvPayload(BLOCK, 1, buf.Bytes()) + buffer, err = NewInv(invPayload) + if err != nil { + log.Error("Error New inv message") + return err + } + default: + log.Warn("Unknown Relay message type") + return errors.New("Unknown Relay message type") + } + + node.nbrNodes.RLock() + for _, n := range node.nbrNodes.List { + if n.state == ESTABLISH && n.relay == true && + n.id != frmnode.GetID() { + if isHash && n.ExistHash(message.(Uint256)) { + continue + } + n.Tx(buffer) + } + } + node.nbrNodes.RUnlock() + return nil +} + +func (node *node) CacheHash(hash Uint256) { + node.cachelock.Lock() + defer node.cachelock.Unlock() + node.cachedHashes = append(node.cachedHashes, hash) + if len(node.cachedHashes) > MAXCACHEHASH { + node.cachedHashes = append(node.cachedHashes[:0], node.cachedHashes[1:]...) + } +} + +func (node *node) ExistHash(hash Uint256) bool { + node.cachelock.Lock() + defer node.cachelock.Unlock() + for _, v := range node.cachedHashes { + if v == hash { + return true + } + } + return false +} + +func (node *node) ExistFlightHeight(height uint32) bool { + node.flightlock.Lock() + defer node.flightlock.Unlock() + for _, v := range node.flightHeights { + if v == height { + return true + } + } + return false +} diff --git a/net/protocol/protocol.go b/net/protocol/protocol.go index 9aacb483..b244e527 100644 --- a/net/protocol/protocol.go +++ b/net/protocol/protocol.go @@ -43,6 +43,7 @@ const ( DIVHASHLEN = 5 MINCONNCNT = 3 MAXREQBLKONCE = 16 + MAXCACHEHASH = 16 ) const ( @@ -58,6 +59,7 @@ const ( CONNMONITOR = 6 CONNMAXBACK = 4000 MAXRETRYCOUNT = 3 + MAXIDCACHED = 5000 ) // The node state @@ -137,6 +139,10 @@ type Noder interface { RemoveAddrInConnectingList(addr string) AddInRetryList(addr string) RemoveFromRetryList(addr string) + Relay(Noder, interface{}) error + ExistHash(hash common.Uint256) bool + CacheHash(hash common.Uint256) + ExistFlightHeight(height uint32) bool } func (msg *NodeAddr) Deserialization(p []byte) error {