CCQ/Node: Metrics (#3517)
bruce-riley authored Nov 14, 2023
1 parent 65701f9 commit 5c39c8d
Showing 3 changed files with 93 additions and 0 deletions.
69 changes: 69 additions & 0 deletions node/pkg/query/metrics.go
@@ -0,0 +1,69 @@
package query

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	allQueryRequestsReceived = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_query_requests_received",
			Help: "Total number of query requests received, valid and invalid",
		})

	validQueryRequestsReceived = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_valid_query_requests_received",
			Help: "Total number of valid query requests received",
		})

	invalidQueryRequestReceived = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "ccq_guardian_invalid_query_requests_received_by_reason",
			Help: "Total number of invalid query requests received by reason",
		}, []string{"reason"})

	totalRequestsByChain = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_requests_by_chain",
			Help: "Total number of requests by chain",
		}, []string{"chain_name"})

	successfulQueryResponsesReceivedByChain = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_successful_query_responses_received_by_chain",
			Help: "Total number of successful query responses received by chain",
		}, []string{"chain_name"})

	retryNeededQueryResponsesReceivedByChain = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_retry_needed_query_responses_received_by_chain",
			Help: "Total number of retry needed query responses received by chain",
		}, []string{"chain_name"})

	fatalQueryResponsesReceivedByChain = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_fatal_query_responses_received_by_chain",
			Help: "Total number of fatal query responses received by chain",
		}, []string{"chain_name"})

	queryResponsesPublished = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_query_responses_published",
			Help: "Total number of query responses published",
		})

	queryRequestsTimedOut = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_query_requests_timed_out",
			Help: "Total number of query requests that timed out",
		})

	TotalWatcherTime = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "ccq_guardian_total_watcher_query_time_in_ms",
			Help:    "Time spent in the watcher per query, in ms, by chain",
			Buckets: []float64{1.0, 5.0, 10.0, 100.0, 250.0, 500.0, 1000.0, 5000.0, 10000.0, 30000.0},
		}, []string{"chain_name"})
)
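
The collectors above are created with promauto, which registers each one with the default Prometheus registry as a side effect of construction, so this file needs no explicit registration or exposition code. As a minimal, self-contained sketch of that pattern (assumptions: a hypothetical standalone program, a made-up metric name, and port 2112; this is not guardian code):

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// exampleRequestsByReason is a hypothetical counter vector, analogous to
// invalidQueryRequestReceived above. promauto.NewCounterVec registers it
// with prometheus.DefaultRegisterer automatically.
var exampleRequestsByReason = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "example_requests_by_reason",
		Help: "Example counter vector keyed by reason",
	}, []string{"reason"})

func main() {
	// Each distinct label value becomes its own time series on first use.
	exampleRequestsByReason.WithLabelValues("invalid_requestor").Inc()

	// Serve everything registered in the default registry on /metrics.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}

Scraping http://localhost:2112/metrics would then show example_requests_by_reason{reason="invalid_requestor"} 1 alongside the Go runtime and process metrics that the default registry exposes.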
20 changes: 20 additions & 0 deletions node/pkg/query/query.go
@@ -154,6 +154,9 @@ func handleQueryRequestsImpl(
delete(supportedChains, chainID)
} else {
logger.Info("queries supported on chain", zap.Stringer("chainID", chainID))

// Make sure we have a metric for every enabled chain, so we can see which ones are actually enabled.
totalRequestsByChain.WithLabelValues(chainID.String()).Add(0)
}
}

@@ -175,6 +178,7 @@ func handleQueryRequestsImpl(
// - length check on "to" address 20 bytes
// - valid "block" strings

allQueryRequestsReceived.Inc()
requestID := hex.EncodeToString(signedRequest.Signature)
digest := QueryRequestDigest(env, signedRequest.QueryRequest)

@@ -183,31 +187,36 @@ func handleQueryRequestsImpl(
signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), signedRequest.Signature)
if err != nil {
qLogger.Error("failed to recover public key", zap.String("requestID", requestID))
invalidQueryRequestReceived.WithLabelValues("failed_to_recover_public_key").Inc()
continue
}

signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])

if _, exists := allowedRequestors[signerAddress]; !exists {
qLogger.Error("invalid requestor", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID))
invalidQueryRequestReceived.WithLabelValues("invalid_requestor").Inc()
continue
}

// Make sure this is not a duplicate request. TODO: Should we do something smarter here than just dropping the duplicate?
if oldReq, exists := pendingQueries[requestID]; exists {
qLogger.Warn("dropping duplicate query request", zap.String("requestID", requestID), zap.Stringer("origRecvTime", oldReq.receiveTime))
invalidQueryRequestReceived.WithLabelValues("duplicate_request").Inc()
continue
}

var queryRequest QueryRequest
err = queryRequest.Unmarshal(signedRequest.QueryRequest)
if err != nil {
qLogger.Error("failed to unmarshal query request", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
invalidQueryRequestReceived.WithLabelValues("failed_to_unmarshal_request").Inc()
continue
}

if err := queryRequest.Validate(); err != nil {
qLogger.Error("received invalid message", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
invalidQueryRequestReceived.WithLabelValues("invalid_request").Inc()
continue
}

@@ -221,13 +230,15 @@ func handleQueryRequestsImpl(
chainID := vaa.ChainID(pcq.ChainId)
if _, exists := supportedChains[chainID]; !exists {
qLogger.Error("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID))
invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc()
errorFound = true
break
}

channel, channelExists := chainQueryReqC[chainID]
if !channelExists {
qLogger.Error("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID))
invalidQueryRequestReceived.WithLabelValues("failed_to_look_up_channel").Inc()
errorFound = true
break
}
@@ -246,6 +257,8 @@ func handleQueryRequestsImpl(
continue
}

validQueryRequestsReceived.Inc()

// Create the pending query and add it to the cache.
pq := &pendingQuery{
signedRequest: signedRequest,
@@ -264,6 +277,7 @@ func handleQueryRequestsImpl(

case resp := <-queryResponseReadC: // Response from a watcher.
if resp.Status == QuerySuccess {
successfulQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
if resp.Response == nil {
qLogger.Error("received a successful query response with no results, dropping it!", zap.String("requestID", resp.RequestID))
continue
@@ -315,18 +329,21 @@ func handleQueryRequestsImpl(
select {
case queryResponseWriteC <- respPub:
qLogger.Info("forwarded query response to p2p", zap.String("requestID", resp.RequestID))
queryResponsesPublished.Inc()
delete(pendingQueries, resp.RequestID)
default:
qLogger.Warn("failed to publish query response to p2p, will retry publishing next interval", zap.String("requestID", resp.RequestID))
pq.respPub = respPub
}
} else if resp.Status == QueryRetryNeeded {
retryNeededQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
if _, exists := pendingQueries[resp.RequestID]; exists {
qLogger.Warn("query failed, will retry next interval", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
} else {
qLogger.Warn("received a retry needed response with no outstanding query, dropping it", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
}
} else if resp.Status == QueryFatalError {
fatalQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
qLogger.Error("received a fatal error response, dropping the whole request", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
delete(pendingQueries, resp.RequestID)
} else {
@@ -341,13 +358,15 @@ func handleQueryRequestsImpl(
qLogger.Debug("audit", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime), zap.Stringer("timeout", timeout))
if timeout.Before(now) {
qLogger.Error("query request timed out, dropping it", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime))
queryRequestsTimedOut.Inc()
delete(pendingQueries, reqId)
} else {
if pq.respPub != nil {
// Resend the response to be published.
select {
case queryResponseWriteC <- pq.respPub:
qLogger.Info("resend of query response to p2p succeeded", zap.String("requestID", reqId))
queryResponsesPublished.Inc()
delete(pendingQueries, reqId)
default:
qLogger.Warn("resend of query response to p2p failed again, will keep retrying", zap.String("requestID", reqId))
@@ -402,6 +421,7 @@ func (pcq *perChainQuery) ccqForwardToWatcher(qLogger *zap.Logger, receiveTime t
// TODO: only send the query request itself and reassemble in this module
case pcq.channel <- pcq.req:
qLogger.Debug("forwarded query request to watcher", zap.String("requestID", pcq.req.RequestID), zap.Stringer("chainID", pcq.req.Request.ChainId))
totalRequestsByChain.WithLabelValues(pcq.req.Request.ChainId.String()).Inc()
pcq.lastUpdateTime = receiveTime
default:
// By leaving lastUpdateTime unset, we will retry next interval.
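
Two idioms in these query.go hunks are worth noting: a labeled series on a CounterVec only exists once it has been touched, so totalRequestsByChain.WithLabelValues(chainID.String()).Add(0) pre-seeds a zero-valued series for every enabled chain at startup, and all validation failures feed a single counter vector distinguished by a "reason" label rather than separate counters. A minimal sketch of the pre-seeding idiom (assumptions: hypothetical metric and label names; prometheus.NewCounterVec is used directly instead of promauto so the example does not touch the default registry):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Hypothetical stand-in for totalRequestsByChain in metrics.go.
	requestsByChain := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "example_requests_by_chain",
			Help: "Example counter vector keyed by chain name",
		}, []string{"chain_name"})

	// Pre-seed the label so the series exists with value 0 before any real
	// traffic; dashboards and rate() queries then see the enabled chain
	// immediately, which is what the Add(0) call in query.go is for.
	requestsByChain.WithLabelValues("ethereum").Add(0)

	// Later, when a request is forwarded to that chain's watcher.
	requestsByChain.WithLabelValues("ethereum").Inc()

	// testutil.ToFloat64 reads back the current value of a single series.
	fmt.Println(testutil.ToFloat64(requestsByChain.WithLabelValues("ethereum"))) // 1
}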
4 changes: 4 additions & 0 deletions node/pkg/watchers/evm/ccq.go
@@ -37,6 +37,8 @@ func (w *Watcher) ccqHandleQuery(logger *zap.Logger, ctx context.Context, queryR
panic("ccqevm: invalid chain ID")
}

start := time.Now()

switch req := queryRequest.Request.Query.(type) {
case *query.EthCallQueryRequest:
w.ccqHandleEthCallQueryRequest(logger, ctx, queryRequest, req)
@@ -50,6 +52,8 @@ func (w *Watcher) ccqHandleQuery(logger *zap.Logger, ctx context.Context, queryR
)
w.ccqSendQueryResponseForError(logger, queryRequest, query.QueryFatalError)
}

query.TotalWatcherTime.WithLabelValues(w.chainID.String()).Observe(float64(time.Since(start).Milliseconds()))
}

// EvmCallData contains the details of a single query in the batch.
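
In ccq.go the dispatch is bracketed by start := time.Now() and an Observe of time.Since(start) in milliseconds, so TotalWatcherTime records one histogram sample per query against the buckets declared in metrics.go. A minimal sketch of that timing pattern (assumptions: hypothetical metric name, bucket list, and doQuery placeholder; this is not the watcher's real handler):

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// exampleQueryTime is a hypothetical histogram, analogous to TotalWatcherTime.
var exampleQueryTime = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "example_query_time_in_ms",
		Help:    "Time spent handling a query, in ms, by chain",
		Buckets: []float64{1.0, 10.0, 100.0, 1000.0, 10000.0},
	}, []string{"chain_name"})

// doQuery stands in for the real per-chain query handler.
func doQuery() { time.Sleep(25 * time.Millisecond) }

func handleQuery(chainName string) {
	start := time.Now()
	doQuery()
	// One sample per query; Prometheus exports cumulative bucket counts,
	// a sum, and a count for each chain_name label value.
	exampleQueryTime.WithLabelValues(chainName).Observe(float64(time.Since(start).Milliseconds()))
}

func main() {
	handleQuery("ethereum")
}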
