Skip to content

Commit

Permalink
Synchronous tick inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Sep 7, 2023
1 parent 6e5943b commit aa6811f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 2 deletions.
35 changes: 33 additions & 2 deletions core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ const (
// CacheExpiration is the amount of time that we keep a record in the cache.
CacheExpiration = 24 * time.Hour
// GCInterval is the amount of time between cache cleanups.
GCInterval = 2 * time.Hour
GCInterval = 2 * time.Hour
flushCadence = 30 * time.Second
flushSize = 1000
)

type ORM interface {
Expand All @@ -39,7 +41,8 @@ type UpkeepStateStore interface {
}

var (
_ UpkeepStateStore = &upkeepStateStore{}
_ UpkeepStateStore = &upkeepStateStore{}
newTickerFn = time.NewTicker
)

// upkeepStateRecord is a record that we save in a local cache.
Expand Down Expand Up @@ -67,6 +70,7 @@ type upkeepStateStore struct {
cache map[string]*upkeepStateRecord

pendingRecords []persistedStateRecord
errCh chan error

// service values
cancel context.CancelFunc
Expand All @@ -82,6 +86,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann
retention: CacheExpiration,
cleanCadence: GCInterval,
pendingRecords: []persistedStateRecord{},
errCh: make(chan error, 1),
}
}

Expand Down Expand Up @@ -114,6 +119,9 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
ticker := time.NewTicker(utils.WithJitter(u.cleanCadence))
defer ticker.Stop()

flushTicker := newTickerFn(flushCadence)
defer flushTicker.Stop()

for {
select {
case <-ticker.C:
Expand All @@ -122,6 +130,10 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
}

ticker.Reset(utils.WithJitter(u.cleanCadence))
case <-flushTicker.C:
u.flush(ctx)
case err := <-u.errCh:
u.lggr.Errorw("error inserting records", "err", err)
case <-ctx.Done():

}
Expand All @@ -132,6 +144,25 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
return nil
}

func (u *upkeepStateStore) flush(ctx context.Context) {
batch := len(u.pendingRecords)

if batch > flushSize {
batch = flushSize
} else if batch == 0 {
return
}

cloneRecords := u.pendingRecords[:batch]
u.pendingRecords = u.pendingRecords[batch:]

//go func() {
if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil {
//u.errCh <- err
}
//}()
}

// Close stops the service of pruning stale data; implements io.Closer
func (u *upkeepStateStore) Close() error {
u.mu.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,26 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
ctx := testutils.Context(t)

tickerCh := make(chan time.Time)
oldNewTickerFn := newTickerFn
newTickerFn = func(d time.Duration) *time.Ticker {
return &time.Ticker{
C: tickerCh,
}
}
defer func() {
newTickerFn = oldNewTickerFn
}()

lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel)
chainID := testutils.FixtureChainID
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
scanner := &mockScanner{}
store := NewUpkeepStateStore(orm, lggr, scanner)

require.NoError(t, store.Start(ctx))

t.Cleanup(func() {
t.Log("cleaning up database")

Expand All @@ -320,6 +333,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) {
require.NoError(t, store.SetUpkeepState(context.Background(), insert.result, insert.state), "storing states should not produce an error")
}

tickerCh <- time.Now()
tickerCh <- time.Now() // by the second tick we know the flush has been triggered

// empty the cache before doing selects to force a db lookup
store.cache = make(map[string]*upkeepStateRecord)

Expand Down

0 comments on commit aa6811f

Please sign in to comment.