Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Functions: tracking subscriptions #10613

Merged
merged 19 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/scripts/functions/templates/oracle.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ requestTimeoutCheckFrequencySec = 10
requestTimeoutSec = 300
maxRequestSizesList = [30_720, 51_200, 102_400, 204_800, 512_000, 1_048_576, 2_097_152, 3_145_728, 5_242_880, 10_485_760]
maxSecretsSizesList = [10_240, 20_480, 51_200, 102_400, 307_200, 512_000, 1_048_576, 2_097_152]
minimumSubscriptionBalanceLink = 0.1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outdated



[pluginConfig.OnchainAllowlist]
blockConfirmations = 1
Expand All @@ -43,6 +45,13 @@ maxSecretsSizesList = [10_240, 20_480, 51_200, 102_400, 307_200, 512_000, 1_048_
updateFrequencySec = 30
updateTimeoutSec = 10

[pluginConfig.OnchainSubscriptions]
blockConfirmations = 1
contractAddress = "{{router_contract_address}}"
updateFrequencySec = 30
updateTimeoutSec = 10
updateRangeSize = 100

[pluginConfig.RateLimiter]
globalBurst = 5
globalRPS = 10
Expand Down
44 changes: 28 additions & 16 deletions core/services/functions/connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/ecdsa"
"encoding/json"
"fmt"
"math/big"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
Expand All @@ -21,31 +22,37 @@ import (
type functionsConnectorHandler struct {
utils.StartStopOnce

connector connector.GatewayConnector
signerKey *ecdsa.PrivateKey
nodeAddress string
storage s4.Storage
allowlist functions.OnchainAllowlist
rateLimiter *hc.RateLimiter
lggr logger.Logger
connector connector.GatewayConnector
signerKey *ecdsa.PrivateKey
nodeAddress string
storage s4.Storage
allowlist functions.OnchainAllowlist
rateLimiter *hc.RateLimiter
subscriptions functions.OnchainSubscriptions
minBalanceWei *big.Int
lggr logger.Logger
}

var (
_ connector.Signer = &functionsConnectorHandler{}
_ connector.GatewayConnectorHandler = &functionsConnectorHandler{}
)

func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, lggr logger.Logger) (*functionsConnectorHandler, error) {
if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil {
return nil, fmt.Errorf("signerKey, storage, allowlist and rateLimiter must be non-nil")
func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, minimumBalance float64, lggr logger.Logger) (*functionsConnectorHandler, error) {
if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil || subscriptions == nil {
return nil, fmt.Errorf("signerKey, storage, allowlist, rateLimiter and subscriptions must be non-nil")
}
minBalanceWei := new(big.Int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not Wei but Juels.

big.NewFloat(0).Mul(big.NewFloat(minimumBalance), big.NewFloat(1e18)).Int(minBalanceWei)
return &functionsConnectorHandler{
nodeAddress: nodeAddress,
signerKey: signerKey,
storage: storage,
allowlist: allowlist,
rateLimiter: rateLimiter,
lggr: lggr.Named("FunctionsConnectorHandler"),
nodeAddress: nodeAddress,
signerKey: signerKey,
storage: storage,
allowlist: allowlist,
rateLimiter: rateLimiter,
subscriptions: subscriptions,
minBalanceWei: minBalanceWei,
lggr: lggr.Named("FunctionsConnectorHandler"),
}, nil
}

Expand All @@ -69,6 +76,11 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga
return
}

if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minBalanceWei) < 0 {
h.lggr.Errorw("request is not backed with a funded subscription", "id", gatewayId, "address", fromAddr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also log user balance and min required balance.

return
bolekk marked this conversation as resolved.
Show resolved Hide resolved
}

h.lggr.Debugw("handling gateway request", "id", gatewayId, "method", body.Method)

switch body.Method {
Expand Down
6 changes: 5 additions & 1 deletion core/services/functions/connector_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"math/big"
"testing"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -31,10 +32,11 @@ func TestFunctionsConnectorHandler(t *testing.T) {
connector := gcmocks.NewGatewayConnector(t)
allowlist := gfmocks.NewOnchainAllowlist(t)
rateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100})
subscriptions := gfmocks.NewOnchainSubscriptions(t)
require.NoError(t, err)
allowlist.On("Start", mock.Anything).Return(nil)
allowlist.On("Close", mock.Anything).Return(nil)
handler, err := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, rateLimiter, logger)
handler, err := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, rateLimiter, subscriptions, 0.0, logger)
require.NoError(t, err)

