Skip to content

Commit

Permalink
Peer id fix (#969)
Browse files Browse the repository at this point in the history
* Fix peer ID in 'p2p/peer' package.

Add uniqueness for 'peerImplID'.

* Fix 'incomingPeerID' for 'retransmitter' utility.

* Improved 'peerImplID' in 'p2p/peer' package. Tests added.

* Add TODOs.

* Add 'TestPeerImplId_InMap' test.

Co-authored-by: Alexey Kiselev <[email protected]>
  • Loading branch information
nickeskov and alexeykiselev authored Dec 14, 2022
1 parent f4ed645 commit d2fc4e3
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 22 deletions.
12 changes: 6 additions & 6 deletions cmd/retransmitter/retransmit/network/incoming_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ type IncomingPeer struct {
}

type incomingPeerID struct {
remoteAddr net.Addr
localAddr net.Addr
remoteAddr string
localAddr string
}

func newPeerID(remoteAddr net.Addr, localAddr net.Addr) incomingPeerID {
return incomingPeerID{remoteAddr: remoteAddr, localAddr: localAddr}
return incomingPeerID{remoteAddr: remoteAddr.String(), localAddr: localAddr.String()}
}

func (id incomingPeerID) String() string {
return fmt.Sprintf("incoming Connection %s -> %s", id.remoteAddr.String(), id.localAddr.String())
return fmt.Sprintf("incoming Connection %s -> %s", id.remoteAddr, id.localAddr)
}

type IncomingPeerParams struct {
Expand Down Expand Up @@ -60,7 +60,7 @@ func RunIncomingPeer(ctx context.Context, params IncomingPeerParams) {
default:
}

id := fmt.Sprintf("incoming Connection %s -> %s", c.RemoteAddr().String(), c.LocalAddr().String())
id := newPeerID(c.RemoteAddr(), c.LocalAddr())
zap.S().Infof("read handshake from %s %+v", id, readHandshake)

writeHandshake := proto.Handshake{
Expand Down Expand Up @@ -95,7 +95,7 @@ func RunIncomingPeer(ctx context.Context, params IncomingPeerParams) {
params: params,
conn: connection,
remote: remote,
uniqueID: newPeerID(c.RemoteAddr(), c.LocalAddr()),
uniqueID: id,
cancel: cancel,
handshake: readHandshake,
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/p2p/incoming/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ func runIncomingPeer(ctx context.Context, cancel context.CancelFunc, params Peer

remote := peer.NewRemote()
connection := conn.WrapConnection(c, remote.ToCh, remote.FromCh, remote.ErrCh, params.Skip)
peerImpl := peer.NewPeerImpl(readHandshake, connection, peer.Incoming, remote, cancel)
peerImpl, err := peer.NewPeerImpl(readHandshake, connection, peer.Incoming, remote, cancel)
if err != nil {
_ = c.Close() // TODO: handle error
zap.S().Warn("Failed to create new peer impl: ", err)
return errors.Wrap(err, "failed to run incoming peer")
}

out := peer.InfoMessage{
Peer: peerImpl,
Expand Down
11 changes: 10 additions & 1 deletion pkg/p2p/outgoing/outgoing.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,35 @@ type EstablishParams struct {

func EstablishConnection(ctx context.Context, params EstablishParams, v proto.Version) error {
ctx, cancel := context.WithCancel(ctx)
// FIXME: cancel should be defered
remote := peer.NewRemote()
p := connector{
params: params,
cancel: cancel,
remote: remote,
}

// TODO: use net.DialTimeout
c, err := net.Dial("tcp", params.Address.String())
if err != nil {
return err
}
// FIXME: connection.close should be called in case of any error, or it should be deferred in any case

connection, handshake, err := p.connect(ctx, c, v)
if err != nil {
// FIXME: close connection
zap.S().Debugf("Outgoing connection to address %s failed with error: %v", params.Address.String(), err)
return errors.Wrapf(err, "%q", params.Address)
}
p.connection = connection

peerImpl := peer.NewPeerImpl(*handshake, connection, peer.Outgoing, remote, cancel)
peerImpl, err := peer.NewPeerImpl(*handshake, connection, peer.Outgoing, remote, cancel)
if err != nil {
_ = c.Close() // TODO: handle error
zap.S().Debugf("Failed to create new peer impl for outgoing conn to %s: %v", params.Address, err)
return errors.Wrapf(err, "failed to establish connection to %s", params.Address.String())
}

connected := peer.InfoMessage{
Peer: peerImpl,
Expand Down
1 change: 1 addition & 0 deletions pkg/p2p/peer/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type HandlerParams struct {
}

// Handle sends and receives messages no matter outgoing or incoming connection.
// TODO: caller should be responsible for closing network connection
func Handle(params HandlerParams) error {
for {
select {
Expand Down
34 changes: 24 additions & 10 deletions pkg/p2p/peer/peer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"
"net"
"strings"
"net/netip"

"github.com/pkg/errors"
"github.com/wavesplatform/gowaves/pkg/p2p/conn"
Expand All @@ -13,17 +13,27 @@ import (
)

type peerImplID struct {
addr net.Addr
nonce uint64
addr16 [16]byte
nonce uint64
}

func newPeerImplID(addr net.Addr, nonce uint64) peerImplID {
return peerImplID{addr: addr, nonce: nonce}
func newPeerImplID(addr net.Addr, nonce uint64) (peerImplID, error) {
var (
netStr = addr.Network()
addrStr = addr.String()
)
tcpAddr, err := net.ResolveTCPAddr(netStr, addrStr)
if err != nil {
return peerImplID{}, errors.Wrapf(err, "failed to resolve '%s' addr from '%s'", netStr, addrStr)
}
var addr16 [16]byte
copy(addr16[:], tcpAddr.IP.To16())
return peerImplID{addr16: addr16, nonce: nonce}, nil
}

func (id peerImplID) String() string {
a := strings.Split(id.addr.String(), ":")[0]
return fmt.Sprintf("%s-%d", a, id.nonce)
addr := netip.AddrFrom16(id.addr16).Unmap()
return fmt.Sprintf("%s-%d", addr.String(), id.nonce)
}

type PeerImpl struct {
Expand All @@ -35,15 +45,19 @@ type PeerImpl struct {
cancel context.CancelFunc
}

func NewPeerImpl(handshake proto.Handshake, conn conn.Connection, direction Direction, remote Remote, cancel context.CancelFunc) *PeerImpl {
func NewPeerImpl(handshake proto.Handshake, conn conn.Connection, direction Direction, remote Remote, cancel context.CancelFunc) (*PeerImpl, error) {
id, err := newPeerImplID(conn.Conn().RemoteAddr(), handshake.NodeNonce)
if err != nil {
return nil, errors.Wrap(err, "failed to create new peer")
}
return &PeerImpl{
handshake: handshake,
conn: conn,
direction: direction,
remote: remote,
id: newPeerImplID(conn.Conn().RemoteAddr(), handshake.NodeNonce),
id: id,
cancel: cancel,
}
}, nil
}

func (a *PeerImpl) Direction() Direction {
Expand Down
82 changes: 78 additions & 4 deletions pkg/p2p/peer/peer_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,87 @@
package peer

import (
"net"
"fmt"
"net/netip"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestID(t *testing.T) {
addr, _ := net.ResolveTCPAddr("", "127.0.0.1:6868")
assert.Equal(t, "127.0.0.1-100500", peerImplID{addr: addr, nonce: 100500}.String())
type netAddr struct{ net, addr string }

func (n netAddr) Network() string { return n.net }

func (n netAddr) String() string { return n.addr }

func TestPeerImplID(t *testing.T) {
tests := []struct {
net, addr string
nonce uint64
errorStr string
}{
{net: "tcp", addr: "127.0.0.1:100", nonce: 100501},
{net: "tcp4", addr: "127.0.0.1:100", nonce: 100502},
{net: "", addr: "127.0.0.1:100", nonce: 100504},
{net: "tcp", addr: "[2001:db8::1]:8080", nonce: 80},
{net: "tcp6", addr: "[2001:db8::1]:8080", nonce: 82},
{
net: "tcp6", addr: "127.0.0.1:100", nonce: 100503,
errorStr: "failed to resolve 'tcp6' addr from '127.0.0.1:100': address 127.0.0.1: no suitable address found",
},
{
net: "tcp4", addr: "[2001:db8::1]:8080", nonce: 81,
errorStr: "failed to resolve 'tcp4' addr from '[2001:db8::1]:8080': address 2001:db8::1: no suitable address found",
},
{
net: "udp", addr: "[2001:db8::1]:8080", nonce: 80,
errorStr: "failed to resolve 'udp' addr from '[2001:db8::1]:8080': unknown network udp",
},
{
net: "tcp", addr: "127.0.0.01", nonce: 90,
errorStr: "failed to resolve 'tcp' addr from '127.0.0.01': address 127.0.0.01: missing port in address",
},
}
for i, test := range tests {
t.Run(strconv.Itoa(i+1), func(t *testing.T) {
id, err := newPeerImplID(netAddr{net: test.net, addr: test.addr}, test.nonce)
if test.errorStr != "" {
assert.EqualError(t, err, test.errorStr)
} else {
addrP, err := netip.ParseAddrPort(test.addr)
require.NoError(t, err)
expectedAddr := addrP.Addr()
assert.Equal(t, expectedAddr.As16(), id.addr16)
assert.Equal(t, test.nonce, id.nonce)
expectedString := fmt.Sprintf("%s-%d", expectedAddr, test.nonce)
assert.Equal(t, expectedString, id.String())
}
})
}
}

func TestPeerImplId_InMap(t *testing.T) {
const (
net = "tcp"
addr = "127.0.0.1:8080"
)
type noncePair struct{ first, second uint64 }
for i, np := range []noncePair{{100, 500}, {100, 100}} {
t.Run(strconv.Itoa(i+1), func(t *testing.T) {
first, err := newPeerImplID(netAddr{net: net, addr: addr}, np.first)
require.NoError(t, err)
second, err := newPeerImplID(netAddr{net: net, addr: addr}, np.second)
require.NoError(t, err)

m := map[ID]struct{}{first: {}}
_, ok := m[second]
if unique := np.first != np.second; unique {
assert.False(t, ok)
} else {
assert.True(t, ok)
}
})
}
}

0 comments on commit d2fc4e3

Please sign in to comment.