From a334b436abf5ab6b75cd39d89ff004c2780817ee Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Wed, 13 Sep 2023 12:45:46 +0300 Subject: [PATCH 01/12] Functions: tracking subscriptions --- core/scripts/functions/templates/oracle.toml | 7 + core/services/functions/connector_handler.go | 38 ++-- .../functions/connector_handler_test.go | 10 +- .../functions/mocks/onchain_subscriptions.go | 77 +++++++ .../handlers/functions/subscriptions.go | 192 ++++++++++++++++++ .../handlers/functions/subscriptions_test.go | 63 ++++++ .../ocr2/plugins/functions/config/config.go | 43 ++-- .../services/ocr2/plugins/functions/plugin.go | 12 +- .../ocr2/plugins/functions/plugin_test.go | 6 +- 9 files changed, 404 insertions(+), 44 deletions(-) create mode 100644 core/services/gateway/handlers/functions/mocks/onchain_subscriptions.go create mode 100644 core/services/gateway/handlers/functions/subscriptions.go create mode 100644 core/services/gateway/handlers/functions/subscriptions_test.go diff --git a/core/scripts/functions/templates/oracle.toml b/core/scripts/functions/templates/oracle.toml index 208637b50fc..5d090fd186b 100644 --- a/core/scripts/functions/templates/oracle.toml +++ b/core/scripts/functions/templates/oracle.toml @@ -43,6 +43,13 @@ maxSecretsSizesList = [10_240, 20_480, 51_200, 102_400, 307_200, 512_000, 1_048_ updateFrequencySec = 30 updateTimeoutSec = 10 + [pluginConfig.OnchainSubscriptions] + blockConfirmations = 1 + routerAddress = "{{router_contract_address}}" + queryFrequencySec = 30 + queryTimeoutSec = 10 + queryRangeSize = 100 + [pluginConfig.RateLimiter] globalBurst = 5 globalRPS = 10 diff --git a/core/services/functions/connector_handler.go b/core/services/functions/connector_handler.go index 31d47716c0d..54e511f8799 100644 --- a/core/services/functions/connector_handler.go +++ b/core/services/functions/connector_handler.go @@ -21,13 +21,14 @@ import ( type functionsConnectorHandler struct { utils.StartStopOnce - connector connector.GatewayConnector - signerKey *ecdsa.PrivateKey - nodeAddress string - storage s4.Storage - allowlist functions.OnchainAllowlist - rateLimiter *hc.RateLimiter - lggr logger.Logger + connector connector.GatewayConnector + signerKey *ecdsa.PrivateKey + nodeAddress string + storage s4.Storage + allowlist functions.OnchainAllowlist + subscriptions functions.OnchainSubscriptions + rateLimiter *hc.RateLimiter + lggr logger.Logger } var ( @@ -35,17 +36,18 @@ var ( _ connector.GatewayConnectorHandler = &functionsConnectorHandler{} ) -func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, lggr logger.Logger) (*functionsConnectorHandler, error) { - if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil { - return nil, fmt.Errorf("signerKey, storage, allowlist and rateLimiter must be non-nil") +func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, lggr logger.Logger) (*functionsConnectorHandler, error) { + if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil || subscriptions == nil { + return nil, fmt.Errorf("signerKey, storage, allowlist, rateLimiter and subscriptions must be non-nil") } return &functionsConnectorHandler{ - nodeAddress: nodeAddress, - signerKey: signerKey, - storage: storage, - allowlist: allowlist, - rateLimiter: rateLimiter, - lggr: lggr.Named("FunctionsConnectorHandler"), + nodeAddress: nodeAddress, + signerKey: signerKey, + storage: storage, + allowlist: allowlist, + rateLimiter: rateLimiter, + subscriptions: subscriptions, + lggr: lggr.Named("FunctionsConnectorHandler"), }, nil } @@ -68,6 +70,10 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga h.lggr.Errorw("request rate-limited", "id", gatewayId, "address", fromAddr) return } + if h.subscriptions.GetSubscription(fromAddr) == nil { + h.lggr.Errorw("request is not backed with a valid subscription", "id", gatewayId, "address", fromAddr) + return + } h.lggr.Debugw("handling gateway request", "id", gatewayId, "method", body.Method) diff --git a/core/services/functions/connector_handler_test.go b/core/services/functions/connector_handler_test.go index 99b0cba0a28..d42b8d7dfff 100644 --- a/core/services/functions/connector_handler_test.go +++ b/core/services/functions/connector_handler_test.go @@ -6,6 +6,7 @@ import ( "errors" "testing" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/functions" @@ -31,10 +32,11 @@ func TestFunctionsConnectorHandler(t *testing.T) { connector := gcmocks.NewGatewayConnector(t) allowlist := gfmocks.NewOnchainAllowlist(t) rateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) + subscriptions := gfmocks.NewOnchainSubscriptions(t) require.NoError(t, err) allowlist.On("Start", mock.Anything).Return(nil) allowlist.On("Close", mock.Anything).Return(nil) - handler, err := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, rateLimiter, logger) + handler, err := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, rateLimiter, subscriptions, logger) require.NoError(t, err) handler.SetConnector(connector) @@ -73,6 +75,9 @@ func TestFunctionsConnectorHandler(t *testing.T) { } storage.On("List", ctx, addr).Return(snapshot, nil).Once() allowlist.On("Allow", addr).Return(true).Once() + subscriptions.On("GetSubscription", mock.Anything).Return( + &functions_router.IFunctionsSubscriptionsSubscription{}, + ) connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) { msg, ok := args[2].(*api.Message) require.True(t, ok) @@ -129,6 +134,9 @@ func TestFunctionsConnectorHandler(t *testing.T) { storage.On("Put", ctx, &key, &record, signature).Return(nil).Once() allowlist.On("Allow", addr).Return(true).Once() + subscriptions.On("GetSubscription", mock.Anything).Return( + &functions_router.IFunctionsSubscriptionsSubscription{}, + ) connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) { msg, ok := args[2].(*api.Message) require.True(t, ok) diff --git a/core/services/gateway/handlers/functions/mocks/onchain_subscriptions.go b/core/services/gateway/handlers/functions/mocks/onchain_subscriptions.go new file mode 100644 index 00000000000..1d4ad250b1b --- /dev/null +++ b/core/services/gateway/handlers/functions/mocks/onchain_subscriptions.go @@ -0,0 +1,77 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + common "github.com/ethereum/go-ethereum/common" + + functions_router "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" + + mock "github.com/stretchr/testify/mock" +) + +// OnchainSubscriptions is an autogenerated mock type for the OnchainSubscriptions type +type OnchainSubscriptions struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *OnchainSubscriptions) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetSubscription provides a mock function with given fields: _a0 +func (_m *OnchainSubscriptions) GetSubscription(_a0 common.Address) *functions_router.IFunctionsSubscriptionsSubscription { + ret := _m.Called(_a0) + + var r0 *functions_router.IFunctionsSubscriptionsSubscription + if rf, ok := ret.Get(0).(func(common.Address) *functions_router.IFunctionsSubscriptionsSubscription); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*functions_router.IFunctionsSubscriptionsSubscription) + } + } + + return r0 +} + +// Start provides a mock function with given fields: _a0 +func (_m *OnchainSubscriptions) Start(_a0 context.Context) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewOnchainSubscriptions interface { + mock.TestingT + Cleanup(func()) +} + +// NewOnchainSubscriptions creates a new instance of OnchainSubscriptions. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewOnchainSubscriptions(t mockConstructorTestingTNewOnchainSubscriptions) *OnchainSubscriptions { + mock := &OnchainSubscriptions{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go new file mode 100644 index 00000000000..e31c449ddbe --- /dev/null +++ b/core/services/gateway/handlers/functions/subscriptions.go @@ -0,0 +1,192 @@ +package functions + +import ( + "context" + "fmt" + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + + evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +type OnchainSubscriptionsConfig struct { + RouterAddress common.Address `json:"routerAddress"` + BlockConfirmations uint `json:"blockConfirmations"` + QueryFrequencySec uint `json:"queryFrequencySec"` + QueryTimeoutSec uint `json:"queryTimeoutSec"` + QueryRangeSize uint `json:"queryRangeSize"` +} + +// OnchainSubscriptions maintains a mirror of all subscriptions fetched from the blockchain (EVM-only). +// All methods are thread-safe. +// +//go:generate mockery --quiet --name OnchainSubscriptions --output ./mocks/ --case=underscore +type OnchainSubscriptions interface { + job.ServiceCtx + + // GetSubscription returns a subscription for the given user address, or null if not found + GetSubscription(common.Address) *functions_router.IFunctionsSubscriptionsSubscription +} + +type onchainSubscriptions struct { + utils.StartStopOnce + + config OnchainSubscriptionsConfig + subscriptions map[common.Address]*functions_router.IFunctionsSubscriptionsSubscription + client evmclient.Client + router *functions_router.FunctionsRouter + blockConfirmations *big.Int + lggr logger.Logger + closeWait sync.WaitGroup + rwMutex sync.RWMutex + stopCh utils.StopChan +} + +func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscriptionsConfig, lggr logger.Logger) (OnchainSubscriptions, error) { + if client == nil { + return nil, errors.New("client is nil") + } + if lggr == nil { + return nil, errors.New("logger is nil") + } + router, err := functions_router.NewFunctionsRouter(config.RouterAddress, client) + if err != nil { + return nil, fmt.Errorf("unexpected error during functions_router.NewFunctionsRouter: %s", err) + } + return &onchainSubscriptions{ + config: config, + subscriptions: make(map[common.Address]*functions_router.IFunctionsSubscriptionsSubscription), + client: client, + router: router, + blockConfirmations: big.NewInt(int64(config.BlockConfirmations)), + lggr: lggr.Named("OnchainSubscriptions"), + stopCh: make(utils.StopChan), + }, nil +} + +func (s *onchainSubscriptions) Start(ctx context.Context) error { + return s.StartOnce("OnchainSubscriptions", func() error { + s.lggr.Info("starting onchain subscriptions") + if s.config.QueryFrequencySec == 0 { + return errors.New("OnchainSubscriptionsConfig.UpdateFrequencySec must be greater than 0") + } + if s.config.QueryTimeoutSec == 0 { + return errors.New("OnchainSubscriptionsConfig.UpdateTimeoutSec must be greater than 0") + } + if s.config.QueryRangeSize == 0 { + return errors.New("OnchainSubscriptionsConfig.QueryRangeSize must be greater than 0") + } + + s.closeWait.Add(1) + go s.queryLoop() + + return nil + }) +} + +func (s *onchainSubscriptions) Close() error { + return s.StopOnce("OnchainSubscriptions", func() (err error) { + s.lggr.Info("closing onchain subscriptions") + close(s.stopCh) + s.closeWait.Wait() + return nil + }) +} + +func (s *onchainSubscriptions) GetSubscription(address common.Address) *functions_router.IFunctionsSubscriptionsSubscription { + s.rwMutex.RLock() + defer s.rwMutex.RUnlock() + subscription, ok := s.subscriptions[address] + if !ok { + return nil + } + return subscription +} + +func (s *onchainSubscriptions) queryLoop() { + defer s.closeWait.Done() + + ticker := time.NewTicker(time.Duration(s.config.QueryFrequencySec) * time.Second) + defer ticker.Stop() + + var start uint64 = 1 + + for { + select { + case <-s.stopCh: + return + case <-ticker.C: + ctx, cancel := utils.ContextFromChanWithTimeout(s.stopCh, time.Duration(s.config.QueryTimeoutSec)*time.Second) + + latestBlockHeight, err := s.client.LatestBlockHeight(ctx) + if err != nil || latestBlockHeight == nil { + cancel() + s.lggr.Errorw("Error calling LatestBlockHeight", "err", err, "latestBlockHeight", latestBlockHeight.Int64()) + continue + } + + blockNumber := big.NewInt(0).Sub(latestBlockHeight, s.blockConfirmations) + + count, err := s.getSubscriptionsCount(ctx, blockNumber) + if err != nil { + cancel() + s.lggr.Errorw("Error getting subscriptions count", "err", err) + continue + } + if count == 0 { + cancel() + s.lggr.Info("Router has no subscriptions yet") + continue + } + + end := start + uint64(s.config.QueryRangeSize) + if end > count { + end = count + } + if err := s.querySubscriptionsRange(ctx, blockNumber, start, end); err != nil { + cancel() + s.lggr.Errorw("Error querying subscriptions", "err", err, "start", start, "end", end) + continue + } + } + } +} + +func (s *onchainSubscriptions) querySubscriptionsRange(ctx context.Context, blockNumber *big.Int, start, end uint64) error { + subscriptions, err := s.router.GetSubscriptionsInRange(&bind.CallOpts{ + Pending: false, + BlockNumber: blockNumber, + Context: ctx, + }, start, end) + if err != nil { + return errors.Wrap(err, "unexpected error during functions_router.GetSubscriptionsInRange") + } + + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + for _, subscription := range subscriptions { + if subscription.Owner == utils.ZeroAddress { + continue + } + s.subscriptions[subscription.Owner] = &subscription + } + + return nil +} + +func (s *onchainSubscriptions) getSubscriptionsCount(ctx context.Context, blockNumber *big.Int) (uint64, error) { + return s.router.GetSubscriptionCount(&bind.CallOpts{ + Pending: false, + BlockNumber: blockNumber, + Context: ctx, + }) +} diff --git a/core/services/gateway/handlers/functions/subscriptions_test.go b/core/services/gateway/handlers/functions/subscriptions_test.go new file mode 100644 index 00000000000..b09cd0c67ad --- /dev/null +++ b/core/services/gateway/handlers/functions/subscriptions_test.go @@ -0,0 +1,63 @@ +package functions_test + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" +) + +const ( + validUser = "0x9ED925d8206a4f88a2f643b28B3035B315753Cd6" + invalidUser = "0x6E2dc0F9DB014aE19888F539E59285D2Ea04244C" +) + +func TestSubscriptions(t *testing.T) { + t.Parallel() + + getSubscriptionCount := hexutil.MustDecode("0x0000000000000000000000000000000000000000000000000000000000000003") + getSubscriptionsInRange := hexutil.MustDecode("0x00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000003000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000240000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000000109e6e1b12098cc8f3a1e9719a817ec53ab9b35c000000000000000000000000000000000000000000000000000034e23f515cb0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000f5340f0968ee8b7dfd97e3327a6139273cc2c4fa000000000000000000000000000000000000000000000001158e460913d000000000000000000000000000009ed925d8206a4f88a2f643b28b3035b315753cd60000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001bc14b92364c75e20000000000000000000000009ed925d8206a4f88a2f643b28b3035b315753cd60000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000005439e5881a529f3ccbffc0e82d49f9db3950aefe") + + ctx := testutils.Context(t) + client := mocks.NewClient(t) + client.On("LatestBlockHeight", mock.Anything).Return(big.NewInt(42), nil) + client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getSubscriptionCount + To: &common.Address{}, + Data: hexutil.MustDecode("0x66419970"), + }, mock.Anything).Return(getSubscriptionCount, nil) + client.On("CallContract", mock.Anything, ethereum.CallMsg{ // GetSubscriptionsInRange + To: &common.Address{}, + Data: hexutil.MustDecode("0xec2454e500000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000003"), + }, mock.Anything).Return(getSubscriptionsInRange, nil) + config := functions.OnchainSubscriptionsConfig{ + RouterAddress: common.Address{}, + BlockConfirmations: 1, + QueryFrequencySec: 1, + QueryTimeoutSec: 1, + QueryRangeSize: 10, + } + subscriptions, err := functions.NewOnchainSubscriptions(client, config, logger.TestLogger(t)) + require.NoError(t, err) + + err = subscriptions.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, subscriptions.Close()) + }) + + gomega.NewGomegaWithT(t).Eventually(func() bool { + return subscriptions.GetSubscription(common.HexToAddress(validUser)) != nil && subscriptions.GetSubscription(common.HexToAddress(invalidUser)) == nil + }, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue()) +} diff --git a/core/services/ocr2/plugins/functions/config/config.go b/core/services/ocr2/plugins/functions/config/config.go index d373de57376..a4bc6f782ae 100644 --- a/core/services/ocr2/plugins/functions/config/config.go +++ b/core/services/ocr2/plugins/functions/config/config.go @@ -19,27 +19,28 @@ import ( // This config is part of the job spec and is loaded only once on node boot/job creation. type PluginConfig struct { - EnableRequestSignatureCheck bool `json:"enableRequestSignatureCheck"` - DONID string `json:"donID"` - ContractVersion uint32 `json:"contractVersion"` - MinIncomingConfirmations uint32 `json:"minIncomingConfirmations"` - RequestTimeoutSec uint32 `json:"requestTimeoutSec"` - RequestTimeoutCheckFrequencySec uint32 `json:"requestTimeoutCheckFrequencySec"` - RequestTimeoutBatchLookupSize uint32 `json:"requestTimeoutBatchLookupSize"` - PruneMaxStoredRequests uint32 `json:"pruneMaxStoredRequests"` - PruneCheckFrequencySec uint32 `json:"pruneCheckFrequencySec"` - PruneBatchSize uint32 `json:"pruneBatchSize"` - ListenerEventHandlerTimeoutSec uint32 `json:"listenerEventHandlerTimeoutSec"` - ListenerEventsCheckFrequencyMillis uint32 `json:"listenerEventsCheckFrequencyMillis"` - ContractUpdateCheckFrequencySec uint32 `json:"contractUpdateCheckFrequencySec"` - MaxRequestSizeBytes uint32 `json:"maxRequestSizeBytes"` - MaxRequestSizesList []uint32 `json:"maxRequestSizesList"` - MaxSecretsSizesList []uint32 `json:"maxSecretsSizesList"` - GatewayConnectorConfig *connector.ConnectorConfig `json:"gatewayConnectorConfig"` - OnchainAllowlist *functions.OnchainAllowlistConfig `json:"onchainAllowlist"` - RateLimiter *common.RateLimiterConfig `json:"rateLimiter"` - S4Constraints *s4.Constraints `json:"s4Constraints"` - DecryptionQueueConfig *DecryptionQueueConfig `json:"decryptionQueueConfig"` + EnableRequestSignatureCheck bool `json:"enableRequestSignatureCheck"` + DONID string `json:"donID"` + ContractVersion uint32 `json:"contractVersion"` + MinIncomingConfirmations uint32 `json:"minIncomingConfirmations"` + RequestTimeoutSec uint32 `json:"requestTimeoutSec"` + RequestTimeoutCheckFrequencySec uint32 `json:"requestTimeoutCheckFrequencySec"` + RequestTimeoutBatchLookupSize uint32 `json:"requestTimeoutBatchLookupSize"` + PruneMaxStoredRequests uint32 `json:"pruneMaxStoredRequests"` + PruneCheckFrequencySec uint32 `json:"pruneCheckFrequencySec"` + PruneBatchSize uint32 `json:"pruneBatchSize"` + ListenerEventHandlerTimeoutSec uint32 `json:"listenerEventHandlerTimeoutSec"` + ListenerEventsCheckFrequencyMillis uint32 `json:"listenerEventsCheckFrequencyMillis"` + ContractUpdateCheckFrequencySec uint32 `json:"contractUpdateCheckFrequencySec"` + MaxRequestSizeBytes uint32 `json:"maxRequestSizeBytes"` + MaxRequestSizesList []uint32 `json:"maxRequestSizesList"` + MaxSecretsSizesList []uint32 `json:"maxSecretsSizesList"` + GatewayConnectorConfig *connector.ConnectorConfig `json:"gatewayConnectorConfig"` + OnchainAllowlist *functions.OnchainAllowlistConfig `json:"onchainAllowlist"` + OnchainSubscriptions *functions.OnchainSubscriptionsConfig `json:"onchainSubscriptions"` + RateLimiter *common.RateLimiterConfig `json:"rateLimiter"` + S4Constraints *s4.Constraints `json:"s4Constraints"` + DecryptionQueueConfig *DecryptionQueueConfig `json:"decryptionQueueConfig"` } type DecryptionQueueConfig struct { diff --git a/core/services/ocr2/plugins/functions/plugin.go b/core/services/ocr2/plugins/functions/plugin.go index 3a13c0caba8..563f6e821b8 100644 --- a/core/services/ocr2/plugins/functions/plugin.go +++ b/core/services/ocr2/plugins/functions/plugin.go @@ -133,14 +133,18 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs if pluginConfig.GatewayConnectorConfig != nil && s4Storage != nil && pluginConfig.OnchainAllowlist != nil && pluginConfig.RateLimiter != nil { allowlist, err2 := gwFunctions.NewOnchainAllowlist(conf.Chain.Client(), *pluginConfig.OnchainAllowlist, conf.Logger) if err2 != nil { - return nil, errors.Wrap(err, "failed to call NewOnchainAllowlist while creating a Functions Reporting Plugin") + return nil, errors.Wrap(err, "failed to create OnchainAllowlist") } rateLimiter, err2 := hc.NewRateLimiter(*pluginConfig.RateLimiter) if err2 != nil { return nil, errors.Wrap(err, "failed to create a RateLimiter") } + subscriptions, err2 := gwFunctions.NewOnchainSubscriptions(conf.Chain.Client(), *pluginConfig.OnchainSubscriptions, conf.Logger) + if err2 != nil { + return nil, errors.Wrap(err, "failed to create a OnchainSubscriptions") + } connectorLogger := conf.Logger.Named("GatewayConnector").With("jobName", conf.Job.PipelineSpec.JobName) - connector, err2 := NewConnector(pluginConfig.GatewayConnectorConfig, conf.EthKeystore, conf.Chain.ID(), s4Storage, allowlist, rateLimiter, connectorLogger) + connector, err2 := NewConnector(pluginConfig.GatewayConnectorConfig, conf.EthKeystore, conf.Chain.ID(), s4Storage, allowlist, rateLimiter, subscriptions, connectorLogger) if err2 != nil { return nil, errors.Wrap(err, "failed to create a GatewayConnector") } @@ -167,7 +171,7 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs return allServices, nil } -func NewConnector(gwcCfg *connector.ConnectorConfig, ethKeystore keystore.Eth, chainID *big.Int, s4Storage s4.Storage, allowlist gwFunctions.OnchainAllowlist, rateLimiter *hc.RateLimiter, lggr logger.Logger) (connector.GatewayConnector, error) { +func NewConnector(gwcCfg *connector.ConnectorConfig, ethKeystore keystore.Eth, chainID *big.Int, s4Storage s4.Storage, allowlist gwFunctions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions gwFunctions.OnchainSubscriptions, lggr logger.Logger) (connector.GatewayConnector, error) { enabledKeys, err := ethKeystore.EnabledKeysForChain(chainID) if err != nil { return nil, err @@ -180,7 +184,7 @@ func NewConnector(gwcCfg *connector.ConnectorConfig, ethKeystore keystore.Eth, c signerKey := enabledKeys[idx].ToEcdsaPrivKey() nodeAddress := enabledKeys[idx].ID() - handler, err := functions.NewFunctionsConnectorHandler(nodeAddress, signerKey, s4Storage, allowlist, rateLimiter, lggr) + handler, err := functions.NewFunctionsConnectorHandler(nodeAddress, signerKey, s4Storage, allowlist, rateLimiter, subscriptions, lggr) if err != nil { return nil, err } diff --git a/core/services/ocr2/plugins/functions/plugin_test.go b/core/services/ocr2/plugins/functions/plugin_test.go index 6b1e6f6e4c8..540b5413b04 100644 --- a/core/services/ocr2/plugins/functions/plugin_test.go +++ b/core/services/ocr2/plugins/functions/plugin_test.go @@ -31,10 +31,11 @@ func TestNewConnector_Success(t *testing.T) { ethKeystore := ksmocks.NewEth(t) s4Storage := s4mocks.NewStorage(t) allowlist := gfmocks.NewOnchainAllowlist(t) + subscriptions := gfmocks.NewOnchainSubscriptions(t) rateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) ethKeystore.On("EnabledKeysForChain", mock.Anything).Return([]ethkey.KeyV2{keyV2}, nil) - _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, logger.TestLogger(t)) + _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, subscriptions, logger.TestLogger(t)) require.NoError(t, err) } @@ -53,9 +54,10 @@ func TestNewConnector_NoKeyForConfiguredAddress(t *testing.T) { ethKeystore := ksmocks.NewEth(t) s4Storage := s4mocks.NewStorage(t) allowlist := gfmocks.NewOnchainAllowlist(t) + subscriptions := gfmocks.NewOnchainSubscriptions(t) rateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) ethKeystore.On("EnabledKeysForChain", mock.Anything).Return([]ethkey.KeyV2{{Address: common.HexToAddress(addresses[1])}}, nil) - _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, logger.TestLogger(t)) + _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, subscriptions, logger.TestLogger(t)) require.Error(t, err) } From 1af842021ca30641179041f49bb53838d4f7ab2b Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Wed, 13 Sep 2023 12:59:21 +0300 Subject: [PATCH 02/12] Fixing linter issue --- core/services/gateway/handlers/functions/subscriptions.go | 1 + 1 file changed, 1 insertion(+) diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go index e31c449ddbe..5aa517b6bc5 100644 --- a/core/services/gateway/handlers/functions/subscriptions.go +++ b/core/services/gateway/handlers/functions/subscriptions.go @@ -177,6 +177,7 @@ func (s *onchainSubscriptions) querySubscriptionsRange(ctx context.Context, bloc if subscription.Owner == utils.ZeroAddress { continue } + subscription := subscription s.subscriptions[subscription.Owner] = &subscription } From d55b93e4d144781a38a234d4920611ae360557d8 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Wed, 13 Sep 2023 13:26:59 +0300 Subject: [PATCH 03/12] Bugfixing --- .../handlers/functions/subscriptions.go | 71 ++++++++++--------- 1 file changed, 39 insertions(+), 32 deletions(-) diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go index 5aa517b6bc5..87b007ab8d8 100644 --- a/core/services/gateway/handlers/functions/subscriptions.go +++ b/core/services/gateway/handlers/functions/subscriptions.go @@ -120,43 +120,50 @@ func (s *onchainSubscriptions) queryLoop() { var start uint64 = 1 + queryFunc := func() { + ctx, cancel := utils.ContextFromChanWithTimeout(s.stopCh, time.Duration(s.config.QueryTimeoutSec)*time.Second) + defer cancel() + + latestBlockHeight, err := s.client.LatestBlockHeight(ctx) + if err != nil || latestBlockHeight == nil { + s.lggr.Errorw("Error calling LatestBlockHeight", "err", err, "latestBlockHeight", latestBlockHeight.Int64()) + return + } + + blockNumber := big.NewInt(0).Sub(latestBlockHeight, s.blockConfirmations) + + count, err := s.getSubscriptionsCount(ctx, blockNumber) + if err != nil { + s.lggr.Errorw("Error getting subscriptions count", "err", err) + return + } + if count == 0 { + s.lggr.Info("Router has no subscriptions yet") + return + } + + if start > count { + start = 1 + } + + end := start + uint64(s.config.QueryRangeSize) + if end > count { + end = count + } + if err := s.querySubscriptionsRange(ctx, blockNumber, start, end); err != nil { + s.lggr.Errorw("Error querying subscriptions", "err", err, "start", start, "end", end) + return + } + + start = end + 1 + } + for { select { case <-s.stopCh: return case <-ticker.C: - ctx, cancel := utils.ContextFromChanWithTimeout(s.stopCh, time.Duration(s.config.QueryTimeoutSec)*time.Second) - - latestBlockHeight, err := s.client.LatestBlockHeight(ctx) - if err != nil || latestBlockHeight == nil { - cancel() - s.lggr.Errorw("Error calling LatestBlockHeight", "err", err, "latestBlockHeight", latestBlockHeight.Int64()) - continue - } - - blockNumber := big.NewInt(0).Sub(latestBlockHeight, s.blockConfirmations) - - count, err := s.getSubscriptionsCount(ctx, blockNumber) - if err != nil { - cancel() - s.lggr.Errorw("Error getting subscriptions count", "err", err) - continue - } - if count == 0 { - cancel() - s.lggr.Info("Router has no subscriptions yet") - continue - } - - end := start + uint64(s.config.QueryRangeSize) - if end > count { - end = count - } - if err := s.querySubscriptionsRange(ctx, blockNumber, start, end); err != nil { - cancel() - s.lggr.Errorw("Error querying subscriptions", "err", err, "start", start, "end", end) - continue - } + queryFunc() } } } From 4530867113396d777610d7e57b1ff49121f87fbf Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Thu, 14 Sep 2023 16:51:46 +0300 Subject: [PATCH 04/12] Reworking solution --- core/scripts/functions/templates/oracle.toml | 8 +- .../handlers/functions/subscriptions.go | 50 ++++++------ .../handlers/functions/subscriptions_test.go | 13 ++-- .../handlers/functions/user_subscriptions.go | 68 ++++++++++++++++ .../functions/user_subscriptions_test.go | 77 +++++++++++++++++++ 5 files changed, 180 insertions(+), 36 deletions(-) create mode 100644 core/services/gateway/handlers/functions/user_subscriptions.go create mode 100644 core/services/gateway/handlers/functions/user_subscriptions_test.go diff --git a/core/scripts/functions/templates/oracle.toml b/core/scripts/functions/templates/oracle.toml index 5d090fd186b..665395bdf4b 100644 --- a/core/scripts/functions/templates/oracle.toml +++ b/core/scripts/functions/templates/oracle.toml @@ -45,10 +45,10 @@ maxSecretsSizesList = [10_240, 20_480, 51_200, 102_400, 307_200, 512_000, 1_048_ [pluginConfig.OnchainSubscriptions] blockConfirmations = 1 - routerAddress = "{{router_contract_address}}" - queryFrequencySec = 30 - queryTimeoutSec = 10 - queryRangeSize = 100 + contractAddress = "{{router_contract_address}}" + updateFrequencySec = 30 + updateTimeoutSec = 10 + updateRangeSize = 100 [pluginConfig.RateLimiter] globalBurst = 5 diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go index 87b007ab8d8..5ed92128d54 100644 --- a/core/services/gateway/handlers/functions/subscriptions.go +++ b/core/services/gateway/handlers/functions/subscriptions.go @@ -19,11 +19,11 @@ import ( ) type OnchainSubscriptionsConfig struct { - RouterAddress common.Address `json:"routerAddress"` + ContractAddress common.Address `json:"contractAddress"` BlockConfirmations uint `json:"blockConfirmations"` - QueryFrequencySec uint `json:"queryFrequencySec"` - QueryTimeoutSec uint `json:"queryTimeoutSec"` - QueryRangeSize uint `json:"queryRangeSize"` + UpdateFrequencySec uint `json:"updateFrequencySec"` + UpdateTimeoutSec uint `json:"updateTimeoutSec"` + UpdateRangeSize uint `json:"updateRangeSize"` } // OnchainSubscriptions maintains a mirror of all subscriptions fetched from the blockchain (EVM-only). @@ -33,15 +33,15 @@ type OnchainSubscriptionsConfig struct { type OnchainSubscriptions interface { job.ServiceCtx - // GetSubscription returns a subscription for the given user address, or null if not found - GetSubscription(common.Address) *functions_router.IFunctionsSubscriptionsSubscription + // GetMaxUserBalance returns a maximum subscription balance, or error if user has no subscriptions. + GetMaxUserBalance(common.Address) (*big.Int, error) } type onchainSubscriptions struct { utils.StartStopOnce config OnchainSubscriptionsConfig - subscriptions map[common.Address]*functions_router.IFunctionsSubscriptionsSubscription + subscriptions UserSubscriptions client evmclient.Client router *functions_router.FunctionsRouter blockConfirmations *big.Int @@ -58,13 +58,13 @@ func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscription if lggr == nil { return nil, errors.New("logger is nil") } - router, err := functions_router.NewFunctionsRouter(config.RouterAddress, client) + router, err := functions_router.NewFunctionsRouter(config.ContractAddress, client) if err != nil { return nil, fmt.Errorf("unexpected error during functions_router.NewFunctionsRouter: %s", err) } return &onchainSubscriptions{ config: config, - subscriptions: make(map[common.Address]*functions_router.IFunctionsSubscriptionsSubscription), + subscriptions: NewUserSubscriptions(), client: client, router: router, blockConfirmations: big.NewInt(int64(config.BlockConfirmations)), @@ -76,14 +76,14 @@ func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscription func (s *onchainSubscriptions) Start(ctx context.Context) error { return s.StartOnce("OnchainSubscriptions", func() error { s.lggr.Info("starting onchain subscriptions") - if s.config.QueryFrequencySec == 0 { + if s.config.UpdateFrequencySec == 0 { return errors.New("OnchainSubscriptionsConfig.UpdateFrequencySec must be greater than 0") } - if s.config.QueryTimeoutSec == 0 { + if s.config.UpdateTimeoutSec == 0 { return errors.New("OnchainSubscriptionsConfig.UpdateTimeoutSec must be greater than 0") } - if s.config.QueryRangeSize == 0 { - return errors.New("OnchainSubscriptionsConfig.QueryRangeSize must be greater than 0") + if s.config.UpdateRangeSize == 0 { + return errors.New("OnchainSubscriptionsConfig.UpdateRangeSize must be greater than 0") } s.closeWait.Add(1) @@ -102,26 +102,22 @@ func (s *onchainSubscriptions) Close() error { }) } -func (s *onchainSubscriptions) GetSubscription(address common.Address) *functions_router.IFunctionsSubscriptionsSubscription { +func (s *onchainSubscriptions) GetMaxUserBalance(user common.Address) (*big.Int, error) { s.rwMutex.RLock() defer s.rwMutex.RUnlock() - subscription, ok := s.subscriptions[address] - if !ok { - return nil - } - return subscription + return s.subscriptions.GetMaxUserBalance(user) } func (s *onchainSubscriptions) queryLoop() { defer s.closeWait.Done() - ticker := time.NewTicker(time.Duration(s.config.QueryFrequencySec) * time.Second) + ticker := time.NewTicker(time.Duration(s.config.UpdateFrequencySec) * time.Second) defer ticker.Stop() var start uint64 = 1 queryFunc := func() { - ctx, cancel := utils.ContextFromChanWithTimeout(s.stopCh, time.Duration(s.config.QueryTimeoutSec)*time.Second) + ctx, cancel := utils.ContextFromChanWithTimeout(s.stopCh, time.Duration(s.config.UpdateTimeoutSec)*time.Second) defer cancel() latestBlockHeight, err := s.client.LatestBlockHeight(ctx) @@ -146,7 +142,7 @@ func (s *onchainSubscriptions) queryLoop() { start = 1 } - end := start + uint64(s.config.QueryRangeSize) + end := start + uint64(s.config.UpdateRangeSize) if end > count { end = count } @@ -158,6 +154,8 @@ func (s *onchainSubscriptions) queryLoop() { start = end + 1 } + queryFunc() + for { select { case <-s.stopCh: @@ -180,12 +178,10 @@ func (s *onchainSubscriptions) querySubscriptionsRange(ctx context.Context, bloc s.rwMutex.Lock() defer s.rwMutex.Unlock() - for _, subscription := range subscriptions { - if subscription.Owner == utils.ZeroAddress { - continue - } + for i, subscription := range subscriptions { + subscriptionId := start + uint64(i) subscription := subscription - s.subscriptions[subscription.Owner] = &subscription + s.subscriptions.UpdateSubscription(subscriptionId, &subscription) } return nil diff --git a/core/services/gateway/handlers/functions/subscriptions_test.go b/core/services/gateway/handlers/functions/subscriptions_test.go index b09cd0c67ad..1e46bff5c0f 100644 --- a/core/services/gateway/handlers/functions/subscriptions_test.go +++ b/core/services/gateway/handlers/functions/subscriptions_test.go @@ -42,11 +42,11 @@ func TestSubscriptions(t *testing.T) { Data: hexutil.MustDecode("0xec2454e500000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000003"), }, mock.Anything).Return(getSubscriptionsInRange, nil) config := functions.OnchainSubscriptionsConfig{ - RouterAddress: common.Address{}, + ContractAddress: common.Address{}, BlockConfirmations: 1, - QueryFrequencySec: 1, - QueryTimeoutSec: 1, - QueryRangeSize: 10, + UpdateFrequencySec: 1, + UpdateTimeoutSec: 1, + UpdateRangeSize: 10, } subscriptions, err := functions.NewOnchainSubscriptions(client, config, logger.TestLogger(t)) require.NoError(t, err) @@ -58,6 +58,9 @@ func TestSubscriptions(t *testing.T) { }) gomega.NewGomegaWithT(t).Eventually(func() bool { - return subscriptions.GetSubscription(common.HexToAddress(validUser)) != nil && subscriptions.GetSubscription(common.HexToAddress(invalidUser)) == nil + expectedBalance := big.NewInt(0).SetBytes(hexutil.MustDecode("0x01158e460913d00000")) + balance, err1 := subscriptions.GetMaxUserBalance(common.HexToAddress(validUser)) + _, err2 := subscriptions.GetMaxUserBalance(common.HexToAddress(invalidUser)) + return err1 == nil && err2 != nil && balance.Cmp(expectedBalance) == 0 }, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue()) } diff --git a/core/services/gateway/handlers/functions/user_subscriptions.go b/core/services/gateway/handlers/functions/user_subscriptions.go new file mode 100644 index 00000000000..9441fc60e21 --- /dev/null +++ b/core/services/gateway/handlers/functions/user_subscriptions.go @@ -0,0 +1,68 @@ +package functions + +import ( + "encoding/hex" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +// Methods are NOT thread-safe. + +type UserSubscriptions interface { + UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) + GetMaxUserBalance(user common.Address) (*big.Int, error) +} + +type userSubscriptions struct { + userSubscriptionsMap map[common.Address]map[uint64]*functions_router.IFunctionsSubscriptionsSubscription + subscriptionIdsMap map[uint64]common.Address +} + +func NewUserSubscriptions() UserSubscriptions { + return &userSubscriptions{ + userSubscriptionsMap: make(map[common.Address]map[uint64]*functions_router.IFunctionsSubscriptionsSubscription), + subscriptionIdsMap: make(map[uint64]common.Address), + } +} + +func (us *userSubscriptions) UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) { + if subscription == nil || subscription.Owner == utils.ZeroAddress { + user, ok := us.subscriptionIdsMap[subscriptionId] + if ok { + delete(us.userSubscriptionsMap[user], subscriptionId) + if len(us.userSubscriptionsMap[user]) == 0 { + delete(us.userSubscriptionsMap, user) + } + } + delete(us.subscriptionIdsMap, subscriptionId) + return + } else { + us.subscriptionIdsMap[subscriptionId] = subscription.Owner + if _, ok := us.userSubscriptionsMap[subscription.Owner]; !ok { + us.userSubscriptionsMap[subscription.Owner] = make(map[uint64]*functions_router.IFunctionsSubscriptionsSubscription) + } + us.userSubscriptionsMap[subscription.Owner][subscriptionId] = subscription + } +} + +func (us *userSubscriptions) GetMaxUserBalance(user common.Address) (*big.Int, error) { + subs, exists := us.userSubscriptionsMap[user] + if !exists { + return nil, errors.New("user has no subscriptions") + } + + maxBalance := big.NewInt(0) + for _, sub := range subs { + if sub.Balance.Cmp(maxBalance) > 0 { + maxBalance = sub.Balance + } + } + fmt.Println(hex.EncodeToString(maxBalance.Bytes())) + return maxBalance, nil +} diff --git a/core/services/gateway/handlers/functions/user_subscriptions_test.go b/core/services/gateway/handlers/functions/user_subscriptions_test.go new file mode 100644 index 00000000000..9e6a660adad --- /dev/null +++ b/core/services/gateway/handlers/functions/user_subscriptions_test.go @@ -0,0 +1,77 @@ +package functions_test + +import ( + "math/big" + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + "github.com/smartcontractkit/chainlink/v2/core/utils" + + "github.com/stretchr/testify/assert" +) + +func TestUserSubscriptions(t *testing.T) { + t.Parallel() + + us := functions.NewUserSubscriptions() + + t.Run("GetMaxUserBalance for unknown user", func(t *testing.T) { + _, err := us.GetMaxUserBalance(utils.RandomAddress()) + assert.Error(t, err) + }) + + t.Run("UpdateSubscription then GetMaxUserBalance", func(t *testing.T) { + user1 := utils.RandomAddress() + user1Balance := big.NewInt(10) + user2 := utils.RandomAddress() + user2Balance1 := big.NewInt(50) + user2Balance2 := big.NewInt(70) + + us.UpdateSubscription(5, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: user1, + Balance: user1Balance, + }) + us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: user2, + Balance: user2Balance1, + }) + us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: user2, + Balance: user2Balance2, + }) + + balance, err := us.GetMaxUserBalance(user1) + assert.NoError(t, err) + assert.Zero(t, balance.Cmp(user1Balance)) + + balance, err = us.GetMaxUserBalance(user2) + assert.NoError(t, err) + assert.Zero(t, balance.Cmp(user2Balance2)) + }) + + t.Run("UpdateSubscription to remove subscriptions", func(t *testing.T) { + user2 := utils.RandomAddress() + user2Balance1 := big.NewInt(50) + user2Balance2 := big.NewInt(70) + + us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: user2, + Balance: user2Balance1, + }) + us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: user2, + Balance: user2Balance2, + }) + + us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: utils.ZeroAddress, + }) + us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: utils.ZeroAddress, + }) + + _, err := us.GetMaxUserBalance(user2) + assert.Error(t, err) + }) +} From 0ab8ee1b7de6ef2add9d1ee85918aea8188ad76e Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Thu, 14 Sep 2023 17:05:24 +0300 Subject: [PATCH 05/12] Fixed tests --- core/services/functions/connector_handler.go | 5 ++-- .../functions/connector_handler_test.go | 10 ++----- .../functions/mocks/onchain_subscriptions.go | 25 +++++++++++----- .../handlers/functions/subscriptions.go | 29 +++++++++++++------ 4 files changed, 43 insertions(+), 26 deletions(-) diff --git a/core/services/functions/connector_handler.go b/core/services/functions/connector_handler.go index 54e511f8799..fe54ed645a9 100644 --- a/core/services/functions/connector_handler.go +++ b/core/services/functions/connector_handler.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "encoding/json" "fmt" + "math/big" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" @@ -70,8 +71,8 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga h.lggr.Errorw("request rate-limited", "id", gatewayId, "address", fromAddr) return } - if h.subscriptions.GetSubscription(fromAddr) == nil { - h.lggr.Errorw("request is not backed with a valid subscription", "id", gatewayId, "address", fromAddr) + if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(big.NewInt(0)) == 0 { + h.lggr.Errorw("request is not backed with a funded subscription", "id", gatewayId, "address", fromAddr) return } diff --git a/core/services/functions/connector_handler_test.go b/core/services/functions/connector_handler_test.go index d42b8d7dfff..0b92a1d38aa 100644 --- a/core/services/functions/connector_handler_test.go +++ b/core/services/functions/connector_handler_test.go @@ -4,9 +4,9 @@ import ( "encoding/base64" "encoding/json" "errors" + "math/big" "testing" - "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/functions" @@ -75,9 +75,7 @@ func TestFunctionsConnectorHandler(t *testing.T) { } storage.On("List", ctx, addr).Return(snapshot, nil).Once() allowlist.On("Allow", addr).Return(true).Once() - subscriptions.On("GetSubscription", mock.Anything).Return( - &functions_router.IFunctionsSubscriptionsSubscription{}, - ) + subscriptions.On("GetMaxUserBalance", mock.Anything).Return(big.NewInt(10), nil) connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) { msg, ok := args[2].(*api.Message) require.True(t, ok) @@ -134,9 +132,7 @@ func TestFunctionsConnectorHandler(t *testing.T) { storage.On("Put", ctx, &key, &record, signature).Return(nil).Once() allowlist.On("Allow", addr).Return(true).Once() - subscriptions.On("GetSubscription", mock.Anything).Return( - &functions_router.IFunctionsSubscriptionsSubscription{}, - ) + subscriptions.On("GetMaxUserBalance", mock.Anything).Return(big.NewInt(10), nil) connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) { msg, ok := args[2].(*api.Message) require.True(t, ok) diff --git a/core/services/gateway/handlers/functions/mocks/onchain_subscriptions.go b/core/services/gateway/handlers/functions/mocks/onchain_subscriptions.go index 1d4ad250b1b..86397407466 100644 --- a/core/services/gateway/handlers/functions/mocks/onchain_subscriptions.go +++ b/core/services/gateway/handlers/functions/mocks/onchain_subscriptions.go @@ -4,11 +4,10 @@ package mocks import ( context "context" + big "math/big" common "github.com/ethereum/go-ethereum/common" - functions_router "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" - mock "github.com/stretchr/testify/mock" ) @@ -31,20 +30,30 @@ func (_m *OnchainSubscriptions) Close() error { return r0 } -// GetSubscription provides a mock function with given fields: _a0 -func (_m *OnchainSubscriptions) GetSubscription(_a0 common.Address) *functions_router.IFunctionsSubscriptionsSubscription { +// GetMaxUserBalance provides a mock function with given fields: _a0 +func (_m *OnchainSubscriptions) GetMaxUserBalance(_a0 common.Address) (*big.Int, error) { ret := _m.Called(_a0) - var r0 *functions_router.IFunctionsSubscriptionsSubscription - if rf, ok := ret.Get(0).(func(common.Address) *functions_router.IFunctionsSubscriptionsSubscription); ok { + var r0 *big.Int + var r1 error + if rf, ok := ret.Get(0).(func(common.Address) (*big.Int, error)); ok { + return rf(_a0) + } + if rf, ok := ret.Get(0).(func(common.Address) *big.Int); ok { r0 = rf(_a0) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*functions_router.IFunctionsSubscriptionsSubscription) + r0 = ret.Get(0).(*big.Int) } } - return r0 + if rf, ok := ret.Get(1).(func(common.Address) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // Start provides a mock function with given fields: _a0 diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go index 5ed92128d54..e029eaa89a5 100644 --- a/core/services/gateway/handlers/functions/subscriptions.go +++ b/core/services/gateway/handlers/functions/subscriptions.go @@ -114,7 +114,8 @@ func (s *onchainSubscriptions) queryLoop() { ticker := time.NewTicker(time.Duration(s.config.UpdateFrequencySec) * time.Second) defer ticker.Stop() - var start uint64 = 1 + start := uint64(1) + lastKnownCount := uint64(0) queryFunc := func() { ctx, cancel := utils.ContextFromChanWithTimeout(s.stopCh, time.Duration(s.config.UpdateTimeoutSec)*time.Second) @@ -128,23 +129,33 @@ func (s *onchainSubscriptions) queryLoop() { blockNumber := big.NewInt(0).Sub(latestBlockHeight, s.blockConfirmations) - count, err := s.getSubscriptionsCount(ctx, blockNumber) - if err != nil { - s.lggr.Errorw("Error getting subscriptions count", "err", err) - return + updateLastKnownCount := func() { + count, err := s.getSubscriptionsCount(ctx, blockNumber) + if err != nil { + s.lggr.Errorw("Error getting subscriptions count", "err", err) + return + } + lastKnownCount = count + } + + if lastKnownCount == 0 { + updateLastKnownCount() } - if count == 0 { + if lastKnownCount == 0 { s.lggr.Info("Router has no subscriptions yet") return } - if start > count { + if start > lastKnownCount { start = 1 } end := start + uint64(s.config.UpdateRangeSize) - if end > count { - end = count + if end > lastKnownCount { + updateLastKnownCount() + if end > lastKnownCount { + end = lastKnownCount + } } if err := s.querySubscriptionsRange(ctx, blockNumber, start, end); err != nil { s.lggr.Errorw("Error querying subscriptions", "err", err, "start", start, "end", end) From 1c2477aad6d1654feb2e65f359b48ae66e1dad73 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Thu, 14 Sep 2023 17:14:55 +0300 Subject: [PATCH 06/12] Fixed linter error --- core/services/gateway/handlers/functions/user_subscriptions.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/services/gateway/handlers/functions/user_subscriptions.go b/core/services/gateway/handlers/functions/user_subscriptions.go index 9441fc60e21..e25aab965b3 100644 --- a/core/services/gateway/handlers/functions/user_subscriptions.go +++ b/core/services/gateway/handlers/functions/user_subscriptions.go @@ -41,7 +41,6 @@ func (us *userSubscriptions) UpdateSubscription(subscriptionId uint64, subscript } } delete(us.subscriptionIdsMap, subscriptionId) - return } else { us.subscriptionIdsMap[subscriptionId] = subscription.Owner if _, ok := us.userSubscriptionsMap[subscription.Owner]; !ok { From c5ae2e53ecea887010513443984dcfceeb100135 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Fri, 15 Sep 2023 08:01:20 +0300 Subject: [PATCH 07/12] Addressed PR feedback --- core/services/gateway/handlers/functions/subscriptions.go | 1 + core/services/gateway/handlers/functions/user_subscriptions.go | 3 --- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go index e029eaa89a5..2fcc78578c5 100644 --- a/core/services/gateway/handlers/functions/subscriptions.go +++ b/core/services/gateway/handlers/functions/subscriptions.go @@ -135,6 +135,7 @@ func (s *onchainSubscriptions) queryLoop() { s.lggr.Errorw("Error getting subscriptions count", "err", err) return } + s.lggr.Infow("Updated subscriptions count", "err", err, "count", count, "blockNumber", blockNumber.Int64()) lastKnownCount = count } diff --git a/core/services/gateway/handlers/functions/user_subscriptions.go b/core/services/gateway/handlers/functions/user_subscriptions.go index e25aab965b3..c47ac8a4c90 100644 --- a/core/services/gateway/handlers/functions/user_subscriptions.go +++ b/core/services/gateway/handlers/functions/user_subscriptions.go @@ -1,8 +1,6 @@ package functions import ( - "encoding/hex" - "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -62,6 +60,5 @@ func (us *userSubscriptions) GetMaxUserBalance(user common.Address) (*big.Int, e maxBalance = sub.Balance } } - fmt.Println(hex.EncodeToString(maxBalance.Bytes())) return maxBalance, nil } From fa7eb52710a42e9ff5aba0200043ed8af27ad31f Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Fri, 15 Sep 2023 08:24:58 +0300 Subject: [PATCH 08/12] Added minimum balance config --- core/scripts/functions/templates/oracle.toml | 2 + core/services/functions/connector_handler.go | 39 +++++++++++-------- .../functions/connector_handler_test.go | 6 +-- .../ocr2/plugins/functions/config/config.go | 1 + .../services/ocr2/plugins/functions/plugin.go | 6 +-- .../ocr2/plugins/functions/plugin_test.go | 4 +- 6 files changed, 33 insertions(+), 25 deletions(-) diff --git a/core/scripts/functions/templates/oracle.toml b/core/scripts/functions/templates/oracle.toml index 665395bdf4b..22614db1cfc 100644 --- a/core/scripts/functions/templates/oracle.toml +++ b/core/scripts/functions/templates/oracle.toml @@ -35,6 +35,8 @@ requestTimeoutCheckFrequencySec = 10 requestTimeoutSec = 300 maxRequestSizesList = [30_720, 51_200, 102_400, 204_800, 512_000, 1_048_576, 2_097_152, 3_145_728, 5_242_880, 10_485_760] maxSecretsSizesList = [10_240, 20_480, 51_200, 102_400, 307_200, 512_000, 1_048_576, 2_097_152] +minimumSubscriptionBalanceLink = 0.1 + [pluginConfig.OnchainAllowlist] blockConfirmations = 1 diff --git a/core/services/functions/connector_handler.go b/core/services/functions/connector_handler.go index fe54ed645a9..082b0953cbc 100644 --- a/core/services/functions/connector_handler.go +++ b/core/services/functions/connector_handler.go @@ -22,14 +22,15 @@ import ( type functionsConnectorHandler struct { utils.StartStopOnce - connector connector.GatewayConnector - signerKey *ecdsa.PrivateKey - nodeAddress string - storage s4.Storage - allowlist functions.OnchainAllowlist - subscriptions functions.OnchainSubscriptions - rateLimiter *hc.RateLimiter - lggr logger.Logger + connector connector.GatewayConnector + signerKey *ecdsa.PrivateKey + nodeAddress string + storage s4.Storage + allowlist functions.OnchainAllowlist + rateLimiter *hc.RateLimiter + subscriptions functions.OnchainSubscriptions + minimumBalance float64 + lggr logger.Logger } var ( @@ -37,18 +38,19 @@ var ( _ connector.GatewayConnectorHandler = &functionsConnectorHandler{} ) -func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, lggr logger.Logger) (*functionsConnectorHandler, error) { +func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, minimumBalance float64, lggr logger.Logger) (*functionsConnectorHandler, error) { if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil || subscriptions == nil { return nil, fmt.Errorf("signerKey, storage, allowlist, rateLimiter and subscriptions must be non-nil") } return &functionsConnectorHandler{ - nodeAddress: nodeAddress, - signerKey: signerKey, - storage: storage, - allowlist: allowlist, - rateLimiter: rateLimiter, - subscriptions: subscriptions, - lggr: lggr.Named("FunctionsConnectorHandler"), + nodeAddress: nodeAddress, + signerKey: signerKey, + storage: storage, + allowlist: allowlist, + rateLimiter: rateLimiter, + subscriptions: subscriptions, + minimumBalance: minimumBalance, + lggr: lggr.Named("FunctionsConnectorHandler"), }, nil } @@ -71,7 +73,10 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga h.lggr.Errorw("request rate-limited", "id", gatewayId, "address", fromAddr) return } - if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(big.NewInt(0)) == 0 { + + minBalanceWei := new(big.Int) + big.NewFloat(0).Mul(big.NewFloat(h.minimumBalance), big.NewFloat(1e18)).Int(minBalanceWei) + if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(minBalanceWei) < 0 { h.lggr.Errorw("request is not backed with a funded subscription", "id", gatewayId, "address", fromAddr) return } diff --git a/core/services/functions/connector_handler_test.go b/core/services/functions/connector_handler_test.go index 0b92a1d38aa..944ca84b8bb 100644 --- a/core/services/functions/connector_handler_test.go +++ b/core/services/functions/connector_handler_test.go @@ -36,7 +36,7 @@ func TestFunctionsConnectorHandler(t *testing.T) { require.NoError(t, err) allowlist.On("Start", mock.Anything).Return(nil) allowlist.On("Close", mock.Anything).Return(nil) - handler, err := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, rateLimiter, subscriptions, logger) + handler, err := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, rateLimiter, subscriptions, 0.0, logger) require.NoError(t, err) handler.SetConnector(connector) @@ -75,7 +75,7 @@ func TestFunctionsConnectorHandler(t *testing.T) { } storage.On("List", ctx, addr).Return(snapshot, nil).Once() allowlist.On("Allow", addr).Return(true).Once() - subscriptions.On("GetMaxUserBalance", mock.Anything).Return(big.NewInt(10), nil) + subscriptions.On("GetMaxUserBalance", mock.Anything).Return(big.NewInt(100), nil) connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) { msg, ok := args[2].(*api.Message) require.True(t, ok) @@ -132,7 +132,7 @@ func TestFunctionsConnectorHandler(t *testing.T) { storage.On("Put", ctx, &key, &record, signature).Return(nil).Once() allowlist.On("Allow", addr).Return(true).Once() - subscriptions.On("GetMaxUserBalance", mock.Anything).Return(big.NewInt(10), nil) + subscriptions.On("GetMaxUserBalance", mock.Anything).Return(big.NewInt(100), nil) connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) { msg, ok := args[2].(*api.Message) require.True(t, ok) diff --git a/core/services/ocr2/plugins/functions/config/config.go b/core/services/ocr2/plugins/functions/config/config.go index a4bc6f782ae..f79e3f6526d 100644 --- a/core/services/ocr2/plugins/functions/config/config.go +++ b/core/services/ocr2/plugins/functions/config/config.go @@ -35,6 +35,7 @@ type PluginConfig struct { MaxRequestSizeBytes uint32 `json:"maxRequestSizeBytes"` MaxRequestSizesList []uint32 `json:"maxRequestSizesList"` MaxSecretsSizesList []uint32 `json:"maxSecretsSizesList"` + MinimumSubscriptionBalanceLink float64 `json:"minimumSubscriptionBalanceLink"` GatewayConnectorConfig *connector.ConnectorConfig `json:"gatewayConnectorConfig"` OnchainAllowlist *functions.OnchainAllowlistConfig `json:"onchainAllowlist"` OnchainSubscriptions *functions.OnchainSubscriptionsConfig `json:"onchainSubscriptions"` diff --git a/core/services/ocr2/plugins/functions/plugin.go b/core/services/ocr2/plugins/functions/plugin.go index 563f6e821b8..9824a13dd98 100644 --- a/core/services/ocr2/plugins/functions/plugin.go +++ b/core/services/ocr2/plugins/functions/plugin.go @@ -144,7 +144,7 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs return nil, errors.Wrap(err, "failed to create a OnchainSubscriptions") } connectorLogger := conf.Logger.Named("GatewayConnector").With("jobName", conf.Job.PipelineSpec.JobName) - connector, err2 := NewConnector(pluginConfig.GatewayConnectorConfig, conf.EthKeystore, conf.Chain.ID(), s4Storage, allowlist, rateLimiter, subscriptions, connectorLogger) + connector, err2 := NewConnector(pluginConfig.GatewayConnectorConfig, conf.EthKeystore, conf.Chain.ID(), s4Storage, allowlist, rateLimiter, subscriptions, pluginConfig.MinimumSubscriptionBalanceLink, connectorLogger) if err2 != nil { return nil, errors.Wrap(err, "failed to create a GatewayConnector") } @@ -171,7 +171,7 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs return allServices, nil } -func NewConnector(gwcCfg *connector.ConnectorConfig, ethKeystore keystore.Eth, chainID *big.Int, s4Storage s4.Storage, allowlist gwFunctions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions gwFunctions.OnchainSubscriptions, lggr logger.Logger) (connector.GatewayConnector, error) { +func NewConnector(gwcCfg *connector.ConnectorConfig, ethKeystore keystore.Eth, chainID *big.Int, s4Storage s4.Storage, allowlist gwFunctions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions gwFunctions.OnchainSubscriptions, minimumBalance float64, lggr logger.Logger) (connector.GatewayConnector, error) { enabledKeys, err := ethKeystore.EnabledKeysForChain(chainID) if err != nil { return nil, err @@ -184,7 +184,7 @@ func NewConnector(gwcCfg *connector.ConnectorConfig, ethKeystore keystore.Eth, c signerKey := enabledKeys[idx].ToEcdsaPrivKey() nodeAddress := enabledKeys[idx].ID() - handler, err := functions.NewFunctionsConnectorHandler(nodeAddress, signerKey, s4Storage, allowlist, rateLimiter, subscriptions, lggr) + handler, err := functions.NewFunctionsConnectorHandler(nodeAddress, signerKey, s4Storage, allowlist, rateLimiter, subscriptions, minimumBalance, lggr) if err != nil { return nil, err } diff --git a/core/services/ocr2/plugins/functions/plugin_test.go b/core/services/ocr2/plugins/functions/plugin_test.go index 540b5413b04..2c38edfb837 100644 --- a/core/services/ocr2/plugins/functions/plugin_test.go +++ b/core/services/ocr2/plugins/functions/plugin_test.go @@ -35,7 +35,7 @@ func TestNewConnector_Success(t *testing.T) { rateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) ethKeystore.On("EnabledKeysForChain", mock.Anything).Return([]ethkey.KeyV2{keyV2}, nil) - _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, subscriptions, logger.TestLogger(t)) + _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, subscriptions, 0.0, logger.TestLogger(t)) require.NoError(t, err) } @@ -58,6 +58,6 @@ func TestNewConnector_NoKeyForConfiguredAddress(t *testing.T) { rateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) ethKeystore.On("EnabledKeysForChain", mock.Anything).Return([]ethkey.KeyV2{{Address: common.HexToAddress(addresses[1])}}, nil) - _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, subscriptions, logger.TestLogger(t)) + _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, subscriptions, 0.0, logger.TestLogger(t)) require.Error(t, err) } From 19aadcc92451dd522060b94e020215a5a5f53af3 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Fri, 15 Sep 2023 09:48:21 +0300 Subject: [PATCH 09/12] Added subscriptions check to gateway --- core/services/functions/connector_handler.go | 40 +++++----- .../handlers/functions/handler.functions.go | 74 +++++++++++++------ .../functions/handler.functions_test.go | 18 +++-- 3 files changed, 84 insertions(+), 48 deletions(-) diff --git a/core/services/functions/connector_handler.go b/core/services/functions/connector_handler.go index 082b0953cbc..a3cd90c6993 100644 --- a/core/services/functions/connector_handler.go +++ b/core/services/functions/connector_handler.go @@ -22,15 +22,15 @@ import ( type functionsConnectorHandler struct { utils.StartStopOnce - connector connector.GatewayConnector - signerKey *ecdsa.PrivateKey - nodeAddress string - storage s4.Storage - allowlist functions.OnchainAllowlist - rateLimiter *hc.RateLimiter - subscriptions functions.OnchainSubscriptions - minimumBalance float64 - lggr logger.Logger + connector connector.GatewayConnector + signerKey *ecdsa.PrivateKey + nodeAddress string + storage s4.Storage + allowlist functions.OnchainAllowlist + rateLimiter *hc.RateLimiter + subscriptions functions.OnchainSubscriptions + minBalanceWei *big.Int + lggr logger.Logger } var ( @@ -42,15 +42,17 @@ func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKe if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil || subscriptions == nil { return nil, fmt.Errorf("signerKey, storage, allowlist, rateLimiter and subscriptions must be non-nil") } + minBalanceWei := new(big.Int) + big.NewFloat(0).Mul(big.NewFloat(minimumBalance), big.NewFloat(1e18)).Int(minBalanceWei) return &functionsConnectorHandler{ - nodeAddress: nodeAddress, - signerKey: signerKey, - storage: storage, - allowlist: allowlist, - rateLimiter: rateLimiter, - subscriptions: subscriptions, - minimumBalance: minimumBalance, - lggr: lggr.Named("FunctionsConnectorHandler"), + nodeAddress: nodeAddress, + signerKey: signerKey, + storage: storage, + allowlist: allowlist, + rateLimiter: rateLimiter, + subscriptions: subscriptions, + minBalanceWei: minBalanceWei, + lggr: lggr.Named("FunctionsConnectorHandler"), }, nil } @@ -74,9 +76,7 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga return } - minBalanceWei := new(big.Int) - big.NewFloat(0).Mul(big.NewFloat(h.minimumBalance), big.NewFloat(1e18)).Int(minBalanceWei) - if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(minBalanceWei) < 0 { + if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minBalanceWei) < 0 { h.lggr.Errorw("request is not backed with a funded subscription", "id", gatewayId, "address", fromAddr) return } diff --git a/core/services/gateway/handlers/functions/handler.functions.go b/core/services/gateway/handlers/functions/handler.functions.go index 4ef6ab72fb2..8e50ab250aa 100644 --- a/core/services/gateway/handlers/functions/handler.functions.go +++ b/core/services/gateway/handlers/functions/handler.functions.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "math/big" "time" "github.com/ethereum/go-ethereum/common" @@ -20,7 +21,11 @@ import ( type FunctionsHandlerConfig struct { OnchainAllowlistChainID string `json:"onchainAllowlistChainId"` // Not specifying OnchainAllowlist config disables allowlist checks - OnchainAllowlist *OnchainAllowlistConfig `json:"onchainAllowlist"` + OnchainAllowlist *OnchainAllowlistConfig `json:"onchainAllowlist"` + OnchainSubscriptionsChainID string `json:"onchainSubscriptionsChainId"` + // Not specifying OnchainSubscriptions config disables minimum balance checks + OnchainSubscriptions *OnchainSubscriptionsConfig `json:"onchainSubscriptions"` + MinimumSubscriptionBalanceLink float64 `json:"minimumSubscriptionBalanceLink"` // Not specifying RateLimiter config disables rate limiting UserRateLimiter *hc.RateLimiterConfig `json:"userRateLimiter"` NodeRateLimiter *hc.RateLimiterConfig `json:"nodeRateLimiter"` @@ -31,15 +36,17 @@ type FunctionsHandlerConfig struct { type functionsHandler struct { utils.StartStopOnce - handlerConfig FunctionsHandlerConfig - donConfig *config.DONConfig - don handlers.DON - pendingRequests hc.RequestCache[PendingSecretsRequest] - allowlist OnchainAllowlist - userRateLimiter *hc.RateLimiter - nodeRateLimiter *hc.RateLimiter - chStop utils.StopChan - lggr logger.Logger + handlerConfig FunctionsHandlerConfig + donConfig *config.DONConfig + don handlers.DON + pendingRequests hc.RequestCache[PendingSecretsRequest] + allowlist OnchainAllowlist + subscriptions OnchainSubscriptions + minimumBalanceWei *big.Int + userRateLimiter *hc.RateLimiter + nodeRateLimiter *hc.RateLimiter + chStop utils.StopChan + lggr logger.Logger } type PendingSecretsRequest struct { @@ -62,11 +69,11 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con if cfg.OnchainAllowlist != nil { chain, err2 := legacyChains.Get(cfg.OnchainAllowlistChainID) if err2 != nil { - return nil, err + return nil, err2 } allowlist, err2 = NewOnchainAllowlist(chain.Client(), *cfg.OnchainAllowlist, lggr) if err2 != nil { - return nil, err + return nil, err2 } } var userRateLimiter, nodeRateLimiter *hc.RateLimiter @@ -82,8 +89,21 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con return nil, err } } + var subscriptions OnchainSubscriptions + if cfg.OnchainSubscriptions != nil { + chain, err2 := legacyChains.Get(cfg.OnchainSubscriptionsChainID) + if err2 != nil { + return nil, err2 + } + subscriptions, err2 = NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, lggr) + if err2 != nil { + return nil, err2 + } + } + minimumBalanceWei := new(big.Int) + big.NewFloat(0).Mul(big.NewFloat(cfg.MinimumSubscriptionBalanceLink), big.NewFloat(1e18)).Int(minimumBalanceWei) pendingRequestsCache := hc.NewRequestCache[PendingSecretsRequest](time.Millisecond*time.Duration(cfg.RequestTimeoutMillis), cfg.MaxPendingRequests) - return NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, userRateLimiter, nodeRateLimiter, lggr), nil + return NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, minimumBalanceWei, userRateLimiter, nodeRateLimiter, lggr), nil } func NewFunctionsHandler( @@ -92,19 +112,23 @@ func NewFunctionsHandler( don handlers.DON, pendingRequestsCache hc.RequestCache[PendingSecretsRequest], allowlist OnchainAllowlist, + subscriptions OnchainSubscriptions, + minimumBalanceWei *big.Int, userRateLimiter *hc.RateLimiter, nodeRateLimiter *hc.RateLimiter, lggr logger.Logger) handlers.Handler { return &functionsHandler{ - handlerConfig: cfg, - donConfig: donConfig, - don: don, - pendingRequests: pendingRequestsCache, - allowlist: allowlist, - userRateLimiter: userRateLimiter, - nodeRateLimiter: nodeRateLimiter, - chStop: make(utils.StopChan), - lggr: lggr, + handlerConfig: cfg, + donConfig: donConfig, + don: don, + pendingRequests: pendingRequestsCache, + allowlist: allowlist, + subscriptions: subscriptions, + minimumBalanceWei: minimumBalanceWei, + userRateLimiter: userRateLimiter, + nodeRateLimiter: nodeRateLimiter, + chStop: make(utils.StopChan), + lggr: lggr, } } @@ -118,6 +142,12 @@ func (h *functionsHandler) HandleUserMessage(ctx context.Context, msg *api.Messa h.lggr.Debug("rate-limited", "sender", msg.Body.Sender) return errors.New("rate-limited") } + if h.subscriptions != nil { + if balance, err := h.subscriptions.GetMaxUserBalance(sender); err != nil || balance.Cmp(h.minimumBalanceWei) < 0 { + h.lggr.Debug("received a message from a user having insufficient balance", "sender", msg.Body.Sender, "balance", balance.String()) + return errors.New("sender has insufficient balance") + } + } switch msg.Body.Method { case MethodSecretsSet, MethodSecretsList: return h.handleSecretsRequest(ctx, msg, callbackCh) diff --git a/core/services/gateway/handlers/functions/handler.functions_test.go b/core/services/gateway/handlers/functions/handler.functions_test.go index 12fa8b954f5..bc309c4de59 100644 --- a/core/services/gateway/handlers/functions/handler.functions_test.go +++ b/core/services/gateway/handlers/functions/handler.functions_test.go @@ -4,6 +4,7 @@ import ( "crypto/ecdsa" "encoding/json" "fmt" + "math/big" "testing" "time" @@ -23,7 +24,7 @@ import ( handlers_mocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/mocks" ) -func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTimeout time.Duration) (handlers.Handler, *handlers_mocks.DON, *functions_mocks.OnchainAllowlist) { +func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTimeout time.Duration) (handlers.Handler, *handlers_mocks.DON, *functions_mocks.OnchainAllowlist, *functions_mocks.OnchainSubscriptions) { cfg := functions.FunctionsHandlerConfig{} donConfig := &config.DONConfig{ Members: []config.NodeConfig{}, @@ -39,13 +40,15 @@ func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTi don := handlers_mocks.NewDON(t) allowlist := functions_mocks.NewOnchainAllowlist(t) + subscriptions := functions_mocks.NewOnchainSubscriptions(t) + minBalanceWei := big.NewInt(100) userRateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) nodeRateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) pendingRequestsCache := hc.NewRequestCache[functions.PendingSecretsRequest](requestTimeout, 1000) - handler := functions.NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, userRateLimiter, nodeRateLimiter, logger.TestLogger(t)) - return handler, don, allowlist + handler := functions.NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, minBalanceWei, userRateLimiter, nodeRateLimiter, logger.TestLogger(t)) + return handler, don, allowlist, subscriptions } func newSignedMessage(t *testing.T, id string, method string, donId string, privateKey *ecdsa.PrivateKey) api.Message { @@ -113,7 +116,7 @@ func TestFunctionsHandler_HandleUserMessage_SecretsSet(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { nodes, user := gc.NewTestNodes(t, 4), gc.NewTestNodes(t, 1)[0] - handler, don, allowlist := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24) + handler, don, allowlist, subscriptions := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24) userRequestMsg := newSignedMessage(t, "1234", "secrets_set", "don_id", user.PrivateKey) callbachCh := make(chan handlers.UserCallbackPayload) @@ -131,6 +134,7 @@ func TestFunctionsHandler_HandleUserMessage_SecretsSet(t *testing.T) { }() allowlist.On("Allow", common.HexToAddress(user.Address)).Return(true, nil) + subscriptions.On("GetMaxUserBalance", common.HexToAddress(user.Address)).Return(big.NewInt(1000), nil) don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil) require.NoError(t, handler.HandleUserMessage(testutils.Context(t), &userRequestMsg, callbachCh)) sendNodeReponses(t, handler, userRequestMsg, nodes, test.nodeResults) @@ -143,10 +147,11 @@ func TestFunctionsHandler_HandleUserMessage_InvalidMethod(t *testing.T) { t.Parallel() nodes, user := gc.NewTestNodes(t, 4), gc.NewTestNodes(t, 1)[0] - handler, _, allowlist := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24) + handler, _, allowlist, subscriptions := newFunctionsHandlerForATestDON(t, nodes, time.Hour*24) userRequestMsg := newSignedMessage(t, "1234", "secrets_reveal_all_please", "don_id", user.PrivateKey) allowlist.On("Allow", common.HexToAddress(user.Address)).Return(true, nil) + subscriptions.On("GetMaxUserBalance", common.HexToAddress(user.Address)).Return(big.NewInt(1000), nil) err := handler.HandleUserMessage(testutils.Context(t), &userRequestMsg, make(chan handlers.UserCallbackPayload)) require.Error(t, err) } @@ -155,7 +160,7 @@ func TestFunctionsHandler_HandleUserMessage_Timeout(t *testing.T) { t.Parallel() nodes, user := gc.NewTestNodes(t, 4), gc.NewTestNodes(t, 1)[0] - handler, don, allowlist := newFunctionsHandlerForATestDON(t, nodes, time.Millisecond*10) + handler, don, allowlist, subscriptions := newFunctionsHandlerForATestDON(t, nodes, time.Millisecond*10) userRequestMsg := newSignedMessage(t, "1234", "secrets_set", "don_id", user.PrivateKey) callbachCh := make(chan handlers.UserCallbackPayload) @@ -169,6 +174,7 @@ func TestFunctionsHandler_HandleUserMessage_Timeout(t *testing.T) { }() allowlist.On("Allow", common.HexToAddress(user.Address)).Return(true, nil) + subscriptions.On("GetMaxUserBalance", common.HexToAddress(user.Address)).Return(big.NewInt(1000), nil) don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil) require.NoError(t, handler.HandleUserMessage(testutils.Context(t), &userRequestMsg, callbachCh)) <-done From a344936277d328bc81fd1ca2e3eb8b3b95880b43 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Mon, 18 Sep 2023 08:19:41 +0300 Subject: [PATCH 10/12] Using assets.Link --- core/services/functions/connector_handler.go | 15 ++-- .../functions/connector_handler_test.go | 3 +- .../handlers/functions/handler.functions.go | 72 +++++++++---------- .../functions/handler.functions_test.go | 5 +- .../handlers/functions/subscriptions.go | 2 +- .../ocr2/plugins/functions/config/config.go | 3 +- .../services/ocr2/plugins/functions/plugin.go | 5 +- .../ocr2/plugins/functions/plugin_test.go | 5 +- 8 files changed, 55 insertions(+), 55 deletions(-) diff --git a/core/services/functions/connector_handler.go b/core/services/functions/connector_handler.go index a3cd90c6993..3f56b7658ca 100644 --- a/core/services/functions/connector_handler.go +++ b/core/services/functions/connector_handler.go @@ -5,8 +5,8 @@ import ( "crypto/ecdsa" "encoding/json" "fmt" - "math/big" + "github.com/smartcontractkit/chainlink/v2/core/assets" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/common" @@ -29,7 +29,7 @@ type functionsConnectorHandler struct { allowlist functions.OnchainAllowlist rateLimiter *hc.RateLimiter subscriptions functions.OnchainSubscriptions - minBalanceWei *big.Int + minBalance assets.Link lggr logger.Logger } @@ -38,12 +38,10 @@ var ( _ connector.GatewayConnectorHandler = &functionsConnectorHandler{} ) -func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, minimumBalance float64, lggr logger.Logger) (*functionsConnectorHandler, error) { +func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, minBalance assets.Link, lggr logger.Logger) (*functionsConnectorHandler, error) { if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil || subscriptions == nil { return nil, fmt.Errorf("signerKey, storage, allowlist, rateLimiter and subscriptions must be non-nil") } - minBalanceWei := new(big.Int) - big.NewFloat(0).Mul(big.NewFloat(minimumBalance), big.NewFloat(1e18)).Int(minBalanceWei) return &functionsConnectorHandler{ nodeAddress: nodeAddress, signerKey: signerKey, @@ -51,7 +49,7 @@ func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKe allowlist: allowlist, rateLimiter: rateLimiter, subscriptions: subscriptions, - minBalanceWei: minBalanceWei, + minBalance: minBalance, lggr: lggr.Named("FunctionsConnectorHandler"), }, nil } @@ -75,9 +73,8 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga h.lggr.Errorw("request rate-limited", "id", gatewayId, "address", fromAddr) return } - - if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minBalanceWei) < 0 { - h.lggr.Errorw("request is not backed with a funded subscription", "id", gatewayId, "address", fromAddr) + if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minBalance.ToInt()) < 0 { + h.lggr.Errorw("user subscription has insufficient balance", "id", gatewayId, "address", fromAddr, "balance", balance, "minBalance", h.minBalance) return } diff --git a/core/services/functions/connector_handler_test.go b/core/services/functions/connector_handler_test.go index 944ca84b8bb..de327f29832 100644 --- a/core/services/functions/connector_handler_test.go +++ b/core/services/functions/connector_handler_test.go @@ -7,6 +7,7 @@ import ( "math/big" "testing" + "github.com/smartcontractkit/chainlink/v2/core/assets" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/functions" @@ -36,7 +37,7 @@ func TestFunctionsConnectorHandler(t *testing.T) { require.NoError(t, err) allowlist.On("Start", mock.Anything).Return(nil) allowlist.On("Close", mock.Anything).Return(nil) - handler, err := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, rateLimiter, subscriptions, 0.0, logger) + handler, err := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, rateLimiter, subscriptions, *assets.NewLinkFromJuels(0), logger) require.NoError(t, err) handler.SetConnector(connector) diff --git a/core/services/gateway/handlers/functions/handler.functions.go b/core/services/gateway/handlers/functions/handler.functions.go index 8c74ecaabd9..722ac136839 100644 --- a/core/services/gateway/handlers/functions/handler.functions.go +++ b/core/services/gateway/handlers/functions/handler.functions.go @@ -4,13 +4,14 @@ import ( "context" "encoding/json" "errors" - "math/big" + "fmt" "time" "github.com/ethereum/go-ethereum/common" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/smartcontractkit/chainlink/v2/core/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" @@ -52,13 +53,12 @@ var ( ) type FunctionsHandlerConfig struct { - OnchainAllowlistChainID string `json:"onchainAllowlistChainId"` + ChainID string `json:"chainId"` // Not specifying OnchainAllowlist config disables allowlist checks - OnchainAllowlist *OnchainAllowlistConfig `json:"onchainAllowlist"` - OnchainSubscriptionsChainID string `json:"onchainSubscriptionsChainId"` + OnchainAllowlist *OnchainAllowlistConfig `json:"onchainAllowlist"` // Not specifying OnchainSubscriptions config disables minimum balance checks - OnchainSubscriptions *OnchainSubscriptionsConfig `json:"onchainSubscriptions"` - MinimumSubscriptionBalanceLink float64 `json:"minimumSubscriptionBalanceLink"` + OnchainSubscriptions *OnchainSubscriptionsConfig `json:"onchainSubscriptions"` + MinimumSubscriptionBalance *assets.Link `json:"minimumSubscriptionBalance"` // Not specifying RateLimiter config disables rate limiting UserRateLimiter *hc.RateLimiterConfig `json:"userRateLimiter"` NodeRateLimiter *hc.RateLimiterConfig `json:"nodeRateLimiter"` @@ -69,17 +69,17 @@ type FunctionsHandlerConfig struct { type functionsHandler struct { utils.StartStopOnce - handlerConfig FunctionsHandlerConfig - donConfig *config.DONConfig - don handlers.DON - pendingRequests hc.RequestCache[PendingSecretsRequest] - allowlist OnchainAllowlist - subscriptions OnchainSubscriptions - minimumBalanceWei *big.Int - userRateLimiter *hc.RateLimiter - nodeRateLimiter *hc.RateLimiter - chStop utils.StopChan - lggr logger.Logger + handlerConfig FunctionsHandlerConfig + donConfig *config.DONConfig + don handlers.DON + pendingRequests hc.RequestCache[PendingSecretsRequest] + allowlist OnchainAllowlist + subscriptions OnchainSubscriptions + minimumBalance *assets.Link + userRateLimiter *hc.RateLimiter + nodeRateLimiter *hc.RateLimiter + chStop utils.StopChan + lggr logger.Logger } type PendingSecretsRequest struct { @@ -100,7 +100,7 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con lggr = lggr.Named("FunctionsHandler:" + donConfig.DonId) var allowlist OnchainAllowlist if cfg.OnchainAllowlist != nil { - chain, err2 := legacyChains.Get(cfg.OnchainAllowlistChainID) + chain, err2 := legacyChains.Get(cfg.ChainID) if err2 != nil { return nil, err2 } @@ -124,7 +124,7 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con } var subscriptions OnchainSubscriptions if cfg.OnchainSubscriptions != nil { - chain, err2 := legacyChains.Get(cfg.OnchainSubscriptionsChainID) + chain, err2 := legacyChains.Get(cfg.ChainID) if err2 != nil { return nil, err2 } @@ -133,10 +133,8 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con return nil, err2 } } - minimumBalanceWei := new(big.Int) - big.NewFloat(0).Mul(big.NewFloat(cfg.MinimumSubscriptionBalanceLink), big.NewFloat(1e18)).Int(minimumBalanceWei) pendingRequestsCache := hc.NewRequestCache[PendingSecretsRequest](time.Millisecond*time.Duration(cfg.RequestTimeoutMillis), cfg.MaxPendingRequests) - return NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, minimumBalanceWei, userRateLimiter, nodeRateLimiter, lggr), nil + return NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, cfg.MinimumSubscriptionBalance, userRateLimiter, nodeRateLimiter, lggr), nil } func NewFunctionsHandler( @@ -146,22 +144,22 @@ func NewFunctionsHandler( pendingRequestsCache hc.RequestCache[PendingSecretsRequest], allowlist OnchainAllowlist, subscriptions OnchainSubscriptions, - minimumBalanceWei *big.Int, + minimumBalance *assets.Link, userRateLimiter *hc.RateLimiter, nodeRateLimiter *hc.RateLimiter, lggr logger.Logger) handlers.Handler { return &functionsHandler{ - handlerConfig: cfg, - donConfig: donConfig, - don: don, - pendingRequests: pendingRequestsCache, - allowlist: allowlist, - subscriptions: subscriptions, - minimumBalanceWei: minimumBalanceWei, - userRateLimiter: userRateLimiter, - nodeRateLimiter: nodeRateLimiter, - chStop: make(utils.StopChan), - lggr: lggr, + handlerConfig: cfg, + donConfig: donConfig, + don: don, + pendingRequests: pendingRequestsCache, + allowlist: allowlist, + subscriptions: subscriptions, + minimumBalance: minimumBalance, + userRateLimiter: userRateLimiter, + nodeRateLimiter: nodeRateLimiter, + chStop: make(utils.StopChan), + lggr: lggr, } } @@ -177,10 +175,10 @@ func (h *functionsHandler) HandleUserMessage(ctx context.Context, msg *api.Messa promHandlerError.WithLabelValues(h.donConfig.DonId, ErrRateLimited.Error()).Inc() return ErrRateLimited } - if h.subscriptions != nil { - if balance, err := h.subscriptions.GetMaxUserBalance(sender); err != nil || balance.Cmp(h.minimumBalanceWei) < 0 { + if h.subscriptions != nil && h.minimumBalance != nil { + if balance, err := h.subscriptions.GetMaxUserBalance(sender); err != nil || balance.Cmp(h.minimumBalance.ToInt()) < 0 { h.lggr.Debug("received a message from a user having insufficient balance", "sender", msg.Body.Sender, "balance", balance.String()) - return errors.New("sender has insufficient balance") + return fmt.Errorf("sender has insufficient balance: %v juels", balance.String()) } } switch msg.Body.Method { diff --git a/core/services/gateway/handlers/functions/handler.functions_test.go b/core/services/gateway/handlers/functions/handler.functions_test.go index bc309c4de59..1446bc84571 100644 --- a/core/services/gateway/handlers/functions/handler.functions_test.go +++ b/core/services/gateway/handlers/functions/handler.functions_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink/v2/core/assets" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" @@ -41,13 +42,13 @@ func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTi don := handlers_mocks.NewDON(t) allowlist := functions_mocks.NewOnchainAllowlist(t) subscriptions := functions_mocks.NewOnchainSubscriptions(t) - minBalanceWei := big.NewInt(100) + minBalance := assets.NewLinkFromJuels(100) userRateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) nodeRateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) pendingRequestsCache := hc.NewRequestCache[functions.PendingSecretsRequest](requestTimeout, 1000) - handler := functions.NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, minBalanceWei, userRateLimiter, nodeRateLimiter, logger.TestLogger(t)) + handler := functions.NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, minBalance, userRateLimiter, nodeRateLimiter, logger.TestLogger(t)) return handler, don, allowlist, subscriptions } diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go index 2fcc78578c5..181a98009f1 100644 --- a/core/services/gateway/handlers/functions/subscriptions.go +++ b/core/services/gateway/handlers/functions/subscriptions.go @@ -33,7 +33,7 @@ type OnchainSubscriptionsConfig struct { type OnchainSubscriptions interface { job.ServiceCtx - // GetMaxUserBalance returns a maximum subscription balance, or error if user has no subscriptions. + // GetMaxUserBalance returns a maximum subscription balance (juels), or error if user has no subscriptions. GetMaxUserBalance(common.Address) (*big.Int, error) } diff --git a/core/services/ocr2/plugins/functions/config/config.go b/core/services/ocr2/plugins/functions/config/config.go index f79e3f6526d..3f35d1dba9b 100644 --- a/core/services/ocr2/plugins/functions/config/config.go +++ b/core/services/ocr2/plugins/functions/config/config.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/smartcontractkit/chainlink/v2/core/assets" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" @@ -35,7 +36,7 @@ type PluginConfig struct { MaxRequestSizeBytes uint32 `json:"maxRequestSizeBytes"` MaxRequestSizesList []uint32 `json:"maxRequestSizesList"` MaxSecretsSizesList []uint32 `json:"maxSecretsSizesList"` - MinimumSubscriptionBalanceLink float64 `json:"minimumSubscriptionBalanceLink"` + MinimumSubscriptionBalance assets.Link `json:"minimumSubscriptionBalance"` GatewayConnectorConfig *connector.ConnectorConfig `json:"gatewayConnectorConfig"` OnchainAllowlist *functions.OnchainAllowlistConfig `json:"onchainAllowlist"` OnchainSubscriptions *functions.OnchainSubscriptionsConfig `json:"onchainSubscriptions"` diff --git a/core/services/ocr2/plugins/functions/plugin.go b/core/services/ocr2/plugins/functions/plugin.go index 9824a13dd98..b4040748e5d 100644 --- a/core/services/ocr2/plugins/functions/plugin.go +++ b/core/services/ocr2/plugins/functions/plugin.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/libocr/commontypes" libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" + "github.com/smartcontractkit/chainlink/v2/core/assets" "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/chains/evm" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -144,7 +145,7 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs return nil, errors.Wrap(err, "failed to create a OnchainSubscriptions") } connectorLogger := conf.Logger.Named("GatewayConnector").With("jobName", conf.Job.PipelineSpec.JobName) - connector, err2 := NewConnector(pluginConfig.GatewayConnectorConfig, conf.EthKeystore, conf.Chain.ID(), s4Storage, allowlist, rateLimiter, subscriptions, pluginConfig.MinimumSubscriptionBalanceLink, connectorLogger) + connector, err2 := NewConnector(pluginConfig.GatewayConnectorConfig, conf.EthKeystore, conf.Chain.ID(), s4Storage, allowlist, rateLimiter, subscriptions, pluginConfig.MinimumSubscriptionBalance, connectorLogger) if err2 != nil { return nil, errors.Wrap(err, "failed to create a GatewayConnector") } @@ -171,7 +172,7 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs return allServices, nil } -func NewConnector(gwcCfg *connector.ConnectorConfig, ethKeystore keystore.Eth, chainID *big.Int, s4Storage s4.Storage, allowlist gwFunctions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions gwFunctions.OnchainSubscriptions, minimumBalance float64, lggr logger.Logger) (connector.GatewayConnector, error) { +func NewConnector(gwcCfg *connector.ConnectorConfig, ethKeystore keystore.Eth, chainID *big.Int, s4Storage s4.Storage, allowlist gwFunctions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions gwFunctions.OnchainSubscriptions, minimumBalance assets.Link, lggr logger.Logger) (connector.GatewayConnector, error) { enabledKeys, err := ethKeystore.EnabledKeysForChain(chainID) if err != nil { return nil, err diff --git a/core/services/ocr2/plugins/functions/plugin_test.go b/core/services/ocr2/plugins/functions/plugin_test.go index 2c38edfb837..2e35672e920 100644 --- a/core/services/ocr2/plugins/functions/plugin_test.go +++ b/core/services/ocr2/plugins/functions/plugin_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink/v2/core/assets" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector" hc "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" @@ -35,7 +36,7 @@ func TestNewConnector_Success(t *testing.T) { rateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) ethKeystore.On("EnabledKeysForChain", mock.Anything).Return([]ethkey.KeyV2{keyV2}, nil) - _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, subscriptions, 0.0, logger.TestLogger(t)) + _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, subscriptions, *assets.NewLinkFromJuels(0), logger.TestLogger(t)) require.NoError(t, err) } @@ -58,6 +59,6 @@ func TestNewConnector_NoKeyForConfiguredAddress(t *testing.T) { rateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) ethKeystore.On("EnabledKeysForChain", mock.Anything).Return([]ethkey.KeyV2{{Address: common.HexToAddress(addresses[1])}}, nil) - _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, subscriptions, 0.0, logger.TestLogger(t)) + _, err = functions.NewConnector(gwcCfg, ethKeystore, chainID, s4Storage, allowlist, rateLimiter, subscriptions, *assets.NewLinkFromJuels(0), logger.TestLogger(t)) require.Error(t, err) } From 13984f98770fba5ac1a5994d35714893c337fc75 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Mon, 18 Sep 2023 08:35:00 +0300 Subject: [PATCH 11/12] Fixed template and naming --- core/scripts/functions/templates/oracle.toml | 2 +- core/services/functions/connector_handler.go | 40 ++++++++++---------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/core/scripts/functions/templates/oracle.toml b/core/scripts/functions/templates/oracle.toml index 22614db1cfc..58f20afdd9f 100644 --- a/core/scripts/functions/templates/oracle.toml +++ b/core/scripts/functions/templates/oracle.toml @@ -35,7 +35,7 @@ requestTimeoutCheckFrequencySec = 10 requestTimeoutSec = 300 maxRequestSizesList = [30_720, 51_200, 102_400, 204_800, 512_000, 1_048_576, 2_097_152, 3_145_728, 5_242_880, 10_485_760] maxSecretsSizesList = [10_240, 20_480, 51_200, 102_400, 307_200, 512_000, 1_048_576, 2_097_152] -minimumSubscriptionBalanceLink = 0.1 +minimumSubscriptionBalance = "0.1 link" [pluginConfig.OnchainAllowlist] diff --git a/core/services/functions/connector_handler.go b/core/services/functions/connector_handler.go index 3f56b7658ca..3147d44de35 100644 --- a/core/services/functions/connector_handler.go +++ b/core/services/functions/connector_handler.go @@ -22,15 +22,15 @@ import ( type functionsConnectorHandler struct { utils.StartStopOnce - connector connector.GatewayConnector - signerKey *ecdsa.PrivateKey - nodeAddress string - storage s4.Storage - allowlist functions.OnchainAllowlist - rateLimiter *hc.RateLimiter - subscriptions functions.OnchainSubscriptions - minBalance assets.Link - lggr logger.Logger + connector connector.GatewayConnector + signerKey *ecdsa.PrivateKey + nodeAddress string + storage s4.Storage + allowlist functions.OnchainAllowlist + rateLimiter *hc.RateLimiter + subscriptions functions.OnchainSubscriptions + minimumSubscriptionBalance assets.Link + lggr logger.Logger } var ( @@ -38,19 +38,19 @@ var ( _ connector.GatewayConnectorHandler = &functionsConnectorHandler{} ) -func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, minBalance assets.Link, lggr logger.Logger) (*functionsConnectorHandler, error) { +func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, minimumSubscriptionBalance assets.Link, lggr logger.Logger) (*functionsConnectorHandler, error) { if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil || subscriptions == nil { return nil, fmt.Errorf("signerKey, storage, allowlist, rateLimiter and subscriptions must be non-nil") } return &functionsConnectorHandler{ - nodeAddress: nodeAddress, - signerKey: signerKey, - storage: storage, - allowlist: allowlist, - rateLimiter: rateLimiter, - subscriptions: subscriptions, - minBalance: minBalance, - lggr: lggr.Named("FunctionsConnectorHandler"), + nodeAddress: nodeAddress, + signerKey: signerKey, + storage: storage, + allowlist: allowlist, + rateLimiter: rateLimiter, + subscriptions: subscriptions, + minimumSubscriptionBalance: minimumSubscriptionBalance, + lggr: lggr.Named("FunctionsConnectorHandler"), }, nil } @@ -73,8 +73,8 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga h.lggr.Errorw("request rate-limited", "id", gatewayId, "address", fromAddr) return } - if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minBalance.ToInt()) < 0 { - h.lggr.Errorw("user subscription has insufficient balance", "id", gatewayId, "address", fromAddr, "balance", balance, "minBalance", h.minBalance) + if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minimumSubscriptionBalance.ToInt()) < 0 { + h.lggr.Errorw("user subscription has insufficient balance", "id", gatewayId, "address", fromAddr, "balance", balance, "minBalance", h.minimumSubscriptionBalance) return } From a235b3927fc69b615a16a0efb88d4ccc86819fd8 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Mon, 18 Sep 2023 08:36:42 +0300 Subject: [PATCH 12/12] Reverted renaming --- core/services/functions/connector_handler.go | 40 ++++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/core/services/functions/connector_handler.go b/core/services/functions/connector_handler.go index 3147d44de35..8083dfc2a39 100644 --- a/core/services/functions/connector_handler.go +++ b/core/services/functions/connector_handler.go @@ -22,15 +22,15 @@ import ( type functionsConnectorHandler struct { utils.StartStopOnce - connector connector.GatewayConnector - signerKey *ecdsa.PrivateKey - nodeAddress string - storage s4.Storage - allowlist functions.OnchainAllowlist - rateLimiter *hc.RateLimiter - subscriptions functions.OnchainSubscriptions - minimumSubscriptionBalance assets.Link - lggr logger.Logger + connector connector.GatewayConnector + signerKey *ecdsa.PrivateKey + nodeAddress string + storage s4.Storage + allowlist functions.OnchainAllowlist + rateLimiter *hc.RateLimiter + subscriptions functions.OnchainSubscriptions + minimumBalance assets.Link + lggr logger.Logger } var ( @@ -38,19 +38,19 @@ var ( _ connector.GatewayConnectorHandler = &functionsConnectorHandler{} ) -func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, minimumSubscriptionBalance assets.Link, lggr logger.Logger) (*functionsConnectorHandler, error) { +func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, rateLimiter *hc.RateLimiter, subscriptions functions.OnchainSubscriptions, minimumBalance assets.Link, lggr logger.Logger) (*functionsConnectorHandler, error) { if signerKey == nil || storage == nil || allowlist == nil || rateLimiter == nil || subscriptions == nil { return nil, fmt.Errorf("signerKey, storage, allowlist, rateLimiter and subscriptions must be non-nil") } return &functionsConnectorHandler{ - nodeAddress: nodeAddress, - signerKey: signerKey, - storage: storage, - allowlist: allowlist, - rateLimiter: rateLimiter, - subscriptions: subscriptions, - minimumSubscriptionBalance: minimumSubscriptionBalance, - lggr: lggr.Named("FunctionsConnectorHandler"), + nodeAddress: nodeAddress, + signerKey: signerKey, + storage: storage, + allowlist: allowlist, + rateLimiter: rateLimiter, + subscriptions: subscriptions, + minimumBalance: minimumBalance, + lggr: lggr.Named("FunctionsConnectorHandler"), }, nil } @@ -73,8 +73,8 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga h.lggr.Errorw("request rate-limited", "id", gatewayId, "address", fromAddr) return } - if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minimumSubscriptionBalance.ToInt()) < 0 { - h.lggr.Errorw("user subscription has insufficient balance", "id", gatewayId, "address", fromAddr, "balance", balance, "minBalance", h.minimumSubscriptionBalance) + if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minimumBalance.ToInt()) < 0 { + h.lggr.Errorw("user subscription has insufficient balance", "id", gatewayId, "address", fromAddr, "balance", balance, "minBalance", h.minimumBalance) return }