
Batch insert records into the upkeep state store #10488

Merged: 18 commits into develop from feature/AUTO-4895, Sep 8, 2023

Conversation

ferglor (Collaborator) commented on Sep 5, 2023:

No description provided.

ferglor requested a review from a team as a code owner, September 5, 2023 13:48
github-actions bot commented on Sep 5, 2023:

I see that you haven't updated any CHANGELOG files. Would it make sense to do so?

Comment on lines 48 to 62
for i, record := range state {
fieldOffset := i * fieldsPerRecord
valueStrings = append(valueStrings, fmt.Sprintf(
"($%d::NUMERIC, $%d, $%d, $%d, $%d, $%d::NUMERIC, $%d)",
fieldOffset+1, fieldOffset+2, fieldOffset+3, fieldOffset+4, fieldOffset+5, fieldOffset+6, fieldOffset+7,
))
valueArgs = append(valueArgs, o.chainID, record.WorkID, record.CompletionState, record.BlockNumber, record.InsertedAt, record.UpkeepID, record.IneligibilityReason)
}

query := fmt.Sprintf(`
INSERT INTO evm_upkeep_states (evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason)
VALUES %s
ON CONFLICT (evm_chain_id, work_id)
DO NOTHING;
`, strings.Join(valueStrings, ", "))
})

if len(u.pendingRecords) == flushSize {
return u.flush(ctx)
Contributor:

Should we try to do this in the background instead of blocking the 1000th upsertStateRecord?

ferglor (Collaborator, author):

Sure! So would we want to run the flush in the background and then continue collecting more records, but block the next flush until the previous flush completes?

Contributor:

Not fully sure; we'll need to consider what happens during errors.

ferglor (Collaborator, author):

It looks like when we call the post processors in the plugin, we only log the errors; we don't act on them further. So maybe, if we encounter an error when we flush, we could send the error down a channel and have the Start function read from the channel and log any errors encountered? That way we still log the error, just in a different place, though it probably won't be clear that a post processor triggered it?
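For illustration, a minimal standalone sketch of the error-channel idea being floated here; the store struct, batchInsert, and the logging below are stand-ins, not the PR's actual code:

package main

import (
	"context"
	"errors"
	"log"
	"time"
)

type persistedStateRecord struct{ WorkID string } // stand-in for the real type

type store struct {
	errCh chan error
}

// flush hands the batch to a goroutine and reports any failure on errCh
// instead of handling it inline.
func (s *store) flush(records []persistedStateRecord) {
	go func() {
		if err := batchInsert(records); err != nil {
			s.errCh <- err
		}
	}()
}

// batchInsert stands in for orm.BatchInsertUpkeepStates.
func batchInsert([]persistedStateRecord) error { return errors.New("db unavailable") }

func main() {
	s := &store{errCh: make(chan error, 1)}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	s.flush([]persistedStateRecord{{WorkID: "w1"}})

	// The real Start loop would select on errCh alongside its tickers and just log.
	select {
	case err := <-s.errCh:
		log.Printf("failed to batch insert upkeep states: %v", err)
	case <-ctx.Done():
	}
}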

Contributor:

Yeah, potentially. We also need to consider whether we retry flushing or not; otherwise the queue size can keep growing.

Contributor:

Maybe the simplest for now is to keep it as is. One thing I'm not sure about is whether we need to change this to if len(u.pendingRecords) >= flushSize { ?

@@ -21,16 +21,16 @@ func TestInsertSelectDelete(t *testing.T) {
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))

-	inserted := persistedStateRecord{
+	inserted := []persistedStateRecord{{
Contributor:

Can we try to test with multiple records?

infiloop2 (Contributor) left a review:

LGTM overall, a couple of comments. Would also be good to get @EasterTheBunny's / @amirylm's review.


Comment on lines 236 to 237
copy(cloneRecords, u.pendingRecords)
u.pendingRecords = []persistedStateRecord{}
Contributor:

Would we need a lock on u.pendingRecords here? Since it can be written to in upsertStateRecord.
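For reference, one hedged way to rule that race out (a stripped-down sketch, not the PR's code): guard pendingRecords with a mutex and swap the slice out while holding the lock, so the snapshot handed to the insert can't be appended to concurrently:

package upkeepstatesketch

import "sync"

type persistedStateRecord struct{ WorkID string } // stand-in for the real type

// sketchStore is a stripped-down stand-in for upkeepStateStore.
type sketchStore struct {
	mu             sync.Mutex
	pendingRecords []persistedStateRecord
}

func (u *sketchStore) upsertStateRecord(r persistedStateRecord) {
	u.mu.Lock()
	defer u.mu.Unlock()
	u.pendingRecords = append(u.pendingRecords, r)
}

// snapshotPending swaps the pending slice out under the lock and returns it,
// so the batch insert can run on the snapshot without further locking.
func (u *sketchStore) snapshotPending() []persistedStateRecord {
	u.mu.Lock()
	defer u.mu.Unlock()
	records := u.pendingRecords
	u.pendingRecords = nil
	return records
}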

u.pendingRecords = []persistedStateRecord{}

go func() {
if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil {
Contributor:

should we enforce some max batch size here?

Comment on lines 225 to 227
if len(u.pendingRecords) >= flushSize {
u.flush(ctx)
}
Contributor:

As discussed offline, it might be cleaner to make flush independent of upsertStateRecord and trigger it on a timer (while flushing records in batches).
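A rough standalone sketch of that shape (flushCadence, the type, and the method bodies below are illustrative stubs, not the PR's code): the Start loop gets its own flush ticker next to the GC ticker and does a final drain on shutdown:

package upkeepstatesketch

import (
	"context"
	"time"
)

const flushCadence = 30 * time.Second // illustrative value

// storeLoopSketch is a stripped-down stand-in for upkeepStateStore.
type storeLoopSketch struct {
	cleanCadence time.Duration
}

// runLoop shows the timer-driven shape: flushing is driven by the ticker,
// not by whoever calls upsertStateRecord.
func (u *storeLoopSketch) runLoop(ctx context.Context) {
	gcTicker := time.NewTicker(u.cleanCadence)
	defer gcTicker.Stop()

	flushTicker := time.NewTicker(flushCadence)
	defer flushTicker.Stop()

	for {
		select {
		case <-gcTicker.C:
			u.cleanup(ctx)
		case <-flushTicker.C:
			u.flush(ctx)
		case <-ctx.Done():
			u.flush(ctx) // drain whatever is still pending before stopping
			return
		}
	}
}

func (u *storeLoopSketch) cleanup(context.Context) {} // stub
func (u *storeLoopSketch) flush(context.Context)   {} // stub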



go func() {
if err := u.orm.BatchInsertUpkeepStates(cloneRecords, pg.WithParentCtx(ctx)); err != nil {
u.errCh <- err
Contributor:

nit. I think you can simply log the error here?

@@ -129,6 +146,31 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
return nil
}

func (u *upkeepStateStore) flush(ctx context.Context) {
u.sem <- struct{}{}
infiloop2 (Contributor), Sep 8, 2023:

Nit: I don't fully understand the reason for this. Is it to limit to at most 10 (the channel size) flushes at once?

ferglor (Collaborator, author):

Yeah, I didn't want to have an unbounded number of goroutines running (since I'm not sure how frequently we'll need to flush to the DB, or how long those flushes can take), so I just ballparked at most 10 at a time. Happy to remove this if it's overkill!

infiloop2 (Contributor), Sep 8, 2023:

I think this will automatically be limited, since flush is done in a loop synchronously? I don't have strong opinions; OK to keep as is.

ferglor (Collaborator, author):

But the loop triggers the batch insert asynchronously?
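For reference, the bounded-concurrency pattern under discussion is a buffered channel used as a counting semaphore. A standalone sketch (the capacity of 10 mirrors the value in this PR; everything else is illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxConcurrentFlushes = 10
	sem := make(chan struct{}, maxConcurrentFlushes)

	var wg sync.WaitGroup
	for i := 0; i < 25; i++ {
		sem <- struct{}{} // blocks once 10 flushes are already in flight
		wg.Add(1)
		go func(batch int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when this flush finishes

			time.Sleep(10 * time.Millisecond) // stand-in for the DB insert
			fmt.Println("flushed batch", batch)
		}(i)
	}
	wg.Wait()
}

So even though flush itself is called synchronously from the loop, it is the goroutines it spawns that the semaphore bounds.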

Comment on lines 163 to 164
cloneRecords := u.pendingRecords[:batch]
u.pendingRecords = u.pendingRecords[batch:]
Contributor:

Not sure if we should have a loop here, in case pendingRecords grows very fast over time?

Contributor:

We'll also need to think about how long we hold the lock, though; probably not a good idea to hold the lock while waiting on the DB insert.
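A hedged sketch of how both concerns could be handled together (names and maxBatchSize are illustrative, not the PR's code): loop until pending is drained, slice off a bounded batch under the lock, and release the lock before the DB call:

package main

import (
	"log"
	"sync"
)

type persistedStateRecord struct{ WorkID string } // stand-in for the real type

type batchSketchStore struct {
	mu      sync.Mutex
	pending []persistedStateRecord
}

const maxBatchSize = 1000 // illustrative cap per insert

// flushAll drains pending in bounded batches. The lock is held only while a
// batch is sliced off, never while the (potentially slow) insert runs.
func (s *batchSketchStore) flushAll() {
	for {
		s.mu.Lock()
		n := len(s.pending)
		if n == 0 {
			s.mu.Unlock()
			return
		}
		if n > maxBatchSize {
			n = maxBatchSize
		}
		batch := make([]persistedStateRecord, n)
		copy(batch, s.pending[:n])
		s.pending = s.pending[n:]
		s.mu.Unlock()

		if err := batchInsert(batch); err != nil {
			log.Printf("batch insert of upkeep states failed: %v", err)
		}
	}
}

// batchInsert stands in for orm.BatchInsertUpkeepStates.
func batchInsert([]persistedStateRecord) error { return nil }

func main() {
	s := &batchSketchStore{pending: []persistedStateRecord{{WorkID: "w1"}, {WorkID: "w2"}}}
	s.flushAll()
}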


case <-ctx.Done():

u.flush(ctx)
u.doneCh <- struct{}{}
Contributor:

nit. couldn't find anyone reading from this channel?

Collaborator:

Note that I'm making some changes that affect it in https://github.com/smartcontractkit/chainlink/pull/10535/files

ferglor (Collaborator, author):

@infiloop2 currently the tests read from this to check that the store has stopped before proceeding

cleanCadence: GCInterval,
pendingRecords: []persistedStateRecord{},
errCh: make(chan error, 1),
sem: make(chan struct{}, 10),
Contributor:

nit. might be better to define 10 as a constant

Contributor:

Ideally all values like this should be constants. We can add a linter that catches "magic numbers". It's pretty effective but can get annoying too. ¯_(ツ)_/¯
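For illustration, the kind of named constants being suggested; the values come from this thread (the 1000-record flush threshold, the semaphore capacity of 10), and the names are just suggestions:

package upkeepstatesketch

// Named constants in place of the magic numbers discussed above.
const (
	flushSize            = 1000 // pending records that trigger a flush
	maxConcurrentFlushes = 10   // capacity of the sem channel bounding flush goroutines
)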

infiloop2 previously approved these changes Sep 8, 2023
infiloop2 (Contributor) left a review:

lgtm!


@@ -119,8 +134,14 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
}

ticker.Reset(utils.WithJitter(u.cleanCadence))
case <-flushTicker.C:
u.flush(ctx)
case err := <-u.errCh:
Collaborator:

We write into errCh only from flush; let's also listen to it from there (or maybe avoid it altogether, since we just log the errors anyway).

ferglor (Collaborator, author):

I've just removed it!


EasterTheBunny (Contributor) left a review:

Looks good. My only request would be to add jitter to one ticker and remove it from the other.

Comment on lines 123 to 127
ticker := time.NewTicker(utils.WithJitter(u.cleanCadence))
defer ticker.Stop()

flushTicker := newTickerFn(flushCadence)
defer flushTicker.Stop()
Contributor:

One request that came to me when last working on this was to add jitter to the tickers that make database calls. I don't think the first one needs the jitter anymore, but the second one does.
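A standalone sketch of one way to do that: give the flush ticker a jittered interval and re-jitter it after every fire (withJitter below is a local stand-in for utils.WithJitter; the cadence is illustrative):

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// withJitter stands in for utils.WithJitter: the base interval +/- up to 10%.
func withJitter(d time.Duration) time.Duration {
	jitter := time.Duration(rand.Int63n(int64(d) / 5))
	return d - d/10 + jitter
}

func main() {
	const flushCadence = 50 * time.Millisecond // illustrative value

	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	flushTicker := time.NewTicker(withJitter(flushCadence))
	defer flushTicker.Stop()

	for {
		select {
		case <-flushTicker.C:
			fmt.Println("flush") // the real loop would call u.flush(ctx) here
			// Re-jitter so periodic DB writes don't line up across nodes.
			flushTicker.Reset(withJitter(flushCadence))
		case <-ctx.Done():
			return
		}
	}
}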



EasterTheBunny previously approved these changes Sep 8, 2023
infiloop2 previously approved these changes Sep 8, 2023

ferglor dismissed stale reviews from infiloop2 and EasterTheBunny via 9a59647, September 8, 2023 19:07

ferglor added this pull request to the merge queue, Sep 8, 2023
github-merge-queue bot removed this pull request from the merge queue due to failed status checks, Sep 8, 2023
infiloop2 added this pull request to the merge queue, Sep 8, 2023
github-merge-queue bot removed this pull request from the merge queue due to failed status checks, Sep 8, 2023
infiloop2 added this pull request to the merge queue, Sep 8, 2023
Merged via the queue into develop with commit dc6e52b, Sep 8, 2023
117 checks passed
infiloop2 deleted the feature/AUTO-4895 branch, September 8, 2023 22:29