From f710def71a488e81b71c5364327145bb65e29044 Mon Sep 17 00:00:00 2001 From: rongyi Date: Mon, 20 Apr 2020 20:56:54 +0800 Subject: [PATCH] put domain to the end of the reserve list fix comment fix broadcast fix comment --- main.go | 8 ++-- .../connect_controller/connect_controller.go | 40 +++++++++++++++---- .../connect_controller_test.go | 23 ++++++++++- p2pserver/connect_controller/connection.go | 12 +----- p2pserver/dht/dht.go | 5 +-- p2pserver/net/netserver/nbr_peers.go | 2 +- p2pserver/net/netserver/netserver.go | 12 +++--- p2pserver/p2pserver.go | 4 +- 8 files changed, 69 insertions(+), 37 deletions(-) diff --git a/main.go b/main.go index 0ff43aacee..ff90501f2e 100644 --- a/main.go +++ b/main.go @@ -173,7 +173,7 @@ func startOntology(ctx *cli.Context) { log.Errorf("initP2PNode error: %s", err) return } - _, err = initConsensus(ctx, p2pSvr.GetNetWork(), txpool, acc) + _, err = initConsensus(ctx, p2pSvr.GetNetwork(), txpool, acc) if err != nil { log.Errorf("initConsensus error: %s", err) return @@ -309,8 +309,8 @@ func initP2PNode(ctx *cli.Context, txpoolSvr *proc.TXPoolServer) (*p2pserver.P2P return nil, fmt.Errorf("p2p service start error %s", err) } netreqactor.SetTxnPoolPid(txpoolSvr.GetPID(tc.TxActor)) - txpoolSvr.Net = p2p.GetNetWork() - hserver.SetNetServer(p2p.GetNetWork()) + txpoolSvr.Net = p2p.GetNetwork() + hserver.SetNetServer(p2p.GetNetwork()) p2p.WaitForPeersStart() log.Infof("P2P init success") return p2p, nil @@ -407,7 +407,7 @@ func initNodeInfo(ctx *cli.Context, p2pSvr *p2pserver.P2PServer) { if config.DefConfig.P2PNode.HttpInfoPort == 0 { return } - go nodeinfo.StartServer(p2pSvr.GetNetWork()) + go nodeinfo.StartServer(p2pSvr.GetNetwork()) log.Infof("Nodeinfo init success") } diff --git a/p2pserver/connect_controller/connect_controller.go b/p2pserver/connect_controller/connect_controller.go index 8c64de760c..bd8e78b8b5 100644 --- a/p2pserver/connect_controller/connect_controller.go +++ b/p2pserver/connect_controller/connect_controller.go @@ -20,8 +20,8 @@ package connect_controller import ( "fmt" "net" + "sort" "strconv" - "strings" "sync" "sync/atomic" @@ -66,6 +66,10 @@ func NewConnectController(peerInfo *peer.PeerInfo, keyid *common.PeerKeyId, connecting: strset.New(), peers: make(map[common.PeerId]*connectedPeer), } + // put domain to the end + sort.Slice(control.ReservedPeers, func(i, j int) bool { + return net.ParseIP(control.ReservedPeers[i]) != nil + }) return control } @@ -139,21 +143,27 @@ func (self *ConnectController) reserveEnabled() bool { return len(self.ReservedPeers) > 0 } -func (self *ConnectController) inReserveList(remoteAddr string) bool { - var rsvIPs []string +// remoteAddr format 192.168.1.1:61234 +func (self *ConnectController) inReserveList(remoteIPPort string) bool { + // 192.168.1.1 in reserve list, 192.168.1.111:61234 and 192.168.1.11:61234 can connect in if we are using prefix matching + // so get this IP to do fully match + remoteAddr, _, err := net.SplitHostPort(remoteIPPort) + if err != nil { + return false + } // we don't load domain in start because we consider domain's A/AAAA record may change sometimes for _, curIPOrName := range self.ReservedPeers { curIPs, err := net.LookupHost(curIPOrName) if err != nil { continue } - rsvIPs = append(rsvIPs, curIPs...) - } - for _, addr := range rsvIPs { - if strings.HasPrefix(remoteAddr, addr) { - return true + for _, digIP := range curIPs { + if digIP == remoteAddr { + return true + } } } + return false } @@ -325,6 +335,20 @@ func (self *ConnectController) savePeer(conn net.Conn, p *peer.PeerInfo, index i } } +func (self *ConnectController) removePeer(conn *Conn) { + self.mutex.Lock() + defer self.mutex.Unlock() + + self.inoutbounds[conn.boundIndex].Remove(conn.addr) + + p := self.peers[conn.kid] + if p == nil || p.peer == nil { + log.Fatalf("connection %s not in controller", conn.kid.ToHexString()) + } else if p.connectId == conn.connectId { // connection not replaced + delete(self.peers, conn.kid) + } +} + // if connection with peer.Kid exist, but has different IP, return error func (self *ConnectController) checkPeerIdAndIP(peer *peer.PeerInfo, addr string) error { oldPeer := self.getPeer(peer.Id) diff --git a/p2pserver/connect_controller/connect_controller_test.go b/p2pserver/connect_controller/connect_controller_test.go index c1ce86bee8..fa881bd61e 100644 --- a/p2pserver/connect_controller/connect_controller_test.go +++ b/p2pserver/connect_controller/connect_controller_test.go @@ -18,7 +18,9 @@ package connect_controller import ( + "fmt" "net" + "sort" "sync" "testing" "time" @@ -229,14 +231,31 @@ func checkServer(t *testing.T, client, server *Node, clientConns chan<- net.Conn func TestCheckReserveWithDomain(t *testing.T) { a := assert.New(t) - dname := "www.baidu.com" + // this domain only have one A record, so we can assure two lookup below return the same IP + // other domain may fail the test sometimes + dname := "www.onchain.com" + gips, err := net.LookupHost(dname) a.Nil(err, "fail to get domain record") cc := &ConnectController{} cc.ReservedPeers = []string{dname} for _, ip := range gips { - err := cc.checkReservedPeers(ip) + err := cc.checkReservedPeers(fmt.Sprintf("%s:1234", ip)) a.Nil(err, "fail") } + + cc.ReservedPeers = []string{"192.168.1.111"} + cret := cc.inReserveList("192.168.1.1:1234") + a.False(cret, "fail") + cret = cc.inReserveList("192.168.1.11:1234") + a.False(cret, "fail") + cret = cc.inReserveList("192.168.1.111:1234") + a.True(cret, "fail") + + cc.ReservedPeers = []string{"192.168.1.2", "www.baidu.com", "192.168.1.1"} + sort.Slice(cc.ReservedPeers, func(i, j int) bool { + return net.ParseIP(cc.ReservedPeers[i]) != nil + }) + a.Equal(cc.ReservedPeers[len(cc.ReservedPeers)-1], "www.baidu.com", "fail") } diff --git a/p2pserver/connect_controller/connection.go b/p2pserver/connect_controller/connection.go index 1a047363b1..98aec2ffa2 100644 --- a/p2pserver/connect_controller/connection.go +++ b/p2pserver/connect_controller/connection.go @@ -39,17 +39,7 @@ type Conn struct { func (self *Conn) Close() error { log.Infof("closing connection: peer %s, address: %s", self.kid.ToHexString(), self.addr) - self.controller.mutex.Lock() - defer self.controller.mutex.Unlock() - - self.controller.inoutbounds[self.boundIndex].Remove(self.addr) - - p := self.controller.peers[self.kid] - if p == nil || p.peer == nil { - log.Fatalf("connection %s not in controller", self.kid.ToHexString()) - } else if p.connectId == self.connectId { // connection not replaced - delete(self.controller.peers, self.kid) - } + self.controller.removePeer(self) return self.Conn.Close() } diff --git a/p2pserver/dht/dht.go b/p2pserver/dht/dht.go index 8b15df0d51..248f36e22e 100644 --- a/p2pserver/dht/dht.go +++ b/p2pserver/dht/dht.go @@ -36,9 +36,7 @@ var KValue = 20 var AlphaValue = 3 type DHT struct { - localId common.PeerId - birth time.Time // When this peer started up - + localId common.PeerId bucketSize int routeTable *kb.RouteTable // Array of routing tables for differently distanced nodes @@ -66,7 +64,6 @@ func NewDHT(id common.PeerId) *DHT { return &DHT{ localId: id, - birth: time.Now(), routeTable: rt, bucketSize: bucketSize, AutoRefresh: true, diff --git a/p2pserver/net/netserver/nbr_peers.go b/p2pserver/net/netserver/nbr_peers.go index 79137c37ce..5e266c32df 100644 --- a/p2pserver/net/netserver/nbr_peers.go +++ b/p2pserver/net/netserver/nbr_peers.go @@ -94,7 +94,7 @@ func (this *NbrPeers) Broadcast(msg types.Message) { defer this.RUnlock() for _, node := range this.List { if node.Peer.GetRelay() { - node.Peer.SendRaw(msg.CmdType(), sink.Bytes()) + go node.Peer.SendRaw(msg.CmdType(), sink.Bytes()) } } } diff --git a/p2pserver/net/netserver/netserver.go b/p2pserver/net/netserver/netserver.go index f6d49ec850..bccb8e5ac9 100644 --- a/p2pserver/net/netserver/netserver.go +++ b/p2pserver/net/netserver/netserver.go @@ -254,7 +254,7 @@ func (this *NetServer) Stop() { if this.listener != nil { _ = this.listener.Close() } - this.stopRecvCh <- true + close(this.stopRecvCh) this.protocol.HandleSystemMessage(this, p2p.NetworkStop{}) } @@ -282,10 +282,12 @@ func (this *NetServer) startNetAccept(listener net.Listener) { return } - if err := this.handleClientConnection(conn); err != nil { - log.Warnf("[p2p] client connect error: %s", err) - _ = conn.Close() - } + go func() { + if err := this.handleClientConnection(conn); err != nil { + log.Warnf("[p2p] client connect error: %s", err) + _ = conn.Close() + } + }() } } diff --git a/p2pserver/p2pserver.go b/p2pserver/p2pserver.go index 0a2c9efec5..20a5b566c9 100644 --- a/p2pserver/p2pserver.go +++ b/p2pserver/p2pserver.go @@ -63,8 +63,8 @@ func (this *P2PServer) Stop() { this.network.Stop() } -// GetNetWork returns the low level netserver -func (this *P2PServer) GetNetWork() p2pnet.P2P { +// GetNetwork returns the low level netserver +func (this *P2PServer) GetNetwork() p2pnet.P2P { return this.network }