Skip to content

Commit

Permalink
WIP: CCQ add block to call
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-gray committed May 25, 2023
1 parent b290e30 commit 1099c8e
Show file tree
Hide file tree
Showing 14 changed files with 211 additions and 106 deletions.
5 changes: 5 additions & 0 deletions node/cmd/guardiand/adminserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
ethRpc "github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
Expand Down Expand Up @@ -68,6 +69,10 @@ func (m mockEVMConnector) RawCallContext(ctx context.Context, result interface{}
panic("unimplemented")
}

func (m mockEVMConnector) RawBatchCallContext(ctx context.Context, b []ethRpc.BatchElem) error {
panic("unimplemented")
}

func generateGS(num int) (keys []*ecdsa.PrivateKey, addrs []common.Address) {
for i := 0; i < num; i++ {
key, err := ethcrypto.GenerateKey()
Expand Down
3 changes: 2 additions & 1 deletion node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ var Build = "prod"

// observationRequestBufferSize is the buffer size of the per-network reobservation channel
const observationRequestBufferSize = 25

// queryRequestBufferSize is the buffer size of the per-network reobservation channel
const queryRequestBufferSize = 25

Expand Down Expand Up @@ -1546,7 +1547,7 @@ func runNode(cmd *cobra.Command, args []string) {
}

go handleReobservationRequests(rootCtx, clock.New(), logger, obsvReqReadC, chainObsvReqC)
go handleQueryRequests(rootCtx, clock.New(), logger, signedQueryReqReadC, chainQueryReqC)
go handleQueryRequests(rootCtx, logger, signedQueryReqReadC, chainQueryReqC)

if acct != nil {
if err := acct.Start(ctx); err != nil {
Expand Down
18 changes: 6 additions & 12 deletions node/cmd/guardiand/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package guardiand
import (
"context"

"github.com/benbjohnson/clock"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
Expand All @@ -24,7 +23,6 @@ var allowedRequestor = common.BytesToAddress(common.Hex2Bytes("beFA429d57cD18b7F
// Multiplex observation requests to the appropriate chain
func handleQueryRequests(
ctx context.Context,
clock clock.Clock,
logger *zap.Logger,
signedQueryReqC <-chan *gossipv1.SignedQueryRequest,
chainQueryReqC map[vaa.ChainID]chan *gossipv1.QueryRequest,
Expand All @@ -37,33 +35,29 @@ func handleQueryRequests(
case signedQueryRequest := <-signedQueryReqC:
// requestor validation happens here
// request type validation is currently handled by the watcher
// in the future, it may be worthwhile to catch certain types of
// in the future, it may be worthwhile to catch certain types of
// invalid requests here for tracking purposes
requestorAddr := common.BytesToAddress(signedQueryRequest.RequestorAddr)
if requestorAddr != allowedRequestor {
qLogger.Error("invalid requestor", zap.String("requestor", requestorAddr.Hex()))
continue
}

digest := queryRequestDigest(signedQueryRequest.QueryRequest)

signerBytes, err := ethcrypto.Ecrecover(digest.Bytes(), signedQueryRequest.Signature)
if err != nil {
qLogger.Error("failed to recover public key", zap.String("requestor", requestorAddr.Hex()))
qLogger.Error("failed to recover public key")
continue
}

signerAddress := common.BytesToAddress(ethcrypto.Keccak256(signerBytes[1:])[12:])
if signerAddress != requestorAddr {
qLogger.Error("requestor signer mismatch", zap.String("requestor", requestorAddr.Hex()), zap.String("signer", signerAddress.Hex()))

if signerAddress != allowedRequestor {
qLogger.Error("invalid requestor", zap.String("requestor", signerAddress.Hex()))
continue
}

var queryRequest gossipv1.QueryRequest
err = proto.Unmarshal(signedQueryRequest.QueryRequest, &queryRequest)
if err != nil {
qLogger.Error("received invalid message",
zap.String("requestor", requestorAddr.Hex()))
zap.String("requestor", signerAddress.Hex()))
continue
}

Expand Down
79 changes: 50 additions & 29 deletions node/hack/query/send_req.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"os"
"strings"
"time"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
Expand Down Expand Up @@ -43,9 +44,22 @@ func queryRequestDigest(b []byte) ethCommon.Hash {
// this script has to be run inside kubernetes since it relies on UDP
// https://github.com/kubernetes/kubernetes/issues/47862
// kubectl --namespace=wormhole exec -it spy-0 -- sh -c "cd node/hack/query/ && go run send_req.go"
// one way to iterate inside the container
// kubectl --namespace=wormhole exec -it spy-0 -- bash
// apt update
// apt install nano
// cd node/hack/query
// echo "" > send_req.go
// nano send_req.go
// [paste, ^x, y, enter]
// go run send_req.go

func main() {

//
// BEGIN SETUP
//

p2pNetworkID := "/wormhole/dev"
var p2pPort uint = 8998 // don't collide with spy so we can run from the same container in tilt
p2pBootstrap := "/dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"
Expand Down Expand Up @@ -144,28 +158,37 @@ func main() {
if err != nil {
logger.Panic("failed to join topic", zap.Error(err))
}

sub, err := th.Subscribe()
if err != nil {
logger.Panic("failed to subscribe topic", zap.Error(err))
}

logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))

// Wait for peers
for len(th.ListPeers()) < 1 {
time.Sleep(time.Millisecond * 100)
}

//
// END SETUP
//

to, _ := hex.DecodeString("0d500b1d8e8ef31e21c99d1db9a6444d3adf1270")
data, _ := hex.DecodeString("18160ddd")
// block := "0x28d9630"
// block := "latest"
block := "0x2e0d2bc116d77308db4e76eb906f6c168767ed00ad62cd2e2a31c61744c506e6"
block := "latest"
// block := "0x9999bac44d09a7f69ee7941819b0a19c59ccb1969640cc513be09ef95ed2d8e2"
callRequest := &gossipv1.EthCallQueryRequest{
To: to,
Data: data,
Block: block,
To: to,
Data: data,
Block: block,
}
queryRequest := &gossipv1.QueryRequest{
ChainId: 5,
Nonce: 0,
Nonce: 0,
Message: &gossipv1.QueryRequest_EthCallQueryRequest{
EthCallQueryRequest: callRequest}}

Expand All @@ -183,8 +206,7 @@ func main() {

signedQueryRequest := &gossipv1.SignedQueryRequest{
QueryRequest: queryRequestBytes,
Signature: sig,
RequestorAddr: ethCrypto.PubkeyToAddress(sk.PublicKey).Bytes(),
Signature: sig,
}

msg := gossipv1.GossipMessage{
Expand All @@ -198,25 +220,6 @@ func main() {
panic(err)
}

// do something to wait for peers, this waits to receive a message
logger.Info("Waiting for a message...")
for {
envelope, err := sub.Next(ctx)
if err != nil {
logger.Panic("failed to receive pubsub message", zap.Error(err))
}
var msg gossipv1.GossipMessage
err = proto.Unmarshal(envelope.Data, &msg)
if err != nil {
logger.Info("received invalid message",
zap.Binary("data", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
continue
}
logger.Info("Received a message!")
break
}

err = th.Publish(ctx, b)
if err != nil {
panic(err)
Expand All @@ -241,6 +244,24 @@ func main() {
break
}

//
// BEGIN SHUTDOWN
//

// Cleanly shutdown
// Without this the same host won't properly discover peers until some timeout
sub.Cancel()
if err := th.Close(); err != nil {
logger.Fatal("Error closing the topic", zap.Error(err))
}
if err := h.Close(); err != nil {
logger.Fatal("Error closing the host", zap.Error(err))
}

//
// END SHUTDOWN
//

logger.Info("Success! All tests passed!")
}

Expand Down
1 change: 1 addition & 0 deletions node/pkg/p2p/watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,5 +182,6 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
g.signedGovSt,
g.components,
nil, // ibc feature string
nil, // signed query request channel
))
}
54 changes: 21 additions & 33 deletions node/pkg/proto/gossip/v1/gossip.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions node/pkg/watchers/evm/connectors/celo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethEvent "github.com/ethereum/go-ethereum/event"
ethRpc "github.com/ethereum/go-ethereum/rpc"

"github.com/certusone/wormhole/node/pkg/common"
"go.uber.org/zap"
Expand Down Expand Up @@ -181,6 +182,19 @@ func (c *CeloConnector) RawCallContext(ctx context.Context, result interface{},
return c.rawClient.CallContext(ctx, result, method, args...)
}

func (c *CeloConnector) RawBatchCallContext(ctx context.Context, b []ethRpc.BatchElem) error {
celoB := make([]celoRpc.BatchElem, len(b))
for i, v := range b {
celoB[i] = celoRpc.BatchElem{
Method: v.Method,
Args: v.Args,
Result: v.Result,
Error: v.Error,
}
}
return c.rawClient.BatchCallContext(ctx, celoB)
}

func convertCeloEventToEth(ev *celoAbi.AbiLogMessagePublished) *ethAbi.AbiLogMessagePublished {
return &ethAbi.AbiLogMessagePublished{
Sender: ethCommon.BytesToAddress(ev.Sender.Bytes()),
Expand Down
13 changes: 13 additions & 0 deletions node/pkg/watchers/evm/connectors/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,22 @@ import (

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)

type BlockMarshaller struct {
Number *hexutil.Big
Hash common.Hash `json:"hash"`
Time hexutil.Uint64 `json:"timestamp"`

// L1BlockNumber is the L1 block number in which an Arbitrum batch containing this block was submitted.
// This field is only populated when connecting to Arbitrum.
L1BlockNumber *hexutil.Big
}

type NewBlock struct {
Number *big.Int
Hash common.Hash
Expand All @@ -33,6 +45,7 @@ type Connector interface {
ParseLogMessagePublished(log types.Log) (*ethabi.AbiLogMessagePublished, error)
SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error)
RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
RawBatchCallContext(ctx context.Context, b []rpc.BatchElem) error
}

type PollSubscription struct {
Expand Down
3 changes: 3 additions & 0 deletions node/pkg/watchers/evm/connectors/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ func (e *EthereumConnector) SubscribeForBlocks(ctx context.Context, errC chan er

func (e *EthereumConnector) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
return e.rawClient.CallContext(ctx, result, method, args...)
}

func (e *EthereumConnector) RawBatchCallContext(ctx context.Context, b []ethRpc.BatchElem) error {
return e.rawClient.BatchCallContext(ctx, b)
}

func (e *EthereumConnector) Client() *ethClient.Client {
Expand Down
Loading

0 comments on commit 1099c8e

Please sign in to comment.