Skip to content

Commit

Permalink
WIP: CCQ serialize and sign response
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-gray committed May 25, 2023
1 parent 1099c8e commit 4ed601d
Show file tree
Hide file tree
Showing 12 changed files with 693 additions and 331 deletions.
125 changes: 81 additions & 44 deletions node/cmd/guardiand/node.go

Large diffs are not rendered by default.

24 changes: 14 additions & 10 deletions node/cmd/guardiand/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"

gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
ethCommon "github.com/ethereum/go-ethereum/common"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand All @@ -14,18 +14,18 @@ import (
// TODO: should this use a different standard of signing messages, like https://eips.ethereum.org/EIPS/eip-712
var queryRequestPrefix = []byte("query_request_00000000000000000000|")

func queryRequestDigest(b []byte) common.Hash {
return ethcrypto.Keccak256Hash(append(queryRequestPrefix, b...))
func queryRequestDigest(b []byte) ethCommon.Hash {
return ethCrypto.Keccak256Hash(append(queryRequestPrefix, b...))
}

var allowedRequestor = common.BytesToAddress(common.Hex2Bytes("beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"))
var allowedRequestor = ethCommon.BytesToAddress(ethCommon.Hex2Bytes("beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"))

// Multiplex observation requests to the appropriate chain
func handleQueryRequests(
ctx context.Context,
logger *zap.Logger,
signedQueryReqC <-chan *gossipv1.SignedQueryRequest,
chainQueryReqC map[vaa.ChainID]chan *gossipv1.QueryRequest,
chainQueryReqC map[vaa.ChainID]chan *gossipv1.SignedQueryRequest,
) {
qLogger := logger.With(zap.String("component", "queryHandler"))
for {
Expand All @@ -37,16 +37,20 @@ func handleQueryRequests(
// request type validation is currently handled by the watcher
// in the future, it may be worthwhile to catch certain types of
// invalid requests here for tracking purposes
// e.g.
// - length check on "signature" 65 bytes
// - length check on "to" address 20 bytes
// - valid "block" strings

digest := queryRequestDigest(signedQueryRequest.QueryRequest)

signerBytes, err := ethcrypto.Ecrecover(digest.Bytes(), signedQueryRequest.Signature)
signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), signedQueryRequest.Signature)
if err != nil {
qLogger.Error("failed to recover public key")
continue
}

signerAddress := common.BytesToAddress(ethcrypto.Keccak256(signerBytes[1:])[12:])
signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])

if signerAddress != allowedRequestor {
qLogger.Error("invalid requestor", zap.String("requestor", signerAddress.Hex()))
Expand All @@ -63,8 +67,8 @@ func handleQueryRequests(

if channel, ok := chainQueryReqC[vaa.ChainID(queryRequest.ChainId)]; ok {
select {
// TODO: is pointer fine here?
case channel <- &queryRequest:
// TODO: only send the query request itself and reassemble in this module
case channel <- signedQueryRequest:
default:
qLogger.Warn("failed to send query request to watcher",
zap.Uint16("chain_id", uint16(queryRequest.ChainId)))
Expand Down
19 changes: 3 additions & 16 deletions node/cmd/spy/spy.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,6 @@ func runSpy(cmd *cobra.Command, args []string) {
// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)

// Inbound observation requests
queryReqC := make(chan *gossipv1.SignedQueryRequest, 50)

// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)

Expand Down Expand Up @@ -505,18 +502,6 @@ func runSpy(cmd *cobra.Command, args []string) {
}
}()

// Ignore query requests
// Note: without this, the whole program hangs on query requests
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-queryReqC:
}
}
}()

// Log signed VAAs
go func() {
for {
Expand Down Expand Up @@ -568,7 +553,9 @@ func runSpy(cmd *cobra.Command, args []string) {
nil,
components,
nil, // ibc feature string
queryReqC,
nil, // query requests
nil, // query responses

)); err != nil {
return err
}
Expand Down
16 changes: 13 additions & 3 deletions node/hack/query/send_req.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func main() {
}

logger.Info("Waiting for message...")
// TODO: max wait time
for {
envelope, err := sub.Next(ctx)
if err != nil {
Expand All @@ -239,9 +240,18 @@ func main() {
zap.String("from", envelope.GetFrom().String()))
continue
}
// TODO: actually wait for the corresponding response
logger.Info("received message")
break
var isMatchingResponse bool
switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_SignedQueryResponse:
// TODO: check if it's matching
logger.Info("response received", zap.Any("response", m.SignedQueryResponse))
isMatchingResponse = true
default:
continue
}
if isMatchingResponse {
break
}
}

//
Expand Down
16 changes: 0 additions & 16 deletions node/pkg/common/queryReqSendC.go

This file was deleted.

16 changes: 16 additions & 0 deletions node/pkg/common/queryRequest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package common

import (
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
)

const SignedQueryRequestChannelSize = 50

func PostSignedQueryRequest(signedQueryReqSendC chan<- *gossipv1.SignedQueryRequest, req *gossipv1.SignedQueryRequest) error {
select {
case signedQueryReqSendC <- req:
return nil
default:
return ErrChanFull
}
}
160 changes: 160 additions & 0 deletions node/pkg/common/queryResponse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package common

import (
"bytes"
"encoding/binary"
"fmt"
"math"
"time"

gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/common"
eth_hexutil "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"google.golang.org/protobuf/proto"
)

var queryResponsePrefix = []byte("query_response_0000000000000000000|")

type EthCallQueryResponse struct {
Number *eth_hexutil.Big
Hash common.Hash
Time eth_hexutil.Uint64
Result []byte
}

type QueryResponsePublication struct {
Request *gossipv1.SignedQueryRequest
Response EthCallQueryResponse
}

func (msg *QueryResponsePublication) Marshal() ([]byte, error) {
// TODO: copy request write checks to query module request handling
// TODO: only receive the unmarshalled query request (see note in query.go)
var queryRequest gossipv1.QueryRequest
err := proto.Unmarshal(msg.Request.QueryRequest, &queryRequest)
if err != nil {
return nil, fmt.Errorf("received invalid message from query module")
}

buf := new(bytes.Buffer)

// Source
// TODO: support writing off-chain and on-chain requests
// Here, unset represents an off-chain request
vaa.MustWrite(buf, binary.BigEndian, vaa.ChainIDUnset)
buf.Write(msg.Request.Signature[:])

// Request
// TODO: support writing different types of request/response pairs
switch req := queryRequest.Message.(type) {
case *gossipv1.QueryRequest_EthCallQueryRequest:
vaa.MustWrite(buf, binary.BigEndian, uint8(1))
vaa.MustWrite(buf, binary.BigEndian, queryRequest.ChainId) // uint32
vaa.MustWrite(buf, binary.BigEndian, queryRequest.Nonce) // uint32
if len(req.EthCallQueryRequest.To) != 20 {
return nil, fmt.Errorf("invalid length for To contract")
}
buf.Write(req.EthCallQueryRequest.To)
if len(req.EthCallQueryRequest.Data) > math.MaxUint32 {
return nil, fmt.Errorf("request data too long")
}
vaa.MustWrite(buf, binary.BigEndian, uint32(len(req.EthCallQueryRequest.Data)))
buf.Write(req.EthCallQueryRequest.Data)
if len(req.EthCallQueryRequest.Block) > math.MaxUint32 {
return nil, fmt.Errorf("request block too long")
}
vaa.MustWrite(buf, binary.BigEndian, uint32(len(req.EthCallQueryRequest.Block)))
// TODO: should this be an enum or the literal string?
buf.Write([]byte(req.EthCallQueryRequest.Block))

// Response
// TODO: probably some kind of request/response pair validation
vaa.MustWrite(buf, binary.BigEndian, msg.Response.Number.ToInt().Uint64())
if len(msg.Response.Hash) != 32 {
return nil, fmt.Errorf("invalid length for block hash")
}
buf.Write(msg.Response.Hash[:])
vaa.MustWrite(buf, binary.BigEndian, uint32(time.Unix(int64(msg.Response.Time), 0).Unix()))
if len(msg.Response.Result) > math.MaxUint32 {
return nil, fmt.Errorf("response data too long")
}
vaa.MustWrite(buf, binary.BigEndian, uint32(len(msg.Response.Result)))
buf.Write(msg.Response.Result)
return buf.Bytes(), nil
default:
return nil, fmt.Errorf("received invalid message from query module")
}
}

// TODO
// Unmarshal deserializes the binary representation of a VAA
// func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) {
// if len(data) < minMsgLength {
// return nil, fmt.Errorf("message is too short")
// }

// msg := &MessagePublication{}

// reader := bytes.NewReader(data[:])

// txHash := common.Hash{}
// if n, err := reader.Read(txHash[:]); err != nil || n != 32 {
// return nil, fmt.Errorf("failed to read TxHash [%d]: %w", n, err)
// }
// msg.TxHash = txHash

// unixSeconds := uint32(0)
// if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil {
// return nil, fmt.Errorf("failed to read timestamp: %w", err)
// }
// msg.Timestamp = time.Unix(int64(unixSeconds), 0)

// if err := binary.Read(reader, binary.BigEndian, &msg.Nonce); err != nil {
// return nil, fmt.Errorf("failed to read nonce: %w", err)
// }

// if err := binary.Read(reader, binary.BigEndian, &msg.Sequence); err != nil {
// return nil, fmt.Errorf("failed to read sequence: %w", err)
// }

// if err := binary.Read(reader, binary.BigEndian, &msg.ConsistencyLevel); err != nil {
// return nil, fmt.Errorf("failed to read consistency level: %w", err)
// }

// if err := binary.Read(reader, binary.BigEndian, &msg.EmitterChain); err != nil {
// return nil, fmt.Errorf("failed to read emitter chain: %w", err)
// }

// emitterAddress := vaa.Address{}
// if n, err := reader.Read(emitterAddress[:]); err != nil || n != 32 {
// return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err)
// }
// msg.EmitterAddress = emitterAddress

// payload := make([]byte, reader.Len())
// n, err := reader.Read(payload)
// if err != nil || n == 0 {
// return nil, fmt.Errorf("failed to read payload [%d]: %w", n, err)
// }
// msg.Payload = payload[:n]

// return msg, nil
// }

// Similar to sdk/vaa/structs.go,
// In order to save space in the solana signature verification instruction, we hash twice so we only need to pass in
// the first hash (32 bytes) vs the full body data.
// TODO: confirm if this works / is worthwhile.
func (msg *QueryResponsePublication) SigningDigest() (common.Hash, error) {
msgBytes, err := msg.Marshal()
if err != nil {
return common.Hash{}, err
}
return GetQueryResponseDigestFromBytes(msgBytes), nil
}

func GetQueryResponseDigestFromBytes(b []byte) common.Hash {
return crypto.Keccak256Hash(append(queryResponsePrefix, crypto.Keccak256Hash(b).Bytes()...))
}
35 changes: 34 additions & 1 deletion node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func Run(
components *Components,
ibcFeatures *string,
signedQueryReqC chan<- *gossipv1.SignedQueryRequest,
queryResponseReadC <-chan *node_common.QueryResponsePublication,
) func(ctx context.Context) error {
if components == nil {
components = DefaultComponents()
Expand Down Expand Up @@ -409,6 +410,36 @@ func Run(
} else {
logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq))
}
case msg := <-queryResponseReadC:
msgBytes, err := msg.Marshal()
if err != nil {
logger.Error("failed to marshal query response", zap.Error(err))
continue
}
digest := node_common.GetQueryResponseDigestFromBytes(msgBytes)
sig, err := ethcrypto.Sign(digest.Bytes(), gk)
if err != nil {
panic(err)
}
envelope := &gossipv1.GossipMessage{
Message: &gossipv1.GossipMessage_SignedQueryResponse{
SignedQueryResponse: &gossipv1.SignedQueryResponse{
QueryResponse: msgBytes,
Signature: sig,
},
},
}
b, err := proto.Marshal(envelope)
if err != nil {
panic(err)
}
err = th.Publish(ctx, b)
p2pMessagesSent.Inc()
if err != nil {
logger.Error("failed to publish query response", zap.Error(err))
} else {
logger.Info("published signed query response", zap.Any("query_response", msg), zap.Any("signature", sig))
}
}
}
}()
Expand Down Expand Up @@ -557,7 +588,9 @@ func Run(
}
case *gossipv1.GossipMessage_SignedQueryRequest:
if signedQueryReqC != nil {
signedQueryReqC <- m.SignedQueryRequest
if err := node_common.PostSignedQueryRequest(signedQueryReqC, m.SignedQueryRequest); err != nil {
logger.Warn("failed to handle query request", zap.Error(err))
}
}
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
Expand Down
1 change: 1 addition & 0 deletions node/pkg/p2p/watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,6 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
g.components,
nil, // ibc feature string
nil, // signed query request channel
nil, // query response channel
))
}
Loading

0 comments on commit 4ed601d

Please sign in to comment.