Skip to content

Commit

Permalink
CCQ: Make work with node refactor (#3154)
Browse files Browse the repository at this point in the history
* CCQ: Make work with node refactor

Change-Id: I416db0e01156787b6ed2420598a448d23c4581ca

* Tweak log message

Change-Id: Ibf109cfdccf01297e0755fc849b23d42cde2b772
  • Loading branch information
bruce-riley authored and evan-gray committed Jul 19, 2023
1 parent a759125 commit f1f6160
Show file tree
Hide file tree
Showing 16 changed files with 190 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def build_node_yaml():
"--ibcContract",
"wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj"
]

return encode_yaml_stream(node_yaml_with_replicas)

k8s_yaml_with_ns(build_node_yaml())
Expand Down
3 changes: 2 additions & 1 deletion node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func runNode(cmd *cobra.Command, args []string) {
// Deterministic ganache ETH devnet address.
*ethContract = unsafeDevModeEvmContractAddress(*ethContract)
*bscContract = unsafeDevModeEvmContractAddress(*bscContract)
// *polygonContract = unsafeDevModeEvmContractAddress(*polygonContract)
*polygonContract = unsafeDevModeEvmContractAddress(*polygonContract)
*avalancheContract = unsafeDevModeEvmContractAddress(*avalancheContract)
*oasisContract = unsafeDevModeEvmContractAddress(*oasisContract)
*auroraContract = unsafeDevModeEvmContractAddress(*auroraContract)
Expand Down Expand Up @@ -1368,6 +1368,7 @@ func runNode(cmd *cobra.Command, args []string) {
node.GuardianOptionWatchers(watcherConfigs, ibcWatcherConfig),
node.GuardianOptionAccountant(*accountantContract, *accountantWS, *accountantCheckEnabled, wormchainConn),
node.GuardianOptionGovernor(*chainGovernorEnabled),
node.GuardianOptionQueryHandler(*ccqEnabled, *ccqAllowedRequesters),
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, *p2pPort, ibc.GetFeatures),
node.GuardianOptionStatusServer(*statusAddr),
Expand Down
20 changes: 20 additions & 0 deletions node/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/governor"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
"github.com/certusone/wormhole/node/pkg/reporter"
"github.com/certusone/wormhole/node/pkg/supervisor"

Expand Down Expand Up @@ -43,6 +44,7 @@ type G struct {
gst *common.GuardianSetState
acct *accountant.Accountant
gov *governor.ChainGovernor
queryHandler *query.QueryHandler
attestationEvents *reporter.AttestationEventReporter
publicrpcServer *grpc.Server

Expand All @@ -69,6 +71,12 @@ type G struct {
injectC channelPair[*vaa.VAA]
// acctC is the channel where messages will be put after they reached quorum in the accountant.
acctC channelPair[*common.MessagePublication]

// Cross Chain Query Handler channels
chainQueryReqC map[vaa.ChainID]chan *query.PerChainQueryInternal
signedQueryReqC channelPair[*gossipv1.SignedQueryRequest]
queryResponseC channelPair[*query.PerChainQueryResponseInternal]
queryResponsePublicationC channelPair[*query.QueryResponsePublication]
}

func NewGuardianNode(
Expand Down Expand Up @@ -96,6 +104,11 @@ func (g *G) initializeBasic(logger *zap.Logger, rootCtxCancel context.CancelFunc
g.obsvReqSendC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize)
g.injectC = makeChannelPair[*vaa.VAA](0)
g.acctC = makeChannelPair[*common.MessagePublication](accountant.MsgChannelCapacity)
// Cross Chain Query Handler channels
g.chainQueryReqC = make(map[vaa.ChainID]chan *query.PerChainQueryInternal)
g.signedQueryReqC = makeChannelPair[*gossipv1.SignedQueryRequest](query.SignedQueryRequestChannelSize)
g.queryResponseC = makeChannelPair[*query.PerChainQueryResponseInternal](0)
g.queryResponsePublicationC = makeChannelPair[*query.QueryResponsePublication](0)

// Guardian set state managed by processor
g.gst = common.NewGuardianSetState(nil)
Expand Down Expand Up @@ -175,6 +188,13 @@ func (g *G) Run(rootCtxCancel context.CancelFunc, options ...*GuardianOption) su
}
}

if g.queryHandler != nil {
logger.Info("Starting query handler", zap.String("component", "ccq"))
if err := g.queryHandler.Start(ctx); err != nil {
logger.Fatal("failed to create chain governor", zap.Error(err), zap.String("component", "ccq"))
}
}

// Start any other runnables
for name, runnable := range g.runnables {
if err := supervisor.Run(ctx, name, runnable); err != nil {
Expand Down
57 changes: 56 additions & 1 deletion node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/certusone/wormhole/node/pkg/p2p"
"github.com/certusone/wormhole/node/pkg/processor"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/reporter"
"github.com/certusone/wormhole/node/pkg/supervisor"
Expand Down Expand Up @@ -71,6 +72,33 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId string, bootstrap
nil,
components,
ibcFeaturesFunc,
(g.queryHandler != nil),
g.signedQueryReqC.writeC,
g.queryResponsePublicationC.readC,
)

return nil
}}
}

