-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Batch insert records into the upkeep state store #10488
Changes from 12 commits
231ad07
6e5943b
aa6811f
36fa612
6982c88
a70e831
24beece
4688ba5
3b560be
9d563c2
09c0e3b
e01cec4
62315bd
66823fd
021c387
366ee69
9a59647
3edaded
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,11 +21,12 @@ const ( | |
// CacheExpiration is the amount of time that we keep a record in the cache. | ||
CacheExpiration = 24 * time.Hour | ||
// GCInterval is the amount of time between cache cleanups. | ||
GCInterval = 2 * time.Hour | ||
GCInterval = 2 * time.Hour | ||
flushCadence = 30 * time.Second | ||
) | ||
|
||
type ORM interface { | ||
InsertUpkeepState(persistedStateRecord, ...pg.QOpt) error | ||
BatchInsertUpkeepStates([]persistedStateRecord, ...pg.QOpt) error | ||
SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error) | ||
DeleteExpired(time.Time, ...pg.QOpt) error | ||
} | ||
|
@@ -39,7 +40,9 @@ type UpkeepStateStore interface { | |
} | ||
|
||
var ( | ||
_ UpkeepStateStore = &upkeepStateStore{} | ||
_ UpkeepStateStore = &upkeepStateStore{} | ||
newTickerFn = time.NewTicker | ||
batchSize = 1000 | ||
) | ||
|
||
// upkeepStateRecord is a record that we save in a local cache. | ||
|
@@ -66,19 +69,28 @@ type upkeepStateStore struct { | |
mu sync.RWMutex | ||
cache map[string]*upkeepStateRecord | ||
|
||
pendingRecords []persistedStateRecord | ||
errCh chan error | ||
sem chan struct{} | ||
doneCh chan struct{} | ||
|
||
// service values | ||
cancel context.CancelFunc | ||
} | ||
|
||
// NewUpkeepStateStore creates a new state store | ||
func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScanner) *upkeepStateStore { | ||
return &upkeepStateStore{ | ||
orm: orm, | ||
lggr: lggr.Named("UpkeepStateStore"), | ||
cache: map[string]*upkeepStateRecord{}, | ||
scanner: scanner, | ||
retention: CacheExpiration, | ||
cleanCadence: GCInterval, | ||
orm: orm, | ||
lggr: lggr.Named("UpkeepStateStore"), | ||
cache: map[string]*upkeepStateRecord{}, | ||
scanner: scanner, | ||
retention: CacheExpiration, | ||
cleanCadence: GCInterval, | ||
pendingRecords: []persistedStateRecord{}, | ||
errCh: make(chan error, 1), | ||
sem: make(chan struct{}, 10), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit. might be better to define 10 as a constant There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally all values like this should be constants. We can add a linter that catches "magic numbers". It's pretty effective but can get annoying too. ¯_(ツ)_/¯ |
||
doneCh: make(chan struct{}, 1), | ||
} | ||
} | ||
|
||
|
@@ -111,6 +123,9 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { | |
ticker := time.NewTicker(utils.WithJitter(u.cleanCadence)) | ||
defer ticker.Stop() | ||
|
||
flushTicker := newTickerFn(flushCadence) | ||
defer flushTicker.Stop() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One request that came to me when lasting working on this was to add jitter to the tickers that were making database calls. I don't think first one needs the jitter anymore, but the second one does. |
||
|
||
for { | ||
select { | ||
case <-ticker.C: | ||
|
@@ -119,8 +134,14 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { | |
} | ||
|
||
ticker.Reset(utils.WithJitter(u.cleanCadence)) | ||
case <-flushTicker.C: | ||
u.flush(ctx) | ||
case err := <-u.errCh: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we write into There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've just removed it! |
||
u.lggr.Errorw("error inserting records", "err", err) | ||
case <-ctx.Done(): | ||
|
||
u.flush(ctx) | ||
u.doneCh <- struct{}{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit. couldn't find anyone reading from this channel? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that I'm making some changes that affects it in https://github.com/smartcontractkit/chainlink/pull/10535/files There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @infiloop2 currently the tests read from this to check that the store has stopped before proceeding |
||
return | ||
} | ||
} | ||
}(ctx) | ||
|
@@ -129,6 +150,33 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { | |
return nil | ||
} | ||
|
||
func (u *upkeepStateStore) flush(ctx context.Context) { | ||
cloneRecords := make([]persistedStateRecord, len(u.pendingRecords)) | ||
|
||
u.mu.Lock() | ||
copy(cloneRecords, u.pendingRecords) | ||
u.pendingRecords = []persistedStateRecord{} | ||
u.mu.Unlock() | ||
|
||
for i := 0; i < len(cloneRecords); i += batchSize { | ||
end := i + batchSize | ||
if end > len(cloneRecords) { | ||
end = len(cloneRecords) | ||
} | ||
|
||
batch := cloneRecords[i:end] | ||
|
||
u.sem <- struct{}{} | ||
|
||
go func() { | ||
if err := u.orm.BatchInsertUpkeepStates(batch, pg.WithParentCtx(ctx)); err != nil { | ||
u.errCh <- err | ||
} | ||
<-u.sem | ||
}() | ||
} | ||
} | ||
|
||
// Close stops the service of pruning stale data; implements io.Closer | ||
func (u *upkeepStateStore) Close() error { | ||
u.mu.Lock() | ||
|
@@ -200,13 +248,15 @@ func (u *upkeepStateStore) upsertStateRecord(ctx context.Context, workID string, | |
|
||
u.cache[workID] = record | ||
|
||
return u.orm.InsertUpkeepState(persistedStateRecord{ | ||
u.pendingRecords = append(u.pendingRecords, persistedStateRecord{ | ||
UpkeepID: utils.NewBig(upkeepID), | ||
WorkID: record.workID, | ||
CompletionState: uint8(record.state), | ||
IneligibilityReason: reason, | ||
InsertedAt: record.addedAt, | ||
}, pg.WithParentCtx(ctx)) | ||
}) | ||
|
||
return nil | ||
} | ||
|
||
// fetchPerformed fetches all performed logs from the scanner to populate the cache. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to
records