diff --git a/node/pkg/query/metrics.go b/node/pkg/query/metrics.go
new file mode 100644
index 0000000000..c6ba5d11c8
--- /dev/null
+++ b/node/pkg/query/metrics.go
@@ -0,0 +1,69 @@
+package query
+
+import (
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+    allQueryRequestsReceived = promauto.NewCounter(
+        prometheus.CounterOpts{
+            Name: "ccq_guardian_total_query_requests_received",
+            Help: "Total number of query requests received, valid and invalid",
+        })
+
+    validQueryRequestsReceived = promauto.NewCounter(
+        prometheus.CounterOpts{
+            Name: "ccq_guardian_total_valid_query_requests_received",
+            Help: "Total number of valid query requests received",
+        })
+
+    invalidQueryRequestReceived = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "ccq_guardian_invalid_query_requests_received_by_reason",
+            Help: "Total number of invalid query requests received by reason",
+        }, []string{"reason"})
+
+    totalRequestsByChain = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "ccq_guardian_total_requests_by_chain",
+            Help: "Total number of requests by chain",
+        }, []string{"chain_name"})
+
+    successfulQueryResponsesReceivedByChain = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "ccq_guardian_total_successful_query_responses_received_by_chain",
+            Help: "Total number of successful query responses received by chain",
+        }, []string{"chain_name"})
+
+    retryNeededQueryResponsesReceivedByChain = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "ccq_guardian_total_retry_needed_query_responses_received_by_chain",
+            Help: "Total number of retry needed query responses received by chain",
+        }, []string{"chain_name"})
+
+    fatalQueryResponsesReceivedByChain = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "ccq_guardian_total_fatal_query_responses_received_by_chain",
+            Help: "Total number of fatal query responses received by chain",
+        }, []string{"chain_name"})
+
+    queryResponsesPublished = promauto.NewCounter(
+        prometheus.CounterOpts{
+            Name: "ccq_guardian_total_query_responses_published",
+            Help: "Total number of query responses published",
+        })
+
+    queryRequestsTimedOut = promauto.NewCounter(
+        prometheus.CounterOpts{
+            Name: "ccq_guardian_total_query_requests_timed_out",
+            Help: "Total number of query requests that timed out",
+        })
+
+    TotalWatcherTime = promauto.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Name:    "ccq_guardian_total_watcher_query_time_in_ms",
+            Help:    "Time spent in the watcher per query in ms by chain",
+            Buckets: []float64{1.0, 5.0, 10.0, 100.0, 250.0, 500.0, 1000.0, 5000.0, 10000.0, 30000.0},
+        }, []string{"chain_name"})
+)
diff --git a/node/pkg/query/query.go b/node/pkg/query/query.go
index 7ade48403d..1dfec8132b 100644
--- a/node/pkg/query/query.go
+++ b/node/pkg/query/query.go
@@ -154,6 +154,9 @@ func handleQueryRequestsImpl(
             delete(supportedChains, chainID)
         } else {
             logger.Info("queries supported on chain", zap.Stringer("chainID", chainID))
+
+            // Make sure we have a metric for every enabled chain, so we can see which ones are actually enabled.
+            totalRequestsByChain.WithLabelValues(chainID.String()).Add(0)
         }
     }
 
@@ -175,6 +178,7 @@ func handleQueryRequestsImpl(
             // - length check on "to" address 20 bytes
             // - valid "block" strings
 
+            allQueryRequestsReceived.Inc()
             requestID := hex.EncodeToString(signedRequest.Signature)
             digest := QueryRequestDigest(env, signedRequest.QueryRequest)
 
@@ -183,6 +187,7 @@ func handleQueryRequestsImpl(
             signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), signedRequest.Signature)
             if err != nil {
                 qLogger.Error("failed to recover public key", zap.String("requestID", requestID))
+                invalidQueryRequestReceived.WithLabelValues("failed_to_recover_public_key").Inc()
                 continue
             }
 
@@ -190,12 +195,14 @@ func handleQueryRequestsImpl(
 
             if _, exists := allowedRequestors[signerAddress]; !exists {
                 qLogger.Error("invalid requestor", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID))
+                invalidQueryRequestReceived.WithLabelValues("invalid_requestor").Inc()
                 continue
             }
 
             // Make sure this is not a duplicate request. TODO: Should we do something smarter here than just dropping the duplicate?
             if oldReq, exists := pendingQueries[requestID]; exists {
                 qLogger.Warn("dropping duplicate query request", zap.String("requestID", requestID), zap.Stringer("origRecvTime", oldReq.receiveTime))
+                invalidQueryRequestReceived.WithLabelValues("duplicate_request").Inc()
                 continue
             }
 
@@ -203,11 +210,13 @@ func handleQueryRequestsImpl(
             err = queryRequest.Unmarshal(signedRequest.QueryRequest)
             if err != nil {
                 qLogger.Error("failed to unmarshal query request", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
+                invalidQueryRequestReceived.WithLabelValues("failed_to_unmarshal_request").Inc()
                 continue
             }
 
             if err := queryRequest.Validate(); err != nil {
                 qLogger.Error("received invalid message", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
+                invalidQueryRequestReceived.WithLabelValues("invalid_request").Inc()
                 continue
             }
 
@@ -221,6 +230,7 @@ func handleQueryRequestsImpl(
                 chainID := vaa.ChainID(pcq.ChainId)
                 if _, exists := supportedChains[chainID]; !exists {
                     qLogger.Error("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID))
+                    invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc()
                     errorFound = true
                     break
                 }
@@ -228,6 +238,7 @@ func handleQueryRequestsImpl(
                 channel, channelExists := chainQueryReqC[chainID]
                 if !channelExists {
                     qLogger.Error("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID))
+                    invalidQueryRequestReceived.WithLabelValues("failed_to_look_up_channel").Inc()
                     errorFound = true
                     break
                 }
@@ -246,6 +257,8 @@ func handleQueryRequestsImpl(
                 continue
             }
 
+            validQueryRequestsReceived.Inc()
+
             // Create the pending query and add it to the cache.
             pq := &pendingQuery{
                 signedRequest: signedRequest,
@@ -264,6 +277,7 @@ func handleQueryRequestsImpl(
         case resp := <-queryResponseReadC: // Response from a watcher.
             if resp.Status == QuerySuccess {
+                successfulQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
                 if resp.Response == nil {
                     qLogger.Error("received a successful query response with no results, dropping it!", zap.String("requestID", resp.RequestID))
                     continue
                 }
@@ -315,18 +329,21 @@ func handleQueryRequestsImpl(
                 select {
                 case queryResponseWriteC <- respPub:
                     qLogger.Info("forwarded query response to p2p", zap.String("requestID", resp.RequestID))
+                    queryResponsesPublished.Inc()
                     delete(pendingQueries, resp.RequestID)
                 default:
                     qLogger.Warn("failed to publish query response to p2p, will retry publishing next interval", zap.String("requestID", resp.RequestID))
                     pq.respPub = respPub
                 }
             } else if resp.Status == QueryRetryNeeded {
+                retryNeededQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
                 if _, exists := pendingQueries[resp.RequestID]; exists {
                     qLogger.Warn("query failed, will retry next interval", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
                 } else {
                     qLogger.Warn("received a retry needed response with no outstanding query, dropping it", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
                 }
             } else if resp.Status == QueryFatalError {
+                fatalQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
                 qLogger.Error("received a fatal error response, dropping the whole request", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
                 delete(pendingQueries, resp.RequestID)
             } else {
@@ -341,6 +358,7 @@ func handleQueryRequestsImpl(
                 qLogger.Debug("audit", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime), zap.Stringer("timeout", timeout))
                 if timeout.Before(now) {
                     qLogger.Error("query request timed out, dropping it", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime))
+                    queryRequestsTimedOut.Inc()
                     delete(pendingQueries, reqId)
                 } else {
                     if pq.respPub != nil {
@@ -348,6 +366,7 @@ func handleQueryRequestsImpl(
                         select {
                         case queryResponseWriteC <- pq.respPub:
                             qLogger.Info("resend of query response to p2p succeeded", zap.String("requestID", reqId))
+                            queryResponsesPublished.Inc()
                             delete(pendingQueries, reqId)
                         default:
                             qLogger.Warn("resend of query response to p2p failed again, will keep retrying", zap.String("requestID", reqId))
@@ -402,6 +421,7 @@ func (pcq *perChainQuery) ccqForwardToWatcher(qLogger *zap.Logger, receiveTime t
     // TODO: only send the query request itself and reassemble in this module
     case pcq.channel <- pcq.req:
         qLogger.Debug("forwarded query request to watcher", zap.String("requestID", pcq.req.RequestID), zap.Stringer("chainID", pcq.req.Request.ChainId))
+        totalRequestsByChain.WithLabelValues(pcq.req.Request.ChainId.String()).Inc()
         pcq.lastUpdateTime = receiveTime
     default:
         // By leaving lastUpdateTime unset, we will retry next interval.
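
Note on the `Add(0)` call in the first query.go hunk above: touching the per-chain counter with a zero delta pre-registers one `ccq_guardian_total_requests_by_chain` series per enabled chain, so the chain label shows up on the metrics endpoint before any request has been forwarded, and an idle chain can be told apart from a disabled one. A minimal standalone sketch of that pattern, assuming nothing beyond `client_golang` itself (the registry, chain names, and print loop are illustrative, not part of this change):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	requestsByChain := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "ccq_guardian_total_requests_by_chain",
		Help: "Total number of requests by chain",
	}, []string{"chain_name"})
	reg.MustRegister(requestsByChain)

	// Pre-register each enabled chain with a zero value; without this, a
	// chain_name label only appears after its first Inc().
	for _, chain := range []string{"ethereum", "polygon"} {
		requestsByChain.WithLabelValues(chain).Add(0)
	}

	// Gather what a scrape would see: both series already exist at 0.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			fmt.Printf("%s{chain_name=%q} %v\n", mf.GetName(), m.GetLabel()[0].GetValue(), m.GetCounter().GetValue())
		}
	}
}
```
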
diff --git a/node/pkg/watchers/evm/ccq.go b/node/pkg/watchers/evm/ccq.go
index 4c63e31ab5..e42dce7bc1 100644
--- a/node/pkg/watchers/evm/ccq.go
+++ b/node/pkg/watchers/evm/ccq.go
@@ -37,6 +37,8 @@ func (w *Watcher) ccqHandleQuery(logger *zap.Logger, ctx context.Context, queryR
         panic("ccqevm: invalid chain ID")
     }
 
+    start := time.Now()
+
     switch req := queryRequest.Request.Query.(type) {
     case *query.EthCallQueryRequest:
         w.ccqHandleEthCallQueryRequest(logger, ctx, queryRequest, req)
@@ -50,6 +52,8 @@ func (w *Watcher) ccqHandleQuery(logger *zap.Logger, ctx context.Context, queryR
         )
         w.ccqSendQueryResponseForError(logger, queryRequest, query.QueryFatalError)
     }
+
+    query.TotalWatcherTime.WithLabelValues(w.chainID.String()).Observe(float64(time.Since(start).Milliseconds()))
 }
 
 // EvmCallData contains the details of a single query in the batch.
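
The counters and the `TotalWatcherTime` histogram introduced above can be exercised from a unit test in the `query` package with `prometheus/testutil`. A hedged sketch, assuming the `sdk/vaa` import path and a hypothetical test name (neither is part of this diff):

```go
package query

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/wormhole-foundation/wormhole/sdk/vaa"
)

func TestCcqMetricsSketch(t *testing.T) {
	label := vaa.ChainIDEthereum.String()

	// Mirrors the Add(0) pre-registration done for every enabled chain.
	totalRequestsByChain.WithLabelValues(label).Add(0)
	before := testutil.ToFloat64(totalRequestsByChain.WithLabelValues(label))

	// Mirrors the Inc() performed when ccqForwardToWatcher hands a request off.
	totalRequestsByChain.WithLabelValues(label).Inc()
	if got := testutil.ToFloat64(totalRequestsByChain.WithLabelValues(label)); got != before+1 {
		t.Errorf("totalRequestsByChain: got %v, want %v", got, before+1)
	}

	// Record a fake watcher latency and confirm the per-chain histogram series exists.
	TotalWatcherTime.WithLabelValues(label).Observe(123)
	if n := testutil.CollectAndCount(TotalWatcherTime); n == 0 {
		t.Error("expected at least one ccq_guardian_total_watcher_query_time_in_ms series")
	}
}
```
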