Skip to content

Commit

Permalink
CCQ: Gossip split (#3320)
Browse files Browse the repository at this point in the history
* CCQ: Gossip Split

* Fix error handling
  • Loading branch information
bruce-riley committed Sep 5, 2023
1 parent 2a5f41d commit 2627973
Show file tree
Hide file tree
Showing 14 changed files with 464 additions and 112 deletions.
1 change: 1 addition & 0 deletions devnet/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ spec:
# - --chainGovernorEnabled=true
- --ccqEnabled=true
- --ccqAllowedRequesters=beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe
- --ccqAllowedPeers=12D3KooWSnju8zhywCYVi2JwTqky1sySPnmtYLsHHzc4WerMnDQH
# - --logLevel=debug
securityContext:
capabilities:
Expand Down
4 changes: 2 additions & 2 deletions devnet/query-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ spec:
- /guardiand
- query-server
- --nodeKey
- /tmp/node.key
- node/hack/query/querier.key
- --listenAddr
- "[::]:6069"
- --ethRPC
Expand All @@ -43,7 +43,7 @@ spec:
- "0xC89Ce4735882C9F0f0FE26686c53074E09B0D550"
# Hardcoded devnet bootstrap (generated from deterministic key in guardiand)
- --bootstrap
- /dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw
- /dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw
- --logLevel=info
ports:
- containerPort: 6069
Expand Down
46 changes: 32 additions & 14 deletions node/cmd/ccq/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ type SignedResponse struct {
}

type P2PSub struct {
sub *pubsub.Subscription
topic *pubsub.Topic
host host.Host
sub *pubsub.Subscription
topic_req *pubsub.Topic
topic_resp *pubsub.Topic
host host.Host
}

func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, bootstrap, ethRpcUrl, ethCoreAddr string, pendingResponses *PendingResponses, logger *zap.Logger) (*P2PSub, error) {
Expand Down Expand Up @@ -105,31 +106,47 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
return nil, err
}

topicName := fmt.Sprintf("%s/%s", networkID, "broadcast")
topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")

logger.Info("Subscribing pubsub topic", zap.String("topic", topicName))
ps, err := pubsub.NewGossipSub(ctx, h)
logger.Info("Subscribing pubsub topic", zap.String("topic_req", topic_req), zap.String("topic_resp", topic_resp))

// Comment from security team in PR #2981: CCQServers should have a parameter of D = 36, Dlo = 19, Dhi = 40, Dout = 18 such that they can reach all Guardians directly.
gossipParams := pubsub.DefaultGossipSubParams()
gossipParams.D = 36
gossipParams.Dlo = 19
gossipParams.Dhi = 40
gossipParams.Dout = 18

ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithGossipSubParams(gossipParams))
if err != nil {
logger.Error("failed to create gossip subscription", zap.Error(err))
return nil, err
}

th_req, err := ps.Join(topic_req)
if err != nil {
logger.Error("failed to join request topic", zap.String("topic_req", topic_req), zap.Error(err))
return nil, err
}

topic, err := ps.Join(topicName)
th_resp, err := ps.Join(topic_resp)
if err != nil {
logger.Error("failed to join topic", zap.Error(err))
logger.Error("failed to join response topic", zap.String("topic_resp", topic_resp), zap.Error(err))
return nil, err
}

sub, err := topic.Subscribe()
sub, err := th_resp.Subscribe()
if err != nil {
logger.Error("failed to subscribe topic", zap.Error(err))
logger.Error("failed to subscribe to response topic", zap.Error(err))
return nil, err
}

logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))

// Wait for peers
for len(topic.ListPeers()) < 1 {
for len(th_req.ListPeers()) < 1 {
time.Sleep(time.Millisecond * 100)
}

Expand Down Expand Up @@ -230,8 +247,9 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
}()

