Skip to content

Commit

Permalink
[Gateway] Basic prom counters (#10659)
Browse files Browse the repository at this point in the history
1. Successful heartbeats to monitor connected nodes
2. All requests by response code
3. Functions-specific methods with success/failure counters
  • Loading branch information
bolekk authored Sep 15, 2023
1 parent 9f19654 commit 0659b05
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 8 deletions.
21 changes: 21 additions & 0 deletions core/services/gateway/api/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,27 @@ const (
FatalError
)

// String returns the name of the error code as text, which makes ErrorCode
// usable wherever a human-readable name is needed (for example as a metric
// label value). Codes without a dedicated case map to "UnknownError".
func (e ErrorCode) String() string {
	// Start from the fallback and overwrite it only for recognised codes.
	name := "UnknownError"
	switch e {
	case NoError:
		name = "NoError"
	case UserMessageParseError:
		name = "UserMessageParseError"
	case UnsupportedDONIdError:
		name = "UnsupportedDONIdError"
	case HandlerError:
		name = "HandlerError"
	case RequestTimeoutError:
		name = "RequestTimeoutError"
	case NodeReponseEncodingError:
		name = "NodeReponseEncodingError"
	case FatalError:
		name = "FatalError"
	}
	return name
}

// See https://www.jsonrpc.org/specification#error_object
func ToJsonRPCErrorCode(errorCode ErrorCode) int {
gatewayErrorToJsonRPCError := map[ErrorCode]int{
Expand Down
8 changes: 8 additions & 0 deletions core/services/gateway/connectionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand All @@ -23,6 +25,11 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// promHeartbeatsSent is a gauge, labelled by DON ID, that heartbeatLoop sets
// after each heartbeat round to the number of nodes minus the number of send
// errors, i.e. the count of nodes that were successfully sent a heartbeat.
var promHeartbeatsSent = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "gateway_heartbeats_sent",
	Help: "Metric to track the number of successful node heartbeats per DON",
}, []string{"don_id"})

// ConnectionManager holds all connections between Gateway and Nodes.
type ConnectionManager interface {
job.ServiceCtx
Expand Down Expand Up @@ -300,6 +307,7 @@ func (m *donConnectionManager) heartbeatLoop(intervalSec uint32) {
errorCount++
}
}
promHeartbeatsSent.WithLabelValues(m.donConfig.DonId).Set(float64(len(m.nodes) - errorCount))
m.lggr.Infow("sent heartbeat to nodes", "donID", m.donConfig.DonId, "errCount", errorCount)
}
}
Expand Down
11 changes: 11 additions & 0 deletions core/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (

"github.com/ethereum/go-ethereum/common"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/config"
Expand All @@ -19,6 +22,11 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// promRequest counts processed gateway requests. The "response_code" label
// carries the api.ErrorCode name (via ErrorCode.String) of the outcome:
// "NoError" on success, otherwise the code passed to newError, or "FatalError"
// when even the error response could not be encoded.
var promRequest = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gateway_request",
Help: "Metric to track received requests and response codes",
}, []string{"response_code"})

type Gateway interface {
job.ServiceCtx
gw_net.HTTPRequestHandler
Expand Down Expand Up @@ -154,15 +162,18 @@ func (g *gateway) ProcessRequest(ctx context.Context, rawRequest []byte) (rawRes
if err != nil {
return newError(g.codec, msg.Body.MessageId, api.NodeReponseEncodingError, "")
}
promRequest.WithLabelValues(api.NoError.String()).Inc()
return rawResponse, api.ToHttpErrorCode(api.NoError)
}

// newError builds the encoded error response for the request identified by id
// and returns it together with the HTTP status code that corresponds to
// errCode. The request counter is incremented with the name of the error code
// that is actually reported to the caller.
//
// If the codec fails to encode the error response, a plain "fatal error" body
// is returned with the HTTP status for api.FatalError, and the counter is
// incremented under api.FatalError instead of errCode.
func newError(codec api.Codec, id string, errCode api.ErrorCode, errMsg string) ([]byte, int) {
	encoded, encodeErr := codec.EncodeNewErrorResponse(id, api.ToJsonRPCErrorCode(errCode), errMsg, nil)
	if encodeErr == nil {
		promRequest.WithLabelValues(errCode.String()).Inc()
		return encoded, api.ToHttpErrorCode(errCode)
	}

	// Encoding a well-formed JSON error failed, so fall back to a raw body.
	promRequest.WithLabelValues(api.FatalError.String()).Inc()
	return []byte("fatal error"), api.ToHttpErrorCode(api.FatalError)
}

Expand Down
68 changes: 60 additions & 8 deletions core/services/gateway/handlers/functions/handler.functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand All @@ -17,6 +19,37 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
// ErrNotAllowlisted is returned by HandleUserMessage when an allowlist is
// configured and it does not allow the message sender.
ErrNotAllowlisted = errors.New("sender not allowlisted")
// ErrRateLimited is returned by HandleUserMessage when the per-user rate
// limiter rejects the message sender.
ErrRateLimited = errors.New("rate-limited")
// ErrUnsupportedMethod is returned when a message's method is neither
// MethodSecretsSet nor MethodSecretsList.
ErrUnsupportedMethod = errors.New("unsupported method")

// promHandlerError counts errors returned while handling user messages,
// labelled by DON ID and by the error's message text.
// NOTE(review): one call site passes err.Error() from pendingRequests.NewRequest
// as the "error" label value — confirm that set of messages is bounded, since
// every distinct label value creates a new time series.
promHandlerError = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gateway_functions_handler_error",
Help: "Metric to track functions handler errors",
}, []string{"don_id", "error"})

