Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Functions: tracking subscriptions #10613

Merged
merged 19 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/scripts/functions/templates/oracle.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ requestTimeoutCheckFrequencySec = 10
requestTimeoutSec = 300
maxRequestSizesList = [30_720, 51_200, 102_400, 204_800, 512_000, 1_048_576, 2_097_152, 3_145_728, 5_242_880, 10_485_760]
maxSecretsSizesList = [10_240, 20_480, 51_200, 102_400, 307_200, 512_000, 1_048_576, 2_097_152]
minimumSubscriptionBalance = "0.1 link"


[pluginConfig.OnchainAllowlist]
blockConfirmations = 1
Expand All @@ -43,6 +45,13 @@ maxSecretsSizesList = [10_240, 20_480, 51_200, 102_400, 307_200, 512_000, 1_048_
updateFrequencySec = 30
updateTimeoutSec = 10

[pluginConfig.OnchainSubscriptions]
blockConfirmations = 1
contractAddress = "{{router_contract_address}}"
updateFrequencySec = 30
updateTimeoutSec = 10
updateRangeSize = 100

[pluginConfig.RateLimiter]
globalBurst = 5
globalRPS = 10
Expand Down
41 changes: 25 additions & 16 deletions core/services/functions/connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"

"github.com/smartcontractkit/chainlink/v2/core/assets"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/common"
Expand All @@ -21,31 +22,35 @@ import (
type functionsConnectorHandler struct {
utils.StartStopOnce

connector connector.GatewayConnector
signerKey *ecdsa.PrivateKey
nodeAddress string
storage s4.Storage
allowlist functions.OnchainAllowlist
rateLimiter *hc.RateLimiter
lggr logger.Logger
connector connector.GatewayConnector
signerKey *ecdsa.PrivateKey
nodeAddress string
storage s4.Storage
allowlist functions.OnchainAllowlist
rateLimiter *hc.RateLimiter
subscriptions functions.OnchainSubscriptions
minimumBalance assets.Link
lggr logger.Logger
}

var (
_ connector.Signer = &functionsConnectorHandler{}
_ connector.GatewayConnectorHandler = &functionsConnectorHandler{}
)

func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, lggr logger.Logger) (*functionsConnectorHandler, error) {
if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil {
return nil, fmt.Errorf("signerKey, storage, allowlist and rateLimiter must be non-nil")
func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, minimumBalance assets.Link, lggr logger.Logger) (*functionsConnectorHandler, error) {
if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil || subscriptions == nil {
return nil, fmt.Errorf("signerKey, storage, allowlist, rateLimiter and subscriptions must be non-nil")
}
return &functionsConnectorHandler{
nodeAddress: nodeAddress,
signerKey: signerKey,
storage: storage,
allowlist: allowlist,
rateLimiter: rateLimiter,
lggr: lggr.Named("FunctionsConnectorHandler"),
nodeAddress: nodeAddress,
signerKey: signerKey,
storage: storage,
allowlist: allowlist,
rateLimiter: rateLimiter,
subscriptions: subscriptions,
minimumBalance: minimumBalance,
lggr: lggr.Named("FunctionsConnectorHandler"),
}, nil
}

Expand All @@ -68,6 +73,10 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga
h.lggr.Errorw("request rate-limited", "id", gatewayId, "address", fromAddr)
return
}
if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minimumBalance.ToInt()) < 0 {
h.lggr.Errorw("user subscription has insufficient balance", "id", gatewayId, "address", fromAddr, "balance", balance, "minBalance", h.minimumBalance)
return
bolekk marked this conversation as resolved.
Show resolved Hide resolved
}

h.lggr.Debugw("handling gateway request", "id", gatewayId, "method", body.Method)

Expand Down
7 changes: 6 additions & 1 deletion core/services/functions/connector_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"math/big"
"testing"

"github.com/smartcontractkit/chainlink/v2/core/assets"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/functions"
Expand All @@ -31,10 +33,11 @@ func TestFunctionsConnectorHandler(t *testing.T) {
connector := gcmocks.NewGatewayConnector(t)
allowlist := gfmocks.NewOnchainAllowlist(t)
rateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100})
subscriptions := gfmocks.NewOnchainSubscriptions(t)
require.NoError(t, err)
allowlist.On("Start", mock.Anything).Return(nil)
allowlist.On("Close", mock.Anything).Return(nil)
handler, err := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, rateLimiter, logger)
handler, err := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, rateLimiter, subscriptions, *assets.NewLinkFromJuels(0), logger)
require.NoError(t, err)

