diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 244e8d64f29..3344a0caf79 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -23,7 +23,6 @@ const ( // GCInterval is the amount of time between cache cleanups. GCInterval = 2 * time.Hour flushCadence = 30 * time.Second - flushSize = 1000 ) type ORM interface { @@ -43,6 +42,7 @@ type UpkeepStateStore interface { var ( _ UpkeepStateStore = &upkeepStateStore{} newTickerFn = time.NewTicker + flushSize = 1000 ) // upkeepStateRecord is a record that we save in a local cache. @@ -71,7 +71,7 @@ type upkeepStateStore struct { pendingRecords []persistedStateRecord errCh chan error - + sem chan struct{} // service values cancel context.CancelFunc } @@ -87,6 +87,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann cleanCadence: GCInterval, pendingRecords: []persistedStateRecord{}, errCh: make(chan error, 1), + sem: make(chan struct{}, 10), } } @@ -135,7 +136,8 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { case err := <-u.errCh: u.lggr.Errorw("error inserting records", "err", err) case <-ctx.Done(): - + u.flush(ctx) + return } } }(ctx) @@ -145,6 +147,11 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { } func (u *upkeepStateStore) flush(ctx context.Context) { + u.sem <- struct{}{} + + u.mu.Lock() + defer u.mu.Unlock() + batch := len(u.pendingRecords) if batch > flushSize { @@ -156,11 +163,12 @@ func (u *upkeepStateStore) flush(ctx context.Context) { cloneRecords := u.pendingRecords[:batch] u.pendingRecords = u.pendingRecords[batch:] - //go func() { - if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil { - //u.errCh <- err - } - //}() + go func() { + if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil { + u.errCh <- err + } + <-u.sem + }() } // Close stops the service of pruning stale data; implements io.Closer diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index e3691f9b8c9..76ae349c1db 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -221,13 +221,15 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { tests := []struct { name string + flushSize int queryIDs []string storedValues []storedValue expected []ocr2keepers.UpkeepState }{ { - name: "querying non-stored workIDs on empty db returns unknown state results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "querying non-stored workIDs on empty db returns unknown state results", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 1, expected: []ocr2keepers.UpkeepState{ ocr2keepers.UnknownState, ocr2keepers.UnknownState, @@ -236,8 +238,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "querying non-stored workIDs on db with values returns unknown state results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "querying non-stored workIDs on db with values returns unknown state results", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 1, storedValues: []storedValue{ {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Ineligible}, @@ -252,8 +255,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "querying workIDs with non-stored values returns valid results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "querying workIDs with non-stored values returns valid results", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 4, storedValues: []storedValue{ {result: makeTestResult(5, "0x1", false, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(6, "0x2", false, 1), state: ocr2keepers.Ineligible}, @@ -268,8 +272,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "storing eligible values is a noop", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "storing eligible values is a noop", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 4, storedValues: []storedValue{ {result: makeTestResult(9, "0x1", false, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(10, "0x2", false, 1), state: ocr2keepers.Ineligible}, @@ -284,8 +289,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "provided state on setupkeepstate is currently ignored for eligible check results", - queryIDs: []string{"0x1", "0x2"}, + name: "provided state on setupkeepstate is currently ignored for eligible check results", + queryIDs: []string{"0x1", "0x2"}, + flushSize: 1, storedValues: []storedValue{ {result: makeTestResult(13, "0x1", true, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, @@ -295,6 +301,31 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { ocr2keepers.Ineligible, }, }, + { + name: "provided state outside the flush batchg isn't registered in the db", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4", "0x5", "0x6", "0x7", "0x8"}, + flushSize: 3, + storedValues: []storedValue{ + {result: makeTestResult(13, "0x1", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(15, "0x3", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(16, "0x4", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(17, "0x5", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(18, "0x6", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(19, "0x7", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(20, "0x8", false, 1), state: ocr2keepers.Performed}, + }, + expected: []ocr2keepers.UpkeepState{ + ocr2keepers.UnknownState, + ocr2keepers.Ineligible, + ocr2keepers.UnknownState, + ocr2keepers.Ineligible, + ocr2keepers.UnknownState, + ocr2keepers.Ineligible, + ocr2keepers.UnknownState, // unknown because not flushed to db + ocr2keepers.UnknownState, // unknown because not flushed to db + }, + }, } for _, test := range tests { @@ -303,19 +334,38 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { tickerCh := make(chan time.Time) oldNewTickerFn := newTickerFn + oldFlushSize := flushSize + defer func() { + }() newTickerFn = func(d time.Duration) *time.Ticker { return &time.Ticker{ C: tickerCh, } } + flushSize = test.flushSize defer func() { newTickerFn = oldNewTickerFn + flushSize = oldFlushSize }() lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) - orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + insertFinished := make(chan struct{}, 1) + orm := &wrappedORM{ + BatchInsertUpkeepStatesFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { + err := realORM.BatchInsertUpkeepStates(records, opt...) + insertFinished <- struct{}{} + return err + }, + SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) { + return realORM.SelectStatesByWorkIDs(strings, opt...) + }, + DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error { + return realORM.DeleteExpired(t, opt...) + }, + } scanner := &mockScanner{} store := NewUpkeepStateStore(orm, lggr, scanner) @@ -334,7 +384,11 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { } tickerCh <- time.Now() - tickerCh <- time.Now() // by the second tick we know the flush has been triggered + + // if this test inserts data, wait for the insert to complete before proceeding + if len(test.storedValues) > 0 { + <-insertFinished + } // empty the cache before doing selects to force a db lookup store.cache = make(map[string]*upkeepStateRecord) @@ -514,3 +568,21 @@ func (_m *mockORM) DeleteExpired(tm time.Time, _ ...pg.QOpt) error { return _m.err } + +type wrappedORM struct { + BatchInsertUpkeepStatesFn func([]persistedStateRecord, ...pg.QOpt) error + SelectStatesByWorkIDsFn func([]string, ...pg.QOpt) ([]persistedStateRecord, error) + DeleteExpiredFn func(time.Time, ...pg.QOpt) error +} + +func (o *wrappedORM) BatchInsertUpkeepStates(r []persistedStateRecord, q ...pg.QOpt) error { + return o.BatchInsertUpkeepStatesFn(r, q...) +} + +func (o *wrappedORM) SelectStatesByWorkIDs(ids []string, q ...pg.QOpt) ([]persistedStateRecord, error) { + return o.SelectStatesByWorkIDsFn(ids, q...) +} + +func (o *wrappedORM) DeleteExpired(t time.Time, q ...pg.QOpt) error { + return o.DeleteExpiredFn(t, q...) +}