Skip to content

Commit

Permalink
Merge pull request #13 from PowerLoom/feat/trusted-relayers
Browse files Browse the repository at this point in the history
Trusted sequencer list
  • Loading branch information
anomit authored Aug 21, 2024
2 parents f47a2df + 5653c67 commit e7f587a
Show file tree
Hide file tree
Showing 12 changed files with 494 additions and 82 deletions.
8 changes: 4 additions & 4 deletions config/settings.example.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"SequencerId": "SEQUENCER_ID",
"RelayerRendezvousPoint": "RELAYER_RENDEZVOUS_POINT",
"ClientRendezvousPoint": "CLIENT_RENDEZVOUS_POINT",
"RelayerPrivateKey": "RELAYER_PRIVATE_KEY",
"PowerloomReportingUrl": "POWERLOOM_REPORTING_URL",
"SignerAccountAddress": "SIGNER_ACCOUNT_ADDRESS",
"LocalCollectorPort": "LOCAL_COLLECTOR_PORT"
"LocalCollectorPort": "LOCAL_COLLECTOR_PORT",
"TrustedRelayersListUrl" : "TRUSTED_RELAYERS_LIST_URL",
"SequencerNetworkPath" : "SEQUENCER_MULTIADDR",
"DataMarketAddress" : "DATA_MARKET_CONTRACT"
}
8 changes: 8 additions & 0 deletions config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type Settings struct {
PowerloomReportingUrl string `json:"PowerloomReportingUrl"`
SignerAccountAddress string `json:"SignerAccountAddress"`
PortNumber string `json:"LocalCollectorPort"`
TrustedRelayersListUrl string `json:"TrustedRelayersListUrl"`
SequencerNetworkPath string `json:"Sequencers"`
DataMarketAddress string `json:"DataMarketAddress"`
}

