Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch insert records into the upkeep state store #10488

Merged
merged 18 commits into from
Sep 8, 2023
38 changes: 31 additions & 7 deletions core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,40 @@ func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *
}
}

// InsertUpkeepState is idempotent and sets upkeep state values in db
func (o *orm) InsertUpkeepState(state persistedStateRecord, qopts ...pg.QOpt) error {
// BatchInsertUpkeepStates is idempotent and sets upkeep state values in db
func (o *orm) BatchInsertUpkeepStates(state []persistedStateRecord, qopts ...pg.QOpt) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to records

q := o.q.WithOpts(qopts...)

query := `INSERT INTO evm_upkeep_states (evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason)
VALUES ($1::NUMERIC, $2, $3, $4, $5, $6::NUMERIC, $7)
ON CONFLICT (evm_chain_id, work_id)
DO NOTHING`
if len(state) == 0 {
return nil
}

type row struct {
EvmChainId *utils.Big
WorkId string
CompletionState uint8
BlockNumber int64
InsertedAt time.Time
UpkeepId *utils.Big
IneligibilityReason uint8
}

var rows []row
for _, record := range state {
rows = append(rows, row{
EvmChainId: o.chainID,
WorkId: record.WorkID,
CompletionState: record.CompletionState,
BlockNumber: record.BlockNumber,
InsertedAt: record.InsertedAt,
UpkeepId: record.UpkeepID,
IneligibilityReason: record.IneligibilityReason,
})
}

return q.ExecQ(query, o.chainID, state.WorkID, state.CompletionState, state.BlockNumber, state.InsertedAt, state.UpkeepID, state.IneligibilityReason)
return q.ExecQNamed(`INSERT INTO evm_upkeep_states
(evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) VALUES
(:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason) ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows)
}

// SelectStatesByWorkIDs searches the data store for stored states for the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ func TestInsertSelectDelete(t *testing.T) {
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))

inserted := persistedStateRecord{
UpkeepID: utils.NewBig(big.NewInt(2)),
WorkID: "0x1",
CompletionState: 100,
BlockNumber: 2,
IneligibilityReason: 2,
InsertedAt: time.Now(),
inserted := []persistedStateRecord{
{
UpkeepID: utils.NewBig(big.NewInt(2)),
WorkID: "0x1",
CompletionState: 100,
BlockNumber: 2,
IneligibilityReason: 2,
InsertedAt: time.Now(),
},
}

err := orm.InsertUpkeepState(inserted)
err := orm.BatchInsertUpkeepStates(inserted)

require.NoError(t, err, "no error expected from insert")

Expand Down
74 changes: 62 additions & 12 deletions core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ const (
// CacheExpiration is the amount of time that we keep a record in the cache.
CacheExpiration = 24 * time.Hour
// GCInterval is the amount of time between cache cleanups.
GCInterval = 2 * time.Hour
GCInterval = 2 * time.Hour
flushCadence = 30 * time.Second
)

type ORM interface {
InsertUpkeepState(persistedStateRecord, ...pg.QOpt) error
BatchInsertUpkeepStates([]persistedStateRecord, ...pg.QOpt) error
SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error)
DeleteExpired(time.Time, ...pg.QOpt) error
}
Expand All @@ -39,7 +40,9 @@ type UpkeepStateStore interface {
}

var (
_ UpkeepStateStore = &upkeepStateStore{}
_ UpkeepStateStore = &upkeepStateStore{}
newTickerFn = time.NewTicker
batchSize = 1000
)

// upkeepStateRecord is a record that we save in a local cache.
Expand All @@ -66,19 +69,28 @@ type upkeepStateStore struct {
mu sync.RWMutex
cache map[string]*upkeepStateRecord

pendingRecords []persistedStateRecord
errCh chan error
sem chan struct{}
doneCh chan struct{}

// service values
cancel context.CancelFunc
}

// NewUpkeepStateStore creates a new state store
func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScanner) *upkeepStateStore {
return &upkeepStateStore{
orm: orm,
lggr: lggr.Named("UpkeepStateStore"),
cache: map[string]*upkeepStateRecord{},
scanner: scanner,
retention: CacheExpiration,
cleanCadence: GCInterval,
orm: orm,
lggr: lggr.Named("UpkeepStateStore"),
cache: map[string]*upkeepStateRecord{},
scanner: scanner,
retention: CacheExpiration,
cleanCadence: GCInterval,
pendingRecords: []persistedStateRecord{},
errCh: make(chan error, 1),
sem: make(chan struct{}, 10),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. might be better to define 10 as a constant

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally all values like this should be constants. We can add a linter that catches "magic numbers". It's pretty effective but can get annoying too. ¯_(ツ)_/¯

doneCh: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -111,6 +123,9 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
ticker := time.NewTicker(utils.WithJitter(u.cleanCadence))
defer ticker.Stop()

flushTicker := newTickerFn(flushCadence)
defer flushTicker.Stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One request that came to me when lasting working on this was to add jitter to the tickers that were making database calls. I don't think first one needs the jitter anymore, but the second one does.


for {
select {
case <-ticker.C:
Expand All @@ -119,8 +134,14 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
}

ticker.Reset(utils.WithJitter(u.cleanCadence))
case <-flushTicker.C:
u.flush(ctx)
case err := <-u.errCh:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we write into errCh only from flush, let's also listen to it from there (or maybe avoid it, as we anyway just log the errors)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just removed it!

u.lggr.Errorw("error inserting records", "err", err)
case <-ctx.Done():

u.flush(ctx)
u.doneCh <- struct{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. couldn't find anyone reading from this channel?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I'm making some changes that affects it in https://github.com/smartcontractkit/chainlink/pull/10535/files

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@infiloop2 currently the tests read from this to check that the store has stopped before proceeding

return
}
}
}(ctx)
Expand All @@ -129,6 +150,33 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
return nil
}

func (u *upkeepStateStore) flush(ctx context.Context) {
cloneRecords := make([]persistedStateRecord, len(u.pendingRecords))

u.mu.Lock()
copy(cloneRecords, u.pendingRecords)
u.pendingRecords = []persistedStateRecord{}
u.mu.Unlock()

for i := 0; i < len(cloneRecords); i += batchSize {
end := i + batchSize
if end > len(cloneRecords) {
end = len(cloneRecords)
}

batch := cloneRecords[i:end]

u.sem <- struct{}{}

go func() {
if err := u.orm.BatchInsertUpkeepStates(batch, pg.WithParentCtx(ctx)); err != nil {
u.errCh <- err
}
<-u.sem
}()
}
}

// Close stops the service of pruning stale data; implements io.Closer
func (u *upkeepStateStore) Close() error {
u.mu.Lock()
Expand Down Expand Up @@ -200,13 +248,15 @@ func (u *upkeepStateStore) upsertStateRecord(ctx context.Context, workID string,

u.cache[workID] = record

return u.orm.InsertUpkeepState(persistedStateRecord{
u.pendingRecords = append(u.pendingRecords, persistedStateRecord{
UpkeepID: utils.NewBig(upkeepID),
WorkID: record.workID,
CompletionState: uint8(record.state),
IneligibilityReason: reason,
InsertedAt: record.addedAt,
}, pg.WithParentCtx(ctx))
})

return nil
}

// fetchPerformed fetches all performed logs from the scanner to populate the cache.
Expand Down
Loading
Loading