Skip to content

Commit

Permalink
CCQ: Batch queries (#3023)
Browse files Browse the repository at this point in the history
* CCQ: Add marshalling tests

* More marshalling changes

* Start of support for multiple calls in a batch

* Multiple responses to go with multiple requests

* Support multiple EVM queries in a batch

* Multiple queries in a batch

* Start of handler tests

* More handler tests

* More handler tests

* node: add multi batch query tests

* Added comments to tests

* Per chain query time out not getting retried

* node: chang rand package

* Tweak serialization of per call request

---------

Co-authored-by: Paul Noel <[email protected]>
  • Loading branch information
bruce-riley and panoel committed Jun 14, 2023
1 parent 71ecfa1 commit db5aed2
Show file tree
Hide file tree
Showing 13 changed files with 2,084 additions and 493 deletions.
44 changes: 22 additions & 22 deletions node/cmd/guardiand/node.go

Large diffs are not rendered by default.

260 changes: 191 additions & 69 deletions node/cmd/guardiand/query.go

Large diffs are not rendered by default.

725 changes: 725 additions & 0 deletions node/cmd/guardiand/query_test.go

Large diffs are not rendered by default.

180 changes: 137 additions & 43 deletions node/hack/query/send_req.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"encoding/hex"
"fmt"
"io"
"math/big"
"os"
"strings"
"time"

"github.com/certusone/wormhole/node/hack/query/utils"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
Expand All @@ -32,6 +34,7 @@ import (
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/multiformats/go-multiaddr"
"github.com/tendermint/tendermint/libs/rand"
"go.uber.org/zap"
"golang.org/x/crypto/openpgp/armor" //nolint
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -177,32 +180,129 @@ func main() {
panic(err)
}

// methodName := "totalSupply"
methodName := "name"
data, err := wethAbi.Pack(methodName)
methods := []string{"name", "totalSupply"}
callData := []*gossipv1.EthCallQueryRequest_EthCallData{}
to, _ := hex.DecodeString("0d500b1d8e8ef31e21c99d1db9a6444d3adf1270")

for _, method := range methods {
data, err := wethAbi.Pack(method)
if err != nil {
panic(err)
}

callData = append(callData, &gossipv1.EthCallQueryRequest_EthCallData{
To: to,
Data: data,
})
}

// Fetch the latest block number
url := "https://rpc.ankr.com/polygon"
logger.Info("Querying for latest block height", zap.String("url", url))
blockNum, err := utils.FetchLatestBlockNumberFromUrl(ctx, url)
if err != nil {
panic(err)
logger.Fatal("Failed to fetch latest block number", zap.Error(err))
}

to, _ := hex.DecodeString("0d500b1d8e8ef31e21c99d1db9a6444d3adf1270")
logger.Info("latest block", zap.String("num", blockNum.String()), zap.String("encoded", hexutil.EncodeBig(blockNum)))

// block := "0x28d9630"
block := "latest"
// block := "latest"
// block := "0x9999bac44d09a7f69ee7941819b0a19c59ccb1969640cc513be09ef95ed2d8e2"

// Start of query creation...
callRequest := &gossipv1.EthCallQueryRequest{
To: to,
Data: data,
Block: block,
Block: hexutil.EncodeBig(blockNum),
CallData: callData,
}

// Send 2 individual requests for the same thing but 5 blocks apart
// First request...
logger.Info("calling sendQueryAndGetRsp for ", zap.String("blockNum", blockNum.String()))
queryRequest := createQueryRequest(callRequest)
sendQueryAndGetRsp(queryRequest, sk, th, ctx, logger, sub, wethAbi, methods)

// This is just so that when I look at the output, it is easier for me. (Paul)
logger.Info("sleeping for 5 seconds")
time.Sleep(time.Second * 5)

// Second request...
blockNum = blockNum.Sub(blockNum, big.NewInt(5))
callRequest2 := &gossipv1.EthCallQueryRequest{
Block: hexutil.EncodeBig(blockNum),
CallData: callData,
}
queryRequest2 := createQueryRequest(callRequest2)
logger.Info("calling sendQueryAndGetRsp for ", zap.String("blockNum", blockNum.String()))
sendQueryAndGetRsp(queryRequest2, sk, th, ctx, logger, sub, wethAbi, methods)

// Now, want to send a single query with multiple requests...
logger.Info("Starting multiquery test in 5...")
time.Sleep(time.Second * 5)
multiCallRequest := []*gossipv1.EthCallQueryRequest{callRequest, callRequest2}
multQueryRequest := createQueryRequestWithMultipleRequests(multiCallRequest)
sendQueryAndGetRsp(multQueryRequest, sk, th, ctx, logger, sub, wethAbi, methods)

// Cleanly shutdown
// Without this the same host won't properly discover peers until some timeout
sub.Cancel()
if err := th.Close(); err != nil {
logger.Fatal("Error closing the topic", zap.Error(err))
}
if err := h.Close(); err != nil {
logger.Fatal("Error closing the host", zap.Error(err))
}

//
// END SHUTDOWN
//

logger.Info("Success! All tests passed!")
}

const (
GuardianKeyArmoredBlock = "WORMHOLE GUARDIAN PRIVATE KEY"
)

func createQueryRequest(callRequest *gossipv1.EthCallQueryRequest) *gossipv1.QueryRequest {
queryRequest := &gossipv1.QueryRequest{
ChainId: 5,
Nonce: 0,
Message: &gossipv1.QueryRequest_EthCallQueryRequest{
EthCallQueryRequest: callRequest}}
Nonce: rand.Uint32(),
PerChainQueries: []*gossipv1.PerChainQueryRequest{
{
ChainId: 5,
Message: &gossipv1.PerChainQueryRequest_EthCallQueryRequest{
EthCallQueryRequest: callRequest,
},
},
},
}
return queryRequest
}

