From 9964dc82e591f8653adb06f0b149a16e0b6cea40 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Mon, 15 Apr 2024 06:29:43 -0500 Subject: [PATCH] core/services/ocr2/plugins/ocr2keeper/evmregister/v21/upkeepstate: use sqlutil instead of pg.QOpts (#12806) --- .changeset/poor-socks-travel.md | 5 ++ .../evmregistry/v21/upkeepstate/orm.go | 29 ++++------ .../evmregistry/v21/upkeepstate/orm_test.go | 14 ++--- .../evmregistry/v21/upkeepstate/store.go | 16 +++--- .../evmregistry/v21/upkeepstate/store_test.go | 55 ++++++++----------- core/services/relay/evm/ocr2keeper.go | 10 ++-- 6 files changed, 60 insertions(+), 69 deletions(-) create mode 100644 .changeset/poor-socks-travel.md diff --git a/.changeset/poor-socks-travel.md b/.changeset/poor-socks-travel.md new file mode 100644 index 00000000000..88986845095 --- /dev/null +++ b/.changeset/poor-socks-travel.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +core/services/ocr2/plugins/ocr2keeper/evmregister/v21/upkeepstate: use sqlutil instead of pg.QOpts #internal diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/orm.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/orm.go index a5bd738de4c..3d8a30f81cd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/orm.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/orm.go @@ -1,20 +1,19 @@ package upkeepstate import ( + "context" "math/big" "time" - "github.com/jmoiron/sqlx" "github.com/lib/pq" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) type orm struct { chainID *ubig.Big - q pg.Q + ds sqlutil.DataSource } type persistedStateRecord struct { @@ -27,17 +26,15 @@ type persistedStateRecord struct { } // NewORM creates an ORM scoped to chainID. -func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *orm { +func NewORM(chainID *big.Int, ds sqlutil.DataSource) *orm { return &orm{ chainID: ubig.New(chainID), - q: pg.NewQ(db, lggr.Named("ORM"), cfg), + ds: ds, } } // BatchInsertRecords is idempotent and sets upkeep state values in db -func (o *orm) BatchInsertRecords(state []persistedStateRecord, qopts ...pg.QOpt) error { - q := o.q.WithOpts(qopts...) - +func (o *orm) BatchInsertRecords(ctx context.Context, state []persistedStateRecord) error { if len(state) == 0 { return nil } @@ -65,17 +62,16 @@ func (o *orm) BatchInsertRecords(state []persistedStateRecord, qopts ...pg.QOpt) }) } - return q.ExecQNamed(`INSERT INTO evm.upkeep_states + _, err := o.ds.NamedExecContext(ctx, `INSERT INTO evm.upkeep_states (evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) VALUES (:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason) ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows) + return err } // SelectStatesByWorkIDs searches the data store for stored states for the // provided work ids and configured chain id -func (o *orm) SelectStatesByWorkIDs(workIDs []string, qopts ...pg.QOpt) (states []persistedStateRecord, err error) { - q := o.q.WithOpts(qopts...) - - err = q.Select(&states, `SELECT upkeep_id, work_id, completion_state, block_number, ineligibility_reason, inserted_at +func (o *orm) SelectStatesByWorkIDs(ctx context.Context, workIDs []string) (states []persistedStateRecord, err error) { + err = o.ds.SelectContext(ctx, &states, `SELECT upkeep_id, work_id, completion_state, block_number, ineligibility_reason, inserted_at FROM evm.upkeep_states WHERE work_id = ANY($1) AND evm_chain_id = $2::NUMERIC`, pq.Array(workIDs), o.chainID) @@ -87,9 +83,8 @@ func (o *orm) SelectStatesByWorkIDs(workIDs []string, qopts ...pg.QOpt) (states } // DeleteExpired prunes stored states older than to the provided time -func (o *orm) DeleteExpired(expired time.Time, qopts ...pg.QOpt) error { - q := o.q.WithOpts(qopts...) - _, err := q.Exec(`DELETE FROM evm.upkeep_states WHERE inserted_at <= $1 AND evm_chain_id::NUMERIC = $2`, expired, o.chainID) +func (o *orm) DeleteExpired(ctx context.Context, expired time.Time) error { + _, err := o.ds.ExecContext(ctx, `DELETE FROM evm.upkeep_states WHERE inserted_at <= $1 AND evm_chain_id::NUMERIC = $2`, expired, o.chainID) return err } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/orm_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/orm_test.go index bfd131b5055..894e3b0ef33 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/orm_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/orm_test.go @@ -7,19 +7,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" ) func TestInsertSelectDelete(t *testing.T) { - lggr, _ := logger.TestLoggerObserved(t, zapcore.ErrorLevel) + ctx := testutils.Context(t) chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) - orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + orm := NewORM(chainID, db) inserted := []persistedStateRecord{ { @@ -32,20 +30,20 @@ func TestInsertSelectDelete(t *testing.T) { }, } - err := orm.BatchInsertRecords(inserted) + err := orm.BatchInsertRecords(ctx, inserted) require.NoError(t, err, "no error expected from insert") - states, err := orm.SelectStatesByWorkIDs([]string{"0x1"}) + states, err := orm.SelectStatesByWorkIDs(ctx, []string{"0x1"}) require.NoError(t, err, "no error expected from select") require.Len(t, states, 1, "records return should equal records inserted") - err = orm.DeleteExpired(time.Now()) + err = orm.DeleteExpired(ctx, time.Now()) assert.NoError(t, err, "no error expected from delete") - states, err = orm.SelectStatesByWorkIDs([]string{"0x1"}) + states, err = orm.SelectStatesByWorkIDs(ctx, []string{"0x1"}) require.NoError(t, err, "no error expected from select") require.Len(t, states, 0, "records return should be empty since records were deleted") diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go index 9410374d7ca..bf7a62aad48 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -15,7 +16,6 @@ import ( ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -31,9 +31,9 @@ const ( ) type ORM interface { - BatchInsertRecords([]persistedStateRecord, ...pg.QOpt) error - SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error) - DeleteExpired(time.Time, ...pg.QOpt) error + BatchInsertRecords(context.Context, []persistedStateRecord) error + SelectStatesByWorkIDs(context.Context, []string) ([]persistedStateRecord, error) + DeleteExpired(context.Context, time.Time) error } // UpkeepStateStore is the interface for managing upkeeps final state in a local store. @@ -152,7 +152,7 @@ func (u *upkeepStateStore) flush(ctx context.Context) { u.sem <- struct{}{} go func() { - if err := u.orm.BatchInsertRecords(batch, pg.WithParentCtx(ctx)); err != nil { + if err := u.orm.BatchInsertRecords(ctx, batch); err != nil { u.lggr.Errorw("error inserting records", "err", err) } <-u.sem @@ -268,7 +268,7 @@ func (u *upkeepStateStore) fetchPerformed(ctx context.Context, workIDs ...string // fetchFromDB fetches all upkeeps indicated as ineligible from the db to // populate the cache. func (u *upkeepStateStore) fetchFromDB(ctx context.Context, workIDs ...string) error { - states, err := u.orm.SelectStatesByWorkIDs(workIDs, pg.WithParentCtx(ctx)) + states, err := u.orm.SelectStatesByWorkIDs(ctx, workIDs) if err != nil { return err } @@ -320,7 +320,9 @@ func (u *upkeepStateStore) cleanup(ctx context.Context) error { func (u *upkeepStateStore) cleanDB(ctx context.Context) error { tm := time.Now().Add(-1 * u.retention) - return u.orm.DeleteExpired(tm, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()) + ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), time.Minute) + defer cancel() + return u.orm.DeleteExpired(ctx, tm) } // cleanupCache removes any records from the cache that are older than the TTL. diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store_test.go index 3912e2a99c6..eae92aeca35 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store_test.go @@ -18,7 +18,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) func TestUpkeepStateStore(t *testing.T) { @@ -329,20 +328,16 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) - realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + realORM := NewORM(chainID, db) insertFinished := make(chan struct{}, 1) orm := &wrappedORM{ - BatchInsertRecordsFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { - err := realORM.BatchInsertRecords(records, opt...) + BatchInsertRecordsFn: func(ctx context.Context, records []persistedStateRecord) error { + err := realORM.BatchInsertRecords(ctx, records) insertFinished <- struct{}{} return err }, - SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) { - return realORM.SelectStatesByWorkIDs(strings, opt...) - }, - DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error { - return realORM.DeleteExpired(t, opt...) - }, + SelectStatesByWorkIDsFn: realORM.SelectStatesByWorkIDs, + DeleteExpiredFn: realORM.DeleteExpired, } scanner := &mockScanner{} store := NewUpkeepStateStore(orm, lggr, scanner) @@ -389,20 +384,16 @@ func TestUpkeepStateStore_emptyDB(t *testing.T) { lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) - realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + realORM := NewORM(chainID, db) insertFinished := make(chan struct{}, 1) orm := &wrappedORM{ - BatchInsertRecordsFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { - err := realORM.BatchInsertRecords(records, opt...) + BatchInsertRecordsFn: func(ctx context.Context, records []persistedStateRecord) error { + err := realORM.BatchInsertRecords(ctx, records) insertFinished <- struct{}{} return err }, - SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) { - return realORM.SelectStatesByWorkIDs(strings, opt...) - }, - DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error { - return realORM.DeleteExpired(t, opt...) - }, + SelectStatesByWorkIDsFn: realORM.SelectStatesByWorkIDs, + DeleteExpiredFn: realORM.DeleteExpired, } scanner := &mockScanner{} store := NewUpkeepStateStore(orm, lggr, scanner) @@ -427,7 +418,7 @@ func TestUpkeepStateStore_Upsert(t *testing.T) { ctx := testutils.Context(t) lggr := logger.TestLogger(t) chainID := testutils.FixtureChainID - orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + orm := NewORM(chainID, db) store := NewUpkeepStateStore(orm, lggr, &mockScanner{}) @@ -560,11 +551,11 @@ func (_m *mockORM) setErr(err error) { _m.err = err } -func (_m *mockORM) BatchInsertRecords(state []persistedStateRecord, opts ...pg.QOpt) error { +func (_m *mockORM) BatchInsertRecords(ctx context.Context, state []persistedStateRecord) error { return nil } -func (_m *mockORM) SelectStatesByWorkIDs(workIDs []string, opts ...pg.QOpt) ([]persistedStateRecord, error) { +func (_m *mockORM) SelectStatesByWorkIDs(ctx context.Context, workIDs []string) ([]persistedStateRecord, error) { _m.lock.Lock() defer _m.lock.Unlock() @@ -574,7 +565,7 @@ func (_m *mockORM) SelectStatesByWorkIDs(workIDs []string, opts ...pg.QOpt) ([]p return res, _m.err } -func (_m *mockORM) DeleteExpired(tm time.Time, opts ...pg.QOpt) error { +func (_m *mockORM) DeleteExpired(ctx context.Context, tm time.Time) error { _m.lock.Lock() defer _m.lock.Unlock() @@ -585,19 +576,19 @@ func (_m *mockORM) DeleteExpired(tm time.Time, opts ...pg.QOpt) error { } type wrappedORM struct { - BatchInsertRecordsFn func([]persistedStateRecord, ...pg.QOpt) error - SelectStatesByWorkIDsFn func([]string, ...pg.QOpt) ([]persistedStateRecord, error) - DeleteExpiredFn func(time.Time, ...pg.QOpt) error + BatchInsertRecordsFn func(context.Context, []persistedStateRecord) error + SelectStatesByWorkIDsFn func(context.Context, []string) ([]persistedStateRecord, error) + DeleteExpiredFn func(context.Context, time.Time) error } -func (o *wrappedORM) BatchInsertRecords(r []persistedStateRecord, q ...pg.QOpt) error { - return o.BatchInsertRecordsFn(r, q...) +func (o *wrappedORM) BatchInsertRecords(ctx context.Context, r []persistedStateRecord) error { + return o.BatchInsertRecordsFn(ctx, r) } -func (o *wrappedORM) SelectStatesByWorkIDs(ids []string, q ...pg.QOpt) ([]persistedStateRecord, error) { - return o.SelectStatesByWorkIDsFn(ids, q...) +func (o *wrappedORM) SelectStatesByWorkIDs(ctx context.Context, ids []string) ([]persistedStateRecord, error) { + return o.SelectStatesByWorkIDsFn(ctx, ids) } -func (o *wrappedORM) DeleteExpired(t time.Time, q ...pg.QOpt) error { - return o.DeleteExpiredFn(t, q...) +func (o *wrappedORM) DeleteExpired(ctx context.Context, t time.Time) error { + return o.DeleteExpiredFn(ctx, t) } diff --git a/core/services/relay/evm/ocr2keeper.go b/core/services/relay/evm/ocr2keeper.go index 742bab3a696..0dd971123c6 100644 --- a/core/services/relay/evm/ocr2keeper.go +++ b/core/services/relay/evm/ocr2keeper.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common" - "github.com/jmoiron/sqlx" "github.com/pkg/errors" "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" @@ -14,6 +13,7 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-automation/pkg/v3/plugin" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/automation" @@ -65,7 +65,7 @@ type OCR2KeeperRelayer interface { // ocr2keeperRelayer is the relayer with added DKG and OCR2Keeper provider functions. type ocr2keeperRelayer struct { - db *sqlx.DB + ds sqlutil.DataSource chain legacyevm.Chain lggr logger.Logger ethKeystore keystore.Eth @@ -73,9 +73,9 @@ type ocr2keeperRelayer struct { } // NewOCR2KeeperRelayer is the constructor of ocr2keeperRelayer -func NewOCR2KeeperRelayer(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, ethKeystore keystore.Eth, dbCfg pg.QConfig) OCR2KeeperRelayer { +func NewOCR2KeeperRelayer(ds sqlutil.DataSource, chain legacyevm.Chain, lggr logger.Logger, ethKeystore keystore.Eth, dbCfg pg.QConfig) OCR2KeeperRelayer { return &ocr2keeperRelayer{ - db: db, + ds: ds, chain: chain, lggr: lggr, ethKeystore: ethKeystore, @@ -126,7 +126,7 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p finalityDepth := client.Config().EVM().FinalityDepth() - orm := upkeepstate.NewORM(client.ID(), r.db, r.lggr, r.dbCfg) + orm := upkeepstate.NewORM(client.ID(), r.ds) scanner := upkeepstate.NewPerformedEventsScanner(r.lggr, client.LogPoller(), addr, finalityDepth) services.upkeepStateStore = upkeepstate.NewUpkeepStateStore(orm, r.lggr, scanner)