Skip to content

Commit

Permalink
Merge branch 'main' into fix/node-deref-error
Browse files Browse the repository at this point in the history
  • Loading branch information
alvin-reyes authored Jan 14, 2025
2 parents 53bfbc4 + 2e61c18 commit b7b42f6
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 7 deletions.
5 changes: 5 additions & 0 deletions node/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ func (node *OracleNode) getNodeData() *pubsub.NodeData {
func (node *OracleNode) Start() (err error) {
logrus.Infof("[+] Starting node with ID: %s", node.Host.ID().String())

// simple check to ensure the node is initialized
if node.Host == nil {
return fmt.Errorf("node host not initialized")
}

node.Host.SetStreamHandler(node.Protocol, node.handleStream)
node.Host.SetStreamHandler(node.protocolWithVersion(node.Options.NodeDataSyncProtocol), node.ReceiveNodeData)

Expand Down
61 changes: 61 additions & 0 deletions node/oracle_node_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package node

import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"encoding/hex"
"github.com/masa-finance/masa-oracle/pkg/pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"strings"
"testing"

"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -39,3 +44,59 @@ func TestNodeSignature(t *testing.T) {
t.Errorf("Expected node to be a publisher, but it's not")
}
}

func TestOracleNode_GetNodeData(t *testing.T) {
ctx := context.Background()

tests := []struct {
name string
opts []Option
checkFn func(*testing.T, *pubsub.NodeData)
}{
{
name: "random identity node",
opts: []Option{
EnableRandomIdentity,
EnableUDP,
WithPort(0),
},
checkFn: func(t *testing.T, data *pubsub.NodeData) {
assert.NotEmpty(t, data.PeerId)
assert.True(t, strings.HasPrefix(data.EthAddress, "0x"))
assert.False(t, data.IsStaked)
},
},
{
name: "node with all capabilities",
opts: []Option{
EnableRandomIdentity,
EnableUDP,
WithPort(0),
EnableStaked,
IsTwitterScraper,
IsWebScraper,
IsValidator,
},
checkFn: func(t *testing.T, data *pubsub.NodeData) {
assert.NotEmpty(t, data.PeerId)
assert.True(t, data.IsStaked)
assert.True(t, data.IsTwitterScraper)
assert.True(t, data.IsWebScraper)
assert.True(t, data.IsValidator)
assert.True(t, data.IsActive)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
node, err := NewOracleNode(ctx, tt.opts...)
require.NoError(t, err)
defer node.Host.Close()

nodeData := node.getNodeData()
require.NotNil(t, nodeData)
tt.checkFn(t, nodeData)
})
}
}
146 changes: 146 additions & 0 deletions pkg/network/address_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package network

import (
"testing"

"github.com/libp2p/go-libp2p"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGetMultiAddressesForHost(t *testing.T) {
tests := []struct {
name string
listenAddrs []string
expectEmpty bool
expectError bool
}{
{
name: "valid non-local addresses",
listenAddrs: []string{
"/ip4/0.0.0.0/tcp/0", // Use 0.0.0.0 and port 0 for testing
},
expectEmpty: false,
expectError: false,
},
{
name: "only localhost addresses",
listenAddrs: []string{
"/ip4/127.0.0.1/tcp/0",
},
expectEmpty: true,
expectError: false,
},
{
name: "mixed addresses",
listenAddrs: []string{
"/ip4/127.0.0.1/tcp/0",
"/ip4/0.0.0.0/tcp/0",
},
expectEmpty: false,
expectError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a test host with specified listen addresses
opts := []libp2p.Option{
libp2p.ListenAddrStrings(tt.listenAddrs...),
}
h, err := libp2p.New(opts...)
require.NoError(t, err)
defer h.Close()

// Get multiaddresses
addrs, err := GetMultiAddressesForHost(h)

// Check error
if tt.expectError {
assert.Error(t, err)
return
}
assert.NoError(t, err)

// Check if empty when expected
if tt.expectEmpty {
assert.Empty(t, addrs)
return
}

// Verify no localhost addresses
for _, addr := range addrs {
assert.NotContains(t, addr.String(), "127.0.0.1")
}
})
}
}

