Skip to content

Commit

Permalink
use services.Ticker (#12668)
Browse files Browse the repository at this point in the history
* use services.NewTicker

* adjust timing to fix flake

* revert MaxWaitTimeForEvents
  • Loading branch information
jmank88 authored Jul 3, 2024
1 parent 190d48a commit dfd239a
Show file tree
Hide file tree
Showing 19 changed files with 90 additions and 90 deletions.
4 changes: 1 addition & 3 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/assets"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)
Expand Down Expand Up @@ -347,7 +345,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP

c.report(nodeStates)

monitor := time.NewTicker(utils.WithJitter(c.reportInterval))
monitor := services.NewTicker(c.reportInterval)
defer monitor.Stop()

for {
Expand Down
7 changes: 2 additions & 5 deletions common/txmgr/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)
Expand Down Expand Up @@ -58,18 +56,17 @@ func (r *Reaper[CHAIN_ID]) Stop() {

func (r *Reaper[CHAIN_ID]) runLoop() {
defer close(r.chDone)
ticker := time.NewTicker(utils.WithJitter(r.txConfig.ReaperInterval()))
ticker := services.NewTicker(r.txConfig.ReaperInterval())
defer ticker.Stop()
for {
select {
case <-r.chStop:
return
case <-ticker.C:
r.work()
ticker.Reset(utils.WithJitter(r.txConfig.ReaperInterval()))
case <-r.trigger:
r.work()
ticker.Reset(utils.WithJitter(r.txConfig.ReaperInterval()))
ticker.Reset()
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions common/txmgr/resender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/chains/label"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/common/client"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
Expand Down Expand Up @@ -120,7 +118,7 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
er.logger.Warnw("Failed to resend unconfirmed transactions", "err", err)
}

ticker := time.NewTicker(utils.WithJitter(er.interval))
ticker := services.NewTicker(er.interval)
defer ticker.Stop()
for {
select {
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/forwarders/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
evmlogpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand Down Expand Up @@ -276,10 +274,12 @@ func (f *FwdMgr) runLoop() {
ctx, cancel := f.stopCh.NewCtx()
defer cancel()

tick := time.After(0)
for ; ; tick = time.After(utils.WithJitter(time.Minute)) {
ticker := services.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-tick:
case <-ticker.C:
if err := f.logpoller.Ready(); err != nil {
f.logger.Warnw("Skipping log syncing", "err", err)
continue
Expand Down
21 changes: 11 additions & 10 deletions core/chains/evm/gas/arbitrum_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas/rollups"
Expand Down Expand Up @@ -168,27 +166,31 @@ func (a *arbitrumEstimator) getPricesInArbGas() (perL2Tx uint32, perL1CalldataUn
func (a *arbitrumEstimator) run() {
defer close(a.chDone)

t := a.refreshPricesInArbGas()
a.refreshPricesInArbGas()
close(a.chInitialised)

t := services.TickerConfig{
Initial: a.pollPeriod,
JitterPct: services.DefaultJitter,
}.NewTicker(a.pollPeriod)
defer t.Stop()

for {
select {
case <-a.chStop:
return
case ch := <-a.chForceRefetch:
t.Stop()
t = a.refreshPricesInArbGas()
a.refreshPricesInArbGas()
t.Reset()
close(ch)
case <-t.C:
t = a.refreshPricesInArbGas()
a.refreshPricesInArbGas()
}
}
}

// refreshPricesInArbGas calls getPricesInArbGas() and caches the refreshed prices.
func (a *arbitrumEstimator) refreshPricesInArbGas() (t *time.Timer) {
t = time.NewTimer(utils.WithJitter(a.pollPeriod))

func (a *arbitrumEstimator) refreshPricesInArbGas() {
perL2Tx, perL1CalldataUnit, err := a.l1Oracle.GetPricesInArbGas()
if err != nil {
a.logger.Warnw("Failed to refresh prices", "err", err)
Expand All @@ -201,5 +203,4 @@ func (a *arbitrumEstimator) refreshPricesInArbGas() (t *time.Timer) {
a.perL2Tx = perL2Tx
a.perL1CalldataUnit = perL1CalldataUnit
a.getPricesInArbGasMu.Unlock()
return
}
29 changes: 16 additions & 13 deletions core/chains/evm/gas/rollups/arbitrum_l1_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink/v2/common/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
Expand Down Expand Up @@ -135,41 +134,45 @@ func (o *arbitrumL1Oracle) HealthReport() map[string]error {
func (o *arbitrumL1Oracle) run() {
defer close(o.chDone)

t := o.refresh()
o.refresh()
close(o.chInitialised)

t := services.TickerConfig{
Initial: o.pollPeriod,
JitterPct: services.DefaultJitter,
}.NewTicker(o.pollPeriod)
defer t.Stop()

for {
select {
case <-o.chStop:
return
case <-t.C:
t = o.refresh()
o.refresh()
}
}
}
func (o *arbitrumL1Oracle) refresh() (t *time.Timer) {
t, err := o.refreshWithError()
func (o *arbitrumL1Oracle) refresh() {
err := o.refreshWithError()
if err != nil {
o.logger.Criticalw("Failed to refresh gas price", "err", err)
o.SvcErrBuffer.Append(err)
}
return
}

func (o *arbitrumL1Oracle) refreshWithError() (t *time.Timer, err error) {
t = time.NewTimer(utils.WithJitter(o.pollPeriod))

func (o *arbitrumL1Oracle) refreshWithError() error {
ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout())
defer cancel()

price, err := o.fetchL1GasPrice(ctx)
if err != nil {
return t, err
return err
}

o.l1GasPriceMu.Lock()
defer o.l1GasPriceMu.Unlock()
o.l1GasPrice = priceEntry{price: assets.NewWei(price), timestamp: time.Now()}
return
return nil
}

func (o *arbitrumL1Oracle) fetchL1GasPrice(ctx context.Context) (price *big.Int, err error) {
Expand Down
29 changes: 16 additions & 13 deletions core/chains/evm/gas/rollups/op_l1_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink/v2/common/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
Expand Down Expand Up @@ -242,41 +241,45 @@ func (o *optimismL1Oracle) HealthReport() map[string]error {
func (o *optimismL1Oracle) run() {
defer close(o.chDone)

t := o.refresh()
o.refresh()
close(o.chInitialised)

t := services.TickerConfig{
Initial: o.pollPeriod,
JitterPct: services.DefaultJitter,
}.NewTicker(o.pollPeriod)
defer t.Stop()

for {
select {
case <-o.chStop:
return
case <-t.C:
t = o.refresh()
o.refresh()
}
}
}
func (o *optimismL1Oracle) refresh() (t *time.Timer) {
t, err := o.refreshWithError()
func (o *optimismL1Oracle) refresh() {
err := o.refreshWithError()
if err != nil {
o.logger.Criticalw("Failed to refresh gas price", "err", err)
o.SvcErrBuffer.Append(err)
}
return
}

func (o *optimismL1Oracle) refreshWithError() (t *time.Timer, err error) {
t = time.NewTimer(utils.WithJitter(o.pollPeriod))

func (o *optimismL1Oracle) refreshWithError() error {
ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout())
defer cancel()

price, err := o.GetDAGasPrice(ctx)
if err != nil {
return t, err
return err
}

o.l1GasPriceMu.Lock()
defer o.l1GasPriceMu.Unlock()
o.l1GasPrice = priceEntry{price: assets.NewWei(price), timestamp: time.Now()}
return
return nil
}

func (o *optimismL1Oracle) GasPrice(_ context.Context) (l1GasPrice *assets.Wei, err error) {
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/gas/rollups/zkSync_l1_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (o *zkSyncL1Oracle) run() {
func (o *zkSyncL1Oracle) refresh() (t *time.Timer) {
t, err := o.refreshWithError()
if err != nil {
o.logger.Criticalw("Failed to refresh gas price", "err", err)
o.SvcErrBuffer.Append(err)
}
return
Expand Down
19 changes: 10 additions & 9 deletions core/chains/evm/gas/suggested_price_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math"

"github.com/smartcontractkit/chainlink/v2/common/fee"
Expand Down Expand Up @@ -101,26 +100,29 @@ func (o *SuggestedPriceEstimator) HealthReport() map[string]error {
func (o *SuggestedPriceEstimator) run() {
defer close(o.chDone)

t := o.refreshPrice()
o.refreshPrice()
close(o.chInitialised)

t := services.TickerConfig{
Initial: o.pollPeriod,
JitterPct: services.DefaultJitter,
}.NewTicker(o.pollPeriod)

for {
select {
case <-o.chStop:
return
case ch := <-o.chForceRefetch:
t.Stop()
t = o.refreshPrice()
o.refreshPrice()
t.Reset()
close(ch)
case <-t.C:
t = o.refreshPrice()
o.refreshPrice()
}
}
}

func (o *SuggestedPriceEstimator) refreshPrice() (t *time.Timer) {
t = time.NewTimer(utils.WithJitter(o.pollPeriod))

func (o *SuggestedPriceEstimator) refreshPrice() {
var res hexutil.Big
ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout())
defer cancel()
Expand All @@ -136,7 +138,6 @@ func (o *SuggestedPriceEstimator) refreshPrice() (t *time.Timer) {
o.gasPriceMu.Lock()
defer o.gasPriceMu.Unlock()
o.GasPrice = bi
return
}

// Uses the force refetch chan to trigger a price update and blocks until complete
Expand Down
15 changes: 9 additions & 6 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,14 @@ func (lp *logPoller) run() {
defer lp.wg.Done()
ctx, cancel := lp.stopCh.NewCtx()
defer cancel()
logPollTick := time.After(0)
logPollTicker := services.NewTicker(lp.pollPeriod)
defer logPollTicker.Stop()
// stagger these somewhat, so they don't all run back-to-back
backupLogPollTick := time.After(100 * time.Millisecond)
backupLogPollTicker := services.TickerConfig{
Initial: 100 * time.Millisecond,
JitterPct: services.DefaultJitter,
}.NewTicker(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod)
defer backupLogPollTicker.Stop()
filtersLoaded := false

for {
Expand All @@ -570,8 +575,7 @@ func (lp *logPoller) run() {
return
case fromBlockReq := <-lp.replayStart:
lp.handleReplayRequest(ctx, fromBlockReq, filtersLoaded)
case <-logPollTick:
logPollTick = time.After(utils.WithJitter(lp.pollPeriod))
case <-logPollTicker.C:
if !filtersLoaded {
if err := lp.loadFilters(ctx); err != nil {
lp.lggr.Errorw("Failed loading filters in main logpoller loop, retrying later", "err", err)
Expand Down Expand Up @@ -609,7 +613,7 @@ func (lp *logPoller) run() {
start = lastProcessed.BlockNumber + 1
}
lp.PollAndSaveLogs(ctx, start)
case <-backupLogPollTick:
case <-backupLogPollTicker.C:
if lp.backupPollerBlockDelay == 0 {
continue // backup poller is disabled
}
Expand All @@ -621,7 +625,6 @@ func (lp *logPoller) run() {
// frequently than the primary log poller (instead of roughly once per block it runs once roughly once every
// lp.backupPollerDelay blocks--with default settings about 100x less frequently).

backupLogPollTick = time.After(utils.WithJitter(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod))
if !filtersLoaded {
lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping")
continue
Expand Down
Loading

0 comments on commit dfd239a

Please sign in to comment.