CCQ/Node: Metrics #3517

Merged: 1 commit, Nov 14, 2023
69 changes: 69 additions & 0 deletions node/pkg/query/metrics.go
@@ -0,0 +1,69 @@
package query

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	allQueryRequestsReceived = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_query_requests_received",
			Help: "Total number of query requests received, valid and invalid",
		})

	validQueryRequestsReceived = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_valid_query_requests_received",
			Help: "Total number of valid query requests received",
		})

	invalidQueryRequestReceived = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "ccq_guardian_invalid_query_requests_received_by_reason",
			Help: "Total number of invalid query requests received by reason",
		}, []string{"reason"})

	totalRequestsByChain = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_requests_by_chain",
			Help: "Total number of requests by chain",
		}, []string{"chain_name"})

	successfulQueryResponsesReceivedByChain = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_successful_query_responses_received_by_chain",
			Help: "Total number of successful query responses received by chain",
		}, []string{"chain_name"})

	retryNeededQueryResponsesReceivedByChain = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_retry_needed_query_responses_received_by_chain",
			Help: "Total number of retry needed query responses received by chain",
		}, []string{"chain_name"})

	fatalQueryResponsesReceivedByChain = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_fatal_query_responses_received_by_chain",
			Help: "Total number of fatal query responses received by chain",
		}, []string{"chain_name"})

	queryResponsesPublished = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_query_responses_published",
			Help: "Total number of query responses published",
		})

	queryRequestsTimedOut = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "ccq_guardian_total_query_requests_timed_out",
			Help: "Total number of query requests that timed out",
		})

	TotalWatcherTime = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "ccq_guardian_total_watcher_query_time_in_ms",
			Help:    "Time spent in the watcher per query in ms by chain",
			Buckets: []float64{1.0, 5.0, 10.0, 100.0, 250.0, 500.0, 1000.0, 5000.0, 10000.0, 30000.0},
		}, []string{"chain_name"})
)
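All of the metrics above are created through promauto, so they self-register with prometheus.DefaultRegisterer at package init; the guardian's metrics endpoint is wired up elsewhere and is not part of this diff. A minimal sketch (illustrative names, not from this PR) of how promauto-registered metrics like these become scrapeable:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// demoRequestsByChain is a hypothetical stand-in for a counter such as
// totalRequestsByChain; promauto registers it with the default registry.
var demoRequestsByChain = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "demo_total_requests_by_chain",
		Help: "Total number of requests by chain (illustrative)",
	}, []string{"chain_name"})

func main() {
	demoRequestsByChain.WithLabelValues("ethereum").Inc()

	// promhttp.Handler serves every collector in the default registry,
	// including everything created via promauto.New*.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}
```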
20 changes: 20 additions & 0 deletions node/pkg/query/query.go
@@ -154,6 +154,9 @@ func handleQueryRequestsImpl(
			delete(supportedChains, chainID)
		} else {
			logger.Info("queries supported on chain", zap.Stringer("chainID", chainID))

			// Make sure we have a metric for every enabled chain, so we can see which ones are actually enabled.
			totalRequestsByChain.WithLabelValues(chainID.String()).Add(0)
		}
	}
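The Add(0) above matters because a CounterVec only creates a labeled series on first use: without it, a chain that never receives a request would be missing from the scrape rather than reported as zero. A self-contained sketch of the pattern (illustrative names; testutil is used only to read the value back):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Illustrative stand-in for totalRequestsByChain.
	requestsByChain := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "requests_by_chain", Help: "Requests per chain"},
		[]string{"chain_name"})

	// Pre-initialize each enabled chain so its series exists at zero.
	for _, chain := range []string{"ethereum", "solana"} {
		requestsByChain.WithLabelValues(chain).Add(0)
	}

	// Both series now exist even though nothing has been counted yet.
	fmt.Println(testutil.ToFloat64(requestsByChain.WithLabelValues("solana"))) // 0
}
```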

@@ -175,6 +178,7 @@ func handleQueryRequestsImpl(
			// - length check on "to" address 20 bytes
			// - valid "block" strings

			allQueryRequestsReceived.Inc()
			requestID := hex.EncodeToString(signedRequest.Signature)
			digest := QueryRequestDigest(env, signedRequest.QueryRequest)
@@ -183,31 +187,36 @@ func handleQueryRequestsImpl(
			signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), signedRequest.Signature)
			if err != nil {
				qLogger.Error("failed to recover public key", zap.String("requestID", requestID))
				invalidQueryRequestReceived.WithLabelValues("failed_to_recover_public_key").Inc()
				continue
			}

			signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])

			if _, exists := allowedRequestors[signerAddress]; !exists {
				qLogger.Error("invalid requestor", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID))
				invalidQueryRequestReceived.WithLabelValues("invalid_requestor").Inc()
				continue
			}

			// Make sure this is not a duplicate request. TODO: Should we do something smarter here than just dropping the duplicate?
			if oldReq, exists := pendingQueries[requestID]; exists {
				qLogger.Warn("dropping duplicate query request", zap.String("requestID", requestID), zap.Stringer("origRecvTime", oldReq.receiveTime))
				invalidQueryRequestReceived.WithLabelValues("duplicate_request").Inc()
				continue
			}

			var queryRequest QueryRequest
			err = queryRequest.Unmarshal(signedRequest.QueryRequest)
			if err != nil {
				qLogger.Error("failed to unmarshal query request", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
				invalidQueryRequestReceived.WithLabelValues("failed_to_unmarshal_request").Inc()
				continue
			}

			if err := queryRequest.Validate(); err != nil {
				qLogger.Error("received invalid message", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
				invalidQueryRequestReceived.WithLabelValues("invalid_request").Inc()
				continue
			}
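Every rejection path above increments the same invalidQueryRequestReceived vector with a distinct reason label, so each failure mode becomes its own time series. A self-contained sketch of how those per-reason counts behave (illustrative names, reading values back via the client library's testutil):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Illustrative stand-in for invalidQueryRequestReceived.
	invalid := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "invalid_query_requests_received_by_reason",
			Help: "Invalid query requests by reason",
		}, []string{"reason"})

	// Each rejection path bumps its own reason, as in the handler above.
	invalid.WithLabelValues("invalid_requestor").Inc()
	invalid.WithLabelValues("duplicate_request").Inc()
	invalid.WithLabelValues("invalid_requestor").Inc()

	fmt.Println(testutil.ToFloat64(invalid.WithLabelValues("invalid_requestor"))) // 2
	fmt.Println(testutil.ToFloat64(invalid.WithLabelValues("duplicate_request"))) // 1
}
```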

@@ -221,13 +230,15 @@ func handleQueryRequestsImpl(
				chainID := vaa.ChainID(pcq.ChainId)
				if _, exists := supportedChains[chainID]; !exists {
					qLogger.Error("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID))
					invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc()
					errorFound = true
					break
				}

				channel, channelExists := chainQueryReqC[chainID]
				if !channelExists {
					qLogger.Error("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID))
					invalidQueryRequestReceived.WithLabelValues("failed_to_look_up_channel").Inc()
					errorFound = true
					break
				}
@@ -246,6 +257,8 @@ func handleQueryRequestsImpl(
				continue
			}

			validQueryRequestsReceived.Inc()

			// Create the pending query and add it to the cache.
			pq := &pendingQuery{
				signedRequest: signedRequest,
@@ -264,6 +277,7 @@ func handleQueryRequestsImpl(

		case resp := <-queryResponseReadC: // Response from a watcher.
			if resp.Status == QuerySuccess {
				successfulQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
				if resp.Response == nil {
					qLogger.Error("received a successful query response with no results, dropping it!", zap.String("requestID", resp.RequestID))
					continue
@@ -315,18 +329,21 @@ func handleQueryRequestsImpl(
				select {
				case queryResponseWriteC <- respPub:
					qLogger.Info("forwarded query response to p2p", zap.String("requestID", resp.RequestID))
					queryResponsesPublished.Inc()
					delete(pendingQueries, resp.RequestID)
				default:
					qLogger.Warn("failed to publish query response to p2p, will retry publishing next interval", zap.String("requestID", resp.RequestID))
					pq.respPub = respPub
				}
			} else if resp.Status == QueryRetryNeeded {
				retryNeededQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
				if _, exists := pendingQueries[resp.RequestID]; exists {
					qLogger.Warn("query failed, will retry next interval", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
				} else {
					qLogger.Warn("received a retry needed response with no outstanding query, dropping it", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
				}
			} else if resp.Status == QueryFatalError {
				fatalQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
				qLogger.Error("received a fatal error response, dropping the whole request", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
				delete(pendingQueries, resp.RequestID)
			} else {
@@ -341,13 +358,15 @@ func handleQueryRequestsImpl(
				qLogger.Debug("audit", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime), zap.Stringer("timeout", timeout))
				if timeout.Before(now) {
					qLogger.Error("query request timed out, dropping it", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime))
					queryRequestsTimedOut.Inc()
					delete(pendingQueries, reqId)
				} else {
					if pq.respPub != nil {
						// Resend the response to be published.
						select {
						case queryResponseWriteC <- pq.respPub:
							qLogger.Info("resend of query response to p2p succeeded", zap.String("requestID", reqId))
							queryResponsesPublished.Inc()
							delete(pendingQueries, reqId)
						default:
							qLogger.Warn("resend of query response to p2p failed again, will keep retrying", zap.String("requestID", reqId))
@@ -402,6 +421,7 @@ func (pcq *perChainQuery) ccqForwardToWatcher(qLogger *zap.Logger, receiveTime t
	// TODO: only send the query request itself and reassemble in this module
	case pcq.channel <- pcq.req:
		qLogger.Debug("forwarded query request to watcher", zap.String("requestID", pcq.req.RequestID), zap.Stringer("chainID", pcq.req.Request.ChainId))
		totalRequestsByChain.WithLabelValues(pcq.req.Request.ChainId.String()).Inc()
		pcq.lastUpdateTime = receiveTime
	default:
		// By leaving lastUpdateTime unset, we will retry next interval.
4 changes: 4 additions & 0 deletions node/pkg/watchers/evm/ccq.go
@@ -37,6 +37,8 @@ func (w *Watcher) ccqHandleQuery(logger *zap.Logger, ctx context.Context, queryR
		panic("ccqevm: invalid chain ID")
	}

	start := time.Now()

	switch req := queryRequest.Request.Query.(type) {
	case *query.EthCallQueryRequest:
		w.ccqHandleEthCallQueryRequest(logger, ctx, queryRequest, req)
@@ -50,6 +52,8 @@ func (w *Watcher) ccqHandleQuery(logger *zap.Logger, ctx context.Context, queryR
		)
		w.ccqSendQueryResponseForError(logger, queryRequest, query.QueryFatalError)
	}

	query.TotalWatcherTime.WithLabelValues(w.chainID.String()).Observe(float64(time.Since(start).Milliseconds()))
}

// EvmCallData contains the details of a single query in the batch.
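The change to ccqHandleQuery uses the usual record-start/observe-elapsed pattern, converting the elapsed time to whole milliseconds so the observations line up with the millisecond bucket boundaries defined in metrics.go. A runnable sketch of the same pattern under illustrative names:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative histogram mirroring TotalWatcherTime: millisecond buckets,
// one series per chain.
var watcherTimeMs = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "watcher_query_time_in_ms",
		Help:    "Time spent handling a query, in ms, by chain",
		Buckets: []float64{1, 5, 10, 100, 250, 500, 1000, 5000, 10000, 30000},
	}, []string{"chain_name"})

func handleQuery(chain string) {
	start := time.Now()
	defer func() {
		// Same conversion as in ccqHandleQuery: elapsed time in whole ms.
		watcherTimeMs.WithLabelValues(chain).Observe(float64(time.Since(start).Milliseconds()))
	}()

	time.Sleep(12 * time.Millisecond) // stand-in for the real query work
}

func main() {
	handleQuery("ethereum")
	fmt.Println("observed one simulated query for ethereum")
}
```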