handler.SetConnector(connector)
Expand Down Expand Up @@ -73,6 +76,7 @@ func TestFunctionsConnectorHandler(t *testing.T) {
}
storage.On("List", ctx, addr).Return(snapshot, nil).Once()
allowlist.On("Allow", addr).Return(true).Once()
subscriptions.On("GetMaxUserBalance", mock.Anything).Return(big.NewInt(100), nil)
connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) {
msg, ok := args[2].(*api.Message)
require.True(t, ok)
Expand Down Expand Up @@ -129,6 +133,7 @@ func TestFunctionsConnectorHandler(t *testing.T) {

storage.On("Put", ctx, &key, &record, signature).Return(nil).Once()
allowlist.On("Allow", addr).Return(true).Once()
subscriptions.On("GetMaxUserBalance", mock.Anything).Return(big.NewInt(100), nil)
connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) {
msg, ok := args[2].(*api.Message)
require.True(t, ok)
Expand Down
38 changes: 33 additions & 5 deletions core/services/gateway/handlers/functions/handler.functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink/v2/core/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
Expand Down Expand Up @@ -51,9 +53,12 @@ var (
)

type FunctionsHandlerConfig struct {
OnchainAllowlistChainID string `json:"onchainAllowlistChainId"`
ChainID string `json:"chainId"`
// Not specifying OnchainAllowlist config disables allowlist checks
OnchainAllowlist *OnchainAllowlistConfig `json:"onchainAllowlist"`
// Not specifying OnchainSubscriptions config disables minimum balance checks
OnchainSubscriptions *OnchainSubscriptionsConfig `json:"onchainSubscriptions"`
MinimumSubscriptionBalance *assets.Link `json:"minimumSubscriptionBalance"`
// Not specifying RateLimiter config disables rate limiting
UserRateLimiter *hc.RateLimiterConfig `json:"userRateLimiter"`
NodeRateLimiter *hc.RateLimiterConfig `json:"nodeRateLimiter"`
Expand All @@ -69,6 +74,8 @@ type functionsHandler struct {
don handlers.DON
pendingRequests hc.RequestCache[PendingSecretsRequest]
allowlist OnchainAllowlist
subscriptions OnchainSubscriptions
minimumBalance *assets.Link
userRateLimiter *hc.RateLimiter
nodeRateLimiter *hc.RateLimiter
chStop utils.StopChan
Expand All @@ -93,13 +100,13 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con
lggr = lggr.Named("FunctionsHandler:" + donConfig.DonId)
var allowlist OnchainAllowlist
if cfg.OnchainAllowlist != nil {
chain, err2 := legacyChains.Get(cfg.OnchainAllowlistChainID)
chain, err2 := legacyChains.Get(cfg.ChainID)
if err2 != nil {
return nil, err
return nil, err2
}
allowlist, err2 = NewOnchainAllowlist(chain.Client(), *cfg.OnchainAllowlist, lggr)
if err2 != nil {
return nil, err
return nil, err2
bolekk marked this conversation as resolved.
Show resolved Hide resolved
}
}
var userRateLimiter, nodeRateLimiter *hc.RateLimiter
Expand All @@ -115,8 +122,19 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con
return nil, err
}
}
var subscriptions OnchainSubscriptions
if cfg.OnchainSubscriptions != nil {
chain, err2 := legacyChains.Get(cfg.ChainID)
if err2 != nil {
return nil, err2
}
subscriptions, err2 = NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, lggr)
if err2 != nil {
return nil, err2
}
}
pendingRequestsCache := hc.NewRequestCache[PendingSecretsRequest](time.Millisecond*time.Duration(cfg.RequestTimeoutMillis), cfg.MaxPendingRequests)
return NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, userRateLimiter, nodeRateLimiter, lggr), nil
return NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, cfg.MinimumSubscriptionBalance, userRateLimiter, nodeRateLimiter, lggr), nil
}

