Skip to content

Commit

Permalink
put domain to the end of the reserve list
Browse files Browse the repository at this point in the history
fix comment

fix broadcast

fix comment
  • Loading branch information
rongyi authored and laizy committed Apr 28, 2020
1 parent f4bbc4a commit f710def
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 37 deletions.
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func startOntology(ctx *cli.Context) {
log.Errorf("initP2PNode error: %s", err)
return
}
_, err = initConsensus(ctx, p2pSvr.GetNetWork(), txpool, acc)
_, err = initConsensus(ctx, p2pSvr.GetNetwork(), txpool, acc)
if err != nil {
log.Errorf("initConsensus error: %s", err)
return
Expand Down Expand Up @@ -309,8 +309,8 @@ func initP2PNode(ctx *cli.Context, txpoolSvr *proc.TXPoolServer) (*p2pserver.P2P
return nil, fmt.Errorf("p2p service start error %s", err)
}
netreqactor.SetTxnPoolPid(txpoolSvr.GetPID(tc.TxActor))
txpoolSvr.Net = p2p.GetNetWork()
hserver.SetNetServer(p2p.GetNetWork())
txpoolSvr.Net = p2p.GetNetwork()
hserver.SetNetServer(p2p.GetNetwork())
p2p.WaitForPeersStart()
log.Infof("P2P init success")
return p2p, nil
Expand Down Expand Up @@ -407,7 +407,7 @@ func initNodeInfo(ctx *cli.Context, p2pSvr *p2pserver.P2PServer) {
if config.DefConfig.P2PNode.HttpInfoPort == 0 {
return
}
go nodeinfo.StartServer(p2pSvr.GetNetWork())
go nodeinfo.StartServer(p2pSvr.GetNetwork())

log.Infof("Nodeinfo init success")
}
Expand Down
40 changes: 32 additions & 8 deletions p2pserver/connect_controller/connect_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package connect_controller
import (
"fmt"
"net"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -66,6 +66,10 @@ func NewConnectController(peerInfo *peer.PeerInfo, keyid *common.PeerKeyId,
connecting: strset.New(),
peers: make(map[common.PeerId]*connectedPeer),
}
// put domain to the end
sort.Slice(control.ReservedPeers, func(i, j int) bool {
return net.ParseIP(control.ReservedPeers[i]) != nil
})

return control
}
Expand Down Expand Up @@ -139,21 +143,27 @@ func (self *ConnectController) reserveEnabled() bool {
return len(self.ReservedPeers) > 0
}

func (self *ConnectController) inReserveList(remoteAddr string) bool {
var rsvIPs []string
// remoteAddr format 192.168.1.1:61234
func (self *ConnectController) inReserveList(remoteIPPort string) bool {
// 192.168.1.1 in reserve list, 192.168.1.111:61234 and 192.168.1.11:61234 can connect in if we are using prefix matching
// so get this IP to do fully match
remoteAddr, _, err := net.SplitHostPort(remoteIPPort)
if err != nil {
return false
}
// we don't load domain in start because we consider domain's A/AAAA record may change sometimes
for _, curIPOrName := range self.ReservedPeers {
curIPs, err := net.LookupHost(curIPOrName)
if err != nil {
continue
}
rsvIPs = append(rsvIPs, curIPs...)
}
for _, addr := range rsvIPs {
if strings.HasPrefix(remoteAddr, addr) {
return true
for _, digIP := range curIPs {
if digIP == remoteAddr {
return true
}
}
}

return false
}

Expand Down Expand Up @@ -325,6 +335,20 @@ func (self *ConnectController) savePeer(conn net.Conn, p *peer.PeerInfo, index i
}
}

func (self *ConnectController) removePeer(conn *Conn) {
self.mutex.Lock()
defer self.mutex.Unlock()

self.inoutbounds[conn.boundIndex].Remove(conn.addr)

p := self.peers[conn.kid]
if p == nil || p.peer == nil {
log.Fatalf("connection %s not in controller", conn.kid.ToHexString())
} else if p.connectId == conn.connectId { // connection not replaced
delete(self.peers, conn.kid)
}
}

// if connection with peer.Kid exist, but has different IP, return error
func (self *ConnectController) checkPeerIdAndIP(peer *peer.PeerInfo, addr string) error {
oldPeer := self.getPeer(peer.Id)
Expand Down
23 changes: 21 additions & 2 deletions p2pserver/connect_controller/connect_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package connect_controller

import (
"fmt"
"net"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -229,14 +231,31 @@ func checkServer(t *testing.T, client, server *Node, clientConns chan<- net.Conn

func TestCheckReserveWithDomain(t *testing.T) {
a := assert.New(t)
dname := "www.baidu.com"
// this domain only have one A record, so we can assure two lookup below return the same IP
// other domain may fail the test sometimes
dname := "www.onchain.com"

gips, err := net.LookupHost(dname)
a.Nil(err, "fail to get domain record")

cc := &ConnectController{}
cc.ReservedPeers = []string{dname}
for _, ip := range gips {
err := cc.checkReservedPeers(ip)
err := cc.checkReservedPeers(fmt.Sprintf("%s:1234", ip))
a.Nil(err, "fail")
}

cc.ReservedPeers = []string{"192.168.1.111"}
cret := cc.inReserveList("192.168.1.1:1234")
a.False(cret, "fail")
cret = cc.inReserveList("192.168.1.11:1234")
a.False(cret, "fail")
cret = cc.inReserveList("192.168.1.111:1234")
a.True(cret, "fail")

cc.ReservedPeers = []string{"192.168.1.2", "www.baidu.com", "192.168.1.1"}
sort.Slice(cc.ReservedPeers, func(i, j int) bool {
return net.ParseIP(cc.ReservedPeers[i]) != nil
})
a.Equal(cc.ReservedPeers[len(cc.ReservedPeers)-1], "www.baidu.com", "fail")
}
12 changes: 1 addition & 11 deletions p2pserver/connect_controller/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,7 @@ type Conn struct {
func (self *Conn) Close() error {
log.Infof("closing connection: peer %s, address: %s", self.kid.ToHexString(), self.addr)

self.controller.mutex.Lock()
defer self.controller.mutex.Unlock()

self.controller.inoutbounds[self.boundIndex].Remove(self.addr)

p := self.controller.peers[self.kid]
if p == nil || p.peer == nil {
log.Fatalf("connection %s not in controller", self.kid.ToHexString())
} else if p.connectId == self.connectId { // connection not replaced
delete(self.controller.peers, self.kid)
}
self.controller.removePeer(self)

return self.Conn.Close()
}
5 changes: 1 addition & 4 deletions p2pserver/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ var KValue = 20
var AlphaValue = 3

type DHT struct {
localId common.PeerId
birth time.Time // When this peer started up

localId common.PeerId
bucketSize int
routeTable *kb.RouteTable // Array of routing tables for differently distanced nodes

Expand Down Expand Up @@ -66,7 +64,6 @@ func NewDHT(id common.PeerId) *DHT {

return &DHT{
localId: id,
birth: time.Now(),
routeTable: rt,
bucketSize: bucketSize,
AutoRefresh: true,
Expand Down
2 changes: 1 addition & 1 deletion p2pserver/net/netserver/nbr_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (this *NbrPeers) Broadcast(msg types.Message) {
defer this.RUnlock()
for _, node := range this.List {
if node.Peer.GetRelay() {
node.Peer.SendRaw(msg.CmdType(), sink.Bytes())
go node.Peer.SendRaw(msg.CmdType(), sink.Bytes())
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions p2pserver/net/netserver/netserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (this *NetServer) Stop() {
if this.listener != nil {
_ = this.listener.Close()
}
this.stopRecvCh <- true
close(this.stopRecvCh)
this.protocol.HandleSystemMessage(this, p2p.NetworkStop{})
}

Expand Down Expand Up @@ -282,10 +282,12 @@ func (this *NetServer) startNetAccept(listener net.Listener) {
return
}

if err := this.handleClientConnection(conn); err != nil {
log.Warnf("[p2p] client connect error: %s", err)
_ = conn.Close()
}
go func() {
if err := this.handleClientConnection(conn); err != nil {
log.Warnf("[p2p] client connect error: %s", err)
_ = conn.Close()
}
}()
}
}

Expand Down
4 changes: 2 additions & 2 deletions p2pserver/p2pserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (this *P2PServer) Stop() {
this.network.Stop()
}

// GetNetWork returns the low level netserver
func (this *P2PServer) GetNetWork() p2pnet.P2P {
// GetNetwork returns the low level netserver
func (this *P2PServer) GetNetwork() p2pnet.P2P {
return this.network
}

Expand Down

0 comments on commit f710def

Please sign in to comment.