Skip to content

Commit

Permalink
core/utils: StopChan & StartStopOnce cleanup (#11341)
Browse files Browse the repository at this point in the history
* core/utils: make StopChan an alias to common

* use services.StopChan; deprecate utils.StopChan

* use services.StateMachine instead of deprecated utils.StartStopOnce
  • Loading branch information
jmank88 authored Nov 20, 2023
1 parent 7e0fa23 commit c9312c6
Show file tree
Hide file tree
Showing 52 changed files with 138 additions and 281 deletions.
4 changes: 2 additions & 2 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type multiNode[
activeMu sync.RWMutex
activeNode Node[CHAIN_ID, HEAD, RPC_CLIENT]

chStop utils.StopChan
chStop services.StopChan
wg sync.WaitGroup

sendOnlyErrorParser func(err error) SendTxReturnCode
Expand Down Expand Up @@ -146,7 +146,7 @@ func NewMultiNode[
selectionMode: selectionMode,
noNewHeadsThreshold: noNewHeadsThreshold,
nodeSelector: nodeSelector,
chStop: make(chan struct{}),
chStop: make(services.StopChan),
leaseDuration: leaseDuration,
chainFamily: chainFamily,
sendOnlyErrorParser: sendOnlyErrorParser,
Expand Down
3 changes: 1 addition & 2 deletions common/client/send_only_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

//go:generate mockery --quiet --name sendOnlyClient --structname mockSendOnlyClient --filename "mock_send_only_client_test.go" --inpackage --case=underscore
Expand Down Expand Up @@ -59,7 +58,7 @@ type sendOnlyNode[
log logger.Logger
name string
chainID CHAIN_ID
chStop utils.StopChan
chStop services.StopChan
wg sync.WaitGroup
}

Expand Down
2 changes: 1 addition & 1 deletion common/headtracker/head_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct
callbacks callbackSet[H, BLOCK_HASH]
mailbox *utils.Mailbox[H]
mutex sync.Mutex
chClose utils.StopChan
chClose services.StopChan
wgDone sync.WaitGroup
latest H
lastCallbackID int
Expand Down
4 changes: 3 additions & 1 deletion common/headtracker/head_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-common/pkg/services"

htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand All @@ -35,7 +37,7 @@ type HeadListener[
config htrktypes.Config
client htrktypes.Client[HTH, S, ID, BLOCK_HASH]
logger logger.Logger
chStop utils.StopChan
chStop services.StopChan
chHeaders chan HTH
headSubscription types.Subscription
connected atomic.Bool
Expand Down
2 changes: 1 addition & 1 deletion common/headtracker/head_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type HeadTracker[
backfillMB *utils.Mailbox[HTH]
broadcastMB *utils.Mailbox[HTH]
headListener types.HeadListener[HTH, BLOCK_HASH]
chStop utils.StopChan
chStop services.StopChan
wgDone sync.WaitGroup
getNilHead func() HTH
}
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type Broadcaster[
// Each key has its own trigger
triggers map[ADDR]chan struct{}

chStop utils.StopChan
chStop services.StopChan
wg sync.WaitGroup

initSync sync.Mutex
Expand Down
8 changes: 5 additions & 3 deletions common/txmgr/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync/atomic"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/services"

txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand All @@ -20,7 +22,7 @@ type Reaper[CHAIN_ID types.ID] struct {
log logger.Logger
latestBlockNum atomic.Int64
trigger chan struct{}
chStop chan struct{}
chStop services.StopChan
chDone chan struct{}
}

Expand All @@ -34,7 +36,7 @@ func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistory
lggr.Named("Reaper"),
atomic.Int64{},
make(chan struct{}, 1),
make(chan struct{}),
make(services.StopChan),
make(chan struct{}),
}
r.latestBlockNum.Store(-1)
Expand Down Expand Up @@ -97,7 +99,7 @@ func (r *Reaper[CHAIN_ID]) SetLatestBlockNum(latestBlockNum int64) {

// ReapTxes deletes old txes
func (r *Reaper[CHAIN_ID]) ReapTxes(headNum int64) error {
ctx, cancel := utils.StopChan(r.chStop).NewCtx()
ctx, cancel := r.chStop.NewCtx()
defer cancel()
threshold := r.txConfig.ReaperThreshold()
if threshold == 0 {
Expand Down
4 changes: 2 additions & 2 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Txm[
reset chan reset
resumeCallback ResumeCallback

chStop chan struct{}
chStop services.StopChan
chSubbed chan struct{}
wg sync.WaitGroup

Expand Down Expand Up @@ -228,7 +228,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr
// - marks all pending and inflight transactions fatally errored (note: at this point all transactions are either confirmed or fatally errored)
// this must not be run while Broadcaster or Confirmer are running
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandon(addr ADDR) (err error) {
ctx, cancel := utils.StopChan(b.chStop).NewCtx()
ctx, cancel := services.StopChan(b.chStop).NewCtx()
defer cancel()
if err = b.txStore.Abandon(ctx, b.chainID, addr); err != nil {
return fmt.Errorf("abandon failed to update txes for key %s: %w", addr.String(), err)
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ func (n *node) makeQueryCtx(ctx context.Context) (context.Context, context.Cance
// 1. Passed in ctx cancels
// 2. Passed in channel is closed
// 3. Default timeout is reached (queryTimeout)
func makeQueryCtx(ctx context.Context, ch utils.StopChan) (context.Context, context.CancelFunc) {
func makeQueryCtx(ctx context.Context, ch services.StopChan) (context.Context, context.CancelFunc) {
var chCancel, timeoutCancel context.CancelFunc
ctx, chCancel = ch.Ctx(ctx)
ctx, timeoutCancel = context.WithTimeout(ctx, queryTimeout)
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/client/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Pool struct {
activeMu sync.RWMutex
activeNode Node

chStop utils.StopChan
chStop services.StopChan
wg sync.WaitGroup
}

Expand Down
3 changes: 1 addition & 2 deletions core/chains/evm/client/send_only_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

//go:generate mockery --quiet --name SendOnlyNode --output ../mocks/ --case=underscore
Expand Down Expand Up @@ -69,7 +68,7 @@ type sendOnlyNode struct {
dialed bool
name string
chainID *big.Int
chStop utils.StopChan
chStop services.StopChan
wg sync.WaitGroup
}

Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/gas/arbitrum_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type arbitrumEstimator struct {

chForceRefetch chan (chan struct{})
chInitialised chan struct{}
chStop utils.StopChan
chStop services.StopChan
chDone chan struct{}
}

Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/gas/rollups/l1_gas_price_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type l1GasPriceOracle struct {
l1GasPrice *assets.Wei

chInitialised chan struct{}
chStop utils.StopChan
chStop services.StopChan
chDone chan struct{}
}

Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/gas/suggested_price_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type SuggestedPriceEstimator struct {

chForceRefetch chan (chan struct{})
chInitialised chan struct{}
chStop utils.StopChan
chStop services.StopChan
chDone chan struct{}
}

Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/log/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type (

utils.DependentAwaiter

chStop utils.StopChan
chStop services.StopChan
wgDone sync.WaitGroup
trackedAddressesCount atomic.Uint32
replayChannel chan replayRequest
Expand Down
4 changes: 3 additions & 1 deletion core/chains/evm/log/eth_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink-common/pkg/services"

evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/null"
Expand All @@ -21,7 +23,7 @@ type (
ethClient evmclient.Client
config Config
logger logger.Logger
chStop utils.StopChan
chStop services.StopChan
}
)

Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/txmgr/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ func TestEthBroadcaster_Lifecycle(t *testing.T) {
err = eb.Close()
require.NoError(t, err)

// Can't start more than once (Broadcaster implements utils.StartStopOnce)
// Can't start more than once (Broadcaster uses services.StateMachine)
err = eb.Start(ctx)
require.Error(t, err)
// Can't close more than once (Broadcaster implements utils.StartStopOnce)
// Can't close more than once (Broadcaster uses services.StateMachine)
err = eb.Close()
require.Error(t, err)

Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ func TestEthConfirmer_Lifecycle(t *testing.T) {
err = ec.Close()
require.NoError(t, err)

// Can't start more than once (Confirmer implements utils.StartStopOnce)
// Can't start more than once (Confirmer uses services.StateMachine)
err = ec.Start(ctx)
require.Error(t, err)
// Can't close more than once (Confirmer implements utils.StartStopOnce)
// Can't close more than once (Confirmer use services.StateMachine)
err = ec.Close()
require.Error(t, err)

Expand Down
14 changes: 7 additions & 7 deletions core/logger/audit/audit_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/utils"

"github.com/pkg/errors"
)

const bufferCapacity = 2048
Expand All @@ -26,7 +26,7 @@ const webRequestTimeout = 10
type Data = map[string]any

type AuditLogger interface {
services.ServiceCtx
services.Service

Audit(eventID EventID, data Data)
}
Expand All @@ -47,7 +47,7 @@ type AuditLoggerService struct {
loggingClient HTTPAuditLoggerInterface // Abstract type for sending logs onward

loggingChannel chan wrappedAuditLog
chStop utils.StopChan
chStop services.StopChan
chDone chan struct{}
}

Expand All @@ -72,7 +72,7 @@ func NewAuditLogger(logger logger.Logger, config config.AuditLogger) (AuditLogge

hostname, err := os.Hostname()
if err != nil {
return nil, errors.Errorf("initialization error - unable to get hostname: %s", err)
return nil, fmt.Errorf("initialization error - unable to get hostname: %w", err)
}

forwardToUrl, err := config.ForwardToUrl()
Expand Down
5 changes: 3 additions & 2 deletions core/services/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (

"github.com/robfig/cron/v3"

"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// Cron runs a cron jobSpec from a CronSpec
Expand All @@ -18,7 +19,7 @@ type Cron struct {
logger logger.Logger
jobSpec job.Job
pipelineRunner pipeline.Runner
chStop utils.StopChan
chStop services.StopChan
}

// NewCronFromJobSpec instantiates a job that executes on a predefined schedule.
Expand Down
10 changes: 5 additions & 5 deletions core/services/directrequest/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type listener struct {
pipelineORM pipeline.ORM
mailMon *utils.MailboxMonitor
job job.Job
runs sync.Map // map[string]utils.StopChan
runs sync.Map // map[string]services.StopChan
shutdownWaitGroup sync.WaitGroup
mbOracleRequests *utils.Mailbox[log.Broadcast]
mbOracleCancelRequests *utils.Mailbox[log.Broadcast]
Expand Down Expand Up @@ -178,7 +178,7 @@ func (l *listener) Start(context.Context) error {
func (l *listener) Close() error {
return l.StopOnce("DirectRequestListener", func() error {
l.runs.Range(func(key, runCloserChannelIf interface{}) bool {
runCloserChannel := runCloserChannelIf.(utils.StopChan)
runCloserChannel := runCloserChannelIf.(services.StopChan)
close(runCloserChannel)
return true
})
Expand Down Expand Up @@ -338,10 +338,10 @@ func (l *listener) handleOracleRequest(request *operator_wrapper.OperatorOracleR
meta := make(map[string]interface{})
meta["oracleRequest"] = oracleRequestToMap(request)

runCloserChannel := make(utils.StopChan)
runCloserChannel := make(services.StopChan)
runCloserChannelIf, loaded := l.runs.LoadOrStore(formatRequestId(request.RequestId), runCloserChannel)
if loaded {
runCloserChannel = runCloserChannelIf.(utils.StopChan)
runCloserChannel = runCloserChannelIf.(services.StopChan)
}
ctx, cancel := runCloserChannel.NewCtx()
defer cancel()
Expand Down Expand Up @@ -398,7 +398,7 @@ func (l *listener) allowRequester(requester common.Address) bool {
func (l *listener) handleCancelOracleRequest(request *operator_wrapper.OperatorCancelOracleRequest, lb log.Broadcast) {
runCloserChannelIf, loaded := l.runs.LoadAndDelete(formatRequestId(request.RequestId))
if loaded {
close(runCloserChannelIf.(utils.StopChan))
close(runCloserChannelIf.(services.StopChan))
}
l.markLogConsumed(lb)
}
Expand Down
8 changes: 4 additions & 4 deletions core/services/fluxmonitorv2/flux_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type FluxMonitor struct {
backlog *utils.BoundedPriorityQueue[log.Broadcast]
chProcessLogs chan struct{}

chStop chan struct{}
chStop services.StopChan
waitOnStop chan struct{}
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func NewFluxMonitor(
PriorityFlagChangedLog: 2,
}),
chProcessLogs: make(chan struct{}, 1),
chStop: make(chan struct{}),
chStop: make(services.StopChan),
waitOnStop: make(chan struct{}),
}

Expand Down Expand Up @@ -588,7 +588,7 @@ func (fm *FluxMonitor) respondToAnswerUpdatedLog(log flux_aggregator_wrapper.Flu
// need to poll and submit an answer to the contract regardless of the deviation.
func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggregatorNewRound, lb log.Broadcast) {
started := time.Now()
ctx, cancel := utils.StopChan(fm.chStop).NewCtx()
ctx, cancel := fm.chStop.NewCtx()
defer cancel()

newRoundLogger := fm.logger.With(
Expand Down Expand Up @@ -814,7 +814,7 @@ func (fm *FluxMonitor) checkEligibilityAndAggregatorFunding(roundState flux_aggr

func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker *DeviationChecker, broadcast log.Broadcast) {
started := time.Now()
ctx, cancel := utils.StopChan(fm.chStop).NewCtx()
ctx, cancel := fm.chStop.NewCtx()
defer cancel()

l := fm.logger.With(
Expand Down
Loading

0 comments on commit c9312c6

Please sign in to comment.