Skip to content

Commit

Permalink
[FUN-877] Persist subscriptions fetched from contracts (#11573)
Browse files Browse the repository at this point in the history
* feat: db schema to store data fetched from contracts

* feat: build orm layer for subscriptions

* feat: implement subscription cache layer

* feat: load cached subscriptions concurrently

* fix: lint issues

* fix: protect NewORM from invalid parameters, NoopORM for gateway script

* fix: address race condition on update subscription

* chore: make db, cfg and lggr part of the handlerfactory

* chore: removing allowlist migration from this pr

* fix: update cache on new subscriptions, modify balance to be a text at the db layer

* feat: store the router_contract_address

* fix: set router contract address and subscription id as composite primary key

* chore: make router contract address as part of orm properties

* feat: update to db only in case of difference with current state

* feat: filter by router_contract_address when GetSubscriptions

* chore: have balance fields as bigint, add tests covering deleted subscriptions

* fix: GetSubscriptions ASC

* chore: improve redability
  • Loading branch information
agparadiso authored Jan 4, 2024
1 parent e3fe671 commit 529d2cf
Show file tree
Hide file tree
Showing 17 changed files with 762 additions and 44 deletions.
2 changes: 1 addition & 1 deletion core/scripts/gateway/run_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {

lggr, _ := logger.NewLogger()

handlerFactory := gateway.NewHandlerFactory(nil, lggr)
handlerFactory := gateway.NewHandlerFactory(nil, nil, nil, lggr)
gw, err := gateway.NewGatewayFromConfig(&cfg, handlerFactory, lggr)
if err != nil {
fmt.Println("error creating Gateway object:", err)
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
job.Gateway: gateway.NewDelegate(
legacyEVMChains,
keyStore.Eth(),
db,
cfg.Database(),
globalLogger),
}
webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner()
Expand Down
15 changes: 12 additions & 3 deletions core/services/gateway/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"

"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pelletier/go-toml"
"github.com/pkg/errors"

Expand All @@ -18,13 +19,21 @@ import (
type Delegate struct {
legacyChains legacyevm.LegacyChainContainer
ks keystore.Eth
db *sqlx.DB
cfg pg.QConfig
lggr logger.Logger
}

var _ job.Delegate = (*Delegate)(nil)

func NewDelegate(legacyChains legacyevm.LegacyChainContainer, ks keystore.Eth, lggr logger.Logger) *Delegate {
return &Delegate{legacyChains: legacyChains, ks: ks, lggr: lggr}
func NewDelegate(legacyChains legacyevm.LegacyChainContainer, ks keystore.Eth, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) *Delegate {
return &Delegate{
legacyChains: legacyChains,
ks: ks,
db: db,
cfg: cfg,
lggr: lggr,
}
}

func (d *Delegate) JobType() job.Type {
Expand All @@ -47,7 +56,7 @@ func (d *Delegate) ServicesForSpec(spec job.Job) (services []job.ServiceCtx, err
if err2 != nil {
return nil, errors.Wrap(err2, "unmarshal gateway config")
}
handlerFactory := NewHandlerFactory(d.legacyChains, d.lggr)
handlerFactory := NewHandlerFactory(d.legacyChains, d.db, d.cfg, d.lggr)
gateway, err := NewGatewayFromConfig(&gatewayConfig, handlerFactory, d.lggr)
if err != nil {
return nil, err
Expand Down
12 changes: 6 additions & 6 deletions core/services/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Address = "0x0001020304050607080900010203040506070809"
`)

lggr := logger.TestLogger(t)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.NoError(t, err)
}

Expand All @@ -75,7 +75,7 @@ HandlerName = "dummy"
`)

lggr := logger.TestLogger(t)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.Error(t, err)
}

Expand All @@ -89,7 +89,7 @@ HandlerName = "no_such_handler"
`)

lggr := logger.TestLogger(t)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.Error(t, err)
}

Expand All @@ -103,7 +103,7 @@ SomeOtherField = "abcd"
`)

lggr := logger.TestLogger(t)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.Error(t, err)
}

Expand All @@ -121,15 +121,15 @@ Address = "0xnot_an_address"
`)

lggr := logger.TestLogger(t)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.Error(t, err)
}

func TestGateway_CleanStartAndClose(t *testing.T) {
t.Parallel()

lggr := logger.TestLogger(t)
gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, lggr), lggr)
gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.NoError(t, err)
servicetest.Run(t, gateway)
}
Expand Down
11 changes: 8 additions & 3 deletions core/services/gateway/handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"encoding/json"
"fmt"

"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/config"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

const (
Expand All @@ -18,19 +21,21 @@ const (

type handlerFactory struct {
legacyChains legacyevm.LegacyChainContainer
db *sqlx.DB
cfg pg.QConfig
lggr logger.Logger
}

var _ HandlerFactory = (*handlerFactory)(nil)

func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger) HandlerFactory {
return &handlerFactory{legacyChains, lggr}
func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) HandlerFactory {
return &handlerFactory{legacyChains, db, cfg, lggr}
}

func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON) (handlers.Handler, error) {
switch handlerType {
case FunctionsHandlerType:
return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.lggr)
return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.db, hf.cfg, hf.lggr)
case DummyHandlerType:
return handlers.NewDummyHandler(donConfig, don, hf.lggr)
default:
Expand Down
12 changes: 10 additions & 2 deletions core/services/gateway/handlers/functions/handler.functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/multierr"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/config"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers"
hc "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

var (
Expand Down Expand Up @@ -96,7 +98,7 @@ type PendingRequest struct {

var _ handlers.Handler = (*functionsHandler)(nil)

func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger) (handlers.Handler, error) {
func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, legacyChains legacyevm.LegacyChainContainer, db *sqlx.DB, qcfg pg.QConfig, lggr logger.Logger) (handlers.Handler, error) {
var cfg FunctionsHandlerConfig
err := json.Unmarshal(handlerConfig, &cfg)
if err != nil {
Expand Down Expand Up @@ -133,7 +135,13 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con
if err2 != nil {
return nil, err2
}
subscriptions, err2 = NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, lggr)

orm, err2 := NewORM(db, lggr, qcfg, cfg.OnchainSubscriptions.ContractAddress)
if err2 != nil {
return nil, err2
}

subscriptions, err2 = NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, orm, lggr)
if err2 != nil {
return nil, err2
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func sendNodeReponses(t *testing.T, handler handlers.Handler, userRequestMsg api
func TestFunctionsHandler_Minimal(t *testing.T) {
t.Parallel()

handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, logger.TestLogger(t))
handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, nil, nil, logger.TestLogger(t))
require.NoError(t, err)

// empty message should always error out
Expand All @@ -95,7 +95,7 @@ func TestFunctionsHandler_Minimal(t *testing.T) {
func TestFunctionsHandler_CleanStartAndClose(t *testing.T) {
t.Parallel()

handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, logger.TestLogger(t))
handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, nil, nil, logger.TestLogger(t))
require.NoError(t, err)

servicetest.Run(t, handler)
Expand Down
91 changes: 91 additions & 0 deletions core/services/gateway/handlers/functions/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 529d2cf

Please sign in to comment.