Skip to content

Commit

Permalink
Use a batch insert
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Sep 7, 2023
1 parent 247918e commit 231ad07
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 19 deletions.
38 changes: 31 additions & 7 deletions core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,40 @@ func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *
}
}

// InsertUpkeepState is idempotent and sets upkeep state values in db
func (o *orm) InsertUpkeepState(state persistedStateRecord, qopts ...pg.QOpt) error {
// BatchInsertUpkeepStates is idempotent and sets upkeep state values in db
func (o *orm) BatchInsertUpkeepStates(state []persistedStateRecord, qopts ...pg.QOpt) error {
q := o.q.WithOpts(qopts...)

query := `INSERT INTO evm_upkeep_states (evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason)
VALUES ($1::NUMERIC, $2, $3, $4, $5, $6::NUMERIC, $7)
ON CONFLICT (evm_chain_id, work_id)
DO NOTHING`
if len(state) == 0 {
return nil
}

type row struct {
EvmChainId *utils.Big
WorkId string
CompletionState uint8
BlockNumber int64
InsertedAt time.Time
UpkeepId *utils.Big
IneligibilityReason uint8
}

var rows []row
for _, record := range state {
rows = append(rows, row{
EvmChainId: o.chainID,
WorkId: record.WorkID,
CompletionState: record.CompletionState,
BlockNumber: record.BlockNumber,
InsertedAt: record.InsertedAt,
UpkeepId: record.UpkeepID,
IneligibilityReason: record.IneligibilityReason,
})
}

return q.ExecQ(query, o.chainID, state.WorkID, state.CompletionState, state.BlockNumber, state.InsertedAt, state.UpkeepID, state.IneligibilityReason)
return q.ExecQNamed(`INSERT INTO evm_upkeep_states
(evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) VALUES
(:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason) ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows)
}

// SelectStatesByWorkIDs searches the data store for stored states for the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ func TestInsertSelectDelete(t *testing.T) {
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))

inserted := persistedStateRecord{
UpkeepID: utils.NewBig(big.NewInt(2)),
WorkID: "0x1",
CompletionState: 100,
BlockNumber: 2,
IneligibilityReason: 2,
InsertedAt: time.Now(),
inserted := []persistedStateRecord{
{
UpkeepID: utils.NewBig(big.NewInt(2)),
WorkID: "0x1",
CompletionState: 100,
BlockNumber: 2,
IneligibilityReason: 2,
InsertedAt: time.Now(),
},
}

err := orm.InsertUpkeepState(inserted)
err := orm.BatchInsertUpkeepStates(inserted)

require.NoError(t, err, "no error expected from insert")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
)

type ORM interface {
InsertUpkeepState(persistedStateRecord, ...pg.QOpt) error
BatchInsertUpkeepStates([]persistedStateRecord, ...pg.QOpt) error
SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error)
DeleteExpired(time.Time, ...pg.QOpt) error
}
Expand Down Expand Up @@ -200,13 +200,13 @@ func (u *upkeepStateStore) upsertStateRecord(ctx context.Context, workID string,

u.cache[workID] = record

return u.orm.InsertUpkeepState(persistedStateRecord{
return u.orm.BatchInsertUpkeepStates([]persistedStateRecord{{
UpkeepID: utils.NewBig(upkeepID),
WorkID: record.workID,
CompletionState: uint8(record.state),
IneligibilityReason: reason,
InsertedAt: record.addedAt,
}, pg.WithParentCtx(ctx))
}}, pg.WithParentCtx(ctx))
}

// fetchPerformed fetches all performed logs from the scanner to populate the cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (_m *mockORM) setErr(err error) {
_m.err = err
}

func (_m *mockORM) InsertUpkeepState(state persistedStateRecord, _ ...pg.QOpt) error {
func (_m *mockORM) BatchInsertUpkeepStates(state []persistedStateRecord, _ ...pg.QOpt) error {
return nil
}

Expand Down

0 comments on commit 231ad07

Please sign in to comment.