From dd20b8468f1e0cf71eb1f3190a6437b3dd68bef1 Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Mon, 22 Jan 2024 20:45:03 -0800 Subject: [PATCH 1/6] Add metric for num logs in buffer --- .../evmregistry/v21/logprovider/buffer.go | 4 ++++ .../evmregistry/v21/logprovider/provider.go | 18 ++++++++++++++++++ .../evmregistry/v21/prommetrics/metrics.go | 18 ++++++++++++++++++ 3 files changed, 40 insertions(+) create mode 100644 core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index 9f11a1fca01..2857fcca712 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -150,6 +150,8 @@ type logEventBuffer struct { blocks []fetchedBlock // latestBlock is the latest block number seen latestBlock int64 + // logsInBuffer is the number of logs currently in the buffer + logsInBuffer int64 } func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer { @@ -230,6 +232,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { } if added > 0 { lggr.Debugw("Added logs to buffer", "addedLogs", added, "dropped", dropped, "latestBlock", latestBlock) + atomic.AddInt64(&b.logsInBuffer, int64(added-dropped)) } return added - dropped @@ -331,6 +334,7 @@ func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit, totalLimit if len(results) > 0 { b.lggr.Debugw("Dequeued logs", "results", len(results), "start", start, "end", end) + atomic.AddInt64(&b.logsInBuffer, -int64(len(results))) } return results diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index d1360faaf6d..45a47ec7e85 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -24,6 +24,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -102,6 +103,8 @@ type logEventProvider struct { opts LogTriggersOptions currentPartitionIdx uint64 + + servedLogs int64 } func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider { @@ -142,6 +145,21 @@ func (p *logEventProvider) Start(context.Context) error { }) }) + p.threadCtrl.Go(func(ctx context.Context) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + p.lggr.Debugw("logs stats", "servedLogs", atomic.LoadInt64(&p.servedLogs), "logsInBuffer", atomic.LoadInt64(&p.buffer.logsInBuffer), "latestBlockSeen", p.buffer.latestBlockSeen()) + prommetrics.AutomationLogsInLogBuffer.Set(float64(atomic.LoadInt64(&p.buffer.logsInBuffer))) + case <-ctx.Done(): + return + + } + } + }) + return nil }) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go new file mode 100644 index 00000000000..5ea40718923 --- /dev/null +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go @@ -0,0 +1,18 @@ +package prommetrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// AutomationNamespace is the namespace for all Automation related metrics +const AutomationNamespace = "automation" + +// Automation metrics +var ( + AutomationLogsInLogBuffer = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: AutomationNamespace, + Name: "num_logs_in_log_buffer", + Help: "The total number of logs currently being stored in the log buffer", + }) +) From b8e5bfff2ad80f1d94c9fe1e8e575d8951cbd5a4 Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Tue, 23 Jan 2024 11:44:17 -0800 Subject: [PATCH 2/6] Set metrics in buffer, remove ticker --- .../evmregistry/v21/logprovider/buffer.go | 5 +++-- .../evmregistry/v21/logprovider/provider.go | 18 ------------------ 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index 2857fcca712..8349b337743 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" ) var ( @@ -232,7 +233,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { } if added > 0 { lggr.Debugw("Added logs to buffer", "addedLogs", added, "dropped", dropped, "latestBlock", latestBlock) - atomic.AddInt64(&b.logsInBuffer, int64(added-dropped)) + prommetrics.AutomationLogsInLogBuffer.Set(float64(atomic.AddInt64(&b.logsInBuffer, int64(added-dropped)))) } return added - dropped @@ -334,7 +335,7 @@ func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit, totalLimit if len(results) > 0 { b.lggr.Debugw("Dequeued logs", "results", len(results), "start", start, "end", end) - atomic.AddInt64(&b.logsInBuffer, -int64(len(results))) + prommetrics.AutomationLogsInLogBuffer.Set(float64(atomic.AddInt64(&b.logsInBuffer, -int64(len(results))))) } return results diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 45a47ec7e85..d1360faaf6d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -24,7 +24,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -103,8 +102,6 @@ type logEventProvider struct { opts LogTriggersOptions currentPartitionIdx uint64 - - servedLogs int64 } func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider { @@ -145,21 +142,6 @@ func (p *logEventProvider) Start(context.Context) error { }) }) - p.threadCtrl.Go(func(ctx context.Context) { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - for { - select { - case <-ticker.C: - p.lggr.Debugw("logs stats", "servedLogs", atomic.LoadInt64(&p.servedLogs), "logsInBuffer", atomic.LoadInt64(&p.buffer.logsInBuffer), "latestBlockSeen", p.buffer.latestBlockSeen()) - prommetrics.AutomationLogsInLogBuffer.Set(float64(atomic.LoadInt64(&p.buffer.logsInBuffer))) - case <-ctx.Done(): - return - - } - } - }) - return nil }) } From 9168f9d7e428d383ae4344404252ee7b611d7db3 Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Fri, 26 Jan 2024 15:02:27 -0800 Subject: [PATCH 3/6] Add missed logs metric --- .../ocr2keeper/evmregistry/v21/logprovider/recoverer.go | 2 ++ .../ocr2keeper/evmregistry/v21/prommetrics/metrics.go | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index 13b8bb17245..06c9b63ebb2 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -27,6 +27,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -417,6 +418,7 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB added, alreadyPending, ok := r.populatePending(f, filteredLogs) if added > 0 { r.lggr.Debugw("found missed logs", "added", added, "alreadyPending", alreadyPending, "upkeepID", f.upkeepID) + prommetrics.AutomationRecovererMissedLogs.Add(float64(added)) } if !ok { r.lggr.Debugw("failed to add all logs to pending", "upkeepID", f.upkeepID) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go index 5ea40718923..9de47941941 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go @@ -6,13 +6,18 @@ import ( ) // AutomationNamespace is the namespace for all Automation related metrics -const AutomationNamespace = "automation" +const AutomationLogTriggerNamespace = "automation_log_trigger" // Automation metrics var ( AutomationLogsInLogBuffer = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: AutomationNamespace, + Namespace: AutomationLogTriggerNamespace, Name: "num_logs_in_log_buffer", Help: "The total number of logs currently being stored in the log buffer", }) + AutomationRecovererMissedLogs = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: AutomationLogTriggerNamespace, + Name: "num_recoverer_missed_logs", + Help: "How many valid log triggers were identified as being missed by the recoverer", + }) ) From 1c179f9225b1ef76eb82bb1874a773aec380aa04 Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Fri, 16 Feb 2024 15:52:40 +0800 Subject: [PATCH 4/6] Remove logsInBuffer var --- .../ocr2keeper/evmregistry/v21/logprovider/buffer.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index 8349b337743..6418d683869 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -151,8 +151,6 @@ type logEventBuffer struct { blocks []fetchedBlock // latestBlock is the latest block number seen latestBlock int64 - // logsInBuffer is the number of logs currently in the buffer - logsInBuffer int64 } func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer { @@ -233,7 +231,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { } if added > 0 { lggr.Debugw("Added logs to buffer", "addedLogs", added, "dropped", dropped, "latestBlock", latestBlock) - prommetrics.AutomationLogsInLogBuffer.Set(float64(atomic.AddInt64(&b.logsInBuffer, int64(added-dropped)))) + prommetrics.AutomationLogsInLogBuffer.Add(float64(added - dropped)) } return added - dropped @@ -335,7 +333,7 @@ func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit, totalLimit if len(results) > 0 { b.lggr.Debugw("Dequeued logs", "results", len(results), "start", start, "end", end) - prommetrics.AutomationLogsInLogBuffer.Set(float64(atomic.AddInt64(&b.logsInBuffer, -int64(len(results))))) + prommetrics.AutomationLogsInLogBuffer.Sub(float64(len(results))) } return results From 3ede80ef61bdc1bc9790a085aef4cc1eca782fe8 Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Tue, 20 Feb 2024 15:52:28 -0800 Subject: [PATCH 5/6] Add more metrics --- .../ocr2keeper/evmregistry/v21/active_list.go | 8 ++++++-- .../evmregistry/v21/logprovider/provider.go | 2 ++ .../evmregistry/v21/logprovider/recoverer.go | 5 ++++- .../evmregistry/v21/prommetrics/metrics.go | 15 +++++++++++++++ 4 files changed, 27 insertions(+), 3 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/active_list.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/active_list.go index 55c01939cb8..27c13f079b2 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/active_list.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/active_list.go @@ -9,6 +9,7 @@ import ( ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" ) // ActiveUpkeepList is a list to manage active upkeep IDs @@ -49,9 +50,10 @@ func (al *activeList) Reset(ids ...*big.Int) { for _, id := range ids { al.items[id.String()] = true } + prommetrics.AutomationActiveUpkeeps.Set(float64(len(al.items))) } -// Add adds new entries to the list +// Add adds new entries to the list. Returns the number of items added func (al *activeList) Add(ids ...*big.Int) int { al.lock.Lock() defer al.lock.Unlock() @@ -63,10 +65,11 @@ func (al *activeList) Add(ids ...*big.Int) int { al.items[key] = true } } + prommetrics.AutomationActiveUpkeeps.Set(float64(len(al.items))) return count } -// Remove removes entries from the list +// Remove removes entries from the list. Returns the number of items removed func (al *activeList) Remove(ids ...*big.Int) int { al.lock.Lock() defer al.lock.Unlock() @@ -79,6 +82,7 @@ func (al *activeList) Remove(ids ...*big.Int) int { delete(al.items, key) } } + prommetrics.AutomationActiveUpkeeps.Set(float64(len(al.items))) return count } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index d1360faaf6d..e06593a9109 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -24,6 +24,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -162,6 +163,7 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers if err != nil { return nil, fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) } + prommetrics.AutomationLogProviderLatestBlock.Set(float64(latest.BlockNumber)) start := latest.BlockNumber - p.opts.LookbackBlocks if start <= 0 { start = 1 diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index 06c9b63ebb2..9cdbdd0ac86 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -306,7 +306,7 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers. var results, pending []ocr2keepers.UpkeepPayload for _, payload := range r.pending { if allLogsCounter >= MaxProposals { - // we have enough proposals, pushed the rest are pushed back to pending + // we have enough proposals, the rest are pushed back to pending pending = append(pending, payload) continue } @@ -322,6 +322,7 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers. } r.pending = pending + prommetrics.AutomationRecovererPendingPayloads.Set(float64(len(r.pending))) r.lggr.Debugf("found %d recoverable payloads", len(results)) @@ -676,6 +677,7 @@ func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error { if !exist { r.pending = append(pending, payload) } + prommetrics.AutomationRecovererPendingPayloads.Set(float64(len(r.pending))) return nil } @@ -689,6 +691,7 @@ func (r *logRecoverer) removePending(workID string) { } } r.pending = updated + prommetrics.AutomationRecovererPendingPayloads.Set(float64(len(r.pending))) } // sortPending sorts the pending list by a random order based on the normalized latest block number. diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go index 9de47941941..cebbac59884 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go @@ -20,4 +20,19 @@ var ( Name: "num_recoverer_missed_logs", Help: "How many valid log triggers were identified as being missed by the recoverer", }) + AutomationRecovererPendingPayloads = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: AutomationLogTriggerNamespace, + Name: "num_recoverer_pending_payloads", + Help: "How many log trigger payloads are currently pending in the recoverer", + }) + AutomationActiveUpkeeps = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: AutomationLogTriggerNamespace, + Name: "num_active_upkeeps", + Help: "How many log trigger upkeeps are currently active", + }) + AutomationLogProviderLatestBlock = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: AutomationLogTriggerNamespace, + Name: "log_provider_latest_block", + Help: "The latest block number the log provider has seen", + }) ) From 3543f86b0998b158cf372a100ae8fe5ce9ed29c1 Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Wed, 21 Feb 2024 09:53:07 -0800 Subject: [PATCH 6/6] Change to inc and dec --- .../ocr2keeper/evmregistry/v21/logprovider/recoverer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index 9cdbdd0ac86..2eef5db17d9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -676,8 +676,8 @@ func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error { } if !exist { r.pending = append(pending, payload) + prommetrics.AutomationRecovererPendingPayloads.Inc() } - prommetrics.AutomationRecovererPendingPayloads.Set(float64(len(r.pending))) return nil } @@ -688,10 +688,11 @@ func (r *logRecoverer) removePending(workID string) { for _, p := range r.pending { if p.WorkID != workID { updated = append(updated, p) + } else { + prommetrics.AutomationRecovererPendingPayloads.Dec() } } r.pending = updated - prommetrics.AutomationRecovererPendingPayloads.Set(float64(len(r.pending))) } // sortPending sorts the pending list by a random order based on the normalized latest block number.