From c9312c6120809c53418233fb5109d1217b427a8f Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Mon, 20 Nov 2023 09:21:42 -0600 Subject: [PATCH] core/utils: StopChan & StartStopOnce cleanup (#11341) * core/utils: make StopChan an alias to common * use services.StopChan; deprecate utils.StopChan * use services.StateMachine instead of deprecated utils.StartStopOnce --- common/client/multi_node.go | 4 +- common/client/send_only_node.go | 3 +- common/headtracker/head_broadcaster.go | 2 +- common/headtracker/head_listener.go | 4 +- common/headtracker/head_tracker.go | 2 +- common/txmgr/broadcaster.go | 2 +- common/txmgr/reaper.go | 8 +- common/txmgr/txmgr.go | 4 +- core/chains/evm/client/node.go | 2 +- core/chains/evm/client/pool.go | 2 +- core/chains/evm/client/send_only_node.go | 3 +- core/chains/evm/gas/arbitrum_estimator.go | 2 +- .../evm/gas/rollups/l1_gas_price_oracle.go | 2 +- .../evm/gas/suggested_price_estimator.go | 2 +- core/chains/evm/log/broadcaster.go | 2 +- core/chains/evm/log/eth_subscriber.go | 4 +- core/chains/evm/txmgr/broadcaster_test.go | 4 +- core/chains/evm/txmgr/confirmer_test.go | 4 +- core/logger/audit/audit_logger.go | 14 ++-- core/services/cron/cron.go | 5 +- core/services/directrequest/delegate.go | 10 +-- core/services/fluxmonitorv2/flux_monitor.go | 8 +- core/services/gateway/connectionmanager.go | 6 +- core/services/gateway/connector/connector.go | 6 +- .../gateway/handlers/functions/allowlist.go | 4 +- .../handlers/functions/handler.functions.go | 5 +- .../handlers/functions/subscriptions.go | 4 +- core/services/job/spawner.go | 24 +++--- core/services/keeper/upkeep_executer.go | 4 +- core/services/ocr/contract_tracker.go | 4 +- core/services/ocr2/plugins/median/plugin.go | 6 +- .../plugins/ocr2keeper/custom_telemetry.go | 7 +- .../evm21/autotelemetry21/custom_telemetry.go | 7 +- .../ocr2keeper/evm21/mercury/v02/request.go | 4 +- .../ocr2keeper/evm21/mercury/v03/request.go | 4 +- .../ocr2/plugins/ocr2keeper/evm21/registry.go | 5 +- core/services/pipeline/runner.go | 2 +- core/services/promreporter/prom_reporter.go | 2 +- .../relay/evm/functions/logpoller_wrapper.go | 4 +- .../relay/evm/mercury/persistence_manager.go | 4 +- .../services/relay/evm/mercury/transmitter.go | 4 +- .../relay/evm/mercury/wsrpc/client.go | 4 +- .../telemetry_ingress_batch_worker.go | 8 +- .../telemetry_ingress_client.go | 7 +- core/services/vrf/v1/listener_v1.go | 2 +- core/services/vrf/v2/listener_v2.go | 4 +- core/services/webhook/delegate.go | 5 +- core/utils/thread_control.go | 4 +- core/utils/utils.go | 60 +++------------ core/utils/utils_example_test.go | 48 ------------ core/utils/utils_test.go | 73 ------------------- plugins/medianpoc/plugin.go | 5 +- 52 files changed, 138 insertions(+), 281 deletions(-) delete mode 100644 core/utils/utils_example_test.go diff --git a/common/client/multi_node.go b/common/client/multi_node.go index acab47f0836..48a4d37ad8c 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -99,7 +99,7 @@ type multiNode[ activeMu sync.RWMutex activeNode Node[CHAIN_ID, HEAD, RPC_CLIENT] - chStop utils.StopChan + chStop services.StopChan wg sync.WaitGroup sendOnlyErrorParser func(err error) SendTxReturnCode @@ -146,7 +146,7 @@ func NewMultiNode[ selectionMode: selectionMode, noNewHeadsThreshold: noNewHeadsThreshold, nodeSelector: nodeSelector, - chStop: make(chan struct{}), + chStop: make(services.StopChan), leaseDuration: leaseDuration, chainFamily: chainFamily, sendOnlyErrorParser: sendOnlyErrorParser, diff --git a/common/client/send_only_node.go b/common/client/send_only_node.go index 767fff5aee2..904916122f1 100644 --- a/common/client/send_only_node.go +++ b/common/client/send_only_node.go @@ -10,7 +10,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/common/types" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) //go:generate mockery --quiet --name sendOnlyClient --structname mockSendOnlyClient --filename "mock_send_only_client_test.go" --inpackage --case=underscore @@ -59,7 +58,7 @@ type sendOnlyNode[ log logger.Logger name string chainID CHAIN_ID - chStop utils.StopChan + chStop services.StopChan wg sync.WaitGroup } diff --git a/common/headtracker/head_broadcaster.go b/common/headtracker/head_broadcaster.go index 3efe64e1c3f..62b2f47b68a 100644 --- a/common/headtracker/head_broadcaster.go +++ b/common/headtracker/head_broadcaster.go @@ -31,7 +31,7 @@ type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct callbacks callbackSet[H, BLOCK_HASH] mailbox *utils.Mailbox[H] mutex sync.Mutex - chClose utils.StopChan + chClose services.StopChan wgDone sync.WaitGroup latest H lastCallbackID int diff --git a/common/headtracker/head_listener.go b/common/headtracker/head_listener.go index ee4969497a8..a3f262f4b73 100644 --- a/common/headtracker/head_listener.go +++ b/common/headtracker/head_listener.go @@ -9,6 +9,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/smartcontractkit/chainlink-common/pkg/services" + htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" "github.com/smartcontractkit/chainlink/v2/common/types" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -35,7 +37,7 @@ type HeadListener[ config htrktypes.Config client htrktypes.Client[HTH, S, ID, BLOCK_HASH] logger logger.Logger - chStop utils.StopChan + chStop services.StopChan chHeaders chan HTH headSubscription types.Subscription connected atomic.Bool diff --git a/common/headtracker/head_tracker.go b/common/headtracker/head_tracker.go index bf63675128f..810e749a2dc 100644 --- a/common/headtracker/head_tracker.go +++ b/common/headtracker/head_tracker.go @@ -53,7 +53,7 @@ type HeadTracker[ backfillMB *utils.Mailbox[HTH] broadcastMB *utils.Mailbox[HTH] headListener types.HeadListener[HTH, BLOCK_HASH] - chStop utils.StopChan + chStop services.StopChan wgDone sync.WaitGroup getNilHead func() HTH } diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index ba01fb9e2ad..d9a72e367ac 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -135,7 +135,7 @@ type Broadcaster[ // Each key has its own trigger triggers map[ADDR]chan struct{} - chStop utils.StopChan + chStop services.StopChan wg sync.WaitGroup initSync sync.Mutex diff --git a/common/txmgr/reaper.go b/common/txmgr/reaper.go index 96bc4860d71..7286efa3a80 100644 --- a/common/txmgr/reaper.go +++ b/common/txmgr/reaper.go @@ -5,6 +5,8 @@ import ( "sync/atomic" "time" + "github.com/smartcontractkit/chainlink-common/pkg/services" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -20,7 +22,7 @@ type Reaper[CHAIN_ID types.ID] struct { log logger.Logger latestBlockNum atomic.Int64 trigger chan struct{} - chStop chan struct{} + chStop services.StopChan chDone chan struct{} } @@ -34,7 +36,7 @@ func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistory lggr.Named("Reaper"), atomic.Int64{}, make(chan struct{}, 1), - make(chan struct{}), + make(services.StopChan), make(chan struct{}), } r.latestBlockNum.Store(-1) @@ -97,7 +99,7 @@ func (r *Reaper[CHAIN_ID]) SetLatestBlockNum(latestBlockNum int64) { // ReapTxes deletes old txes func (r *Reaper[CHAIN_ID]) ReapTxes(headNum int64) error { - ctx, cancel := utils.StopChan(r.chStop).NewCtx() + ctx, cancel := r.chStop.NewCtx() defer cancel() threshold := r.txConfig.ReaperThreshold() if threshold == 0 { diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 24d2428f61a..b49c2b72f15 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -90,7 +90,7 @@ type Txm[ reset chan reset resumeCallback ResumeCallback - chStop chan struct{} + chStop services.StopChan chSubbed chan struct{} wg sync.WaitGroup @@ -228,7 +228,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr // - marks all pending and inflight transactions fatally errored (note: at this point all transactions are either confirmed or fatally errored) // this must not be run while Broadcaster or Confirmer are running func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandon(addr ADDR) (err error) { - ctx, cancel := utils.StopChan(b.chStop).NewCtx() + ctx, cancel := services.StopChan(b.chStop).NewCtx() defer cancel() if err = b.txStore.Abandon(ctx, b.chainID, addr); err != nil { return fmt.Errorf("abandon failed to update txes for key %s: %w", addr.String(), err) diff --git a/core/chains/evm/client/node.go b/core/chains/evm/client/node.go index b6907202409..b3ce489cf50 100644 --- a/core/chains/evm/client/node.go +++ b/core/chains/evm/client/node.go @@ -1101,7 +1101,7 @@ func (n *node) makeQueryCtx(ctx context.Context) (context.Context, context.Cance // 1. Passed in ctx cancels // 2. Passed in channel is closed // 3. Default timeout is reached (queryTimeout) -func makeQueryCtx(ctx context.Context, ch utils.StopChan) (context.Context, context.CancelFunc) { +func makeQueryCtx(ctx context.Context, ch services.StopChan) (context.Context, context.CancelFunc) { var chCancel, timeoutCancel context.CancelFunc ctx, chCancel = ch.Ctx(ctx) ctx, timeoutCancel = context.WithTimeout(ctx, queryTimeout) diff --git a/core/chains/evm/client/pool.go b/core/chains/evm/client/pool.go index 18f59b172d7..289a402a1c6 100644 --- a/core/chains/evm/client/pool.go +++ b/core/chains/evm/client/pool.go @@ -72,7 +72,7 @@ type Pool struct { activeMu sync.RWMutex activeNode Node - chStop utils.StopChan + chStop services.StopChan wg sync.WaitGroup } diff --git a/core/chains/evm/client/send_only_node.go b/core/chains/evm/client/send_only_node.go index beb12dbc4da..02f04881c44 100644 --- a/core/chains/evm/client/send_only_node.go +++ b/core/chains/evm/client/send_only_node.go @@ -16,7 +16,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) //go:generate mockery --quiet --name SendOnlyNode --output ../mocks/ --case=underscore @@ -69,7 +68,7 @@ type sendOnlyNode struct { dialed bool name string chainID *big.Int - chStop utils.StopChan + chStop services.StopChan wg sync.WaitGroup } diff --git a/core/chains/evm/gas/arbitrum_estimator.go b/core/chains/evm/gas/arbitrum_estimator.go index c79202c7312..480abfe721d 100644 --- a/core/chains/evm/gas/arbitrum_estimator.go +++ b/core/chains/evm/gas/arbitrum_estimator.go @@ -48,7 +48,7 @@ type arbitrumEstimator struct { chForceRefetch chan (chan struct{}) chInitialised chan struct{} - chStop utils.StopChan + chStop services.StopChan chDone chan struct{} } diff --git a/core/chains/evm/gas/rollups/l1_gas_price_oracle.go b/core/chains/evm/gas/rollups/l1_gas_price_oracle.go index 88c61c49344..1a0fe8b8b24 100644 --- a/core/chains/evm/gas/rollups/l1_gas_price_oracle.go +++ b/core/chains/evm/gas/rollups/l1_gas_price_oracle.go @@ -38,7 +38,7 @@ type l1GasPriceOracle struct { l1GasPrice *assets.Wei chInitialised chan struct{} - chStop utils.StopChan + chStop services.StopChan chDone chan struct{} } diff --git a/core/chains/evm/gas/suggested_price_estimator.go b/core/chains/evm/gas/suggested_price_estimator.go index dadec6210c7..cd5acbc6942 100644 --- a/core/chains/evm/gas/suggested_price_estimator.go +++ b/core/chains/evm/gas/suggested_price_estimator.go @@ -40,7 +40,7 @@ type SuggestedPriceEstimator struct { chForceRefetch chan (chan struct{}) chInitialised chan struct{} - chStop utils.StopChan + chStop services.StopChan chDone chan struct{} } diff --git a/core/chains/evm/log/broadcaster.go b/core/chains/evm/log/broadcaster.go index 11c282a4d2e..d69fd696fd7 100644 --- a/core/chains/evm/log/broadcaster.go +++ b/core/chains/evm/log/broadcaster.go @@ -110,7 +110,7 @@ type ( utils.DependentAwaiter - chStop utils.StopChan + chStop services.StopChan wgDone sync.WaitGroup trackedAddressesCount atomic.Uint32 replayChannel chan replayRequest diff --git a/core/chains/evm/log/eth_subscriber.go b/core/chains/evm/log/eth_subscriber.go index cd1b18b474f..b4d386140e7 100644 --- a/core/chains/evm/log/eth_subscriber.go +++ b/core/chains/evm/log/eth_subscriber.go @@ -10,6 +10,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/smartcontractkit/chainlink-common/pkg/services" + evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/null" @@ -21,7 +23,7 @@ type ( ethClient evmclient.Client config Config logger logger.Logger - chStop utils.StopChan + chStop services.StopChan } ) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 7967478e624..bf480c66af6 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -112,10 +112,10 @@ func TestEthBroadcaster_Lifecycle(t *testing.T) { err = eb.Close() require.NoError(t, err) - // Can't start more than once (Broadcaster implements utils.StartStopOnce) + // Can't start more than once (Broadcaster uses services.StateMachine) err = eb.Start(ctx) require.Error(t, err) - // Can't close more than once (Broadcaster implements utils.StartStopOnce) + // Can't close more than once (Broadcaster uses services.StateMachine) err = eb.Close() require.Error(t, err) diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index e17e7993cf9..60d0648a541 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -157,10 +157,10 @@ func TestEthConfirmer_Lifecycle(t *testing.T) { err = ec.Close() require.NoError(t, err) - // Can't start more than once (Confirmer implements utils.StartStopOnce) + // Can't start more than once (Confirmer uses services.StateMachine) err = ec.Start(ctx) require.Error(t, err) - // Can't close more than once (Confirmer implements utils.StartStopOnce) + // Can't close more than once (Confirmer use services.StateMachine) err = ec.Close() require.Error(t, err) diff --git a/core/logger/audit/audit_logger.go b/core/logger/audit/audit_logger.go index 38afd77a284..ef66a063a55 100644 --- a/core/logger/audit/audit_logger.go +++ b/core/logger/audit/audit_logger.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "encoding/json" + "errors" + "fmt" "io" "net" "net/http" @@ -11,13 +13,11 @@ import ( "os" "time" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/store/models" - "github.com/smartcontractkit/chainlink/v2/core/utils" - - "github.com/pkg/errors" ) const bufferCapacity = 2048 @@ -26,7 +26,7 @@ const webRequestTimeout = 10 type Data = map[string]any type AuditLogger interface { - services.ServiceCtx + services.Service Audit(eventID EventID, data Data) } @@ -47,7 +47,7 @@ type AuditLoggerService struct { loggingClient HTTPAuditLoggerInterface // Abstract type for sending logs onward loggingChannel chan wrappedAuditLog - chStop utils.StopChan + chStop services.StopChan chDone chan struct{} } @@ -72,7 +72,7 @@ func NewAuditLogger(logger logger.Logger, config config.AuditLogger) (AuditLogge hostname, err := os.Hostname() if err != nil { - return nil, errors.Errorf("initialization error - unable to get hostname: %s", err) + return nil, fmt.Errorf("initialization error - unable to get hostname: %w", err) } forwardToUrl, err := config.ForwardToUrl() diff --git a/core/services/cron/cron.go b/core/services/cron/cron.go index e89dd1ceabd..500192554fb 100644 --- a/core/services/cron/cron.go +++ b/core/services/cron/cron.go @@ -6,10 +6,11 @@ import ( "github.com/robfig/cron/v3" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) // Cron runs a cron jobSpec from a CronSpec @@ -18,7 +19,7 @@ type Cron struct { logger logger.Logger jobSpec job.Job pipelineRunner pipeline.Runner - chStop utils.StopChan + chStop services.StopChan } // NewCronFromJobSpec instantiates a job that executes on a predefined schedule. diff --git a/core/services/directrequest/delegate.go b/core/services/directrequest/delegate.go index 9da84fd3ee5..687e1cea675 100644 --- a/core/services/directrequest/delegate.go +++ b/core/services/directrequest/delegate.go @@ -129,7 +129,7 @@ type listener struct { pipelineORM pipeline.ORM mailMon *utils.MailboxMonitor job job.Job - runs sync.Map // map[string]utils.StopChan + runs sync.Map // map[string]services.StopChan shutdownWaitGroup sync.WaitGroup mbOracleRequests *utils.Mailbox[log.Broadcast] mbOracleCancelRequests *utils.Mailbox[log.Broadcast] @@ -178,7 +178,7 @@ func (l *listener) Start(context.Context) error { func (l *listener) Close() error { return l.StopOnce("DirectRequestListener", func() error { l.runs.Range(func(key, runCloserChannelIf interface{}) bool { - runCloserChannel := runCloserChannelIf.(utils.StopChan) + runCloserChannel := runCloserChannelIf.(services.StopChan) close(runCloserChannel) return true }) @@ -338,10 +338,10 @@ func (l *listener) handleOracleRequest(request *operator_wrapper.OperatorOracleR meta := make(map[string]interface{}) meta["oracleRequest"] = oracleRequestToMap(request) - runCloserChannel := make(utils.StopChan) + runCloserChannel := make(services.StopChan) runCloserChannelIf, loaded := l.runs.LoadOrStore(formatRequestId(request.RequestId), runCloserChannel) if loaded { - runCloserChannel = runCloserChannelIf.(utils.StopChan) + runCloserChannel = runCloserChannelIf.(services.StopChan) } ctx, cancel := runCloserChannel.NewCtx() defer cancel() @@ -398,7 +398,7 @@ func (l *listener) allowRequester(requester common.Address) bool { func (l *listener) handleCancelOracleRequest(request *operator_wrapper.OperatorCancelOracleRequest, lb log.Broadcast) { runCloserChannelIf, loaded := l.runs.LoadAndDelete(formatRequestId(request.RequestId)) if loaded { - close(runCloserChannelIf.(utils.StopChan)) + close(runCloserChannelIf.(services.StopChan)) } l.markLogConsumed(lb) } diff --git a/core/services/fluxmonitorv2/flux_monitor.go b/core/services/fluxmonitorv2/flux_monitor.go index ea853d879d4..79dd44c8014 100644 --- a/core/services/fluxmonitorv2/flux_monitor.go +++ b/core/services/fluxmonitorv2/flux_monitor.go @@ -83,7 +83,7 @@ type FluxMonitor struct { backlog *utils.BoundedPriorityQueue[log.Broadcast] chProcessLogs chan struct{} - chStop chan struct{} + chStop services.StopChan waitOnStop chan struct{} } @@ -137,7 +137,7 @@ func NewFluxMonitor( PriorityFlagChangedLog: 2, }), chProcessLogs: make(chan struct{}, 1), - chStop: make(chan struct{}), + chStop: make(services.StopChan), waitOnStop: make(chan struct{}), } @@ -588,7 +588,7 @@ func (fm *FluxMonitor) respondToAnswerUpdatedLog(log flux_aggregator_wrapper.Flu // need to poll and submit an answer to the contract regardless of the deviation. func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggregatorNewRound, lb log.Broadcast) { started := time.Now() - ctx, cancel := utils.StopChan(fm.chStop).NewCtx() + ctx, cancel := fm.chStop.NewCtx() defer cancel() newRoundLogger := fm.logger.With( @@ -814,7 +814,7 @@ func (fm *FluxMonitor) checkEligibilityAndAggregatorFunding(roundState flux_aggr func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker *DeviationChecker, broadcast log.Broadcast) { started := time.Now() - ctx, cancel := utils.StopChan(fm.chStop).NewCtx() + ctx, cancel := fm.chStop.NewCtx() defer cancel() l := fm.logger.With( diff --git a/core/services/gateway/connectionmanager.go b/core/services/gateway/connectionmanager.go index ce4a54f4c2b..9f88b51e7b5 100644 --- a/core/services/gateway/connectionmanager.go +++ b/core/services/gateway/connectionmanager.go @@ -72,7 +72,7 @@ type donConnectionManager struct { handler handlers.Handler codec api.Codec closeWait sync.WaitGroup - shutdownCh chan struct{} + shutdownCh services.StopChan lggr logger.Logger } @@ -271,7 +271,7 @@ func (m *donConnectionManager) SendToNode(ctx context.Context, nodeAddress strin } func (m *donConnectionManager) readLoop(nodeAddress string, nodeState *nodeState) { - ctx, _ := utils.StopChan(m.shutdownCh).NewCtx() + ctx, _ := m.shutdownCh.NewCtx() for { select { case <-m.shutdownCh: @@ -296,7 +296,7 @@ func (m *donConnectionManager) readLoop(nodeAddress string, nodeState *nodeState } func (m *donConnectionManager) heartbeatLoop(intervalSec uint32) { - ctx, _ := utils.StopChan(m.shutdownCh).NewCtx() + ctx, _ := m.shutdownCh.NewCtx() defer m.closeWait.Done() if intervalSec == 0 { diff --git a/core/services/gateway/connector/connector.go b/core/services/gateway/connector/connector.go index 0694e9ad15f..27db8fd44b6 100644 --- a/core/services/gateway/connector/connector.go +++ b/core/services/gateway/connector/connector.go @@ -57,7 +57,7 @@ type gatewayConnector struct { gateways map[string]*gatewayState urlToId map[string]string closeWait sync.WaitGroup - shutdownCh chan struct{} + shutdownCh services.StopChan lggr logger.Logger } @@ -143,7 +143,7 @@ func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayId string, } func (c *gatewayConnector) readLoop(gatewayState *gatewayState) { - ctx, cancel := utils.StopChan(c.shutdownCh).NewCtx() + ctx, cancel := c.shutdownCh.NewCtx() defer cancel() for { @@ -168,7 +168,7 @@ func (c *gatewayConnector) readLoop(gatewayState *gatewayState) { func (c *gatewayConnector) reconnectLoop(gatewayState *gatewayState) { redialBackoff := utils.NewRedialBackoff() - ctx, cancel := utils.StopChan(c.shutdownCh).NewCtx() + ctx, cancel := c.shutdownCh.NewCtx() defer cancel() for { diff --git a/core/services/gateway/handlers/functions/allowlist.go b/core/services/gateway/handlers/functions/allowlist.go index 3ba9a65d57a..0a18d6e6d87 100644 --- a/core/services/gateway/handlers/functions/allowlist.go +++ b/core/services/gateway/handlers/functions/allowlist.go @@ -55,7 +55,7 @@ type onchainAllowlist struct { blockConfirmations *big.Int lggr logger.Logger closeWait sync.WaitGroup - stopCh utils.StopChan + stopCh services.StopChan } func NewOnchainAllowlist(client evmclient.Client, config OnchainAllowlistConfig, lggr logger.Logger) (OnchainAllowlist, error) { @@ -78,7 +78,7 @@ func NewOnchainAllowlist(client evmclient.Client, config OnchainAllowlistConfig, contractV1: contractV1, blockConfirmations: big.NewInt(int64(config.BlockConfirmations)), lggr: lggr.Named("OnchainAllowlist"), - stopCh: make(utils.StopChan), + stopCh: make(services.StopChan), } emptyMap := make(map[common.Address]struct{}) allowlist.allowlist.Store(&emptyMap) diff --git a/core/services/gateway/handlers/functions/handler.functions.go b/core/services/gateway/handlers/functions/handler.functions.go index 6cc4581a505..3269caa2d6a 100644 --- a/core/services/gateway/handlers/functions/handler.functions.go +++ b/core/services/gateway/handlers/functions/handler.functions.go @@ -21,7 +21,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" hc "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var ( @@ -81,7 +80,7 @@ type functionsHandler struct { minimumBalance *assets.Link userRateLimiter *hc.RateLimiter nodeRateLimiter *hc.RateLimiter - chStop utils.StopChan + chStop services.StopChan lggr logger.Logger } @@ -161,7 +160,7 @@ func NewFunctionsHandler( minimumBalance: minimumBalance, userRateLimiter: userRateLimiter, nodeRateLimiter: nodeRateLimiter, - chStop: make(utils.StopChan), + chStop: make(services.StopChan), lggr: lggr, } } diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go index 7a59e05731e..ebffbbdd206 100644 --- a/core/services/gateway/handlers/functions/subscriptions.go +++ b/core/services/gateway/handlers/functions/subscriptions.go @@ -49,7 +49,7 @@ type onchainSubscriptions struct { lggr logger.Logger closeWait sync.WaitGroup rwMutex sync.RWMutex - stopCh utils.StopChan + stopCh services.StopChan } func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscriptionsConfig, lggr logger.Logger) (OnchainSubscriptions, error) { @@ -70,7 +70,7 @@ func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscription router: router, blockConfirmations: big.NewInt(int64(config.BlockConfirmations)), lggr: lggr.Named("OnchainSubscriptions"), - stopCh: make(utils.StopChan), + stopCh: make(services.StopChan), }, nil } diff --git a/core/services/job/spawner.go b/core/services/job/spawner.go index 5656011e14d..5ed017b8743 100644 --- a/core/services/job/spawner.go +++ b/core/services/job/spawner.go @@ -11,10 +11,9 @@ import ( "github.com/jmoiron/sqlx" - commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -26,7 +25,7 @@ type ( // services that perform the work described by job specs. Each active job spec // has 1 or more of these services associated with it. Spawner interface { - services.ServiceCtx + services.Service // CreateJob creates a new job and starts services. // All services must start without errors for the job to be active. @@ -42,18 +41,23 @@ type ( StartService(ctx context.Context, spec Job, qopts ...pg.QOpt) error } + Checker interface { + Register(service services.HealthReporter) error + Unregister(name string) error + } + spawner struct { - commonservices.StateMachine + services.StateMachine orm ORM config Config - checker services.Checker + checker Checker jobTypeDelegates map[Type]Delegate activeJobs map[int32]activeJob activeJobsMu sync.RWMutex q pg.Q lggr logger.Logger - chStop utils.StopChan + chStop services.StopChan lbDependentAwaiters []utils.DependentAwaiter } @@ -86,7 +90,7 @@ type ( var _ Spawner = (*spawner)(nil) -func NewSpawner(orm ORM, config Config, checker services.Checker, jobTypeDelegates map[Type]Delegate, db *sqlx.DB, lggr logger.Logger, lbDependentAwaiters []utils.DependentAwaiter) *spawner { +func NewSpawner(orm ORM, config Config, checker Checker, jobTypeDelegates map[Type]Delegate, db *sqlx.DB, lggr logger.Logger, lbDependentAwaiters []utils.DependentAwaiter) *spawner { namedLogger := lggr.Named("JobSpawner") s := &spawner{ orm: orm, @@ -96,7 +100,7 @@ func NewSpawner(orm ORM, config Config, checker services.Checker, jobTypeDelegat q: pg.NewQ(db, namedLogger, config), lggr: namedLogger, activeJobs: make(map[int32]activeJob), - chStop: make(chan struct{}), + chStop: make(services.StopChan), lbDependentAwaiters: lbDependentAwaiters, } return s @@ -170,7 +174,7 @@ func (js *spawner) stopService(jobID int32) { for i := len(aj.services) - 1; i >= 0; i-- { service := aj.services[i] sLggr := lggr.With("subservice", i, "serviceType", reflect.TypeOf(service)) - if c, ok := service.(commonservices.HealthReporter); ok { + if c, ok := service.(services.HealthReporter); ok { if err := js.checker.Unregister(c.Name()); err != nil { sLggr.Warnw("Failed to unregister service from health checker", "err", err) } @@ -230,7 +234,7 @@ func (js *spawner) StartService(ctx context.Context, jb Job, qopts ...pg.QOpt) e lggr.Criticalw("Error starting service for job", "err", err) return err } - if c, ok := srv.(commonservices.HealthReporter); ok { + if c, ok := srv.(services.HealthReporter); ok { err = js.checker.Register(c) if err != nil { lggr.Errorw("Error registering service with health checker", "err", err) diff --git a/core/services/keeper/upkeep_executer.go b/core/services/keeper/upkeep_executer.go index ece6f85b068..84349ba2dca 100644 --- a/core/services/keeper/upkeep_executer.go +++ b/core/services/keeper/upkeep_executer.go @@ -55,7 +55,7 @@ type UpkeepExecuterConfig interface { // UpkeepExecuter implements the logic to communicate with KeeperRegistry type UpkeepExecuter struct { services.StateMachine - chStop utils.StopChan + chStop services.StopChan ethClient evmclient.Client config UpkeepExecuterConfig executionQueue chan struct{} @@ -83,7 +83,7 @@ func NewUpkeepExecuter( effectiveKeeperAddress common.Address, ) *UpkeepExecuter { return &UpkeepExecuter{ - chStop: make(chan struct{}), + chStop: make(services.StopChan), ethClient: ethClient, executionQueue: make(chan struct{}, executionQueueSize), headBroadcaster: headBroadcaster, diff --git a/core/services/ocr/contract_tracker.go b/core/services/ocr/contract_tracker.go index 5fecbe86288..4f79bcfc31a 100644 --- a/core/services/ocr/contract_tracker.go +++ b/core/services/ocr/contract_tracker.go @@ -74,7 +74,7 @@ type ( unsubscribeHeads func() // Start/Stop lifecycle - chStop utils.StopChan + chStop services.StopChan wg sync.WaitGroup unsubscribeLogs func() @@ -134,7 +134,7 @@ func NewOCRContractTracker( cfg: cfg, mailMon: mailMon, headBroadcaster: headBroadcaster, - chStop: make(chan struct{}), + chStop: make(services.StopChan), latestRoundRequested: offchainaggregator.OffchainAggregatorRoundRequested{}, configsMB: utils.NewMailbox[ocrtypes.ContractConfig](configMailboxSanityLimit), chConfigs: make(chan ocrtypes.ContractConfig), diff --git a/core/services/ocr2/plugins/median/plugin.go b/core/services/ocr2/plugins/median/plugin.go index 4f83c4b5dda..11197f09175 100644 --- a/core/services/ocr2/plugins/median/plugin.go +++ b/core/services/ocr2/plugins/median/plugin.go @@ -10,17 +10,15 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" - - "github.com/smartcontractkit/chainlink/v2/core/utils" ) type Plugin struct { loop.Plugin - stop utils.StopChan + stop services.StopChan } func NewPlugin(lggr logger.Logger) *Plugin { - return &Plugin{Plugin: loop.Plugin{Logger: lggr}, stop: make(utils.StopChan)} + return &Plugin{Plugin: loop.Plugin{Logger: lggr}, stop: make(services.StopChan)} } func (p *Plugin) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoin median.DataSource, errorLog loop.ErrorLog) (loop.ReportingPluginFactory, error) { diff --git a/core/services/ocr2/plugins/ocr2keeper/custom_telemetry.go b/core/services/ocr2/plugins/ocr2keeper/custom_telemetry.go index 0f03ae5bd06..6d52c0e8d25 100644 --- a/core/services/ocr2/plugins/ocr2keeper/custom_telemetry.go +++ b/core/services/ocr2/plugins/ocr2keeper/custom_telemetry.go @@ -5,12 +5,15 @@ import ( "encoding/hex" "time" + "google.golang.org/protobuf/proto" + "github.com/smartcontractkit/libocr/commontypes" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" - "google.golang.org/protobuf/proto" ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" evm21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21" "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" @@ -19,7 +22,7 @@ import ( ) type AutomationCustomTelemetryService struct { - utils.StartStopOnce + services.StateMachine monitoringEndpoint commontypes.MonitoringEndpoint blockSubscriber *evm21.BlockSubscriber blockSubChanID int diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/autotelemetry21/custom_telemetry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/autotelemetry21/custom_telemetry.go index 93f35ce0d24..0cf0bbef5cd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/autotelemetry21/custom_telemetry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/autotelemetry21/custom_telemetry.go @@ -5,12 +5,15 @@ import ( "encoding/hex" "time" + "google.golang.org/protobuf/proto" + "github.com/smartcontractkit/libocr/commontypes" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" - "google.golang.org/protobuf/proto" ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" evm21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21" "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" @@ -19,7 +22,7 @@ import ( ) type AutomationCustomTelemetryService struct { - utils.StartStopOnce + services.StateMachine monitoringEndpoint commontypes.MonitoringEndpoint blockSubscriber *evm21.BlockSubscriber blockSubChanID int diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/mercury/v02/request.go b/core/services/ocr2/plugins/ocr2keeper/evm21/mercury/v02/request.go index 55436937d11..4ce7893a0b6 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/mercury/v02/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/mercury/v02/request.go @@ -14,6 +14,8 @@ import ( "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/mercury" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -34,7 +36,7 @@ type MercuryV02Response struct { } type client struct { - utils.StartStopOnce + services.StateMachine mercuryConfig mercury.MercuryConfigProvider httpClient mercury.HttpClient threadCtrl utils.ThreadControl diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/mercury/v03/request.go b/core/services/ocr2/plugins/ocr2keeper/evm21/mercury/v03/request.go index 3697dca53cd..1c607889741 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/mercury/v03/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/mercury/v03/request.go @@ -14,6 +14,8 @@ import ( "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/mercury" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -43,7 +45,7 @@ type MercuryV03Report struct { } type client struct { - utils.StartStopOnce + services.StateMachine mercuryConfig mercury.MercuryConfigProvider httpClient mercury.HttpClient threadCtrl utils.ThreadControl diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go index 252d2d91c79..afb1a7cf6c3 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go @@ -17,7 +17,7 @@ import ( "github.com/pkg/errors" "go.uber.org/multierr" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/mercury/streams" + "github.com/smartcontractkit/chainlink-common/pkg/services" ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types" @@ -31,6 +31,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/core" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/encoding" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/mercury/streams" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -166,7 +167,7 @@ func (c *MercuryConfig) SetPluginRetry(k string, v interface{}, d time.Duration) } type EvmRegistry struct { - utils.StartStopOnce + services.StateMachine threadCtrl utils.ThreadControl lggr logger.Logger poller logpoller.LogPoller diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index edb1d337afd..388f7358ef3 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -68,7 +68,7 @@ type runner struct { // test helper runFinished func(*Run) - chStop utils.StopChan + chStop services.StopChan wgDone sync.WaitGroup } diff --git a/core/services/promreporter/prom_reporter.go b/core/services/promreporter/prom_reporter.go index 2306640bea6..d115b1022c1 100644 --- a/core/services/promreporter/prom_reporter.go +++ b/core/services/promreporter/prom_reporter.go @@ -27,7 +27,7 @@ type ( lggr logger.Logger backend PrometheusBackend newHeads *utils.Mailbox[*evmtypes.Head] - chStop utils.StopChan + chStop services.StopChan wgDone sync.WaitGroup reportPeriod time.Duration } diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index 230185d0ad7..95f45022ab3 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -39,7 +39,7 @@ type logPollerWrapper struct { detectedResponses detectedEvents mu sync.Mutex closeWait sync.WaitGroup - stopCh utils.StopChan + stopCh services.StopChan lggr logger.Logger } @@ -106,7 +106,7 @@ func NewLogPollerWrapper(routerContractAddress common.Address, pluginConfig conf logPoller: logPoller, client: client, subscribers: make(map[string]evmRelayTypes.RouteUpdateSubscriber), - stopCh: make(utils.StopChan), + stopCh: make(services.StopChan), lggr: lggr, }, nil } diff --git a/core/services/relay/evm/mercury/persistence_manager.go b/core/services/relay/evm/mercury/persistence_manager.go index 69dfce9c16d..779e275f154 100644 --- a/core/services/relay/evm/mercury/persistence_manager.go +++ b/core/services/relay/evm/mercury/persistence_manager.go @@ -24,7 +24,7 @@ type PersistenceManager struct { orm ORM once services.StateMachine - stopCh utils.StopChan + stopCh services.StopChan wg sync.WaitGroup deleteMu sync.Mutex @@ -41,7 +41,7 @@ func NewPersistenceManager(lggr logger.Logger, orm ORM, jobID int32, maxTransmit return &PersistenceManager{ lggr: lggr.Named("MercuryPersistenceManager"), orm: orm, - stopCh: make(chan struct{}), + stopCh: make(services.StopChan), jobID: jobID, maxTransmitQueueSize: maxTransmitQueueSize, flushDeletesFrequency: flushDeletesFrequency, diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 73aa10243f8..d5346ad28cc 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -115,7 +115,7 @@ type mercuryTransmitter struct { jobID int32 fromAccount string - stopCh utils.StopChan + stopCh services.StopChan queue *TransmitQueue wg sync.WaitGroup @@ -161,7 +161,7 @@ func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrp feedID, jobID, fmt.Sprintf("%x", fromAccount), - make(chan (struct{})), + make(services.StopChan), nil, sync.WaitGroup{}, make(chan *pb.TransmitRequest, maxDeleteQueueSize), diff --git a/core/services/relay/evm/mercury/wsrpc/client.go b/core/services/relay/evm/mercury/wsrpc/client.go index c4db80a58d0..c04c00074a2 100644 --- a/core/services/relay/evm/mercury/wsrpc/client.go +++ b/core/services/relay/evm/mercury/wsrpc/client.go @@ -84,7 +84,7 @@ type client struct { consecutiveTimeoutCnt atomic.Int32 wg sync.WaitGroup - chStop utils.StopChan + chStop services.StopChan chResetTransport chan struct{} timeoutCountMetric prometheus.Counter @@ -106,7 +106,7 @@ func newClient(lggr logger.Logger, clientPrivKey csakey.KeyV2, serverPubKey []by serverURL: serverURL, logger: lggr.Named("WSRPC").With("mercuryServerURL", serverURL), chResetTransport: make(chan struct{}, 1), - chStop: make(chan struct{}), + chStop: make(services.StopChan), timeoutCountMetric: timeoutCount.WithLabelValues(serverURL), dialCountMetric: dialCount.WithLabelValues(serverURL), dialSuccessCountMetric: dialSuccessCount.WithLabelValues(serverURL), diff --git a/core/services/synchronization/telemetry_ingress_batch_worker.go b/core/services/synchronization/telemetry_ingress_batch_worker.go index 141eb30c812..e7ea6595811 100644 --- a/core/services/synchronization/telemetry_ingress_batch_worker.go +++ b/core/services/synchronization/telemetry_ingress_batch_worker.go @@ -6,23 +6,23 @@ import ( "sync/atomic" "time" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services" telemPb "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) // telemetryIngressBatchWorker pushes telemetry in batches to the ingress server via wsrpc. // A worker is created per ContractID. type telemetryIngressBatchWorker struct { - services.ServiceCtx + services.Service telemMaxBatchSize uint telemSendInterval time.Duration telemSendTimeout time.Duration telemClient telemPb.TelemClient wgDone *sync.WaitGroup - chDone utils.StopChan + chDone services.StopChan chTelemetry chan TelemPayload contractID string telemType TelemetryType diff --git a/core/services/synchronization/telemetry_ingress_client.go b/core/services/synchronization/telemetry_ingress_client.go index b889b3fc97a..b566199fdc0 100644 --- a/core/services/synchronization/telemetry_ingress_client.go +++ b/core/services/synchronization/telemetry_ingress_client.go @@ -15,7 +15,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" telemPb "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) type NoopTelemetryIngressClient struct{} @@ -46,7 +45,7 @@ type telemetryIngressClient struct { lggr logger.Logger wgDone sync.WaitGroup - chDone chan struct{} + chDone services.StopChan dropMessageCount atomic.Uint32 chTelemetry chan TelemPayload } @@ -61,7 +60,7 @@ func NewTelemetryIngressClient(url *url.URL, serverPubKeyHex string, ks keystore logging: logging, lggr: lggr.Named("TelemetryIngressClient"), chTelemetry: make(chan TelemPayload, telemBufferSize), - chDone: make(chan struct{}), + chDone: make(services.StopChan), } } @@ -132,7 +131,7 @@ func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []b func (tc *telemetryIngressClient) handleTelemetry() { go func() { - ctx, cancel := utils.StopChan(tc.chDone).NewCtx() + ctx, cancel := tc.chDone.NewCtx() defer cancel() for { select { diff --git a/core/services/vrf/v1/listener_v1.go b/core/services/vrf/v1/listener_v1.go index 566e5ac9bd8..3e958801cdf 100644 --- a/core/services/vrf/v1/listener_v1.go +++ b/core/services/vrf/v1/listener_v1.go @@ -58,7 +58,7 @@ type Listener struct { GethKs vrfcommon.GethKeyStore MailMon *utils.MailboxMonitor ReqLogs *utils.Mailbox[log.Broadcast] - ChStop utils.StopChan + ChStop services.StopChan WaitOnStop chan struct{} NewHead chan struct{} LatestHead uint64 diff --git a/core/services/vrf/v2/listener_v2.go b/core/services/vrf/v2/listener_v2.go index 5b73ac9e24c..7f23e022771 100644 --- a/core/services/vrf/v2/listener_v2.go +++ b/core/services/vrf/v2/listener_v2.go @@ -128,7 +128,7 @@ func New( q: q, gethks: gethks, reqLogs: reqLogs, - chStop: make(chan struct{}), + chStop: make(services.StopChan), reqAdded: reqAdded, blockNumberToReqID: pairing.New(), latestHeadMu: sync.RWMutex{}, @@ -183,7 +183,7 @@ type listenerV2 struct { q pg.Q gethks keystore.Eth reqLogs *utils.Mailbox[log.Broadcast] - chStop utils.StopChan + chStop services.StopChan // We can keep these pending logs in memory because we // only mark them confirmed once we send a corresponding fulfillment transaction. // So on node restart in the middle of processing, the lb will resend them. diff --git a/core/services/webhook/delegate.go b/core/services/webhook/delegate.go index f5a8d553f23..237245b81c9 100644 --- a/core/services/webhook/delegate.go +++ b/core/services/webhook/delegate.go @@ -8,11 +8,12 @@ import ( "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) type ( @@ -111,7 +112,7 @@ func newWebhookJobRunner(runner pipeline.Runner, lggr logger.Logger) *webhookJob type registeredJob struct { job.Job - chRemove utils.StopChan + chRemove services.StopChan } func (r *webhookJobRunner) addSpec(spec job.Job) error { diff --git a/core/utils/thread_control.go b/core/utils/thread_control.go index 8f7fff42496..52cda82797a 100644 --- a/core/utils/thread_control.go +++ b/core/utils/thread_control.go @@ -3,6 +3,8 @@ package utils import ( "context" "sync" + + "github.com/smartcontractkit/chainlink-common/pkg/services" ) var _ ThreadControl = &threadControl{} @@ -25,7 +27,7 @@ func NewThreadControl() *threadControl { type threadControl struct { threadsWG sync.WaitGroup - stop StopChan + stop services.StopChan } func (tc *threadControl) Go(fn func(context.Context)) { diff --git a/core/utils/utils.go b/core/utils/utils.go index a2e418fe046..69597fb9e4a 100644 --- a/core/utils/utils.go +++ b/core/utils/utils.go @@ -399,68 +399,28 @@ func WaitGroupChan(wg *sync.WaitGroup) <-chan struct{} { } // WithCloseChan wraps a context so that it is canceled if the passed in channel is closed. -// Deprecated: Call StopChan.Ctx directly +// Deprecated: Call [services.StopChan.Ctx] directly func WithCloseChan(parentCtx context.Context, chStop chan struct{}) (context.Context, context.CancelFunc) { - return StopChan(chStop).Ctx(parentCtx) + return services.StopChan(chStop).Ctx(parentCtx) } // ContextFromChan creates a context that finishes when the provided channel receives or is closed. -// Deprecated: Call StopChan.NewCtx directly. +// Deprecated: Call [services.StopChan.NewCtx] directly. func ContextFromChan(chStop chan struct{}) (context.Context, context.CancelFunc) { - return StopChan(chStop).NewCtx() + return services.StopChan(chStop).NewCtx() } // ContextFromChanWithTimeout creates a context with a timeout that finishes when the provided channel receives or is closed. -// Deprecated: Call StopChan.CtxCancel directly +// Deprecated: Call [services.StopChan.CtxCancel] directly func ContextFromChanWithTimeout(chStop chan struct{}, timeout time.Duration) (context.Context, context.CancelFunc) { - return StopChan(chStop).CtxCancel(context.WithTimeout(context.Background(), timeout)) + return services.StopChan(chStop).CtxCancel(context.WithTimeout(context.Background(), timeout)) } -// A StopChan signals when some work should stop. -type StopChan chan struct{} +// Deprecated: use services.StopChan +type StopChan = services.StopChan -// NewCtx returns a background [context.Context] that is cancelled when StopChan is closed. -func (s StopChan) NewCtx() (context.Context, context.CancelFunc) { - return StopRChan((<-chan struct{})(s)).NewCtx() -} - -// Ctx cancels a [context.Context] when StopChan is closed. -func (s StopChan) Ctx(ctx context.Context) (context.Context, context.CancelFunc) { - return StopRChan((<-chan struct{})(s)).Ctx(ctx) -} - -// CtxCancel cancels a [context.Context] when StopChan is closed. -// Returns ctx and cancel unmodified, for convenience. -func (s StopChan) CtxCancel(ctx context.Context, cancel context.CancelFunc) (context.Context, context.CancelFunc) { - return StopRChan((<-chan struct{})(s)).CtxCancel(ctx, cancel) -} - -// A StopRChan signals when some work should stop. -// This version is receive-only. -type StopRChan <-chan struct{} - -// NewCtx returns a background [context.Context] that is cancelled when StopChan is closed. -func (s StopRChan) NewCtx() (context.Context, context.CancelFunc) { - return s.Ctx(context.Background()) -} - -// Ctx cancels a [context.Context] when StopChan is closed. -func (s StopRChan) Ctx(ctx context.Context) (context.Context, context.CancelFunc) { - return s.CtxCancel(context.WithCancel(ctx)) -} - -// CtxCancel cancels a [context.Context] when StopChan is closed. -// Returns ctx and cancel unmodified, for convenience. -func (s StopRChan) CtxCancel(ctx context.Context, cancel context.CancelFunc) (context.Context, context.CancelFunc) { - go func() { - select { - case <-s: - cancel() - case <-ctx.Done(): - } - }() - return ctx, cancel -} +// Deprecated: use services.StopRChan +type StopRChan = services.StopRChan // DependentAwaiter contains Dependent funcs type DependentAwaiter interface { diff --git a/core/utils/utils_example_test.go b/core/utils/utils_example_test.go deleted file mode 100644 index 0ca9d0be88d..00000000000 --- a/core/utils/utils_example_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package utils - -import ( - "context" - "sync" - "time" -) - -func ExampleStopChan() { - stopCh := make(StopChan) - - work := func(context.Context) {} - - a := func(ctx context.Context, done func()) { - defer done() - ctx, cancel := stopCh.Ctx(ctx) - defer cancel() - work(ctx) - } - - b := func(ctx context.Context, done func()) { - defer done() - ctx, cancel := stopCh.CtxCancel(context.WithTimeout(ctx, time.Second)) - defer cancel() - work(ctx) - } - - c := func(ctx context.Context, done func()) { - defer done() - ctx, cancel := stopCh.CtxCancel(context.WithDeadline(ctx, time.Now().Add(5*time.Second))) - defer cancel() - work(ctx) - } - - ctx, cancel := stopCh.NewCtx() - defer cancel() - - var wg sync.WaitGroup - wg.Add(3) - go a(ctx, wg.Done) - go b(ctx, wg.Done) - go c(ctx, wg.Done) - - time.AfterFunc(time.Second, func() { close(stopCh) }) - - wg.Wait() - // Output: -} diff --git a/core/utils/utils_test.go b/core/utils/utils_test.go index 04802feb3a7..e11c01a8e4f 100644 --- a/core/utils/utils_test.go +++ b/core/utils/utils_test.go @@ -374,79 +374,6 @@ func Test_WithJitter(t *testing.T) { } } -func Test_StartStopOnce_StopWaitsForStartToFinish(t *testing.T) { - t.Parallel() - - once := utils.StartStopOnce{} - - ch := make(chan int, 3) - - ready := make(chan bool) - - go func() { - assert.NoError(t, once.StartOnce("slow service", func() (err error) { - ch <- 1 - ready <- true - <-time.After(time.Millisecond * 500) // wait for StopOnce to happen - ch <- 2 - - return nil - })) - }() - - go func() { - <-ready // try stopping halfway through startup - assert.NoError(t, once.StopOnce("slow service", func() (err error) { - ch <- 3 - - return nil - })) - }() - - require.Equal(t, 1, <-ch) - require.Equal(t, 2, <-ch) - require.Equal(t, 3, <-ch) -} - -func Test_StartStopOnce_MultipleStartNoBlock(t *testing.T) { - t.Parallel() - - once := utils.StartStopOnce{} - - ch := make(chan int, 3) - - ready := make(chan bool) - next := make(chan bool) - - go func() { - ch <- 1 - assert.NoError(t, once.StartOnce("slow service", func() (err error) { - ready <- true - <-next // continue after the other StartOnce call fails - - return nil - })) - <-next - ch <- 2 - - }() - - go func() { - <-ready // try starting halfway through startup - assert.Error(t, once.StartOnce("slow service", func() (err error) { - return nil - })) - next <- true - ch <- 3 - next <- true - - }() - - require.Equal(t, 1, <-ch) - require.Equal(t, 3, <-ch) // 3 arrives before 2 because it returns immediately - require.Equal(t, 2, <-ch) -} - func TestAllEqual(t *testing.T) { t.Parallel() diff --git a/plugins/medianpoc/plugin.go b/plugins/medianpoc/plugin.go index 62b6acc043a..af4ec41ab8f 100644 --- a/plugins/medianpoc/plugin.go +++ b/plugins/medianpoc/plugin.go @@ -14,20 +14,19 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) func NewPlugin(lggr logger.Logger) *Plugin { return &Plugin{ Plugin: loop.Plugin{Logger: lggr}, MedianProviderServer: reportingplugins.MedianProviderServer{}, - stop: make(utils.StopChan), + stop: make(services.StopChan), } } type Plugin struct { loop.Plugin - stop utils.StopChan + stop services.StopChan reportingplugins.MedianProviderServer }