From 112d1d0cf60375c8b86cd4f2bdc30ef3a38766fb Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 29 Mar 2024 12:08:27 +0000 Subject: [PATCH 1/7] Extra logging --- .../evmregistry/v21/logprovider/recoverer.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index 26c56c23b8c..ba0aa4fa9a0 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -348,8 +348,9 @@ func (r *logRecoverer) recover(ctx context.Context) error { return nil } - r.lggr.Debugw("recovering logs", "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest) + r.lggr.Debugw("recovering logs", "numberOfFilters", len(filters), "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest) + // This is unbounded, should we use a worker pool? var wg sync.WaitGroup for _, f := range filters { wg.Add(1) @@ -392,8 +393,13 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB if err != nil { return fmt.Errorf("could not read logs: %w", err) } + + r.lggr.Debugw("got logs with sigs", "logs", len(logs)) + logs = f.Select(logs...) + r.lggr.Debugw("filtered logs with sigs", "logs", len(logs)) + workIDs := make([]string, 0) for _, log := range logs { trigger := logToTrigger(log) @@ -406,6 +412,8 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB workIDs = append(workIDs, core.UpkeepWorkID(*upkeepId, trigger)) } + r.lggr.Debugw("selecting workIDs", "workIDs", len(workIDs)) + states, err := r.states.SelectByWorkIDs(ctx, workIDs...) if err != nil { return fmt.Errorf("could not read states: %w", err) @@ -415,6 +423,8 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB } filteredLogs := r.filterFinalizedStates(f, logs, states) + r.lggr.Debugw("filtered logs", "logs", len(filteredLogs)) + added, alreadyPending, ok := r.populatePending(f, filteredLogs) if added > 0 { r.lggr.Debugw("found missed logs", "added", added, "alreadyPending", alreadyPending, "upkeepID", f.upkeepID) From ff838eff90470c06996015fb31b1dd7cd8cd7212 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Sat, 30 Mar 2024 22:44:17 +0000 Subject: [PATCH 2/7] Add recover ID --- .../evmregistry/v21/logprovider/recoverer.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index ba0aa4fa9a0..dfa7b3f4006 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -6,6 +6,7 @@ import ( "crypto/rand" "errors" "fmt" + "github.com/google/uuid" "io" "math" "math/big" @@ -139,6 +140,8 @@ func (r *logRecoverer) Start(ctx context.Context) error { for { select { case <-recoveryTicker.C: + recoverID := uuid.New() + ctx := context.WithValue(ctx, "recoverID", recoverID) if err := r.recover(ctx); err != nil { r.lggr.Warnw("failed to recover logs", "err", err) } @@ -348,7 +351,7 @@ func (r *logRecoverer) recover(ctx context.Context) error { return nil } - r.lggr.Debugw("recovering logs", "numberOfFilters", len(filters), "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest) + r.lggr.Debugw("recovering logs", "numberOfFilters", len(filters), "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest, "recoverID", ctx.Value("recoverID")) // This is unbounded, should we use a worker pool? var wg sync.WaitGroup @@ -357,7 +360,7 @@ func (r *logRecoverer) recover(ctx context.Context) error { go func(f upkeepFilter) { defer wg.Done() if err := r.recoverFilter(ctx, f, start, offsetBlock); err != nil { - r.lggr.Debugw("error recovering filter", "err", err.Error()) + r.lggr.Debugw("error recovering filter", "err", err.Error(), "recoverID", ctx.Value("recoverID")) } }(f) } @@ -394,11 +397,11 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB return fmt.Errorf("could not read logs: %w", err) } - r.lggr.Debugw("got logs with sigs", "logs", len(logs)) + r.lggr.Debugw("got logs with sigs", "logs", len(logs), "recoverID", ctx.Value("recoverID")) logs = f.Select(logs...) - r.lggr.Debugw("filtered logs with sigs", "logs", len(logs)) + r.lggr.Debugw("filtered logs with sigs", "logs", len(logs), "recoverID", ctx.Value("recoverID")) workIDs := make([]string, 0) for _, log := range logs { @@ -406,13 +409,13 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB upkeepId := &ocr2keepers.UpkeepIdentifier{} ok := upkeepId.FromBigInt(f.upkeepID) if !ok { - r.lggr.Warnw("failed to convert upkeepID to UpkeepIdentifier", "upkeepID", f.upkeepID) + r.lggr.Warnw("failed to convert upkeepID to UpkeepIdentifier", "upkeepID", f.upkeepID, "recoverID", ctx.Value("recoverID")) continue } workIDs = append(workIDs, core.UpkeepWorkID(*upkeepId, trigger)) } - r.lggr.Debugw("selecting workIDs", "workIDs", len(workIDs)) + r.lggr.Debugw("selecting workIDs", "workIDs", len(workIDs), "recoverID", ctx.Value("recoverID")) states, err := r.states.SelectByWorkIDs(ctx, workIDs...) if err != nil { @@ -423,7 +426,7 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB } filteredLogs := r.filterFinalizedStates(f, logs, states) - r.lggr.Debugw("filtered logs", "logs", len(filteredLogs)) + r.lggr.Debugw("filtered logs", "logs", len(filteredLogs), "recoverID", ctx.Value("recoverID")) added, alreadyPending, ok := r.populatePending(f, filteredLogs) if added > 0 { From af27e9715e1163dc1d04524ca7c6de88ac1139d0 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Sat, 30 Mar 2024 23:40:26 +0000 Subject: [PATCH 3/7] Add a recover filter ID --- .../evmregistry/v21/logprovider/recoverer.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index dfa7b3f4006..7816b48a66b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -359,6 +359,8 @@ func (r *logRecoverer) recover(ctx context.Context) error { wg.Add(1) go func(f upkeepFilter) { defer wg.Done() + recoverFilterID := uuid.New() + ctx := context.WithValue(ctx, "recoverFilterID", recoverFilterID) if err := r.recoverFilter(ctx, f, start, offsetBlock); err != nil { r.lggr.Debugw("error recovering filter", "err", err.Error(), "recoverID", ctx.Value("recoverID")) } @@ -397,11 +399,11 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB return fmt.Errorf("could not read logs: %w", err) } - r.lggr.Debugw("got logs with sigs", "logs", len(logs), "recoverID", ctx.Value("recoverID")) + r.lggr.Debugw("got logs with sigs", "logs", len(logs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID")) logs = f.Select(logs...) - r.lggr.Debugw("filtered logs with sigs", "logs", len(logs), "recoverID", ctx.Value("recoverID")) + r.lggr.Debugw("filtered logs with sigs", "logs", len(logs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID")) workIDs := make([]string, 0) for _, log := range logs { @@ -409,13 +411,13 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB upkeepId := &ocr2keepers.UpkeepIdentifier{} ok := upkeepId.FromBigInt(f.upkeepID) if !ok { - r.lggr.Warnw("failed to convert upkeepID to UpkeepIdentifier", "upkeepID", f.upkeepID, "recoverID", ctx.Value("recoverID")) + r.lggr.Warnw("failed to convert upkeepID to UpkeepIdentifier", "upkeepID", f.upkeepID, "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID")) continue } workIDs = append(workIDs, core.UpkeepWorkID(*upkeepId, trigger)) } - r.lggr.Debugw("selecting workIDs", "workIDs", len(workIDs), "recoverID", ctx.Value("recoverID")) + r.lggr.Debugw("selecting workIDs", "workIDs", len(workIDs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID")) states, err := r.states.SelectByWorkIDs(ctx, workIDs...) if err != nil { @@ -426,7 +428,7 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB } filteredLogs := r.filterFinalizedStates(f, logs, states) - r.lggr.Debugw("filtered logs", "logs", len(filteredLogs), "recoverID", ctx.Value("recoverID")) + r.lggr.Debugw("filtered logs", "logs", len(filteredLogs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID")) added, alreadyPending, ok := r.populatePending(f, filteredLogs) if added > 0 { From c91b84e1dc97627bba2638bc9b964b9dfe10afd5 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Sun, 31 Mar 2024 00:57:44 +0000 Subject: [PATCH 4/7] Add log for finishing recover --- .../plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index 7816b48a66b..dac3d2b7b96 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -445,6 +445,8 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB return uf1 }, f) + r.lggr.Debugw("recovered filter", "logs", len(filteredLogs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID")) + return nil } From 8e1c67ea32470d0de2653a9d92c8d693b3adf6fa Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 2 Apr 2024 00:56:54 +0100 Subject: [PATCH 5/7] Add more logging --- .../ocr2keeper/evmregistry/v21/logprovider/recoverer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index dac3d2b7b96..76fa68f33e7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -353,6 +353,7 @@ func (r *logRecoverer) recover(ctx context.Context) error { r.lggr.Debugw("recovering logs", "numberOfFilters", len(filters), "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest, "recoverID", ctx.Value("recoverID")) + startTime := time.Now() // This is unbounded, should we use a worker pool? var wg sync.WaitGroup for _, f := range filters { @@ -367,6 +368,8 @@ func (r *logRecoverer) recover(ctx context.Context) error { }(f) } wg.Wait() + finishTime := time.Now().Sub(startTime) + r.lggr.Debugw("finished recovering logs", "timeTaken", finishTime.String(), "timeTakenMS", finishTime.Milliseconds(), "numberOfFilters", len(filters), "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest, "recoverID", ctx.Value("recoverID")) return nil } From efe865543099a8b471d64e267c1a8716c4d9202e Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 2 Apr 2024 01:37:54 +0100 Subject: [PATCH 6/7] Add logs on pending records size --- .../ocr2keeper/evmregistry/v21/logprovider/recoverer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index 76fa68f33e7..42a0ad4ac30 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -499,6 +499,8 @@ func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller. } } } + r.lggr.Debugw("finished populatePending", "numPending", len(r.pending), "added", len(r.pending)-pendingSizeBefore, "pendingSizeBefore", pendingSizeBefore, "alreadyPending", alreadyPending) + return len(r.pending) - pendingSizeBefore, alreadyPending, len(errs) == 0 } @@ -691,6 +693,7 @@ func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error { } } if upkeepPayloads >= maxPendingPayloadsPerUpkeep { + r.lggr.Debugw("upkeepPayloads exceeds maxPendingPayloadsPerUpkeep", "upkeepPayloads", upkeepPayloads, "maxPendingPayloadsPerUpkeep", maxPendingPayloadsPerUpkeep) return fmt.Errorf("upkeep %v has too many payloads in pending queue", payload.UpkeepID) } if !exist { From 8851b78139f3e8ecb10d998f6791112cd5fd6eb8 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 2 Apr 2024 12:11:51 +0100 Subject: [PATCH 7/7] Update logging --- .../ocr2keeper/evmregistry/v21/logprovider/recoverer.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index 42a0ad4ac30..0975d32ea92 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -359,23 +359,26 @@ func (r *logRecoverer) recover(ctx context.Context) error { for _, f := range filters { wg.Add(1) go func(f upkeepFilter) { + defer wg.Done() recoverFilterID := uuid.New() ctx := context.WithValue(ctx, "recoverFilterID", recoverFilterID) if err := r.recoverFilter(ctx, f, start, offsetBlock); err != nil { r.lggr.Debugw("error recovering filter", "err", err.Error(), "recoverID", ctx.Value("recoverID")) } + }(f) } wg.Wait() finishTime := time.Now().Sub(startTime) - r.lggr.Debugw("finished recovering logs", "timeTaken", finishTime.String(), "timeTakenMS", finishTime.Milliseconds(), "numberOfFilters", len(filters), "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest, "recoverID", ctx.Value("recoverID")) + r.lggr.Debugw("finished recovering logs", "timeTaken", finishTime.String(), "numberOfFilters", len(filters), "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest, "recoverID", ctx.Value("recoverID")) return nil } // recoverFilter recovers logs for a single upkeep filter. func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startBlock, offsetBlock int64) error { + filterStartTime := time.Now() start := f.lastRePollBlock + 1 // NOTE: we expect f.lastRePollBlock + 1 <= offsetBlock, as others would have been filtered out // ensure we don't recover logs from before the filter was created if configUpdateBlock := int64(f.configUpdateBlock); start < configUpdateBlock { @@ -448,7 +451,9 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB return uf1 }, f) - r.lggr.Debugw("recovered filter", "logs", len(filteredLogs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID")) + filterFinishTime := time.Now().Sub(filterStartTime) + + r.lggr.Debugw("recovered filter", "timeTaken", filterFinishTime.String(), "logs", len(filteredLogs), "recoverID", ctx.Value("recoverID"), "recoverFilterID", ctx.Value("recoverFilterID")) return nil }