Batch insert records into the upkeep state store #10488
Conversation
I see that you haven't updated any CHANGELOG files. Would it make sense to do so?
for i, record := range state {
    fieldOffset := i * fieldsPerRecord
    valueStrings = append(valueStrings, fmt.Sprintf(
        "($%d::NUMERIC, $%d, $%d, $%d, $%d, $%d::NUMERIC, $%d)",
        fieldOffset+1, fieldOffset+2, fieldOffset+3, fieldOffset+4, fieldOffset+5, fieldOffset+6, fieldOffset+7,
    ))
    valueArgs = append(valueArgs, o.chainID, record.WorkID, record.CompletionState, record.BlockNumber, record.InsertedAt, record.UpkeepID, record.IneligibilityReason)
}

query := fmt.Sprintf(`
    INSERT INTO evm_upkeep_states (evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason)
    VALUES %s
    ON CONFLICT (evm_chain_id, work_id)
    DO NOTHING;
`, strings.Join(valueStrings, ", "))
I think you might be able to do this more elegantly like https://github.com/smartcontractkit/chainlink/blob/develop/core/chains/evm/logpoller/orm.go#L187-L190
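For reference, a minimal sketch of that named-parameter style, assuming a plain sqlx handle; upkeepStateRow, batchInsertUpkeepStates, and the simplified int64/string column types are illustrative stand-ins rather than the real ORM types:

import (
    "time"

    "github.com/jmoiron/sqlx"
)

// Hypothetical row type for illustration; the real persistedStateRecord lives
// in the upkeep state ORM and its big-number columns (chain id, upkeep id)
// are NUMERIC-backed types rather than the plain types used here.
type upkeepStateRow struct {
    EvmChainID          int64     `db:"evm_chain_id"`
    WorkID              string    `db:"work_id"`
    CompletionState     uint8     `db:"completion_state"`
    BlockNumber         int64     `db:"block_number"`
    InsertedAt          time.Time `db:"inserted_at"`
    UpkeepID            string    `db:"upkeep_id"`
    IneligibilityReason uint8     `db:"ineligibility_reason"`
}

// batchInsertUpkeepStates shows the named-parameter approach: sqlx expands a
// named query run against a slice into a multi-row VALUES clause, so no manual
// "$1, $2, ..." placeholder bookkeeping is needed.
func batchInsertUpkeepStates(db *sqlx.DB, rows []upkeepStateRow) error {
    if len(rows) == 0 {
        return nil
    }
    _, err := db.NamedExec(`
        INSERT INTO evm_upkeep_states
            (evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason)
        VALUES
            (:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason)
        ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows)
    return err
}

With this shape the explicit $n::NUMERIC casts from the hand-built query would instead be handled by whatever big-number types the real record maps onto those columns.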
})

if len(u.pendingRecords) == flushSize {
    return u.flush(ctx)
should we try to do this in the background instead of blocking the 1000th upsertStateRecord?
Sure! So would we want to run the flush in the background, then continue collecting more records, but block the next flush until the previous flush completes?
not fully sure, will need to consider what happens during errors
It looks like when we call the post processors in the plugin, we only log the errors; we don't act on them further. So maybe, if we encounter an error when we flush, we could send the error down a channel and have the Start function read from the channel and log any errors encountered. That way we'd still log the error, just in a different place, though it probably won't be clear that a post processor triggered it?
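Roughly what that could look like, as a sketch only: takePending, lggr, run, and the buffered errCh are assumed names here, while the orm call and pg.WithParentCtx mirror the diff above.

func (u *upkeepStateStore) flush(ctx context.Context) {
    batch := u.takePending() // assumed helper: swaps out pendingRecords under the store's lock
    go func() {
        if err := u.orm.BatchInsertUpkeepStates(batch, pg.WithParentCtx(ctx)); err != nil {
            select {
            case u.errCh <- err:
            default: // drop rather than block if the run loop is busy
            }
        }
    }()
}

// run would be started from Start; it drains errCh so flush failures are still
// logged, just in a different place than the post processor call site.
func (u *upkeepStateStore) run(ctx context.Context) {
    for {
        select {
        case err := <-u.errCh:
            u.lggr.Errorw("failed to batch insert upkeep states", "err", err)
        case <-ctx.Done():
            return
        }
    }
}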
yeah potentially. We also need to consider whether we retry flushing it or not, otherwise the queue size can keep growing
maybe the simplest for now is to keep it as is.
One thing I'm not sure about: do we need to change the check to if len(u.pendingRecords) >= flushSize {?
@@ -21,16 +21,16 @@ func TestInsertSelectDelete(t *testing.T) {
    db := pgtest.NewSqlxDB(t)
    orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))

-   inserted := persistedStateRecord{
+   inserted := []persistedStateRecord{{
can we try to test with multiple records?
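Something along these lines could cover that, as a sketch reusing the test setup above; makeTestRecord and SelectStatesByWorkIDs are assumptions about helpers and read paths that may not exist under those names, and chainID/lggr are assumed to come from the surrounding test file.

func TestBatchInsertMultipleRecords(t *testing.T) {
    db := pgtest.NewSqlxDB(t)
    orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))

    inserted := []persistedStateRecord{
        makeTestRecord("work-id-1"), // hypothetical helper building a valid record
        makeTestRecord("work-id-2"),
        makeTestRecord("work-id-3"),
    }
    require.NoError(t, orm.BatchInsertUpkeepStates(inserted))

    // Any existing read path that returns all three rows would do for the assertion.
    states, err := orm.SelectStatesByWorkIDs([]string{"work-id-1", "work-id-2", "work-id-3"})
    require.NoError(t, err)
    require.Len(t, states, 3)
}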
lgtm overall, a couple of comments. Would also be good to get @EasterTheBunny / @amirylm's review
copy(cloneRecords, u.pendingRecords)
u.pendingRecords = []persistedStateRecord{}
would we need a lock on u.pendingRecords here, since it can be written to in upsertStateRecord?
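For illustration, a minimal sketch of that locking, assuming a sync.Mutex on the store and showing only the relevant fields; appendPending and takePending are hypothetical names.

import "sync"

// Only the relevant fields are shown; the real upkeepStateStore has more.
type upkeepStateStore struct {
    mu             sync.Mutex
    pendingRecords []persistedStateRecord
}

// The append path used by upsertStateRecord takes the lock...
func (u *upkeepStateStore) appendPending(r persistedStateRecord) {
    u.mu.Lock()
    defer u.mu.Unlock()
    u.pendingRecords = append(u.pendingRecords, r)
}

// ...and flush holds it only long enough to swap the buffer out, so the DB
// insert itself runs without the lock.
func (u *upkeepStateStore) takePending() []persistedStateRecord {
    u.mu.Lock()
    defer u.mu.Unlock()
    out := u.pendingRecords
    u.pendingRecords = nil
    return out
}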
u.pendingRecords = []persistedStateRecord{}

go func() {
    if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil {
should we enforce some max batch size here?
if len(u.pendingRecords) >= flushSize {
    u.flush(ctx)
}
as discussed offline, it might be cleaner to make flush independent of upsertRecords and trigger it on a timer (while flushing records in batches).
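A sketch of that timer-driven shape, under assumed flushCadence/batchSize values; runFlusher, flushBatches, takePendingUpTo, and lggr are illustrative names (takePendingUpTo is sketched further down in this thread).

const (
    flushCadence = 5 * time.Second // assumed; tune to the expected write rate
    batchSize    = 1000            // assumed cap on rows per INSERT
)

func (u *upkeepStateStore) runFlusher(ctx context.Context) {
    ticker := time.NewTicker(flushCadence)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            u.flushBatches(ctx)
        case <-ctx.Done():
            u.flushBatches(ctx) // best-effort final drain on shutdown
            return
        }
    }
}

// flushBatches drains pendingRecords in fixed-size batches so a single tick
// never issues one huge insert.
func (u *upkeepStateStore) flushBatches(ctx context.Context) {
    for {
        batch := u.takePendingUpTo(batchSize)
        if len(batch) == 0 {
            return
        }
        if err := u.orm.BatchInsertUpkeepStates(batch, pg.WithParentCtx(ctx)); err != nil {
            u.lggr.Errorw("failed to batch insert upkeep states", "err", err)
            return
        }
    }
}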
go func() {
    if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil {
        u.errCh <- err
nit. I think you can simply log the error here?
@@ -129,6 +146,31 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
    return nil
}

func (u *upkeepStateStore) flush(ctx context.Context) {
    u.sem <- struct{}{}
nit. I don't fully understand the reason for this; is it to limit to at most 10 (channel size) flushes at once?
Yeah, I didn't want to have an unbounded number of goroutines running (since I'm not sure how frequently we'll need to flush to the DB, or how long those flushes can take), so I just ballparked at most 10 at a time. Happy to remove this if it's overkill!
I think this will automatically be limited since flush is done in a loop synchronously? Don't have strong opinions, ok to keep as is
But the loop triggers the batch insert asynchronously?
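To make the semaphore point concrete, a self-contained sketch of the pattern (names are illustrative): a buffered channel bounds how many background inserts run at once, and each goroutine releases its slot when it finishes.

const maxInFlightFlushes = 10 // assumed bound, matching the channel size above

type asyncFlusher struct {
    sem chan struct{}
}

func newAsyncFlusher() *asyncFlusher {
    return &asyncFlusher{sem: make(chan struct{}, maxInFlightFlushes)}
}

// flushAsync blocks the caller only once maxInFlightFlushes flushes are
// already in flight; the insert itself runs in the background.
func (f *asyncFlusher) flushAsync(insert func() error, logErr func(error)) {
    f.sem <- struct{}{} // acquire a slot
    go func() {
        defer func() { <-f.sem }() // release the slot when the insert returns
        if err := insert(); err != nil {
            logErr(err)
        }
    }()
}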
cloneRecords := u.pendingRecords[:batch]
u.pendingRecords = u.pendingRecords[batch:]
not sure if we should have a loop here in case pendingRecords grows very fast over time?
will also need to think about how long we hold the lock though, probably not a good idea to hold the lock while waiting on the DB insert
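One way to keep the lock scope tight, continuing the hypothetical helpers from the earlier sketches: hold the mutex only while slicing off the batch, and run the DB insert after it is released.

// takePendingUpTo holds the mutex only for the slice bookkeeping, never across
// the DB insert. Copying the batch avoids aliasing the retained backing array.
func (u *upkeepStateStore) takePendingUpTo(n int) []persistedStateRecord {
    u.mu.Lock()
    defer u.mu.Unlock()
    if n > len(u.pendingRecords) {
        n = len(u.pendingRecords)
    }
    batch := make([]persistedStateRecord, n)
    copy(batch, u.pendingRecords[:n])
    u.pendingRecords = u.pendingRecords[n:]
    return batch
}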
case <-ctx.Done():

    u.flush(ctx)
    u.doneCh <- struct{}{}
nit. couldn't find anyone reading from this channel?
Note that I'm making some changes that affect it in https://github.com/smartcontractkit/chainlink/pull/10535/files
@infiloop2 currently the tests read from this to check that the store has stopped before proceeding
cleanCadence:   GCInterval,
pendingRecords: []persistedStateRecord{},
errCh:          make(chan error, 1),
sem:            make(chan struct{}, 10),
nit. might be better to define 10 as a constant
Ideally all values like this should be constants. We can add a linter that catches "magic numbers". It's pretty effective but can get annoying too. ¯_(ツ)_/¯
lgtm!
@@ -119,8 +134,14 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
        }

        ticker.Reset(utils.WithJitter(u.cleanCadence))
    case <-flushTicker.C:
        u.flush(ctx)
    case err := <-u.errCh:
we write into errCh only from flush; let's also listen to it from there (or maybe avoid it altogether, as we anyway just log the errors)
I've just removed it!
Looks good. My only request would be to add jitter to one ticker and remove it from the other.
ticker := time.NewTicker(utils.WithJitter(u.cleanCadence))
defer ticker.Stop()

flushTicker := newTickerFn(flushCadence)
defer flushTicker.Stop()
One request that came to me when last working on this was to add jitter to the tickers that make database calls. I don't think the first one needs the jitter anymore, but the second one does.
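Sketching that against the ticker lines above (the exact placement is a guess): drop the jitter from the GC ticker's construction, since the loop already re-applies it via Reset, and wrap the flush ticker's cadence instead.

ticker := time.NewTicker(u.cleanCadence) // jitter dropped here; the loop still resets it with utils.WithJitter
defer ticker.Stop()

flushTicker := newTickerFn(utils.WithJitter(flushCadence)) // the DB-writing ticker gets the jitter
defer flushTicker.Stop()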