Skip to content

Commit

Permalink
Clean up the networking server & add bootnode peer discovery (0xPolyg…
Browse files Browse the repository at this point in the history
…on#343)

* added new flags

* updated logic to check inbound and outbound limits

* updated tests

* removed peerdial event

* fixed merge conflicts

* removed redundant check

* updated inbound test case

* cleanup

* resolved formatting issue

* resolved merge conflicts

* resolved merge conflicts

* renamed the variables

* resolved lint issues

* replaced redundant logic with func

* code cleanup

* updated control flow for handle discovery

* modified checkSlots function

* updated the proto file

* added new routine for bootnode discovery

* added new checks to handshake routine

* added test cases

* cleanup

* fixed merge conflicts

* formating issue

* fixed typo

* fixed test cases

* cleanup

* fixed issue in server close function

* updated log message

* modified start function flow of server

* renamed function

* modified discovery run method

* added new check before disconnecting

* cleanup

* updated getBootNode func

* updated discovery func to work with one bootnode

* updated network default limit for inbound/outbound

* modified function names

* modified flag description

* Cosmetic Changes

* modified function name

* revamped the test case

* updated proto file

* cleanup

* added addPeersToTable method

* updated flag info

* build fix

* made variable access atomic

* removed comments

* added defer statement

* bootnodecount issue

* updated conns limit

* test cleanup

* added logs

* cosmetics

* removed redundant check

* updated method name

* added missing disconnect call

* Minor code cleanup in the discovery service

* Refactor the majority of the discovery protocol, and related modules

* Cleanup the discovery mechanism further

* Simplify the connection management logic

* Add support to Prometheus for newly added connection info

* Polish up the peer dial queue

* Clean up the identity service

* Remove leftover field

* Fix the peers add command usage

* Remove the useless joinWatchers service

* Clean up the server's run dial

* Fix debug print

* Fix the hanging connection counters

* Resolve linting errors

* Move out the connection management logic to the networking server

* Add support for Identity service mocking

* Make the mock networking server available to other modules

* Rename the testing helper file

* Add support for mocking the discovery service

* Add unit test for bootnode discovery

* Resolve linting errors

* Remove leftover testing code

* Rename method name to be more accurate and fix loop exit condition

* Move out the discovery closure to discovery

* Add a rollback method for routing table failiures

* Rename testing code to not be bundled in binary

* Separate out the networking server code for identity and discovery

* Remove the unused reference to the identity service

* Move out some init code to identity status construction

* Extract protocol saving to a separate method

* Remove unused error

* Remove unused networking server method

* Remove useless check:

* Move out the service setup logic to a better place

* Remove the connections package and move the logic to the base package

* Rename the method to be more precise

* Remove useless connection closure from removePeer

* Simplify the peer addition / deletion, add support for discovery race

* Resolve linting errors

* Have more precise attribute naming with dial tasks

* Have the temporary dials have special status during node discovery

* Update the initial bootnode count metrics

* Add sanity check for saving streams

Authored by: Milos Zivkovic <[email protected]>
  • Loading branch information
munna0908 authored Mar 31, 2022
1 parent f59d3ca commit 876489a
Show file tree
Hide file tree
Showing 39 changed files with 3,515 additions and 1,774 deletions.
3 changes: 2 additions & 1 deletion command/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"fmt"
"github.com/0xPolygon/polygon-edge/network/common"
"math"
"net"

Expand Down Expand Up @@ -237,7 +238,7 @@ func (p *serverParams) initDNSAddress() error {

var parseErr error

if p.dnsAddress, parseErr = network.MultiAddrFromDNS(
if p.dnsAddress, parseErr = common.MultiAddrFromDNS(
p.rawConfig.Network.DNSAddr, p.libp2pAddress.Port,
); parseErr != nil {
return parseErr
Expand Down
50 changes: 50 additions & 0 deletions network/bootnodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package network

import (
"github.com/libp2p/go-libp2p-core/peer"
"sync/atomic"
)

type bootnodesWrapper struct {
// bootnodeArr is the array that contains all the bootnode addresses
bootnodeArr []*peer.AddrInfo

// bootnodesMap is a map used for quick bootnode lookup
bootnodesMap map[peer.ID]*peer.AddrInfo

// bootnodeConnCount is an atomic value that keeps track
// of the number of bootnode connections
bootnodeConnCount int64
}

// isBootnode checks if the node ID belongs to a set bootnode
func (bw *bootnodesWrapper) isBootnode(nodeID peer.ID) bool {
_, ok := bw.bootnodesMap[nodeID]

return ok
}

// getBootnodeConnCount loads the bootnode connection count [Thread safe]
func (bw *bootnodesWrapper) getBootnodeConnCount() int64 {
return atomic.LoadInt64(&bw.bootnodeConnCount)
}

// increaseBootnodeConnCount increases the bootnode connection count by delta [Thread safe]
func (bw *bootnodesWrapper) increaseBootnodeConnCount(delta int64) {
atomic.AddInt64(&bw.bootnodeConnCount, delta)
}

// getBootnodes gets all the bootnodes
func (bw *bootnodesWrapper) getBootnodes() []*peer.AddrInfo {
return bw.bootnodeArr
}

// getBootnodeCount returns the number of set bootnodes
func (bw *bootnodesWrapper) getBootnodeCount() int {
return len(bw.bootnodeArr)
}

// hasBootnodes checks if any bootnodes are set [Thread safe]
func (bw *bootnodesWrapper) hasBootnodes() bool {
return bw.getBootnodeCount() > 0
}
133 changes: 133 additions & 0 deletions network/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package common

import (
"errors"
"fmt"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"regexp"
"strings"
)

type DialPriority uint64

const (
PriorityRequestedDial DialPriority = 1
PriorityRandomDial DialPriority = 10
)

const (
DiscProto = "/disc/0.1"
IdentityProto = "/id/0.1"
)

// DNSRegex is a regex string to match against a valid dns/dns4/dns6 addr
const DNSRegex = `^/?(dns)(4|6)?/[^-|^/][A-Za-z0-9-]([^-|^/]?)+([\\-\\.]{1}[a-z0-9]+)*\\.[A-Za-z]{2,}(/?)$`

func StringToAddrInfo(addr string) (*peer.AddrInfo, error) {
addr0, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}

addr1, err := peer.AddrInfoFromP2pAddr(addr0)

if err != nil {
return nil, err
}

return addr1, nil
}

var (
// Regex used for matching loopback addresses (IPv4, IPv6, DNS)
// This regex will match:
// /ip4/localhost/tcp/<port>
// /ip4/127.0.0.1/tcp/<port>
// /ip4/<any other loopback>/tcp/<port>
// /ip6/<any loopback>/tcp/<port>
// /dns/foobar.com/tcp/<port>
loopbackRegex = regexp.MustCompile(
//nolint:lll
fmt.Sprintf(`^\/ip4\/127(?:\.[0-9]+){0,2}\.[0-9]+\/tcp\/\d+$|^\/ip4\/localhost\/tcp\/\d+$|^\/ip6\/(?:0*\:)*?:?0*1\/tcp\/\d+$|%s`, DNSRegex),
)

dnsRegex = "^/?(dns)(4|6)?/[^-|^/][A-Za-z0-9-]([^-|^/]?)+([\\-\\.]{1}[a-z0-9]+)*\\.[A-Za-z]{2,}(/?)$"
)

// AddrInfoToString converts an AddrInfo into a string representation that can be dialed from another node
func AddrInfoToString(addr *peer.AddrInfo) string {
// Safety check
if len(addr.Addrs) == 0 {
panic("No dial addresses found")
}

dialAddress := addr.Addrs[0].String()

// Try to see if a non loopback address is present in the list
if len(addr.Addrs) > 1 && loopbackRegex.MatchString(dialAddress) {
// Find an address that's not a loopback address
for _, address := range addr.Addrs {
if !loopbackRegex.MatchString(address.String()) {
// Not a loopback address, dial address found
dialAddress = address.String()

break
}
}
}

// Format output and return
return dialAddress + "/p2p/" + addr.ID.String()
}

// MultiAddrFromDNS constructs a multiAddr from the passed in DNS address and port combination
func MultiAddrFromDNS(addr string, port int) (multiaddr.Multiaddr, error) {
var (
version string
domain string
)

match, err := regexp.MatchString(
dnsRegex,
addr,
)
if err != nil || !match {
return nil, errors.New("invalid DNS address")
}

s := strings.Trim(addr, "/")
split := strings.Split(s, "/")

if len(split) != 2 {
return nil, errors.New("invalid DNS address")
}

switch split[0] {
case "dns":
version = "dns"
case "dns4":
version = "dns4"
case "dns6":
version = "dns6"
default:
return nil, errors.New("invalid DNS version")
}

domain = split[1]

multiAddr, err := multiaddr.NewMultiaddr(
fmt.Sprintf(
"/%s/%s/tcp/%d",
version,
domain,
port,
),
)

if err != nil {
return nil, errors.New("could not create a multi address")
}

return multiAddr, nil
}
40 changes: 40 additions & 0 deletions network/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package network

import (
"github.com/0xPolygon/polygon-edge/chain"
"github.com/0xPolygon/polygon-edge/secrets"
"github.com/multiformats/go-multiaddr"
"net"
)

// Config details the params for the base networking server
type Config struct {
NoDiscover bool // flag indicating if the discovery mechanism should be turned on
Addr *net.TCPAddr // the base address
NatAddr net.IP // the NAT address
DNS multiaddr.Multiaddr // the DNS address
DataDir string // the base data directory for the client
MaxPeers int64 // the maximum number of peer connections
MaxInboundPeers int64 // the maximum number of inbound peer connections
MaxOutboundPeers int64 // the maximum number of outbound peer connections
Chain *chain.Chain // the reference to the chain configuration
SecretsManager secrets.SecretsManager // the secrets manager used for key storage
Metrics *Metrics // the metrics reporting reference
}

func DefaultConfig() *Config {
return &Config{
// The discovery service is turned on by default
NoDiscover: false,
// Addresses are bound to localhost by default
Addr: &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: DefaultLibp2pPort,
},
// The default ratio for outbound / max peer connections is 0.20
MaxPeers: 40,
// The default ratio for outbound / inbound connections is 0.25
MaxInboundPeers: 32,
MaxOutboundPeers: 8,
}
}
150 changes: 150 additions & 0 deletions network/connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package network

import (
"github.com/libp2p/go-libp2p-core/network"
"sync/atomic"
)

// ConnectionInfo keeps track of current connection information
// for the networking server
type ConnectionInfo struct {
// ACTIVE CONNECTIONS (VERIFIED) //
inboundConnectionCount int64
outboundConnectionCount int64

// PENDING CONNECTIONS (NOT YET VERIFIED) //
// The reason for keeping pending connections is to, in a way, "reserve" open
// slots for potential connections. These connections may or may not be valid in the end.
// The additional complication of having the concept of pending (reserved) slots
// is with calculating the number of actually available connection slots, in any direction.
// This concept should be revisited in the future, as pending connections are not something
// that is vital to the networking package or connection management as a whole
pendingInboundConnectionCount int64
pendingOutboundConnectionCount int64

// CONNECTION LIMITS //
maxInboundConnectionCount int64
maxOutboundConnectionCount int64
}

// NewBlankConnectionInfo returns a cleared ConnectionInfo instance
func NewBlankConnectionInfo(
maxInboundConnCount int64,
maxOutboundConnCount int64,
) *ConnectionInfo {
return &ConnectionInfo{
inboundConnectionCount: 0,
outboundConnectionCount: 0,
pendingInboundConnectionCount: 0,
pendingOutboundConnectionCount: 0,
maxInboundConnectionCount: maxInboundConnCount,
maxOutboundConnectionCount: maxOutboundConnCount,
}
}

// GetInboundConnCount returns the number of active inbound connections [Thread safe]
func (ci *ConnectionInfo) GetInboundConnCount() int64 {
return atomic.LoadInt64(&ci.inboundConnectionCount)
}

// GetOutboundConnCount returns the number of active outbound connections [Thread safe]
func (ci *ConnectionInfo) GetOutboundConnCount() int64 {
return atomic.LoadInt64(&ci.outboundConnectionCount)
}

// GetPendingInboundConnCount returns the number of pending inbound connections [Thread safe]
func (ci *ConnectionInfo) GetPendingInboundConnCount() int64 {
return atomic.LoadInt64(&ci.pendingInboundConnectionCount)
}

// GetPendingOutboundConnCount returns the number of pending outbound connections [Thread safe]
func (ci *ConnectionInfo) GetPendingOutboundConnCount() int64 {
return atomic.LoadInt64(&ci.pendingOutboundConnectionCount)
}

// incInboundConnCount increases the number of inbound connections by delta [Thread safe]
func (ci *ConnectionInfo) incInboundConnCount(delta int64) {
atomic.AddInt64(&ci.inboundConnectionCount, delta)
}

// incPendingInboundConnCount increases the number of pending inbound connections by delta [Thread safe]
func (ci *ConnectionInfo) incPendingInboundConnCount(delta int64) {
atomic.AddInt64(&ci.pendingInboundConnectionCount, delta)
}

// incPendingOutboundConnCount increases the number of pending outbound connections by delta [Thread safe]
func (ci *ConnectionInfo) incPendingOutboundConnCount(delta int64) {
atomic.AddInt64(&ci.pendingOutboundConnectionCount, delta)
}

// incOutboundConnCount increases the number of outbound connections by delta [Thread safe]
func (ci *ConnectionInfo) incOutboundConnCount(delta int64) {
atomic.AddInt64(&ci.outboundConnectionCount, delta)
}

// HasFreeOutboundConn checks if there are any open outbound connection slots.
// It takes into account the number of current (active) outbound connections and
// the number of pending outbound connections [Thread safe]
func (ci *ConnectionInfo) HasFreeOutboundConn() bool {
return ci.GetOutboundConnCount()+ci.GetPendingOutboundConnCount() < ci.maxOutboundConnCount()
}

// HasFreeInboundConn checks if there are any open inbound connection slots.
// It takes into account the number of current (active) inbound connections and
// the number of pending inbound connections [Thread safe]
func (ci *ConnectionInfo) HasFreeInboundConn() bool {
return ci.GetInboundConnCount()+ci.GetPendingInboundConnCount() < ci.maxInboundConnCount()
}

// maxOutboundConnCount returns the maximum number of outbound connections.
// [Thread safe] since this value is unchanged during runtime
func (ci *ConnectionInfo) maxOutboundConnCount() int64 {
return ci.maxOutboundConnectionCount
}

// maxInboundConnCount returns the minimum number of outbound connections
// [Thread safe] since this value is unchanged during runtime
func (ci *ConnectionInfo) maxInboundConnCount() int64 {
return ci.maxInboundConnectionCount
}

// UpdateConnCountByDirection updates the connection count by delta
// in the specified direction [Thread safe]
func (ci *ConnectionInfo) UpdateConnCountByDirection(
delta int64,
direction network.Direction,
) {
switch direction {
case network.DirInbound:
ci.incInboundConnCount(delta)
case network.DirOutbound:
ci.incOutboundConnCount(delta)
}
}

// UpdatePendingConnCountByDirection updates the pending connection count by delta
// in the specified direction [Thread safe]
func (ci *ConnectionInfo) UpdatePendingConnCountByDirection(
delta int64,
direction network.Direction,
) {
switch direction {
case network.DirInbound:
ci.incPendingInboundConnCount(delta)
case network.DirOutbound:
ci.incPendingOutboundConnCount(delta)
}
}

// HasFreeConnectionSlot checks if there is a free connection slot in the
// specified direction [Thread safe]
func (ci *ConnectionInfo) HasFreeConnectionSlot(direction network.Direction) bool {
switch direction {
case network.DirInbound:
return ci.HasFreeInboundConn()
case network.DirOutbound:
return ci.HasFreeOutboundConn()
}

return false
}
Loading

0 comments on commit 876489a

Please sign in to comment.