From 290dc3fc70fb6a1a667e10ca5147f2c839d0309a Mon Sep 17 00:00:00 2001 From: tanyuan <1067598718@qq.com> Date: Sun, 1 Jul 2018 03:19:18 +0800 Subject: [PATCH] optimize network (#496) --- common/config/config.go | 6 +- common/constants/constants.go | 2 +- p2pserver/common/p2p_common.go | 12 +- p2pserver/message/utils/msg_router.go | 22 +--- p2pserver/net/netserver/netserver.go | 158 +++++++++++--------------- p2pserver/net/protocol/server.go | 1 - p2pserver/p2pserver.go | 1 - 7 files changed, 77 insertions(+), 125 deletions(-) diff --git a/common/config/config.go b/common/config/config.go index 4570bf9a8f..95f0c865bb 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -63,9 +63,9 @@ const ( DEFAULT_CLI_RPC_PORT = uint(20000) DEFAULT_GAS_LIMIT = 20000 DEFAULT_GAS_PRICE = 0 - DEFAULT_NO_HANDSHAKE_TIMEOUT = 6 - DEFAULT_DATA_DIR = "./Chain" - DEFAULT_RESERVED_FILE = "./peers.rsv" + + DEFAULT_DATA_DIR = "./Chain" + DEFAULT_RESERVED_FILE = "./peers.rsv" ) const ( diff --git a/common/constants/constants.go b/common/constants/constants.go index c550d88183..7287290608 100644 --- a/common/constants/constants.go +++ b/common/constants/constants.go @@ -25,7 +25,7 @@ import ( // genesis constants var ( //TODO: modify this when on mainnet - GENESIS_BLOCK_TIMESTAMP = uint32(time.Date(2018, time.June, 12, 0, 0, 0, 0, time.UTC).Unix()) + GENESIS_BLOCK_TIMESTAMP = uint32(time.Date(2018, time.June, 30, 0, 0, 0, 0, time.UTC).Unix()) ) // ont constants diff --git a/p2pserver/common/p2p_common.go b/p2pserver/common/p2p_common.go index 3b003f7b1f..cb2440e987 100644 --- a/p2pserver/common/p2p_common.go +++ b/p2pserver/common/p2p_common.go @@ -20,7 +20,6 @@ package common import ( "errors" - "strconv" "strings" "github.com/ontio/ontology/core/types" @@ -149,7 +148,7 @@ type AppendBlock struct { func ParseIPAddr(s string) (string, error) { i := strings.Index(s, ":") if i < 0 { - return "", errors.New("split ip address error") + return s, errors.New("split ip address error") } return s[:i], nil } @@ -158,14 +157,7 @@ func ParseIPAddr(s string) (string, error) { func ParseIPPort(s string) (string, error) { i := strings.Index(s, ":") if i < 0 { - return "", errors.New("split ip port error") - } - port, err := strconv.Atoi(s[i+1:]) - if err != nil { - return "", errors.New("parse port error") - } - if port <= 0 || port >= 65535 { - return "", errors.New("port out of bound") + return s, errors.New("split ip port error") } return s[i:], nil } diff --git a/p2pserver/message/utils/msg_router.go b/p2pserver/message/utils/msg_router.go index 1d6149a484..4455328281 100644 --- a/p2pserver/message/utils/msg_router.go +++ b/p2pserver/message/utils/msg_router.go @@ -107,28 +107,10 @@ func (this *MessageRouter) hookChan(channel chan *types.MsgPayload, case data, ok := <-channel: if ok { msgType := data.Payload.CmdType() + handler, ok := this.msgHandlers[msgType] if ok { - if msgType == msgCommon.VERSION_TYPE || msgType == - msgCommon.VERACK_TYPE || msgType == msgCommon.DISCONNECT_TYPE { - handler(data, this.p2p, this.pid) - } else { - remotePeer := this.p2p.GetPeerFromAddr(data.Addr) - if remotePeer != nil && remotePeer.GetSyncState() == msgCommon.ESTABLISH { - go handler(data, this.p2p, this.pid) - } else { - this.p2p.RemoveFromInConnRecord(data.Addr) - this.p2p.RemoveFromOutConnRecord(data.Addr) - this.p2p.RemoveFromConnectingList(data.Addr) - this.p2p.RemovePeerSyncAddress(data.Addr) - this.p2p.RemovePeerConsAddress(data.Addr) - if remotePeer != nil { - remotePeer.CloseSync() - remotePeer.CloseCons() - log.Warnf("receive unrecognize (%s) peer`s msg (%s), close it", data.Addr, msgType) - } - } - } + go handler(data, this.p2p, this.pid) } else { log.Info("unknown message handler for the msg: ", msgType) diff --git a/p2pserver/net/netserver/netserver.go b/p2pserver/net/netserver/netserver.go index 15a5152af4..1d7446f70a 100644 --- a/p2pserver/net/netserver/netserver.go +++ b/p2pserver/net/netserver/netserver.go @@ -46,8 +46,7 @@ func NewNetServer() p2p.P2P { n.PeerAddrMap.PeerSyncAddress = make(map[string]*peer.Peer) n.PeerAddrMap.PeerConsAddress = make(map[string]*peer.Peer) - n.InConnRecord.InConnectingAddrs = make(map[string]int64) - n.OutConnRecord.OutConnectingAddrs = make(map[string]int64) + n.init() return n } @@ -63,21 +62,21 @@ type NetServer struct { PeerAddrMap Np *peer.NbrPeers connectLock sync.Mutex - InConnRecord InConnectionRecord - OutConnRecord OutConnectionRecord + inConnRecord InConnectionRecord + outConnRecord OutConnectionRecord OwnAddress string //network`s own address(ip : sync port),which get from version check } //InConnectionRecord include all addr connected type InConnectionRecord struct { sync.RWMutex - InConnectingAddrs map[string]int64 + InConnectingAddrs []string } //OutConnectionRecord include all addr accepted type OutConnectionRecord struct { sync.RWMutex - OutConnectingAddrs map[string]int64 + OutConnectingAddrs []string } //ConnectingNodes include all addr in connecting state @@ -268,7 +267,7 @@ func (this *NetServer) IsPeerEstablished(p *peer.Peer) bool { //Connect used to connect net address under sync or cons mode func (this *NetServer) Connect(addr string, isConsensus bool) error { if this.IsAddrInOutConnRecord(addr) { - log.Debugf("Address: %s Consensus: %v is in OutConnectionRecord,", addr, isConsensus) + log.Error("Addr is in OutConnectionRecord") return nil } if this.IsOwnAddress(addr) { @@ -331,7 +330,7 @@ func (this *NetServer) Connect(addr string, isConsensus bool) error { conn.RemoteAddr().Network())) if !isConsensus { - this.AddOutConnRecord(addr, time.Now().Unix()) + this.AddOutConnRecord(addr) remotePeer = peer.NewPeer() this.AddPeerSyncAddress(addr, remotePeer) remotePeer.SyncLink.SetAddr(addr) @@ -484,7 +483,8 @@ func (this *NetServer) startSyncAccept(listener net.Listener) { remotePeer := peer.NewPeer() addr := conn.RemoteAddr().String() - this.AddInConnRecord(addr, time.Now().Unix()) + this.AddInConnRecord(addr) + this.AddPeerSyncAddress(addr, remotePeer) remotePeer.SyncLink.SetAddr(addr) @@ -654,68 +654,36 @@ func (this *NetServer) GetPeerSyncAddressCount() (count uint) { return uint(len(this.PeerSyncAddress)) } -//CleanNoHandShakeConn close connection connected over DEFAULT_NO_HANDSHAKE_TIMEOUT but not handshake -func (this *NetServer) CleanNoHandShakeConn() { - this.InConnRecord.RLock() - for addr, connTime := range this.InConnRecord.InConnectingAddrs { - if !this.IsNbrPeerAddr(addr, false) && time.Now().Unix()-connTime > config.DEFAULT_NO_HANDSHAKE_TIMEOUT { - peer := this.GetPeerFromAddr(addr) - if peer != nil { - peer.CloseSync() - peer.CloseCons() - } - this.RemoveFromInConnRecord(addr) - this.RemoveFromConnectingList(addr) - this.RemovePeerSyncAddress(addr) - this.RemovePeerConsAddress(addr) - log.Warnf("Connection with (%s) no handshake over (%d)s, disconnect", addr, config.DEFAULT_NO_HANDSHAKE_TIMEOUT) - } - } - this.InConnRecord.RUnlock() - - this.OutConnRecord.RLock() - for addr, connTime := range this.OutConnRecord.OutConnectingAddrs { - if !this.IsNbrPeerAddr(addr, false) && time.Now().Unix()-connTime > config.DEFAULT_NO_HANDSHAKE_TIMEOUT { - peer := this.GetPeerFromAddr(addr) - if peer != nil { - peer.CloseSync() - peer.CloseCons() - } - this.RemoveFromOutConnRecord(addr) - this.RemoveFromConnectingList(addr) - this.RemovePeerSyncAddress(addr) - this.RemovePeerConsAddress(addr) - log.Warnf("Connection with (%s) no handshake over (%d)s, disconnect", addr, config.DEFAULT_NO_HANDSHAKE_TIMEOUT) +//AddInConnRecord add in connection to inConnRecord +func (this *NetServer) AddInConnRecord(addr string) { + this.inConnRecord.Lock() + defer this.inConnRecord.Unlock() + for _, a := range this.inConnRecord.InConnectingAddrs { + if strings.Compare(a, addr) == 0 { + return } } - this.OutConnRecord.RUnlock() -} - -//AddInConnRecord add in connection to InConnRecord -func (this *NetServer) AddInConnRecord(addr string, connTime int64) { - this.InConnRecord.Lock() - defer this.InConnRecord.Unlock() - if _, ok := this.InConnRecord.InConnectingAddrs[addr]; !ok { - this.InConnRecord.InConnectingAddrs[addr] = connTime - } + this.inConnRecord.InConnectingAddrs = append(this.inConnRecord.InConnectingAddrs, addr) } //IsAddrInInConnRecord return result whether addr is in inConnRecordList func (this *NetServer) IsAddrInInConnRecord(addr string) bool { - this.InConnRecord.RLock() - defer this.InConnRecord.RUnlock() - if _, ok := this.InConnRecord.InConnectingAddrs[addr]; ok { - return true + this.inConnRecord.RLock() + defer this.inConnRecord.RUnlock() + for _, a := range this.inConnRecord.InConnectingAddrs { + if strings.Compare(a, addr) == 0 { + return true + } } return false } //IsIPInInConnRecord return result whether the IP is in inConnRecordList func (this *NetServer) IsIPInInConnRecord(ip string) bool { - this.InConnRecord.RLock() - defer this.InConnRecord.RUnlock() + this.inConnRecord.RLock() + defer this.inConnRecord.RUnlock() var ipRecord string - for addr, _ := range this.InConnRecord.InConnectingAddrs { + for _, addr := range this.inConnRecord.InConnectingAddrs { ipRecord, _ = common.ParseIPAddr(addr) if 0 == strings.Compare(ipRecord, ip) { return true @@ -726,27 +694,31 @@ func (this *NetServer) IsIPInInConnRecord(ip string) bool { //RemoveInConnRecord remove in connection from inConnRecordList func (this *NetServer) RemoveFromInConnRecord(addr string) { - this.InConnRecord.Lock() - defer this.InConnRecord.Unlock() - if _, ok := this.InConnRecord.InConnectingAddrs[addr]; ok { - delete(this.InConnRecord.InConnectingAddrs, addr) + this.inConnRecord.Lock() + defer this.inConnRecord.Unlock() + addrs := []string{} + for _, a := range this.inConnRecord.InConnectingAddrs { + if strings.Compare(a, addr) != 0 { + addrs = append(addrs, a) + } } + this.inConnRecord.InConnectingAddrs = addrs } //GetInConnRecordLen return length of inConnRecordList func (this *NetServer) GetInConnRecordLen() int { - this.InConnRecord.RLock() - defer this.InConnRecord.RUnlock() - return len(this.InConnRecord.InConnectingAddrs) + this.inConnRecord.RLock() + defer this.inConnRecord.RUnlock() + return len(this.inConnRecord.InConnectingAddrs) } //GetIpCountInInConnRecord return count of in connections with single ip func (this *NetServer) GetIpCountInInConnRecord(ip string) uint { - this.InConnRecord.RLock() - defer this.InConnRecord.RUnlock() + this.inConnRecord.RLock() + defer this.inConnRecord.RUnlock() var count uint var ipRecord string - for addr, _ := range this.InConnRecord.InConnectingAddrs { + for _, addr := range this.inConnRecord.InConnectingAddrs { ipRecord, _ = common.ParseIPAddr(addr) if 0 == strings.Compare(ipRecord, ip) { count++ @@ -755,40 +727,48 @@ func (this *NetServer) GetIpCountInInConnRecord(ip string) uint { return count } -//AddOutConnRecord add out connection to OutConnRecord -func (this *NetServer) AddOutConnRecord(addr string, connTime int64) { - this.OutConnRecord.Lock() - defer this.OutConnRecord.Unlock() - if _, ok := this.OutConnRecord.OutConnectingAddrs[addr]; !ok { - this.OutConnRecord.OutConnectingAddrs[addr] = connTime +//AddOutConnRecord add out connection to outConnRecord +func (this *NetServer) AddOutConnRecord(addr string) { + this.outConnRecord.Lock() + defer this.outConnRecord.Unlock() + for _, a := range this.outConnRecord.OutConnectingAddrs { + if strings.Compare(a, addr) == 0 { + return + } } + this.outConnRecord.OutConnectingAddrs = append(this.outConnRecord.OutConnectingAddrs, addr) } -//IsAddrInOutConnRecord return result whether addr is in OutConnRecord +//IsAddrInOutConnRecord return result whether addr is in outConnRecord func (this *NetServer) IsAddrInOutConnRecord(addr string) bool { - this.OutConnRecord.RLock() - defer this.OutConnRecord.RUnlock() - if _, ok := this.OutConnRecord.OutConnectingAddrs[addr]; ok { - return true + this.outConnRecord.RLock() + defer this.outConnRecord.RUnlock() + for _, a := range this.outConnRecord.OutConnectingAddrs { + if strings.Compare(a, addr) == 0 { + return true + } } return false } -//RemoveOutConnRecord remove out connection from OutConnRecord +//RemoveOutConnRecord remove out connection from outConnRecord func (this *NetServer) RemoveFromOutConnRecord(addr string) { - this.OutConnRecord.Lock() - defer this.OutConnRecord.Unlock() - if _, ok := this.OutConnRecord.OutConnectingAddrs[addr]; ok { - delete(this.OutConnRecord.OutConnectingAddrs, addr) + this.outConnRecord.Lock() + defer this.outConnRecord.Unlock() + addrs := []string{} + for _, a := range this.outConnRecord.OutConnectingAddrs { + if strings.Compare(a, addr) != 0 { + addrs = append(addrs, a) + } } - + this.outConnRecord.OutConnectingAddrs = addrs } -//GetOutConnRecordLen return length of OutConnRecord +//GetOutConnRecordLen return length of outConnRecord func (this *NetServer) GetOutConnRecordLen() int { - this.OutConnRecord.RLock() - defer this.OutConnRecord.RUnlock() - return len(this.OutConnRecord.OutConnectingAddrs) + this.outConnRecord.RLock() + defer this.outConnRecord.RUnlock() + return len(this.outConnRecord.OutConnectingAddrs) } //AddrValid whether the addr could be connect or accept diff --git a/p2pserver/net/protocol/server.go b/p2pserver/net/protocol/server.go index bcdedd16a1..3130fa4218 100644 --- a/p2pserver/net/protocol/server.go +++ b/p2pserver/net/protocol/server.go @@ -51,7 +51,6 @@ type P2P interface { GetPeerFromAddr(addr string) *peer.Peer AddOutConnectingList(addr string) (added bool) GetOutConnRecordLen() int - CleanNoHandShakeConn() RemoveFromConnectingList(addr string) RemoveFromOutConnRecord(addr string) RemoveFromInConnRecord(addr string) diff --git a/p2pserver/p2pserver.go b/p2pserver/p2pserver.go index 4908795d85..426a3fb49e 100644 --- a/p2pserver/p2pserver.go +++ b/p2pserver/p2pserver.go @@ -456,7 +456,6 @@ func (this *P2PServer) keepOnlineService() { select { case <-t.C: this.retryInactivePeer() - this.network.CleanNoHandShakeConn() t.Stop() t.Reset(time.Second * common.CONN_MONITOR) case <-this.quitOnline: