Skip to content

Commit

Permalink
core/services/ocr2/plugins/ocr2keeper/evmregister/v21/upkeepstate: us…
Browse files Browse the repository at this point in the history
…e sqlutil instead of pg.QOpts (#12806)
  • Loading branch information
jmank88 authored Apr 15, 2024
1 parent 601c79f commit 9964dc8
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 69 deletions.
5 changes: 5 additions & 0 deletions .changeset/poor-socks-travel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

core/services/ocr2/plugins/ocr2keeper/evmregister/v21/upkeepstate: use sqlutil instead of pg.QOpts #internal
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package upkeepstate

import (
"context"
"math/big"
"time"

"github.com/jmoiron/sqlx"
"github.com/lib/pq"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

type orm struct {
chainID *ubig.Big
q pg.Q
ds sqlutil.DataSource
}

type persistedStateRecord struct {
Expand All @@ -27,17 +26,15 @@ type persistedStateRecord struct {
}

// NewORM creates an ORM scoped to chainID.
func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *orm {
func NewORM(chainID *big.Int, ds sqlutil.DataSource) *orm {
return &orm{
chainID: ubig.New(chainID),
q: pg.NewQ(db, lggr.Named("ORM"), cfg),
ds: ds,
}
}

// BatchInsertRecords is idempotent and sets upkeep state values in db
func (o *orm) BatchInsertRecords(state []persistedStateRecord, qopts ...pg.QOpt) error {
q := o.q.WithOpts(qopts...)

func (o *orm) BatchInsertRecords(ctx context.Context, state []persistedStateRecord) error {
if len(state) == 0 {
return nil
}
Expand Down Expand Up @@ -65,17 +62,16 @@ func (o *orm) BatchInsertRecords(state []persistedStateRecord, qopts ...pg.QOpt)
})
}

return q.ExecQNamed(`INSERT INTO evm.upkeep_states
_, err := o.ds.NamedExecContext(ctx, `INSERT INTO evm.upkeep_states
(evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) VALUES
(:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason) ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows)
return err
}

// SelectStatesByWorkIDs searches the data store for stored states for the
// provided work ids and configured chain id
func (o *orm) SelectStatesByWorkIDs(workIDs []string, qopts ...pg.QOpt) (states []persistedStateRecord, err error) {
q := o.q.WithOpts(qopts...)

err = q.Select(&states, `SELECT upkeep_id, work_id, completion_state, block_number, ineligibility_reason, inserted_at
func (o *orm) SelectStatesByWorkIDs(ctx context.Context, workIDs []string) (states []persistedStateRecord, err error) {
err = o.ds.SelectContext(ctx, &states, `SELECT upkeep_id, work_id, completion_state, block_number, ineligibility_reason, inserted_at
FROM evm.upkeep_states
WHERE work_id = ANY($1) AND evm_chain_id = $2::NUMERIC`, pq.Array(workIDs), o.chainID)

Expand All @@ -87,9 +83,8 @@ func (o *orm) SelectStatesByWorkIDs(workIDs []string, qopts ...pg.QOpt) (states
}

// DeleteExpired prunes stored states older than to the provided time
func (o *orm) DeleteExpired(expired time.Time, qopts ...pg.QOpt) error {
q := o.q.WithOpts(qopts...)
_, err := q.Exec(`DELETE FROM evm.upkeep_states WHERE inserted_at <= $1 AND evm_chain_id::NUMERIC = $2`, expired, o.chainID)
func (o *orm) DeleteExpired(ctx context.Context, expired time.Time) error {
_, err := o.ds.ExecContext(ctx, `DELETE FROM evm.upkeep_states WHERE inserted_at <= $1 AND evm_chain_id::NUMERIC = $2`, expired, o.chainID)

return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

func TestInsertSelectDelete(t *testing.T) {
lggr, _ := logger.TestLoggerObserved(t, zapcore.ErrorLevel)
ctx := testutils.Context(t)
chainID := testutils.FixtureChainID
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
orm := NewORM(chainID, db)

inserted := []persistedStateRecord{
{
Expand All @@ -32,20 +30,20 @@ func TestInsertSelectDelete(t *testing.T) {
},
}

err := orm.BatchInsertRecords(inserted)
err := orm.BatchInsertRecords(ctx, inserted)

require.NoError(t, err, "no error expected from insert")

states, err := orm.SelectStatesByWorkIDs([]string{"0x1"})
states, err := orm.SelectStatesByWorkIDs(ctx, []string{"0x1"})

require.NoError(t, err, "no error expected from select")
require.Len(t, states, 1, "records return should equal records inserted")

err = orm.DeleteExpired(time.Now())
err = orm.DeleteExpired(ctx, time.Now())

assert.NoError(t, err, "no error expected from delete")

states, err = orm.SelectStatesByWorkIDs([]string{"0x1"})
states, err = orm.SelectStatesByWorkIDs(ctx, []string{"0x1"})

require.NoError(t, err, "no error expected from select")
require.Len(t, states, 0, "records return should be empty since records were deleted")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

"github.com/smartcontractkit/chainlink-common/pkg/services"

ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand All @@ -31,9 +31,9 @@ const (
)

type ORM interface {
BatchInsertRecords([]persistedStateRecord, ...pg.QOpt) error
SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error)
DeleteExpired(time.Time, ...pg.QOpt) error
BatchInsertRecords(context.Context, []persistedStateRecord) error
SelectStatesByWorkIDs(context.Context, []string) ([]persistedStateRecord, error)
DeleteExpired(context.Context, time.Time) error
}

// UpkeepStateStore is the interface for managing upkeeps final state in a local store.
Expand Down Expand Up @@ -152,7 +152,7 @@ func (u *upkeepStateStore) flush(ctx context.Context) {
u.sem <- struct{}{}

go func() {
if err := u.orm.BatchInsertRecords(batch, pg.WithParentCtx(ctx)); err != nil {
if err := u.orm.BatchInsertRecords(ctx, batch); err != nil {
u.lggr.Errorw("error inserting records", "err", err)
}
<-u.sem
Expand Down Expand Up @@ -268,7 +268,7 @@ func (u *upkeepStateStore) fetchPerformed(ctx context.Context, workIDs ...string
// fetchFromDB fetches all upkeeps indicated as ineligible from the db to
// populate the cache.
func (u *upkeepStateStore) fetchFromDB(ctx context.Context, workIDs ...string) error {
states, err := u.orm.SelectStatesByWorkIDs(workIDs, pg.WithParentCtx(ctx))
states, err := u.orm.SelectStatesByWorkIDs(ctx, workIDs)
if err != nil {
return err
}
Expand Down Expand Up @@ -320,7 +320,9 @@ func (u *upkeepStateStore) cleanup(ctx context.Context) error {
func (u *upkeepStateStore) cleanDB(ctx context.Context) error {
tm := time.Now().Add(-1 * u.retention)

return u.orm.DeleteExpired(tm, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout())
ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), time.Minute)
defer cancel()
return u.orm.DeleteExpired(ctx, tm)
}

// cleanupCache removes any records from the cache that are older than the TTL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

func TestUpkeepStateStore(t *testing.T) {
Expand Down Expand Up @@ -329,20 +328,16 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) {
lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel)
chainID := testutils.FixtureChainID
db := pgtest.NewSqlxDB(t)
realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
realORM := NewORM(chainID, db)
insertFinished := make(chan struct{}, 1)
orm := &wrappedORM{
BatchInsertRecordsFn: func(records []persistedStateRecord, opt ...pg.QOpt) error {
err := realORM.BatchInsertRecords(records, opt...)
BatchInsertRecordsFn: func(ctx context.Context, records []persistedStateRecord) error {
err := realORM.BatchInsertRecords(ctx, records)
insertFinished <- struct{}{}
return err
},
SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) {
return realORM.SelectStatesByWorkIDs(strings, opt...)
},
DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error {
return realORM.DeleteExpired(t, opt...)
},
SelectStatesByWorkIDsFn: realORM.SelectStatesByWorkIDs,
DeleteExpiredFn: realORM.DeleteExpired,
}
scanner := &mockScanner{}
store := NewUpkeepStateStore(orm, lggr, scanner)
Expand Down Expand Up @@ -389,20 +384,16 @@ func TestUpkeepStateStore_emptyDB(t *testing.T) {
lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel)
chainID := testutils.FixtureChainID
db := pgtest.NewSqlxDB(t)
realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
realORM := NewORM(chainID, db)
insertFinished := make(chan struct{}, 1)
orm := &wrappedORM{
BatchInsertRecordsFn: func(records []persistedStateRecord, opt ...pg.QOpt) error {
err := realORM.BatchInsertRecords(records, opt...)
BatchInsertRecordsFn: func(ctx context.Context, records []persistedStateRecord) error {
err := realORM.BatchInsertRecords(ctx, records)
insertFinished <- struct{}{}
return err
},
SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) {
return realORM.SelectStatesByWorkIDs(strings, opt...)
},
DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error {
return realORM.DeleteExpired(t, opt...)
},
SelectStatesByWorkIDsFn: realORM.SelectStatesByWorkIDs,
DeleteExpiredFn: realORM.DeleteExpired,
}
scanner := &mockScanner{}
store := NewUpkeepStateStore(orm, lggr, scanner)
Expand All @@ -427,7 +418,7 @@ func TestUpkeepStateStore_Upsert(t *testing.T) {
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
chainID := testutils.FixtureChainID
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
orm := NewORM(chainID, db)

store := NewUpkeepStateStore(orm, lggr, &mockScanner{})

Expand Down Expand Up @@ -560,11 +551,11 @@ func (_m *mockORM) setErr(err error) {
_m.err = err
}

func (_m *mockORM) BatchInsertRecords(state []persistedStateRecord, opts ...pg.QOpt) error {
func (_m *mockORM) BatchInsertRecords(ctx context.Context, state []persistedStateRecord) error {
return nil
}

func (_m *mockORM) SelectStatesByWorkIDs(workIDs []string, opts ...pg.QOpt) ([]persistedStateRecord, error) {
func (_m *mockORM) SelectStatesByWorkIDs(ctx context.Context, workIDs []string) ([]persistedStateRecord, error) {
_m.lock.Lock()
defer _m.lock.Unlock()

Expand All @@ -574,7 +565,7 @@ func (_m *mockORM) SelectStatesByWorkIDs(workIDs []string, opts ...pg.QOpt) ([]p
return res, _m.err
}

func (_m *mockORM) DeleteExpired(tm time.Time, opts ...pg.QOpt) error {
func (_m *mockORM) DeleteExpired(ctx context.Context, tm time.Time) error {
_m.lock.Lock()
defer _m.lock.Unlock()

Expand All @@ -585,19 +576,19 @@ func (_m *mockORM) DeleteExpired(tm time.Time, opts ...pg.QOpt) error {
}

type wrappedORM struct {
BatchInsertRecordsFn func([]persistedStateRecord, ...pg.QOpt) error
SelectStatesByWorkIDsFn func([]string, ...pg.QOpt) ([]persistedStateRecord, error)
DeleteExpiredFn func(time.Time, ...pg.QOpt) error
BatchInsertRecordsFn func(context.Context, []persistedStateRecord) error
SelectStatesByWorkIDsFn func(context.Context, []string) ([]persistedStateRecord, error)
DeleteExpiredFn func(context.Context, time.Time) error
}

func (o *wrappedORM) BatchInsertRecords(r []persistedStateRecord, q ...pg.QOpt) error {
return o.BatchInsertRecordsFn(r, q...)
func (o *wrappedORM) BatchInsertRecords(ctx context.Context, r []persistedStateRecord) error {
return o.BatchInsertRecordsFn(ctx, r)
}

func (o *wrappedORM) SelectStatesByWorkIDs(ids []string, q ...pg.QOpt) ([]persistedStateRecord, error) {
return o.SelectStatesByWorkIDsFn(ids, q...)
func (o *wrappedORM) SelectStatesByWorkIDs(ctx context.Context, ids []string) ([]persistedStateRecord, error) {
return o.SelectStatesByWorkIDsFn(ctx, ids)
}

func (o *wrappedORM) DeleteExpired(t time.Time, q ...pg.QOpt) error {
return o.DeleteExpiredFn(t, q...)
func (o *wrappedORM) DeleteExpired(ctx context.Context, t time.Time) error {
return o.DeleteExpiredFn(ctx, t)
}
10 changes: 5 additions & 5 deletions core/services/relay/evm/ocr2keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"

"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-automation/pkg/v3/plugin"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/automation"

Expand Down Expand Up @@ -65,17 +65,17 @@ type OCR2KeeperRelayer interface {

// ocr2keeperRelayer is the relayer with added DKG and OCR2Keeper provider functions.
type ocr2keeperRelayer struct {
db *sqlx.DB
ds sqlutil.DataSource
chain legacyevm.Chain
lggr logger.Logger
ethKeystore keystore.Eth
dbCfg pg.QConfig
}

// NewOCR2KeeperRelayer is the constructor of ocr2keeperRelayer
func NewOCR2KeeperRelayer(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, ethKeystore keystore.Eth, dbCfg pg.QConfig) OCR2KeeperRelayer {
func NewOCR2KeeperRelayer(ds sqlutil.DataSource, chain legacyevm.Chain, lggr logger.Logger, ethKeystore keystore.Eth, dbCfg pg.QConfig) OCR2KeeperRelayer {
return &ocr2keeperRelayer{
db: db,
ds: ds,
chain: chain,
lggr: lggr,
ethKeystore: ethKeystore,
Expand Down Expand Up @@ -126,7 +126,7 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p

finalityDepth := client.Config().EVM().FinalityDepth()

orm := upkeepstate.NewORM(client.ID(), r.db, r.lggr, r.dbCfg)
orm := upkeepstate.NewORM(client.ID(), r.ds)
scanner := upkeepstate.NewPerformedEventsScanner(r.lggr, client.LogPoller(), addr, finalityDepth)
services.upkeepStateStore = upkeepstate.NewUpkeepStateStore(orm, r.lggr, scanner)

Expand Down

0 comments on commit 9964dc8

Please sign in to comment.