Skip to content

Commit

Permalink
CCQ: Validation and marshalling changes (#3017)
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley authored May 31, 2023
1 parent 6d589b4 commit 58a4432
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 137 deletions.
47 changes: 20 additions & 27 deletions node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import (
"github.com/spf13/cobra"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

ipfslog "github.com/ipfs/go-log/v2"
googleapi_option "google.golang.org/api/option"
Expand Down Expand Up @@ -992,7 +991,7 @@ func runNode(cmd *cobra.Command, args []string) {
signedQueryReqReadC, signedQueryReqWriteC := makeChannelPair[*gossipv1.SignedQueryRequest](common.SignedQueryRequestChannelSize)

// Per-chain query requests
chainQueryReqC := make(map[vaa.ChainID]chan *gossipv1.SignedQueryRequest)
chainQueryReqC := make(map[vaa.ChainID]chan *common.QueryRequest)

// Query responses from watchers to query handler aggregated across all chains
queryResponseReadC, queryResponseWriteC := makeChannelPair[*common.QueryResponse](0)
Expand All @@ -1012,16 +1011,10 @@ func runNode(cmd *cobra.Command, args []string) {
case <-rootCtx.Done():
return
case response := <-c:
var queryRequest gossipv1.QueryRequest
err = proto.Unmarshal(response.Msg.Request.QueryRequest, &queryRequest)
if err != nil {
logger.Error("received invalid response from watcher", zap.Stringer("watcherChainId", chainId))
continue
}
if vaa.ChainID(queryRequest.ChainId) != chainId {
if response.ChainID != chainId {
// SECURITY: This should never happen. If it does, a watcher has been compromised.
logger.Fatal("SECURITY CRITICAL: Received query response from a chain that was not marked as originating from that chain",
zap.Uint32("responseChainId", queryRequest.ChainId),
zap.Uint16("responseChainId", uint16(response.ChainID)),
zap.Stringer("watcherChainId", chainId),
)
} else {
Expand Down Expand Up @@ -1234,7 +1227,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Ethereum watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDEthereum)
chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDEthereum] = make(chan *common.QueryRequest, queryRequestBufferSize)
ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", vaa.ChainIDEthereum, chainMsgC[vaa.ChainIDEthereum], setWriteC, chainObsvReqC[vaa.ChainIDEthereum], chainQueryReqC[vaa.ChainIDEthereum], chainQueryResponseC[vaa.ChainIDEthereum], *unsafeDevMode)
if err := supervisor.Run(ctx, "ethwatch",
common.WrapWithScissors(ethWatcher.Run, "ethwatch")); err != nil {
Expand All @@ -1246,7 +1239,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting BSC watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDBSC)
chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDBSC] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDBSC] = make(chan *common.QueryRequest, queryRequestBufferSize)
bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", vaa.ChainIDBSC, chainMsgC[vaa.ChainIDBSC], nil, chainObsvReqC[vaa.ChainIDBSC], chainQueryReqC[vaa.ChainIDBSC], chainQueryResponseC[vaa.ChainIDBSC], *unsafeDevMode)
bscWatcher.SetWaitForConfirmations(true)
if err := supervisor.Run(ctx, "bscwatch", common.WrapWithScissors(bscWatcher.Run, "bscwatch")); err != nil {
Expand All @@ -1263,7 +1256,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Polygon watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDPolygon)
chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDPolygon] = make(chan *common.QueryRequest, queryRequestBufferSize)
polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", vaa.ChainIDPolygon, chainMsgC[vaa.ChainIDPolygon], nil, chainObsvReqC[vaa.ChainIDPolygon], chainQueryReqC[vaa.ChainIDPolygon], chainQueryResponseC[vaa.ChainIDPolygon], *unsafeDevMode)
polygonWatcher.SetWaitForConfirmations(waitForConfirmations)
if err := polygonWatcher.SetRootChainParams(*polygonRootChainRpc, *polygonRootChainContractAddress); err != nil {
Expand All @@ -1277,7 +1270,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Avalanche watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDAvalanche)
chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDAvalanche] = make(chan *common.QueryRequest, queryRequestBufferSize)
if err := supervisor.Run(ctx, "avalanchewatch",
common.WrapWithScissors(evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", vaa.ChainIDAvalanche, chainMsgC[vaa.ChainIDAvalanche], nil, chainObsvReqC[vaa.ChainIDAvalanche], chainQueryReqC[vaa.ChainIDAvalanche], chainQueryResponseC[vaa.ChainIDAvalanche], *unsafeDevMode).Run, "avalanchewatch")); err != nil {
return err
Expand All @@ -1287,7 +1280,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Oasis watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDOasis)
chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDOasis] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDOasis] = make(chan *common.QueryRequest, queryRequestBufferSize)
if err := supervisor.Run(ctx, "oasiswatch",
common.WrapWithScissors(evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", vaa.ChainIDOasis, chainMsgC[vaa.ChainIDOasis], nil, chainObsvReqC[vaa.ChainIDOasis], chainQueryReqC[vaa.ChainIDOasis], chainQueryResponseC[vaa.ChainIDOasis], *unsafeDevMode).Run, "oasiswatch")); err != nil {
return err
Expand All @@ -1297,7 +1290,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Aurora watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDAurora)
chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDAurora] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDAurora] = make(chan *common.QueryRequest, queryRequestBufferSize)
if err := supervisor.Run(ctx, "aurorawatch",
common.WrapWithScissors(evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", vaa.ChainIDAurora, chainMsgC[vaa.ChainIDAurora], nil, chainObsvReqC[vaa.ChainIDAurora], chainQueryReqC[vaa.ChainIDAurora], chainQueryResponseC[vaa.ChainIDAurora], *unsafeDevMode).Run, "aurorawatch")); err != nil {
return err
Expand All @@ -1307,7 +1300,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Fantom watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDFantom)
chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDFantom] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDFantom] = make(chan *common.QueryRequest, queryRequestBufferSize)
if err := supervisor.Run(ctx, "fantomwatch",
common.WrapWithScissors(evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", vaa.ChainIDFantom, chainMsgC[vaa.ChainIDFantom], nil, chainObsvReqC[vaa.ChainIDFantom], chainQueryReqC[vaa.ChainIDFantom], chainQueryResponseC[vaa.ChainIDFantom], *unsafeDevMode).Run, "fantomwatch")); err != nil {
return err
Expand All @@ -1317,7 +1310,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Karura watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDKarura)
chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDKarura] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDKarura] = make(chan *common.QueryRequest, queryRequestBufferSize)
if err := supervisor.Run(ctx, "karurawatch",
common.WrapWithScissors(evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", vaa.ChainIDKarura, chainMsgC[vaa.ChainIDKarura], nil, chainObsvReqC[vaa.ChainIDKarura], chainQueryReqC[vaa.ChainIDKarura], chainQueryResponseC[vaa.ChainIDKarura], *unsafeDevMode).Run, "karurawatch")); err != nil {
return err
Expand All @@ -1327,7 +1320,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Acala watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDAcala)
chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDAcala] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDAcala] = make(chan *common.QueryRequest, queryRequestBufferSize)
if err := supervisor.Run(ctx, "acalawatch",
common.WrapWithScissors(evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", vaa.ChainIDAcala, chainMsgC[vaa.ChainIDAcala], nil, chainObsvReqC[vaa.ChainIDAcala], chainQueryReqC[vaa.ChainIDAcala], chainQueryResponseC[vaa.ChainIDAcala], *unsafeDevMode).Run, "acalawatch")); err != nil {
return err
Expand All @@ -1337,7 +1330,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Klaytn watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDKlaytn)
chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDKlaytn] = make(chan *common.QueryRequest, queryRequestBufferSize)
if err := supervisor.Run(ctx, "klaytnwatch",
common.WrapWithScissors(evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", vaa.ChainIDKlaytn, chainMsgC[vaa.ChainIDKlaytn], nil, chainObsvReqC[vaa.ChainIDKlaytn], chainQueryReqC[vaa.ChainIDKlaytn], chainQueryResponseC[vaa.ChainIDKlaytn], *unsafeDevMode).Run, "klaytnwatch")); err != nil {
return err
Expand All @@ -1347,7 +1340,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Celo watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDCelo)
chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDCelo] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDCelo] = make(chan *common.QueryRequest, queryRequestBufferSize)
if err := supervisor.Run(ctx, "celowatch",
common.WrapWithScissors(evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", vaa.ChainIDCelo, chainMsgC[vaa.ChainIDCelo], nil, chainObsvReqC[vaa.ChainIDCelo], chainQueryReqC[vaa.ChainIDCelo], chainQueryResponseC[vaa.ChainIDCelo], *unsafeDevMode).Run, "celowatch")); err != nil {
return err
Expand All @@ -1357,7 +1350,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Moonbeam watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDMoonbeam)
chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDMoonbeam] = make(chan *common.QueryRequest, queryRequestBufferSize)
if err := supervisor.Run(ctx, "moonbeamwatch",
common.WrapWithScissors(evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", vaa.ChainIDMoonbeam, chainMsgC[vaa.ChainIDMoonbeam], nil, chainObsvReqC[vaa.ChainIDMoonbeam], chainQueryReqC[vaa.ChainIDMoonbeam], chainQueryResponseC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run, "moonbeamwatch")); err != nil {
return err
Expand All @@ -1370,7 +1363,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Arbitrum watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDArbitrum)
chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDArbitrum] = make(chan *common.QueryRequest, queryRequestBufferSize)
arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", vaa.ChainIDArbitrum, chainMsgC[vaa.ChainIDArbitrum], nil, chainObsvReqC[vaa.ChainIDArbitrum], chainQueryReqC[vaa.ChainIDArbitrum], chainQueryResponseC[vaa.ChainIDArbitrum], *unsafeDevMode)
arbitrumWatcher.SetL1Finalizer(ethWatcher)
if err := supervisor.Run(ctx, "arbitrumwatch", common.WrapWithScissors(arbitrumWatcher.Run, "arbitrumwatch")); err != nil {
Expand All @@ -1381,7 +1374,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Optimism watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDOptimism)
chainObsvReqC[vaa.ChainIDOptimism] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDOptimism] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDOptimism] = make(chan *common.QueryRequest, queryRequestBufferSize)
optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", vaa.ChainIDOptimism, chainMsgC[vaa.ChainIDOptimism], nil, chainObsvReqC[vaa.ChainIDOptimism], chainQueryReqC[vaa.ChainIDOptimism], chainQueryResponseC[vaa.ChainIDOptimism], *unsafeDevMode)

