From f2a09a62bdf797c47b85928bfbc78c927ef05783 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 12 Sep 2023 15:21:46 +0300 Subject: [PATCH 1/6] log instead of error for old trigger config --- .../ocr2keeper/evm21/logprovider/provider_life_cycle.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go index ab816adb1b3..2796c4b3b0e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go @@ -76,7 +76,8 @@ func (p *logEventProvider) RegisterFilter(ctx context.Context, opts FilterOption if currentFilter != nil { if currentFilter.configUpdateBlock > opts.UpdateBlock { // already registered with a config from a higher block number - return fmt.Errorf("filter for upkeep with id %s already registered with newer config", upkeepID.String()) + p.lggr.Debugf("filter for upkeep with id %s already registered with newer config", upkeepID.String()) + return nil } else if currentFilter.configUpdateBlock == opts.UpdateBlock { // already registered with the same config p.lggr.Debugf("filter for upkeep with id %s already registered with the same config", upkeepID.String()) From 6fabc6df49e339347ccdec6e4a0d60377e5536d1 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 12 Sep 2023 16:28:37 +0300 Subject: [PATCH 2/6] fix test --- .../plugins/ocr2keeper/evm21/logprovider/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go index b5f229f6015..77d9d4ec044 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go @@ -175,7 +175,7 @@ func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) { TriggerConfig: cfg, UpdateBlock: bn.Uint64() - 1, }) - require.Error(t, err) + require.NoError(t, err) // new block b, err = ethClient.BlockByHash(ctx, backend.Commit()) require.NoError(t, err) From a707c3444a37e4ed42c4e1c1597c8e8056851618 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 13 Sep 2023 15:40:11 +0300 Subject: [PATCH 3/6] aligned named logger --- .../plugins/ocr2keeper/evm21/block_subscriber.go | 2 +- .../plugins/ocr2keeper/evm21/logprovider/buffer.go | 2 +- .../ocr2keeper/evm21/logprovider/provider.go | 13 +++++++------ .../ocr2keeper/evm21/logprovider/recoverer.go | 6 +++--- .../ocr2/plugins/ocr2keeper/evm21/registry.go | 12 ++++++------ .../ocr2/plugins/ocr2keeper/evm21/streams_lookup.go | 2 +- .../plugins/ocr2keeper/evm21/upkeepstate/store.go | 2 +- 7 files changed, 20 insertions(+), 19 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go index 87b46c9785b..f5a18e186ce 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go @@ -66,7 +66,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr blockHistorySize: blockHistorySize, blockSize: lookbackDepth, latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{}, - lggr: lggr.Named("BlockSubscriber"), + lggr: lggr.Named(BlockSubscriberServiceName), } } diff 
--git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index b06a3ca809f..15f9080c563 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -152,7 +152,7 @@ type logEventBuffer struct { func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer { return &logEventBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBuffer"), + lggr: lggr.Named("LogEventBuffer"), size: int32(size), blocks: make([]fetchedBlock, size), maxBlockLogs: maxBlockLogs, diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index b62fb370847..72c227bbe72 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -104,7 +104,7 @@ type logEventProvider struct { func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider { return &logEventProvider{ threadCtrl: utils.NewThreadControl(), - lggr: lggr.Named("KeepersRegistry.LogEventProvider"), + lggr: lggr.Named(LogProviderServiceName), packer: packer, buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock), poller: poller, @@ -121,13 +121,14 @@ func (p *logEventProvider) Start(context.Context) error { p.lggr.Infow("starting log event provider", "readInterval", p.opts.ReadInterval, "readMaxBatchSize", readMaxBatchSize, "readers", readerThreads) for i := 0; i < readerThreads; i++ { + tid := i + 1 p.threadCtrl.Go(func(ctx context.Context) { - p.startReader(ctx, readQ) + p.startReader(ctx, readQ, tid) }) } p.threadCtrl.Go(func(ctx context.Context) { - lggr := p.lggr.With("where", "scheduler") + lggr := p.lggr.Named("Scheduler") p.scheduleReadJobs(ctx, func(ids []*big.Int) { select { @@ -151,7 +152,7 @@ func (p *logEventProvider) Close() error { } func (p *logEventProvider) HealthReport() map[string]error { - return map[string]error{LogProviderServiceName: p.Healthy()} + return map[string]error{p.lggr.Name(): p.Healthy()} } func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) { @@ -251,11 +252,11 @@ func (p *logEventProvider) scheduleReadJobs(pctx context.Context, execute func([ } // startReader starts a reader that reads logs from the ids coming from readQ. 
-func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int) { +func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int, tid int) { ctx, cancel := context.WithCancel(pctx) defer cancel() - lggr := p.lggr.With("where", "reader") + lggr := p.lggr.Named(fmt.Sprintf("ReaderThread-%d", tid)) for { select { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index c5b06701737..9eef47d19a3 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -181,7 +181,7 @@ func (r *logRecoverer) Close() error { } func (r *logRecoverer) HealthReport() map[string]error { - return map[string]error{LogRecovererServiceName: r.Healthy()} + return map[string]error{r.lggr.Name(): r.Healthy()} } func (r *logRecoverer) GetProposalData(ctx context.Context, proposal ocr2keepers.CoordinatedBlockProposal) ([]byte, error) { @@ -572,7 +572,7 @@ func (r *logRecoverer) clean(ctx context.Context) { } } r.lock.RUnlock() - lggr := r.lggr.With("where", "clean") + lggr := r.lggr.Named("Cleaner") if len(expired) == 0 { lggr.Debug("no expired upkeeps") return @@ -595,7 +595,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { if err != nil { return fmt.Errorf("failed to get states: %w", err) } - lggr := r.lggr.With("where", "clean") + lggr := r.lggr.Named("TryExpire") start, _ := r.getRecoveryWindow(latestBlock) r.lock.Lock() defer r.lock.Unlock() diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go index a4684e67078..054fb6355cd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go @@ -41,7 +41,7 @@ const ( ) var ( - RegistryServiceName = "AutomationRegistry" + RegistryServiceName = "EvmRegistry" ErrLogReadFailure = fmt.Errorf("failure reading logs") ErrHeadNotAvailable = fmt.Errorf("head not available") @@ -87,7 +87,7 @@ func NewEvmRegistry( return &EvmRegistry{ ctx: context.Background(), threadCtrl: utils.NewThreadControl(), - lggr: lggr.Named("EvmRegistry"), + lggr: lggr.Named(RegistryServiceName), poller: client.LogPoller(), addr: addr, client: client.Client(), @@ -165,7 +165,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { } r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.With("where", "upkeeps_referesh") + lggr := r.lggr.Named("UpkeepRefreshThread") err := r.refreshActiveUpkeeps() if err != nil { lggr.Errorf("failed to initialize upkeeps", err) @@ -188,7 +188,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { }) r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.With("where", "logs_polling") + lggr := r.lggr.Named("LogPollingThread") ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -206,7 +206,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { }) r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.With("where", "logs_processing") + lggr := r.lggr.Named("LogProcessingThread") ch := r.chLog for { @@ -234,7 +234,7 @@ func (r *EvmRegistry) Close() error { } func (r *EvmRegistry) HealthReport() map[string]error { - return map[string]error{RegistryServiceName: r.Healthy()} + return map[string]error{r.lggr.Name(): r.Healthy()} } func (r *EvmRegistry) refreshActiveUpkeeps() error { diff --git 
a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index c7345e4ed2f..38d9d1e5609 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -88,7 +88,7 @@ type UpkeepPrivilegeConfig struct { // streamsLookup looks through check upkeep results looking for any that need off chain lookup func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keepers.CheckResult) []ocr2keepers.CheckResult { - lggr := r.lggr.With("where", "StreamsLookup") + lggr := r.lggr.Named("StreamsLookup") lookups := map[int]*StreamsLookup{} for i, res := range checkResults { if res.IneligibilityReason != uint8(encoding.UpkeepFailureReasonTargetCheckReverted) { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index cd123212376..3268f56fe4c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -167,7 +167,7 @@ func (u *upkeepStateStore) Close() error { } func (u *upkeepStateStore) HealthReport() map[string]error { - return map[string]error{UpkeepStateStoreServiceName: u.Healthy()} + return map[string]error{u.lggr.Name(): u.Healthy()} } // SelectByWorkIDs returns the current state of the upkeep for the provided ids. From 0683e15c0c1e5f2beabd3d723acee2a89dc68eb6 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 13 Sep 2023 16:39:23 +0300 Subject: [PATCH 4/6] fix failing test --- .../ocr2keeper/evm21/logprovider/provider_life_cycle_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go index 4b1ff06f316..2c19d37df6c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go @@ -70,7 +70,7 @@ func TestLogEventProvider_LifeCycle(t *testing.T) { }, { "existing config with old block", - true, + false, big.NewInt(111), LogTriggerConfig{ ContractAddress: common.BytesToAddress(common.LeftPadBytes([]byte{1, 2, 3, 4}, 20)), From d633bf6a077e48d77dd46a6a53ae902adedbaabe Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 13 Sep 2023 19:11:45 +0300 Subject: [PATCH 5/6] avoid redundant named loggers and unexport vars --- .../ocr2keeper/evm21/block_subscriber.go | 10 ++++------ .../ocr2keeper/evm21/logprovider/provider.go | 8 ++++---- .../ocr2keeper/evm21/logprovider/recoverer.go | 11 +++++------ .../ocr2/plugins/ocr2keeper/evm21/registry.go | 19 ++++++++----------- 4 files changed, 21 insertions(+), 27 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go index f5a18e186ce..bcc73ad5ba9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go @@ -27,10 +27,8 @@ const ( lookbackDepth = 1024 // blockHistorySize decides the block history size sent to subscribers blockHistorySize = int64(256) -) -var ( - BlockSubscriberServiceName = "BlockSubscriber" + blockSubscriberServiceName = "BlockSubscriber" ) type BlockSubscriber struct { @@ -66,7 +64,7 @@ func NewBlockSubscriber(hb 
httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr blockHistorySize: blockHistorySize, blockSize: lookbackDepth, latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{}, - lggr: lggr.Named(BlockSubscriberServiceName), + lggr: lggr.Named(blockSubscriberServiceName), } } @@ -148,7 +146,7 @@ func (bs *BlockSubscriber) initialize(ctx context.Context) { } func (bs *BlockSubscriber) Start(ctx context.Context) error { - return bs.StartOnce(BlockSubscriberServiceName, func() error { + return bs.StartOnce(blockSubscriberServiceName, func() error { bs.lggr.Info("block subscriber started.") bs.initialize(ctx) // poll from head broadcaster channel and push to subscribers @@ -184,7 +182,7 @@ func (bs *BlockSubscriber) Start(ctx context.Context) error { } func (bs *BlockSubscriber) Close() error { - return bs.StopOnce(BlockSubscriberServiceName, func() error { + return bs.StopOnce(blockSubscriberServiceName, func() error { bs.lggr.Info("stop block subscriber") bs.threadCtrl.Close() bs.unsubscribe() diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index 72c227bbe72..a3d84f010a6 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -26,7 +26,7 @@ import ( ) var ( - LogProviderServiceName = "LogEventProvider" + logProviderServiceName = "LogEventProvider" ErrHeadNotAvailable = fmt.Errorf("head not available") ErrBlockLimitExceeded = fmt.Errorf("block limit exceeded") @@ -104,7 +104,7 @@ type logEventProvider struct { func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider { return &logEventProvider{ threadCtrl: utils.NewThreadControl(), - lggr: lggr.Named(LogProviderServiceName), + lggr: lggr.Named(logProviderServiceName), packer: packer, buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock), poller: poller, @@ -114,7 +114,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa } func (p *logEventProvider) Start(context.Context) error { - return p.StartOnce(LogProviderServiceName, func() error { + return p.StartOnce(logProviderServiceName, func() error { readQ := make(chan []*big.Int, readJobQueueSize) @@ -145,7 +145,7 @@ func (p *logEventProvider) Start(context.Context) error { } func (p *logEventProvider) Close() error { - return p.StopOnce(LogProviderServiceName, func() error { + return p.StopOnce(logProviderServiceName, func() error { p.threadCtrl.Close() return nil }) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index 9eef47d19a3..a8dd8a6aed3 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -572,14 +572,14 @@ func (r *logRecoverer) clean(ctx context.Context) { } } r.lock.RUnlock() - lggr := r.lggr.Named("Cleaner") + if len(expired) == 0 { - lggr.Debug("no expired upkeeps") + r.lggr.Debug("no expired upkeeps") return } err := r.tryExpire(ctx, expired...) 
if err != nil { - lggr.Warnw("failed to clean visited upkeeps", "err", err) + r.lggr.Warnw("failed to clean visited upkeeps", "err", err) } } @@ -595,7 +595,6 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { if err != nil { return fmt.Errorf("failed to get states: %w", err) } - lggr := r.lggr.Named("TryExpire") start, _ := r.getRecoveryWindow(latestBlock) r.lock.Lock() defer r.lock.Unlock() @@ -612,7 +611,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { } if logBlock := rec.payload.Trigger.LogTriggerExtension.BlockNumber; int64(logBlock) < start { // we can't recover this log anymore, so we remove it from the visited list - lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID, + r.lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID, "latestBlock", latestBlock, "logBlock", logBlock, "start", start) r.removePending(rec.payload.WorkID) delete(r.visited, ids[i]) @@ -629,7 +628,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { } if removed > 0 { - lggr.Debugw("expired upkeeps", "expired", len(ids), "cleaned", removed) + r.lggr.Debugw("expired upkeeps", "expired", len(ids), "cleaned", removed) } return nil diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go index 054fb6355cd..80f53b1bf60 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go @@ -41,7 +41,7 @@ const ( ) var ( - RegistryServiceName = "EvmRegistry" + registryServiceName = "EvmRegistry" ErrLogReadFailure = fmt.Errorf("failure reading logs") ErrHeadNotAvailable = fmt.Errorf("head not available") @@ -87,7 +87,7 @@ func NewEvmRegistry( return &EvmRegistry{ ctx: context.Background(), threadCtrl: utils.NewThreadControl(), - lggr: lggr.Named(RegistryServiceName), + lggr: lggr.Named(registryServiceName), poller: client.LogPoller(), addr: addr, client: client.Client(), @@ -159,16 +159,15 @@ func (r *EvmRegistry) Name() string { } func (r *EvmRegistry) Start(ctx context.Context) error { - return r.StartOnce(RegistryServiceName, func() error { + return r.StartOnce(registryServiceName, func() error { if err := r.registerEvents(r.chainID, r.addr); err != nil { return fmt.Errorf("logPoller error while registering automation events: %w", err) } r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.Named("UpkeepRefreshThread") err := r.refreshActiveUpkeeps() if err != nil { - lggr.Errorf("failed to initialize upkeeps", err) + r.lggr.Errorf("failed to initialize upkeeps", err) } ticker := time.NewTicker(refreshInterval) @@ -179,7 +178,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { case <-ticker.C: err = r.refreshActiveUpkeeps() if err != nil { - lggr.Errorf("failed to refresh upkeeps", err) + r.lggr.Errorf("failed to refresh upkeeps", err) } case <-ctx.Done(): return @@ -188,7 +187,6 @@ func (r *EvmRegistry) Start(ctx context.Context) error { }) r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.Named("LogPollingThread") ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -197,7 +195,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { case <-ticker.C: err := r.pollUpkeepStateLogs() if err != nil { - lggr.Errorf("failed to poll logs for upkeeps", err) + r.lggr.Errorf("failed to poll logs for upkeeps", err) } case <-ctx.Done(): return @@ -206,7 +204,6 @@ func (r *EvmRegistry) Start(ctx context.Context) 
error { }) r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.Named("LogProcessingThread") ch := r.chLog for { @@ -214,7 +211,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { case l := <-ch: err := r.processUpkeepStateLog(l) if err != nil { - lggr.Errorf("failed to process log for upkeep", err) + r.lggr.Errorf("failed to process log for upkeep", err) } case <-ctx.Done(): return @@ -227,7 +224,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { } func (r *EvmRegistry) Close() error { - return r.StopOnce(RegistryServiceName, func() error { + return r.StopOnce(registryServiceName, func() error { r.threadCtrl.Close() return nil }) From 096e5d528aaa5b98931cfa063109a58cc884729e Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 13 Sep 2023 19:12:24 +0300 Subject: [PATCH 6/6] leftover --- .../ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 3268f56fe4c..1bdffcbf541 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -17,7 +17,7 @@ import ( ) const ( - UpkeepStateStoreServiceName = "UpkeepStateStore" + upkeepStateStoreServiceName = "UpkeepStateStore" // CacheExpiration is the amount of time that we keep a record in the cache. CacheExpiration = 24 * time.Hour // GCInterval is the amount of time between cache cleanups. @@ -81,7 +81,7 @@ type upkeepStateStore struct { func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScanner) *upkeepStateStore { return &upkeepStateStore{ orm: orm, - lggr: lggr.Named(UpkeepStateStoreServiceName), + lggr: lggr.Named(upkeepStateStoreServiceName), cache: map[string]*upkeepStateRecord{}, scanner: scanner, retention: CacheExpiration, @@ -97,7 +97,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann // it does background cleanup of the cache every GCInterval, // and flush records to DB every flushCadence. func (u *upkeepStateStore) Start(pctx context.Context) error { - return u.StartOnce(UpkeepStateStoreServiceName, func() error { + return u.StartOnce(upkeepStateStoreServiceName, func() error { if err := u.scanner.Start(pctx); err != nil { return fmt.Errorf("failed to start scanner") } @@ -160,7 +160,7 @@ func (u *upkeepStateStore) flush(ctx context.Context) { // Close stops the service of pruning stale data; implements io.Closer func (u *upkeepStateStore) Close() error { - return u.StopOnce(UpkeepStateStoreServiceName, func() error { + return u.StopOnce(upkeepStateStoreServiceName, func() error { u.threadCtrl.Close() return nil })
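
For reference, the named-logger pattern these patches converge on — naming the logger once at construction with an unexported service-name constant and keying HealthReport off lggr.Name() — can be sketched as follows. This is a minimal, self-contained illustration rather than the actual chainlink code: namedLogger is a simplified stand-in for the real logger.Logger, healthy() is stubbed out, and the "EVM.1" root name is only an example.

    package main

    import "fmt"

    // namedLogger is a tiny stand-in for logger.Logger; only the Named/Name
    // methods relevant to the pattern are modeled here.
    type namedLogger struct{ name string }

    func (l namedLogger) Named(n string) namedLogger {
    	if l.name == "" {
    		return namedLogger{name: n}
    	}
    	return namedLogger{name: l.name + "." + n}
    }

    func (l namedLogger) Name() string { return l.name }

    // unexported service name, as in patch 5.
    const logProviderServiceName = "LogEventProvider"

    type logEventProvider struct {
    	lggr namedLogger
    }

    // The constructor names the logger exactly once with the service name...
    func newLogProvider(lggr namedLogger) *logEventProvider {
    	return &logEventProvider{lggr: lggr.Named(logProviderServiceName)}
    }

    // ...and HealthReport keys off lggr.Name(), so the report carries the full
    // logger path rather than a hard-coded exported constant.
    func (p *logEventProvider) HealthReport() map[string]error {
    	return map[string]error{p.lggr.Name(): p.healthy()}
    }

    func (p *logEventProvider) healthy() error { return nil }

    func main() {
    	p := newLogProvider(namedLogger{name: "EVM.1"})
    	fmt.Println(p.HealthReport()) // map[EVM.1.LogEventProvider:<nil>]
    }

The point of keying the health report off lggr.Name() instead of the service-name constant is that the key then matches whatever prefix the caller already attached to the logger, so health entries line up with log lines without duplicating the naming in two places.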