diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go index 87b46c9785b..f5a18e186ce 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go @@ -66,7 +66,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr blockHistorySize: blockHistorySize, blockSize: lookbackDepth, latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{}, - lggr: lggr.Named("BlockSubscriber"), + lggr: lggr.Named(BlockSubscriberServiceName), } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index b06a3ca809f..15f9080c563 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -152,7 +152,7 @@ type logEventBuffer struct { func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer { return &logEventBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBuffer"), + lggr: lggr.Named("LogEventBuffer"), size: int32(size), blocks: make([]fetchedBlock, size), maxBlockLogs: maxBlockLogs, diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index b62fb370847..72c227bbe72 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -104,7 +104,7 @@ type logEventProvider struct { func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider { return &logEventProvider{ threadCtrl: utils.NewThreadControl(), - lggr: lggr.Named("KeepersRegistry.LogEventProvider"), + lggr: lggr.Named(LogProviderServiceName), packer: packer, buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock), poller: poller, @@ -121,13 +121,14 @@ func (p *logEventProvider) Start(context.Context) error { p.lggr.Infow("starting log event provider", "readInterval", p.opts.ReadInterval, "readMaxBatchSize", readMaxBatchSize, "readers", readerThreads) for i := 0; i < readerThreads; i++ { + tid := i + 1 p.threadCtrl.Go(func(ctx context.Context) { - p.startReader(ctx, readQ) + p.startReader(ctx, readQ, tid) }) } p.threadCtrl.Go(func(ctx context.Context) { - lggr := p.lggr.With("where", "scheduler") + lggr := p.lggr.Named("Scheduler") p.scheduleReadJobs(ctx, func(ids []*big.Int) { select { @@ -151,7 +152,7 @@ func (p *logEventProvider) Close() error { } func (p *logEventProvider) HealthReport() map[string]error { - return map[string]error{LogProviderServiceName: p.Healthy()} + return map[string]error{p.lggr.Name(): p.Healthy()} } func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) { @@ -251,11 +252,11 @@ func (p *logEventProvider) scheduleReadJobs(pctx context.Context, execute func([ } // startReader starts a reader that reads logs from the ids coming from readQ. -func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int) { +func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int, tid int) { ctx, cancel := context.WithCancel(pctx) defer cancel() - lggr := p.lggr.With("where", "reader") + lggr := p.lggr.Named(fmt.Sprintf("ReaderThread-%d", tid)) for { select { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index c5b06701737..9eef47d19a3 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -181,7 +181,7 @@ func (r *logRecoverer) Close() error { } func (r *logRecoverer) HealthReport() map[string]error { - return map[string]error{LogRecovererServiceName: r.Healthy()} + return map[string]error{r.lggr.Name(): r.Healthy()} } func (r *logRecoverer) GetProposalData(ctx context.Context, proposal ocr2keepers.CoordinatedBlockProposal) ([]byte, error) { @@ -572,7 +572,7 @@ func (r *logRecoverer) clean(ctx context.Context) { } } r.lock.RUnlock() - lggr := r.lggr.With("where", "clean") + lggr := r.lggr.Named("Cleaner") if len(expired) == 0 { lggr.Debug("no expired upkeeps") return @@ -595,7 +595,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { if err != nil { return fmt.Errorf("failed to get states: %w", err) } - lggr := r.lggr.With("where", "clean") + lggr := r.lggr.Named("TryExpire") start, _ := r.getRecoveryWindow(latestBlock) r.lock.Lock() defer r.lock.Unlock() diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go index a4684e67078..054fb6355cd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go @@ -41,7 +41,7 @@ const ( ) var ( - RegistryServiceName = "AutomationRegistry" + RegistryServiceName = "EvmRegistry" ErrLogReadFailure = fmt.Errorf("failure reading logs") ErrHeadNotAvailable = fmt.Errorf("head not available") @@ -87,7 +87,7 @@ func NewEvmRegistry( return &EvmRegistry{ ctx: context.Background(), threadCtrl: utils.NewThreadControl(), - lggr: lggr.Named("EvmRegistry"), + lggr: lggr.Named(RegistryServiceName), poller: client.LogPoller(), addr: addr, client: client.Client(), @@ -165,7 +165,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { } r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.With("where", "upkeeps_referesh") + lggr := r.lggr.Named("UpkeepRefreshThread") err := r.refreshActiveUpkeeps() if err != nil { lggr.Errorf("failed to initialize upkeeps", err) @@ -188,7 +188,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { }) r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.With("where", "logs_polling") + lggr := r.lggr.Named("LogPollingThread") ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -206,7 +206,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { }) r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.With("where", "logs_processing") + lggr := r.lggr.Named("LogProcessingThread") ch := r.chLog for { @@ -234,7 +234,7 @@ func (r *EvmRegistry) Close() error { } func (r *EvmRegistry) HealthReport() map[string]error { - return map[string]error{RegistryServiceName: r.Healthy()} + return map[string]error{r.lggr.Name(): r.Healthy()} } func (r *EvmRegistry) refreshActiveUpkeeps() error { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index c7345e4ed2f..38d9d1e5609 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -88,7 +88,7 @@ type UpkeepPrivilegeConfig struct { // streamsLookup looks through check upkeep results looking for any that need off chain lookup func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keepers.CheckResult) []ocr2keepers.CheckResult { - lggr := r.lggr.With("where", "StreamsLookup") + lggr := r.lggr.Named("StreamsLookup") lookups := map[int]*StreamsLookup{} for i, res := range checkResults { if res.IneligibilityReason != uint8(encoding.UpkeepFailureReasonTargetCheckReverted) { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index cd123212376..3268f56fe4c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -167,7 +167,7 @@ func (u *upkeepStateStore) Close() error { } func (u *upkeepStateStore) HealthReport() map[string]error { - return map[string]error{UpkeepStateStoreServiceName: u.Healthy()} + return map[string]error{u.lggr.Name(): u.Healthy()} } // SelectByWorkIDs returns the current state of the upkeep for the provided ids.