func createQueryRequestWithMultipleRequests(callRequests []*gossipv1.EthCallQueryRequest) *gossipv1.QueryRequest {
perChainQueries := []*gossipv1.PerChainQueryRequest{}
for _, req := range callRequests {
perChainQueries = append(perChainQueries, &gossipv1.PerChainQueryRequest{
ChainId: 5,
Message: &gossipv1.PerChainQueryRequest_EthCallQueryRequest{
EthCallQueryRequest: req,
},
})
}

queryRequest := &gossipv1.QueryRequest{
Nonce: rand.Uint32(),
PerChainQueries: perChainQueries,
}
return queryRequest
}

func sendQueryAndGetRsp(queryRequest *gossipv1.QueryRequest, sk *ecdsa.PrivateKey, th *pubsub.Topic, ctx context.Context, logger *zap.Logger, sub *pubsub.Subscription, wethAbi abi.ABI, methods []string) {
queryRequestBytes, err := proto.Marshal(queryRequest)
if err != nil {
panic(err)
}
numQueries := len(queryRequest.PerChainQueries)

// Sign the query request using our private key.
digest := common.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes)
Expand Down Expand Up @@ -261,14 +361,32 @@ func main() {
// TODO: verify response signature
isMatchingResponse = true

result, err := wethAbi.Methods[methodName].Outputs.Unpack(response.Response.Result)
if err != nil {
logger.Warn("failed to unpack result", zap.Error(err))
if len(response.PerChainResponses) != numQueries {
logger.Warn("unexpected number of per chain query responses", zap.Int("expectedNum", numQueries), zap.Int("actualNum", len(response.PerChainResponses)))
break
}

resultStr := hexutil.Encode(response.Response.Result)
logger.Info("found matching response", zap.String("number", response.Response.Number.String()), zap.String("hash", response.Response.Hash.String()), zap.String("time", response.Response.Time.String()), zap.Any("resultDecoded", result), zap.String("resultStr", resultStr))
// Do double loop over responses
for index, pcq := range response.PerChainResponses {
logger.Info("per chain query response index", zap.Int("index", index))

localCallData := queryRequest.PerChainQueries[index].GetEthCallQueryRequest().GetCallData()

if len(pcq.Responses) != len(localCallData) {
logger.Warn("unexpected number of results", zap.Int("expectedNum", len(localCallData)), zap.Int("expectedNum", len(pcq.Responses)))
break
}

for idx, resp := range pcq.Responses {
result, err := wethAbi.Methods[methods[idx]].Outputs.Unpack(resp.Result)
if err != nil {
logger.Warn("failed to unpack result", zap.Error(err))
break
}

resultStr := hexutil.Encode(resp.Result)
logger.Info("found matching response", zap.Int("idx", idx), zap.String("number", resp.Number.String()), zap.String("hash", resp.Hash.String()), zap.String("time", resp.Time.String()), zap.String("method", methods[idx]), zap.Any("resultDecoded", result), zap.String("resultStr", resultStr))
}
}
}
default:
continue
Expand All @@ -277,32 +395,8 @@ func main() {
break
}
}

