From 493c20a453434ab0be3db957ffa0fea71567b068 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Tue, 2 Apr 2024 20:46:06 -0500 Subject: [PATCH] use services.NewTicker --- common/client/multi_node.go | 4 +--- common/txmgr/reaper.go | 7 ++----- common/txmgr/resender.go | 5 ++--- core/chains/evm/client/pool.go | 4 +--- .../evm/forwarders/forwarder_manager.go | 9 ++++----- core/chains/evm/gas/arbitrum_estimator.go | 15 ++++++-------- core/chains/evm/gas/rollups/l1_oracle.go | 20 +++++++++---------- .../evm/gas/suggested_price_estimator.go | 14 ++++++------- core/chains/evm/logpoller/log_poller.go | 15 ++++++++------ core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 ++-- core/services/blockhashstore/delegate.go | 3 +-- core/services/blockheaderfeeder/delegate.go | 3 +-- .../llo/onchain_channel_definition_cache.go | 4 ++-- .../evmregistry/v21/logprovider/recoverer.go | 4 ++-- .../evmregistry/v21/upkeepstate/store.go | 4 ++-- core/services/pipeline/runner.go | 5 ++--- .../relay/evm/mercury/persistence_manager.go | 6 +++--- core/services/relay/evm/mercury/queue.go | 3 +-- .../relay/evm/mercury/wsrpc/cache/cache.go | 6 +++--- go.mod | 2 +- go.sum | 4 ++-- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 ++-- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 ++-- 26 files changed, 69 insertions(+), 86 deletions(-) diff --git a/common/client/multi_node.go b/common/client/multi_node.go index cc8daed599c..2a03dbcb9ff 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -15,8 +15,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/assets" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - "github.com/smartcontractkit/chainlink/v2/common/config" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" "github.com/smartcontractkit/chainlink/v2/common/types" @@ -327,7 +325,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP c.report() - monitor := time.NewTicker(utils.WithJitter(c.reportInterval)) + monitor := services.NewTicker(c.reportInterval) defer monitor.Stop() for { diff --git a/common/txmgr/reaper.go b/common/txmgr/reaper.go index 3ed05b2caee..932b58f6430 100644 --- a/common/txmgr/reaper.go +++ b/common/txmgr/reaper.go @@ -7,8 +7,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" ) @@ -58,7 +56,7 @@ func (r *Reaper[CHAIN_ID]) Stop() { func (r *Reaper[CHAIN_ID]) runLoop() { defer close(r.chDone) - ticker := time.NewTicker(utils.WithJitter(r.txConfig.ReaperInterval())) + ticker := services.NewTicker(r.txConfig.ReaperInterval()) defer ticker.Stop() for { select { @@ -66,10 +64,9 @@ func (r *Reaper[CHAIN_ID]) runLoop() { return case <-ticker.C: r.work() - ticker.Reset(utils.WithJitter(r.txConfig.ReaperInterval())) case <-r.trigger: r.work() - ticker.Reset(utils.WithJitter(r.txConfig.ReaperInterval())) + ticker.Reset() } } } diff --git a/common/txmgr/resender.go b/common/txmgr/resender.go index 8c2dd6b827e..2666e7c9802 100644 --- a/common/txmgr/resender.go +++ b/common/txmgr/resender.go @@ -8,8 +8,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chains/label" "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/common/client" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" @@ -120,7 +119,7 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() er.logger.Warnw("Failed to resend unconfirmed transactions", "err", err) } - ticker := time.NewTicker(utils.WithJitter(er.interval)) + ticker := services.NewTicker(er.interval) defer ticker.Stop() for { select { diff --git a/core/chains/evm/client/pool.go b/core/chains/evm/client/pool.go index 891fd8c9226..db902378bf7 100644 --- a/core/chains/evm/client/pool.go +++ b/core/chains/evm/client/pool.go @@ -17,8 +17,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - "github.com/smartcontractkit/chainlink/v2/common/config" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -235,7 +233,7 @@ func (p *Pool) runLoop() { // Prometheus' default interval is 15s, set this to under 7.5s to avoid // aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency) reportInterval := 6500 * time.Millisecond - monitor := time.NewTicker(utils.WithJitter(reportInterval)) + monitor := services.NewTicker(reportInterval) defer monitor.Stop() for { diff --git a/core/chains/evm/forwarders/forwarder_manager.go b/core/chains/evm/forwarders/forwarder_manager.go index f0786c091c4..b6e19ea9016 100644 --- a/core/chains/evm/forwarders/forwarder_manager.go +++ b/core/chains/evm/forwarders/forwarder_manager.go @@ -13,8 +13,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmlogpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -233,11 +231,12 @@ func (f *FwdMgr) getCachedSenders(addr common.Address) ([]common.Address, bool) func (f *FwdMgr) runLoop() { defer f.wg.Done() - tick := time.After(0) + ticker := services.NewTicker(time.Minute) + defer ticker.Stop() - for ; ; tick = time.After(utils.WithJitter(time.Minute)) { + for { select { - case <-tick: + case <-ticker.C: if err := f.logpoller.Ready(); err != nil { f.logger.Warnw("Skipping log syncing", "err", err) continue diff --git a/core/chains/evm/gas/arbitrum_estimator.go b/core/chains/evm/gas/arbitrum_estimator.go index 40366c5b998..6211341bbea 100644 --- a/core/chains/evm/gas/arbitrum_estimator.go +++ b/core/chains/evm/gas/arbitrum_estimator.go @@ -15,8 +15,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -175,7 +173,8 @@ func (a *arbitrumEstimator) getPricesInArbGas() (perL2Tx uint32, perL1CalldataUn func (a *arbitrumEstimator) run() { defer close(a.chDone) - t := a.refreshPricesInArbGas() + t := services.NewTicker(a.pollPeriod) + defer t.Stop() close(a.chInitialised) for { @@ -183,19 +182,17 @@ func (a *arbitrumEstimator) run() { case <-a.chStop: return case ch := <-a.chForceRefetch: - t.Stop() - t = a.refreshPricesInArbGas() + a.refreshPricesInArbGas() + t.Reset() close(ch) case <-t.C: - t = a.refreshPricesInArbGas() + a.refreshPricesInArbGas() } } } // refreshPricesInArbGas calls getPricesInArbGas() and caches the refreshed prices. -func (a *arbitrumEstimator) refreshPricesInArbGas() (t *time.Timer) { - t = time.NewTimer(utils.WithJitter(a.pollPeriod)) - +func (a *arbitrumEstimator) refreshPricesInArbGas() { perL2Tx, perL1CalldataUnit, err := a.callGetPricesInArbGas() if err != nil { a.logger.Warnw("Failed to refresh prices", "err", err) diff --git a/core/chains/evm/gas/rollups/l1_oracle.go b/core/chains/evm/gas/rollups/l1_oracle.go index ae46071cf0d..33183ef26fc 100644 --- a/core/chains/evm/gas/rollups/l1_oracle.go +++ b/core/chains/evm/gas/rollups/l1_oracle.go @@ -14,11 +14,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" + gethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - - gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/smartcontractkit/chainlink/v2/common/client" "github.com/smartcontractkit/chainlink/v2/common/config" @@ -221,7 +220,8 @@ func (o *l1Oracle) HealthReport() map[string]error { func (o *l1Oracle) run() { defer close(o.chDone) - t := o.refresh() + t := services.NewTicker(o.pollPeriod) + defer t.Stop() close(o.chInitialised) for { @@ -229,27 +229,25 @@ func (o *l1Oracle) run() { case <-o.chStop: return case <-t.C: - t = o.refresh() + o.refresh() } } } -func (o *l1Oracle) refresh() (t *time.Timer) { - t, err := o.refreshWithError() +func (o *l1Oracle) refresh() { + err := o.refreshWithError() if err != nil { o.SvcErrBuffer.Append(err) } return } -func (o *l1Oracle) refreshWithError() (t *time.Timer, err error) { - t = time.NewTimer(utils.WithJitter(o.pollPeriod)) - +func (o *l1Oracle) refreshWithError() (err error) { ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout()) defer cancel() price, err := o.fetchL1GasPrice(ctx) if err != nil { - return t, err + return err } o.l1GasPriceMu.Lock() diff --git a/core/chains/evm/gas/suggested_price_estimator.go b/core/chains/evm/gas/suggested_price_estimator.go index edc1b0f92fa..8f67c0e1499 100644 --- a/core/chains/evm/gas/suggested_price_estimator.go +++ b/core/chains/evm/gas/suggested_price_estimator.go @@ -12,7 +12,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math" "github.com/smartcontractkit/chainlink/v2/common/fee" @@ -94,7 +93,8 @@ func (o *SuggestedPriceEstimator) HealthReport() map[string]error { func (o *SuggestedPriceEstimator) run() { defer close(o.chDone) - t := o.refreshPrice() + t := services.NewTicker(o.pollPeriod) + defer t.Stop() close(o.chInitialised) for { @@ -102,18 +102,16 @@ func (o *SuggestedPriceEstimator) run() { case <-o.chStop: return case ch := <-o.chForceRefetch: - t.Stop() - t = o.refreshPrice() + o.refreshPrice() + t.Reset() close(ch) case <-t.C: - t = o.refreshPrice() + o.refreshPrice() } } } -func (o *SuggestedPriceEstimator) refreshPrice() (t *time.Timer) { - t = time.NewTimer(utils.WithJitter(o.pollPeriod)) - +func (o *SuggestedPriceEstimator) refreshPrice() { var res hexutil.Big ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout()) defer cancel() diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index de2a182bbce..0619b31cd48 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -553,9 +553,14 @@ func (lp *logPoller) loadFilters() error { func (lp *logPoller) run() { defer lp.wg.Done() - logPollTick := time.After(0) + logPollTicker := services.NewTicker(lp.pollPeriod) + defer logPollTicker.Stop() // stagger these somewhat, so they don't all run back-to-back - backupLogPollTick := time.After(100 * time.Millisecond) + backupLogPollTicker := services.TickerConfig{ + Initial: 100 * time.Millisecond, + JitterPct: services.DefaultJitter, + }.NewTicker(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod) + defer backupLogPollTicker.Stop() filtersLoaded := false for { @@ -564,8 +569,7 @@ func (lp *logPoller) run() { return case fromBlockReq := <-lp.replayStart: lp.handleReplayRequest(fromBlockReq, filtersLoaded) - case <-logPollTick: - logPollTick = time.After(utils.WithJitter(lp.pollPeriod)) + case <-logPollTicker.C: if !filtersLoaded { if err := lp.loadFilters(); err != nil { lp.lggr.Errorw("Failed loading filters in main logpoller loop, retrying later", "err", err) @@ -603,7 +607,7 @@ func (lp *logPoller) run() { start = lastProcessed.BlockNumber + 1 } lp.PollAndSaveLogs(lp.ctx, start) - case <-backupLogPollTick: + case <-backupLogPollTicker.C: if lp.backupPollerBlockDelay == 0 { continue // backup poller is disabled } @@ -615,7 +619,6 @@ func (lp *logPoller) run() { // frequently than the primary log poller (instead of roughly once per block it runs once roughly once every // lp.backupPollerDelay blocks--with default settings about 100x less frequently). - backupLogPollTick = time.After(utils.WithJitter(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod)) if !filtersLoaded { lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping") continue diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 272debca3ac..6895dd03123 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -21,7 +21,7 @@ require ( github.com/prometheus/client_golang v1.17.0 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.2 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 3b61b46475d..11b3de343d6 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1187,8 +1187,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI= github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1 h1:GkGvPo3Oo+sxGdPV0LpD1odtOIR3bModCGlhtHPSR/k= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/core/services/blockhashstore/delegate.go b/core/services/blockhashstore/delegate.go index 9a11c057c32..624a6e2523d 100644 --- a/core/services/blockhashstore/delegate.go +++ b/core/services/blockhashstore/delegate.go @@ -20,7 +20,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/pg" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var _ job.ServiceCtx = &service{} @@ -212,7 +211,6 @@ type service struct { func (s *service) Start(context.Context) error { return s.StartOnce("BHS Feeder Service", func() error { s.logger.Infow("Starting BHS feeder") - ticker := time.NewTicker(utils.WithJitter(s.pollPeriod)) s.parentCtx, s.cancel = context.WithCancel(context.Background()) s.wg.Add(2) go func() { @@ -221,6 +219,7 @@ func (s *service) Start(context.Context) error { }() go func() { defer s.wg.Done() + ticker := services.NewTicker(s.pollPeriod) defer ticker.Stop() for { select { diff --git a/core/services/blockheaderfeeder/delegate.go b/core/services/blockheaderfeeder/delegate.go index 19edb43bc23..d00736b027c 100644 --- a/core/services/blockheaderfeeder/delegate.go +++ b/core/services/blockheaderfeeder/delegate.go @@ -20,7 +20,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/pg" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var _ job.ServiceCtx = &service{} @@ -226,10 +225,10 @@ type service struct { func (s *service) Start(context.Context) error { return s.StartOnce("Block Header Feeder Service", func() error { s.logger.Infow("Starting BlockHeaderFeeder") - ticker := time.NewTicker(utils.WithJitter(s.pollPeriod)) s.parentCtx, s.cancel = context.WithCancel(context.Background()) go func() { defer close(s.done) + ticker := services.NewTicker(s.pollPeriod) defer ticker.Stop() for { select { diff --git a/core/services/llo/onchain_channel_definition_cache.go b/core/services/llo/onchain_channel_definition_cache.go index d72079d0b1e..5c89561481a 100644 --- a/core/services/llo/onchain_channel_definition_cache.go +++ b/core/services/llo/onchain_channel_definition_cache.go @@ -19,7 +19,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/channel_config_store" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) type ChannelDefinitionCacheORM interface { @@ -114,7 +113,8 @@ const pollInterval = 1 * time.Second func (c *channelDefinitionCache) poll() { defer c.wg.Done() - pollT := time.NewTicker(utils.WithJitter(pollInterval)) + pollT := services.NewTicker(pollInterval) + defer pollT.Stop() for { select { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index 26c56c23b8c..d3f79324ac8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -149,14 +149,14 @@ func (r *logRecoverer) Start(ctx context.Context) error { }) r.threadCtrl.Go(func(ctx context.Context) { - cleanupTicker := time.NewTicker(utils.WithJitter(GCInterval)) + cleanupTicker := services.NewTicker(GCInterval) defer cleanupTicker.Stop() for { select { case <-cleanupTicker.C: r.clean(ctx) - cleanupTicker.Reset(utils.WithJitter(GCInterval)) + cleanupTicker.Reset() case <-ctx.Done(): return } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go index 9410374d7ca..f4680d2357b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go @@ -108,7 +108,7 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { u.lggr.Debug("Starting upkeep state store") u.threadCtrl.Go(func(ctx context.Context) { - ticker := time.NewTicker(utils.WithJitter(u.cleanCadence)) + ticker := services.NewTicker(u.cleanCadence) defer ticker.Stop() flushTicker := newTickerFn(utils.WithJitter(flushCadence)) @@ -120,7 +120,7 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { if err := u.cleanup(ctx); err != nil { u.lggr.Errorw("unable to clean old state values", "err", err) } - ticker.Reset(utils.WithJitter(u.cleanCadence)) + ticker.Reset() case <-flushTicker.C: u.flush(ctx) flushTicker.Reset(utils.WithJitter(flushCadence)) diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index 3b89a1d4945..f6c477d1666 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -25,7 +25,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/recovery" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/store/models" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) //go:generate mockery --quiet --name Runner --output ./mocks/ --case=underscore @@ -171,7 +170,7 @@ func (r *runner) runReaperLoop() { return } - runReaperTicker := time.NewTicker(utils.WithJitter(r.config.ReaperInterval())) + runReaperTicker := services.NewTicker(r.config.ReaperInterval()) defer runReaperTicker.Stop() for { select { @@ -179,7 +178,7 @@ func (r *runner) runReaperLoop() { return case <-runReaperTicker.C: r.runReaperWorker.WakeUp() - runReaperTicker.Reset(utils.WithJitter(r.config.ReaperInterval())) + runReaperTicker.Reset() } } } diff --git a/core/services/relay/evm/mercury/persistence_manager.go b/core/services/relay/evm/mercury/persistence_manager.go index dc805c12e7b..944222b78d4 100644 --- a/core/services/relay/evm/mercury/persistence_manager.go +++ b/core/services/relay/evm/mercury/persistence_manager.go @@ -90,11 +90,11 @@ func (pm *PersistenceManager) runFlushDeletesLoop() { ctx, cancel := pm.stopCh.Ctx(context.Background()) defer cancel() - ticker := time.NewTicker(utils.WithJitter(pm.flushDeletesFrequency)) + ticker := services.NewTicker(pm.flushDeletesFrequency) + defer ticker.Stop() for { select { case <-ctx.Done(): - ticker.Stop() return case <-ticker.C: queuedReqs := pm.resetDeleteQueue() @@ -115,10 +115,10 @@ func (pm *PersistenceManager) runPruneLoop() { defer cancel() ticker := time.NewTicker(utils.WithJitter(pm.pruneFrequency)) + defer ticker.Stop() for { select { case <-ctx.Done(): - ticker.Stop() return case <-ticker.C: if err := pm.orm.PruneTransmitRequests(pm.serverURL, pm.jobID, pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil { diff --git a/core/services/relay/evm/mercury/queue.go b/core/services/relay/evm/mercury/queue.go index 8a89f47302b..abf3c58f0e9 100644 --- a/core/services/relay/evm/mercury/queue.go +++ b/core/services/relay/evm/mercury/queue.go @@ -17,7 +17,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) //go:generate mockery --quiet --name asyncDeleter --output ./mocks/ --case=underscore --structname=AsyncDeleter @@ -134,7 +133,7 @@ func (tq *TransmitQueue) IsEmpty() bool { func (tq *TransmitQueue) Start(context.Context) error { return tq.StartOnce("TransmitQueue", func() error { - t := time.NewTicker(utils.WithJitter(promInterval)) + t := services.NewTicker(promInterval) wg := new(sync.WaitGroup) chStop := make(chan struct{}) tq.stopMonitor = func() { diff --git a/core/services/relay/evm/mercury/wsrpc/cache/cache.go b/core/services/relay/evm/mercury/wsrpc/cache/cache.go index adc439e802b..58e04c5d730 100644 --- a/core/services/relay/evm/mercury/wsrpc/cache/cache.go +++ b/core/services/relay/evm/mercury/wsrpc/cache/cache.go @@ -15,7 +15,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var ( @@ -346,13 +345,14 @@ func (m *memCache) runloop() { if m.cfg.MaxStaleAge == 0 { return } - t := time.NewTicker(utils.WithJitter(m.cfg.MaxStaleAge)) + t := services.NewTicker(m.cfg.MaxStaleAge) + defer t.Stop() for { select { case <-t.C: m.cleanup() - t.Reset(utils.WithJitter(m.cfg.MaxStaleAge)) + t.Reset() case <-m.chStop: return } diff --git a/go.mod b/go.mod index d4a2a5f37a1..64b2c1b4adc 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chain-selectors v1.0.10 github.com/smartcontractkit/chainlink-automation v1.0.2 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1 github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240119021347-3c541a78cdb8 diff --git a/go.sum b/go.sum index 9c82988c9c5..8fcd609fbfe 100644 --- a/go.sum +++ b/go.sum @@ -1182,8 +1182,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI= github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1 h1:GkGvPo3Oo+sxGdPV0LpD1odtOIR3bModCGlhtHPSR/k= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index fec4e8aab07..6587ad2cfc5 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -24,7 +24,7 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.2 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1 github.com/smartcontractkit/chainlink-testing-framework v1.27.8 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index bd4fdc83dbc..1b9eca3046f 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1525,8 +1525,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI= github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1 h1:GkGvPo3Oo+sxGdPV0LpD1odtOIR3bModCGlhtHPSR/k= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index dbf0c3d28fd..0891cdde0fa 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -16,7 +16,7 @@ require ( github.com/rs/zerolog v1.30.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.2 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1 github.com/smartcontractkit/chainlink-testing-framework v1.27.8 github.com/smartcontractkit/chainlink/integration-tests v0.0.0-20240214231432-4ad5eb95178c github.com/smartcontractkit/chainlink/v2 v2.9.0-beta0.0.20240216210048-da02459ddad8 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 9b92b5e561d..ca7accb789b 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1508,8 +1508,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI= github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1 h1:GkGvPo3Oo+sxGdPV0LpD1odtOIR3bModCGlhtHPSR/k= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=