From 231ad073aaf02164029ac8c390ad11eaf23bfce5 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 7 Sep 2023 23:32:26 +0100 Subject: [PATCH] Use a batch insert --- .../ocr2keeper/evm21/upkeepstate/orm.go | 38 +++++++++++++++---- .../ocr2keeper/evm21/upkeepstate/orm_test.go | 18 +++++---- .../ocr2keeper/evm21/upkeepstate/store.go | 6 +-- .../evm21/upkeepstate/store_test.go | 2 +- 4 files changed, 45 insertions(+), 19 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go index 94a16f23dfe..35ba9a541e8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go @@ -34,16 +34,40 @@ func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) * } } -// InsertUpkeepState is idempotent and sets upkeep state values in db -func (o *orm) InsertUpkeepState(state persistedStateRecord, qopts ...pg.QOpt) error { +// BatchInsertUpkeepStates is idempotent and sets upkeep state values in db +func (o *orm) BatchInsertUpkeepStates(state []persistedStateRecord, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) - query := `INSERT INTO evm_upkeep_states (evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) - VALUES ($1::NUMERIC, $2, $3, $4, $5, $6::NUMERIC, $7) - ON CONFLICT (evm_chain_id, work_id) - DO NOTHING` + if len(state) == 0 { + return nil + } + + type row struct { + EvmChainId *utils.Big + WorkId string + CompletionState uint8 + BlockNumber int64 + InsertedAt time.Time + UpkeepId *utils.Big + IneligibilityReason uint8 + } + + var rows []row + for _, record := range state { + rows = append(rows, row{ + EvmChainId: o.chainID, + WorkId: record.WorkID, + CompletionState: record.CompletionState, + BlockNumber: record.BlockNumber, + InsertedAt: record.InsertedAt, + UpkeepId: record.UpkeepID, + IneligibilityReason: record.IneligibilityReason, + }) + } - return q.ExecQ(query, o.chainID, state.WorkID, state.CompletionState, state.BlockNumber, state.InsertedAt, state.UpkeepID, state.IneligibilityReason) + return q.ExecQNamed(`INSERT INTO evm_upkeep_states +(evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) VALUES +(:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason) ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows) } // SelectStatesByWorkIDs searches the data store for stored states for the diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go index c816627e92c..90bc6b6ba26 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go @@ -21,16 +21,18 @@ func TestInsertSelectDelete(t *testing.T) { db := pgtest.NewSqlxDB(t) orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) - inserted := persistedStateRecord{ - UpkeepID: utils.NewBig(big.NewInt(2)), - WorkID: "0x1", - CompletionState: 100, - BlockNumber: 2, - IneligibilityReason: 2, - InsertedAt: time.Now(), + inserted := []persistedStateRecord{ + { + UpkeepID: utils.NewBig(big.NewInt(2)), + WorkID: "0x1", + CompletionState: 100, + BlockNumber: 2, + IneligibilityReason: 2, + InsertedAt: time.Now(), + }, } - err := orm.InsertUpkeepState(inserted) + err := orm.BatchInsertUpkeepStates(inserted) require.NoError(t, err, "no error expected from insert") diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 3b37b58d03d..c8e399a8a1f 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -25,7 +25,7 @@ const ( ) type ORM interface { - InsertUpkeepState(persistedStateRecord, ...pg.QOpt) error + BatchInsertUpkeepStates([]persistedStateRecord, ...pg.QOpt) error SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error) DeleteExpired(time.Time, ...pg.QOpt) error } @@ -200,13 +200,13 @@ func (u *upkeepStateStore) upsertStateRecord(ctx context.Context, workID string, u.cache[workID] = record - return u.orm.InsertUpkeepState(persistedStateRecord{ + return u.orm.BatchInsertUpkeepStates([]persistedStateRecord{{ UpkeepID: utils.NewBig(upkeepID), WorkID: record.workID, CompletionState: uint8(record.state), IneligibilityReason: reason, InsertedAt: record.addedAt, - }, pg.WithParentCtx(ctx)) + }}, pg.WithParentCtx(ctx)) } // fetchPerformed fetches all performed logs from the scanner to populate the cache. diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index fd2e6464239..28674d7e858 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -475,7 +475,7 @@ func (_m *mockORM) setErr(err error) { _m.err = err } -func (_m *mockORM) InsertUpkeepState(state persistedStateRecord, _ ...pg.QOpt) error { +func (_m *mockORM) BatchInsertUpkeepStates(state []persistedStateRecord, _ ...pg.QOpt) error { return nil }