Skip to content

Commit

Permalink
Merge branch 'master' into fix/handshake-log
Browse files Browse the repository at this point in the history
  • Loading branch information
Nadimpalli Susruth authored Nov 5, 2019
2 parents 0a9a193 + c32e13a commit cde5a8a
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 33 deletions.
42 changes: 25 additions & 17 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type PeerOptions struct {
BroadcasterStore kv.Table // Defaults to using in memory store
SignVerifier protocol.SignVerifier // Defaults to nil
Runners []Runner // Defaults to nil

ServerOptions tcp.ServerOptions
ClientOptions tcp.ClientOptions
}

type Peer interface {
Expand Down Expand Up @@ -244,11 +247,6 @@ func newErrInvalidPeerOptions(err error) error {
}

func NewTCPPeer(options PeerOptions, events EventSender, cap, port int) Peer {
var handshaker handshake.Handshaker
if options.SignVerifier != nil {
handshaker = handshake.New(options.SignVerifier)
}

if options.DHTStore == nil {
options.DHTStore = kv.NewTable(kv.NewMemDB(kv.GobCodec), "dht")
}
Expand All @@ -257,6 +255,26 @@ func NewTCPPeer(options PeerOptions, events EventSender, cap, port int) Peer {
options.BroadcasterStore = kv.NewTable(kv.NewMemDB(kv.GobCodec), "broadcaster")
}

if options.ServerOptions.Logger == nil {
options.ServerOptions.Logger = options.Logger
}

if options.ServerOptions.Port == 0 {
options.ServerOptions.Port = port
}

if options.ServerOptions.Handshaker == nil && options.SignVerifier != nil {
options.ServerOptions.Handshaker = handshake.New(options.SignVerifier)
}

if options.ClientOptions.Logger == nil {
options.ClientOptions.Logger = options.Logger
}

if options.ClientOptions.Handshaker == nil && options.SignVerifier != nil {
options.ClientOptions.Handshaker = handshake.New(options.SignVerifier)
}

serverMessages := make(chan protocol.MessageOnTheWire, cap)
clientMessages := make(chan protocol.MessageOnTheWire, cap)

Expand All @@ -266,18 +284,8 @@ func NewTCPPeer(options PeerOptions, events EventSender, cap, port int) Peer {
}

options.Runners = append(options.Runners,
tcp.NewServer(tcp.ServerOptions{
Logger: options.Logger,
Timeout: time.Minute,
Handshaker: handshaker,
Port: port,
}, serverMessages),
tcp.NewClient(tcp.NewClientConns(tcp.ClientOptions{
Logger: options.Logger,
Timeout: 10 * time.Second,
Handshaker: handshaker,
MaxConnections: 200,
}), dht, clientMessages),
tcp.NewServer(options.ServerOptions, serverMessages),
tcp.NewClient(tcp.NewClientConns(options.ClientOptions), dht, clientMessages),
)

return New(
Expand Down
49 changes: 36 additions & 13 deletions peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aw_test

import (
"context"
"encoding/base64"
"fmt"
"time"

Expand All @@ -10,6 +11,7 @@ import (
. "github.com/renproject/aw"

"github.com/renproject/aw/protocol"
"github.com/renproject/aw/tcp"
"github.com/renproject/aw/testutil"
"github.com/renproject/phi/co"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -130,7 +132,7 @@ var _ = Describe("airwaves peer", func() {
})

Context("when updating peer address", func() {
FIt("should be able to send messages to the new address", func() {
It("should be able to send messages to the new address", func() {
logger := logrus.StandardLogger()

peer1Events := make(chan protocol.Event, 65535)
Expand All @@ -144,30 +146,39 @@ var _ = Describe("airwaves peer", func() {
updatedPeer2Address.Nonce = 1

peer1 := NewTCPPeer(PeerOptions{
Logger: logger,
Logger: logger.WithField("peer", 1),
Me: peer1Address,
BootstrapAddresses: PeerAddresses{peer2Address},
Codec: codec,

BootstrapDuration: 3 * time.Second,
ClientOptions: tcp.ClientOptions{
MaxRetries: 60,
},
}, peer1Events, 65535, 8080)

peer2 := NewTCPPeer(PeerOptions{
Logger: logger,
Logger: logger.WithField("peer", 2),
Me: peer2Address,
BootstrapAddresses: PeerAddresses{peer1Address},
Codec: codec,

BootstrapDuration: 3 * time.Second,
ClientOptions: tcp.ClientOptions{
MaxRetries: 60,
},
}, peer2Events, 65535, 8081)

updatedPeer2 := NewTCPPeer(PeerOptions{
Logger: logger,
Logger: logger.WithField("updated peer", 2),
Me: updatedPeer2Address,
BootstrapAddresses: PeerAddresses{peer1Address},
Codec: codec,

BootstrapDuration: 3 * time.Second,
ClientOptions: tcp.ClientOptions{
MaxRetries: 60,
},
}, updatedPeer2Events, 65535, 8082)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand All @@ -176,14 +187,16 @@ var _ = Describe("airwaves peer", func() {
cancel()
}()

newCtx, newCancel := context.WithCancel(context.Background())

co.ParBegin(
func() {
peer1.Run(context.Background())
peer1.Run(newCtx)
},
func() {
peer2.Run(ctx)
fmt.Println("peer 2 restarted")
updatedPeer2.Run(context.Background())
updatedPeer2.Run(newCtx)
},
func() {
<-ctx.Done()
Expand All @@ -192,19 +205,29 @@ var _ = Describe("airwaves peer", func() {
<-ctx2.Done()
cancel()
}()
if err := peer1.Cast(ctx2, testutil.SimplePeerID("peer_2"), []byte("hello")); err != nil {
// After peer 2 receives this message, its server should
// shut down as the context has expired.
if err := peer1.Cast(ctx2, peer2Address.PeerID(), []byte("hello")); err != nil {
panic(err)
}
time.Sleep(time.Second)
if err := peer1.Cast(ctx2, peer2Address.PeerID(), []byte("hello")); err != nil {
panic(err)
}
},
func() {
for event := range peer1Events {
fmt.Println(event)
}
event := <-peer1Events
_, ok := event.(protocol.EventPeerChanged)
Expect(ok).To(BeTrue())
},
func() {
for event := range updatedPeer2Events {
fmt.Println(event)
}
event := <-updatedPeer2Events
msg, ok := event.(protocol.EventMessageReceived)
Expect(ok).To(BeTrue())
msgBytes, err := base64.StdEncoding.DecodeString(msg.Message.String())
Expect(err).ToNot(HaveOccurred())
Expect(msgBytes).To(Equal([]byte("hello")))
newCancel()
},
)
})
Expand Down
22 changes: 20 additions & 2 deletions tcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type ClientOptions struct {
Timeout time.Duration
// MaxConnections to remote servers that the Client will maintain.
MaxConnections int
// MaxRetries if the message cannot be sent.
MaxRetries int
// Handshaker handles the handshake process between peers. Default: no handshake
Handshaker handshake.Handshaker
}
Expand Down Expand Up @@ -60,7 +62,7 @@ func NewClientConns(options ClientOptions) *ClientConns {
if options.Timeout == 0 {
options.Timeout = 10 * time.Second
}
if options.MaxConnections == 256 {
if options.MaxConnections == 0 {
options.MaxConnections = 256
}

Expand All @@ -87,7 +89,11 @@ func (clientConns *ClientConns) Write(ctx context.Context, addr net.Addr, messag
clientConns.connsMu.RLock()
conn := clientConns.conns[addr.String()]
clientConns.connsMu.RUnlock()

clientConns.connsMu.RLock()
if conn != nil && conn.conn != nil {
clientConns.connsMu.RUnlock()

// Mutex on the conn
conn.mu.Lock()
defer conn.mu.Unlock()
Expand All @@ -101,6 +107,7 @@ func (clientConns *ClientConns) Write(ctx context.Context, addr net.Addr, messag
}
return nil
}
clientConns.connsMu.RUnlock()

// Protect the cache from concurrent writes and establish a connection that
// can be dialed
Expand Down Expand Up @@ -128,7 +135,11 @@ func (clientConns *ClientConns) Write(ctx context.Context, addr net.Addr, messag
if err != nil {
return err
}

clientConns.connsMu.RLock()
if conn.conn != nil {
clientConns.connsMu.RUnlock()

// Mutex on the conn
conn.mu.Lock()
defer conn.mu.Unlock()
Expand All @@ -142,10 +153,14 @@ func (clientConns *ClientConns) Write(ctx context.Context, addr net.Addr, messag
}
return nil
}
clientConns.connsMu.RUnlock()

// Double-check the connection, because while waiting to acquire the write lock
// another goroutine may have already dialed the remote server
clientConns.connsMu.RLock()
if conn.conn != nil {
clientConns.connsMu.RUnlock()

// Mutex on the conn
conn.mu.Lock()
defer conn.mu.Unlock()
Expand All @@ -159,14 +174,17 @@ func (clientConns *ClientConns) Write(ctx context.Context, addr net.Addr, messag
}
return nil
}
clientConns.connsMu.RUnlock()

// A new connection needs to be dialed, so we lock the connection to prevent
// multiple dials against the same remote server
conn.mu.Lock()
defer conn.mu.Unlock()

// Dial
clientConns.connsMu.Lock()
conn.conn, err = net.DialTimeout("tcp", addr.String(), clientConns.options.Timeout)
clientConns.connsMu.Unlock()
if err != nil {
return err
}
Expand Down Expand Up @@ -297,7 +315,7 @@ func (client *Client) sendMessageOnTheWire(ctx context.Context, to net.Addr, mes
go func() {
begin := time.Now()
delay := time.Duration(1000)
for i := 0; i < 60; i++ {
for i := 0; i < client.conns.options.MaxRetries; i++ {
// Dial
client.conns.options.Logger.Warnf("retrying write to tcp connection to %v with delay of %.4f second(s)", to.String(), time.Now().Sub(begin).Seconds())
err := client.conns.Write(ctx, to, message)
Expand Down
7 changes: 7 additions & 0 deletions tcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ type Server struct {
}

func NewServer(options ServerOptions, messages protocol.MessageSender) *Server {
if options.Logger == nil {
panic("pre-condition violation: logger is nil")
}
if options.Timeout == 0 {
options.Timeout = time.Minute
}

return &Server{
options: options,
messages: messages,
Expand Down
2 changes: 1 addition & 1 deletion testutil/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (address SimpleTCPPeerAddress) IsNewer(peerAddress protocol.PeerAddress) bo
if !ok {
return false
}
return peerAddr.Nonce > address.Nonce
return address.Nonce > peerAddr.Nonce
}

func Remove(addrs protocol.PeerAddresses, i int) protocol.PeerAddresses {
Expand Down

0 comments on commit cde5a8a

Please sign in to comment.