Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch insert records into the upkeep state store #10488

Merged
merged 18 commits into from
Sep 8, 2023
38 changes: 31 additions & 7 deletions core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,40 @@ func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *
}
}

// InsertUpkeepState is idempotent and sets upkeep state values in db
func (o *orm) InsertUpkeepState(state persistedStateRecord, qopts ...pg.QOpt) error {
// BatchInsertRecords is idempotent and sets upkeep state values in db
func (o *orm) BatchInsertRecords(state []persistedStateRecord, qopts ...pg.QOpt) error {
q := o.q.WithOpts(qopts...)

query := `INSERT INTO evm_upkeep_states (evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason)
VALUES ($1::NUMERIC, $2, $3, $4, $5, $6::NUMERIC, $7)
ON CONFLICT (evm_chain_id, work_id)
DO NOTHING`
if len(state) == 0 {
return nil
}

type row struct {
EvmChainId *utils.Big
WorkId string
CompletionState uint8
BlockNumber int64
InsertedAt time.Time
UpkeepId *utils.Big
IneligibilityReason uint8
}

var rows []row
for _, record := range state {
rows = append(rows, row{
EvmChainId: o.chainID,
WorkId: record.WorkID,
CompletionState: record.CompletionState,
BlockNumber: record.BlockNumber,
InsertedAt: record.InsertedAt,
UpkeepId: record.UpkeepID,
IneligibilityReason: record.IneligibilityReason,
})
}

return q.ExecQ(query, o.chainID, state.WorkID, state.CompletionState, state.BlockNumber, state.InsertedAt, state.UpkeepID, state.IneligibilityReason)
return q.ExecQNamed(`INSERT INTO evm_upkeep_states
(evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) VALUES
(:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason) ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows)
}

// SelectStatesByWorkIDs searches the data store for stored states for the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ func TestInsertSelectDelete(t *testing.T) {
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))

inserted := persistedStateRecord{
UpkeepID: utils.NewBig(big.NewInt(2)),
WorkID: "0x1",
CompletionState: 100,
BlockNumber: 2,
IneligibilityReason: 2,
InsertedAt: time.Now(),
inserted := []persistedStateRecord{
{
UpkeepID: utils.NewBig(big.NewInt(2)),
WorkID: "0x1",
CompletionState: 100,
BlockNumber: 2,
IneligibilityReason: 2,
InsertedAt: time.Now(),
},
}

err := orm.InsertUpkeepState(inserted)
err := orm.BatchInsertRecords(inserted)

require.NoError(t, err, "no error expected from insert")

Expand Down
72 changes: 59 additions & 13 deletions core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ const (
// CacheExpiration is the amount of time that we keep a record in the cache.
CacheExpiration = 24 * time.Hour
// GCInterval is the amount of time between cache cleanups.
GCInterval = 2 * time.Hour
GCInterval = 2 * time.Hour
flushCadence = 30 * time.Second
concurrentBatchCalls = 10
)

type ORM interface {
InsertUpkeepState(persistedStateRecord, ...pg.QOpt) error
BatchInsertRecords([]persistedStateRecord, ...pg.QOpt) error
SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error)
DeleteExpired(time.Time, ...pg.QOpt) error
}
Expand All @@ -39,7 +41,9 @@ type UpkeepStateStore interface {
}

var (
_ UpkeepStateStore = &upkeepStateStore{}
_ UpkeepStateStore = &upkeepStateStore{}
newTickerFn = time.NewTicker
batchSize = 1000
)

// upkeepStateRecord is a record that we save in a local cache.
Expand All @@ -66,19 +70,26 @@ type upkeepStateStore struct {
mu sync.RWMutex
cache map[string]*upkeepStateRecord

pendingRecords []persistedStateRecord
sem chan struct{}
batchSize int

// service values
cancel context.CancelFunc
}

// NewUpkeepStateStore creates a new state store
func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScanner) *upkeepStateStore {
return &upkeepStateStore{
orm: orm,
lggr: lggr.Named("UpkeepStateStore"),
cache: map[string]*upkeepStateRecord{},
scanner: scanner,
retention: CacheExpiration,
cleanCadence: GCInterval,
orm: orm,
lggr: lggr.Named("UpkeepStateStore"),
cache: map[string]*upkeepStateRecord{},
scanner: scanner,
retention: CacheExpiration,
cleanCadence: GCInterval,
pendingRecords: []persistedStateRecord{},
sem: make(chan struct{}, concurrentBatchCalls),
batchSize: batchSize,
}
}

Expand Down Expand Up @@ -108,9 +119,12 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {

{
go func(ctx context.Context) {
ticker := time.NewTicker(utils.WithJitter(u.cleanCadence))
ticker := time.NewTicker(u.cleanCadence)
defer ticker.Stop()

flushTicker := newTickerFn(utils.WithJitter(flushCadence))
defer flushTicker.Stop()

for {
select {
case <-ticker.C:
Expand All @@ -119,8 +133,11 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
}

ticker.Reset(utils.WithJitter(u.cleanCadence))
case <-flushTicker.C:
u.flush(ctx)
case <-ctx.Done():

u.flush(ctx)
return
}
}
}(ctx)
Expand All @@ -129,6 +146,33 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
return nil
}

func (u *upkeepStateStore) flush(ctx context.Context) {
cloneRecords := make([]persistedStateRecord, len(u.pendingRecords))

u.mu.Lock()
copy(cloneRecords, u.pendingRecords)
u.pendingRecords = []persistedStateRecord{}
u.mu.Unlock()

for i := 0; i < len(cloneRecords); i += u.batchSize {
end := i + u.batchSize
if end > len(cloneRecords) {
end = len(cloneRecords)
}

batch := cloneRecords[i:end]

u.sem <- struct{}{}

go func() {
if err := u.orm.BatchInsertRecords(batch, pg.WithParentCtx(ctx)); err != nil {
u.lggr.Errorw("error inserting records", "err", err)
}
<-u.sem
}()
}
}

// Close stops the service of pruning stale data; implements io.Closer
func (u *upkeepStateStore) Close() error {
u.mu.Lock()
Expand Down Expand Up @@ -200,13 +244,15 @@ func (u *upkeepStateStore) upsertStateRecord(ctx context.Context, workID string,

u.cache[workID] = record

return u.orm.InsertUpkeepState(persistedStateRecord{
u.pendingRecords = append(u.pendingRecords, persistedStateRecord{
UpkeepID: utils.NewBig(upkeepID),
WorkID: record.workID,
CompletionState: uint8(record.state),
IneligibilityReason: reason,
InsertedAt: record.addedAt,
}, pg.WithParentCtx(ctx))
})

return nil
}

// fetchPerformed fetches all performed logs from the scanner to populate the cache.
Expand Down
Loading