//
// BEGIN SHUTDOWN
//

// Cleanly shutdown
// Without this the same host won't properly discover peers until some timeout
sub.Cancel()
if err := th.Close(); err != nil {
logger.Fatal("Error closing the topic", zap.Error(err))
}
if err := h.Close(); err != nil {
logger.Fatal("Error closing the host", zap.Error(err))
}

//
// END SHUTDOWN
//

logger.Info("Success! All tests passed!")
}

const (
GuardianKeyArmoredBlock = "WORMHOLE GUARDIAN PRIVATE KEY"
)

// loadGuardianKey loads a serialized guardian key from disk.
func loadGuardianKey(filename string) (*ecdsa.PrivateKey, error) {
f, err := os.Open(filename)
Expand Down
53 changes: 41 additions & 12 deletions node/hack/query/test/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,30 @@ func TestCrossChainQuery(t *testing.T) {
panic(err)
}
to, _ := hex.DecodeString("DDb64fE46a91D46ee29420539FC25FD07c5FEa3E") // WETH

callData := []*gossipv1.EthCallQueryRequest_EthCallData{
{
To: to,
Data: data,
},
}

callRequest := &gossipv1.EthCallQueryRequest{
To: to,
Data: data,
Block: hexutil.EncodeBig(blockNum),
Block: hexutil.EncodeBig(blockNum),
CallData: callData,
}

queryRequest := &gossipv1.QueryRequest{
ChainId: 2,
Nonce: 0,
Message: &gossipv1.QueryRequest_EthCallQueryRequest{
EthCallQueryRequest: callRequest}}
Nonce: 1,
PerChainQueries: []*gossipv1.PerChainQueryRequest{
{
ChainId: 2,
Message: &gossipv1.PerChainQueryRequest_EthCallQueryRequest{
EthCallQueryRequest: callRequest,
},
},
},
}

queryRequestBytes, err := proto.Marshal(queryRequest)
if err != nil {
Expand Down Expand Up @@ -278,13 +292,28 @@ func TestCrossChainQuery(t *testing.T) {
continue
}

result, err := wethAbi.Methods[methodName].Outputs.Unpack(response.Response.Result)
if err != nil {
logger.Fatal("failed to unpack result", zap.Error(err))
if len(response.PerChainResponses) != 1 {
logger.Warn("unexpected number of per chain query responses", zap.Int("expectedNum", 1), zap.Int("actualNum", len(response.PerChainResponses)))
break
}

pcq := response.PerChainResponses[0]

if len(pcq.Responses) == 0 {
logger.Warn("response did not contain any results", zap.Error(err))
break
}

resultStr := hexutil.Encode(response.Response.Result)
logger.Info("found matching response", zap.String("number", response.Response.Number.String()), zap.String("hash", response.Response.Hash.String()), zap.String("time", response.Response.Time.String()), zap.Any("resultDecoded", result), zap.String("resultStr", resultStr))
for idx, resp := range pcq.Responses {
result, err := wethAbi.Methods[methodName].Outputs.Unpack(resp.Result)
if err != nil {
logger.Warn("failed to unpack result", zap.Error(err))
break
}

resultStr := hexutil.Encode(resp.Result)
logger.Info("found matching response", zap.Int("idx", idx), zap.String("number", resp.Number.String()), zap.String("hash", resp.Hash.String()), zap.String("time", resp.Time.String()), zap.Any("resultDecoded", result), zap.String("resultStr", resultStr))
}

success = true
}
Expand Down
4 changes: 4 additions & 0 deletions node/hack/query/utils/fetchCurrentGuardianSet.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func FetchLatestBlockNumber(ctx context.Context, network common.Environment) (*b
if rawUrl == "" {
return nil, fmt.Errorf("unable to get rpc url")
}
return FetchLatestBlockNumberFromUrl(ctx, rawUrl)
}

func FetchLatestBlockNumberFromUrl(ctx context.Context, rawUrl string) (*big.Int, error) {
rawClient, err := ethRpc.DialContext(ctx, rawUrl)
if err != nil {
return nil, fmt.Errorf("unable to dial eth context: %w", err)
Expand Down
Loading

0 comments on commit db5aed2

Please sign in to comment.