-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[KS-70] Minimal RageP2P wrapper (#12296)
A minimal counterpart of libOCR's peer_v2+endpoint_v2+bootstrapper_v2 with following differences: 1. Ability to change a set of peers dynamically. 2. Identify peers by their PeerIDs instead of Oracle IDs.
- Loading branch information
Showing
12 changed files
with
851 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"chainlink": patch | ||
--- | ||
|
||
Added a RageP2P wrapper |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package common | ||
|
||
import ( | ||
"crypto/ed25519" | ||
"encoding/hex" | ||
"encoding/json" | ||
"fmt" | ||
"os" | ||
|
||
"github.com/smartcontractkit/libocr/commontypes" | ||
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" | ||
) | ||
|
||
type Config struct { | ||
Nodes []string `json:"nodes"` | ||
Bootstrappers []string `json:"bootstrappers"` | ||
|
||
// parsed values below | ||
NodeKeys []ed25519.PrivateKey | ||
NodePeerIDs []ragep2ptypes.PeerID | ||
NodePeerIDsStr []string | ||
|
||
BootstrapperKeys []ed25519.PrivateKey | ||
BootstrapperPeerInfos []ragep2ptypes.PeerInfo | ||
BootstrapperLocators []commontypes.BootstrapperLocator | ||
} | ||
|
||
const ( | ||
// bootstrappers will listen on 127.0.0.1 ports 9000, 9001, 9002, etc. | ||
BootstrapStartPort = 9000 | ||
|
||
// nodes will listen on 127.0.0.1 ports 8000, 8001, 8002, etc. | ||
NodeStartPort = 8000 | ||
) | ||
|
||
func ParseConfigFromFile(fileName string) (*Config, error) { | ||
rawConfig, err := os.ReadFile(fileName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
var config Config | ||
err = json.Unmarshal(rawConfig, &config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
for _, hexKey := range config.Nodes { | ||
key, peerID, err := parseKey(hexKey) | ||
if err != nil { | ||
return nil, err | ||
} | ||
config.NodeKeys = append(config.NodeKeys, key) | ||
config.NodePeerIDs = append(config.NodePeerIDs, peerID) | ||
config.NodePeerIDsStr = append(config.NodePeerIDsStr, peerID.String()) | ||
} | ||
|
||
for _, hexKey := range config.Bootstrappers { | ||
key, peerID, err := parseKey(hexKey) | ||
if err != nil { | ||
return nil, err | ||
} | ||
config.BootstrapperKeys = append(config.BootstrapperKeys, key) | ||
config.BootstrapperPeerInfos = append(config.BootstrapperPeerInfos, ragep2ptypes.PeerInfo{ID: peerID}) | ||
} | ||
|
||
locators := []commontypes.BootstrapperLocator{} | ||
for id, peerID := range config.BootstrapperPeerInfos { | ||
addr := fmt.Sprintf("127.0.0.1:%d", BootstrapStartPort+id) | ||
locators = append(locators, commontypes.BootstrapperLocator{ | ||
PeerID: peerID.ID.String(), | ||
Addrs: []string{addr}, | ||
}) | ||
config.BootstrapperPeerInfos[id].Addrs = []ragep2ptypes.Address{ragep2ptypes.Address(addr)} | ||
} | ||
config.BootstrapperLocators = locators | ||
|
||
return &config, nil | ||
} | ||
|
||
func parseKey(hexKey string) (ed25519.PrivateKey, ragep2ptypes.PeerID, error) { | ||
b, err := hex.DecodeString(hexKey) | ||
if err != nil { | ||
return nil, ragep2ptypes.PeerID{}, err | ||
} | ||
key := ed25519.PrivateKey(b) | ||
peerID, err := ragep2ptypes.PeerIDFromPrivateKey(key) | ||
if err != nil { | ||
return nil, ragep2ptypes.PeerID{}, err | ||
} | ||
return key, peerID, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package main | ||
|
||
import ( | ||
"crypto/ed25519" | ||
"crypto/rand" | ||
"encoding/hex" | ||
"flag" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
|
||
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" | ||
) | ||
|
||
func main() { | ||
lggr, _ := logger.NewLogger() | ||
|
||
n := flag.Int("n", 1, "how many key pairs to generate") | ||
flag.Parse() | ||
|
||
for i := 0; i < *n; i++ { | ||
pubKey, privKey, err := ed25519.GenerateKey(rand.Reader) | ||
if err != nil { | ||
lggr.Error("error generating key pair ", err) | ||
return | ||
} | ||
lggr.Info("key pair ", i, ":") | ||
lggr.Info("public key ", hex.EncodeToString(pubKey)) | ||
lggr.Info("private key ", hex.EncodeToString(privKey)) | ||
|
||
peerID, err := ragep2ptypes.PeerIDFromPrivateKey(privKey) | ||
if err != nil { | ||
lggr.Error("error generating peer ID ", err) | ||
return | ||
} | ||
lggr.Info("peer ID ", peerID.String()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"crypto/ed25519" | ||
"flag" | ||
"fmt" | ||
"os" | ||
"os/signal" | ||
"sync" | ||
"time" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
|
||
"github.com/smartcontractkit/chainlink/core/scripts/p2ptoys/common" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/p2p" | ||
|
||
"github.com/smartcontractkit/libocr/ragep2p" | ||
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" | ||
|
||
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" | ||
) | ||
|
||
/* | ||
Usage: | ||
go run run.go --bootstrap | ||
go run run.go --index 0 | ||
go run run.go --index 1 | ||
Observe nodes 0 and 1 discovering each other via the bootstrapper and exchanging messages. | ||
*/ | ||
func main() { | ||
lggr, _ := logger.NewLogger() | ||
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt) | ||
var shutdownWaitGroup sync.WaitGroup | ||
|
||
configFile := flag.String("config", "test_keys.json", "Path to JSON config file") | ||
nodeIndex := flag.Int("index", 0, "Index of the key in the config file to use") | ||
isBootstrap := flag.Bool("bootstrap", false, "Whether to run as a bootstrapper or not") | ||
flag.Parse() | ||
config, err := common.ParseConfigFromFile(*configFile) | ||
if err != nil { | ||
lggr.Error("error parsing config ", err) | ||
return | ||
} | ||
|
||
// Select this node's private key and listen address from config. | ||
var privateKey ed25519.PrivateKey | ||
var peerIDs []ragetypes.PeerID | ||
var listenAddr string | ||
if *isBootstrap { | ||
privateKey = config.BootstrapperKeys[*nodeIndex] | ||
listenAddr = fmt.Sprintf("127.0.0.1:%d", common.BootstrapStartPort+*nodeIndex) | ||
} else { | ||
privateKey = config.NodeKeys[*nodeIndex] | ||
listenAddr = fmt.Sprintf("127.0.0.1:%d", common.NodeStartPort+*nodeIndex) | ||
} | ||
for _, key := range config.NodeKeys { | ||
peerID, _ := ragetypes.PeerIDFromPrivateKey(key) | ||
peerIDs = append(peerIDs, peerID) | ||
} | ||
|
||
reg := prometheus.NewRegistry() | ||
peerConfig := p2p.PeerConfig{ | ||
PrivateKey: privateKey, | ||
ListenAddresses: []string{listenAddr}, | ||
Bootstrappers: config.BootstrapperPeerInfos, | ||
|
||
DeltaReconcile: time.Second * 5, | ||
DeltaDial: time.Second * 5, | ||
DiscovererDatabase: p2p.NewInMemoryDiscovererDatabase(), | ||
MetricsRegisterer: reg, | ||
} | ||
|
||
peer, err := p2p.NewPeer(peerConfig, lggr) | ||
if err != nil { | ||
lggr.Error("error creating peer:", err) | ||
return | ||
} | ||
err = peer.Start(ctx) | ||
if err != nil { | ||
lggr.Error("error starting peer:", err) | ||
return | ||
} | ||
|
||
peers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) | ||
for _, peerID := range peerIDs { | ||
peers[peerID] = p2ptypes.StreamConfig{ | ||
IncomingMessageBufferSize: 1000000, | ||
OutgoingMessageBufferSize: 1000000, | ||
MaxMessageLenBytes: 100000, | ||
MessageRateLimiter: ragep2p.TokenBucketParams{ | ||
Rate: 2.0, | ||
Capacity: 10, | ||
}, | ||
BytesRateLimiter: ragep2p.TokenBucketParams{ | ||
Rate: 100000.0, | ||
Capacity: 100000, | ||
}, | ||
} | ||
} | ||
|
||
err = peer.UpdateConnections(peers) | ||
if err != nil { | ||
lggr.Errorw("error updating peer addresses", "error", err) | ||
} | ||
|
||
if !*isBootstrap { | ||
shutdownWaitGroup.Add(2) | ||
go sendLoop(ctx, &shutdownWaitGroup, peer, peerIDs, *nodeIndex, lggr) | ||
go recvLoop(ctx, &shutdownWaitGroup, peer.Receive(), lggr) | ||
} | ||
|
||
<-ctx.Done() | ||
err = peer.Close() | ||
if err != nil { | ||
lggr.Error("error closing peer:", err) | ||
} | ||
shutdownWaitGroup.Wait() | ||
} | ||
|
||
func sendLoop(ctx context.Context, shutdownWaitGroup *sync.WaitGroup, peer p2ptypes.Peer, peerIDs []ragetypes.PeerID, myId int, lggr logger.Logger) { | ||
defer shutdownWaitGroup.Done() | ||
ticker := time.NewTicker(2 * time.Second) | ||
defer ticker.Stop() | ||
lastId := 0 | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
if lastId != myId { | ||
lggr.Infow("sending message", "receiver", peerIDs[lastId]) | ||
err := peer.Send(peerIDs[lastId], []byte("hello!")) | ||
if err != nil { | ||
lggr.Errorw("error sending message", "receiver", peerIDs[lastId], "error", err) | ||
} | ||
} | ||
lastId++ | ||
if lastId >= len(peerIDs) { | ||
lastId = 0 | ||
} | ||
} | ||
} | ||
} | ||
|
||
func recvLoop(ctx context.Context, shutdownWaitGroup *sync.WaitGroup, chRecv <-chan p2ptypes.Message, lggr logger.Logger) { | ||
defer shutdownWaitGroup.Done() | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case msg := <-chRecv: | ||
lggr.Info("received message from ", msg.Sender, " : ", string(msg.Payload)) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
{ | ||
"bootstrappers": [ | ||
"105fe90c7b474b9be25e87cce8cd25956bb888215cd8028d6da28a933c8b4124d838facdbdb2e3a2e3e79848b04c3c15815593c2c13ad229eb26a91cd565ab55" | ||
], | ||
"nodes": [ | ||
"5f45bf580bb7697f1dd23062444d5692fa0f5e60306e6782b172206aed292abf29c9f9b13bf79e30677cb701fab3140ab5a87d0d0d2dc75de500551de60e5e57", | ||
"bc3d7c6479b44e7ce0377e07d5d456ec0089f77e89d2bdd8eaf98fdc602144dd28ff68e88c565ea4034eb09748ac042ec631c78b4d5db7509c8c54d63bb5550c", | ||
"010fbc8ab7c425f80b7dcfab1b41f6e79ddf27168edb88f423eec624119bca7c99605c48f83f799ecb9abaab8630866867aca20cb8fb9e5518d12a7980ec8e0b", | ||
"12b059c77f7d9367b380d778d45ac8a5dfc8a1737769a868e9e2bc1962eb690e3be9fe624419eb71183707abae4dd30ae56f269b010d98a360850673b51e9488" | ||
] | ||
} |
Oops, something went wrong.