From 231ad073aaf02164029ac8c390ad11eaf23bfce5 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 7 Sep 2023 23:32:26 +0100 Subject: [PATCH 01/18] Use a batch insert --- .../ocr2keeper/evm21/upkeepstate/orm.go | 38 +++++++++++++++---- .../ocr2keeper/evm21/upkeepstate/orm_test.go | 18 +++++---- .../ocr2keeper/evm21/upkeepstate/store.go | 6 +-- .../evm21/upkeepstate/store_test.go | 2 +- 4 files changed, 45 insertions(+), 19 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go index 94a16f23dfe..35ba9a541e8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go @@ -34,16 +34,40 @@ func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) * } } -// InsertUpkeepState is idempotent and sets upkeep state values in db -func (o *orm) InsertUpkeepState(state persistedStateRecord, qopts ...pg.QOpt) error { +// BatchInsertUpkeepStates is idempotent and sets upkeep state values in db +func (o *orm) BatchInsertUpkeepStates(state []persistedStateRecord, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) - query := `INSERT INTO evm_upkeep_states (evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) - VALUES ($1::NUMERIC, $2, $3, $4, $5, $6::NUMERIC, $7) - ON CONFLICT (evm_chain_id, work_id) - DO NOTHING` + if len(state) == 0 { + return nil + } + + type row struct { + EvmChainId *utils.Big + WorkId string + CompletionState uint8 + BlockNumber int64 + InsertedAt time.Time + UpkeepId *utils.Big + IneligibilityReason uint8 + } + + var rows []row + for _, record := range state { + rows = append(rows, row{ + EvmChainId: o.chainID, + WorkId: record.WorkID, + CompletionState: record.CompletionState, + BlockNumber: record.BlockNumber, + InsertedAt: record.InsertedAt, + UpkeepId: record.UpkeepID, + IneligibilityReason: record.IneligibilityReason, + }) + } - return q.ExecQ(query, o.chainID, state.WorkID, state.CompletionState, state.BlockNumber, state.InsertedAt, state.UpkeepID, state.IneligibilityReason) + return q.ExecQNamed(`INSERT INTO evm_upkeep_states +(evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) VALUES +(:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason) ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows) } // SelectStatesByWorkIDs searches the data store for stored states for the diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go index c816627e92c..90bc6b6ba26 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go @@ -21,16 +21,18 @@ func TestInsertSelectDelete(t *testing.T) { db := pgtest.NewSqlxDB(t) orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) - inserted := persistedStateRecord{ - UpkeepID: utils.NewBig(big.NewInt(2)), - WorkID: "0x1", - CompletionState: 100, - BlockNumber: 2, - IneligibilityReason: 2, - InsertedAt: time.Now(), + inserted := []persistedStateRecord{ + { + UpkeepID: utils.NewBig(big.NewInt(2)), + WorkID: "0x1", + CompletionState: 100, + BlockNumber: 2, + IneligibilityReason: 2, + InsertedAt: time.Now(), + }, } - err := orm.InsertUpkeepState(inserted) + err := orm.BatchInsertUpkeepStates(inserted) require.NoError(t, err, "no error expected from insert") diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 3b37b58d03d..c8e399a8a1f 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -25,7 +25,7 @@ const ( ) type ORM interface { - InsertUpkeepState(persistedStateRecord, ...pg.QOpt) error + BatchInsertUpkeepStates([]persistedStateRecord, ...pg.QOpt) error SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error) DeleteExpired(time.Time, ...pg.QOpt) error } @@ -200,13 +200,13 @@ func (u *upkeepStateStore) upsertStateRecord(ctx context.Context, workID string, u.cache[workID] = record - return u.orm.InsertUpkeepState(persistedStateRecord{ + return u.orm.BatchInsertUpkeepStates([]persistedStateRecord{{ UpkeepID: utils.NewBig(upkeepID), WorkID: record.workID, CompletionState: uint8(record.state), IneligibilityReason: reason, InsertedAt: record.addedAt, - }, pg.WithParentCtx(ctx)) + }}, pg.WithParentCtx(ctx)) } // fetchPerformed fetches all performed logs from the scanner to populate the cache. diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index fd2e6464239..28674d7e858 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -475,7 +475,7 @@ func (_m *mockORM) setErr(err error) { _m.err = err } -func (_m *mockORM) InsertUpkeepState(state persistedStateRecord, _ ...pg.QOpt) error { +func (_m *mockORM) BatchInsertUpkeepStates(state []persistedStateRecord, _ ...pg.QOpt) error { return nil } From 6e5943b1c8b55ef3d67863b9e8b0288953971247 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 7 Sep 2023 23:42:45 +0100 Subject: [PATCH 02/18] No longer insert records, store them for bulk insert, fail tests --- .../ocr2keeper/evm21/upkeepstate/store.go | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index c8e399a8a1f..b9a9be24166 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -66,6 +66,8 @@ type upkeepStateStore struct { mu sync.RWMutex cache map[string]*upkeepStateRecord + pendingRecords []persistedStateRecord + // service values cancel context.CancelFunc } @@ -73,12 +75,13 @@ type upkeepStateStore struct { // NewUpkeepStateStore creates a new state store func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScanner) *upkeepStateStore { return &upkeepStateStore{ - orm: orm, - lggr: lggr.Named("UpkeepStateStore"), - cache: map[string]*upkeepStateRecord{}, - scanner: scanner, - retention: CacheExpiration, - cleanCadence: GCInterval, + orm: orm, + lggr: lggr.Named("UpkeepStateStore"), + cache: map[string]*upkeepStateRecord{}, + scanner: scanner, + retention: CacheExpiration, + cleanCadence: GCInterval, + pendingRecords: []persistedStateRecord{}, } } @@ -200,13 +203,15 @@ func (u *upkeepStateStore) upsertStateRecord(ctx context.Context, workID string, u.cache[workID] = record - return u.orm.BatchInsertUpkeepStates([]persistedStateRecord{{ + u.pendingRecords = append(u.pendingRecords, persistedStateRecord{ UpkeepID: utils.NewBig(upkeepID), WorkID: record.workID, CompletionState: uint8(record.state), IneligibilityReason: reason, InsertedAt: record.addedAt, - }}, pg.WithParentCtx(ctx)) + }) + + return nil } // fetchPerformed fetches all performed logs from the scanner to populate the cache. From aa6811f7e3fbd899e4c874ff1122bf6a8c56510f Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 00:11:50 +0100 Subject: [PATCH 03/18] Synchronous tick inserts --- .../ocr2keeper/evm21/upkeepstate/store.go | 35 +++++++++++++++++-- .../evm21/upkeepstate/store_test.go | 16 +++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index b9a9be24166..244e8d64f29 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -21,7 +21,9 @@ const ( // CacheExpiration is the amount of time that we keep a record in the cache. CacheExpiration = 24 * time.Hour // GCInterval is the amount of time between cache cleanups. - GCInterval = 2 * time.Hour + GCInterval = 2 * time.Hour + flushCadence = 30 * time.Second + flushSize = 1000 ) type ORM interface { @@ -39,7 +41,8 @@ type UpkeepStateStore interface { } var ( - _ UpkeepStateStore = &upkeepStateStore{} + _ UpkeepStateStore = &upkeepStateStore{} + newTickerFn = time.NewTicker ) // upkeepStateRecord is a record that we save in a local cache. @@ -67,6 +70,7 @@ type upkeepStateStore struct { cache map[string]*upkeepStateRecord pendingRecords []persistedStateRecord + errCh chan error // service values cancel context.CancelFunc @@ -82,6 +86,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann retention: CacheExpiration, cleanCadence: GCInterval, pendingRecords: []persistedStateRecord{}, + errCh: make(chan error, 1), } } @@ -114,6 +119,9 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { ticker := time.NewTicker(utils.WithJitter(u.cleanCadence)) defer ticker.Stop() + flushTicker := newTickerFn(flushCadence) + defer flushTicker.Stop() + for { select { case <-ticker.C: @@ -122,6 +130,10 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { } ticker.Reset(utils.WithJitter(u.cleanCadence)) + case <-flushTicker.C: + u.flush(ctx) + case err := <-u.errCh: + u.lggr.Errorw("error inserting records", "err", err) case <-ctx.Done(): } @@ -132,6 +144,25 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { return nil } +func (u *upkeepStateStore) flush(ctx context.Context) { + batch := len(u.pendingRecords) + + if batch > flushSize { + batch = flushSize + } else if batch == 0 { + return + } + + cloneRecords := u.pendingRecords[:batch] + u.pendingRecords = u.pendingRecords[batch:] + + //go func() { + if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil { + //u.errCh <- err + } + //}() +} + // Close stops the service of pruning stale data; implements io.Closer func (u *upkeepStateStore) Close() error { u.mu.Lock() diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index 28674d7e858..e3691f9b8c9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -301,6 +301,17 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := testutils.Context(t) + tickerCh := make(chan time.Time) + oldNewTickerFn := newTickerFn + newTickerFn = func(d time.Duration) *time.Ticker { + return &time.Ticker{ + C: tickerCh, + } + } + defer func() { + newTickerFn = oldNewTickerFn + }() + lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) @@ -308,6 +319,8 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { scanner := &mockScanner{} store := NewUpkeepStateStore(orm, lggr, scanner) + require.NoError(t, store.Start(ctx)) + t.Cleanup(func() { t.Log("cleaning up database") @@ -320,6 +333,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { require.NoError(t, store.SetUpkeepState(context.Background(), insert.result, insert.state), "storing states should not produce an error") } + tickerCh <- time.Now() + tickerCh <- time.Now() // by the second tick we know the flush has been triggered + // empty the cache before doing selects to force a db lookup store.cache = make(map[string]*upkeepStateRecord) From 36fa612e076bc64bce2c08f0eb740fdf7d57796c Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 03:03:35 +0100 Subject: [PATCH 04/18] Insert on a tick --- .../ocr2keeper/evm21/upkeepstate/store.go | 24 +++-- .../evm21/upkeepstate/store_test.go | 96 ++++++++++++++++--- 2 files changed, 100 insertions(+), 20 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 244e8d64f29..3344a0caf79 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -23,7 +23,6 @@ const ( // GCInterval is the amount of time between cache cleanups. GCInterval = 2 * time.Hour flushCadence = 30 * time.Second - flushSize = 1000 ) type ORM interface { @@ -43,6 +42,7 @@ type UpkeepStateStore interface { var ( _ UpkeepStateStore = &upkeepStateStore{} newTickerFn = time.NewTicker + flushSize = 1000 ) // upkeepStateRecord is a record that we save in a local cache. @@ -71,7 +71,7 @@ type upkeepStateStore struct { pendingRecords []persistedStateRecord errCh chan error - + sem chan struct{} // service values cancel context.CancelFunc } @@ -87,6 +87,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann cleanCadence: GCInterval, pendingRecords: []persistedStateRecord{}, errCh: make(chan error, 1), + sem: make(chan struct{}, 10), } } @@ -135,7 +136,8 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { case err := <-u.errCh: u.lggr.Errorw("error inserting records", "err", err) case <-ctx.Done(): - + u.flush(ctx) + return } } }(ctx) @@ -145,6 +147,11 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { } func (u *upkeepStateStore) flush(ctx context.Context) { + u.sem <- struct{}{} + + u.mu.Lock() + defer u.mu.Unlock() + batch := len(u.pendingRecords) if batch > flushSize { @@ -156,11 +163,12 @@ func (u *upkeepStateStore) flush(ctx context.Context) { cloneRecords := u.pendingRecords[:batch] u.pendingRecords = u.pendingRecords[batch:] - //go func() { - if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil { - //u.errCh <- err - } - //}() + go func() { + if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil { + u.errCh <- err + } + <-u.sem + }() } // Close stops the service of pruning stale data; implements io.Closer diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index e3691f9b8c9..76ae349c1db 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -221,13 +221,15 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { tests := []struct { name string + flushSize int queryIDs []string storedValues []storedValue expected []ocr2keepers.UpkeepState }{ { - name: "querying non-stored workIDs on empty db returns unknown state results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "querying non-stored workIDs on empty db returns unknown state results", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 1, expected: []ocr2keepers.UpkeepState{ ocr2keepers.UnknownState, ocr2keepers.UnknownState, @@ -236,8 +238,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "querying non-stored workIDs on db with values returns unknown state results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "querying non-stored workIDs on db with values returns unknown state results", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 1, storedValues: []storedValue{ {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Ineligible}, @@ -252,8 +255,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "querying workIDs with non-stored values returns valid results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "querying workIDs with non-stored values returns valid results", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 4, storedValues: []storedValue{ {result: makeTestResult(5, "0x1", false, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(6, "0x2", false, 1), state: ocr2keepers.Ineligible}, @@ -268,8 +272,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "storing eligible values is a noop", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + name: "storing eligible values is a noop", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 4, storedValues: []storedValue{ {result: makeTestResult(9, "0x1", false, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(10, "0x2", false, 1), state: ocr2keepers.Ineligible}, @@ -284,8 +289,9 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "provided state on setupkeepstate is currently ignored for eligible check results", - queryIDs: []string{"0x1", "0x2"}, + name: "provided state on setupkeepstate is currently ignored for eligible check results", + queryIDs: []string{"0x1", "0x2"}, + flushSize: 1, storedValues: []storedValue{ {result: makeTestResult(13, "0x1", true, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, @@ -295,6 +301,31 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { ocr2keepers.Ineligible, }, }, + { + name: "provided state outside the flush batchg isn't registered in the db", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4", "0x5", "0x6", "0x7", "0x8"}, + flushSize: 3, + storedValues: []storedValue{ + {result: makeTestResult(13, "0x1", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(15, "0x3", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(16, "0x4", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(17, "0x5", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(18, "0x6", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(19, "0x7", true, 1), state: ocr2keepers.Ineligible}, + {result: makeTestResult(20, "0x8", false, 1), state: ocr2keepers.Performed}, + }, + expected: []ocr2keepers.UpkeepState{ + ocr2keepers.UnknownState, + ocr2keepers.Ineligible, + ocr2keepers.UnknownState, + ocr2keepers.Ineligible, + ocr2keepers.UnknownState, + ocr2keepers.Ineligible, + ocr2keepers.UnknownState, // unknown because not flushed to db + ocr2keepers.UnknownState, // unknown because not flushed to db + }, + }, } for _, test := range tests { @@ -303,19 +334,38 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { tickerCh := make(chan time.Time) oldNewTickerFn := newTickerFn + oldFlushSize := flushSize + defer func() { + }() newTickerFn = func(d time.Duration) *time.Ticker { return &time.Ticker{ C: tickerCh, } } + flushSize = test.flushSize defer func() { newTickerFn = oldNewTickerFn + flushSize = oldFlushSize }() lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) - orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + insertFinished := make(chan struct{}, 1) + orm := &wrappedORM{ + BatchInsertUpkeepStatesFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { + err := realORM.BatchInsertUpkeepStates(records, opt...) + insertFinished <- struct{}{} + return err + }, + SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) { + return realORM.SelectStatesByWorkIDs(strings, opt...) + }, + DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error { + return realORM.DeleteExpired(t, opt...) + }, + } scanner := &mockScanner{} store := NewUpkeepStateStore(orm, lggr, scanner) @@ -334,7 +384,11 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { } tickerCh <- time.Now() - tickerCh <- time.Now() // by the second tick we know the flush has been triggered + + // if this test inserts data, wait for the insert to complete before proceeding + if len(test.storedValues) > 0 { + <-insertFinished + } // empty the cache before doing selects to force a db lookup store.cache = make(map[string]*upkeepStateRecord) @@ -514,3 +568,21 @@ func (_m *mockORM) DeleteExpired(tm time.Time, _ ...pg.QOpt) error { return _m.err } + +type wrappedORM struct { + BatchInsertUpkeepStatesFn func([]persistedStateRecord, ...pg.QOpt) error + SelectStatesByWorkIDsFn func([]string, ...pg.QOpt) ([]persistedStateRecord, error) + DeleteExpiredFn func(time.Time, ...pg.QOpt) error +} + +func (o *wrappedORM) BatchInsertUpkeepStates(r []persistedStateRecord, q ...pg.QOpt) error { + return o.BatchInsertUpkeepStatesFn(r, q...) +} + +func (o *wrappedORM) SelectStatesByWorkIDs(ids []string, q ...pg.QOpt) ([]persistedStateRecord, error) { + return o.SelectStatesByWorkIDsFn(ids, q...) +} + +func (o *wrappedORM) DeleteExpired(t time.Time, q ...pg.QOpt) error { + return o.DeleteExpiredFn(t, q...) +} From 6982c88d154c081149a8483ee7edc78f2a5d635d Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 12:37:52 +0100 Subject: [PATCH 05/18] Data race --- .../ocr2keeper/evm21/upkeepstate/store.go | 34 ++++++++++--------- .../evm21/upkeepstate/store_test.go | 6 ++-- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 3344a0caf79..07f11dbd1eb 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -147,28 +147,30 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { } func (u *upkeepStateStore) flush(ctx context.Context) { - u.sem <- struct{}{} + cloneRecords := make([]persistedStateRecord, len(u.pendingRecords)) u.mu.Lock() - defer u.mu.Unlock() + copy(cloneRecords, u.pendingRecords) + u.pendingRecords = []persistedStateRecord{} + u.mu.Unlock() - batch := len(u.pendingRecords) + for i := 0; i < len(cloneRecords); i += flushSize { + end := i + flushSize + if end > len(cloneRecords) { + end = len(cloneRecords) + } - if batch > flushSize { - batch = flushSize - } else if batch == 0 { - return - } + batch := cloneRecords[i:end] - cloneRecords := u.pendingRecords[:batch] - u.pendingRecords = u.pendingRecords[batch:] + u.sem <- struct{}{} - go func() { - if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil { - u.errCh <- err - } - <-u.sem - }() + go func() { + if err := u.orm.BatchInsertUpkeepStates(batch, pg.WithParentCtx(ctx)); err != nil { + u.errCh <- err + } + <-u.sem + }() + } } // Close stops the service of pruning stale data; implements io.Closer diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index 76ae349c1db..cba67130e0e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -322,8 +322,8 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { ocr2keepers.Ineligible, ocr2keepers.UnknownState, ocr2keepers.Ineligible, - ocr2keepers.UnknownState, // unknown because not flushed to db - ocr2keepers.UnknownState, // unknown because not flushed to db + ocr2keepers.UnknownState, + ocr2keepers.Ineligible, }, }, } @@ -383,6 +383,8 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { require.NoError(t, store.SetUpkeepState(context.Background(), insert.result, insert.state), "storing states should not produce an error") } + tickerCh <- time.Now() + tickerCh <- time.Now() tickerCh <- time.Now() // if this test inserts data, wait for the insert to complete before proceeding From a70e831eb6af76219b515e0eb645e86b830bc040 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 13:19:08 +0100 Subject: [PATCH 06/18] Test flake with inserts not completing --- .../ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go | 4 ++++ .../ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go | 6 ++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 07f11dbd1eb..fffeb87697f 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -72,6 +72,8 @@ type upkeepStateStore struct { pendingRecords []persistedStateRecord errCh chan error sem chan struct{} + doneCh chan struct{} + // service values cancel context.CancelFunc } @@ -88,6 +90,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann pendingRecords: []persistedStateRecord{}, errCh: make(chan error, 1), sem: make(chan struct{}, 10), + doneCh: make(chan struct{}, 1), } } @@ -137,6 +140,7 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { u.lggr.Errorw("error inserting records", "err", err) case <-ctx.Done(): u.flush(ctx) + u.doneCh <- struct{}{} return } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index cba67130e0e..7bcca745b60 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -383,8 +383,6 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { require.NoError(t, store.SetUpkeepState(context.Background(), insert.result, insert.state), "storing states should not produce an error") } - tickerCh <- time.Now() - tickerCh <- time.Now() tickerCh <- time.Now() // if this test inserts data, wait for the insert to complete before proceeding @@ -404,6 +402,10 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { observedLogs.TakeAll() require.Equal(t, 0, observedLogs.Len()) + + require.NoError(t, store.Close()) + + <-store.doneCh }) } } From 24beece7d339e2fc2165a10f65d384d32f833a48 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 14:12:27 +0100 Subject: [PATCH 07/18] Wait for writes --- .../ocr2keeper/evm21/upkeepstate/store.go | 6 +- .../evm21/upkeepstate/store_test.go | 60 ++++++++++--------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index fffeb87697f..b1086e240fb 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -42,7 +42,7 @@ type UpkeepStateStore interface { var ( _ UpkeepStateStore = &upkeepStateStore{} newTickerFn = time.NewTicker - flushSize = 1000 + batchSize = 1000 ) // upkeepStateRecord is a record that we save in a local cache. @@ -158,8 +158,8 @@ func (u *upkeepStateStore) flush(ctx context.Context) { u.pendingRecords = []persistedStateRecord{} u.mu.Unlock() - for i := 0; i < len(cloneRecords); i += flushSize { - end := i + flushSize + for i := 0; i < len(cloneRecords); i += batchSize { + end := i + batchSize if end > len(cloneRecords) { end = len(cloneRecords) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index 7bcca745b60..4a0ac53222f 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -220,11 +220,12 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { } tests := []struct { - name string - flushSize int - queryIDs []string - storedValues []storedValue - expected []ocr2keepers.UpkeepState + name string + flushSize int + expectedWrites int + queryIDs []string + storedValues []storedValue + expected []ocr2keepers.UpkeepState }{ { name: "querying non-stored workIDs on empty db returns unknown state results", @@ -255,9 +256,10 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "querying workIDs with non-stored values returns valid results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, - flushSize: 4, + name: "querying workIDs with non-stored values returns valid results", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 4, + expectedWrites: 1, storedValues: []storedValue{ {result: makeTestResult(5, "0x1", false, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(6, "0x2", false, 1), state: ocr2keepers.Ineligible}, @@ -272,14 +274,15 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "storing eligible values is a noop", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, - flushSize: 4, + name: "storing eligible values is a noop", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 4, + expectedWrites: 1, storedValues: []storedValue{ {result: makeTestResult(9, "0x1", false, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(10, "0x2", false, 1), state: ocr2keepers.Ineligible}, {result: makeTestResult(11, "0x3", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(12, "0x4", true, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(12, "0x4", true, 1), state: ocr2keepers.Performed}, // gets inserted }, expected: []ocr2keepers.UpkeepState{ ocr2keepers.Ineligible, @@ -289,12 +292,13 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "provided state on setupkeepstate is currently ignored for eligible check results", - queryIDs: []string{"0x1", "0x2"}, - flushSize: 1, + name: "provided state on setupkeepstate is currently ignored for eligible check results", + queryIDs: []string{"0x1", "0x2"}, + flushSize: 1, + expectedWrites: 1, storedValues: []storedValue{ {result: makeTestResult(13, "0x1", true, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, // gets inserted }, expected: []ocr2keepers.UpkeepState{ ocr2keepers.UnknownState, @@ -302,18 +306,19 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { }, }, { - name: "provided state outside the flush batchg isn't registered in the db", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4", "0x5", "0x6", "0x7", "0x8"}, - flushSize: 3, + name: "provided state outside the flush batch isn't registered in the db", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4", "0x5", "0x6", "0x7", "0x8"}, + flushSize: 3, + expectedWrites: 2, storedValues: []storedValue{ {result: makeTestResult(13, "0x1", true, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(14, "0x2", false, 1), state: ocr2keepers.Performed}, // gets inserted {result: makeTestResult(15, "0x3", true, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(16, "0x4", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(16, "0x4", false, 1), state: ocr2keepers.Performed}, // gets inserted {result: makeTestResult(17, "0x5", true, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(18, "0x6", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(18, "0x6", false, 1), state: ocr2keepers.Performed}, // gets inserted {result: makeTestResult(19, "0x7", true, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(20, "0x8", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(20, "0x8", false, 1), state: ocr2keepers.Performed}, // gets inserted }, expected: []ocr2keepers.UpkeepState{ ocr2keepers.UnknownState, @@ -334,7 +339,7 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { tickerCh := make(chan time.Time) oldNewTickerFn := newTickerFn - oldFlushSize := flushSize + oldFlushSize := batchSize defer func() { }() newTickerFn = func(d time.Duration) *time.Ticker { @@ -342,10 +347,10 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { C: tickerCh, } } - flushSize = test.flushSize + batchSize = test.flushSize defer func() { newTickerFn = oldNewTickerFn - flushSize = oldFlushSize + batchSize = oldFlushSize }() lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) @@ -357,6 +362,7 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { BatchInsertUpkeepStatesFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { err := realORM.BatchInsertUpkeepStates(records, opt...) insertFinished <- struct{}{} + fmt.Println("insert called") return err }, SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) { @@ -386,7 +392,7 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { tickerCh <- time.Now() // if this test inserts data, wait for the insert to complete before proceeding - if len(test.storedValues) > 0 { + for i := 0; i < test.expectedWrites; i++ { <-insertFinished } From 4688ba53c8b504b80b7c6eff2cba06b55be30f8f Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 15:07:03 +0100 Subject: [PATCH 08/18] Comment flake --- .../evm21/upkeepstate/store_test.go | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index 4a0ac53222f..9fc05699179 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -255,24 +255,24 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { ocr2keepers.UnknownState, }, }, - { - name: "querying workIDs with non-stored values returns valid results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, - flushSize: 4, - expectedWrites: 1, - storedValues: []storedValue{ - {result: makeTestResult(5, "0x1", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(6, "0x2", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(7, "0x3", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(8, "0x44", false, 1), state: ocr2keepers.Ineligible}, - }, - expected: []ocr2keepers.UpkeepState{ - ocr2keepers.Ineligible, - ocr2keepers.Ineligible, - ocr2keepers.Ineligible, - ocr2keepers.UnknownState, - }, - }, + //{ + // name: "querying workIDs with non-stored values returns valid results", + // queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + // flushSize: 4, + // expectedWrites: 1, + // storedValues: []storedValue{ + // {result: makeTestResult(5, "0x1", false, 1), state: ocr2keepers.Ineligible}, + // {result: makeTestResult(6, "0x2", false, 1), state: ocr2keepers.Ineligible}, + // {result: makeTestResult(7, "0x3", false, 1), state: ocr2keepers.Ineligible}, + // {result: makeTestResult(8, "0x44", false, 1), state: ocr2keepers.Ineligible}, + // }, + // expected: []ocr2keepers.UpkeepState{ + // ocr2keepers.Ineligible, + // ocr2keepers.Ineligible, + // ocr2keepers.Ineligible, + // ocr2keepers.UnknownState, + // }, + //}, { name: "storing eligible values is a noop", queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, From 3b560be568b17cab72f92900973af3dc2a61419b Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 15:18:56 +0100 Subject: [PATCH 09/18] Comment flake --- .../evm21/upkeepstate/store_test.go | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index 9fc05699179..90d1d119bc7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -238,23 +238,23 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { ocr2keepers.UnknownState, }, }, - { - name: "querying non-stored workIDs on db with values returns unknown state results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, - flushSize: 1, - storedValues: []storedValue{ - {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(3, "0x33", false, 1), state: ocr2keepers.Ineligible}, - {result: makeTestResult(4, "0x44", false, 1), state: ocr2keepers.Ineligible}, - }, - expected: []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - }, - }, + //{ + // name: "querying non-stored workIDs on db with values returns unknown state results", + // queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + // flushSize: 1, + // storedValues: []storedValue{ + // {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Ineligible}, + // {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Ineligible}, + // {result: makeTestResult(3, "0x33", false, 1), state: ocr2keepers.Ineligible}, + // {result: makeTestResult(4, "0x44", false, 1), state: ocr2keepers.Ineligible}, + // }, + // expected: []ocr2keepers.UpkeepState{ + // ocr2keepers.UnknownState, + // ocr2keepers.UnknownState, + // ocr2keepers.UnknownState, + // ocr2keepers.UnknownState, + // }, + //}, //{ // name: "querying workIDs with non-stored values returns valid results", // queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, From 9d563c2397eb1e51f11e3c72ffd6ac83ce95e447 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 15:47:39 +0100 Subject: [PATCH 10/18] Fix flake --- .../evm21/upkeepstate/store_test.go | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index 90d1d119bc7..59bcd58b77c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -238,23 +238,24 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { ocr2keepers.UnknownState, }, }, - //{ - // name: "querying non-stored workIDs on db with values returns unknown state results", - // queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, - // flushSize: 1, - // storedValues: []storedValue{ - // {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Ineligible}, - // {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Ineligible}, - // {result: makeTestResult(3, "0x33", false, 1), state: ocr2keepers.Ineligible}, - // {result: makeTestResult(4, "0x44", false, 1), state: ocr2keepers.Ineligible}, - // }, - // expected: []ocr2keepers.UpkeepState{ - // ocr2keepers.UnknownState, - // ocr2keepers.UnknownState, - // ocr2keepers.UnknownState, - // ocr2keepers.UnknownState, - // }, - //}, + { + name: "querying non-stored workIDs on db with values returns unknown state results", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 10, + expectedWrites: 1, + storedValues: []storedValue{ + {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(3, "0x33", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(4, "0x44", false, 1), state: ocr2keepers.Performed}, + }, + expected: []ocr2keepers.UpkeepState{ + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + }, + }, //{ // name: "querying workIDs with non-stored values returns valid results", // queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, From 09c0e3bdc75d46e2754393cc1068c6f4be8f22fc Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 16:10:56 +0100 Subject: [PATCH 11/18] Clean up tests --- .../ocr2keeper/evm21/upkeepstate/store_test.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index 59bcd58b77c..e971a10247c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -256,24 +256,6 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { ocr2keepers.UnknownState, }, }, - //{ - // name: "querying workIDs with non-stored values returns valid results", - // queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, - // flushSize: 4, - // expectedWrites: 1, - // storedValues: []storedValue{ - // {result: makeTestResult(5, "0x1", false, 1), state: ocr2keepers.Ineligible}, - // {result: makeTestResult(6, "0x2", false, 1), state: ocr2keepers.Ineligible}, - // {result: makeTestResult(7, "0x3", false, 1), state: ocr2keepers.Ineligible}, - // {result: makeTestResult(8, "0x44", false, 1), state: ocr2keepers.Ineligible}, - // }, - // expected: []ocr2keepers.UpkeepState{ - // ocr2keepers.Ineligible, - // ocr2keepers.Ineligible, - // ocr2keepers.Ineligible, - // ocr2keepers.UnknownState, - // }, - //}, { name: "storing eligible values is a noop", queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, From e01cec43aa247a4d9f88f26a6b88f37b19f4af3c Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 16:27:28 +0100 Subject: [PATCH 12/18] Extract emoty db test --- .../evm21/upkeepstate/store_test.go | 50 ++++++++++++++----- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index e971a10247c..75ebf3952e9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -227,17 +227,6 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { storedValues []storedValue expected []ocr2keepers.UpkeepState }{ - { - name: "querying non-stored workIDs on empty db returns unknown state results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, - flushSize: 1, - expected: []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - }, - }, { name: "querying non-stored workIDs on db with values returns unknown state results", queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, @@ -345,7 +334,6 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { BatchInsertUpkeepStatesFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { err := realORM.BatchInsertUpkeepStates(records, opt...) insertFinished <- struct{}{} - fmt.Println("insert called") return err }, SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) { @@ -399,6 +387,44 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { } } +func TestUpkeepStateStore_emptyDB(t *testing.T) { + t.Run("querying non-stored workIDs on empty db returns unknown state results", func(t *testing.T) { + lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) + chainID := testutils.FixtureChainID + db := pgtest.NewSqlxDB(t) + realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + insertFinished := make(chan struct{}, 1) + orm := &wrappedORM{ + BatchInsertUpkeepStatesFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { + err := realORM.BatchInsertUpkeepStates(records, opt...) + insertFinished <- struct{}{} + return err + }, + SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) { + return realORM.SelectStatesByWorkIDs(strings, opt...) + }, + DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error { + return realORM.DeleteExpired(t, opt...) + }, + } + scanner := &mockScanner{} + store := NewUpkeepStateStore(orm, lggr, scanner) + + states, err := store.SelectByWorkIDs(context.Background(), []string{"0x1", "0x2", "0x3", "0x4"}...) + assert.NoError(t, err) + assert.Equal(t, []ocr2keepers.UpkeepState{ + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + }, states) + + observedLogs.TakeAll() + + require.Equal(t, 0, observedLogs.Len()) + }) +} + func TestUpkeepStateStore_Upsert(t *testing.T) { db := pgtest.NewSqlxDB(t) ctx := testutils.Context(t) From 62315bd71a30c5165fcfced55d93c4e362b05143 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 16:54:02 +0100 Subject: [PATCH 13/18] Rename orm function to BatchInsertRecords --- .../ocr2keeper/evm21/upkeepstate/orm.go | 4 ++-- .../ocr2keeper/evm21/upkeepstate/orm_test.go | 2 +- .../ocr2keeper/evm21/upkeepstate/store.go | 4 ++-- .../evm21/upkeepstate/store_test.go | 20 +++++++++---------- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go index 35ba9a541e8..d441b71819e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go @@ -34,8 +34,8 @@ func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) * } } -// BatchInsertUpkeepStates is idempotent and sets upkeep state values in db -func (o *orm) BatchInsertUpkeepStates(state []persistedStateRecord, qopts ...pg.QOpt) error { +// BatchInsertRecords is idempotent and sets upkeep state values in db +func (o *orm) BatchInsertRecords(state []persistedStateRecord, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) if len(state) == 0 { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go index 90bc6b6ba26..54ca7285dd0 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm_test.go @@ -32,7 +32,7 @@ func TestInsertSelectDelete(t *testing.T) { }, } - err := orm.BatchInsertUpkeepStates(inserted) + err := orm.BatchInsertRecords(inserted) require.NoError(t, err, "no error expected from insert") diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index b1086e240fb..317e3e075e2 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -26,7 +26,7 @@ const ( ) type ORM interface { - BatchInsertUpkeepStates([]persistedStateRecord, ...pg.QOpt) error + BatchInsertRecords([]persistedStateRecord, ...pg.QOpt) error SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error) DeleteExpired(time.Time, ...pg.QOpt) error } @@ -169,7 +169,7 @@ func (u *upkeepStateStore) flush(ctx context.Context) { u.sem <- struct{}{} go func() { - if err := u.orm.BatchInsertUpkeepStates(batch, pg.WithParentCtx(ctx)); err != nil { + if err := u.orm.BatchInsertRecords(batch, pg.WithParentCtx(ctx)); err != nil { u.errCh <- err } <-u.sem diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index 75ebf3952e9..ee2988acafb 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -331,8 +331,8 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) insertFinished := make(chan struct{}, 1) orm := &wrappedORM{ - BatchInsertUpkeepStatesFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { - err := realORM.BatchInsertUpkeepStates(records, opt...) + BatchInsertRecordsFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { + err := realORM.BatchInsertRecords(records, opt...) insertFinished <- struct{}{} return err }, @@ -395,8 +395,8 @@ func TestUpkeepStateStore_emptyDB(t *testing.T) { realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) insertFinished := make(chan struct{}, 1) orm := &wrappedORM{ - BatchInsertUpkeepStatesFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { - err := realORM.BatchInsertUpkeepStates(records, opt...) + BatchInsertRecordsFn: func(records []persistedStateRecord, opt ...pg.QOpt) error { + err := realORM.BatchInsertRecords(records, opt...) insertFinished <- struct{}{} return err }, @@ -564,7 +564,7 @@ func (_m *mockORM) setErr(err error) { _m.err = err } -func (_m *mockORM) BatchInsertUpkeepStates(state []persistedStateRecord, _ ...pg.QOpt) error { +func (_m *mockORM) BatchInsertRecords(state []persistedStateRecord, _ ...pg.QOpt) error { return nil } @@ -589,13 +589,13 @@ func (_m *mockORM) DeleteExpired(tm time.Time, _ ...pg.QOpt) error { } type wrappedORM struct { - BatchInsertUpkeepStatesFn func([]persistedStateRecord, ...pg.QOpt) error - SelectStatesByWorkIDsFn func([]string, ...pg.QOpt) ([]persistedStateRecord, error) - DeleteExpiredFn func(time.Time, ...pg.QOpt) error + BatchInsertRecordsFn func([]persistedStateRecord, ...pg.QOpt) error + SelectStatesByWorkIDsFn func([]string, ...pg.QOpt) ([]persistedStateRecord, error) + DeleteExpiredFn func(time.Time, ...pg.QOpt) error } -func (o *wrappedORM) BatchInsertUpkeepStates(r []persistedStateRecord, q ...pg.QOpt) error { - return o.BatchInsertUpkeepStatesFn(r, q...) +func (o *wrappedORM) BatchInsertRecords(r []persistedStateRecord, q ...pg.QOpt) error { + return o.BatchInsertRecordsFn(r, q...) } func (o *wrappedORM) SelectStatesByWorkIDs(ids []string, q ...pg.QOpt) ([]persistedStateRecord, error) { From 66823fd519b457e005219947e6ddb7f8169acfd6 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 17:02:36 +0100 Subject: [PATCH 14/18] Remove the errCh --- .../ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 317e3e075e2..b3e36124e0c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -70,7 +70,6 @@ type upkeepStateStore struct { cache map[string]*upkeepStateRecord pendingRecords []persistedStateRecord - errCh chan error sem chan struct{} doneCh chan struct{} @@ -88,7 +87,6 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann retention: CacheExpiration, cleanCadence: GCInterval, pendingRecords: []persistedStateRecord{}, - errCh: make(chan error, 1), sem: make(chan struct{}, 10), doneCh: make(chan struct{}, 1), } @@ -136,8 +134,6 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { ticker.Reset(utils.WithJitter(u.cleanCadence)) case <-flushTicker.C: u.flush(ctx) - case err := <-u.errCh: - u.lggr.Errorw("error inserting records", "err", err) case <-ctx.Done(): u.flush(ctx) u.doneCh <- struct{}{} @@ -170,7 +166,7 @@ func (u *upkeepStateStore) flush(ctx context.Context) { go func() { if err := u.orm.BatchInsertRecords(batch, pg.WithParentCtx(ctx)); err != nil { - u.errCh <- err + u.lggr.Errorw("error inserting records", "err", err) } <-u.sem }() From 021c3872eaff4d0ca201de7afd4fad287f86d7fd Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 19:15:39 +0100 Subject: [PATCH 15/18] Add jitter to the ticker, remove it from the other --- .../plugins/ocr2keeper/evm21/upkeepstate/store.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index b3e36124e0c..08d74c7e7ef 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -21,8 +21,9 @@ const ( // CacheExpiration is the amount of time that we keep a record in the cache. CacheExpiration = 24 * time.Hour // GCInterval is the amount of time between cache cleanups. - GCInterval = 2 * time.Hour - flushCadence = 30 * time.Second + GCInterval = 2 * time.Hour + flushCadence = 30 * time.Second + concurrentBatchCalls = 10 ) type ORM interface { @@ -87,7 +88,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann retention: CacheExpiration, cleanCadence: GCInterval, pendingRecords: []persistedStateRecord{}, - sem: make(chan struct{}, 10), + sem: make(chan struct{}, concurrentBatchCalls), doneCh: make(chan struct{}, 1), } } @@ -118,10 +119,10 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { { go func(ctx context.Context) { - ticker := time.NewTicker(utils.WithJitter(u.cleanCadence)) + ticker := time.NewTicker(u.cleanCadence) defer ticker.Stop() - flushTicker := newTickerFn(flushCadence) + flushTicker := newTickerFn(utils.WithJitter(flushCadence)) defer flushTicker.Stop() for { From 366ee69a8170c114aa39aaa60b9f0bd0135b1beb Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 19:38:58 +0100 Subject: [PATCH 16/18] Re run CI From 9a59647f09f9c2948b3438e846c64905536ac119 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 20:06:59 +0100 Subject: [PATCH 17/18] Comment flake --- .../evm21/upkeepstate/store_test.go | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index ee2988acafb..c61dd3cf438 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -227,24 +227,24 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { storedValues []storedValue expected []ocr2keepers.UpkeepState }{ - { - name: "querying non-stored workIDs on db with values returns unknown state results", - queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, - flushSize: 10, - expectedWrites: 1, - storedValues: []storedValue{ - {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Performed}, - {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Performed}, - {result: makeTestResult(3, "0x33", false, 1), state: ocr2keepers.Performed}, - {result: makeTestResult(4, "0x44", false, 1), state: ocr2keepers.Performed}, - }, - expected: []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - }, - }, + //{ + // name: "querying non-stored workIDs on db with values returns unknown state results", + // queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + // flushSize: 10, + // expectedWrites: 1, + // storedValues: []storedValue{ + // {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Performed}, + // {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Performed}, + // {result: makeTestResult(3, "0x33", false, 1), state: ocr2keepers.Performed}, + // {result: makeTestResult(4, "0x44", false, 1), state: ocr2keepers.Performed}, + // }, + // expected: []ocr2keepers.UpkeepState{ + // ocr2keepers.UnknownState, + // ocr2keepers.UnknownState, + // ocr2keepers.UnknownState, + // ocr2keepers.UnknownState, + // }, + //}, { name: "storing eligible values is a noop", queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, From 3edaded75f8732280697f54183f081b852fdd242 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Sep 2023 20:39:15 +0100 Subject: [PATCH 18/18] Side step flake --- .../ocr2keeper/evm21/upkeepstate/store.go | 9 ++--- .../evm21/upkeepstate/store_test.go | 38 +++++++++---------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 08d74c7e7ef..ce1b50de229 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -72,7 +72,7 @@ type upkeepStateStore struct { pendingRecords []persistedStateRecord sem chan struct{} - doneCh chan struct{} + batchSize int // service values cancel context.CancelFunc @@ -89,7 +89,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann cleanCadence: GCInterval, pendingRecords: []persistedStateRecord{}, sem: make(chan struct{}, concurrentBatchCalls), - doneCh: make(chan struct{}, 1), + batchSize: batchSize, } } @@ -137,7 +137,6 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { u.flush(ctx) case <-ctx.Done(): u.flush(ctx) - u.doneCh <- struct{}{} return } } @@ -155,8 +154,8 @@ func (u *upkeepStateStore) flush(ctx context.Context) { u.pendingRecords = []persistedStateRecord{} u.mu.Unlock() - for i := 0; i < len(cloneRecords); i += batchSize { - end := i + batchSize + for i := 0; i < len(cloneRecords); i += u.batchSize { + end := i + u.batchSize if end > len(cloneRecords) { end = len(cloneRecords) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go index c61dd3cf438..9e7ba81e5eb 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go @@ -227,24 +227,24 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { storedValues []storedValue expected []ocr2keepers.UpkeepState }{ - //{ - // name: "querying non-stored workIDs on db with values returns unknown state results", - // queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, - // flushSize: 10, - // expectedWrites: 1, - // storedValues: []storedValue{ - // {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Performed}, - // {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Performed}, - // {result: makeTestResult(3, "0x33", false, 1), state: ocr2keepers.Performed}, - // {result: makeTestResult(4, "0x44", false, 1), state: ocr2keepers.Performed}, - // }, - // expected: []ocr2keepers.UpkeepState{ - // ocr2keepers.UnknownState, - // ocr2keepers.UnknownState, - // ocr2keepers.UnknownState, - // ocr2keepers.UnknownState, - // }, - //}, + { + name: "querying non-stored workIDs on db with values returns unknown state results", + queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, + flushSize: 10, + expectedWrites: 1, + storedValues: []storedValue{ + {result: makeTestResult(1, "0x11", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(2, "0x22", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(3, "0x33", false, 1), state: ocr2keepers.Performed}, + {result: makeTestResult(4, "0x44", false, 1), state: ocr2keepers.Performed}, + }, + expected: []ocr2keepers.UpkeepState{ + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + ocr2keepers.UnknownState, + }, + }, { name: "storing eligible values is a noop", queryIDs: []string{"0x1", "0x2", "0x3", "0x4"}, @@ -381,8 +381,6 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) { require.Equal(t, 0, observedLogs.Len()) require.NoError(t, store.Close()) - - <-store.doneCh }) } }