Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log recoverer logging #12645

Closed
wants to merge 7 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"crypto/rand"
"errors"
"fmt"
"github.com/google/uuid"
"io"
"math"
"math/big"
Expand Down Expand Up @@ -139,6 +140,8 @@
for {
select {
case <-recoveryTicker.C:
recoverID := uuid.New()
ctx := context.WithValue(ctx, "recoverID", recoverID)

Check failure on line 144 in core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go

View workflow job for this annotation

GitHub Actions / lint

context-keys-type: should not use basic type string as key in context.WithValue (revive)
if err := r.recover(ctx); err != nil {
r.lggr.Warnw("failed to recover logs", "err", err)
}
Expand Down Expand Up @@ -348,25 +351,34 @@
return nil
}

r.lggr.Debugw("recovering logs", "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest)
r.lggr.Debugw("recovering logs", "numberOfFilters", len(filters), "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest, "recoverID", ctx.Value("recoverID"))

startTime := time.Now()
// One goroutine is started per filter, so concurrency here is unbounded; should we use a worker pool?
var wg sync.WaitGroup
for _, f := range filters {
wg.Add(1)
go func(f upkeepFilter) {

defer wg.Done()
recoverFilterID := uuid.New()
ctx := context.WithValue(ctx, "recoverFilterID", recoverFilterID)

Check failure on line 365 in core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go

View workflow job for this annotation

GitHub Actions / lint

context-keys-type: should not use basic type string as key in context.WithValue (revive)
if err := r.recoverFilter(ctx, f, start, offsetBlock); err != nil {
r.lggr.Debugw("error recovering filter", "err", err.Error())
r.lggr.Debugw("error recovering filter", "err", err.Error(), "recoverID", ctx.Value("recoverID"))
}

}(f)
}
wg.Wait()
finishTime := time.Now().Sub(startTime)

Check failure on line 373 in core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go

View workflow job for this annotation

GitHub Actions / lint

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)
r.lggr.Debugw("finished recovering logs", "timeTaken", finishTime.String(), "numberOfFilters", len(filters), "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest, "recoverID", ctx.Value("recoverID"))

return nil
}

// recoverFilter recovers logs for a single upkeep filter.
func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startBlock, offsetBlock int64) error {
filterStartTime := time.Now()
start := f.lastRePollBlock + 1 // NOTE: we expect f.lastRePollBlock + 1 <= offsetBlock, as others would have been filtered out
// ensure we don't recover logs from before the filter was created
if configUpdateBlock := int64(f.configUpdateBlock); start < configUpdateBlock {
Expand All @@ -392,20 +404,27 @@
if err != nil {
return fmt.Errorf("could not read logs: %w", err)
}

r.lggr.Debugw("got logs with sigs", "logs", len(logs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID"))

logs = f.Select(logs...)

r.lggr.Debugw("filtered logs with sigs", "logs", len(logs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID"))

workIDs := make([]string, 0)
for _, log := range logs {
trigger := logToTrigger(log)
upkeepId := &ocr2keepers.UpkeepIdentifier{}
ok := upkeepId.FromBigInt(f.upkeepID)
if !ok {
r.lggr.Warnw("failed to convert upkeepID to UpkeepIdentifier", "upkeepID", f.upkeepID)
r.lggr.Warnw("failed to convert upkeepID to UpkeepIdentifier", "upkeepID", f.upkeepID, "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID"))
continue
}
workIDs = append(workIDs, core.UpkeepWorkID(*upkeepId, trigger))
}

r.lggr.Debugw("selecting workIDs", "workIDs", len(workIDs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID"))

states, err := r.states.SelectByWorkIDs(ctx, workIDs...)
if err != nil {
return fmt.Errorf("could not read states: %w", err)
Expand All @@ -415,6 +434,8 @@
}
filteredLogs := r.filterFinalizedStates(f, logs, states)

r.lggr.Debugw("filtered logs", "logs", len(filteredLogs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID"))

added, alreadyPending, ok := r.populatePending(f, filteredLogs)
if added > 0 {
r.lggr.Debugw("found missed logs", "added", added, "alreadyPending", alreadyPending, "upkeepID", f.upkeepID)
Expand All @@ -430,6 +451,10 @@
return uf1
}, f)

filterFinishTime := time.Now().Sub(filterStartTime)

Check failure on line 454 in core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go

View workflow job for this annotation

GitHub Actions / lint

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)

r.lggr.Debugw("recovered filter", "timeTaken", filterFinishTime.String(), "logs", len(filteredLogs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID"))

return nil
}

Expand Down Expand Up @@ -479,6 +504,8 @@
}
}
}
r.lggr.Debugw("finished populatePending", "numPending", len(r.pending), "added", len(r.pending)-pendingSizeBefore, "pendingSizeBefore", pendingSizeBefore, "alreadyPending", alreadyPending)

return len(r.pending) - pendingSizeBefore, alreadyPending, len(errs) == 0
}

Expand Down Expand Up @@ -671,6 +698,7 @@
}
}
if upkeepPayloads >= maxPendingPayloadsPerUpkeep {
r.lggr.Debugw("upkeepPayloads exceeds maxPendingPayloadsPerUpkeep", "upkeepPayloads", upkeepPayloads, "maxPendingPayloadsPerUpkeep", maxPendingPayloadsPerUpkeep)
return fmt.Errorf("upkeep %v has too many payloads in pending queue", payload.UpkeepID)
}
if !exist {
Expand Down
Loading