handler.SetConnector(connector)
Expand Down Expand Up @@ -73,6 +75,7 @@ func TestFunctionsConnectorHandler(t *testing.T) {
}
storage.On("List", ctx, addr).Return(snapshot, nil).Once()
allowlist.On("Allow", addr).Return(true).Once()
subscriptions.On("GetMaxUserBalance", mock.Anything).Return(big.NewInt(100), nil)
connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) {
msg, ok := args[2].(*api.Message)
require.True(t, ok)
Expand Down Expand Up @@ -129,6 +132,7 @@ func TestFunctionsConnectorHandler(t *testing.T) {

storage.On("Put", ctx, &key, &record, signature).Return(nil).Once()
allowlist.On("Allow", addr).Return(true).Once()
subscriptions.On("GetMaxUserBalance", mock.Anything).Return(big.NewInt(100), nil)
connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) {
msg, ok := args[2].(*api.Message)
require.True(t, ok)
Expand Down
74 changes: 52 additions & 22 deletions core/services/gateway/handlers/functions/handler.functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -20,7 +21,11 @@ import (
type FunctionsHandlerConfig struct {
OnchainAllowlistChainID string `json:"onchainAllowlistChainId"`
// Not specifying OnchainAllowlist config disables allowlist checks
OnchainAllowlist *OnchainAllowlistConfig `json:"onchainAllowlist"`
OnchainAllowlist *OnchainAllowlistConfig `json:"onchainAllowlist"`
OnchainSubscriptionsChainID string `json:"onchainSubscriptionsChainId"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can have a single chainID config field for both the allowlist and subscriptions. Maybe rename the existing one to just "ChainId"?

// Not specifying OnchainSubscriptions config disables minimum balance checks
OnchainSubscriptions *OnchainSubscriptionsConfig `json:"onchainSubscriptions"`
MinimumSubscriptionBalanceLink float64 `json:"minimumSubscriptionBalanceLink"`
// Not specifying RateLimiter config disables rate limiting
UserRateLimiter *hc.RateLimiterConfig `json:"userRateLimiter"`
NodeRateLimiter *hc.RateLimiterConfig `json:"nodeRateLimiter"`
Expand All @@ -31,15 +36,17 @@ type FunctionsHandlerConfig struct {
type functionsHandler struct {
utils.StartStopOnce

handlerConfig FunctionsHandlerConfig
donConfig *config.DONConfig
don handlers.DON
pendingRequests hc.RequestCache[PendingSecretsRequest]
allowlist OnchainAllowlist
userRateLimiter *hc.RateLimiter
nodeRateLimiter *hc.RateLimiter
chStop utils.StopChan
lggr logger.Logger
handlerConfig FunctionsHandlerConfig
donConfig *config.DONConfig
don handlers.DON
pendingRequests hc.RequestCache[PendingSecretsRequest]
allowlist OnchainAllowlist
subscriptions OnchainSubscriptions
minimumBalanceWei *big.Int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Juels

userRateLimiter *hc.RateLimiter
nodeRateLimiter *hc.RateLimiter
chStop utils.StopChan
lggr logger.Logger
}

type PendingSecretsRequest struct {
Expand All @@ -62,11 +69,11 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con
if cfg.OnchainAllowlist != nil {
chain, err2 := legacyChains.Get(cfg.OnchainAllowlistChainID)
if err2 != nil {
return nil, err
return nil, err2
}
allowlist, err2 = NewOnchainAllowlist(chain.Client(), *cfg.OnchainAllowlist, lggr)
if err2 != nil {
return nil, err
return nil, err2
bolekk marked this conversation as resolved.
Show resolved Hide resolved
}
}
var userRateLimiter, nodeRateLimiter *hc.RateLimiter
Expand All @@ -82,8 +89,21 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con
return nil, err
}
}
var subscriptions OnchainSubscriptions
if cfg.OnchainSubscriptions != nil {
chain, err2 := legacyChains.Get(cfg.OnchainSubscriptionsChainID)
if err2 != nil {
return nil, err2
}
subscriptions, err2 = NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, lggr)
if err2 != nil {
return nil, err2
}
}
minimumBalanceWei := new(big.Int)
big.NewFloat(0).Mul(big.NewFloat(cfg.MinimumSubscriptionBalanceLink), big.NewFloat(1e18)).Int(minimumBalanceWei)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to have Juels in config? I'm worried about precision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to what we use in other cases like this: assets.Link.

pendingRequestsCache := hc.NewRequestCache[PendingSecretsRequest](time.Millisecond*time.Duration(cfg.RequestTimeoutMillis), cfg.MaxPendingRequests)
return NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, userRateLimiter, nodeRateLimiter, lggr), nil
return NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, minimumBalanceWei, userRateLimiter, nodeRateLimiter, lggr), nil
}

func NewFunctionsHandler(
Expand All @@ -92,19 +112,23 @@ func NewFunctionsHandler(
don handlers.DON,
pendingRequestsCache hc.RequestCache[PendingSecretsRequest],
allowlist OnchainAllowlist,
subscriptions OnchainSubscriptions,
minimumBalanceWei *big.Int,
userRateLimiter *hc.RateLimiter,
nodeRateLimiter *hc.RateLimiter,
lggr logger.Logger) handlers.Handler {
return &functionsHandler{
handlerConfig: cfg,
donConfig: donConfig,
don: don,
pendingRequests: pendingRequestsCache,
allowlist: allowlist,
userRateLimiter: userRateLimiter,
nodeRateLimiter: nodeRateLimiter,
chStop: make(utils.StopChan),
lggr: lggr,
handlerConfig: cfg,
donConfig: donConfig,
don: don,
pendingRequests: pendingRequestsCache,
allowlist: allowlist,
subscriptions: subscriptions,
minimumBalanceWei: minimumBalanceWei,
userRateLimiter: userRateLimiter,
nodeRateLimiter: nodeRateLimiter,
chStop: make(utils.StopChan),
lggr: lggr,
}
}

Expand All @@ -118,6 +142,12 @@ func (h *functionsHandler) HandleUserMessage(ctx context.Context, msg *api.Messa
h.lggr.Debug("rate-limited", "sender", msg.Body.Sender)
return errors.New("rate-limited")
}
if h.subscriptions != nil {
if balance, err := h.subscriptions.GetMaxUserBalance(sender); err != nil || balance.Cmp(h.minimumBalanceWei) < 0 {
h.lggr.Debug("received a message from a user having insufficient balance", "sender", msg.Body.Sender, "balance", balance.String())
return errors.New("sender has insufficient balance")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the message to "insufficient subscription balance: [balance] [unit]"

}
}
switch msg.Body.Method {
case MethodSecretsSet, MethodSecretsList:
return h.handleSecretsRequest(ctx, msg, callbackCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/ecdsa"
"encoding/json"
"fmt"
"math/big"
"testing"
"time"

Expand All @@ -23,7 +24,7 @@ import (
handlers_mocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/mocks"
)

func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTimeout time.Duration) (handlers.Handler, *handlers_mocks.DON, *functions_mocks.OnchainAllowlist) {
func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTimeout time.Duration) (handlers.Handler, *handlers_mocks.DON, *functions_mocks.OnchainAllowlist, *functions_mocks.OnchainSubscriptions) {
cfg := functions.FunctionsHandlerConfig{}
donConfig := &config.DONConfig{
Members: []config.NodeConfig{},
Expand All @@ -39,13 +40,15 @@ func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTi

don := handlers_mocks.NewDON(t)
allowlist := functions_mocks.NewOnchainAllowlist(t)
subscriptions := functions_mocks.NewOnchainSubscriptions(t)
minBalanceWei := big.NewInt(100)
userRateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100})
require.NoError(t, err)
nodeRateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100})
require.NoError(t, err)
pendingRequestsCache := hc.NewRequestCache[functions.PendingSecretsRequest](requestTimeout, 1000)
handler := functions.NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, userRateLimiter, nodeRateLimiter, logger.TestLogger(t))
return handler, don, allowlist
handler := functions.NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, minBalanceWei, userRateLimiter, nodeRateLimiter, logger.TestLogger(t))
return handler, don, allowlist, subscriptions
}

func newSignedMessage(t *testing.T, id string, method string, donId string, privateKey *ecdsa.PrivateKey) api.Message {
Expand Down Expand Up @@ -113,7 +116,7 @@ func TestFunctionsHandler_HandleUserMessage_SecretsSet(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodes, user := gc.NewTestNodes(t, 4), gc.NewTestNodes(t, 1)[0]
handler, don, allowlist := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24)
handler, don, allowlist, subscriptions := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24)
userRequestMsg := newSignedMessage(t, "1234", "secrets_set", "don_id", user.PrivateKey)

callbachCh := make(chan handlers.UserCallbackPayload)
Expand All @@ -131,6 +134,7 @@ func TestFunctionsHandler_HandleUserMessage_SecretsSet(t *testing.T) {
}()

allowlist.On("Allow", common.HexToAddress(user.Address)).Return(true, nil)
subscriptions.On("GetMaxUserBalance", common.HexToAddress(user.Address)).Return(big.NewInt(1000), nil)
don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil)
require.NoError(t, handler.HandleUserMessage(testutils.Context(t), &userRequestMsg, callbachCh))
sendNodeReponses(t, handler, userRequestMsg, nodes, test.nodeResults)
Expand All @@ -143,10 +147,11 @@ func TestFunctionsHandler_HandleUserMessage_InvalidMethod(t *testing.T) {
t.Parallel()

nodes, user := gc.NewTestNodes(t, 4), gc.NewTestNodes(t, 1)[0]
handler, _, allowlist := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24)
handler, _, allowlist, subscriptions := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24)
userRequestMsg := newSignedMessage(t, "1234", "secrets_reveal_all_please", "don_id", user.PrivateKey)

allowlist.On("Allow", common.HexToAddress(user.Address)).Return(true, nil)
subscriptions.On("GetMaxUserBalance", common.HexToAddress(user.Address)).Return(big.NewInt(1000), nil)
err := handler.HandleUserMessage(testutils.Context(t), &userRequestMsg, make(chan handlers.UserCallbackPayload))
require.Error(t, err)
}
Expand All @@ -155,7 +160,7 @@ func TestFunctionsHandler_HandleUserMessage_Timeout(t *testing.T) {
t.Parallel()

nodes, user := gc.NewTestNodes(t, 4), gc.NewTestNodes(t, 1)[0]
handler, don, allowlist := newFunctionsHandlerForATestDON(t, nodes, time.Millisecond*10)
handler, don, allowlist, subscriptions := newFunctionsHandlerForATestDON(t, nodes, time.Millisecond*10)
userRequestMsg := newSignedMessage(t, "1234", "secrets_set", "don_id", user.PrivateKey)

callbachCh := make(chan handlers.UserCallbackPayload)
Expand All @@ -169,6 +174,7 @@ func TestFunctionsHandler_HandleUserMessage_Timeout(t *testing.T) {
}()

allowlist.On("Allow", common.HexToAddress(user.Address)).Return(true, nil)
subscriptions.On("GetMaxUserBalance", common.HexToAddress(user.Address)).Return(big.NewInt(1000), nil)
don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil)
require.NoError(t, handler.HandleUserMessage(testutils.Context(t), &userRequestMsg, callbachCh))
<-done
Expand Down
Loading
Loading