func LoadConfig() {
Expand All @@ -37,5 +40,10 @@ func LoadConfig() {
if err != nil {
log.Fatalf("Failed to decode config file: %v", err)
}

if config.TrustedRelayersListUrl == "" {
config.TrustedRelayersListUrl = "https://raw.githubusercontent.com/PowerLoom/snapshotter-lite-local-collector/feat/trusted-relayers/relayers.json"
}

SettingsObj = &config
}
12 changes: 12 additions & 0 deletions config/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"SequencerId": "QmdJbNsbHpFseUPKC9vLt4vMsfdxA4dyHPzsAWuzYz3Yxx",
"RelayerRendezvousPoint": "RELAYER_RENDEZVOUS_POINT",
"ClientRendezvousPoint": "CLIENT_RENDEZVOUS_POINT",
"RelayerPrivateKey": "RELAYER_PRIVATE_KEY",
"PowerloomReportingUrl": "POWERLOOM_REPORTING_URL",
"SignerAccountAddress": "SIGNER_ACCOUNT_ADDRESS",
"LocalCollectorPort": "LOCAL_COLLECTOR_PORT",
"TrustedRelayersListUrl" : "https://raw.githubusercontent.com/PowerLoom/snapshotter-lite-local-collector/feat/trusted-relayers/relayers.json",
"SequencerNetworkPath": "/dns/proto-snapshot-listener.aws2.powerloom.io/tcp/9100/p2p/QmTK9e9QNEotPkjWAdZT5bbYKV7PEJVu7iXzdVn3VZDEk9",
"DataMarketAddress" : "0xa8D4C62BD8831bca08C9a16b3e76C824c9658eA1"
}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ require (
github.com/libp2p/go-libp2p v0.32.2
github.com/libp2p/go-libp2p-kad-dht v0.25.2
github.com/multiformats/go-multiaddr v0.12.2
github.com/pkg/errors v0.9.1
github.com/sethvargo/go-retry v0.2.4
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.62.0
google.golang.org/protobuf v1.32.0
)
Expand All @@ -20,6 +22,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
Expand Down Expand Up @@ -84,7 +87,7 @@ require (
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
Expand Down Expand Up @@ -116,5 +119,6 @@ require (
golang.org/x/tools v0.18.0 // indirect
gonum.org/v1/gonum v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
Expand Down
149 changes: 98 additions & 51 deletions pkgs/service/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,95 +2,142 @@ package service

import (
"context"
"fmt"
"encoding/json"
"github.com/pkg/errors"
"io"
"net/http"
"proto-snapshot-server/config"
"sync"

"github.com/cenkalti/backoff/v4"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
)

func isVisited(id peer.ID, visited []peer.ID) bool {
for _, v := range visited {
if v == id {
return true
}
}
return false
type Relayer struct {
ID string `json:"id"`
Name string `json:"name"`
RendezvousPoint string `json:"rendezvousPoint"`
Maddr string `json:"maddr"`
}

func connectToStableRelayer(ctx context.Context, host host.Host, relayerAddr string) peer.ID {
stableRelayerMA, err := ma.NewMultiaddr("/ip4/104.248.63.86/tcp/5001/p2p/QmQSEao6C3SuPZ8cWiYccPqsd7LtWBTzNgXQZiAjeGTQpm")
type Sequencer struct {
ID string `json:"id"`
Maddr string `json:"maddr"`
DataMarketAddress string `json:"dataMarketAddress"`
Environment string `json:"environment"`
}

func fetchSequencer(url string, dataMarketAddress string) (Sequencer, error) {
resp, err := http.Get(url)
if err != nil {
log.Debugln("Failed to parse stable peer multiaddress: ", err)
log.Fatalf("Failed to fetch JSON: %v", err)
}
defer resp.Body.Close()

peerInfo, err := peer.AddrInfoFromP2pAddr(stableRelayerMA)
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Debugln("Failed to extract peer info from multiaddress: %v", err)
log.Debugf("Failed to read response body: %v", err)
}

// Add the peer to the peerstore
host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.PermanentAddrTTL)
var sequencers []Sequencer
err = json.Unmarshal(body, &sequencers)
if err != nil {
log.Debugln("Failed to unmarshal JSON:", err)
}

if err := host.Connect(ctx, *peerInfo); err != nil {
log.Debugln("Failed to connect to stable relayer: %v", err)
for _, sequencer := range sequencers {
log.Debugf(
"ID: %s, Maddr: %s, Data Market Address: %s, Environment: %s\n",
sequencer.ID,
sequencer.Maddr,
sequencer.DataMarketAddress,
sequencer.Environment,
)

if sequencer.DataMarketAddress == dataMarketAddress {
return sequencer, nil
}
}

fmt.Println("Connected to stable relayer:", peerInfo.ID)
return peerInfo.ID
return Sequencer{}, errors.New("Sequencer not found")
}

func ConnectToPeer(ctx context.Context, routingDiscovery *routing.RoutingDiscovery, rendezvousString string, host host.Host, visited []peer.ID) peer.ID {
stableRelayer1 := "/ip4/104.248.63.86/tcp/5001/p2p/QmQSEao6C3SuPZ8cWiYccPqsd7LtWBTzNgXQZiAjeGTQpm"
stableRelayer2 := "/ip4/137.184.132.196/tcp/5001/p2p/QmU3xwsjRqQR4pjJQ7Cxhcb2tiPvaJ6Z5AHDULq7hHWvvj"
func fetchTrustedRelayers(url string) []Relayer {
resp, err := http.Get(url)
if err != nil {
log.Fatalf("Failed to fetch JSON: %v", err)
}
defer resp.Body.Close()

// Connect to stable relayers
peerID1 := connectToStableRelayer(ctx, host, stableRelayer1)
peerID2 := connectToStableRelayer(ctx, host, stableRelayer2)
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Debugf("Failed to read response body: %v", err)
}

if peerID1 != "" {
return peerID1
var relayers []Relayer
err = json.Unmarshal(body, &relayers)
if err != nil {
log.Debugln("Failed to unmarshal JSON:", err)
}
if peerID2 != "" {
return peerID2

for _, relayer := range relayers {
log.Debugf("ID: %s, Name: %s, Rendezvous Point: %s, Maddr: %s\n", relayer.ID, relayer.Name, relayer.RendezvousPoint, relayer.Maddr)
}

peerChan, err := routingDiscovery.FindPeers(ctx, rendezvousString)
return relayers
}

func AddPeerConnection(ctx context.Context, host host.Host, relayerAddr string) bool {
stableRelayerMA, err := ma.NewMultiaddr(relayerAddr)
if err != nil {
log.Debugln("Failed to parse stable peer multiaddress: ", err)
}

peerInfo, err := peer.AddrInfoFromP2pAddr(stableRelayerMA)
if err != nil {
log.Fatalf("Failed to find peers: %s", err)
log.Debugln("Failed to extract peer info from multiaddress:", err)
}
log.Debugln("Triggering peer discovery")

log.Debugln("Skipping visited peers: ", visited)
if host.Network().Connectedness(peerInfo.ID) == network.Connected {
log.Debugln("Skipping connected relayer: ", peerInfo.ID)
return true
}

for relayer := range peerChan {
if relayer.ID == host.ID() {
continue // Skip self or peers with no addresses
err = host.Connect(ctx, *peerInfo)
if err != nil {
log.Errorf("Failed to connect to relayer %s: %s", peerInfo.ID, err)
return false
if err := backoff.Retry(func() error { return host.Connect(ctx, *peerInfo) }, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 1)); err != nil {
log.Errorf("Failed to connect to relayer %s: %s", peerInfo.ID, err)
} else {
log.Infof("Connected to new relayer: %s", peerInfo.ID)
return true
}
} else {
log.Infof("Connected to new relayer: %s", peerInfo.ID)
log.Infoln("Connected: ", host.Network().ConnsToPeer(peerInfo.ID))
return true
}

if host.Network().Connectedness(relayer.ID) != network.Connected {
// Connect to the relayer if not already connected
if err = backoff.Retry(func() error { return host.Connect(ctx, relayer) }, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 1)); err != nil {
log.Errorf("Failed to connect to relayer %s: %s", relayer.ID, err)
} else {
log.Infof("Connected to new relayer: %s", relayer.ID)
return relayer.ID
}
} else {
log.Debugln("Already connected to: ", relayer.ID)
// return relayer.ID
return false
}

func ConnectToTrustedRelayers(ctx context.Context, host host.Host) []Relayer {
relayers := fetchTrustedRelayers(config.SettingsObj.TrustedRelayersListUrl)
var connectedRelayers []Relayer

for _, relayer := range relayers {
if AddPeerConnection(ctx, host, relayer.Maddr) {
connectedRelayers = append(connectedRelayers, relayer)
}
}
log.Debugln("Active connections: ", activeConnections)
return ""

return connectedRelayers
}

func ConfigureDHT(ctx context.Context, host host.Host) *dht.IpfsDHT {
Expand Down
Loading

0 comments on commit e7f587a

Please sign in to comment.