Skip to content

Commit

Permalink
hacking at unique DB for evm relayer
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann committed Sep 21, 2023
1 parent b378676 commit 61608c0
Show file tree
Hide file tree
Showing 14 changed files with 265 additions and 140 deletions.
22 changes: 21 additions & 1 deletion core/chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ type AppConfig interface {

type ChainRelayExtenderConfig struct {
Logger logger.Logger
DB *sqlx.DB
KeyStore keystore.Eth
*RelayerConfig
}
Expand All @@ -158,6 +157,8 @@ type RelayerConfig struct {
MailMon *utils.MailboxMonitor
GasEstimator gas.EvmFeeEstimator

DB *sqlx.DB

// TODO BCF-2513 remove test code from the API
// Gen-functions are useful for dependency injection by tests
GenEthClient func(*big.Int) client.Client
Expand All @@ -168,6 +169,24 @@ type RelayerConfig struct {
GenGasEstimator func(*big.Int) gas.EvmFeeEstimator
}

func (r RelayerConfig) validate() error {
var err error
if r.AppConfig == nil {
err = errors.Join(err, fmt.Errorf("nil AppConfig"))
}
if r.EventBroadcaster == nil {
err = errors.Join(err, fmt.Errorf("nil EventBroadcaster"))
}

if r.MailMon == nil {
err = errors.Join(err, fmt.Errorf("nil MailMon"))
}

if r.DB == nil {
err = errors.Join(err, fmt.Errorf("nil DB"))
}
return err
}
func NewTOMLChain(ctx context.Context, chain *toml.EVMConfig, opts ChainRelayExtenderConfig) (Chain, error) {
chainID := chain.ChainID
l := opts.Logger.With("evmChainID", chainID.String())
Expand All @@ -180,6 +199,7 @@ func NewTOMLChain(ctx context.Context, chain *toml.EVMConfig, opts ChainRelayExt
}

func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Node, opts ChainRelayExtenderConfig) (*chain, error) {

chainID, chainType := cfg.EVM().ChainID(), cfg.EVM().ChainType()
l := opts.Logger
var client evmclient.Client
Expand Down
7 changes: 5 additions & 2 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,18 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
// create the relayer-chain interoperators from application configuration
relayerFactory := chainlink.RelayerFactory{
Logger: appLggr,
DB: db,
QConfig: cfg.Database(),
LoopRegistry: loopRegistry,
GRPCOpts: grpcOpts,
}

evmScopedDB, err := pg.OpenUnlockedDB(cfg.AppID(), cfg.Database(), pg.WithSchema("evm")) //pg.NewLockedDB(cfg.AppID(), cfg.Database(), cfg.Database().Lock(), lggr)
if err != nil {
return nil, err
}
evmFactoryCfg := chainlink.EVMFactoryConfig{
CSAETHKeystore: keyStore,
RelayerConfig: &evm.RelayerConfig{AppConfig: cfg, EventBroadcaster: eventBroadcaster, MailMon: mailMon},
RelayerConfig: &evm.RelayerConfig{AppConfig: cfg, EventBroadcaster: eventBroadcaster, MailMon: mailMon, DB: evmScopedDB},
}
// evm always enabled for backward compatibility
// TODO BCF-2510 this needs to change in order to clear the path for EVM extraction
Expand Down
11 changes: 6 additions & 5 deletions core/cmd/shell_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ import (

func genTestEVMRelayers(t *testing.T, opts evm.ChainRelayExtenderConfig, ks evmrelayer.CSAETHKeystore) *chainlink.CoreRelayerChainInteroperators {
f := chainlink.RelayerFactory{
Logger: opts.Logger,
DB: opts.DB,
Logger: opts.Logger,
//DB: opts.DB,
QConfig: opts.AppConfig.Database(),
LoopRegistry: plugins.NewLoopRegistry(opts.Logger),
}
Expand Down Expand Up @@ -86,13 +86,14 @@ func TestShell_RunNodeWithPasswords(t *testing.T) {
lggr := logger.TestLogger(t)

opts := evm.ChainRelayExtenderConfig{
Logger: lggr,
DB: db,
Logger: lggr,

KeyStore: keyStore.Eth(),
RelayerConfig: &evm.RelayerConfig{
AppConfig: cfg,
EventBroadcaster: pg.NewNullEventBroadcaster(),
MailMon: &utils.MailboxMonitor{},
DB: pgtest.NewEVMScopedDB(t),
},
}
testRelayers := genTestEVMRelayers(t, opts, keyStore)
Expand Down Expand Up @@ -191,13 +192,13 @@ func TestShell_RunNodeWithAPICredentialsFile(t *testing.T) {
lggr := logger.TestLogger(t)
opts := evm.ChainRelayExtenderConfig{
Logger: lggr,
DB: db,
KeyStore: keyStore.Eth(),
RelayerConfig: &evm.RelayerConfig{
AppConfig: cfg,
EventBroadcaster: pg.NewNullEventBroadcaster(),

MailMon: &utils.MailboxMonitor{},
DB: pgtest.NewEVMScopedDB(t),
},
}
testRelayers := genTestEVMRelayers(t, opts, keyStore)
Expand Down
8 changes: 4 additions & 4 deletions core/cmd/shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ func TestSetupSolanaRelayer(t *testing.T) {
})

rf := chainlink.RelayerFactory{
Logger: lggr,
DB: pgtest.NewSqlxDB(t),
Logger: lggr,
// DB: pgtest.NewSqlxDB(t),
QConfig: tConfig.Database(),
LoopRegistry: reg,
}
Expand Down Expand Up @@ -456,8 +456,8 @@ func TestSetupStarkNetRelayer(t *testing.T) {
}
})
rf := chainlink.RelayerFactory{
Logger: lggr,
DB: pgtest.NewSqlxDB(t),
Logger: lggr,
//DB: pgtest.NewSqlxDB(t),
QConfig: tConfig.Database(),
LoopRegistry: reg,
}
Expand Down
6 changes: 4 additions & 2 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/keystest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/logger/audit"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
Expand Down Expand Up @@ -385,8 +386,8 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
loopRegistry := plugins.NewLoopRegistry(lggr)

relayerFactory := chainlink.RelayerFactory{
Logger: lggr,
DB: db,
Logger: lggr,
// DB: db,
QConfig: cfg.Database(),
LoopRegistry: loopRegistry,
GRPCOpts: loop.GRPCOpts{},
Expand All @@ -397,6 +398,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
AppConfig: cfg,
EventBroadcaster: eventBroadcaster,
MailMon: mailMon,
DB: pgtest.NewEVMScopedDB(t),
},
CSAETHKeystore: keyStore,
}
Expand Down
2 changes: 1 addition & 1 deletion core/internal/testutils/evmtest/evmtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ func NewChainRelayExtOpts(t testing.TB, testopts TestChainOpts) evm.ChainRelayEx
require.NotNil(t, testopts.KeyStore)
opts := evm.ChainRelayExtenderConfig{
Logger: logger.TestLogger(t),
DB: testopts.DB,
KeyStore: testopts.KeyStore,
RelayerConfig: &evm.RelayerConfig{
AppConfig: testopts.GeneralConfig,
EventBroadcaster: pg.NewNullEventBroadcaster(),
MailMon: testopts.MailMon,
GasEstimator: testopts.GasEstimator,
DB: testopts.DB,
},
}
opts.GenEthClient = func(*big.Int) evmclient.Client {
Expand Down
33 changes: 31 additions & 2 deletions core/internal/testutils/pgtest/pgtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package pgtest

import (
"database/sql"
"encoding/json"
"fmt"
"testing"

"github.com/google/uuid"
Expand Down Expand Up @@ -29,9 +31,28 @@ func NewSqlDB(t *testing.T) *sql.DB {
return db
}

func NewSqlxDB(t testing.TB) *sqlx.DB {
func NewEVMScopedDB(t testing.TB) *sqlx.DB {
// hack to scope to evm schema
url := withSchema(defaultDBURL, "evm")
return NewSqlxDB(t, WithURL(url))
}

func withSchema(conn, schema string) string {
return fmt.Sprintf("%s&options=-csearch_path=%s", conn, schema)
}

func NewSqlxDB(t testing.TB, opts ...ConnectionOpt) *sqlx.DB {
testutils.SkipShortDB(t)
db, err := sqlx.Open(string(dialects.TransactionWrappedPostgres), uuid.New().String())
conn := &pg.ConnectionScope{
UUID: uuid.New().String(),
URL: defaultDBURL,
}
for _, opt := range opts {
opt(conn)
}
enc, err := json.Marshal(conn)
require.NoError(t, err)
db, err := sqlx.Open(string(dialects.TransactionWrappedPostgres), string(enc))
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, db.Close()) })

Expand All @@ -40,6 +61,14 @@ func NewSqlxDB(t testing.TB) *sqlx.DB {
return db
}

type ConnectionOpt func(conn *pg.ConnectionScope)

func WithURL(url string) ConnectionOpt {
return func(conn *pg.ConnectionScope) {
conn.URL = url
}
}

func MustExec(t *testing.T, db *sqlx.DB, stmt string, args ...interface{}) {
require.NoError(t, utils.JustError(db.Exec(stmt, args...)))
}
Expand Down
59 changes: 38 additions & 21 deletions core/internal/testutils/pgtest/txdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"flag"
"fmt"
"io"
Expand All @@ -16,6 +17,7 @@ import (
"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/store/dialects"
)

Expand All @@ -35,6 +37,7 @@ import (
// store to use the raw DialectPostgres dialect and setup a one-use database.
// See heavyweight.FullTestDB() as a convenience function to help you do this,
// but please use sparingly because as it's name implies, it is expensive.
var defaultDBURL string // global used to create a [NewSqlxDB]
func init() {
testing.Init()
if !flag.Parsed() {
Expand All @@ -44,12 +47,12 @@ func init() {
// -short tests don't need a DB
return
}
dbURL := string(env.DatabaseURL.Get())
if dbURL == "" {
defaultDBURL = string(env.DatabaseURL.Get())
if defaultDBURL == "" {
panic("you must provide a CL_DATABASE_URL environment variable")
}

parsed, err := url.Parse(dbURL)
parsed, err := url.Parse(defaultDBURL)
if err != nil {
panic(err)
}
Expand All @@ -63,8 +66,8 @@ func init() {
}
name := string(dialects.TransactionWrappedPostgres)
sql.Register(name, &txDriver{
dbURL: dbURL,
conns: make(map[string]*conn),
dbURL: defaultDBURL,
conns: make(map[string]map[string]*conn),
})
sqlx.BindDriver(name, sqlx.DOLLAR)
}
Expand All @@ -75,34 +78,47 @@ var _ driver.Conn = &conn{}
// when the Close is called, transaction is rolled back
type txDriver struct {
sync.Mutex
db *sql.DB
conns map[string]*conn
db map[string]*sql.DB // url -> db
conns map[string]map[string]*conn // url -> (uuid -> db) so we can close per url

dbURL string
}

func (d *txDriver) Open(dsn string) (driver.Conn, error) {
// jsonConnectionScope must be a json-encoded [ConnectionScope]. The Open interface requires
// a string
func (d *txDriver) Open(jsonConnectionScope string) (driver.Conn, error) {
d.Lock()
defer d.Unlock()
// Open real db connection if its the first call
var scope pg.ConnectionScope
err := json.Unmarshal([]byte(jsonConnectionScope), &scope)
if err != nil {
return nil, fmt.Errorf("pgtest tx driver failed to parse connection string %s: must be json encoded scope: %w", jsonConnectionScope, err)
}
// initialize dbs if the first call
if d.db == nil {
db, err := sql.Open("pgx", d.dbURL)
d.db = make(map[string]*sql.DB)
}
db, exists := d.db[scope.URL]
if !exists {
db, err = sql.Open("pgx", scope.URL)
if err != nil {
return nil, err
}
d.db = db
d.db[scope.URL] = db
d.conns[scope.URL] = make(map[string]*conn)
}
c, exists := d.conns[dsn]

c, exists := d.conns[scope.URL][scope.UUID]
if !exists || !c.tryOpen() {
tx, err := d.db.Begin()
tx, err := db.Begin()
if err != nil {
return nil, err
}
c = &conn{tx: tx, opened: 1}
c = &conn{tx: tx, opened: 1, scope: scope}
c.removeSelf = func() error {
return d.deleteConn(c)
}
d.conns[dsn] = c
d.conns[scope.URL][scope.UUID] = c
}
return c, nil
}
Expand All @@ -114,22 +130,23 @@ func (d *txDriver) deleteConn(c *conn) error {
d.Lock()
defer d.Unlock()

if d.conns[c.dsn] != c {
if d.conns[c.scope.URL][c.scope.UUID] != c {
return nil // already been replaced
}
delete(d.conns, c.dsn)
if len(d.conns) == 0 && d.db != nil {
if err := d.db.Close(); err != nil {
delete(d.conns[c.scope.URL], c.scope.UUID)
if len(d.conns[c.scope.URL]) == 0 && d.db[c.scope.URL] != nil {
if err := d.db[c.scope.URL].Close(); err != nil {
return err
}
d.db = nil
delete(d.db, c.scope.URL)
}
return nil
}

type conn struct {
sync.Mutex
dsn string
//dsn string
scope pg.ConnectionScope
tx *sql.Tx // tx may be shared by many conns, definitive one lives in the map keyed by DSN on the txDriver. Do not modify from conn
closed bool
opened int
Expand Down
Loading

0 comments on commit 61608c0

Please sign in to comment.