func NewFunctionsHandler(
Expand All @@ -125,6 +143,8 @@ func NewFunctionsHandler(
don handlers.DON,
pendingRequestsCache hc.RequestCache[PendingSecretsRequest],
allowlist OnchainAllowlist,
subscriptions OnchainSubscriptions,
minimumBalance *assets.Link,
userRateLimiter *hc.RateLimiter,
nodeRateLimiter *hc.RateLimiter,
lggr logger.Logger) handlers.Handler {
Expand All @@ -134,6 +154,8 @@ func NewFunctionsHandler(
don: don,
pendingRequests: pendingRequestsCache,
allowlist: allowlist,
subscriptions: subscriptions,
minimumBalance: minimumBalance,
userRateLimiter: userRateLimiter,
nodeRateLimiter: nodeRateLimiter,
chStop: make(utils.StopChan),
Expand All @@ -153,6 +175,12 @@ func (h *functionsHandler) HandleUserMessage(ctx context.Context, msg *api.Messa
promHandlerError.WithLabelValues(h.donConfig.DonId, ErrRateLimited.Error()).Inc()
return ErrRateLimited
}
if h.subscriptions != nil && h.minimumBalance != nil {
if balance, err := h.subscriptions.GetMaxUserBalance(sender); err != nil || balance.Cmp(h.minimumBalance.ToInt()) < 0 {
h.lggr.Debug("received a message from a user having insufficient balance", "sender", msg.Body.Sender, "balance", balance.String())
return fmt.Errorf("sender has insufficient balance: %v juels", balance.String())
}
}
switch msg.Body.Method {
case MethodSecretsSet, MethodSecretsList:
return h.handleSecretsRequest(ctx, msg, callbackCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"crypto/ecdsa"
"encoding/json"
"fmt"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/assets"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
Expand All @@ -23,7 +25,7 @@ import (
handlers_mocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/mocks"
)

func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTimeout time.Duration) (handlers.Handler, *handlers_mocks.DON, *functions_mocks.OnchainAllowlist) {
func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTimeout time.Duration) (handlers.Handler, *handlers_mocks.DON, *functions_mocks.OnchainAllowlist, *functions_mocks.OnchainSubscriptions) {
cfg := functions.FunctionsHandlerConfig{}
donConfig := &config.DONConfig{
Members: []config.NodeConfig{},
Expand All @@ -39,13 +41,15 @@ func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTi

don := handlers_mocks.NewDON(t)
allowlist := functions_mocks.NewOnchainAllowlist(t)
subscriptions := functions_mocks.NewOnchainSubscriptions(t)
minBalance := assets.NewLinkFromJuels(100)
userRateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100})
require.NoError(t, err)
nodeRateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100})
require.NoError(t, err)
pendingRequestsCache := hc.NewRequestCache[functions.PendingSecretsRequest](requestTimeout, 1000)
handler := functions.NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, userRateLimiter, nodeRateLimiter, logger.TestLogger(t))
return handler, don, allowlist
handler := functions.NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, minBalance, userRateLimiter, nodeRateLimiter, logger.TestLogger(t))
return handler, don, allowlist, subscriptions
}

func newSignedMessage(t *testing.T, id string, method string, donId string, privateKey *ecdsa.PrivateKey) api.Message {
Expand Down Expand Up @@ -113,7 +117,7 @@ func TestFunctionsHandler_HandleUserMessage_SecretsSet(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodes, user := gc.NewTestNodes(t, 4), gc.NewTestNodes(t, 1)[0]
handler, don, allowlist := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24)
handler, don, allowlist, subscriptions := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24)
userRequestMsg := newSignedMessage(t, "1234", "secrets_set", "don_id", user.PrivateKey)

callbachCh := make(chan handlers.UserCallbackPayload)
Expand All @@ -131,6 +135,7 @@ func TestFunctionsHandler_HandleUserMessage_SecretsSet(t *testing.T) {
}()

allowlist.On("Allow", common.HexToAddress(user.Address)).Return(true, nil)
subscriptions.On("GetMaxUserBalance", common.HexToAddress(user.Address)).Return(big.NewInt(1000), nil)
don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil)
require.NoError(t, handler.HandleUserMessage(testutils.Context(t), &userRequestMsg, callbachCh))
sendNodeReponses(t, handler, userRequestMsg, nodes, test.nodeResults)
Expand All @@ -143,10 +148,11 @@ func TestFunctionsHandler_HandleUserMessage_InvalidMethod(t *testing.T) {
t.Parallel()

nodes, user := gc.NewTestNodes(t, 4), gc.NewTestNodes(t, 1)[0]
handler, _, allowlist := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24)
handler, _, allowlist, subscriptions := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24)
userRequestMsg := newSignedMessage(t, "1234", "secrets_reveal_all_please", "don_id", user.PrivateKey)

allowlist.On("Allow", common.HexToAddress(user.Address)).Return(true, nil)
subscriptions.On("GetMaxUserBalance", common.HexToAddress(user.Address)).Return(big.NewInt(1000), nil)
err := handler.HandleUserMessage(testutils.Context(t), &userRequestMsg, make(chan handlers.UserCallbackPayload))
require.Error(t, err)
}
Expand All @@ -155,7 +161,7 @@ func TestFunctionsHandler_HandleUserMessage_Timeout(t *testing.T) {
t.Parallel()

nodes, user := gc.NewTestNodes(t, 4), gc.NewTestNodes(t, 1)[0]
handler, don, allowlist := newFunctionsHandlerForATestDON(t, nodes, time.Millisecond*10)
handler, don, allowlist, subscriptions := newFunctionsHandlerForATestDON(t, nodes, time.Millisecond*10)
userRequestMsg := newSignedMessage(t, "1234", "secrets_set", "don_id", user.PrivateKey)

callbachCh := make(chan handlers.UserCallbackPayload)
Expand All @@ -169,6 +175,7 @@ func TestFunctionsHandler_HandleUserMessage_Timeout(t *testing.T) {
}()

allowlist.On("Allow", common.HexToAddress(user.Address)).Return(true, nil)
subscriptions.On("GetMaxUserBalance", common.HexToAddress(user.Address)).Return(big.NewInt(1000), nil)
don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil)
require.NoError(t, handler.HandleUserMessage(testutils.Context(t), &userRequestMsg, callbachCh))
<-done
Expand Down
Loading