// promSecretsSetSuccess counts secrets_set responses built with success == true, per DON ID.
promSecretsSetSuccess = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gateway_functions_secrets_set_success",
Help: "Metric to track successful secrets_set calls",
}, []string{"don_id"})

// promSecretsSetFailure counts secrets_set responses built with success == false, per DON ID.
promSecretsSetFailure = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gateway_functions_secrets_set_failure",
Help: "Metric to track failed secrets_set calls",
}, []string{"don_id"})

// promSecretsListSuccess counts secrets_list responses built with success == true, per DON ID.
promSecretsListSuccess = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gateway_functions_secrets_list_success",
Help: "Metric to track successful secrets_list calls",
}, []string{"don_id"})

// promSecretsListFailure counts secrets_list responses built with success == false, per DON ID.
promSecretsListFailure = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gateway_functions_secrets_list_failure",
Help: "Metric to track failed secrets_list calls",
}, []string{"don_id"})
)

type FunctionsHandlerConfig struct {
OnchainAllowlistChainID string `json:"onchainAllowlistChainId"`
// Not specifying OnchainAllowlist config disables allowlist checks
Expand Down Expand Up @@ -112,18 +145,21 @@ func (h *functionsHandler) HandleUserMessage(ctx context.Context, msg *api.Messa
sender := common.HexToAddress(msg.Body.Sender)
if h.allowlist != nil && !h.allowlist.Allow(sender) {
h.lggr.Debugw("received a message from a non-allowlisted address", "sender", msg.Body.Sender)
return errors.New("sender not allowlisted")
promHandlerError.WithLabelValues(h.donConfig.DonId, ErrNotAllowlisted.Error()).Inc()
return ErrNotAllowlisted
}
if h.userRateLimiter != nil && !h.userRateLimiter.Allow(msg.Body.Sender) {
h.lggr.Debug("rate-limited", "sender", msg.Body.Sender)
return errors.New("rate-limited")
h.lggr.Debugw("rate-limited", "sender", msg.Body.Sender)
promHandlerError.WithLabelValues(h.donConfig.DonId, ErrRateLimited.Error()).Inc()
return ErrRateLimited
}
switch msg.Body.Method {
case MethodSecretsSet, MethodSecretsList:
return h.handleSecretsRequest(ctx, msg, callbackCh)
default:
h.lggr.Debug("unsupported method", "method", msg.Body.Method)
return errors.New("unsupported method")
h.lggr.Debugw("unsupported method", "method", msg.Body.Method)
promHandlerError.WithLabelValues(h.donConfig.DonId, ErrUnsupportedMethod.Error()).Inc()
return ErrUnsupportedMethod
}
}

Expand All @@ -132,6 +168,7 @@ func (h *functionsHandler) handleSecretsRequest(ctx context.Context, msg *api.Me
err := h.pendingRequests.NewRequest(msg, callbackCh, &PendingSecretsRequest{request: msg, responses: make(map[string]*api.Message)})
if err != nil {
h.lggr.Warnw("handleSecretsRequest: error adding new request", "sender", msg.Body.Sender, "err", err)
promHandlerError.WithLabelValues(h.donConfig.DonId, err.Error()).Inc()
return err
}
// Send to all nodes.
Expand All @@ -147,15 +184,15 @@ func (h *functionsHandler) handleSecretsRequest(ctx context.Context, msg *api.Me
func (h *functionsHandler) HandleNodeMessage(ctx context.Context, msg *api.Message, nodeAddr string) error {
h.lggr.Debugw("HandleNodeMessage: processing message", "nodeAddr", nodeAddr, "receiver", msg.Body.Receiver, "id", msg.Body.MessageId)
if h.nodeRateLimiter != nil && !h.nodeRateLimiter.Allow(nodeAddr) {
h.lggr.Debug("rate-limited", "sender", nodeAddr)
h.lggr.Debugw("rate-limited", "sender", nodeAddr)
return errors.New("rate-limited")
}
switch msg.Body.Method {
case MethodSecretsSet, MethodSecretsList:
return h.pendingRequests.ProcessResponse(msg, h.processSecretsResponse)
default:
h.lggr.Debug("unsupported method", "method", msg.Body.Method)
return errors.New("unsupported method")
h.lggr.Debugw("unsupported method", "method", msg.Body.Method)
return ErrUnsupportedMethod
}
}

Expand Down Expand Up @@ -200,6 +237,21 @@ func newSecretsResponse(request *api.Message, success bool, responses []*api.Mes
if err != nil {
return nil, err
}

if request.Body.Method == MethodSecretsSet {
if success {
promSecretsSetSuccess.WithLabelValues(request.Body.DonId).Inc()
} else {
promSecretsSetFailure.WithLabelValues(request.Body.DonId).Inc()
}
} else if request.Body.Method == MethodSecretsList {
if success {
promSecretsListSuccess.WithLabelValues(request.Body.DonId).Inc()
} else {
promSecretsListFailure.WithLabelValues(request.Body.DonId).Inc()
}
}

userResponse := *request
userResponse.Body.Receiver = request.Body.Sender
userResponse.Body.Payload = payloadJson
Expand Down

0 comments on commit 0659b05

Please sign in to comment.