func TestGetBootNodesMultiAddress(t *testing.T) {
tests := []struct {
name string
bootstrapNodes []string
expectedLen int
expectError bool
}{
{
name: "valid addresses",
bootstrapNodes: []string{
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
"/ip4/104.131.131.83/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuK",
},
expectedLen: 2,
expectError: false,
},
{
name: "empty list",
bootstrapNodes: []string{},
expectedLen: 0,
expectError: false,
},
{
name: "list with empty string",
bootstrapNodes: []string{""},
expectedLen: 0,
expectError: false,
},
{
name: "invalid address",
bootstrapNodes: []string{
"invalid-address",
},
expectedLen: 0,
expectError: true,
},
{
name: "mixed valid and empty",
bootstrapNodes: []string{
"",
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
},
expectedLen: 1,
expectError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
addrs, err := GetBootNodesMultiAddress(tt.bootstrapNodes)

if tt.expectError {
assert.Error(t, err)
return
}

assert.NoError(t, err)
assert.Len(t, addrs, tt.expectedLen)

// Verify each address is valid
for _, addr := range addrs {
assert.NotNil(t, addr)
assert.Implements(t, (*multiaddr.Multiaddr)(nil), addr)
}
})
}
}
31 changes: 24 additions & 7 deletions pkg/network/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,32 @@ func isConnectedToBootnode(host host.Host, bootnodes []string) bool {
}

// Function to attempt reconnection to bootnodes
// We can improve error handling and retry logic
func reconnectToBootnodes(ctx context.Context, host host.Host, bootnodes []string) {
for _, bn := range bootnodes {
ma, _ := multiaddr.NewMultiaddr(bn)
peerInfo, _ := peer.AddrInfoFromP2pAddr(ma)
if err := host.Connect(ctx, *peerInfo); err != nil {
logrus.Errorf("[-] Failed to reconnect to bootnode %s: %v", bn, err)
} else {
logrus.Infof("[+] Reconnected to bootnode %s", bn)
break // Exit after successful reconnection to one bootnode
ma, err := multiaddr.NewMultiaddr(bn)
if err != nil {
logrus.Errorf("[-] Failed to parse bootnode address %s: %v", bn, err)
continue
}

// Add timeout and backoff retry
expBackOff := backoff.NewExponentialBackOff()
expBackOff.MaxElapsedTime = time.Second * 30

err = backoff.Retry(func() error {
connectCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
return err
}
return host.Connect(connectCtx, *peerInfo)
}, expBackOff)

if err == nil {
logrus.Infof("[+] Connected to bootnode %s", bn)
break // Successfully connected to a bootnode
}
}
}
108 changes: 108 additions & 0 deletions pkg/network/discover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package network

import (
"context"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestReconnectToBootnodes(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

bootnode, err := libp2p.New(
libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"),
)
require.NoError(t, err)
defer bootnode.Close()

regularNode, err := libp2p.New()
require.NoError(t, err)
defer regularNode.Close()

bootNodeAddr := bootnode.Addrs()[0].String() + "/p2p/" + bootnode.ID().String()

tests := []struct {
name string
bootnodes []string
expectConnected bool // New field to explicitly state connection expectation
}{
{
name: "successful connection to valid bootnode",
bootnodes: []string{bootNodeAddr},
expectConnected: true,
},
{
name: "invalid bootnode address",
bootnodes: []string{"/ip4/256.256.256.256/tcp/1234/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN"},
expectConnected: false,
},
{
name: "empty bootnode list",
bootnodes: []string{},
expectConnected: false, // Changed to false since no connection is expected
},
{
name: "unreachable bootnode",
bootnodes: []string{"/ip4/127.0.0.1/tcp/1234/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN"},
expectConnected: false,
},
{
name: "multiple bootnodes with one valid",
bootnodes: []string{
"/ip4/256.256.256.256/tcp/1234/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
bootNodeAddr,
},
expectConnected: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Disconnect from any existing connections
for _, conn := range regularNode.Network().Conns() {
_ = conn.Close()
}

// Test reconnection
reconnectToBootnodes(ctx, regularNode, tt.bootnodes)

// Verify connection status matches expectation
connected := isConnectedToAnyBootnode(regularNode, tt.bootnodes)
assert.Equal(t, tt.expectConnected, connected,
"Connection status mismatch: expected connected=%v, got connected=%v",
tt.expectConnected, connected)

// Add small delay to allow for connection cleanup
time.Sleep(100 * time.Millisecond)
})
}
}

func isConnectedToAnyBootnode(h host.Host, bootnodes []string) bool {
for _, bn := range bootnodes {
if bn == "" {
continue
}
ma, err := multiaddr.NewMultiaddr(bn)
if err != nil {
continue
}
pinfo, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
continue
}
if h.Network().Connectedness(pinfo.ID) == network.Connected {
return true
}
}
return false
}

0 comments on commit b7b42f6

Please sign in to comment.