Skip to content

Commit

Permalink
use services.NewTicker
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 4, 2024
1 parent 324ca5b commit 493c20a
Show file tree
Hide file tree
Showing 26 changed files with 69 additions and 86 deletions.
4 changes: 1 addition & 3 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/assets"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/common/config"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
Expand Down Expand Up @@ -327,7 +325,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP

c.report()

monitor := time.NewTicker(utils.WithJitter(c.reportInterval))
monitor := services.NewTicker(c.reportInterval)
defer monitor.Stop()

for {
Expand Down
7 changes: 2 additions & 5 deletions common/txmgr/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)
Expand Down Expand Up @@ -58,18 +56,17 @@ func (r *Reaper[CHAIN_ID]) Stop() {

func (r *Reaper[CHAIN_ID]) runLoop() {
defer close(r.chDone)
ticker := time.NewTicker(utils.WithJitter(r.txConfig.ReaperInterval()))
ticker := services.NewTicker(r.txConfig.ReaperInterval())
defer ticker.Stop()
for {
select {
case <-r.chStop:
return
case <-ticker.C:
r.work()
ticker.Reset(utils.WithJitter(r.txConfig.ReaperInterval()))
case <-r.trigger:
r.work()
ticker.Reset(utils.WithJitter(r.txConfig.ReaperInterval()))
ticker.Reset()
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions common/txmgr/resender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/chains/label"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/common/client"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
Expand Down Expand Up @@ -120,7 +119,7 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
er.logger.Warnw("Failed to resend unconfirmed transactions", "err", err)
}

ticker := time.NewTicker(utils.WithJitter(er.interval))
ticker := services.NewTicker(er.interval)
defer ticker.Stop()
for {
select {
Expand Down
4 changes: 1 addition & 3 deletions core/chains/evm/client/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/common/config"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
)
Expand Down Expand Up @@ -235,7 +233,7 @@ func (p *Pool) runLoop() {
// Prometheus' default interval is 15s, set this to under 7.5s to avoid
// aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency)
reportInterval := 6500 * time.Millisecond
monitor := time.NewTicker(utils.WithJitter(reportInterval))
monitor := services.NewTicker(reportInterval)
defer monitor.Stop()

for {
Expand Down
9 changes: 4 additions & 5 deletions core/chains/evm/forwarders/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
evmlogpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand Down Expand Up @@ -233,11 +231,12 @@ func (f *FwdMgr) getCachedSenders(addr common.Address) ([]common.Address, bool)

func (f *FwdMgr) runLoop() {
defer f.wg.Done()
tick := time.After(0)
ticker := services.NewTicker(time.Minute)
defer ticker.Stop()

for ; ; tick = time.After(utils.WithJitter(time.Minute)) {
for {
select {
case <-tick:
case <-ticker.C:
if err := f.logpoller.Ready(); err != nil {
f.logger.Warnw("Skipping log syncing", "err", err)
continue
Expand Down
15 changes: 6 additions & 9 deletions core/chains/evm/gas/arbitrum_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand Down Expand Up @@ -175,27 +173,26 @@ func (a *arbitrumEstimator) getPricesInArbGas() (perL2Tx uint32, perL1CalldataUn
func (a *arbitrumEstimator) run() {
defer close(a.chDone)

t := a.refreshPricesInArbGas()
t := services.NewTicker(a.pollPeriod)
defer t.Stop()
close(a.chInitialised)

for {
select {
case <-a.chStop:
return
case ch := <-a.chForceRefetch:
t.Stop()
t = a.refreshPricesInArbGas()
a.refreshPricesInArbGas()
t.Reset()
close(ch)
case <-t.C:
t = a.refreshPricesInArbGas()
a.refreshPricesInArbGas()
}
}
}

// refreshPricesInArbGas calls getPricesInArbGas() and caches the refreshed prices.
func (a *arbitrumEstimator) refreshPricesInArbGas() (t *time.Timer) {
t = time.NewTimer(utils.WithJitter(a.pollPeriod))

func (a *arbitrumEstimator) refreshPricesInArbGas() {
perL2Tx, perL1CalldataUnit, err := a.callGetPricesInArbGas()
if err != nil {
a.logger.Warnw("Failed to refresh prices", "err", err)
Expand Down
20 changes: 9 additions & 11 deletions core/chains/evm/gas/rollups/l1_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink/v2/common/client"
"github.com/smartcontractkit/chainlink/v2/common/config"
Expand Down Expand Up @@ -221,35 +220,34 @@ func (o *l1Oracle) HealthReport() map[string]error {
func (o *l1Oracle) run() {
defer close(o.chDone)

t := o.refresh()
t := services.NewTicker(o.pollPeriod)
defer t.Stop()
close(o.chInitialised)

for {
select {
case <-o.chStop:
return
case <-t.C:
t = o.refresh()
o.refresh()
}
}
}
func (o *l1Oracle) refresh() (t *time.Timer) {
t, err := o.refreshWithError()
func (o *l1Oracle) refresh() {
err := o.refreshWithError()
if err != nil {
o.SvcErrBuffer.Append(err)
}
return

Check failure on line 241 in core/chains/evm/gas/rollups/l1_oracle.go

View workflow job for this annotation

GitHub Actions / lint

S1023: redundant `return` statement (gosimple)
}

func (o *l1Oracle) refreshWithError() (t *time.Timer, err error) {
t = time.NewTimer(utils.WithJitter(o.pollPeriod))

func (o *l1Oracle) refreshWithError() (err error) {
ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout())
defer cancel()

price, err := o.fetchL1GasPrice(ctx)
if err != nil {
return t, err
return err
}

o.l1GasPriceMu.Lock()
Expand Down
14 changes: 6 additions & 8 deletions core/chains/evm/gas/suggested_price_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math"

"github.com/smartcontractkit/chainlink/v2/common/fee"
Expand Down Expand Up @@ -94,26 +93,25 @@ func (o *SuggestedPriceEstimator) HealthReport() map[string]error {
func (o *SuggestedPriceEstimator) run() {
defer close(o.chDone)

t := o.refreshPrice()
t := services.NewTicker(o.pollPeriod)
defer t.Stop()
close(o.chInitialised)

for {
select {
case <-o.chStop:
return
case ch := <-o.chForceRefetch:
t.Stop()
t = o.refreshPrice()
o.refreshPrice()
t.Reset()
close(ch)
case <-t.C:
t = o.refreshPrice()
o.refreshPrice()
}
}
}

func (o *SuggestedPriceEstimator) refreshPrice() (t *time.Timer) {
t = time.NewTimer(utils.WithJitter(o.pollPeriod))

func (o *SuggestedPriceEstimator) refreshPrice() {
var res hexutil.Big
ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout())
defer cancel()
Expand Down
15 changes: 9 additions & 6 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,14 @@ func (lp *logPoller) loadFilters() error {

func (lp *logPoller) run() {
defer lp.wg.Done()
logPollTick := time.After(0)
logPollTicker := services.NewTicker(lp.pollPeriod)
defer logPollTicker.Stop()
// stagger these somewhat, so they don't all run back-to-back
backupLogPollTick := time.After(100 * time.Millisecond)
backupLogPollTicker := services.TickerConfig{
Initial: 100 * time.Millisecond,
JitterPct: services.DefaultJitter,
}.NewTicker(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod)
defer backupLogPollTicker.Stop()
filtersLoaded := false

for {
Expand All @@ -564,8 +569,7 @@ func (lp *logPoller) run() {
return
case fromBlockReq := <-lp.replayStart:
lp.handleReplayRequest(fromBlockReq, filtersLoaded)
case <-logPollTick:
logPollTick = time.After(utils.WithJitter(lp.pollPeriod))
case <-logPollTicker.C:
if !filtersLoaded {
if err := lp.loadFilters(); err != nil {
lp.lggr.Errorw("Failed loading filters in main logpoller loop, retrying later", "err", err)
Expand Down Expand Up @@ -603,7 +607,7 @@ func (lp *logPoller) run() {
start = lastProcessed.BlockNumber + 1
}
lp.PollAndSaveLogs(lp.ctx, start)
case <-backupLogPollTick:
case <-backupLogPollTicker.C:
if lp.backupPollerBlockDelay == 0 {
continue // backup poller is disabled
}
Expand All @@ -615,7 +619,6 @@ func (lp *logPoller) run() {
// frequently than the primary log poller (instead of roughly once per block it runs once roughly once every
// lp.backupPollerDelay blocks--with default settings about 100x less frequently).

backupLogPollTick = time.After(utils.WithJitter(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod))
if !filtersLoaded {
lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping")
continue
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.2
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1
github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1187,8 +1187,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI=
github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1 h1:GkGvPo3Oo+sxGdPV0LpD1odtOIR3bModCGlhtHPSR/k=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404121355-0f6ca04d47b1/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
3 changes: 1 addition & 2 deletions core/services/blockhashstore/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var _ job.ServiceCtx = &service{}
Expand Down Expand Up @@ -212,7 +211,6 @@ type service struct {
func (s *service) Start(context.Context) error {
return s.StartOnce("BHS Feeder Service", func() error {
s.logger.Infow("Starting BHS feeder")
ticker := time.NewTicker(utils.WithJitter(s.pollPeriod))
s.parentCtx, s.cancel = context.WithCancel(context.Background())
s.wg.Add(2)
go func() {
Expand All @@ -221,6 +219,7 @@ func (s *service) Start(context.Context) error {
}()
go func() {
defer s.wg.Done()
ticker := services.NewTicker(s.pollPeriod)
defer ticker.Stop()
for {
select {
Expand Down
3 changes: 1 addition & 2 deletions core/services/blockheaderfeeder/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var _ job.ServiceCtx = &service{}
Expand Down Expand Up @@ -226,10 +225,10 @@ type service struct {
func (s *service) Start(context.Context) error {
return s.StartOnce("Block Header Feeder Service", func() error {
s.logger.Infow("Starting BlockHeaderFeeder")
ticker := time.NewTicker(utils.WithJitter(s.pollPeriod))
s.parentCtx, s.cancel = context.WithCancel(context.Background())
go func() {
defer close(s.done)
ticker := services.NewTicker(s.pollPeriod)
defer ticker.Stop()
for {
select {
Expand Down
4 changes: 2 additions & 2 deletions core/services/llo/onchain_channel_definition_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/channel_config_store"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

type ChannelDefinitionCacheORM interface {
Expand Down Expand Up @@ -114,7 +113,8 @@ const pollInterval = 1 * time.Second
func (c *channelDefinitionCache) poll() {
defer c.wg.Done()

pollT := time.NewTicker(utils.WithJitter(pollInterval))
pollT := services.NewTicker(pollInterval)
defer pollT.Stop()

for {
select {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,14 @@ func (r *logRecoverer) Start(ctx context.Context) error {
})

r.threadCtrl.Go(func(ctx context.Context) {
cleanupTicker := time.NewTicker(utils.WithJitter(GCInterval))
cleanupTicker := services.NewTicker(GCInterval)
defer cleanupTicker.Stop()

for {
select {
case <-cleanupTicker.C:
r.clean(ctx)
cleanupTicker.Reset(utils.WithJitter(GCInterval))
cleanupTicker.Reset()
case <-ctx.Done():
return
}
Expand Down
Loading

0 comments on commit 493c20a

Please sign in to comment.