return &P2PSub{
sub: sub,
topic: topic,
host: h,
sub: sub,
topic_req: th_req,
topic_resp: th_resp,
host: h,
}, nil
}
53 changes: 41 additions & 12 deletions node/cmd/ccq/query_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,38 @@ import (
"os"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/telemetry"
"github.com/certusone/wormhole/node/pkg/version"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

var (
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
listenAddr *string
nodeKeyPath *string
ethRPC *string
ethContract *string
logLevel *string
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
listenAddr *string
nodeKeyPath *string
ethRPC *string
ethContract *string
logLevel *string
telemetryLokiURL *string
telemetryNodeName *string
)

func init() {
p2pNetworkID = QueryServerCmd.Flags().String("network", "/wormhole/dev", "P2P network identifier")
p2pPort = QueryServerCmd.Flags().Uint("port", 8999, "P2P UDP listener port")
p2pPort = QueryServerCmd.Flags().Uint("port", 8996, "P2P UDP listener port")
p2pBootstrap = QueryServerCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (comma-separated)")
listenAddr = QueryServerCmd.Flags().String("listenAddr", "[::]:6069", "Listen address for query server (disabled if blank)")
nodeKeyPath = QueryServerCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
ethRPC = QueryServerCmd.Flags().String("ethRPC", "", "Ethereum RPC for fetching current guardian set")
ethContract = QueryServerCmd.Flags().String("ethContract", "", "Ethereum core bridge address for fetching current guardian set")
logLevel = QueryServerCmd.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
telemetryLokiURL = QueryServerCmd.Flags().String("telemetryLokiURL", "", "Loki cloud logging URL")
telemetryNodeName = QueryServerCmd.Flags().String("telemetryNodeName", "", "Node name used in telemetry")
}

var QueryServerCmd = &cobra.Command{
Expand All @@ -54,6 +60,26 @@ func runQueryServer(cmd *cobra.Command, args []string) {
logger := ipfslog.Logger("query-server").Desugar()
ipfslog.SetAllLoggers(lvl)

if *telemetryLokiURL != "" {
logger.Info("Using Loki telemetry logger")
if *telemetryNodeName == "" {
logger.Fatal("if --telemetryLokiURL is specified --telemetryNodeName must be specified")
}
labels := map[string]string{
"network": *p2pNetworkID,
"node_name": *telemetryNodeName,
"version": version.Version(),
}

tm, err := telemetry.NewLokiCloudLogger(context.Background(), logger, *telemetryLokiURL, "ccq_server", true, labels)
if err != nil {
logger.Fatal("Failed to initialize telemetry", zap.Error(err))
}

defer tm.Close()
logger = tm.WrapLogger(logger) // Wrap logger with telemetry logger
}

// Verify flags
if *nodeKeyPath == "" {
logger.Fatal("Please specify --nodeKey")
Expand Down Expand Up @@ -87,7 +113,7 @@ func runQueryServer(cmd *cobra.Command, args []string) {

// Start the HTTP server
go func() {
s := NewHTTPServer(*listenAddr, p2p.topic, pendingResponses)
s := NewHTTPServer(*listenAddr, p2p.topic_req, pendingResponses)
logger.Sugar().Infof("Server listening on %s", *listenAddr)
err := s.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
Expand All @@ -101,8 +127,11 @@ func runQueryServer(cmd *cobra.Command, args []string) {
// Cleanly shutdown
// Without this the same host won't properly discover peers until some timeout
p2p.sub.Cancel()
if err := p2p.topic.Close(); err != nil {
logger.Error("Error closing the topic", zap.Error(err))
if err := p2p.topic_req.Close(); err != nil {
logger.Error("Error closing the request topic", zap.Error(err))
}
if err := p2p.topic_resp.Close(); err != nil {
logger.Error("Error closing the response topic", zap.Error(err))
}
if err := p2p.host.Close(); err != nil {
logger.Error("Error closing the host", zap.Error(err))
Expand Down
9 changes: 8 additions & 1 deletion node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ var (

ccqEnabled *bool
ccqAllowedRequesters *string
ccqP2pPort *uint
ccqP2pBootstrap *string
ccqAllowedPeers *string

gatewayRelayerContract *string
gatewayRelayerKeyPath *string
Expand Down Expand Up @@ -390,6 +393,9 @@ func init() {

ccqEnabled = NodeCmd.Flags().Bool("ccqEnabled", false, "Enable cross chain query support")
ccqAllowedRequesters = NodeCmd.Flags().String("ccqAllowedRequesters", "", "Comma separated list of signers allowed to submit cross chain queries")
ccqP2pPort = NodeCmd.Flags().Uint("ccqP2pPort", 8996, "CCQ P2P UDP listener port")
ccqP2pBootstrap = NodeCmd.Flags().String("ccqP2pBootstrap", "", "CCQ P2P bootstrap peers (comma-separated)")
ccqAllowedPeers = NodeCmd.Flags().String("ccqAllowedPeers", "", "CCQ allowed P2P peers (comma-separated)")

gatewayRelayerContract = NodeCmd.Flags().String("gatewayRelayerContract", "", "Address of the smart contract on wormchain to receive relayed VAAs")
gatewayRelayerKeyPath = NodeCmd.Flags().String("gatewayRelayerKeyPath", "", "Path to gateway relayer private key for signing transactions")
Expand Down Expand Up @@ -490,6 +496,7 @@ func runNode(cmd *cobra.Command, args []string) {

// Use the first guardian node as bootstrap
*p2pBootstrap = fmt.Sprintf("/dns4/guardian-0.guardian/udp/%d/quic/p2p/%s", *p2pPort, g0key.String())
*ccqP2pBootstrap = fmt.Sprintf("/dns4/guardian-0.guardian/udp/%d/quic/p2p/%s", *ccqP2pPort, g0key.String())

// Deterministic ganache ETH devnet address.
*ethContract = unsafeDevModeEvmContractAddress(*ethContract)
Expand Down Expand Up @@ -1468,7 +1475,7 @@ func runNode(cmd *cobra.Command, args []string) {
node.GuardianOptionGatewayRelayer(*gatewayRelayerContract, gatewayRelayerWormchainConn),
node.GuardianOptionQueryHandler(*ccqEnabled, *ccqAllowedRequesters),
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, *p2pPort, ibc.GetFeatures),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, ibc.GetFeatures),
node.GuardianOptionStatusServer(*statusAddr),
node.GuardianOptionProcessor(),
}
Expand Down
4 changes: 3 additions & 1 deletion node/cmd/spy/spy.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,9 @@ func runSpy(cmd *cobra.Command, args []string) {
false, // ccqEnabled
nil, // query requests
nil, // query responses

"", // query bootstrap peers
0, // query port
"", // query allow list
)); err != nil {
return err
}
Expand Down
Binary file added node/hack/query/querier.key
Binary file not shown.
47 changes: 28 additions & 19 deletions node/hack/query/send_req.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func main() {

p2pNetworkID := "/wormhole/dev"
var p2pPort uint = 8998 // don't collide with spy so we can run from the same container in tilt
p2pBootstrap := "/dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"
nodeKeyPath := "/tmp/querier.key" // don't use node key so we get a new address
p2pBootstrap := "/dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"
nodeKeyPath := "./querier.key"

ctx := context.Background()
logger, _ := zap.NewDevelopment()
Expand Down Expand Up @@ -146,29 +146,34 @@ func main() {
panic(err)
}

topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")

logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
logger.Info("Subscribing pubsub topic", zap.String("topic_req", topic_req), zap.String("topic_resp", topic_resp))
ps, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
panic(err)
}

th, err := ps.Join(topic)
th_req, err := ps.Join(topic_req)
if err != nil {
logger.Panic("failed to join topic", zap.Error(err))
logger.Panic("failed to join request topic", zap.String("topic_req", topic_req), zap.Error(err))
}

sub, err := th.Subscribe()
th_resp, err := ps.Join(topic_resp)
if err != nil {
logger.Panic("failed to subscribe topic", zap.Error(err))
logger.Panic("failed to join response topic", zap.String("topic_resp", topic_resp), zap.Error(err))
}

sub, err := th_resp.Subscribe()
if err != nil {
logger.Panic("failed to subscribe to response topic", zap.Error(err))
}

logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))

// Wait for peers
for len(th.ListPeers()) < 1 {
for len(th_req.ListPeers()) < 1 {
time.Sleep(time.Millisecond * 100)
}

Expand All @@ -183,7 +188,7 @@ func main() {

methods := []string{"name", "totalSupply"}
callData := []*query.EthCallData{}
to, _ := hex.DecodeString("0d500b1d8e8ef31e21c99d1db9a6444d3adf1270")
to, _ := hex.DecodeString("DDb64fE46a91D46ee29420539FC25FD07c5FEa3E")

for _, method := range methods {
data, err := wethAbi.Pack(method)
Expand All @@ -198,7 +203,8 @@ func main() {
}

// Fetch the latest block number
url := "https://rpc.ankr.com/polygon"
//url := "https://localhost:8545"
url := "http://eth-devnet:8545"
logger.Info("Querying for latest block height", zap.String("url", url))
blockNum, err := utils.FetchLatestBlockNumberFromUrl(ctx, url)
if err != nil {
Expand All @@ -221,7 +227,7 @@ func main() {
// First request...
logger.Info("calling sendQueryAndGetRsp for ", zap.String("blockNum", blockNum.String()))
queryRequest := createQueryRequest(callRequest)
sendQueryAndGetRsp(queryRequest, sk, th, ctx, logger, sub, wethAbi, methods)
sendQueryAndGetRsp(queryRequest, sk, th_req, ctx, logger, sub, wethAbi, methods)

// This is just so that when I look at the output, it is easier for me. (Paul)
logger.Info("sleeping for 5 seconds")
Expand All @@ -235,20 +241,23 @@ func main() {
}
queryRequest2 := createQueryRequest(callRequest2)
logger.Info("calling sendQueryAndGetRsp for ", zap.String("blockNum", blockNum.String()))
sendQueryAndGetRsp(queryRequest2, sk, th, ctx, logger, sub, wethAbi, methods)
sendQueryAndGetRsp(queryRequest2, sk, th_req, ctx, logger, sub, wethAbi, methods)

// Now, want to send a single query with multiple requests...
logger.Info("Starting multiquery test in 5...")
time.Sleep(time.Second * 5)
multiCallRequest := []*query.EthCallQueryRequest{callRequest, callRequest2}
multQueryRequest := createQueryRequestWithMultipleRequests(multiCallRequest)
sendQueryAndGetRsp(multQueryRequest, sk, th, ctx, logger, sub, wethAbi, methods)
sendQueryAndGetRsp(multQueryRequest, sk, th_req, ctx, logger, sub, wethAbi, methods)

// Cleanly shutdown
// Without this the same host won't properly discover peers until some timeout
sub.Cancel()
if err := th.Close(); err != nil {
logger.Fatal("Error closing the topic", zap.Error(err))
if err := th_req.Close(); err != nil {
logger.Fatal("Error closing the request topic", zap.Error(err))
}
if err := th_resp.Close(); err != nil {
logger.Fatal("Error closing the response topic", zap.Error(err))
}
if err := h.Close(); err != nil {
logger.Fatal("Error closing the host", zap.Error(err))
Expand All @@ -270,7 +279,7 @@ func createQueryRequest(callRequest *query.EthCallQueryRequest) *query.QueryRequ
Nonce: rand.Uint32(),
PerChainQueries: []*query.PerChainQueryRequest{
{
ChainId: 5,
ChainId: 2,
Query: callRequest,
},
},
Expand All @@ -282,7 +291,7 @@ func createQueryRequestWithMultipleRequests(callRequests []*query.EthCallQueryRe
perChainQueries := []*query.PerChainQueryRequest{}
for _, req := range callRequests {
perChainQueries = append(perChainQueries, &query.PerChainQueryRequest{
ChainId: 5,
ChainId: 2,
Query: req,
})
}
Expand Down
Loading

0 comments on commit 2627973

Please sign in to comment.