Skip to content

Commit

Permalink
optimize network (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
tanZiWen authored and Arbio5zt committed Jun 30, 2018
1 parent de96efd commit 290dc3f
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 125 deletions.
6 changes: 3 additions & 3 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ const (
DEFAULT_CLI_RPC_PORT = uint(20000)
DEFAULT_GAS_LIMIT = 20000
DEFAULT_GAS_PRICE = 0
DEFAULT_NO_HANDSHAKE_TIMEOUT = 6
DEFAULT_DATA_DIR = "./Chain"
DEFAULT_RESERVED_FILE = "./peers.rsv"

DEFAULT_DATA_DIR = "./Chain"
DEFAULT_RESERVED_FILE = "./peers.rsv"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// genesis constants
var (
//TODO: modify this when on mainnet
GENESIS_BLOCK_TIMESTAMP = uint32(time.Date(2018, time.June, 12, 0, 0, 0, 0, time.UTC).Unix())
GENESIS_BLOCK_TIMESTAMP = uint32(time.Date(2018, time.June, 30, 0, 0, 0, 0, time.UTC).Unix())
)

// ont constants
Expand Down
12 changes: 2 additions & 10 deletions p2pserver/common/p2p_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package common

import (
"errors"
"strconv"
"strings"

"github.com/ontio/ontology/core/types"
Expand Down Expand Up @@ -149,7 +148,7 @@ type AppendBlock struct {
func ParseIPAddr(s string) (string, error) {
i := strings.Index(s, ":")
if i < 0 {
return "", errors.New("split ip address error")
return s, errors.New("split ip address error")
}
return s[:i], nil
}
Expand All @@ -158,14 +157,7 @@ func ParseIPAddr(s string) (string, error) {
func ParseIPPort(s string) (string, error) {
i := strings.Index(s, ":")
if i < 0 {
return "", errors.New("split ip port error")
}
port, err := strconv.Atoi(s[i+1:])
if err != nil {
return "", errors.New("parse port error")
}
if port <= 0 || port >= 65535 {
return "", errors.New("port out of bound")
return s, errors.New("split ip port error")
}
return s[i:], nil
}
22 changes: 2 additions & 20 deletions p2pserver/message/utils/msg_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,28 +107,10 @@ func (this *MessageRouter) hookChan(channel chan *types.MsgPayload,
case data, ok := <-channel:
if ok {
msgType := data.Payload.CmdType()

handler, ok := this.msgHandlers[msgType]
if ok {
if msgType == msgCommon.VERSION_TYPE || msgType ==
msgCommon.VERACK_TYPE || msgType == msgCommon.DISCONNECT_TYPE {
handler(data, this.p2p, this.pid)
} else {
remotePeer := this.p2p.GetPeerFromAddr(data.Addr)
if remotePeer != nil && remotePeer.GetSyncState() == msgCommon.ESTABLISH {
go handler(data, this.p2p, this.pid)
} else {
this.p2p.RemoveFromInConnRecord(data.Addr)
this.p2p.RemoveFromOutConnRecord(data.Addr)
this.p2p.RemoveFromConnectingList(data.Addr)
this.p2p.RemovePeerSyncAddress(data.Addr)
this.p2p.RemovePeerConsAddress(data.Addr)
if remotePeer != nil {
remotePeer.CloseSync()
remotePeer.CloseCons()
log.Warnf("receive unrecognize (%s) peer`s msg (%s), close it", data.Addr, msgType)
}
}
}
go handler(data, this.p2p, this.pid)
} else {
log.Info("unknown message handler for the msg: ",
msgType)
Expand Down
158 changes: 69 additions & 89 deletions p2pserver/net/netserver/netserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ func NewNetServer() p2p.P2P {

n.PeerAddrMap.PeerSyncAddress = make(map[string]*peer.Peer)
n.PeerAddrMap.PeerConsAddress = make(map[string]*peer.Peer)
n.InConnRecord.InConnectingAddrs = make(map[string]int64)
n.OutConnRecord.OutConnectingAddrs = make(map[string]int64)

n.init()
return n
}
Expand All @@ -63,21 +62,21 @@ type NetServer struct {
PeerAddrMap
Np *peer.NbrPeers
connectLock sync.Mutex
InConnRecord InConnectionRecord
OutConnRecord OutConnectionRecord
inConnRecord InConnectionRecord
outConnRecord OutConnectionRecord
OwnAddress string //network`s own address(ip : sync port),which get from version check
}

//InConnectionRecord include all addr connected
type InConnectionRecord struct {
sync.RWMutex
InConnectingAddrs map[string]int64
InConnectingAddrs []string
}

//OutConnectionRecord include all addr accepted
type OutConnectionRecord struct {
sync.RWMutex
OutConnectingAddrs map[string]int64
OutConnectingAddrs []string
}

//ConnectingNodes include all addr in connecting state
Expand Down Expand Up @@ -268,7 +267,7 @@ func (this *NetServer) IsPeerEstablished(p *peer.Peer) bool {
//Connect used to connect net address under sync or cons mode
func (this *NetServer) Connect(addr string, isConsensus bool) error {
if this.IsAddrInOutConnRecord(addr) {
log.Debugf("Address: %s Consensus: %v is in OutConnectionRecord,", addr, isConsensus)
log.Error("Addr is in OutConnectionRecord")
return nil
}
if this.IsOwnAddress(addr) {
Expand Down Expand Up @@ -331,7 +330,7 @@ func (this *NetServer) Connect(addr string, isConsensus bool) error {
conn.RemoteAddr().Network()))

if !isConsensus {
this.AddOutConnRecord(addr, time.Now().Unix())
this.AddOutConnRecord(addr)
remotePeer = peer.NewPeer()
this.AddPeerSyncAddress(addr, remotePeer)
remotePeer.SyncLink.SetAddr(addr)
Expand Down Expand Up @@ -484,7 +483,8 @@ func (this *NetServer) startSyncAccept(listener net.Listener) {

remotePeer := peer.NewPeer()
addr := conn.RemoteAddr().String()
this.AddInConnRecord(addr, time.Now().Unix())
this.AddInConnRecord(addr)

this.AddPeerSyncAddress(addr, remotePeer)

remotePeer.SyncLink.SetAddr(addr)
Expand Down Expand Up @@ -654,68 +654,36 @@ func (this *NetServer) GetPeerSyncAddressCount() (count uint) {
return uint(len(this.PeerSyncAddress))
}

//CleanNoHandShakeConn close connection connected over DEFAULT_NO_HANDSHAKE_TIMEOUT but not handshake
func (this *NetServer) CleanNoHandShakeConn() {
this.InConnRecord.RLock()
for addr, connTime := range this.InConnRecord.InConnectingAddrs {
if !this.IsNbrPeerAddr(addr, false) && time.Now().Unix()-connTime > config.DEFAULT_NO_HANDSHAKE_TIMEOUT {
peer := this.GetPeerFromAddr(addr)
if peer != nil {
peer.CloseSync()
peer.CloseCons()
}
this.RemoveFromInConnRecord(addr)
this.RemoveFromConnectingList(addr)
this.RemovePeerSyncAddress(addr)
this.RemovePeerConsAddress(addr)
log.Warnf("Connection with (%s) no handshake over (%d)s, disconnect", addr, config.DEFAULT_NO_HANDSHAKE_TIMEOUT)
}
}
this.InConnRecord.RUnlock()

this.OutConnRecord.RLock()
for addr, connTime := range this.OutConnRecord.OutConnectingAddrs {
if !this.IsNbrPeerAddr(addr, false) && time.Now().Unix()-connTime > config.DEFAULT_NO_HANDSHAKE_TIMEOUT {
peer := this.GetPeerFromAddr(addr)
if peer != nil {
peer.CloseSync()
peer.CloseCons()
}
this.RemoveFromOutConnRecord(addr)
this.RemoveFromConnectingList(addr)
this.RemovePeerSyncAddress(addr)
this.RemovePeerConsAddress(addr)
log.Warnf("Connection with (%s) no handshake over (%d)s, disconnect", addr, config.DEFAULT_NO_HANDSHAKE_TIMEOUT)
//AddInConnRecord add in connection to inConnRecord
func (this *NetServer) AddInConnRecord(addr string) {
this.inConnRecord.Lock()
defer this.inConnRecord.Unlock()
for _, a := range this.inConnRecord.InConnectingAddrs {
if strings.Compare(a, addr) == 0 {
return
}
}
this.OutConnRecord.RUnlock()
}

//AddInConnRecord add in connection to InConnRecord
func (this *NetServer) AddInConnRecord(addr string, connTime int64) {
this.InConnRecord.Lock()
defer this.InConnRecord.Unlock()
if _, ok := this.InConnRecord.InConnectingAddrs[addr]; !ok {
this.InConnRecord.InConnectingAddrs[addr] = connTime
}
this.inConnRecord.InConnectingAddrs = append(this.inConnRecord.InConnectingAddrs, addr)
}

//IsAddrInInConnRecord return result whether addr is in inConnRecordList
func (this *NetServer) IsAddrInInConnRecord(addr string) bool {
this.InConnRecord.RLock()
defer this.InConnRecord.RUnlock()
if _, ok := this.InConnRecord.InConnectingAddrs[addr]; ok {
return true
this.inConnRecord.RLock()
defer this.inConnRecord.RUnlock()
for _, a := range this.inConnRecord.InConnectingAddrs {
if strings.Compare(a, addr) == 0 {
return true
}
}
return false
}

//IsIPInInConnRecord return result whether the IP is in inConnRecordList
func (this *NetServer) IsIPInInConnRecord(ip string) bool {
this.InConnRecord.RLock()
defer this.InConnRecord.RUnlock()
this.inConnRecord.RLock()
defer this.inConnRecord.RUnlock()
var ipRecord string
for addr, _ := range this.InConnRecord.InConnectingAddrs {
for _, addr := range this.inConnRecord.InConnectingAddrs {
ipRecord, _ = common.ParseIPAddr(addr)
if 0 == strings.Compare(ipRecord, ip) {
return true
Expand All @@ -726,27 +694,31 @@ func (this *NetServer) IsIPInInConnRecord(ip string) bool {

//RemoveInConnRecord remove in connection from inConnRecordList
func (this *NetServer) RemoveFromInConnRecord(addr string) {
this.InConnRecord.Lock()
defer this.InConnRecord.Unlock()
if _, ok := this.InConnRecord.InConnectingAddrs[addr]; ok {
delete(this.InConnRecord.InConnectingAddrs, addr)
this.inConnRecord.Lock()
defer this.inConnRecord.Unlock()
addrs := []string{}
for _, a := range this.inConnRecord.InConnectingAddrs {
if strings.Compare(a, addr) != 0 {
addrs = append(addrs, a)
}
}
this.inConnRecord.InConnectingAddrs = addrs
}

//GetInConnRecordLen return length of inConnRecordList
func (this *NetServer) GetInConnRecordLen() int {
this.InConnRecord.RLock()
defer this.InConnRecord.RUnlock()
return len(this.InConnRecord.InConnectingAddrs)
this.inConnRecord.RLock()
defer this.inConnRecord.RUnlock()
return len(this.inConnRecord.InConnectingAddrs)
}

//GetIpCountInInConnRecord return count of in connections with single ip
func (this *NetServer) GetIpCountInInConnRecord(ip string) uint {
this.InConnRecord.RLock()
defer this.InConnRecord.RUnlock()
this.inConnRecord.RLock()
defer this.inConnRecord.RUnlock()
var count uint
var ipRecord string
for addr, _ := range this.InConnRecord.InConnectingAddrs {
for _, addr := range this.inConnRecord.InConnectingAddrs {
ipRecord, _ = common.ParseIPAddr(addr)
if 0 == strings.Compare(ipRecord, ip) {
count++
Expand All @@ -755,40 +727,48 @@ func (this *NetServer) GetIpCountInInConnRecord(ip string) uint {
return count
}

//AddOutConnRecord add out connection to OutConnRecord
func (this *NetServer) AddOutConnRecord(addr string, connTime int64) {
this.OutConnRecord.Lock()
defer this.OutConnRecord.Unlock()
if _, ok := this.OutConnRecord.OutConnectingAddrs[addr]; !ok {
this.OutConnRecord.OutConnectingAddrs[addr] = connTime
//AddOutConnRecord add out connection to outConnRecord
func (this *NetServer) AddOutConnRecord(addr string) {
this.outConnRecord.Lock()
defer this.outConnRecord.Unlock()
for _, a := range this.outConnRecord.OutConnectingAddrs {
if strings.Compare(a, addr) == 0 {
return
}
}
this.outConnRecord.OutConnectingAddrs = append(this.outConnRecord.OutConnectingAddrs, addr)
}

//IsAddrInOutConnRecord return result whether addr is in OutConnRecord
//IsAddrInOutConnRecord return result whether addr is in outConnRecord
func (this *NetServer) IsAddrInOutConnRecord(addr string) bool {
this.OutConnRecord.RLock()
defer this.OutConnRecord.RUnlock()
if _, ok := this.OutConnRecord.OutConnectingAddrs[addr]; ok {
return true
this.outConnRecord.RLock()
defer this.outConnRecord.RUnlock()
for _, a := range this.outConnRecord.OutConnectingAddrs {
if strings.Compare(a, addr) == 0 {
return true
}
}
return false
}

//RemoveOutConnRecord remove out connection from OutConnRecord
//RemoveOutConnRecord remove out connection from outConnRecord
func (this *NetServer) RemoveFromOutConnRecord(addr string) {
this.OutConnRecord.Lock()
defer this.OutConnRecord.Unlock()
if _, ok := this.OutConnRecord.OutConnectingAddrs[addr]; ok {
delete(this.OutConnRecord.OutConnectingAddrs, addr)
this.outConnRecord.Lock()
defer this.outConnRecord.Unlock()
addrs := []string{}
for _, a := range this.outConnRecord.OutConnectingAddrs {
if strings.Compare(a, addr) != 0 {
addrs = append(addrs, a)
}
}

this.outConnRecord.OutConnectingAddrs = addrs
}

//GetOutConnRecordLen return length of OutConnRecord
//GetOutConnRecordLen return length of outConnRecord
func (this *NetServer) GetOutConnRecordLen() int {
this.OutConnRecord.RLock()
defer this.OutConnRecord.RUnlock()
return len(this.OutConnRecord.OutConnectingAddrs)
this.outConnRecord.RLock()
defer this.outConnRecord.RUnlock()
return len(this.outConnRecord.OutConnectingAddrs)
}

//AddrValid whether the addr could be connect or accept
Expand Down
1 change: 0 additions & 1 deletion p2pserver/net/protocol/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ type P2P interface {
GetPeerFromAddr(addr string) *peer.Peer
AddOutConnectingList(addr string) (added bool)
GetOutConnRecordLen() int
CleanNoHandShakeConn()
RemoveFromConnectingList(addr string)
RemoveFromOutConnRecord(addr string)
RemoveFromInConnRecord(addr string)
Expand Down
1 change: 0 additions & 1 deletion p2pserver/p2pserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,6 @@ func (this *P2PServer) keepOnlineService() {
select {
case <-t.C:
this.retryInactivePeer()
this.network.CleanNoHandShakeConn()
t.Stop()
t.Reset(time.Second * common.CONN_MONITOR)
case <-this.quitOnline:
Expand Down

0 comments on commit 290dc3f

Please sign in to comment.