// GuardianOptionQueryHandler configures the Cross Chain Query module.
func GuardianOptionQueryHandler(ccqEnabled bool, allowedRequesters string) *GuardianOption {
return &GuardianOption{
name: "query",
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
if !ccqEnabled {
logger.Info("ccq: cross chain query is disabled", zap.String("component", "ccq"))
return nil
}

g.queryHandler = query.NewQueryHandler(
logger,
g.env,
allowedRequesters,
g.signedQueryReqC.readC,
g.chainQueryReqC,
g.queryResponseC.readC,
g.queryResponsePublicationC.writeC,
)

return nil
Expand Down Expand Up @@ -273,6 +301,32 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
}(chainMsgC[chainId], chainId)
}

// Per-chain query response channel
chainQueryResponseC := make(map[vaa.ChainID]chan *query.PerChainQueryResponseInternal)
// aggregate per-chain msgC into msgC.
// SECURITY defense-in-depth: This way we enforce that a watcher must set the msg.EmitterChain to its chainId, which makes the code easier to audit
for _, chainId := range vaa.GetAllNetworkIDs() {
chainQueryResponseC[chainId] = make(chan *query.PerChainQueryResponseInternal)
go func(c <-chan *query.PerChainQueryResponseInternal, chainId vaa.ChainID) {
for {
select {
case <-ctx.Done():
return
case response := <-c:
if response.ChainId != chainId {
// SECURITY: This should never happen. If it does, a watcher has been compromised.
logger.Fatal("SECURITY CRITICAL: Received query response from a chain that was not marked as originating from that chain",
zap.Uint16("responseChainId", uint16(response.ChainId)),
zap.Stringer("watcherChainId", chainId),
)
} else {
g.queryResponseC.writeC <- response
}
}
}
}(chainQueryResponseC[chainId], chainId)
}

watchers := make(map[watchers.NetworkID]interfaces.L1Finalizer)

for _, wc := range watcherConfigs {
Expand All @@ -288,6 +342,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
}

chainObsvReqC[wc.GetChainID()] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
g.chainQueryReqC[wc.GetChainID()] = make(chan *query.PerChainQueryInternal, query.QueryRequestBufferSize)

if wc.RequiredL1Finalizer() != "" {
l1watcher, ok := watchers[wc.RequiredL1Finalizer()]
Expand All @@ -299,7 +354,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
wc.SetL1Finalizer(l1watcher)
}

l1finalizer, runnable, err := wc.Create(chainMsgC[wc.GetChainID()], chainObsvReqC[wc.GetChainID()], g.setC.writeC, g.env)
l1finalizer, runnable, err := wc.Create(chainMsgC[wc.GetChainID()], chainObsvReqC[wc.GetChainID()], g.chainQueryReqC[wc.GetChainID()], chainQueryResponseC[wc.GetChainID()], g.setC.writeC, g.env)

