From b1ae3acf90ab990815e32986843e341f942f3385 Mon Sep 17 00:00:00 2001 From: susruth Date: Mon, 9 Sep 2019 17:41:20 +1000 Subject: [PATCH] updated to use Runners instead of RunFns --- peer.go | 72 +++++++++++++++++++------------------------- peer_test.go | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 114 insertions(+), 43 deletions(-) diff --git a/peer.go b/peer.go index f151118..5e88c88 100644 --- a/peer.go +++ b/peer.go @@ -19,7 +19,9 @@ import ( "github.com/sirupsen/logrus" ) -type RunFn func(context.Context) +type Runner interface { + Run(ctx context.Context) +} type PeerOptions struct { Logger logrus.FieldLogger @@ -37,7 +39,7 @@ type PeerOptions struct { DHTStore kv.Table // Defaults to using in memory store BroadcasterStore kv.Table // Defaults to using in memory store SignVerifier protocol.SignVerifier // Defaults to nil - RunFns []RunFn // Defaults to nil + Runners []Runner // Defaults to nil } type Peer interface { @@ -92,9 +94,9 @@ func New(options PeerOptions, receiver MessageReceiver, dht dht.DHT, pingponger } func (peer *peer) Run(ctx context.Context) { - if peer.options.RunFns != nil { - for _, fn := range peer.options.RunFns { - go fn(ctx) + if peer.options.Runners != nil { + for _, runner := range peer.options.Runners { + go runner.Run(ctx) } } @@ -241,41 +243,11 @@ func newErrInvalidPeerOptions(err error) error { return fmt.Errorf("invalid peer options: %v", err) } -func DefaultTCP(options PeerOptions, events EventSender, cap, port int) Peer { +func NewTCPPeer(options PeerOptions, events EventSender, cap, port int) Peer { var handshaker handshake.Handshaker if options.SignVerifier != nil { handshaker = handshake.New(options.SignVerifier) } - serverMessages := make(chan protocol.MessageOnTheWire, cap) - clientMessages := make(chan protocol.MessageOnTheWire, cap) - options.RunFns = []RunFn{ - func(ctx context.Context) { - err := tcp.NewServer(tcp.ServerOptions{ - Logger: options.Logger, - Timeout: time.Minute, - Handshaker: handshaker, - }, serverMessages).Listen(ctx, fmt.Sprintf("0.0.0.0:%v", port)) - if err != nil { - panic(fmt.Errorf("tcp server has crashed: %v", err)) - } - }, - func(ctx context.Context) { - tcp.NewClient(tcp.NewClientConns(tcp.ClientOptions{ - Logger: options.Logger, - Timeout: 10 * time.Second, - Handshaker: handshaker, - MaxConnections: 200, - }), clientMessages).Run(ctx) - }, - } - return Default(options, serverMessages, clientMessages, events) -} - -func Default(options PeerOptions, receiver MessageReceiver, sender MessageSender, events EventSender) Peer { - // Pre-condition check - if err := validateOptions(options); err != nil { - panic(fmt.Errorf("pre-condition violation: %v", newErrInvalidPeerOptions(err))) - } if options.DHTStore == nil { options.DHTStore = kv.NewTable(kv.NewMemDB(kv.GobCodec), "dht") @@ -285,18 +257,36 @@ func Default(options PeerOptions, receiver MessageReceiver, sender MessageSender options.BroadcasterStore = kv.NewTable(kv.NewMemDB(kv.GobCodec), "broadcaster") } + serverMessages := make(chan protocol.MessageOnTheWire, cap) + clientMessages := make(chan protocol.MessageOnTheWire, cap) + dht, err := dht.New(options.Me, options.Codec, options.DHTStore, options.BootstrapAddresses...) if err != nil { panic(fmt.Errorf("failed to initialize DHT: %v", err)) } + options.Runners = []Runner{ + tcp.NewServer(tcp.ServerOptions{ + Logger: options.Logger, + Timeout: time.Minute, + Handshaker: handshaker, + Port: port, + }, serverMessages), + tcp.NewClient(tcp.NewClientConns(tcp.ClientOptions{ + Logger: options.Logger, + Timeout: 10 * time.Second, + Handshaker: handshaker, + MaxConnections: 200, + }), dht, clientMessages), + } + return New( options, - receiver, + serverMessages, dht, - pingpong.NewPingPonger(dht, sender, events, options.Codec, options.Logger), - cast.NewCaster(dht, sender, events, options.Logger), - multicast.NewMulticaster(dht, sender, events, options.Logger), - broadcast.NewBroadcaster(broadcast.NewStorage(options.BroadcasterStore), dht, sender, events, options.Logger), + pingpong.NewPingPonger(dht, clientMessages, events, options.Codec, options.Logger), + cast.NewCaster(clientMessages, events, options.Logger), + multicast.NewMulticaster(clientMessages, events, options.Logger), + broadcast.NewBroadcaster(options.BroadcasterStore, clientMessages, events, options.Logger), ) } diff --git a/peer_test.go b/peer_test.go index 69abcfe..f70a86a 100644 --- a/peer_test.go +++ b/peer_test.go @@ -49,7 +49,7 @@ var _ = Describe("airwaves peer", func() { if signVerifiers != nil && len(signVerifiers) == len(peerAddresses) { options.SignVerifier = signVerifiers[i] } - go DefaultTCP(options, events, CAPACITY, 46532+i).Run(ctx) + go NewTCPPeer(options, events, CAPACITY, 46532+i).Run(ctx) }) return peerAddresses, nil } @@ -105,7 +105,7 @@ var _ = Describe("airwaves peer", func() { peers := make([]Peer, nodeCount.NewNodes) for i, peerAddr := range peerAddresses { events := make(chan protocol.Event, 10) - peer := DefaultTCP(PeerOptions{ + peer := NewTCPPeer(PeerOptions{ Logger: logger.WithField("test_node", i), Codec: codec, @@ -128,4 +128,85 @@ var _ = Describe("airwaves peer", func() { }) } }) + + Context("when updating peer address", func() { + FIt("should be able to send messages to the new address", func() { + logger := logrus.StandardLogger() + + peer1Events := make(chan protocol.Event, 65535) + peer2Events := make(chan protocol.Event, 65535) + updatedPeer2Events := make(chan protocol.Event, 65535) + + codec := testutil.NewSimpleTCPPeerAddressCodec() + peer1Address := testutil.NewSimpleTCPPeerAddress("peer_1", "127.0.0.1", fmt.Sprintf("%d", 8080)) + peer2Address := testutil.NewSimpleTCPPeerAddress("peer_2", "127.0.0.1", fmt.Sprintf("%d", 8081)) + updatedPeer2Address := testutil.NewSimpleTCPPeerAddress("peer_2", "127.0.0.1", fmt.Sprintf("%d", 8082)) + updatedPeer2Address.Nonce = 1 + + peer1 := NewTCPPeer(PeerOptions{ + Logger: logger, + Me: peer1Address, + BootstrapAddresses: PeerAddresses{peer2Address}, + Codec: codec, + + BootstrapDuration: 3 * time.Second, + }, peer1Events, 65535, 8080) + + peer2 := NewTCPPeer(PeerOptions{ + Logger: logger, + Me: peer2Address, + BootstrapAddresses: PeerAddresses{peer1Address}, + Codec: codec, + + BootstrapDuration: 3 * time.Second, + }, peer2Events, 65535, 8081) + + updatedPeer2 := NewTCPPeer(PeerOptions{ + Logger: logger, + Me: updatedPeer2Address, + BootstrapAddresses: PeerAddresses{peer1Address}, + Codec: codec, + + BootstrapDuration: 3 * time.Second, + }, updatedPeer2Events, 65535, 8082) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + go func() { + <-ctx.Done() + cancel() + }() + + co.ParBegin( + func() { + peer1.Run(context.Background()) + }, + func() { + peer2.Run(ctx) + fmt.Println("peer 2 restarted") + updatedPeer2.Run(context.Background()) + }, + func() { + <-ctx.Done() + ctx2, cancel := context.WithTimeout(context.Background(), 30*time.Second) + go func() { + <-ctx2.Done() + cancel() + }() + if err := peer1.Cast(ctx2, testutil.SimplePeerID("peer_2"), []byte("hello")); err != nil { + panic(err) + } + }, + func() { + for event := range peer1Events { + fmt.Println(event) + } + }, + func() { + for event := range updatedPeer2Events { + fmt.Println(event) + } + }, + ) + }) + }) })