Skip to content

Commit

Permalink
Merge pull request #36 from renproject/fix/wire-fmt
Browse files Browse the repository at this point in the history
Fix Wire format and client dial issues
  • Loading branch information
jazg authored Jul 20, 2020
2 parents bffe4ee + 591a58d commit b6345a8
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 98 deletions.
10 changes: 10 additions & 0 deletions aw.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
"go.uber.org/zap"
)

type (
ContentResolver = dht.ContentResolver
)

type Builder struct {
opts Options

Expand Down Expand Up @@ -75,10 +79,12 @@ func (builder *Builder) WithAddr(addr wire.Address) *Builder {
}
return builder
}

func (builder *Builder) WithHost(host string) *Builder {
builder.trans.TCPServerOpts = builder.trans.TCPServerOpts.WithHost(host)
return builder
}

func (builder *Builder) WithPort(port uint16) *Builder {
builder.trans.TCPServerOpts = builder.trans.TCPServerOpts.WithPort(port)
return builder
Expand Down Expand Up @@ -155,6 +161,10 @@ func (node *Node) Gossiper() *gossip.Gossiper {
return node.gossiper
}

func (node *Node) Sync(ctx context.Context, subnet, hash id.Hash, dataType uint8) ([]byte, error) {
return node.gossiper.Sync(ctx, subnet, hash, dataType)
}

func (node *Node) Identity() id.Signatory {
return node.peer.Identity()
}
Expand Down
85 changes: 83 additions & 2 deletions aw_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package aw_test

import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"math/rand"
"sync/atomic"
"time"

"github.com/renproject/aw"
"github.com/renproject/aw/dht"
"github.com/renproject/aw/gossip"
"github.com/renproject/aw/wire"
"github.com/renproject/id"

Expand All @@ -32,15 +35,23 @@ var _ = Describe("Airwave", func() {
defer cancel()

port1 := uint16(3000 + r.Int()%3000)
addr1 := wire.NewUnsignedAddress(wire.TCP, fmt.Sprintf("0.0.0.0:%v", port1), uint64(time.Now().UnixNano()))
privKey1 := id.NewPrivKey()
Expect(addr1.Sign(privKey1)).To(Succeed())
node1 := aw.New().
WithAddr(wire.NewUnsignedAddress(wire.TCP, fmt.Sprintf("0.0.0.0:%v", port1), uint64(time.Now().UnixNano()))).
WithPrivKey(privKey1).
WithAddr(addr1).
WithHost("0.0.0.0").
WithPort(port1).
Build()

port2 := uint16(3000 + r.Int()%3000)
addr2 := wire.NewUnsignedAddress(wire.TCP, fmt.Sprintf("0.0.0.0:%v", port2), uint64(time.Now().UnixNano()))
privKey2 := id.NewPrivKey()
Expect(addr2.Sign(privKey2)).To(Succeed())
node2 := aw.New().
WithAddr(wire.NewUnsignedAddress(wire.TCP, fmt.Sprintf("0.0.0.0:%v", port2), uint64(time.Now().UnixNano()))).
WithPrivKey(privKey2).
WithAddr(addr2).
WithHost("0.0.0.0").
WithPort(port2).
WithContentResolver(
Expand Down Expand Up @@ -93,4 +104,74 @@ var _ = Describe("Airwave", func() {
})
})
})

Context("when gossiping", func() {
Context("when fully connected", func() {
It("should return content from all nodes", func() {
defer time.Sleep(time.Millisecond)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Initialise nodes.
n := 3
nodes := make([]*aw.Node, n)
addrs := make([]wire.Address, n)
for i := range nodes {
port := uint16(3000 + i)
addrs[i] = wire.NewUnsignedAddress(wire.TCP, fmt.Sprintf("0.0.0.0:%v", port), uint64(time.Now().UnixNano()))
privKey := id.NewPrivKey()
Expect(addrs[i].Sign(privKey)).To(Succeed())
node := aw.New().
WithPrivKey(privKey).
WithAddr(addrs[i]).
WithHost("0.0.0.0").
WithPort(port).
Build()
nodes[i] = node
}

// Connect nodes in a fully connected cyclic graph.
for i := range nodes {
for j := range nodes {
if i == j {
continue
}
nodes[i].DHT().InsertAddr(addrs[j])
}
}

// Run the nodes.
for i := range nodes {
go nodes[i].Run(ctx)
}

// Sleep for enough time for nodes to find each other by pinging
// each other.
time.Sleep(100 * time.Millisecond)

contentHash := sha256.Sum256([]byte("hello!"))
contentType := uint8(1)
content := []byte("hello!")
nodes[0].Broadcast(ctx, gossip.DefaultSubnet, contentType, content)

found := map[id.Signatory]struct{}{}
for {
time.Sleep(time.Millisecond)
for i := range nodes {
data, ok := nodes[i].DHT().Content(contentHash, contentType)
if !ok {
continue
}
if bytes.Equal(content, data) {
found[nodes[i].Identity()] = struct{}{}
}
}
if len(found) == n {
return
}
}
})
})
})
})
3 changes: 1 addition & 2 deletions handshake/ecdsa.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ func (handshaker *ecdsaHandshaker) AcceptHandshake(ctx context.Context, conn net
//
// 2
//
err = writePubKeyWithSignature(conn, &handshaker.opts.PrivKey.PublicKey, handshaker.opts.PrivKey)
if err != nil {
if err := writePubKeyWithSignature(conn, &handshaker.opts.PrivKey.PublicKey, handshaker.opts.PrivKey); err != nil {
return nil, fmt.Errorf("writing server pubkey with signature: %v", err)
}

Expand Down
Loading

0 comments on commit b6345a8

Please sign in to comment.