if err != nil {
return fmt.Errorf("error creating watcher: %w", err)
Expand Down
89 changes: 72 additions & 17 deletions node/pkg/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole/sdk/vaa"

ethCommon "github.com/ethereum/go-ethereum/common"
Expand All @@ -23,9 +24,47 @@ const (

// RetryInterval specifies how long we will wait between retry intervals. This is the interval of our ticker.
RetryInterval = 10 * time.Second

// SignedQueryRequestChannelSize is the buffer size of the incoming query request channel.
SignedQueryRequestChannelSize = 50

// QueryRequestBufferSize is the buffer size of the per-network query request channel.
QueryRequestBufferSize = 25
)

func NewQueryHandler(
logger *zap.Logger,
env common.Environment,
allowedRequestorsStr string,
signedQueryReqC <-chan *gossipv1.SignedQueryRequest,
chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal,
queryResponseReadC <-chan *PerChainQueryResponseInternal,
queryResponseWriteC chan<- *QueryResponsePublication,
) *QueryHandler {
return &QueryHandler{
logger: logger.With(zap.String("component", "ccq")),
env: env,
allowedRequestorsStr: allowedRequestorsStr,
signedQueryReqC: signedQueryReqC,
chainQueryReqC: chainQueryReqC,
queryResponseReadC: queryResponseReadC,
queryResponseWriteC: queryResponseWriteC,
}
}

type (
// QueryHandler defines the cross chain query handler.
QueryHandler struct {
logger *zap.Logger
env common.Environment
allowedRequestorsStr string
signedQueryReqC <-chan *gossipv1.SignedQueryRequest
chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal
queryResponseReadC <-chan *PerChainQueryResponseInternal
queryResponseWriteC chan<- *QueryResponsePublication
allowedRequestors map[ethCommon.Address]struct{}
}

// pendingQuery is the cache entry for a given query.
pendingQuery struct {
signedRequest *gossipv1.SignedQueryRequest
Expand All @@ -47,18 +86,26 @@ type (
}
)

// HandleQueryRequests multiplexes observation requests to the appropriate chain
func HandleQueryRequests(
ctx context.Context,
logger *zap.Logger,
signedQueryReqC <-chan *gossipv1.SignedQueryRequest,
chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal,
allowedRequestors map[ethCommon.Address]struct{},
queryResponseReadC <-chan *PerChainQueryResponseInternal,
queryResponseWriteC chan<- *QueryResponsePublication,
env common.Environment,
) {
handleQueryRequestsImpl(ctx, logger, signedQueryReqC, chainQueryReqC, allowedRequestors, queryResponseReadC, queryResponseWriteC, env, RequestTimeout, RetryInterval)
// Start initializes the query handler and starts the runnable.
func (qh *QueryHandler) Start(ctx context.Context) error {
qh.logger.Debug("entering Start", zap.String("enforceFlag", qh.allowedRequestorsStr))

var err error
qh.allowedRequestors, err = parseAllowedRequesters(qh.allowedRequestorsStr)
if err != nil {
return fmt.Errorf("failed to parse allowed requesters: %w", err)
}

if err := supervisor.Run(ctx, "query_handler", common.WrapWithScissors(qh.handleQueryRequests, "query_handler")); err != nil {
return fmt.Errorf("failed to start query handler routine: %w", err)
}

return nil
}

// handleQueryRequests multiplexes observation requests to the appropriate chain
func (qh *QueryHandler) handleQueryRequests(ctx context.Context) error {
return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval)
}

// handleQueryRequestsImpl allows instantiating the handler in the test environment with shorter timeout and retry parameters.
Expand All @@ -73,7 +120,7 @@ func handleQueryRequestsImpl(
env common.Environment,
requestTimeoutImpl time.Duration,
retryIntervalImpl time.Duration,
) {
) error {
qLogger := logger.With(zap.String("component", "ccqhandler"))
qLogger.Info("cross chain queries are enabled", zap.Any("allowedRequestors", allowedRequestors), zap.String("env", string(env)))

Expand Down Expand Up @@ -106,7 +153,7 @@ func handleQueryRequestsImpl(
for {
select {
case <-ctx.Done():
return
return nil

case signedRequest := <-signedQueryReqC: // Inbound query request.
// requestor validation happens here
Expand All @@ -121,6 +168,8 @@ func handleQueryRequestsImpl(
requestID := hex.EncodeToString(signedRequest.Signature)
digest := QueryRequestDigest(env, signedRequest.QueryRequest)

qLogger.Info("received a query request", zap.String("requestID", requestID))

signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), signedRequest.Signature)
if err != nil {
qLogger.Error("failed to recover public key", zap.String("requestID", requestID))
Expand Down Expand Up @@ -296,7 +345,13 @@ func handleQueryRequestsImpl(
} else {
for requestIdx, pcq := range pq.queries {
if pq.responses[requestIdx] == nil && pcq.lastUpdateTime.Add(retryIntervalImpl).Before(now) {
qLogger.Info("retrying query request", zap.String("requestId", reqId), zap.Int("requestIdx", requestIdx), zap.Stringer("receiveTime", pq.receiveTime), zap.Stringer("lastUpdateTime", pcq.lastUpdateTime))
qLogger.Info("retrying query request",
zap.String("requestId", reqId),
zap.Int("requestIdx", requestIdx),
zap.Stringer("receiveTime", pq.receiveTime),
zap.Stringer("lastUpdateTime", pcq.lastUpdateTime),
zap.String("chainID", pq.queries[requestIdx].req.Request.ChainId.String()),
)
pcq.ccqForwardToWatcher(qLogger, pq.receiveTime)
}
}
Expand All @@ -307,8 +362,8 @@ func handleQueryRequestsImpl(
}
}

// ParseAllowedRequesters parses a comma separated list of allowed requesters into a map to be used for look ups.
func ParseAllowedRequesters(ccqAllowedRequesters string) (map[ethCommon.Address]struct{}, error) {
// parseAllowedRequesters parses a comma separated list of allowed requesters into a map to be used for look ups.
func parseAllowedRequesters(ccqAllowedRequesters string) (map[ethCommon.Address]struct{}, error) {
if ccqAllowedRequesters == "" {
return nil, fmt.Errorf("if cross chain query is enabled `--ccqAllowedRequesters` must be specified")
}
Expand Down
19 changes: 11 additions & 8 deletions node/pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func validateResponseForTest(
}

func TestParseAllowedRequestersSuccess(t *testing.T) {
ccqAllowedRequestersList, err := ParseAllowedRequesters(testSigner)
ccqAllowedRequestersList, err := parseAllowedRequesters(testSigner)
require.NoError(t, err)
require.NotNil(t, ccqAllowedRequestersList)
require.Equal(t, 1, len(ccqAllowedRequestersList))
Expand All @@ -164,7 +164,7 @@ func TestParseAllowedRequestersSuccess(t *testing.T) {
_, exists = ccqAllowedRequestersList[ethCommon.BytesToAddress(ethCommon.Hex2Bytes("beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBf"))]
require.False(t, exists)

ccqAllowedRequestersList, err = ParseAllowedRequesters("beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe,beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBf")
ccqAllowedRequestersList, err = parseAllowedRequesters("beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe,beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBf")
require.NoError(t, err)
require.NotNil(t, ccqAllowedRequestersList)
require.Equal(t, 2, len(ccqAllowedRequestersList))
Expand All @@ -176,17 +176,17 @@ func TestParseAllowedRequestersSuccess(t *testing.T) {
}

func TestParseAllowedRequestersFailsIfParameterEmpty(t *testing.T) {
ccqAllowedRequestersList, err := ParseAllowedRequesters("")
ccqAllowedRequestersList, err := parseAllowedRequesters("")
require.Error(t, err)
require.Nil(t, ccqAllowedRequestersList)

ccqAllowedRequestersList, err = ParseAllowedRequesters(",")
ccqAllowedRequestersList, err = parseAllowedRequesters(",")
require.Error(t, err)
require.Nil(t, ccqAllowedRequestersList)
}

func TestParseAllowedRequestersFailsIfInvalidParameter(t *testing.T) {
ccqAllowedRequestersList, err := ParseAllowedRequesters("Hello")
ccqAllowedRequestersList, err := parseAllowedRequesters("Hello")
require.Error(t, err)
require.Nil(t, ccqAllowedRequestersList)
}
Expand Down Expand Up @@ -310,7 +310,7 @@ func createQueryHandlerForTestWithoutPublisher(t *testing.T, ctx context.Context
require.NoError(t, err)
require.NotNil(t, md.sk)

ccqAllowedRequestersList, err := ParseAllowedRequesters(testSigner)
ccqAllowedRequestersList, err := parseAllowedRequesters(testSigner)
require.NoError(t, err)

// Inbound observation requests from the p2p service (for all chains)
Expand All @@ -330,8 +330,11 @@ func createQueryHandlerForTestWithoutPublisher(t *testing.T, ctx context.Context

md.resetState()

go handleQueryRequestsImpl(ctx, logger, md.signedQueryReqReadC, md.chainQueryReqC, ccqAllowedRequestersList,
md.queryResponseReadC, md.queryResponsePublicationWriteC, common.GoTest, requestTimeoutForTest, retryIntervalForTest)
go func() {
err := handleQueryRequestsImpl(ctx, logger, md.signedQueryReqReadC, md.chainQueryReqC, ccqAllowedRequestersList,
md.queryResponseReadC, md.queryResponsePublicationWriteC, common.GoTest, requestTimeoutForTest, retryIntervalForTest)
assert.NoError(t, err)
}()

// Create a routine for each configured watcher. It will take a per chain query and return the corresponding expected result.
// It also pegs a counter of the number of requests the watcher received, for verification purposes.
Expand Down
1 change: 0 additions & 1 deletion node/pkg/query/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ type EthCallData struct {
Data []byte
}

const SignedQueryRequestChannelSize = 50
const EvmContractAddressLength = 20

// PerChainQueryInternal is an internal representation of a query request that is passed to the watcher.
Expand Down
Loading

0 comments on commit f1f6160

Please sign in to comment.