From 7c5b3f9175ca182e6ea3fdb8e0c5daf891bb6cab Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Thu, 22 Jun 2023 09:23:11 -0500 Subject: [PATCH] CCQ: Reorg code into a query package (#3112) * CCQ: Reorg code into a query package * Make requestTimeout public --- node/cmd/guardiand/node.go | 53 +++---- node/hack/query/send_req.go | 39 +++--- node/hack/query/test/query_test.go | 19 +-- node/pkg/p2p/p2p.go | 7 +- node/pkg/query/helpers_test.go | 58 ++++++++ .../query_test.go => query/msg_test.go} | 2 +- node/{cmd/guardiand => pkg/query}/query.go | 62 ++++----- .../guardiand => pkg/query}/query_test.go | 130 +++++++++--------- .../queryRequest.go => query/request.go} | 11 +- .../queryResponse.go => query/response.go} | 2 +- node/pkg/watchers/evm/watcher.go | 33 ++--- 11 files changed, 237 insertions(+), 179 deletions(-) create mode 100644 node/pkg/query/helpers_test.go rename node/pkg/{common/query_test.go => query/msg_test.go} (99%) rename node/{cmd/guardiand => pkg/query}/query.go (87%) rename node/{cmd/guardiand => pkg/query}/query_test.go (83%) rename node/pkg/{common/queryRequest.go => query/request.go} (98%) rename node/pkg/{common/queryResponse.go => query/response.go} (99%) diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index a550ecddd6..1ab055f33e 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -27,6 +27,7 @@ import ( "github.com/benbjohnson/clock" "github.com/certusone/wormhole/node/pkg/db" + "github.com/certusone/wormhole/node/pkg/query" "github.com/certusone/wormhole/node/pkg/telemetry" "github.com/certusone/wormhole/node/pkg/version" "github.com/gagliardetto/solana-go/rpc" @@ -971,24 +972,24 @@ func runNode(cmd *cobra.Command, args []string) { chainObsvReqC := make(map[vaa.ChainID]chan *gossipv1.ObservationRequest) // Inbound observation requests from the p2p service (for all chains) - signedQueryReqReadC, signedQueryReqWriteC := makeChannelPair[*gossipv1.SignedQueryRequest](common.SignedQueryRequestChannelSize) + signedQueryReqReadC, signedQueryReqWriteC := makeChannelPair[*gossipv1.SignedQueryRequest](query.SignedQueryRequestChannelSize) // Per-chain query requests - chainQueryReqC := make(map[vaa.ChainID]chan *common.PerChainQueryInternal) + chainQueryReqC := make(map[vaa.ChainID]chan *query.PerChainQueryInternal) // Query responses from watchers to query handler aggregated across all chains - queryResponseReadC, queryResponseWriteC := makeChannelPair[*common.PerChainQueryResponseInternal](0) + queryResponseReadC, queryResponseWriteC := makeChannelPair[*query.PerChainQueryResponseInternal](0) // Query responses from query handler to p2p - queryResponsePublicationReadC, queryResponsePublicationWriteC := makeChannelPair[*common.QueryResponsePublication](0) + queryResponsePublicationReadC, queryResponsePublicationWriteC := makeChannelPair[*query.QueryResponsePublication](0) // Per-chain query response channel - chainQueryResponseC := make(map[vaa.ChainID]chan *common.PerChainQueryResponseInternal) + chainQueryResponseC := make(map[vaa.ChainID]chan *query.PerChainQueryResponseInternal) // aggregate per-chain msgC into msgC. // SECURITY defense-in-depth: This way we enforce that a watcher must set the msg.EmitterChain to its chainId, which makes the code easier to audit for _, chainId := range vaa.GetAllNetworkIDs() { - chainQueryResponseC[chainId] = make(chan *common.PerChainQueryResponseInternal) - go func(c <-chan *common.PerChainQueryResponseInternal, chainId vaa.ChainID) { + chainQueryResponseC[chainId] = make(chan *query.PerChainQueryResponseInternal) + go func(c <-chan *query.PerChainQueryResponseInternal, chainId vaa.ChainID) { for { select { case <-rootCtx.Done(): @@ -1210,7 +1211,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Ethereum watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDEthereum) chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDEthereum] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDEthereum] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", vaa.ChainIDEthereum, chainMsgC[vaa.ChainIDEthereum], setWriteC, chainObsvReqC[vaa.ChainIDEthereum], chainQueryReqC[vaa.ChainIDEthereum], chainQueryResponseC[vaa.ChainIDEthereum], *unsafeDevMode) if err := supervisor.Run(ctx, "ethwatch", common.WrapWithScissors(ethWatcher.Run, "ethwatch")); err != nil { @@ -1222,7 +1223,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting BSC watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDBSC) chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDBSC] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDBSC] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", vaa.ChainIDBSC, chainMsgC[vaa.ChainIDBSC], nil, chainObsvReqC[vaa.ChainIDBSC], chainQueryReqC[vaa.ChainIDBSC], chainQueryResponseC[vaa.ChainIDBSC], *unsafeDevMode) bscWatcher.SetWaitForConfirmations(true) if err := supervisor.Run(ctx, "bscwatch", common.WrapWithScissors(bscWatcher.Run, "bscwatch")); err != nil { @@ -1239,7 +1240,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Polygon watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDPolygon) chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDPolygon] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDPolygon] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", vaa.ChainIDPolygon, chainMsgC[vaa.ChainIDPolygon], nil, chainObsvReqC[vaa.ChainIDPolygon], chainQueryReqC[vaa.ChainIDPolygon], chainQueryResponseC[vaa.ChainIDPolygon], *unsafeDevMode) polygonWatcher.SetWaitForConfirmations(waitForConfirmations) if err := polygonWatcher.SetRootChainParams(*polygonRootChainRpc, *polygonRootChainContractAddress); err != nil { @@ -1253,7 +1254,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Avalanche watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDAvalanche) chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDAvalanche] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDAvalanche] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) if err := supervisor.Run(ctx, "avalanchewatch", common.WrapWithScissors(evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", vaa.ChainIDAvalanche, chainMsgC[vaa.ChainIDAvalanche], nil, chainObsvReqC[vaa.ChainIDAvalanche], chainQueryReqC[vaa.ChainIDAvalanche], chainQueryResponseC[vaa.ChainIDAvalanche], *unsafeDevMode).Run, "avalanchewatch")); err != nil { return err @@ -1263,7 +1264,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Oasis watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDOasis) chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDOasis] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDOasis] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) if err := supervisor.Run(ctx, "oasiswatch", common.WrapWithScissors(evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", vaa.ChainIDOasis, chainMsgC[vaa.ChainIDOasis], nil, chainObsvReqC[vaa.ChainIDOasis], chainQueryReqC[vaa.ChainIDOasis], chainQueryResponseC[vaa.ChainIDOasis], *unsafeDevMode).Run, "oasiswatch")); err != nil { return err @@ -1273,7 +1274,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Aurora watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDAurora) chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDAurora] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDAurora] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) if err := supervisor.Run(ctx, "aurorawatch", common.WrapWithScissors(evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", vaa.ChainIDAurora, chainMsgC[vaa.ChainIDAurora], nil, chainObsvReqC[vaa.ChainIDAurora], chainQueryReqC[vaa.ChainIDAurora], chainQueryResponseC[vaa.ChainIDAurora], *unsafeDevMode).Run, "aurorawatch")); err != nil { return err @@ -1283,7 +1284,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Fantom watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDFantom) chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDFantom] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDFantom] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) if err := supervisor.Run(ctx, "fantomwatch", common.WrapWithScissors(evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", vaa.ChainIDFantom, chainMsgC[vaa.ChainIDFantom], nil, chainObsvReqC[vaa.ChainIDFantom], chainQueryReqC[vaa.ChainIDFantom], chainQueryResponseC[vaa.ChainIDFantom], *unsafeDevMode).Run, "fantomwatch")); err != nil { return err @@ -1293,7 +1294,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Karura watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDKarura) chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDKarura] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDKarura] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) if err := supervisor.Run(ctx, "karurawatch", common.WrapWithScissors(evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", vaa.ChainIDKarura, chainMsgC[vaa.ChainIDKarura], nil, chainObsvReqC[vaa.ChainIDKarura], chainQueryReqC[vaa.ChainIDKarura], chainQueryResponseC[vaa.ChainIDKarura], *unsafeDevMode).Run, "karurawatch")); err != nil { return err @@ -1303,7 +1304,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Acala watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDAcala) chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDAcala] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDAcala] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) if err := supervisor.Run(ctx, "acalawatch", common.WrapWithScissors(evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", vaa.ChainIDAcala, chainMsgC[vaa.ChainIDAcala], nil, chainObsvReqC[vaa.ChainIDAcala], chainQueryReqC[vaa.ChainIDAcala], chainQueryResponseC[vaa.ChainIDAcala], *unsafeDevMode).Run, "acalawatch")); err != nil { return err @@ -1313,7 +1314,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Klaytn watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDKlaytn) chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDKlaytn] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDKlaytn] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) if err := supervisor.Run(ctx, "klaytnwatch", common.WrapWithScissors(evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", vaa.ChainIDKlaytn, chainMsgC[vaa.ChainIDKlaytn], nil, chainObsvReqC[vaa.ChainIDKlaytn], chainQueryReqC[vaa.ChainIDKlaytn], chainQueryResponseC[vaa.ChainIDKlaytn], *unsafeDevMode).Run, "klaytnwatch")); err != nil { return err @@ -1323,7 +1324,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Celo watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDCelo) chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDCelo] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDCelo] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) if err := supervisor.Run(ctx, "celowatch", common.WrapWithScissors(evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", vaa.ChainIDCelo, chainMsgC[vaa.ChainIDCelo], nil, chainObsvReqC[vaa.ChainIDCelo], chainQueryReqC[vaa.ChainIDCelo], chainQueryResponseC[vaa.ChainIDCelo], *unsafeDevMode).Run, "celowatch")); err != nil { return err @@ -1333,7 +1334,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Moonbeam watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDMoonbeam) chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDMoonbeam] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDMoonbeam] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) if err := supervisor.Run(ctx, "moonbeamwatch", common.WrapWithScissors(evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", vaa.ChainIDMoonbeam, chainMsgC[vaa.ChainIDMoonbeam], nil, chainObsvReqC[vaa.ChainIDMoonbeam], chainQueryReqC[vaa.ChainIDMoonbeam], chainQueryResponseC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run, "moonbeamwatch")); err != nil { return err @@ -1346,7 +1347,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Arbitrum watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDArbitrum) chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDArbitrum] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDArbitrum] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", vaa.ChainIDArbitrum, chainMsgC[vaa.ChainIDArbitrum], nil, chainObsvReqC[vaa.ChainIDArbitrum], chainQueryReqC[vaa.ChainIDArbitrum], chainQueryResponseC[vaa.ChainIDArbitrum], *unsafeDevMode) arbitrumWatcher.SetL1Finalizer(ethWatcher) if err := supervisor.Run(ctx, "arbitrumwatch", common.WrapWithScissors(arbitrumWatcher.Run, "arbitrumwatch")); err != nil { @@ -1357,7 +1358,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Optimism watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDOptimism) chainObsvReqC[vaa.ChainIDOptimism] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDOptimism] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDOptimism] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", vaa.ChainIDOptimism, chainMsgC[vaa.ChainIDOptimism], nil, chainObsvReqC[vaa.ChainIDOptimism], chainQueryReqC[vaa.ChainIDOptimism], chainQueryResponseC[vaa.ChainIDOptimism], *unsafeDevMode) if err := supervisor.Run(ctx, "optimismwatch", common.WrapWithScissors(optimismWatcher.Run, "optimismwatch")); err != nil { @@ -1477,7 +1478,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Neon watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDNeon) chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDNeon] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDNeon] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", vaa.ChainIDNeon, chainMsgC[vaa.ChainIDNeon], nil, chainObsvReqC[vaa.ChainIDNeon], chainQueryReqC[vaa.ChainIDNeon], chainQueryResponseC[vaa.ChainIDNeon], *unsafeDevMode) neonWatcher.SetL1Finalizer(solanaFinalizedWatcher) if err := supervisor.Run(ctx, "neonwatch", common.WrapWithScissors(neonWatcher.Run, "neonwatch")); err != nil { @@ -1488,7 +1489,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Base watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDBase) chainObsvReqC[vaa.ChainIDBase] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDBase] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDBase] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) baseWatcher := evm.NewEthWatcher(*baseRPC, baseContractAddr, "base", vaa.ChainIDBase, chainMsgC[vaa.ChainIDBase], nil, chainObsvReqC[vaa.ChainIDBase], chainQueryReqC[vaa.ChainIDBase], chainQueryResponseC[vaa.ChainIDBase], *unsafeDevMode) if err := supervisor.Run(ctx, "basewatch", common.WrapWithScissors(baseWatcher.Run, "basewatch")); err != nil { return err @@ -1501,7 +1502,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Sepolia watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDSepolia) chainObsvReqC[vaa.ChainIDSepolia] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainQueryReqC[vaa.ChainIDSepolia] = make(chan *common.PerChainQueryInternal, queryRequestBufferSize) + chainQueryReqC[vaa.ChainIDSepolia] = make(chan *query.PerChainQueryInternal, queryRequestBufferSize) sepoliaWatcher := evm.NewEthWatcher(*sepoliaRPC, sepoliaContractAddr, "sepolia", vaa.ChainIDSepolia, chainMsgC[vaa.ChainIDSepolia], nil, chainObsvReqC[vaa.ChainIDSepolia], chainQueryReqC[vaa.ChainIDSepolia], chainQueryResponseC[vaa.ChainIDSepolia], *unsafeDevMode) if err := supervisor.Run(ctx, "sepoliawatch", common.WrapWithScissors(sepoliaWatcher.Run, "sepoliawatch")); err != nil { return err @@ -1555,11 +1556,11 @@ func runNode(cmd *cobra.Command, args []string) { go handleReobservationRequests(rootCtx, clock.New(), logger, obsvReqReadC, chainObsvReqC) if *ccqEnabled { - ccqAllowedRequestersList, err := ccqParseAllowedRequesters(*ccqAllowedRequesters) + ccqAllowedRequestersList, err := query.ParseAllowedRequesters(*ccqAllowedRequesters) if err != nil { logger.Fatal("failed to parse allowed requesters list", zap.String("ccqAllowedRequesters", *ccqAllowedRequesters), zap.Error(err), zap.String("component", "ccqconfig")) } - go handleQueryRequests(rootCtx, logger, signedQueryReqReadC, chainQueryReqC, ccqAllowedRequestersList, queryResponseReadC, queryResponsePublicationWriteC, env) + go query.HandleQueryRequests(rootCtx, logger, signedQueryReqReadC, chainQueryReqC, ccqAllowedRequestersList, queryResponseReadC, queryResponsePublicationWriteC, env) } if acct != nil { diff --git a/node/hack/query/send_req.go b/node/hack/query/send_req.go index fe4bb254b3..6dd95de0d2 100644 --- a/node/hack/query/send_req.go +++ b/node/hack/query/send_req.go @@ -20,6 +20,7 @@ import ( "github.com/certusone/wormhole/node/pkg/p2p" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1" + "github.com/certusone/wormhole/node/pkg/query" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common/hexutil" ethCrypto "github.com/ethereum/go-ethereum/crypto" @@ -181,7 +182,7 @@ func main() { } methods := []string{"name", "totalSupply"} - callData := []*common.EthCallData{} + callData := []*query.EthCallData{} to, _ := hex.DecodeString("0d500b1d8e8ef31e21c99d1db9a6444d3adf1270") for _, method := range methods { @@ -190,7 +191,7 @@ func main() { panic(err) } - callData = append(callData, &common.EthCallData{ + callData = append(callData, &query.EthCallData{ To: to, Data: data, }) @@ -211,7 +212,7 @@ func main() { // block := "0x9999bac44d09a7f69ee7941819b0a19c59ccb1969640cc513be09ef95ed2d8e2" // Start of query creation... - callRequest := &common.EthCallQueryRequest{ + callRequest := &query.EthCallQueryRequest{ BlockId: hexutil.EncodeBig(blockNum), CallData: callData, } @@ -228,7 +229,7 @@ func main() { // Second request... blockNum = blockNum.Sub(blockNum, big.NewInt(5)) - callRequest2 := &common.EthCallQueryRequest{ + callRequest2 := &query.EthCallQueryRequest{ BlockId: hexutil.EncodeBig(blockNum), CallData: callData, } @@ -239,7 +240,7 @@ func main() { // Now, want to send a single query with multiple requests... logger.Info("Starting multiquery test in 5...") time.Sleep(time.Second * 5) - multiCallRequest := []*common.EthCallQueryRequest{callRequest, callRequest2} + multiCallRequest := []*query.EthCallQueryRequest{callRequest, callRequest2} multQueryRequest := createQueryRequestWithMultipleRequests(multiCallRequest) sendQueryAndGetRsp(multQueryRequest, sk, th, ctx, logger, sub, wethAbi, methods) @@ -264,10 +265,10 @@ const ( GuardianKeyArmoredBlock = "WORMHOLE GUARDIAN PRIVATE KEY" ) -func createQueryRequest(callRequest *common.EthCallQueryRequest) *common.QueryRequest { - queryRequest := &common.QueryRequest{ +func createQueryRequest(callRequest *query.EthCallQueryRequest) *query.QueryRequest { + queryRequest := &query.QueryRequest{ Nonce: rand.Uint32(), - PerChainQueries: []*common.PerChainQueryRequest{ + PerChainQueries: []*query.PerChainQueryRequest{ { ChainId: 5, Query: callRequest, @@ -277,23 +278,23 @@ func createQueryRequest(callRequest *common.EthCallQueryRequest) *common.QueryRe return queryRequest } -func createQueryRequestWithMultipleRequests(callRequests []*common.EthCallQueryRequest) *common.QueryRequest { - perChainQueries := []*common.PerChainQueryRequest{} +func createQueryRequestWithMultipleRequests(callRequests []*query.EthCallQueryRequest) *query.QueryRequest { + perChainQueries := []*query.PerChainQueryRequest{} for _, req := range callRequests { - perChainQueries = append(perChainQueries, &common.PerChainQueryRequest{ + perChainQueries = append(perChainQueries, &query.PerChainQueryRequest{ ChainId: 5, Query: req, }) } - queryRequest := &common.QueryRequest{ + queryRequest := &query.QueryRequest{ Nonce: rand.Uint32(), PerChainQueries: perChainQueries, } return queryRequest } -func sendQueryAndGetRsp(queryRequest *common.QueryRequest, sk *ecdsa.PrivateKey, th *pubsub.Topic, ctx context.Context, logger *zap.Logger, sub *pubsub.Subscription, wethAbi abi.ABI, methods []string) { +func sendQueryAndGetRsp(queryRequest *query.QueryRequest, sk *ecdsa.PrivateKey, th *pubsub.Topic, ctx context.Context, logger *zap.Logger, sub *pubsub.Subscription, wethAbi abi.ABI, methods []string) { queryRequestBytes, err := queryRequest.Marshal() if err != nil { panic(err) @@ -301,7 +302,7 @@ func sendQueryAndGetRsp(queryRequest *common.QueryRequest, sk *ecdsa.PrivateKey, numQueries := len(queryRequest.PerChainQueries) // Sign the query request using our private key. - digest := common.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes) + digest := query.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes) sig, err := ethCrypto.Sign(digest.Bytes(), sk) if err != nil { panic(err) @@ -348,7 +349,7 @@ func sendQueryAndGetRsp(queryRequest *common.QueryRequest, sk *ecdsa.PrivateKey, switch m := msg.Message.(type) { case *gossipv1.GossipMessage_SignedQueryResponse: logger.Info("query response received", zap.Any("response", m.SignedQueryResponse)) - var response common.QueryResponsePublication + var response query.QueryResponsePublication err := response.Unmarshal(m.SignedQueryResponse.QueryResponse) if err != nil { logger.Warn("failed to unmarshal response", zap.Error(err)) @@ -366,17 +367,17 @@ func sendQueryAndGetRsp(queryRequest *common.QueryRequest, sk *ecdsa.PrivateKey, for index := range response.PerChainResponses { logger.Info("per chain query response index", zap.Int("index", index)) - var localCallData []*common.EthCallData + var localCallData []*query.EthCallData switch ecq := queryRequest.PerChainQueries[index].Query.(type) { - case *common.EthCallQueryRequest: + case *query.EthCallQueryRequest: localCallData = ecq.CallData default: panic("unsupported query type") } - var localResp *common.EthCallQueryResponse + var localResp *query.EthCallQueryResponse switch ecq := response.PerChainResponses[index].Response.(type) { - case *common.EthCallQueryResponse: + case *query.EthCallQueryResponse: localResp = ecq default: panic("unsupported query type") diff --git a/node/hack/query/test/query_test.go b/node/hack/query/test/query_test.go index 4ed7707e55..e115743ce6 100644 --- a/node/hack/query/test/query_test.go +++ b/node/hack/query/test/query_test.go @@ -20,6 +20,7 @@ import ( "github.com/certusone/wormhole/node/pkg/p2p" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1" + "github.com/certusone/wormhole/node/pkg/query" "github.com/ethereum/go-ethereum/accounts/abi" ethCommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -185,21 +186,21 @@ func TestCrossChainQuery(t *testing.T) { } to, _ := hex.DecodeString("DDb64fE46a91D46ee29420539FC25FD07c5FEa3E") // WETH - callData := []*common.EthCallData{ + callData := []*query.EthCallData{ { To: to, Data: data, }, } - callRequest := &common.EthCallQueryRequest{ + callRequest := &query.EthCallQueryRequest{ BlockId: hexutil.EncodeBig(blockNum), CallData: callData, } - queryRequest := &common.QueryRequest{ + queryRequest := &query.QueryRequest{ Nonce: 1, - PerChainQueries: []*common.PerChainQueryRequest{ + PerChainQueries: []*query.PerChainQueryRequest{ { ChainId: 2, Query: callRequest, @@ -213,7 +214,7 @@ func TestCrossChainQuery(t *testing.T) { } // Sign the query request using our private key. - digest := common.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes) + digest := query.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes) sig, err := ethCrypto.Sign(digest.Bytes(), sk) if err != nil { panic(err) @@ -260,13 +261,13 @@ func TestCrossChainQuery(t *testing.T) { switch m := msg.Message.(type) { case *gossipv1.GossipMessage_SignedQueryResponse: logger.Info("query response received", zap.Any("response", m.SignedQueryResponse)) - var response common.QueryResponsePublication + var response query.QueryResponsePublication err := response.Unmarshal(m.SignedQueryResponse.QueryResponse) if err != nil { logger.Fatal("failed to unmarshal response", zap.Error(err)) } if bytes.Equal(response.Request.QueryRequest, queryRequestBytes) && bytes.Equal(response.Request.Signature, sig) { - digest := common.GetQueryResponseDigestFromBytes(m.SignedQueryResponse.QueryResponse) + digest := query.GetQueryResponseDigestFromBytes(m.SignedQueryResponse.QueryResponse) signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), m.SignedQueryResponse.Signature) if err != nil { logger.Fatal("failed to verify signature on response", @@ -296,9 +297,9 @@ func TestCrossChainQuery(t *testing.T) { break } - var pcq *common.EthCallQueryResponse + var pcq *query.EthCallQueryResponse switch ecq := response.PerChainResponses[0].Response.(type) { - case *common.EthCallQueryResponse: + case *query.EthCallQueryResponse: pcq = ecq default: panic("unsupported query type") diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index f3179684c8..177f1405bf 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -12,6 +12,7 @@ import ( "github.com/certusone/wormhole/node/pkg/accountant" node_common "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/governor" + "github.com/certusone/wormhole/node/pkg/query" "github.com/certusone/wormhole/node/pkg/version" "github.com/ethereum/go-ethereum/common" ethcrypto "github.com/ethereum/go-ethereum/crypto" @@ -157,7 +158,7 @@ func Run( ibcFeaturesFunc func() string, ccqEnabled bool, signedQueryReqC chan<- *gossipv1.SignedQueryRequest, - queryResponseReadC <-chan *node_common.QueryResponsePublication, + queryResponseReadC <-chan *query.QueryResponsePublication, ) func(ctx context.Context) error { if components == nil { components = DefaultComponents() @@ -427,7 +428,7 @@ func Run( logger.Error("failed to marshal query response", zap.Error(err), zap.String("component", "ccqp2p")) continue } - digest := node_common.GetQueryResponseDigestFromBytes(msgBytes) + digest := query.GetQueryResponseDigestFromBytes(msgBytes) sig, err := ethcrypto.Sign(digest.Bytes(), gk) if err != nil { panic(err) @@ -605,7 +606,7 @@ func Run( case *gossipv1.GossipMessage_SignedQueryRequest: if signedQueryReqC != nil { if ccqEnabled { - if err := node_common.PostSignedQueryRequest(signedQueryReqC, m.SignedQueryRequest); err != nil { + if err := query.PostSignedQueryRequest(signedQueryReqC, m.SignedQueryRequest); err != nil { logger.Warn("failed to handle query request", zap.Error(err), zap.String("component", "ccqp2p")) } } else { diff --git a/node/pkg/query/helpers_test.go b/node/pkg/query/helpers_test.go new file mode 100644 index 0000000000..16ebc9152c --- /dev/null +++ b/node/pkg/query/helpers_test.go @@ -0,0 +1,58 @@ +package query + +import ( + "crypto/ecdsa" + "fmt" + "io" + "os" + + nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1" + + ethCrypto "github.com/ethereum/go-ethereum/crypto" + "golang.org/x/crypto/openpgp/armor" //nolint + "google.golang.org/protobuf/proto" +) + +const ( + GuardianKeyArmoredBlock = "WORMHOLE GUARDIAN PRIVATE KEY" +) + +// loadGuardianKey loads a serialized guardian key from disk. +func loadGuardianKey(filename string) (*ecdsa.PrivateKey, error) { + f, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + + p, err := armor.Decode(f) + if err != nil { + return nil, fmt.Errorf("failed to read armored file: %w", err) + } + + if p.Type != GuardianKeyArmoredBlock { + return nil, fmt.Errorf("invalid block type: %s", p.Type) + } + + b, err := io.ReadAll(p.Body) + if err != nil { + return nil, fmt.Errorf("failed to read file: %w", err) + } + + var m nodev1.GuardianKey + err = proto.Unmarshal(b, &m) + if err != nil { + return nil, fmt.Errorf("failed to deserialize protobuf: %w", err) + } + + gk, err := ethCrypto.ToECDSA(m.Data) + if err != nil { + return nil, fmt.Errorf("failed to deserialize raw key data: %w", err) + } + + return gk, nil +} + +func makeChannelPair[T any](cap int) (<-chan T, chan<- T) { + out := make(chan T, cap) + return out, out +} diff --git a/node/pkg/common/query_test.go b/node/pkg/query/msg_test.go similarity index 99% rename from node/pkg/common/query_test.go rename to node/pkg/query/msg_test.go index 618f60b290..ce461f40ae 100644 --- a/node/pkg/common/query_test.go +++ b/node/pkg/query/msg_test.go @@ -1,4 +1,4 @@ -package common +package query import ( "encoding/hex" diff --git a/node/cmd/guardiand/query.go b/node/pkg/query/query.go similarity index 87% rename from node/cmd/guardiand/query.go rename to node/pkg/query/query.go index b5c61d51f1..49f222ab14 100644 --- a/node/cmd/guardiand/query.go +++ b/node/pkg/query/query.go @@ -1,4 +1,4 @@ -package guardiand +package query import ( "context" @@ -18,47 +18,47 @@ import ( ) const ( - // requestTimeout indicates how long before a request is considered to have timed out. - requestTimeout = 1 * time.Minute + // RequestTimeout indicates how long before a request is considered to have timed out. + RequestTimeout = 1 * time.Minute - // retryInterval specifies how long we will wait between retry intervals. This is the interval of our ticker. - retryInterval = 10 * time.Second + // RetryInterval specifies how long we will wait between retry intervals. This is the interval of our ticker. + RetryInterval = 10 * time.Second ) type ( // pendingQuery is the cache entry for a given query. pendingQuery struct { signedRequest *gossipv1.SignedQueryRequest - request *common.QueryRequest + request *QueryRequest requestID string receiveTime time.Time queries []*perChainQuery - responses []*common.PerChainQueryResponseInternal + responses []*PerChainQueryResponseInternal // respPub is only populated when we need to retry sending the response to p2p. - respPub *common.QueryResponsePublication + respPub *QueryResponsePublication } // perChainQuery is the data associated with a single per chain query in a query request. perChainQuery struct { - req *common.PerChainQueryInternal - channel chan *common.PerChainQueryInternal + req *PerChainQueryInternal + channel chan *PerChainQueryInternal lastUpdateTime time.Time } ) -// handleQueryRequests multiplexes observation requests to the appropriate chain -func handleQueryRequests( +// HandleQueryRequests multiplexes observation requests to the appropriate chain +func HandleQueryRequests( ctx context.Context, logger *zap.Logger, signedQueryReqC <-chan *gossipv1.SignedQueryRequest, - chainQueryReqC map[vaa.ChainID]chan *common.PerChainQueryInternal, + chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal, allowedRequestors map[ethCommon.Address]struct{}, - queryResponseReadC <-chan *common.PerChainQueryResponseInternal, - queryResponseWriteC chan<- *common.QueryResponsePublication, + queryResponseReadC <-chan *PerChainQueryResponseInternal, + queryResponseWriteC chan<- *QueryResponsePublication, env common.Environment, ) { - handleQueryRequestsImpl(ctx, logger, signedQueryReqC, chainQueryReqC, allowedRequestors, queryResponseReadC, queryResponseWriteC, env, requestTimeout, retryInterval) + handleQueryRequestsImpl(ctx, logger, signedQueryReqC, chainQueryReqC, allowedRequestors, queryResponseReadC, queryResponseWriteC, env, RequestTimeout, RetryInterval) } // handleQueryRequestsImpl allows instantiating the handler in the test environment with shorter timeout and retry parameters. @@ -66,10 +66,10 @@ func handleQueryRequestsImpl( ctx context.Context, logger *zap.Logger, signedQueryReqC <-chan *gossipv1.SignedQueryRequest, - chainQueryReqC map[vaa.ChainID]chan *common.PerChainQueryInternal, + chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal, allowedRequestors map[ethCommon.Address]struct{}, - queryResponseReadC <-chan *common.PerChainQueryResponseInternal, - queryResponseWriteC chan<- *common.QueryResponsePublication, + queryResponseReadC <-chan *PerChainQueryResponseInternal, + queryResponseWriteC chan<- *QueryResponsePublication, env common.Environment, requestTimeoutImpl time.Duration, retryIntervalImpl time.Duration, @@ -119,7 +119,7 @@ func handleQueryRequestsImpl( // - valid "block" strings requestID := hex.EncodeToString(signedRequest.Signature) - digest := common.QueryRequestDigest(env, signedRequest.QueryRequest) + digest := QueryRequestDigest(env, signedRequest.QueryRequest) signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), signedRequest.Signature) if err != nil { @@ -140,7 +140,7 @@ func handleQueryRequestsImpl( continue } - var queryRequest common.QueryRequest + var queryRequest QueryRequest err = queryRequest.Unmarshal(signedRequest.QueryRequest) if err != nil { qLogger.Error("failed to unmarshal query request", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err)) @@ -155,7 +155,7 @@ func handleQueryRequestsImpl( // Build the set of per chain queries and placeholders for the per chain responses. errorFound := false queries := []*perChainQuery{} - responses := make([]*common.PerChainQueryResponseInternal, len(queryRequest.PerChainQueries)) + responses := make([]*PerChainQueryResponseInternal, len(queryRequest.PerChainQueries)) receiveTime := time.Now() for requestIdx, pcq := range queryRequest.PerChainQueries { @@ -174,7 +174,7 @@ func handleQueryRequestsImpl( } queries = append(queries, &perChainQuery{ - req: &common.PerChainQueryInternal{ + req: &PerChainQueryInternal{ RequestID: requestID, RequestIdx: requestIdx, Request: pcq, @@ -204,7 +204,7 @@ func handleQueryRequestsImpl( } case resp := <-queryResponseReadC: // Response from a watcher. - if resp.Status == common.QuerySuccess { + if resp.Status == QuerySuccess { if resp.Response == nil { qLogger.Error("received a successful query response with no results, dropping it!", zap.String("requestID", resp.RequestID)) continue @@ -234,20 +234,20 @@ func handleQueryRequestsImpl( } // Build the list of per chain response publications and the overall query response publication. - responses := []*common.PerChainQueryResponse{} + responses := []*PerChainQueryResponse{} for _, resp := range pq.responses { if resp == nil { qLogger.Error("unexpected null response in pending query!", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx)) continue } - responses = append(responses, &common.PerChainQueryResponse{ + responses = append(responses, &PerChainQueryResponse{ ChainId: resp.ChainId, Response: resp.Response, }) } - respPub := &common.QueryResponsePublication{ + respPub := &QueryResponsePublication{ Request: pq.signedRequest, PerChainResponses: responses, } @@ -261,13 +261,13 @@ func handleQueryRequestsImpl( qLogger.Warn("failed to publish query response to p2p, will retry publishing next interval", zap.String("requestID", resp.RequestID)) pq.respPub = respPub } - } else if resp.Status == common.QueryRetryNeeded { + } else if resp.Status == QueryRetryNeeded { if _, exists := pendingQueries[resp.RequestID]; exists { qLogger.Warn("query failed, will retry next interval", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx)) } else { qLogger.Warn("received a retry needed response with no outstanding query, dropping it", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx)) } - } else if resp.Status == common.QueryFatalError { + } else if resp.Status == QueryFatalError { qLogger.Warn("received a fatal error response, dropping the whole request", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx)) delete(pendingQueries, resp.RequestID) } else { @@ -307,8 +307,8 @@ func handleQueryRequestsImpl( } } -// ccqParseAllowedRequesters parses a comma separated list of allowed requesters into a map to be used for look ups. -func ccqParseAllowedRequesters(ccqAllowedRequesters string) (map[ethCommon.Address]struct{}, error) { +// ParseAllowedRequesters parses a comma separated list of allowed requesters into a map to be used for look ups. +func ParseAllowedRequesters(ccqAllowedRequesters string) (map[ethCommon.Address]struct{}, error) { if ccqAllowedRequesters == "" { return nil, fmt.Errorf("if cross chain query is enabled `--ccqAllowedRequesters` must be specified") } diff --git a/node/cmd/guardiand/query_test.go b/node/pkg/query/query_test.go similarity index 83% rename from node/cmd/guardiand/query_test.go rename to node/pkg/query/query_test.go index 92a8ed14bc..d55e1c3c84 100644 --- a/node/cmd/guardiand/query_test.go +++ b/node/pkg/query/query_test.go @@ -1,4 +1,4 @@ -package guardiand +package query import ( "bytes" @@ -50,21 +50,21 @@ func createPerChainQueryForTesting( chainId vaa.ChainID, block string, numCalls int, -) *common.PerChainQueryRequest { - callData := []*common.EthCallData{} +) *PerChainQueryRequest { + callData := []*EthCallData{} for count := 0; count < numCalls; count++ { - callData = append(callData, &common.EthCallData{ + callData = append(callData, &EthCallData{ To: []byte(fmt.Sprintf("%-20s", fmt.Sprintf("To for %d:%d", chainId, count))), Data: []byte(fmt.Sprintf("CallData for %d:%d", chainId, count)), }) } - callRequest := &common.EthCallQueryRequest{ + callRequest := &EthCallQueryRequest{ BlockId: block, CallData: callData, } - return &common.PerChainQueryRequest{ + return &PerChainQueryRequest{ ChainId: chainId, Query: callRequest, } @@ -73,10 +73,10 @@ func createPerChainQueryForTesting( // createSignedQueryRequestForTesting creates a query request object and signs it using the specified key. func createSignedQueryRequestForTesting( sk *ecdsa.PrivateKey, - perChainQueries []*common.PerChainQueryRequest, -) (*gossipv1.SignedQueryRequest, *common.QueryRequest) { + perChainQueries []*PerChainQueryRequest, +) (*gossipv1.SignedQueryRequest, *QueryRequest) { nonce += 1 - queryRequest := &common.QueryRequest{ + queryRequest := &QueryRequest{ Nonce: nonce, PerChainQueries: perChainQueries, } @@ -86,7 +86,7 @@ func createSignedQueryRequestForTesting( panic(err) } - digest := common.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes) + digest := QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes) sig, err := ethCrypto.Sign(digest.Bytes(), sk) if err != nil { panic(err) @@ -101,17 +101,17 @@ func createSignedQueryRequestForTesting( } // createExpectedResultsForTest generates an array of the results expected for a request. These results are returned by the watcher, and used to validate the response. -func createExpectedResultsForTest(perChainQueries []*common.PerChainQueryRequest) []common.PerChainQueryResponse { - expectedResults := []common.PerChainQueryResponse{} +func createExpectedResultsForTest(perChainQueries []*PerChainQueryRequest) []PerChainQueryResponse { + expectedResults := []PerChainQueryResponse{} for _, pcq := range perChainQueries { switch req := pcq.Query.(type) { - case *common.EthCallQueryRequest: + case *EthCallQueryRequest: now := time.Now() blockNum, err := strconv.ParseUint(strings.TrimPrefix(req.BlockId, "0x"), 16, 64) if err != nil { panic("invalid blockNum!") } - resp := &common.EthCallQueryResponse{ + resp := &EthCallQueryResponse{ BlockNumber: blockNum, Hash: ethCommon.HexToHash("0x9999bac44d09a7f69ee7941819b0a19c59ccb1969640cc513be09ef95ed2d8e2"), Time: timeForTest(timeForTest(now)), @@ -120,7 +120,7 @@ func createExpectedResultsForTest(perChainQueries []*common.PerChainQueryRequest for _, cd := range req.CallData { resp.Results = append(resp.Results, []byte(hex.EncodeToString(cd.To)+":"+hex.EncodeToString(cd.Data))) } - expectedResults = append(expectedResults, common.PerChainQueryResponse{ + expectedResults = append(expectedResults, PerChainQueryResponse{ ChainId: pcq.ChainId, Response: resp, }) @@ -136,13 +136,13 @@ func createExpectedResultsForTest(perChainQueries []*common.PerChainQueryRequest // validateResponseForTest performs validation on the responses generated by these tests. Note that it is not a generalized validate function. func validateResponseForTest( t *testing.T, - response *common.QueryResponsePublication, + response *QueryResponsePublication, signedRequest *gossipv1.SignedQueryRequest, - queryRequest *common.QueryRequest, - expectedResults []common.PerChainQueryResponse, + queryRequest *QueryRequest, + expectedResults []PerChainQueryResponse, ) bool { require.NotNil(t, response) - require.True(t, common.SignedQueryRequestEqual(signedRequest, response.Request)) + require.True(t, SignedQueryRequestEqual(signedRequest, response.Request)) require.Equal(t, len(queryRequest.PerChainQueries), len(response.PerChainResponses)) require.True(t, bytes.Equal(response.Request.Signature, signedRequest.Signature)) require.Equal(t, len(response.PerChainResponses), len(expectedResults)) @@ -153,13 +153,8 @@ func validateResponseForTest( return true } -// A timestamp has nanos, but we only marshal down to micros, so trim our time to micros for testing purposes. -func timeForTest(t time.Time) time.Time { - return time.UnixMicro(t.UnixMicro()) -} - -func TestCcqParseAllowedRequestersSuccess(t *testing.T) { - ccqAllowedRequestersList, err := ccqParseAllowedRequesters(testSigner) +func TestParseAllowedRequestersSuccess(t *testing.T) { + ccqAllowedRequestersList, err := ParseAllowedRequesters(testSigner) require.NoError(t, err) require.NotNil(t, ccqAllowedRequestersList) require.Equal(t, 1, len(ccqAllowedRequestersList)) @@ -169,7 +164,7 @@ func TestCcqParseAllowedRequestersSuccess(t *testing.T) { _, exists = ccqAllowedRequestersList[ethCommon.BytesToAddress(ethCommon.Hex2Bytes("beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBf"))] require.False(t, exists) - ccqAllowedRequestersList, err = ccqParseAllowedRequesters("beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe,beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBf") + ccqAllowedRequestersList, err = ParseAllowedRequesters("beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe,beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBf") require.NoError(t, err) require.NotNil(t, ccqAllowedRequestersList) require.Equal(t, 2, len(ccqAllowedRequestersList)) @@ -180,18 +175,18 @@ func TestCcqParseAllowedRequestersSuccess(t *testing.T) { require.True(t, exists) } -func TestCcqParseAllowedRequestersFailsIfParameterEmpty(t *testing.T) { - ccqAllowedRequestersList, err := ccqParseAllowedRequesters("") +func TestParseAllowedRequestersFailsIfParameterEmpty(t *testing.T) { + ccqAllowedRequestersList, err := ParseAllowedRequesters("") require.Error(t, err) require.Nil(t, ccqAllowedRequestersList) - ccqAllowedRequestersList, err = ccqParseAllowedRequesters(",") + ccqAllowedRequestersList, err = ParseAllowedRequesters(",") require.Error(t, err) require.Nil(t, ccqAllowedRequestersList) } -func TestCcqParseAllowedRequestersFailsIfInvalidParameter(t *testing.T) { - ccqAllowedRequestersList, err := ccqParseAllowedRequesters("Hello") +func TestParseAllowedRequestersFailsIfInvalidParameter(t *testing.T) { + ccqAllowedRequestersList, err := ParseAllowedRequesters("Hello") require.Error(t, err) require.Nil(t, ccqAllowedRequestersList) } @@ -203,17 +198,17 @@ type mockData struct { signedQueryReqReadC <-chan *gossipv1.SignedQueryRequest signedQueryReqWriteC chan<- *gossipv1.SignedQueryRequest - chainQueryReqC map[vaa.ChainID]chan *common.PerChainQueryInternal + chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal - queryResponseReadC <-chan *common.PerChainQueryResponseInternal - queryResponseWriteC chan<- *common.PerChainQueryResponseInternal + queryResponseReadC <-chan *PerChainQueryResponseInternal + queryResponseWriteC chan<- *PerChainQueryResponseInternal - queryResponsePublicationReadC <-chan *common.QueryResponsePublication - queryResponsePublicationWriteC chan<- *common.QueryResponsePublication + queryResponsePublicationReadC <-chan *QueryResponsePublication + queryResponsePublicationWriteC chan<- *QueryResponsePublication mutex sync.Mutex - queryResponsePublication *common.QueryResponsePublication - expectedResults []common.PerChainQueryResponse + queryResponsePublication *QueryResponsePublication + expectedResults []PerChainQueryResponse requestsPerChain map[vaa.ChainID]int retriesPerChain map[vaa.ChainID]int } @@ -229,14 +224,14 @@ func (md *mockData) resetState() { } // setExpectedResults sets the results to be returned by the watchers. -func (md *mockData) setExpectedResults(expectedResults []common.PerChainQueryResponse) { +func (md *mockData) setExpectedResults(expectedResults []PerChainQueryResponse) { md.mutex.Lock() defer md.mutex.Unlock() md.expectedResults = expectedResults } // setRetries allows a test to specify how many times a given watcher should retry before returning success. -// If the count is the special value `fatalError`, the watcher will return common.QueryFatalError. +// If the count is the special value `fatalError`, the watcher will return QueryFatalError. func (md *mockData) setRetries(chainId vaa.ChainID, count int) { md.mutex.Lock() defer md.mutex.Unlock() @@ -253,7 +248,7 @@ func (md *mockData) incrementRequestsPerChainAlreadyLocked(chainId vaa.ChainID) } // getQueryResponsePublication returns the latest query response publication received by the mock. -func (md *mockData) getQueryResponsePublication() *common.QueryResponsePublication { +func (md *mockData) getQueryResponsePublication() *QueryResponsePublication { md.mutex.Lock() defer md.mutex.Unlock() return md.queryResponsePublication @@ -281,10 +276,10 @@ func (md *mockData) shouldIgnoreAlreadyLocked(chainId vaa.ChainID) bool { } // getStatusAlreadyLocked is used by the watchers to determine what query status they should return, based on the `retriesPerChain`. -func (md *mockData) getStatusAlreadyLocked(chainId vaa.ChainID) common.QueryStatus { +func (md *mockData) getStatusAlreadyLocked(chainId vaa.ChainID) QueryStatus { if val, exists := md.retriesPerChain[chainId]; exists { if val == fatalError { - return common.QueryFatalError + return QueryFatalError } val -= 1 if val > 0 { @@ -292,9 +287,9 @@ func (md *mockData) getStatusAlreadyLocked(chainId vaa.ChainID) common.QueryStat } else { delete(md.retriesPerChain, chainId) } - return common.QueryRetryNeeded + return QueryRetryNeeded } - return common.QuerySuccess + return QuerySuccess } // createQueryHandlerForTest creates the query handler mock environment, including the set of watchers and the response listener. @@ -311,28 +306,27 @@ func createQueryHandlerForTestWithoutPublisher(t *testing.T, ctx context.Context md := mockData{} var err error - *unsafeDevMode = true md.sk, err = loadGuardianKey("../../hack/query/dev.guardian.key") require.NoError(t, err) require.NotNil(t, md.sk) - ccqAllowedRequestersList, err := ccqParseAllowedRequesters(testSigner) + ccqAllowedRequestersList, err := ParseAllowedRequesters(testSigner) require.NoError(t, err) // Inbound observation requests from the p2p service (for all chains) - md.signedQueryReqReadC, md.signedQueryReqWriteC = makeChannelPair[*gossipv1.SignedQueryRequest](common.SignedQueryRequestChannelSize) + md.signedQueryReqReadC, md.signedQueryReqWriteC = makeChannelPair[*gossipv1.SignedQueryRequest](SignedQueryRequestChannelSize) // Per-chain query requests - md.chainQueryReqC = make(map[vaa.ChainID]chan *common.PerChainQueryInternal) + md.chainQueryReqC = make(map[vaa.ChainID]chan *PerChainQueryInternal) for _, chainId := range chains { - md.chainQueryReqC[chainId] = make(chan *common.PerChainQueryInternal) + md.chainQueryReqC[chainId] = make(chan *PerChainQueryInternal) } // Query responses from watchers to query handler aggregated across all chains - md.queryResponseReadC, md.queryResponseWriteC = makeChannelPair[*common.PerChainQueryResponseInternal](0) + md.queryResponseReadC, md.queryResponseWriteC = makeChannelPair[*PerChainQueryResponseInternal](0) // Query responses from query handler to p2p - md.queryResponsePublicationReadC, md.queryResponsePublicationWriteC = makeChannelPair[*common.QueryResponsePublication](0) + md.queryResponsePublicationReadC, md.queryResponsePublicationWriteC = makeChannelPair[*QueryResponsePublication](0) md.resetState() @@ -342,7 +336,7 @@ func createQueryHandlerForTestWithoutPublisher(t *testing.T, ctx context.Context // Create a routine for each configured watcher. It will take a per chain query and return the corresponding expected result. // It also pegs a counter of the number of requests the watcher received, for verification purposes. for chainId := range md.chainQueryReqC { - go func(chainId vaa.ChainID, chainQueryReqC <-chan *common.PerChainQueryInternal) { + go func(chainId vaa.ChainID, chainQueryReqC <-chan *PerChainQueryInternal) { for { select { case <-ctx.Done(): @@ -357,7 +351,7 @@ func createQueryHandlerForTestWithoutPublisher(t *testing.T, ctx context.Context results := md.expectedResults[pcqr.RequestIdx].Response status := md.getStatusAlreadyLocked(chainId) logger.Info("watcher returning", zap.String("chainId", chainId.String()), zap.Int("requestIdx", pcqr.RequestIdx), zap.Int("status", int(status))) - queryResponse := common.CreatePerChainQueryResponseInternal(pcqr.RequestID, pcqr.RequestIdx, pcqr.Request.ChainId, status, results) + queryResponse := CreatePerChainQueryResponseInternal(pcqr.RequestID, pcqr.RequestIdx, pcqr.Request.ChainId, status, results) md.queryResponseWriteC <- queryResponse } md.mutex.Unlock() @@ -387,7 +381,7 @@ func (md *mockData) startResponseListener(ctx context.Context) { } // waitForResponse is used by the tests to wait for a response publication. It will eventually timeout if the query fails. -func (md *mockData) waitForResponse() *common.QueryResponsePublication { +func (md *mockData) waitForResponse() *QueryResponsePublication { for count := 0; count < 50; count++ { time.Sleep(pollIntervalForTest) ret := md.getQueryResponsePublication() @@ -406,12 +400,12 @@ func TestInvalidQueries(t *testing.T) { md := createQueryHandlerForTest(t, ctx, logger, watcherChainsForTest) - var perChainQueries []*common.PerChainQueryRequest + var perChainQueries []*PerChainQueryRequest var signedQueryRequest *gossipv1.SignedQueryRequest // Query with a bad signature should fail. md.resetState() - perChainQueries = []*common.PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2)} + perChainQueries = []*PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2)} signedQueryRequest, _ = createSignedQueryRequestForTesting(md.sk, perChainQueries) signedQueryRequest.Signature[0] += 1 // Corrupt the signature. md.signedQueryReqWriteC <- signedQueryRequest @@ -419,14 +413,14 @@ func TestInvalidQueries(t *testing.T) { // Query for an unsupported chain should fail. The supported chains are defined in supportedChains in query.go md.resetState() - perChainQueries = []*common.PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDAlgorand, "0x28d9630", 2)} + perChainQueries = []*PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDAlgorand, "0x28d9630", 2)} signedQueryRequest, _ = createSignedQueryRequestForTesting(md.sk, perChainQueries) md.signedQueryReqWriteC <- signedQueryRequest require.Nil(t, md.waitForResponse()) // Query for a chain that supports queries but that is not in the watcher channel map should fail. md.resetState() - perChainQueries = []*common.PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDSepolia, "0x28d9630", 2)} + perChainQueries = []*PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDSepolia, "0x28d9630", 2)} signedQueryRequest, _ = createSignedQueryRequestForTesting(md.sk, perChainQueries) md.signedQueryReqWriteC <- signedQueryRequest require.Nil(t, md.waitForResponse()) @@ -440,7 +434,7 @@ func TestSingleQueryShouldSucceed(t *testing.T) { md := createQueryHandlerForTest(t, ctx, logger, watcherChainsForTest) // Create the request and the expected results. Give the expected results to the mock. - perChainQueries := []*common.PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2)} + perChainQueries := []*PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2)} signedQueryRequest, queryRequest := createSignedQueryRequestForTesting(md.sk, perChainQueries) expectedResults := createExpectedResultsForTest(queryRequest.PerChainQueries) md.setExpectedResults(expectedResults) @@ -464,7 +458,7 @@ func TestBatchOfTwoQueriesShouldSucceed(t *testing.T) { md := createQueryHandlerForTest(t, ctx, logger, watcherChainsForTest) // Create the request and the expected results. Give the expected results to the mock. - perChainQueries := []*common.PerChainQueryRequest{ + perChainQueries := []*PerChainQueryRequest{ createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2), createPerChainQueryForTesting(vaa.ChainIDBSC, "0x28d9123", 3), } @@ -492,7 +486,7 @@ func TestQueryWithLimitedRetriesShouldSucceed(t *testing.T) { md := createQueryHandlerForTest(t, ctx, logger, watcherChainsForTest) // Create the request and the expected results. Give the expected results to the mock. - perChainQueries := []*common.PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2)} + perChainQueries := []*PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2)} signedQueryRequest, queryRequest := createSignedQueryRequestForTesting(md.sk, perChainQueries) expectedResults := createExpectedResultsForTest(queryRequest.PerChainQueries) md.setExpectedResults(expectedResults) @@ -520,7 +514,7 @@ func TestQueryWithRetryDueToTimeoutShouldSucceed(t *testing.T) { md := createQueryHandlerForTest(t, ctx, logger, watcherChainsForTest) // Create the request and the expected results. Give the expected results to the mock. - perChainQueries := []*common.PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2)} + perChainQueries := []*PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2)} signedQueryRequest, queryRequest := createSignedQueryRequestForTesting(md.sk, perChainQueries) expectedResults := createExpectedResultsForTest(queryRequest.PerChainQueries) md.setExpectedResults(expectedResults) @@ -547,7 +541,7 @@ func TestQueryWithTooManyRetriesShouldFail(t *testing.T) { md := createQueryHandlerForTest(t, ctx, logger, watcherChainsForTest) // Create the request and the expected results. Give the expected results to the mock. - perChainQueries := []*common.PerChainQueryRequest{ + perChainQueries := []*PerChainQueryRequest{ createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2), createPerChainQueryForTesting(vaa.ChainIDBSC, "0x28d9123", 3), } @@ -580,7 +574,7 @@ func TestQueryWithLimitedRetriesOnMultipleChainsShouldSucceed(t *testing.T) { md := createQueryHandlerForTest(t, ctx, logger, watcherChainsForTest) // Create the request and the expected results. Give the expected results to the mock. - perChainQueries := []*common.PerChainQueryRequest{ + perChainQueries := []*PerChainQueryRequest{ createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2), createPerChainQueryForTesting(vaa.ChainIDBSC, "0x28d9123", 3), } @@ -615,7 +609,7 @@ func TestFatalErrorOnPerChainQueryShouldCauseRequestToFail(t *testing.T) { md := createQueryHandlerForTest(t, ctx, logger, watcherChainsForTest) // Create the request and the expected results. Give the expected results to the mock. - perChainQueries := []*common.PerChainQueryRequest{ + perChainQueries := []*PerChainQueryRequest{ createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2), createPerChainQueryForTesting(vaa.ChainIDBSC, "0x28d9123", 3), } @@ -645,7 +639,7 @@ func TestPublishRetrySucceeds(t *testing.T) { md := createQueryHandlerForTestWithoutPublisher(t, ctx, logger, watcherChainsForTest) // Create the request and the expected results. Give the expected results to the mock. - perChainQueries := []*common.PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2)} + perChainQueries := []*PerChainQueryRequest{createPerChainQueryForTesting(vaa.ChainIDPolygon, "0x28d9630", 2)} signedQueryRequest, queryRequest := createSignedQueryRequestForTesting(md.sk, perChainQueries) expectedResults := createExpectedResultsForTest(queryRequest.PerChainQueries) md.setExpectedResults(expectedResults) diff --git a/node/pkg/common/queryRequest.go b/node/pkg/query/request.go similarity index 98% rename from node/pkg/common/queryRequest.go rename to node/pkg/query/request.go index 9f26415d46..79b25361a3 100644 --- a/node/pkg/common/queryRequest.go +++ b/node/pkg/query/request.go @@ -1,4 +1,4 @@ -package common +package query import ( "bytes" @@ -7,6 +7,7 @@ import ( "math" "strings" + "github.com/certusone/wormhole/node/pkg/common" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" "github.com/wormhole-foundation/wormhole/sdk/vaa" @@ -74,12 +75,12 @@ type PerChainQueryInternal struct { } // QueryRequestDigest returns the query signing prefix based on the environment. -func QueryRequestDigest(env Environment, b []byte) ethCommon.Hash { +func QueryRequestDigest(env common.Environment, b []byte) ethCommon.Hash { // TODO: should this use a different standard of signing messages, like https://eips.ethereum.org/EIPS/eip-712 var queryRequestPrefix []byte - if env == MainNet { + if env == common.MainNet { queryRequestPrefix = []byte("mainnet_query_request_000000000000|") - } else if env == TestNet { + } else if env == common.TestNet { queryRequestPrefix = []byte("testnet_query_request_000000000000|") } else { queryRequestPrefix = []byte("devnet_query_request_0000000000000|") @@ -94,7 +95,7 @@ func PostSignedQueryRequest(signedQueryReqSendC chan<- *gossipv1.SignedQueryRequ case signedQueryReqSendC <- req: return nil default: - return ErrChanFull + return common.ErrChanFull } } diff --git a/node/pkg/common/queryResponse.go b/node/pkg/query/response.go similarity index 99% rename from node/pkg/common/queryResponse.go rename to node/pkg/query/response.go index f82ff3f371..b644ea9633 100644 --- a/node/pkg/common/queryResponse.go +++ b/node/pkg/query/response.go @@ -1,4 +1,4 @@ -package common +package query import ( "bytes" diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 48f8906dcb..c864edc565 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" "github.com/certusone/wormhole/node/pkg/common" + "github.com/certusone/wormhole/node/pkg/query" "github.com/certusone/wormhole/node/pkg/readiness" "github.com/certusone/wormhole/node/pkg/supervisor" "github.com/wormhole-foundation/wormhole/sdk/vaa" @@ -98,10 +99,10 @@ type ( // Incoming query requests from the network. Pre-filtered to only // include requests for our chainID. - queryReqC <-chan *common.PerChainQueryInternal + queryReqC <-chan *query.PerChainQueryInternal // Outbound query responses to query requests - queryResponseC chan<- *common.PerChainQueryResponseInternal + queryResponseC chan<- *query.PerChainQueryResponseInternal pending map[pendingKey]*pendingMessage pendingMu sync.Mutex @@ -152,8 +153,8 @@ func NewEthWatcher( msgC chan<- *common.MessagePublication, setC chan<- *common.GuardianSet, obsvReqC <-chan *gossipv1.ObservationRequest, - queryReqC <-chan *common.PerChainQueryInternal, - queryResponseC chan<- *common.PerChainQueryResponseInternal, + queryReqC <-chan *query.PerChainQueryInternal, + queryResponseC chan<- *query.PerChainQueryResponseInternal, unsafeDevMode bool, ) *Watcher { @@ -539,7 +540,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { } switch req := queryRequest.Request.Query.(type) { - case *common.EthCallQueryRequest: + case *query.EthCallQueryRequest: block := req.BlockId logger.Info("received query request", zap.String("eth_network", w.networkName), @@ -642,7 +643,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Any("batch", batch), zap.String("component", "ccqevm"), ) - w.ccqSendQueryResponse(logger, queryRequest, common.QueryRetryNeeded, nil) + w.ccqSendQueryResponse(logger, queryRequest, query.QueryRetryNeeded, nil) continue } @@ -653,7 +654,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Any("batch", batch), zap.String("component", "ccqevm"), ) - w.ccqSendQueryResponse(logger, queryRequest, common.QueryRetryNeeded, nil) + w.ccqSendQueryResponse(logger, queryRequest, query.QueryRetryNeeded, nil) continue } @@ -664,7 +665,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Any("batch", batch), zap.String("component", "ccqevm"), ) - w.ccqSendQueryResponse(logger, queryRequest, common.QueryRetryNeeded, nil) + w.ccqSendQueryResponse(logger, queryRequest, query.QueryRetryNeeded, nil) continue } @@ -675,11 +676,11 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Any("batch", batch), zap.String("component", "ccqevm"), ) - w.ccqSendQueryResponse(logger, queryRequest, common.QueryRetryNeeded, nil) + w.ccqSendQueryResponse(logger, queryRequest, query.QueryRetryNeeded, nil) continue } - resp := common.EthCallQueryResponse{ + resp := query.EthCallQueryResponse{ BlockNumber: blockResult.Number.ToInt().Uint64(), Hash: blockResult.Hash, Time: time.Unix(int64(blockResult.Time), 0), @@ -696,7 +697,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Any("batch", batch), zap.String("component", "ccqevm"), ) - w.ccqSendQueryResponse(logger, queryRequest, common.QueryRetryNeeded, nil) + w.ccqSendQueryResponse(logger, queryRequest, query.QueryRetryNeeded, nil) errFound = true break } @@ -711,7 +712,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Any("batch", batch), zap.String("component", "ccqevm"), ) - w.ccqSendQueryResponse(logger, queryRequest, common.QueryRetryNeeded, nil) + w.ccqSendQueryResponse(logger, queryRequest, query.QueryRetryNeeded, nil) errFound = true break } @@ -733,7 +734,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { } if !errFound { - w.ccqSendQueryResponse(logger, queryRequest, common.QuerySuccess, &resp) + w.ccqSendQueryResponse(logger, queryRequest, query.QuerySuccess, &resp) } default: @@ -741,7 +742,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Uint8("payload", uint8(queryRequest.Request.Query.Type())), zap.String("component", "ccqevm"), ) - w.ccqSendQueryResponse(logger, queryRequest, common.QueryFatalError, nil) + w.ccqSendQueryResponse(logger, queryRequest, query.QueryFatalError, nil) } } } @@ -1164,8 +1165,8 @@ func (w *Watcher) SetMaxWaitConfirmations(maxWaitConfirmations uint64) { } // ccqSendQueryResponse sends an error response back to the query handler. -func (w *Watcher) ccqSendQueryResponse(logger *zap.Logger, req *common.PerChainQueryInternal, status common.QueryStatus, resp *common.EthCallQueryResponse) { - queryResponse := common.CreatePerChainQueryResponseInternal(req.RequestID, req.RequestIdx, req.Request.ChainId, status, resp) +func (w *Watcher) ccqSendQueryResponse(logger *zap.Logger, req *query.PerChainQueryInternal, status query.QueryStatus, resp *query.EthCallQueryResponse) { + queryResponse := query.CreatePerChainQueryResponseInternal(req.RequestID, req.RequestIdx, req.Request.ChainId, status, resp) select { case w.queryResponseC <- queryResponse: logger.Debug("published query response error to handler", zap.String("component", "ccqevm"))