diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index b9a9be24166..244e8d64f29 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -21,7 +21,9 @@ const ( // CacheExpiration is the amount of time that we keep a record in the cache. CacheExpiration = 24 * time.Hour // GCInterval is the amount of time between cache cleanups. - GCInterval = 2 * time.Hour + GCInterval = 2 * time.Hour + flushCadence = 30 * time.Second + flushSize = 1000 ) type ORM interface { @@ -39,7 +41,8 @@ type UpkeepStateStore interface { } var ( - _ UpkeepStateStore = &upkeepStateStore{} + _ UpkeepStateStore = &upkeepStateStore{} + newTickerFn = time.NewTicker ) // upkeepStateRecord is a record that we save in a local cache. @@ -67,6 +70,7 @@ type upkeepStateStore struct { cache map[string]*upkeepStateRecord pendingRecords []persistedStateRecord + errCh chan error // service values cancel context.CancelFunc @@ -82,6 +86,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann retention: CacheExpiration, cleanCadence: GCInterval, pendingRecords: []persistedStateRecord{}, + errCh: make(chan error, 1), } } @@ -114,6 +119,9 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { ticker := time.NewTicker(utils.WithJitter(u.cleanCadence)) defer ticker.Stop() + flushTicker := newTickerFn(flushCadence) + defer flushTicker.Stop() + for { select { case <-ticker.C: @@ -122,6 +130,10 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { } ticker.Reset(utils.WithJitter(u.cleanCadence)) + case <-flushTicker.C: + u.flush(ctx) + case err := <-u.errCh: + u.lggr.Errorw("error inserting records", "err", err) case <-ctx.Done(): } @@ -132,6 +144,25 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { return nil } +func (u *upkeepStateStore) flush(ctx context.Context) { + batch := len(u.pendingRecords) + + if batch > flushSize { + batch = flushSize + } else if batch == 0 { + return + } + + cloneRecords := u.pendingRecords[:batch] + u.pendingRecords = u.pendingRecords[batch:] + + //go func() { + if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil { + //u.errCh <- err + } + //}() +} + // Close stops the service of pruning stale data; implements io.Closer func (u *upkeepStateStore) Close() error { u.mu.Lock() diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index 28674d7e858..e3691f9b8c9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -301,6 +301,17 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := testutils.Context(t) + tickerCh := make(chan time.Time) + oldNewTickerFn := newTickerFn + newTickerFn = func(d time.Duration) *time.Ticker { + return &time.Ticker{ + C: tickerCh, + } + } + defer func() { + newTickerFn = oldNewTickerFn + }() + lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) @@ -308,6 +319,8 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { scanner := &mockScanner{} store := NewUpkeepStateStore(orm, lggr, scanner) + require.NoError(t, store.Start(ctx)) + t.Cleanup(func() { t.Log("cleaning up database") @@ -320,6 +333,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { require.NoError(t, store.SetUpkeepState(context.Background(), insert.result, insert.state), "storing states should not produce an error") } + tickerCh <- time.Now() + tickerCh <- time.Now() // by the second tick we know the flush has been triggered + // empty the cache before doing selects to force a db lookup store.cache = make(map[string]*upkeepStateRecord)