diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go index 94a16f23dfe..d441b71819e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go @@ -34,16 +34,40 @@ func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) * } } -// InsertUpkeepState is idempotent and sets upkeep state values in db -func (o *orm) InsertUpkeepState(state persistedStateRecord, qopts ...pg.QOpt) error { +// BatchInsertRecords is idempotent and sets upkeep state values in db +func (o *orm) BatchInsertRecords(state []persistedStateRecord, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) - query := `INSERT INTO evm_upkeep_states (evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) - VALUES ($1::NUMERIC, $2, $3, $4, $5, $6::NUMERIC, $7) - ON CONFLICT (evm_chain_id, work_id) - DO NOTHING` + if len(state) == 0 { + return nil + } + + type row struct { + EvmChainId *utils.Big + WorkId string + CompletionState uint8 + BlockNumber int64 + InsertedAt time.Time + UpkeepId *utils.Big + IneligibilityReason uint8 + } + + var rows []row + for _, record := range state { + rows = append(rows, row{ + EvmChainId: o.chainID, + WorkId: record.WorkID, + CompletionState: record.CompletionState, + BlockNumber: record.BlockNumber, + InsertedAt: record.InsertedAt, + UpkeepId: record.UpkeepID, + IneligibilityReason: record.IneligibilityReason, + }) + } - return q.ExecQ(query, o.chainID, state.WorkID, state.CompletionState, state.BlockNumber, state.InsertedAt, state.UpkeepID, state.IneligibilityReason) + return q.ExecQNamed(`INSERT INTO evm_upkeep_states +(evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) VALUES +(:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason) ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows) } // SelectStatesByWorkIDs searches the data store for stored states for the diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go index c816627e92c..54ca7285dd0 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go @@ -21,16 +21,18 @@ func TestInsertSelectDelete(t *testing.T) { db := pgtest.NewSqlxDB(t) orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) - inserted := persistedStateRecord{ - UpkeepID: utils.NewBig(big.NewInt(2)), - WorkID: "0x1", - CompletionState: 100, - BlockNumber: 2, - IneligibilityReason: 2, - InsertedAt: time.Now(), + inserted := []persistedStateRecord{ + { + UpkeepID: utils.NewBig(big.NewInt(2)), + WorkID: "0x1", + CompletionState: 100, + BlockNumber: 2, + IneligibilityReason: 2, + InsertedAt: time.Now(), + }, } - err := orm.InsertUpkeepState(inserted) + err := orm.BatchInsertRecords(inserted) require.NoError(t, err, "no error expected from insert") diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 3b37b58d03d..ce1b50de229 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -21,11 +21,13 @@ const ( // CacheExpiration is the amount of time that we keep a record in the cache. CacheExpiration = 24 * time.Hour // GCInterval is the amount of time between cache cleanups. - GCInterval = 2 * time.Hour + GCInterval = 2 * time.Hour + flushCadence = 30 * time.Second + concurrentBatchCalls = 10 ) type ORM interface { - InsertUpkeepState(persistedStateRecord, ...pg.QOpt) error + BatchInsertRecords([]persistedStateRecord, ...pg.QOpt) error SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error) DeleteExpired(time.Time, ...pg.QOpt) error } @@ -39,7 +41,9 @@ type UpkeepStateStore interface { } var ( - _ UpkeepStateStore = &upkeepStateStore{} + _ UpkeepStateStore = &upkeepStateStore{} + newTickerFn = time.NewTicker + batchSize = 1000 ) // upkeepStateRecord is a record that we save in a local cache. @@ -66,6 +70,10 @@ type upkeepStateStore struct { mu sync.RWMutex cache map[string]*upkeepStateRecord + pendingRecords []persistedStateRecord + sem chan struct{} + batchSize int + // service values cancel context.CancelFunc } @@ -73,12 +81,15 @@ type upkeepStateStore struct { // NewUpkeepStateStore creates a new state store func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScanner) *upkeepStateStore { return &upkeepStateStore{ - orm: orm, - lggr: lggr.Named("UpkeepStateStore"), - cache: map[string]*upkeepStateRecord{}, - scanner: scanner, - retention: CacheExpiration, - cleanCadence: GCInterval, + orm: orm, + lggr: lggr.Named("UpkeepStateStore"), + cache: map[string]*upkeepStateRecord{}, + scanner: scanner, + retention: CacheExpiration, + cleanCadence: GCInterval, + pendingRecords: []persistedStateRecord{}, + sem: make(chan struct{}, concurrentBatchCalls), + batchSize: batchSize, } } @@ -108,9 +119,12 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { { go func(ctx context.Context) { - ticker := time.NewTicker(utils.WithJitter(u.cleanCadence)) + ticker := time.NewTicker(u.cleanCadence) defer ticker.Stop() + flushTicker := newTickerFn(utils.WithJitter(flushCadence)) + defer flushTicker.Stop() + for { select { case <-ticker.C: @@ -119,8 +133,11 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { } ticker.Reset(utils.WithJitter(u.cleanCadence)) + case <-flushTicker.C: + u.flush(ctx) case <-ctx.Done(): - + u.flush(ctx) + return } } }(ctx) @@ -129,6 +146,33 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { return nil } +func (u *upkeepStateStore) flush(ctx context.Context) { + cloneRecords := make([]persistedStateRecord, len(u.pendingRecords)) + + u.mu.Lock() + copy(cloneRecords, u.pendingRecords) + u.pendingRecords = []persistedStateRecord{} + u.mu.Unlock() + + for i := 0; i < len(cloneRecords); i += u.batchSize { + end := i + u.batchSize + if end > len(cloneRecords) { + end = len(cloneRecords) + } + + batch := cloneRecords[i:end] + + u.sem <- struct{}{} + + go func() { + if err := u.orm.BatchInsertRecords(batch, pg.WithParentCtx(ctx)); err != nil { + u.lggr.Errorw("error inserting records", "err", err) + } + <-u.sem + }() + } +} + // Close stops the service of pruning stale data; implements io.Closer func (u *upkeepStateStore) Close() error { u.mu.Lock() @@ -200,13 +244,15 @@ func (u *upkeepStateStore) upsertStateRecord(ctx context.Context, workID string, u.cache[workID] = record - return u.orm.InsertUpkeepState(persistedStateRecord{ + u.pendingRecords = append(u.pendingRecords, persistedStateRecord{ UpkeepID: utils.NewBig(upkeepID), WorkID: record.workID, CompletionState: uint8(record.state), IneligibilityReason: reason, InsertedAt: record.addedAt, - }, pg.WithParentCtx(ctx)) + }) + + return nil } // fetchPerformed fetches all performed logs from the scanner to populate the cache. diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index fd2e6464239..9e7ba81e5eb 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -220,29 +220,23 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { } tests := []struct { - name string - queryIDs []string - storedValues []storedValue - expected []ocr2keepers.UpkeepState + name string + flushSize int + expectedWrites int + queryIDs []string + storedValues []storedValue + expected []ocr2keepers.UpkeepState }{ { - name: "querying non-stored workIDs on empty db returns unknown state results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, - expected: []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - }, - }, - { - name: "querying non-stored workIDs on db with values returns unknown state results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "querying non-stored workIDs on db with values returns unknown state results", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 10, + expectedWrites: 1, storedValues: []storedValue{ - {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(3, "0x33", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(4, "0x44", false, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(3, "0x33", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(4, "0x44", false, 1), state: ocr2keepers.Performed}, }, expected: []ocr2keepers.UpkeepState{ ocr2keepers.UnknownState, @@ -252,13 +246,15 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "querying workIDs with non-stored values returns valid results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "storing eligible values is a noop", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 4, + expectedWrites: 1, storedValues: []storedValue{ - {result: makeTestResult(5, "0x1", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(6, "0x2", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(7, "0x3", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(8, "0x44", false, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(9, "0x1", false, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(10, "0x2", false, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(11, "0x3", false, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(12, "0x4", true, 1), state: ocr2keepers.Performed}, // gets inserted }, expected: []ocr2keepers.UpkeepState{ ocr2keepers.Ineligible, @@ -268,31 +264,43 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "storing eligible values is a noop", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "provided state on setupkeepstate is currently ignored for eligible check results", + queryIDs: []string{"0x1", "0x2"}, + flushSize: 1, + expectedWrites: 1, storedValues: []storedValue{ - {result: makeTestResult(9, "0x1", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(10, "0x2", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(11, "0x3", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(12, "0x4", true, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(13, "0x1", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, // gets inserted }, expected: []ocr2keepers.UpkeepState{ - ocr2keepers.Ineligible, - ocr2keepers.Ineligible, - ocr2keepers.Ineligible, ocr2keepers.UnknownState, + ocr2keepers.Ineligible, }, }, { - name: "provided state on setupkeepstate is currently ignored for eligible check results", - queryIDs: []string{"0x1", "0x2"}, + name: "provided state outside the flush batch isn't registered in the db", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4", "0x5", "0x6", "0x7", "0x8"}, + flushSize: 3, + expectedWrites: 2, storedValues: []storedValue{ {result: makeTestResult(13, "0x1", true, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, // gets inserted + {result: makeTestResult(15, "0x3", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(16, "0x4", false, 1), state: ocr2keepers.Performed}, // gets inserted + {result: makeTestResult(17, "0x5", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(18, "0x6", false, 1), state: ocr2keepers.Performed}, // gets inserted + {result: makeTestResult(19, "0x7", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(20, "0x8", false, 1), state: ocr2keepers.Performed}, // gets inserted }, expected: []ocr2keepers.UpkeepState{ ocr2keepers.UnknownState, ocr2keepers.Ineligible, + ocr2keepers.UnknownState, + ocr2keepers.Ineligible, + ocr2keepers.UnknownState, + ocr2keepers.Ineligible, + ocr2keepers.UnknownState, + ocr2keepers.Ineligible, }, }, } @@ -301,13 +309,45 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := testutils.Context(t) + tickerCh := make(chan time.Time) + oldNewTickerFn := newTickerFn + oldFlushSize := batchSize + defer func() { + }() + newTickerFn = func(d time.Duration) *time.Ticker { + return &time.Ticker{ + C: tickerCh, + } + } + batchSize = test.flushSize + defer func() { + newTickerFn = oldNewTickerFn + batchSize = oldFlushSize + }() + lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) - orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + insertFinished := make(chan struct{}, 1) + orm := &wrappedORM{ + BatchInsertRecordsFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { + err := realORM.BatchInsertRecords(records, opt...) + insertFinished <- struct{}{} + return err + }, + SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) { + return realORM.SelectStatesByWorkIDs(strings, opt...) + }, + DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error { + return realORM.DeleteExpired(t, opt...) + }, + } scanner := &mockScanner{} store := NewUpkeepStateStore(orm, lggr, scanner) + require.NoError(t, store.Start(ctx)) + t.Cleanup(func() { t.Log("cleaning up database") @@ -320,6 +360,13 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { require.NoError(t, store.SetUpkeepState(context.Background(), insert.result, insert.state), "storing states should not produce an error") } + tickerCh <- time.Now() + + // if this test inserts data, wait for the insert to complete before proceeding + for i := 0; i < test.expectedWrites; i++ { + <-insertFinished + } + // empty the cache before doing selects to force a db lookup store.cache = make(map[string]*upkeepStateRecord) @@ -332,10 +379,50 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { observedLogs.TakeAll() require.Equal(t, 0, observedLogs.Len()) + + require.NoError(t, store.Close()) }) } } +func TestUpkeepStateStore_emptyDB(t *testing.T) { + t.Run("querying non-stored workIDs on empty db returns unknown state results", func(t *testing.T) { + lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) + chainID := testutils.FixtureChainID + db := pgtest.NewSqlxDB(t) + realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + insertFinished := make(chan struct{}, 1) + orm := &wrappedORM{ + BatchInsertRecordsFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { + err := realORM.BatchInsertRecords(records, opt...) + insertFinished <- struct{}{} + return err + }, + SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) { + return realORM.SelectStatesByWorkIDs(strings, opt...) + }, + DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error { + return realORM.DeleteExpired(t, opt...) + }, + } + scanner := &mockScanner{} + store := NewUpkeepStateStore(orm, lggr, scanner) + + states, err := store.SelectByWorkIDs(context.Background(), []string{"0x1", "0x2", "0x3", "0x4"}...) + assert.NoError(t, err) + assert.Equal(t, []ocr2keepers.UpkeepState{ + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + }, states) + + observedLogs.TakeAll() + + require.Equal(t, 0, observedLogs.Len()) + }) +} + func TestUpkeepStateStore_Upsert(t *testing.T) { db := pgtest.NewSqlxDB(t) ctx := testutils.Context(t) @@ -475,7 +562,7 @@ func (_m *mockORM) setErr(err error) { _m.err = err } -func (_m *mockORM) InsertUpkeepState(state persistedStateRecord, _ ...pg.QOpt) error { +func (_m *mockORM) BatchInsertRecords(state []persistedStateRecord, _ ...pg.QOpt) error { return nil } @@ -498,3 +585,21 @@ func (_m *mockORM) DeleteExpired(tm time.Time, _ ...pg.QOpt) error { return _m.err } + +type wrappedORM struct { + BatchInsertRecordsFn func([]persistedStateRecord, ...pg.QOpt) error + SelectStatesByWorkIDsFn func([]string, ...pg.QOpt) ([]persistedStateRecord, error) + DeleteExpiredFn func(time.Time, ...pg.QOpt) error +} + +func (o *wrappedORM) BatchInsertRecords(r []persistedStateRecord, q ...pg.QOpt) error { + return o.BatchInsertRecordsFn(r, q...) +} + +func (o *wrappedORM) SelectStatesByWorkIDs(ids []string, q ...pg.QOpt) ([]persistedStateRecord, error) { + return o.SelectStatesByWorkIDsFn(ids, q...) +} + +func (o *wrappedORM) DeleteExpired(t time.Time, q ...pg.QOpt) error { + return o.DeleteExpiredFn(t, q...) +}