Skip to content

Commit

Permalink
updated to use Runners instead of RunFns
Browse files Browse the repository at this point in the history
  • Loading branch information
susruth committed Sep 9, 2019
1 parent 6776b0e commit b1ae3ac
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 43 deletions.
72 changes: 31 additions & 41 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/sirupsen/logrus"
)

type RunFn func(context.Context)
type Runner interface {
Run(ctx context.Context)
}

type PeerOptions struct {
Logger logrus.FieldLogger
Expand All @@ -37,7 +39,7 @@ type PeerOptions struct {
DHTStore kv.Table // Defaults to using in memory store
BroadcasterStore kv.Table // Defaults to using in memory store
SignVerifier protocol.SignVerifier // Defaults to nil
RunFns []RunFn // Defaults to nil
Runners []Runner // Defaults to nil
}

type Peer interface {
Expand Down Expand Up @@ -92,9 +94,9 @@ func New(options PeerOptions, receiver MessageReceiver, dht dht.DHT, pingponger
}

func (peer *peer) Run(ctx context.Context) {
if peer.options.RunFns != nil {
for _, fn := range peer.options.RunFns {
go fn(ctx)
if peer.options.Runners != nil {
for _, runner := range peer.options.Runners {
go runner.Run(ctx)
}
}

Expand Down Expand Up @@ -241,41 +243,11 @@ func newErrInvalidPeerOptions(err error) error {
return fmt.Errorf("invalid peer options: %v", err)
}

func DefaultTCP(options PeerOptions, events EventSender, cap, port int) Peer {
func NewTCPPeer(options PeerOptions, events EventSender, cap, port int) Peer {
var handshaker handshake.Handshaker
if options.SignVerifier != nil {
handshaker = handshake.New(options.SignVerifier)
}
serverMessages := make(chan protocol.MessageOnTheWire, cap)
clientMessages := make(chan protocol.MessageOnTheWire, cap)
options.RunFns = []RunFn{
func(ctx context.Context) {
err := tcp.NewServer(tcp.ServerOptions{
Logger: options.Logger,
Timeout: time.Minute,
Handshaker: handshaker,
}, serverMessages).Listen(ctx, fmt.Sprintf("0.0.0.0:%v", port))
if err != nil {
panic(fmt.Errorf("tcp server has crashed: %v", err))
}
},
func(ctx context.Context) {
tcp.NewClient(tcp.NewClientConns(tcp.ClientOptions{
Logger: options.Logger,
Timeout: 10 * time.Second,
Handshaker: handshaker,
MaxConnections: 200,
}), clientMessages).Run(ctx)
},
}
return Default(options, serverMessages, clientMessages, events)
}

func Default(options PeerOptions, receiver MessageReceiver, sender MessageSender, events EventSender) Peer {
// Pre-condition check
if err := validateOptions(options); err != nil {
panic(fmt.Errorf("pre-condition violation: %v", newErrInvalidPeerOptions(err)))
}

if options.DHTStore == nil {
options.DHTStore = kv.NewTable(kv.NewMemDB(kv.GobCodec), "dht")
Expand All @@ -285,18 +257,36 @@ func Default(options PeerOptions, receiver MessageReceiver, sender MessageSender
options.BroadcasterStore = kv.NewTable(kv.NewMemDB(kv.GobCodec), "broadcaster")
}

serverMessages := make(chan protocol.MessageOnTheWire, cap)
clientMessages := make(chan protocol.MessageOnTheWire, cap)

dht, err := dht.New(options.Me, options.Codec, options.DHTStore, options.BootstrapAddresses...)
if err != nil {
panic(fmt.Errorf("failed to initialize DHT: %v", err))
}

options.Runners = []Runner{
tcp.NewServer(tcp.ServerOptions{
Logger: options.Logger,
Timeout: time.Minute,
Handshaker: handshaker,
Port: port,
}, serverMessages),
tcp.NewClient(tcp.NewClientConns(tcp.ClientOptions{
Logger: options.Logger,
Timeout: 10 * time.Second,
Handshaker: handshaker,
MaxConnections: 200,
}), dht, clientMessages),
}

return New(
options,
receiver,
serverMessages,
dht,
pingpong.NewPingPonger(dht, sender, events, options.Codec, options.Logger),
cast.NewCaster(dht, sender, events, options.Logger),
multicast.NewMulticaster(dht, sender, events, options.Logger),
broadcast.NewBroadcaster(broadcast.NewStorage(options.BroadcasterStore), dht, sender, events, options.Logger),
pingpong.NewPingPonger(dht, clientMessages, events, options.Codec, options.Logger),
cast.NewCaster(clientMessages, events, options.Logger),
multicast.NewMulticaster(clientMessages, events, options.Logger),
broadcast.NewBroadcaster(options.BroadcasterStore, clientMessages, events, options.Logger),
)
}
85 changes: 83 additions & 2 deletions peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var _ = Describe("airwaves peer", func() {
if signVerifiers != nil && len(signVerifiers) == len(peerAddresses) {
options.SignVerifier = signVerifiers[i]
}
go DefaultTCP(options, events, CAPACITY, 46532+i).Run(ctx)
go NewTCPPeer(options, events, CAPACITY, 46532+i).Run(ctx)
})
return peerAddresses, nil
}
Expand Down Expand Up @@ -105,7 +105,7 @@ var _ = Describe("airwaves peer", func() {
peers := make([]Peer, nodeCount.NewNodes)
for i, peerAddr := range peerAddresses {
events := make(chan protocol.Event, 10)
peer := DefaultTCP(PeerOptions{
peer := NewTCPPeer(PeerOptions{
Logger: logger.WithField("test_node", i),
Codec: codec,

Expand All @@ -128,4 +128,85 @@ var _ = Describe("airwaves peer", func() {
})
}
})

Context("when updating peer address", func() {
FIt("should be able to send messages to the new address", func() {
logger := logrus.StandardLogger()

peer1Events := make(chan protocol.Event, 65535)
peer2Events := make(chan protocol.Event, 65535)
updatedPeer2Events := make(chan protocol.Event, 65535)

codec := testutil.NewSimpleTCPPeerAddressCodec()
peer1Address := testutil.NewSimpleTCPPeerAddress("peer_1", "127.0.0.1", fmt.Sprintf("%d", 8080))
peer2Address := testutil.NewSimpleTCPPeerAddress("peer_2", "127.0.0.1", fmt.Sprintf("%d", 8081))
updatedPeer2Address := testutil.NewSimpleTCPPeerAddress("peer_2", "127.0.0.1", fmt.Sprintf("%d", 8082))
updatedPeer2Address.Nonce = 1

peer1 := NewTCPPeer(PeerOptions{
Logger: logger,
Me: peer1Address,
BootstrapAddresses: PeerAddresses{peer2Address},
Codec: codec,

BootstrapDuration: 3 * time.Second,
}, peer1Events, 65535, 8080)

peer2 := NewTCPPeer(PeerOptions{
Logger: logger,
Me: peer2Address,
BootstrapAddresses: PeerAddresses{peer1Address},
Codec: codec,

BootstrapDuration: 3 * time.Second,
}, peer2Events, 65535, 8081)

updatedPeer2 := NewTCPPeer(PeerOptions{
Logger: logger,
Me: updatedPeer2Address,
BootstrapAddresses: PeerAddresses{peer1Address},
Codec: codec,

BootstrapDuration: 3 * time.Second,
}, updatedPeer2Events, 65535, 8082)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
go func() {
<-ctx.Done()
cancel()
}()

co.ParBegin(
func() {
peer1.Run(context.Background())
},
func() {
peer2.Run(ctx)
fmt.Println("peer 2 restarted")
updatedPeer2.Run(context.Background())
},
func() {
<-ctx.Done()
ctx2, cancel := context.WithTimeout(context.Background(), 30*time.Second)
go func() {
<-ctx2.Done()
cancel()
}()
if err := peer1.Cast(ctx2, testutil.SimplePeerID("peer_2"), []byte("hello")); err != nil {
panic(err)
}
},
func() {
for event := range peer1Events {
fmt.Println(event)
}
},
func() {
for event := range updatedPeer2Events {
fmt.Println(event)
}
},
)
})
})
})

0 comments on commit b1ae3ac

Please sign in to comment.