// If rootChainParams are set, pass them in for pre-Bedrock mode
Expand Down Expand Up @@ -1521,7 +1514,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Neon watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDNeon)
chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDNeon] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDNeon] = make(chan *common.QueryRequest, queryRequestBufferSize)
neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", vaa.ChainIDNeon, chainMsgC[vaa.ChainIDNeon], nil, chainObsvReqC[vaa.ChainIDNeon], chainQueryReqC[vaa.ChainIDNeon], chainQueryResponseC[vaa.ChainIDNeon], *unsafeDevMode)
neonWatcher.SetL1Finalizer(solanaFinalizedWatcher)
if err := supervisor.Run(ctx, "neonwatch", common.WrapWithScissors(neonWatcher.Run, "neonwatch")); err != nil {
Expand All @@ -1532,7 +1525,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Base watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDBase)
chainObsvReqC[vaa.ChainIDBase] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDBase] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDBase] = make(chan *common.QueryRequest, queryRequestBufferSize)
baseWatcher := evm.NewEthWatcher(*baseRPC, baseContractAddr, "base", vaa.ChainIDBase, chainMsgC[vaa.ChainIDBase], nil, chainObsvReqC[vaa.ChainIDBase], chainQueryReqC[vaa.ChainIDBase], chainQueryResponseC[vaa.ChainIDBase], *unsafeDevMode)
if err := supervisor.Run(ctx, "basewatch", common.WrapWithScissors(baseWatcher.Run, "basewatch")); err != nil {
return err
Expand All @@ -1545,7 +1538,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Sepolia watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDSepolia)
chainObsvReqC[vaa.ChainIDSepolia] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
chainQueryReqC[vaa.ChainIDSepolia] = make(chan *gossipv1.SignedQueryRequest, queryRequestBufferSize)
chainQueryReqC[vaa.ChainIDSepolia] = make(chan *common.QueryRequest, queryRequestBufferSize)
sepoliaWatcher := evm.NewEthWatcher(*sepoliaRPC, sepoliaContractAddr, "sepolia", vaa.ChainIDSepolia, chainMsgC[vaa.ChainIDSepolia], nil, chainObsvReqC[vaa.ChainIDSepolia], chainQueryReqC[vaa.ChainIDSepolia], chainQueryResponseC[vaa.ChainIDSepolia], *unsafeDevMode)
if err := supervisor.Run(ctx, "sepoliawatch", common.WrapWithScissors(sepoliaWatcher.Run, "sepoliawatch")); err != nil {
return err
Expand Down
Loading

0 comments on commit 58a4432

Please sign in to comment.