From 69f7bd68199b91c4ef4be65b5701e4d45a250350 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Tue, 6 Aug 2024 23:55:49 +0200 Subject: [PATCH] use services.Config.NewService/Engine (#13851) * use services.Config.NewService/Engine * feedback --- common/headtracker/head_broadcaster.go | 70 +++---- common/headtracker/head_listener.go | 73 ++++--- common/headtracker/head_tracker.go | 105 ++++------ core/bridges/cache.go | 76 ++----- core/chains/evm/headtracker/head_listener.go | 28 --- .../evm/headtracker/head_listener_test.go | 188 ++++++++--------- core/chains/evm/headtracker/head_tracker.go | 4 +- core/chains/evm/monitor/balance.go | 76 +++---- core/recovery/recover.go | 42 ++-- core/services/chainlink/application.go | 13 +- .../fluxmonitorv2/deviation_checker.go | 4 +- core/services/fluxmonitorv2/flux_monitor.go | 95 ++++----- .../fluxmonitorv2/flux_monitor_test.go | 16 +- core/services/fluxmonitorv2/helpers_test.go | 12 +- core/services/fluxmonitorv2/poll_manager.go | 10 +- core/services/nurse.go | 192 ++++++++---------- core/services/nurse_test.go | 3 +- .../relay/evm/functions/logpoller_wrapper.go | 116 +++++------ core/services/synchronization/helpers_test.go | 6 +- .../telemetry_ingress_batch_client.go | 133 ++++++------ .../telemetry_ingress_batch_worker.go | 64 ++---- .../telemetry_ingress_batch_worker_test.go | 4 - .../telemetry_ingress_client.go | 90 +++----- core/services/telemetry/manager.go | 92 ++++----- core/services/telemetry/manager_test.go | 2 +- 25 files changed, 608 insertions(+), 906 deletions(-) delete mode 100644 core/chains/evm/headtracker/head_listener.go diff --git a/common/headtracker/head_broadcaster.go b/common/headtracker/head_broadcaster.go index 7edcccfccbd..c81c61141f2 100644 --- a/common/headtracker/head_broadcaster.go +++ b/common/headtracker/head_broadcaster.go @@ -42,13 +42,12 @@ type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] interf } type headBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct { - services.StateMachine - logger logger.Logger + services.Service + eng *services.Engine + callbacks callbackSet[H, BLOCK_HASH] mailbox *mailbox.Mailbox[H] mutex sync.Mutex - chClose services.StopChan - wgDone sync.WaitGroup latest H lastCallbackID int } @@ -60,41 +59,29 @@ func NewHeadBroadcaster[ ]( lggr logger.Logger, ) HeadBroadcaster[H, BLOCK_HASH] { - return &headBroadcaster[H, BLOCK_HASH]{ - logger: logger.Named(lggr, "HeadBroadcaster"), + hb := &headBroadcaster[H, BLOCK_HASH]{ callbacks: make(callbackSet[H, BLOCK_HASH]), mailbox: mailbox.NewSingle[H](), - chClose: make(chan struct{}), } + hb.Service, hb.eng = services.Config{ + Name: "HeadBroadcaster", + Start: hb.start, + Close: hb.close, + }.NewServiceEngine(lggr) + return hb } -func (hb *headBroadcaster[H, BLOCK_HASH]) Start(context.Context) error { - return hb.StartOnce("HeadBroadcaster", func() error { - hb.wgDone.Add(1) - go hb.run() - return nil - }) -} - -func (hb *headBroadcaster[H, BLOCK_HASH]) Close() error { - return hb.StopOnce("HeadBroadcaster", func() error { - hb.mutex.Lock() - // clear all callbacks - hb.callbacks = make(callbackSet[H, BLOCK_HASH]) - hb.mutex.Unlock() - - close(hb.chClose) - hb.wgDone.Wait() - return nil - }) +func (hb *headBroadcaster[H, BLOCK_HASH]) start(context.Context) error { + hb.eng.Go(hb.run) + return nil } -func (hb *headBroadcaster[H, BLOCK_HASH]) Name() string { - return hb.logger.Name() -} - -func (hb *headBroadcaster[H, BLOCK_HASH]) HealthReport() map[string]error { - return map[string]error{hb.Name(): 
hb.Healthy()} +func (hb *headBroadcaster[H, BLOCK_HASH]) close() error { + hb.mutex.Lock() + // clear all callbacks + hb.callbacks = make(callbackSet[H, BLOCK_HASH]) + hb.mutex.Unlock() + return nil } func (hb *headBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) { @@ -121,15 +108,13 @@ func (hb *headBroadcaster[H, BLOCK_HASH]) Subscribe(callback HeadTrackable[H, BL return } -func (hb *headBroadcaster[H, BLOCK_HASH]) run() { - defer hb.wgDone.Done() - +func (hb *headBroadcaster[H, BLOCK_HASH]) run(ctx context.Context) { for { select { - case <-hb.chClose: + case <-ctx.Done(): return case <-hb.mailbox.Notify(): - hb.executeCallbacks() + hb.executeCallbacks(ctx) } } } @@ -137,10 +122,10 @@ func (hb *headBroadcaster[H, BLOCK_HASH]) run() { // DEV: the head relayer makes no promises about head delivery! Subscribing // Jobs should expect to the relayer to skip heads if there is a large number of listeners // and all callbacks cannot be completed in the allotted time. -func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks() { +func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks(ctx context.Context) { head, exists := hb.mailbox.Retrieve() if !exists { - hb.logger.Info("No head to retrieve. It might have been skipped") + hb.eng.Info("No head to retrieve. It might have been skipped") return } @@ -149,7 +134,7 @@ func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks() { hb.latest = head hb.mutex.Unlock() - hb.logger.Debugw("Initiating callbacks", + hb.eng.Debugw("Initiating callbacks", "headNum", head.BlockNumber(), "numCallbacks", len(callbacks), ) @@ -157,9 +142,6 @@ func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks() { wg := sync.WaitGroup{} wg.Add(len(callbacks)) - ctx, cancel := hb.chClose.NewCtx() - defer cancel() - for _, callback := range callbacks { go func(trackable HeadTrackable[H, BLOCK_HASH]) { defer wg.Done() @@ -168,7 +150,7 @@ func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks() { defer cancel() trackable.OnNewLongestChain(cctx, head) elapsed := time.Since(start) - hb.logger.Debugw(fmt.Sprintf("Finished callback in %s", elapsed), + hb.eng.Debugw(fmt.Sprintf("Finished callback in %s", elapsed), "callbackType", reflect.TypeOf(trackable), "blockNumber", head.BlockNumber(), "time", elapsed) }(callback) } diff --git a/common/headtracker/head_listener.go b/common/headtracker/head_listener.go index 25715b35280..d240caab3c3 100644 --- a/common/headtracker/head_listener.go +++ b/common/headtracker/head_listener.go @@ -29,14 +29,15 @@ var ( }, []string{"ChainID"}) ) -// headHandler is a callback that handles incoming heads -type headHandler[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] func(ctx context.Context, header H) error +// HeadHandler is a callback that handles incoming heads +type HeadHandler[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] func(ctx context.Context, header H) error // HeadListener is a chain agnostic interface that manages connection of Client that receives heads from the blockchain node type HeadListener[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] interface { - // ListenForNewHeads kicks off the listen loop (not thread safe) - // done() must be executed upon leaving ListenForNewHeads() - ListenForNewHeads(onSubscribe func(), handleNewHead headHandler[H, BLOCK_HASH], done func()) + services.Service + + // ListenForNewHeads runs the listen loop (not thread safe) + ListenForNewHeads(ctx context.Context) // ReceivingHeads returns true if the listener is receiving heads (thread safe) ReceivingHeads() bool 
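The head_broadcaster.go and head_listener.go changes above are representative of the whole PR: hand-rolled services.StateMachine, StopChan, and WaitGroup plumbing is replaced by services.Config{...}.NewServiceEngine, which returns a ready-made services.Service plus a *services.Engine that owns goroutine lifecycles, context cancellation, and logging. Below is a minimal sketch of that pattern as it appears in this diff; the pollingService type and its fields are illustrative and not part of the PR.

```go
package example

import (
	"context"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/services"
)

// pollingService is a hypothetical component written against the Engine
// pattern that this PR migrates existing services to.
type pollingService struct {
	services.Service      // provides Start/Close/Ready/HealthReport/Name
	eng  *services.Engine // managed logger + goroutine lifecycle
	work chan struct{}
}

func newPollingService(lggr logger.Logger) *pollingService {
	p := &pollingService{work: make(chan struct{}, 1)}
	p.Service, p.eng = services.Config{
		Name:  "PollingService",
		Start: p.start, // invoked once by Start
		Close: p.close, // invoked once by Close
	}.NewServiceEngine(lggr)
	return p
}

func (p *pollingService) start(context.Context) error {
	p.eng.Go(p.run) // the Engine cancels run's ctx on Close and waits for it to return
	return nil
}

func (p *pollingService) close() error { return nil }

func (p *pollingService) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done(): // replaces the old chStop/wgDone pair
			return
		case <-p.work:
			p.eng.Debugw("processing work item")
		}
	}
}
```

Name, HealthReport, Ready, and the StartOnce/StopOnce guards all come from the embedded services.Service, which is why those hand-written methods disappear from each migrated component in this diff.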
@@ -54,10 +55,13 @@ type headListener[ ID types.ID, BLOCK_HASH types.Hashable, ] struct { + services.Service + eng *services.Engine + config htrktypes.Config client htrktypes.Client[HTH, S, ID, BLOCK_HASH] - logger logger.Logger - chStop services.StopChan + onSubscription func(context.Context) + handleNewHead HeadHandler[HTH, BLOCK_HASH] chHeaders chan HTH headSubscription types.Subscription connected atomic.Bool @@ -74,38 +78,43 @@ func NewHeadListener[ lggr logger.Logger, client CLIENT, config htrktypes.Config, - chStop chan struct{}, + onSubscription func(context.Context), + handleNewHead HeadHandler[HTH, BLOCK_HASH], ) HeadListener[HTH, BLOCK_HASH] { - return &headListener[HTH, S, ID, BLOCK_HASH]{ - config: config, - client: client, - logger: logger.Named(lggr, "HeadListener"), - chStop: chStop, + hl := &headListener[HTH, S, ID, BLOCK_HASH]{ + config: config, + client: client, + onSubscription: onSubscription, + handleNewHead: handleNewHead, } + hl.Service, hl.eng = services.Config{ + Name: "HeadListener", + Start: hl.start, + }.NewServiceEngine(lggr) + return hl } -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) Name() string { - return hl.logger.Name() +func (hl *headListener[HTH, S, ID, BLOCK_HASH]) start(context.Context) error { + hl.eng.Go(hl.ListenForNewHeads) + return nil } -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(onSubscription func(), handleNewHead headHandler[HTH, BLOCK_HASH], done func()) { - defer done() +func (hl *headListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(ctx context.Context) { defer hl.unsubscribe() - ctx, cancel := hl.chStop.NewCtx() - defer cancel() - for { if !hl.subscribe(ctx) { break } - onSubscription() - err := hl.receiveHeaders(ctx, handleNewHead) + if hl.onSubscription != nil { + hl.onSubscription(ctx) + } + err := hl.receiveHeaders(ctx, hl.handleNewHead) if ctx.Err() != nil { break } else if err != nil { - hl.logger.Errorw("Error in new head subscription, unsubscribed", "err", err) + hl.eng.Errorw("Error in new head subscription, unsubscribed", "err", err) continue } break @@ -131,7 +140,7 @@ func (hl *headListener[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error return map[string]error{hl.Name(): err} } -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead headHandler[HTH, BLOCK_HASH]) error { +func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead HeadHandler[HTH, BLOCK_HASH]) error { var noHeadsAlarmC <-chan time.Time var noHeadsAlarmT *time.Ticker noHeadsAlarmDuration := hl.config.BlockEmissionIdleWarningThreshold() @@ -142,7 +151,7 @@ func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Conte for { select { - case <-hl.chStop: + case <-ctx.Done(): return nil case blockHeader, open := <-hl.chHeaders: @@ -158,13 +167,13 @@ func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Conte return errors.New("head listener: chHeaders prematurely closed") } if !blockHeader.IsValid() { - hl.logger.Error("got nil block header") + hl.eng.Error("got nil block header") continue } // Compare the chain ID of the block header to the chain ID of the client if !blockHeader.HasChainID() || blockHeader.ChainID().String() != chainId.String() { - hl.logger.Panicf("head listener for %s received block header for %s", chainId, blockHeader.ChainID()) + hl.eng.Panicf("head listener for %s received block header for %s", chainId, blockHeader.ChainID()) } promNumHeadsReceived.WithLabelValues(chainId.String()).Inc() 
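With the callbacks moved into the constructor, HeadListener is now started and stopped like any other service instead of via ListenForNewHeads(onSubscribe, handler, done) plus an external chStop. The following is a rough usage sketch condensing the updated Test_HeadListener_HappyPath later in this patch; the runListener helper and its parameters are assumptions supplied for illustration.

```go
package example

import (
	"context"
	"sync/atomic"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"

	"github.com/smartcontractkit/chainlink/v2/common/headtracker"
	htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types"
	evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
	evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
)

// runListener drives the reworked HeadListener until ctx is cancelled.
func runListener(ctx context.Context, lggr logger.Logger, client evmclient.Client, cfg htrktypes.Config) error {
	var headCount atomic.Int32
	hl := headtracker.NewHeadListener(lggr, client, cfg,
		nil, // onSubscription hook is optional and may be nil
		func(context.Context, *evmtypes.Head) error { // handleNewHead
			headCount.Add(1)
			return nil
		},
	)
	if err := hl.Start(ctx); err != nil { // start() spawns ListenForNewHeads on the Engine
		return err
	}
	defer hl.Close() // cancels the listen loop and unsubscribes (error ignored in this sketch)

	<-ctx.Done()
	lggr.Infow("head listener stopping", "headsSeen", headCount.Load())
	return nil
}
```

Connected and ReceivingHeads remain exposed for callers that need to gate on subscription state, as the updated tests below do.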
@@ -184,7 +193,7 @@ func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Conte case <-noHeadsAlarmC: // We haven't received a head on the channel for a long time, log a warning - hl.logger.Warnf("have not received a head for %v", noHeadsAlarmDuration) + hl.eng.Warnf("have not received a head for %v", noHeadsAlarmDuration) hl.receivingHeads.Store(false) } } @@ -198,19 +207,19 @@ func (hl *headListener[HTH, S, ID, BLOCK_HASH]) subscribe(ctx context.Context) b for { hl.unsubscribe() - hl.logger.Debugf("Subscribing to new heads on chain %s", chainId.String()) + hl.eng.Debugf("Subscribing to new heads on chain %s", chainId.String()) select { - case <-hl.chStop: + case <-ctx.Done(): return false case <-time.After(subscribeRetryBackoff.Duration()): err := hl.subscribeToHead(ctx) if err != nil { promEthConnectionErrors.WithLabelValues(chainId.String()).Inc() - hl.logger.Warnw("Failed to subscribe to heads on chain", "chainID", chainId.String(), "err", err) + hl.eng.Warnw("Failed to subscribe to heads on chain", "chainID", chainId.String(), "err", err) } else { - hl.logger.Debugf("Subscribed to heads on chain %s", chainId.String()) + hl.eng.Debugf("Subscribed to heads on chain %s", chainId.String()) return true } } diff --git a/common/headtracker/head_tracker.go b/common/headtracker/head_tracker.go index 851458591b8..8546d856b67 100644 --- a/common/headtracker/head_tracker.go +++ b/common/headtracker/head_tracker.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math/big" - "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -51,7 +50,9 @@ type headTracker[ ID types.ID, BLOCK_HASH types.Hashable, ] struct { - services.StateMachine + services.Service + eng *services.Engine + log logger.SugaredLogger headBroadcaster HeadBroadcaster[HTH, BLOCK_HASH] headSaver HeadSaver[HTH, BLOCK_HASH] @@ -64,8 +65,6 @@ type headTracker[ backfillMB *mailbox.Mailbox[HTH] broadcastMB *mailbox.Mailbox[HTH] headListener HeadListener[HTH, BLOCK_HASH] - chStop services.StopChan - wgDone sync.WaitGroup getNilHead func() HTH } @@ -85,52 +84,52 @@ func NewHeadTracker[ mailMon *mailbox.Monitor, getNilHead func() HTH, ) HeadTracker[HTH, BLOCK_HASH] { - chStop := make(chan struct{}) - lggr = logger.Named(lggr, "HeadTracker") - return &headTracker[HTH, S, ID, BLOCK_HASH]{ + ht := &headTracker[HTH, S, ID, BLOCK_HASH]{ headBroadcaster: headBroadcaster, client: client, chainID: client.ConfiguredChainID(), config: config, htConfig: htConfig, - log: logger.Sugared(lggr), backfillMB: mailbox.NewSingle[HTH](), broadcastMB: mailbox.New[HTH](HeadsBufferSize), - chStop: chStop, - headListener: NewHeadListener[HTH, S, ID, BLOCK_HASH](lggr, client, config, chStop), headSaver: headSaver, mailMon: mailMon, getNilHead: getNilHead, } + ht.Service, ht.eng = services.Config{ + Name: "HeadTracker", + NewSubServices: func(lggr logger.Logger) []services.Service { + ht.headListener = NewHeadListener[HTH, S, ID, BLOCK_HASH](lggr, client, config, + // NOTE: Always try to start the head tracker off with whatever the + // latest head is, without waiting for the subscription to send us one. + // + // In some cases the subscription will send us the most recent head + // anyway when we connect (but we should not rely on this because it is + // not specced). If it happens this is fine, and the head will be + // ignored as a duplicate. 
+ func(ctx context.Context) { + err := ht.handleInitialHead(ctx) + if err != nil { + ht.log.Errorw("Error handling initial head", "err", err.Error()) + } + }, ht.handleNewHead) + return []services.Service{ht.headListener} + }, + Start: ht.start, + Close: ht.close, + }.NewServiceEngine(lggr) + ht.log = logger.Sugared(ht.eng) + return ht } // Start starts HeadTracker service. -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error { - return ht.StartOnce("HeadTracker", func() error { - ht.log.Debugw("Starting HeadTracker", "chainID", ht.chainID) - // NOTE: Always try to start the head tracker off with whatever the - // latest head is, without waiting for the subscription to send us one. - // - // In some cases the subscription will send us the most recent head - // anyway when we connect (but we should not rely on this because it is - // not specced). If it happens this is fine, and the head will be - // ignored as a duplicate. - onSubscribe := func() { - err := ht.handleInitialHead(ctx) - if err != nil { - ht.log.Errorw("Error handling initial head", "err", err.Error()) - } - } +func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) start(context.Context) error { + ht.eng.Go(ht.backfillLoop) + ht.eng.Go(ht.broadcastLoop) - ht.wgDone.Add(3) - go ht.headListener.ListenForNewHeads(onSubscribe, ht.handleNewHead, ht.wgDone.Done) - go ht.backfillLoop() - go ht.broadcastLoop() + ht.mailMon.Monitor(ht.broadcastMB, "HeadTracker", "Broadcast", ht.chainID.String()) - ht.mailMon.Monitor(ht.broadcastMB, "HeadTracker", "Broadcast", ht.chainID.String()) - - return nil - }) + return nil } func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) handleInitialHead(ctx context.Context) error { @@ -176,23 +175,8 @@ func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) handleInitialHead(ctx context.Con return nil } -// Close stops HeadTracker service. -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) Close() error { - return ht.StopOnce("HeadTracker", func() error { - close(ht.chStop) - ht.wgDone.Wait() - return ht.broadcastMB.Close() - }) -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) Name() string { - return ht.log.Name() -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error { - report := map[string]error{ht.Name(): ht.Healthy()} - services.CopyHealth(report, ht.headListener.HealthReport()) - return report +func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) close() error { + return ht.broadcastMB.Close() } func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH) (err error) { @@ -265,15 +249,13 @@ func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context promOldHead.WithLabelValues(ht.chainID.String()).Inc() err := fmt.Errorf("got very old block with number %d (highest seen was %d)", head.BlockNumber(), prevHead.BlockNumber()) ht.log.Critical("Got very old block. Either a very deep re-org occurred, one of the RPC nodes has gotten far out of sync, or the chain went backwards in block numbers. 
This node may not function correctly without manual intervention.", "err", err) - ht.SvcErrBuffer.Append(err) + ht.eng.EmitHealthErr(err) } } return nil } -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) broadcastLoop() { - defer ht.wgDone.Done() - +func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) broadcastLoop(ctx context.Context) { samplingInterval := ht.htConfig.SamplingInterval() if samplingInterval > 0 { ht.log.Debugf("Head sampling is enabled - sampling interval is set to: %v", samplingInterval) @@ -281,7 +263,7 @@ func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) broadcastLoop() { defer debounceHead.Stop() for { select { - case <-ht.chStop: + case <-ctx.Done(): return case <-debounceHead.C: item := ht.broadcastMB.RetrieveLatestAndClear() @@ -295,7 +277,7 @@ func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) broadcastLoop() { ht.log.Info("Head sampling is disabled - callback will be called on every head") for { select { - case <-ht.chStop: + case <-ctx.Done(): return case <-ht.broadcastMB.Notify(): for { @@ -310,15 +292,10 @@ func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) broadcastLoop() { } } -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop() { - defer ht.wgDone.Done() - - ctx, cancel := ht.chStop.NewCtx() - defer cancel() - +func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop(ctx context.Context) { for { select { - case <-ht.chStop: + case <-ctx.Done(): return case <-ht.backfillMB.Notify(): for { diff --git a/core/bridges/cache.go b/core/bridges/cache.go index 4b5a6552447..e97874a35e5 100644 --- a/core/bridges/cache.go +++ b/core/bridges/cache.go @@ -10,11 +10,9 @@ import ( "golang.org/x/exp/maps" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" - - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) const ( @@ -25,13 +23,11 @@ const ( type Cache struct { // dependencies and configurations ORM - lggr logger.Logger interval time.Duration // service state - services.StateMachine - wg sync.WaitGroup - chStop services.StopChan + services.Service + eng *services.Engine // data state bridgeTypesCache sync.Map @@ -43,17 +39,20 @@ var _ ORM = (*Cache)(nil) var _ services.Service = (*Cache)(nil) func NewCache(base ORM, lggr logger.Logger, upsertInterval time.Duration) *Cache { - return &Cache{ + c := &Cache{ ORM: base, - lggr: lggr.Named(CacheServiceName), interval: upsertInterval, - chStop: make(chan struct{}), bridgeLastValueCache: make(map[string]BridgeResponse), } + c.Service, c.eng = services.Config{ + Name: CacheServiceName, + Start: c.start, + }.NewServiceEngine(lggr) + return c } func (c *Cache) WithDataSource(ds sqlutil.DataSource) ORM { - return NewCache(NewORM(ds), c.lggr, c.interval) + return NewCache(NewORM(ds), c.eng, c.interval) } func (c *Cache) FindBridge(ctx context.Context, name BridgeName) (BridgeType, error) { @@ -190,51 +189,17 @@ func (c *Cache) UpsertBridgeResponse(ctx context.Context, dotId string, specId i return nil } -func (c *Cache) Start(_ context.Context) error { - return c.StartOnce(CacheServiceName, func() error { - c.wg.Add(1) - - go c.run() - - return nil - }) -} - -func (c *Cache) Close() error { - return c.StopOnce(CacheServiceName, func() error { - close(c.chStop) - c.wg.Wait() - - return nil - }) -} - -func (c *Cache) HealthReport() map[string]error { - return map[string]error{c.Name(): c.Healthy()} -} - -func (c *Cache) Name() string { - return 
c.lggr.Name() -} - -func (c *Cache) run() { - defer c.wg.Done() - - for { - timer := time.NewTimer(utils.WithJitter(c.interval)) +func (c *Cache) start(_ context.Context) error { + ticker := services.TickerConfig{ + Initial: c.interval, + JitterPct: services.DefaultJitter, + }.NewTicker(c.interval) + c.eng.GoTick(ticker, c.doBulkUpsert) - select { - case <-timer.C: - c.doBulkUpsert() - case <-c.chStop: - timer.Stop() - - return - } - } + return nil } -func (c *Cache) doBulkUpsert() { +func (c *Cache) doBulkUpsert(ctx context.Context) { c.mu.RLock() values := maps.Values(c.bridgeLastValueCache) c.mu.RUnlock() @@ -243,11 +208,8 @@ func (c *Cache) doBulkUpsert() { return } - ctx, cancel := c.chStop.NewCtx() - defer cancel() - if err := c.ORM.BulkUpsertBridgeResponse(ctx, values); err != nil { - c.lggr.Warnf("bulk upsert of bridge responses failed: %s", err.Error()) + c.eng.Warnf("bulk upsert of bridge responses failed: %s", err.Error()) } } diff --git a/core/chains/evm/headtracker/head_listener.go b/core/chains/evm/headtracker/head_listener.go deleted file mode 100644 index 04535a34868..00000000000 --- a/core/chains/evm/headtracker/head_listener.go +++ /dev/null @@ -1,28 +0,0 @@ -package headtracker - -import ( - "math/big" - - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink/v2/common/headtracker" - - htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" - evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" -) - -type headListener = headtracker.HeadListener[*evmtypes.Head, common.Hash] - -func NewHeadListener( - lggr logger.Logger, - ethClient evmclient.Client, - config htrktypes.Config, chStop chan struct{}, -) headListener { - return headtracker.NewHeadListener[ - *evmtypes.Head, - ethereum.Subscription, *big.Int, common.Hash, - ](lggr, ethClient, config, chStop) -} diff --git a/core/chains/evm/headtracker/head_listener_test.go b/core/chains/evm/headtracker/head_listener_test.go index 29b090bbffe..2e459af2a2b 100644 --- a/core/chains/evm/headtracker/head_listener_test.go +++ b/core/chains/evm/headtracker/head_listener_test.go @@ -16,9 +16,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink/v2/common/headtracker" commonmocks "github.com/smartcontractkit/chainlink/v2/common/types/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -40,17 +40,10 @@ func Test_HeadListener_HappyPath(t *testing.T) { evmcfg := testutils.NewTestChainScopedConfig(t, func(c *toml.EVMConfig) { c.NoNewHeadsThreshold = &commonconfig.Duration{} }) - chStop := make(chan struct{}) - hl := headtracker.NewHeadListener(lggr, ethClient, evmcfg.EVM(), chStop) var headCount atomic.Int32 - handler := func(context.Context, *evmtypes.Head) error { - headCount.Add(1) - return nil - } - - subscribeAwaiter := testutils.NewAwaiter() unsubscribeAwaiter := testutils.NewAwaiter() + subscribeAwaiter := testutils.NewAwaiter() var chHeads chan<- *evmtypes.Head var chErr = make(chan error) var chSubErr <-chan error = chErr @@ 
-66,23 +59,23 @@ func Test_HeadListener_HappyPath(t *testing.T) { close(chErr) }) - doneAwaiter := testutils.NewAwaiter() - done := func() { - doneAwaiter.ItHappened() - } - go hl.ListenForNewHeads(func() {}, handler, done) - - subscribeAwaiter.AwaitOrFail(t, tests.WaitTimeout(t)) - require.Eventually(t, hl.Connected, tests.WaitTimeout(t), tests.TestInterval) + func() { + hl := headtracker.NewHeadListener(lggr, ethClient, evmcfg.EVM(), nil, func(context.Context, *evmtypes.Head) error { + headCount.Add(1) + return nil + }) + require.NoError(t, hl.Start(tests.Context(t))) + defer func() { assert.NoError(t, hl.Close()) }() - chHeads <- testutils.Head(0) - chHeads <- testutils.Head(1) - chHeads <- testutils.Head(2) + subscribeAwaiter.AwaitOrFail(t, tests.WaitTimeout(t)) + require.Eventually(t, hl.Connected, tests.WaitTimeout(t), tests.TestInterval) - require.True(t, hl.ReceivingHeads()) + chHeads <- testutils.Head(0) + chHeads <- testutils.Head(1) + chHeads <- testutils.Head(2) - close(chStop) - doneAwaiter.AwaitOrFail(t) + require.True(t, hl.ReceivingHeads()) + }() unsubscribeAwaiter.AwaitOrFail(t) require.Equal(t, int32(3), headCount.Load()) @@ -101,14 +94,8 @@ func Test_HeadListener_NotReceivingHeads(t *testing.T) { evmcfg := testutils.NewTestChainScopedConfig(t, func(c *toml.EVMConfig) { c.NoNewHeadsThreshold = commonconfig.MustNewDuration(time.Second) }) - chStop := make(chan struct{}) - hl := headtracker.NewHeadListener(lggr, ethClient, evmcfg.EVM(), chStop) firstHeadAwaiter := testutils.NewAwaiter() - handler := func(context.Context, *evmtypes.Head) error { - firstHeadAwaiter.ItHappened() - return nil - } subscribeAwaiter := testutils.NewAwaiter() var chHeads chan<- *evmtypes.Head @@ -125,25 +112,25 @@ func Test_HeadListener_NotReceivingHeads(t *testing.T) { close(chErr) }) - doneAwaiter := testutils.NewAwaiter() - done := func() { - doneAwaiter.ItHappened() - } - go hl.ListenForNewHeads(func() {}, handler, done) - - subscribeAwaiter.AwaitOrFail(t, tests.WaitTimeout(t)) + func() { + hl := headtracker.NewHeadListener(lggr, ethClient, evmcfg.EVM(), nil, func(context.Context, *evmtypes.Head) error { + firstHeadAwaiter.ItHappened() + return nil + }) + require.NoError(t, hl.Start(tests.Context(t))) + defer func() { assert.NoError(t, hl.Close()) }() - chHeads <- testutils.Head(0) - firstHeadAwaiter.AwaitOrFail(t) + subscribeAwaiter.AwaitOrFail(t, tests.WaitTimeout(t)) - require.True(t, hl.ReceivingHeads()) + chHeads <- testutils.Head(0) + firstHeadAwaiter.AwaitOrFail(t) - time.Sleep(time.Second * 2) + require.True(t, hl.ReceivingHeads()) - require.False(t, hl.ReceivingHeads()) + time.Sleep(time.Second * 2) - close(chStop) - doneAwaiter.AwaitOrFail(t) + require.False(t, hl.ReceivingHeads()) + }() } func Test_HeadListener_SubscriptionErr(t *testing.T) { @@ -161,19 +148,11 @@ func Test_HeadListener_SubscriptionErr(t *testing.T) { for _, test := range cases { test := test t.Run(test.name, func(t *testing.T) { - l := logger.Test(t) + lggr := logger.Test(t) ethClient := testutils.NewEthClientMockWithDefaultChain(t) evmcfg := testutils.NewTestChainScopedConfig(t, nil) - chStop := make(chan struct{}) - hl := headtracker.NewHeadListener(l, ethClient, evmcfg.EVM(), chStop) hnhCalled := make(chan *evmtypes.Head) - hnh := func(_ context.Context, header *evmtypes.Head) error { - hnhCalled <- header - return nil - } - doneAwaiter := testutils.NewAwaiter() - done := doneAwaiter.ItHappened chSubErrTest := make(chan error) var chSubErr <-chan error = chSubErrTest @@ -189,63 +168,66 @@ func 
Test_HeadListener_SubscriptionErr(t *testing.T) { headsCh = args.Get(1).(chan<- *evmtypes.Head) subscribeAwaiter.ItHappened() }) - go func() { - hl.ListenForNewHeads(func() {}, hnh, done) - }() - - // Put a head on the channel to ensure we test all code paths - subscribeAwaiter.AwaitOrFail(t, tests.WaitTimeout(t)) - head := testutils.Head(0) - headsCh <- head - - h := <-hnhCalled - assert.Equal(t, head, h) - - // Expect a call to unsubscribe on error - sub.On("Unsubscribe").Once().Run(func(_ mock.Arguments) { - close(headsCh) - // geth guarantees that Unsubscribe closes the errors channel - if !test.closeErr { + func() { + hl := headtracker.NewHeadListener(lggr, ethClient, evmcfg.EVM(), nil, func(_ context.Context, header *evmtypes.Head) error { + hnhCalled <- header + return nil + }) + require.NoError(t, hl.Start(tests.Context(t))) + defer func() { assert.NoError(t, hl.Close()) }() + + // Put a head on the channel to ensure we test all code paths + subscribeAwaiter.AwaitOrFail(t, tests.WaitTimeout(t)) + head := testutils.Head(0) + headsCh <- head + + h := <-hnhCalled + assert.Equal(t, head, h) + + // Expect a call to unsubscribe on error + sub.On("Unsubscribe").Once().Run(func(_ mock.Arguments) { + close(headsCh) + // geth guarantees that Unsubscribe closes the errors channel + if !test.closeErr { + close(chSubErrTest) + } + }) + // Expect a resubscribe + chSubErrTest2 := make(chan error) + var chSubErr2 <-chan error = chSubErrTest2 + sub2 := commonmocks.NewSubscription(t) + sub2.On("Err").Return(chSubErr2) + subscribeAwaiter2 := testutils.NewAwaiter() + + var headsCh2 chan<- *evmtypes.Head + ethClient.On("SubscribeNewHead", mock.Anything, mock.AnythingOfType("chan<- *types.Head")).Return(sub2, nil).Once().Run(func(args mock.Arguments) { + headsCh2 = args.Get(1).(chan<- *evmtypes.Head) + subscribeAwaiter2.ItHappened() + }) + + // Sending test error + if test.closeErr { close(chSubErrTest) + } else { + chSubErrTest <- test.err } - }) - // Expect a resubscribe - chSubErrTest2 := make(chan error) - var chSubErr2 <-chan error = chSubErrTest2 - sub2 := commonmocks.NewSubscription(t) - sub2.On("Err").Return(chSubErr2) - subscribeAwaiter2 := testutils.NewAwaiter() - - var headsCh2 chan<- *evmtypes.Head - ethClient.On("SubscribeNewHead", mock.Anything, mock.AnythingOfType("chan<- *types.Head")).Return(sub2, nil).Once().Run(func(args mock.Arguments) { - headsCh2 = args.Get(1).(chan<- *evmtypes.Head) - subscribeAwaiter2.ItHappened() - }) - - // Sending test error - if test.closeErr { - close(chSubErrTest) - } else { - chSubErrTest <- test.err - } - // Wait for it to resubscribe - subscribeAwaiter2.AwaitOrFail(t, tests.WaitTimeout(t)) + // Wait for it to resubscribe + subscribeAwaiter2.AwaitOrFail(t, tests.WaitTimeout(t)) - head2 := testutils.Head(1) - headsCh2 <- head2 + head2 := testutils.Head(1) + headsCh2 <- head2 - h2 := <-hnhCalled - assert.Equal(t, head2, h2) + h2 := <-hnhCalled + assert.Equal(t, head2, h2) - // Second call to unsubscribe on close - sub2.On("Unsubscribe").Once().Run(func(_ mock.Arguments) { - close(headsCh2) - // geth guarantees that Unsubscribe closes the errors channel - close(chSubErrTest2) - }) - close(chStop) - doneAwaiter.AwaitOrFail(t) + // Second call to unsubscribe on close + sub2.On("Unsubscribe").Once().Run(func(_ mock.Arguments) { + close(headsCh2) + // geth guarantees that Unsubscribe closes the errors channel + close(chSubErrTest2) + }) + }() }) } } diff --git a/core/chains/evm/headtracker/head_tracker.go b/core/chains/evm/headtracker/head_tracker.go index 
d6c2cdc64e7..f7607189f7e 100644 --- a/core/chains/evm/headtracker/head_tracker.go +++ b/core/chains/evm/headtracker/head_tracker.go @@ -2,10 +2,8 @@ package headtracker import ( "context" - "math/big" "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" "go.uber.org/zap/zapcore" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -27,7 +25,7 @@ func NewHeadTracker( headSaver httypes.HeadSaver, mailMon *mailbox.Monitor, ) httypes.HeadTracker { - return headtracker.NewHeadTracker[*evmtypes.Head, ethereum.Subscription, *big.Int, common.Hash]( + return headtracker.NewHeadTracker[*evmtypes.Head, ethereum.Subscription]( lggr, ethClient, config, diff --git a/core/chains/evm/monitor/balance.go b/core/chains/evm/monitor/balance.go index b8194a38af9..3e28d5c436a 100644 --- a/core/chains/evm/monitor/balance.go +++ b/core/chains/evm/monitor/balance.go @@ -33,14 +33,15 @@ type ( } balanceMonitor struct { - services.StateMachine - logger logger.Logger + services.Service + eng *services.Engine + ethClient evmclient.Client chainID *big.Int chainIDStr string ethKeyStore keystore.Eth ethBalances map[gethCommon.Address]*assets.Eth - ethBalancesMtx *sync.RWMutex + ethBalancesMtx sync.RWMutex sleeperTask *utils.SleeperTask } @@ -53,59 +54,42 @@ var _ BalanceMonitor = (*balanceMonitor)(nil) func NewBalanceMonitor(ethClient evmclient.Client, ethKeyStore keystore.Eth, lggr logger.Logger) *balanceMonitor { chainId := ethClient.ConfiguredChainID() bm := &balanceMonitor{ - services.StateMachine{}, - logger.Named(lggr, "BalanceMonitor"), - ethClient, - chainId, - chainId.String(), - ethKeyStore, - make(map[gethCommon.Address]*assets.Eth), - new(sync.RWMutex), - nil, + ethClient: ethClient, + chainID: chainId, + chainIDStr: chainId.String(), + ethKeyStore: ethKeyStore, + ethBalances: make(map[gethCommon.Address]*assets.Eth), } + bm.Service, bm.eng = services.Config{ + Name: "BalanceMonitor", + Start: bm.start, + Close: bm.close, + }.NewServiceEngine(lggr) bm.sleeperTask = utils.NewSleeperTask(&worker{bm: bm}) return bm } -func (bm *balanceMonitor) Start(ctx context.Context) error { - return bm.StartOnce("BalanceMonitor", func() error { - // Always query latest balance on start - (&worker{bm}).WorkCtx(ctx) - return nil - }) -} - -// Close shuts down the BalanceMonitor, should not be used after this -func (bm *balanceMonitor) Close() error { - return bm.StopOnce("BalanceMonitor", func() error { - return bm.sleeperTask.Stop() - }) -} - -func (bm *balanceMonitor) Ready() error { +func (bm *balanceMonitor) start(ctx context.Context) error { + // Always query latest balance on start + (&worker{bm}).WorkCtx(ctx) return nil } -func (bm *balanceMonitor) Name() string { - return bm.logger.Name() -} - -func (bm *balanceMonitor) HealthReport() map[string]error { - return map[string]error{bm.Name(): bm.Healthy()} +// Close shuts down the BalanceMonitor, should not be used after this +func (bm *balanceMonitor) close() error { + return bm.sleeperTask.Stop() } // OnNewLongestChain checks the balance for each key -func (bm *balanceMonitor) OnNewLongestChain(_ context.Context, head *evmtypes.Head) { - ok := bm.IfStarted(func() { - bm.checkBalance(head) - }) +func (bm *balanceMonitor) OnNewLongestChain(_ context.Context, _ *evmtypes.Head) { + ok := bm.sleeperTask.IfStarted(bm.checkBalances) if !ok { - bm.logger.Debugw("BalanceMonitor: ignoring OnNewLongestChain call, balance monitor is not started", "state", bm.State()) + bm.eng.Debugw("BalanceMonitor: ignoring OnNewLongestChain call, balance monitor is 
not started", "state", bm.sleeperTask.State()) } } -func (bm *balanceMonitor) checkBalance(head *evmtypes.Head) { - bm.logger.Debugw("BalanceMonitor: signalling balance worker") +func (bm *balanceMonitor) checkBalances() { + bm.eng.Debugw("BalanceMonitor: signalling balance worker") bm.sleeperTask.WakeUp() } @@ -117,7 +101,7 @@ func (bm *balanceMonitor) updateBalance(ethBal assets.Eth, address gethCommon.Ad bm.ethBalances[address] = ðBal bm.ethBalancesMtx.Unlock() - lgr := logger.Named(bm.logger, "BalanceLog") + lgr := logger.Named(bm.eng, "BalanceLog") lgr = logger.With(lgr, "address", address.Hex(), "ethBalance", ethBal.String(), @@ -151,7 +135,7 @@ func (bm *balanceMonitor) promUpdateEthBalance(balance *assets.Eth, from gethCom balanceFloat, err := ApproximateFloat64(balance) if err != nil { - bm.logger.Error(fmt.Errorf("updatePrometheusEthBalance: %v", err)) + bm.eng.Error(fmt.Errorf("updatePrometheusEthBalance: %v", err)) return } @@ -174,7 +158,7 @@ func (w *worker) Work() { func (w *worker) WorkCtx(ctx context.Context) { enabledAddresses, err := w.bm.ethKeyStore.EnabledAddressesForChain(ctx, w.bm.chainID) if err != nil { - w.bm.logger.Error("BalanceMonitor: error getting keys", err) + w.bm.eng.Error("BalanceMonitor: error getting keys", err) } var wg sync.WaitGroup @@ -198,12 +182,12 @@ func (w *worker) checkAccountBalance(ctx context.Context, address gethCommon.Add bal, err := w.bm.ethClient.BalanceAt(ctx, address, nil) if err != nil { - w.bm.logger.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s", address.Hex()), + w.bm.eng.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s", address.Hex()), "err", err, "address", address, ) } else if bal == nil { - w.bm.logger.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s: invariant violation, bal may not be nil", address.Hex()), + w.bm.eng.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s: invariant violation, bal may not be nil", address.Hex()), "err", err, "address", address, ) diff --git a/core/recovery/recover.go b/core/recovery/recover.go index 8e485abc556..61315defa9a 100644 --- a/core/recovery/recover.go +++ b/core/recovery/recover.go @@ -3,38 +3,38 @@ package recovery import ( "github.com/getsentry/sentry-go" - "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + + corelogger "github.com/smartcontractkit/chainlink/v2/core/logger" ) func ReportPanics(fn func()) { - defer func() { - if err := recover(); err != nil { - sentry.CurrentHub().Recover(err) - sentry.Flush(logger.SentryFlushDeadline) + HandleFn(fn, func(err any) { + sentry.CurrentHub().Recover(err) + sentry.Flush(corelogger.SentryFlushDeadline) - panic(err) - } - }() - fn() + panic(err) + }) } func WrapRecover(lggr logger.Logger, fn func()) { - defer func() { - if err := recover(); err != nil { - lggr.Recover(err) + WrapRecoverHandle(lggr, fn, nil) +} + +func WrapRecoverHandle(lggr logger.Logger, fn func(), onPanic func(recovered any)) { + HandleFn(fn, func(recovered any) { + logger.Sugared(lggr).Criticalw("Recovered goroutine panic", "panic", recovered) + + if onPanic != nil { + onPanic(recovered) } - }() - fn() + }) } -func WrapRecoverHandle(lggr logger.Logger, fn func(), onPanic func(interface{})) { +func HandleFn(fn func(), onPanic func(recovered any)) { defer func() { - if err := recover(); err != nil { - lggr.Recover(err) - - if onPanic != nil { - onPanic(err) - } + if recovered := recover(); recovered != nil { + onPanic(recovered) 
} }() fn() diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 138ca25ed3b..c23ec08a692 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -148,7 +148,6 @@ type ChainlinkApplication struct { shutdownOnce sync.Once srvcs []services.ServiceCtx HealthChecker services.Checker - Nurse *services.Nurse logger logger.SugaredLogger AuditLogger audit.AuditLogger closeLogger func() error @@ -277,14 +276,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) { } ap := cfg.AutoPprof() - var nurse *services.Nurse if ap.Enabled() { globalLogger.Info("Nurse service (automatic pprof profiling) is enabled") - nurse = services.NewNurse(ap, globalLogger) - err := nurse.Start() - if err != nil { - return nil, err - } + srvcs = append(srvcs, services.NewNurse(ap, globalLogger)) } else { globalLogger.Info("Nurse service (automatic pprof profiling) is disabled") } @@ -588,7 +582,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { SessionReaper: sessionReaper, ExternalInitiatorManager: externalInitiatorManager, HealthChecker: healthChecker, - Nurse: nurse, logger: globalLogger, AuditLogger: auditLogger, closeLogger: opts.CloseLogger, @@ -708,10 +701,6 @@ func (app *ChainlinkApplication) stop() (err error) { err = multierr.Append(err, app.FeedsService.Close()) } - if app.Nurse != nil { - err = multierr.Append(err, app.Nurse.Close()) - } - if app.profiler != nil { err = multierr.Append(err, app.profiler.Stop()) } diff --git a/core/services/fluxmonitorv2/deviation_checker.go b/core/services/fluxmonitorv2/deviation_checker.go index 51e85de371e..9dc399b09f9 100644 --- a/core/services/fluxmonitorv2/deviation_checker.go +++ b/core/services/fluxmonitorv2/deviation_checker.go @@ -3,7 +3,7 @@ package fluxmonitorv2 import ( "github.com/shopspring/decimal" - "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) // DeviationThresholds carries parameters used by the threshold-trigger logic @@ -26,7 +26,7 @@ func NewDeviationChecker(rel, abs float64, lggr logger.Logger) *DeviationChecker Rel: rel, Abs: abs, }, - lggr: lggr.Named("DeviationChecker").With("threshold", rel, "absoluteThreshold", abs), + lggr: logger.Sugared(lggr).Named("DeviationChecker").With("threshold", rel, "absoluteThreshold", abs), } } diff --git a/core/services/fluxmonitorv2/flux_monitor.go b/core/services/fluxmonitorv2/flux_monitor.go index 9175feb1a68..b8154ab6797 100644 --- a/core/services/fluxmonitorv2/flux_monitor.go +++ b/core/services/fluxmonitorv2/flux_monitor.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/shopspring/decimal" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -22,7 +23,6 @@ import ( evmutils "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/flags_wrapper" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/flux_aggregator_wrapper" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/recovery" "github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2/promfm" "github.com/smartcontractkit/chainlink/v2/core/services/job" @@ -56,7 +56,10 @@ const DefaultHibernationPollPeriod = 24 * time.Hour // FluxMonitor polls external price adapters via 
HTTP to check for price swings. type FluxMonitor struct { - services.StateMachine + services.Service + eng *services.Engine + logger logger.SugaredLogger + contractAddress common.Address oracleAddress common.Address jobSpec job.Job @@ -77,13 +80,8 @@ type FluxMonitor struct { logBroadcaster log.Broadcaster chainID *big.Int - logger logger.SugaredLogger - backlog *utils.BoundedPriorityQueue[log.Broadcast] chProcessLogs chan struct{} - - chStop services.StopChan - waitOnStop chan struct{} } // NewFluxMonitor returns a new instance of PollingDeviationChecker. @@ -105,7 +103,7 @@ func NewFluxMonitor( flags Flags, fluxAggregator flux_aggregator_wrapper.FluxAggregatorInterface, logBroadcaster log.Broadcaster, - fmLogger logger.Logger, + lggr logger.Logger, chainID *big.Int, ) (*FluxMonitor, error) { fm := &FluxMonitor{ @@ -126,7 +124,6 @@ func NewFluxMonitor( flags: flags, logBroadcaster: logBroadcaster, fluxAggregator: fluxAggregator, - logger: logger.Sugared(fmLogger), chainID: chainID, backlog: utils.NewBoundedPriorityQueue[log.Broadcast](map[uint]int{ // We want reconnecting nodes to be able to submit to a round @@ -136,9 +133,13 @@ func NewFluxMonitor( PriorityFlagChangedLog: 2, }), chProcessLogs: make(chan struct{}, 1), - chStop: make(services.StopChan), - waitOnStop: make(chan struct{}), } + fm.Service, fm.eng = services.Config{ + Name: "FluxMonitor", + Start: fm.start, + Close: fm.close, + }.NewServiceEngine(lggr) + fm.logger = logger.Sugared(fm.eng) return fm, nil } @@ -220,7 +221,7 @@ func NewFromJobSpec( return nil, err } - fmLogger := lggr.With( + fmLogger := logger.With(lggr, "jobID", jobSpec.ID, "contract", fmSpec.ContractAddress.Hex(), ) @@ -279,14 +280,9 @@ const ( // Start implements the job.Service interface. It begins the CSP consumer in a // single goroutine to poll the price adapters and listen to NewRound events. -func (fm *FluxMonitor) Start(context.Context) error { - return fm.StartOnce("FluxMonitor", func() error { - fm.logger.Debug("Starting Flux Monitor for job") - - go fm.consume() - - return nil - }) +func (fm *FluxMonitor) start(context.Context) error { + fm.eng.Go(fm.consume) + return nil } func (fm *FluxMonitor) IsHibernating() bool { @@ -304,16 +300,12 @@ func (fm *FluxMonitor) IsHibernating() bool { return !isFlagLowered } -// Close implements the job.Service interface. It stops this instance from +// close stops this instance from // polling, cleaning up resources. -func (fm *FluxMonitor) Close() error { - return fm.StopOnce("FluxMonitor", func() error { - fm.pollManager.Stop() - close(fm.chStop) - <-fm.waitOnStop +func (fm *FluxMonitor) close() error { + fm.pollManager.Stop() - return nil - }) + return nil } // JobID implements the listener.Listener interface. 
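The consume loop below leans on the reshaped recovery helpers from earlier in this patch: HandleFn runs a function and routes any recovered panic to a callback, WrapRecover logs the panic critically, and WrapRecoverHandle does both. A small sketch of wrapping a ctx-scoped unit of work follows, assuming a hypothetical doPoll; only the recovery and logger calls come from this patch.

```go
package example

import (
	"context"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"

	"github.com/smartcontractkit/chainlink/v2/core/recovery"
)

// safePoll is a stand-in for the poll cases in FluxMonitor.consume, which now
// close over the Engine-managed ctx instead of deriving one from a StopChan.
func safePoll(ctx context.Context, lggr logger.Logger) {
	// Log any recovered panic at Critical level instead of crashing the node.
	recovery.WrapRecover(lggr, func() {
		doPoll(ctx)
	})

	// Or additionally forward the recovered value to a handler, e.g. to flag
	// the owning service as unhealthy.
	recovery.WrapRecoverHandle(lggr, func() {
		doPoll(ctx)
	}, func(recovered any) {
		lggr.Errorw("poll panicked", "panic", recovered)
	})
}

// doPoll is hypothetical work that may panic.
func doPoll(context.Context) {}
```

ReportPanics keeps its old behavior of re-raising the panic after reporting to Sentry; only the logging wrappers swallow it.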
@@ -354,10 +346,8 @@ func (fm *FluxMonitor) HandleLog(ctx context.Context, broadcast log.Broadcast) { } } -func (fm *FluxMonitor) consume() { - defer close(fm.waitOnStop) - - if err := fm.SetOracleAddress(); err != nil { +func (fm *FluxMonitor) consume(ctx context.Context) { + if err := fm.SetOracleAddress(ctx); err != nil { fm.logger.Warnw( "unable to set oracle address, this flux monitor job may not work correctly", "err", err, @@ -398,46 +388,46 @@ func (fm *FluxMonitor) consume() { for { select { - case <-fm.chStop: + case <-ctx.Done(): return case <-fm.chProcessLogs: - recovery.WrapRecover(fm.logger, fm.processLogs) + recovery.WrapRecover(fm.logger, func() { fm.processLogs(ctx) }) case at := <-fm.pollManager.PollTickerTicks(): tickLogger.Debugf("Poll ticker fired on %v", formatTime(at)) recovery.WrapRecover(fm.logger, func() { - fm.pollIfEligible(PollRequestTypePoll, fm.deviationChecker, nil) + fm.pollIfEligible(ctx, PollRequestTypePoll, fm.deviationChecker, nil) }) case at := <-fm.pollManager.IdleTimerTicks(): tickLogger.Debugf("Idle timer fired on %v", formatTime(at)) recovery.WrapRecover(fm.logger, func() { - fm.pollIfEligible(PollRequestTypeIdle, NewZeroDeviationChecker(fm.logger), nil) + fm.pollIfEligible(ctx, PollRequestTypeIdle, NewZeroDeviationChecker(fm.logger), nil) }) case at := <-fm.pollManager.RoundTimerTicks(): tickLogger.Debugf("Round timer fired on %v", formatTime(at)) recovery.WrapRecover(fm.logger, func() { - fm.pollIfEligible(PollRequestTypeRound, fm.deviationChecker, nil) + fm.pollIfEligible(ctx, PollRequestTypeRound, fm.deviationChecker, nil) }) case at := <-fm.pollManager.HibernationTimerTicks(): tickLogger.Debugf("Hibernation timer fired on %v", formatTime(at)) recovery.WrapRecover(fm.logger, func() { - fm.pollIfEligible(PollRequestTypeHibernation, NewZeroDeviationChecker(fm.logger), nil) + fm.pollIfEligible(ctx, PollRequestTypeHibernation, NewZeroDeviationChecker(fm.logger), nil) }) case at := <-fm.pollManager.RetryTickerTicks(): tickLogger.Debugf("Retry ticker fired on %v", formatTime(at)) recovery.WrapRecover(fm.logger, func() { - fm.pollIfEligible(PollRequestTypeRetry, NewZeroDeviationChecker(fm.logger), nil) + fm.pollIfEligible(ctx, PollRequestTypeRetry, NewZeroDeviationChecker(fm.logger), nil) }) case at := <-fm.pollManager.DrumbeatTicks(): tickLogger.Debugf("Drumbeat ticker fired on %v", formatTime(at)) recovery.WrapRecover(fm.logger, func() { - fm.pollIfEligible(PollRequestTypeDrumbeat, NewZeroDeviationChecker(fm.logger), nil) + fm.pollIfEligible(ctx, PollRequestTypeDrumbeat, NewZeroDeviationChecker(fm.logger), nil) }) case request := <-fm.pollManager.Poll(): @@ -446,7 +436,7 @@ func (fm *FluxMonitor) consume() { break default: recovery.WrapRecover(fm.logger, func() { - fm.pollIfEligible(request.Type, fm.deviationChecker, nil) + fm.pollIfEligible(ctx, request.Type, fm.deviationChecker, nil) }) } } @@ -460,11 +450,7 @@ func formatTime(at time.Time) string { // SetOracleAddress sets the oracle address which matches the node's keys. 
// If none match, it uses the first available key -func (fm *FluxMonitor) SetOracleAddress() error { - // fm on deprecation path, using dangling context - ctx, cancel := fm.chStop.NewCtx() - defer cancel() - +func (fm *FluxMonitor) SetOracleAddress(ctx context.Context) error { oracleAddrs, err := fm.fluxAggregator.GetOracles(nil) if err != nil { fm.logger.Error("failed to get list of oracles from FluxAggregator contract") @@ -502,10 +488,7 @@ func (fm *FluxMonitor) SetOracleAddress() error { return errors.New("No keys found") } -func (fm *FluxMonitor) processLogs() { - ctx, cancel := fm.chStop.NewCtx() - defer cancel() - +func (fm *FluxMonitor) processLogs(ctx context.Context) { for ctx.Err() == nil && !fm.backlog.Empty() { broadcast := fm.backlog.Take() fm.processBroadcast(ctx, broadcast) @@ -529,7 +512,7 @@ func (fm *FluxMonitor) processBroadcast(ctx context.Context, broadcast log.Broad decodedLog := broadcast.DecodedLog() switch log := decodedLog.(type) { case *flux_aggregator_wrapper.FluxAggregatorNewRound: - fm.respondToNewRoundLog(*log, broadcast) + fm.respondToNewRoundLog(ctx, *log, broadcast) case *flux_aggregator_wrapper.FluxAggregatorAnswerUpdated: fm.respondToAnswerUpdatedLog(*log) fm.markLogAsConsumed(ctx, broadcast, decodedLog, started) @@ -540,7 +523,7 @@ func (fm *FluxMonitor) processBroadcast(ctx context.Context, broadcast log.Broad // Only reactivate if it is hibernating if fm.pollManager.isHibernating.Load() { fm.pollManager.Awaken(fm.initialRoundState()) - fm.pollIfEligible(PollRequestTypeAwaken, NewZeroDeviationChecker(fm.logger), broadcast) + fm.pollIfEligible(ctx, PollRequestTypeAwaken, NewZeroDeviationChecker(fm.logger), broadcast) } default: fm.logger.Errorf("unknown log %v of type %T", log, log) @@ -589,10 +572,8 @@ func (fm *FluxMonitor) respondToAnswerUpdatedLog(log flux_aggregator_wrapper.Flu // The NewRound log tells us that an oracle has initiated a new round. This tells us that we // need to poll and submit an answer to the contract regardless of the deviation. 
-func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggregatorNewRound, lb log.Broadcast) { +func (fm *FluxMonitor) respondToNewRoundLog(ctx context.Context, log flux_aggregator_wrapper.FluxAggregatorNewRound, lb log.Broadcast) { started := time.Now() - ctx, cancel := fm.chStop.NewCtx() - defer cancel() newRoundLogger := fm.logger.With( "round", log.RoundId, @@ -819,10 +800,8 @@ func (fm *FluxMonitor) checkEligibilityAndAggregatorFunding(roundState flux_aggr return nil } -func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker *DeviationChecker, broadcast log.Broadcast) { +func (fm *FluxMonitor) pollIfEligible(ctx context.Context, pollReq PollRequestType, deviationChecker *DeviationChecker, broadcast log.Broadcast) { started := time.Now() - ctx, cancel := fm.chStop.NewCtx() - defer cancel() l := fm.logger.With( "threshold", deviationChecker.Thresholds.Rel, diff --git a/core/services/fluxmonitorv2/flux_monitor_test.go b/core/services/fluxmonitorv2/flux_monitor_test.go index b3a5bcee6b9..1d1ed676e48 100644 --- a/core/services/fluxmonitorv2/flux_monitor_test.go +++ b/core/services/fluxmonitorv2/flux_monitor_test.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/google/uuid" + "github.com/jmoiron/sqlx" "github.com/onsi/gomega" "github.com/pkg/errors" "github.com/shopspring/decimal" @@ -18,11 +19,10 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v4" - "github.com/jmoiron/sqlx" - "github.com/smartcontractkit/chainlink-common/pkg/assets" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log" logmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log/mocks" @@ -491,7 +491,7 @@ func TestFluxMonitor_PollIfEligible(t *testing.T) { oracles := []common.Address{nodeAddr, testutils.NewAddress()} tm.fluxAggregator.On("GetOracles", nilOpts).Return(oracles, nil) - require.NoError(t, fm.SetOracleAddress()) + require.NoError(t, fm.SetOracleAddress(tests.Context(t))) fm.ExportedPollIfEligible(thresholds.rel, thresholds.abs) }) } @@ -526,7 +526,7 @@ func TestFluxMonitor_PollIfEligible_Creates_JobErr(t *testing.T) { Once() tm.fluxAggregator.On("GetOracles", nilOpts).Return(oracles, nil) - require.NoError(t, fm.SetOracleAddress()) + require.NoError(t, fm.SetOracleAddress(tests.Context(t))) fm.ExportedPollIfEligible(1, 1) } @@ -1171,7 +1171,7 @@ func TestFluxMonitor_RoundTimeoutCausesPoll_timesOutAtZero(t *testing.T) { tm.fluxAggregator.On("Address").Return(common.Address{}) tm.fluxAggregator.On("GetOracles", nilOpts).Return(oracles, nil) - require.NoError(t, fm.SetOracleAddress()) + require.NoError(t, fm.SetOracleAddress(tests.Context(t))) fm.ExportedRoundState(t) servicetest.Run(t, fm) @@ -1506,7 +1506,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { Return(nil) tm.fluxAggregator.On("GetOracles", nilOpts).Return(oracles, nil) - require.NoError(t, fm.SetOracleAddress()) + require.NoError(t, fm.SetOracleAddress(tests.Context(t))) tm.fluxAggregator.On("LatestRoundData", nilOpts).Return(flux_aggregator_wrapper.LatestRoundData{ Answer: big.NewInt(10), @@ -1635,7 +1635,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { Once() tm.fluxAggregator.On("GetOracles", 
nilOpts).Return(oracles, nil) - require.NoError(t, fm.SetOracleAddress()) + require.NoError(t, fm.SetOracleAddress(tests.Context(t))) fm.ExportedPollIfEligible(0, 0) // Now fire off the NewRound log and ensure it does not respond this time @@ -1732,7 +1732,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { Once() tm.fluxAggregator.On("GetOracles", nilOpts).Return(oracles, nil) - require.NoError(t, fm.SetOracleAddress()) + require.NoError(t, fm.SetOracleAddress(tests.Context(t))) fm.ExportedPollIfEligible(0, 0) // Now fire off the NewRound log and ensure it does not respond this time diff --git a/core/services/fluxmonitorv2/helpers_test.go b/core/services/fluxmonitorv2/helpers_test.go index d321ddc35c3..80db82351c7 100644 --- a/core/services/fluxmonitorv2/helpers_test.go +++ b/core/services/fluxmonitorv2/helpers_test.go @@ -19,11 +19,15 @@ func (fm *FluxMonitor) Format(f fmt.State, verb rune) { } func (fm *FluxMonitor) ExportedPollIfEligible(threshold, absoluteThreshold float64) { - fm.pollIfEligible(PollRequestTypePoll, NewDeviationChecker(threshold, absoluteThreshold, fm.logger), nil) + ctx, cancel := fm.eng.NewCtx() + defer cancel() + fm.pollIfEligible(ctx, PollRequestTypePoll, NewDeviationChecker(threshold, absoluteThreshold, fm.logger), nil) } func (fm *FluxMonitor) ExportedProcessLogs() { - fm.processLogs() + ctx, cancel := fm.eng.NewCtx() + defer cancel() + fm.processLogs(ctx) } func (fm *FluxMonitor) ExportedBacklog() *utils.BoundedPriorityQueue[log.Broadcast] { @@ -36,7 +40,9 @@ func (fm *FluxMonitor) ExportedRoundState(t *testing.T) { } func (fm *FluxMonitor) ExportedRespondToNewRoundLog(log *flux_aggregator_wrapper.FluxAggregatorNewRound, broadcast log.Broadcast) { - fm.respondToNewRoundLog(*log, broadcast) + ctx, cancel := fm.eng.NewCtx() + defer cancel() + fm.respondToNewRoundLog(ctx, *log, broadcast) } func (fm *FluxMonitor) ExportedRespondToFlagsRaisedLog() { diff --git a/core/services/fluxmonitorv2/poll_manager.go b/core/services/fluxmonitorv2/poll_manager.go index 78b99aec4d5..aca6c75a311 100644 --- a/core/services/fluxmonitorv2/poll_manager.go +++ b/core/services/fluxmonitorv2/poll_manager.go @@ -5,8 +5,8 @@ import ( "sync/atomic" "time" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/flux_aggregator_wrapper" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -64,7 +64,7 @@ type PollManager struct { } // NewPollManager initializes a new PollManager -func NewPollManager(cfg PollManagerConfig, logger logger.Logger) (*PollManager, error) { +func NewPollManager(cfg PollManagerConfig, lggr logger.Logger) (*PollManager, error) { minBackoffDuration := cfg.MinRetryBackoffDuration if cfg.IdleTimerPeriod < minBackoffDuration { minBackoffDuration = cfg.IdleTimerPeriod @@ -82,7 +82,7 @@ func NewPollManager(cfg PollManagerConfig, logger logger.Logger) (*PollManager, p := &PollManager{ cfg: cfg, - logger: logger.Named("PollManager"), + logger: logger.Named(lggr, "PollManager"), hibernationTimer: utils.NewResettableTimer(), pollTicker: utils.NewPausableTicker(cfg.PollTickerInterval), @@ -277,7 +277,7 @@ func (pm *PollManager) startIdleTimer(roundStartedAtUTC uint64) { deadline := startedAt.Add(pm.cfg.IdleTimerPeriod) deadlineDuration := time.Until(deadline) - log := pm.logger.With( + log := logger.With(pm.logger, "pollFrequency", pm.cfg.PollTickerInterval, "idleDuration", pm.cfg.IdleTimerPeriod, "startedAt", roundStartedAtUTC, @@ 
-300,7 +300,7 @@ func (pm *PollManager) startIdleTimer(roundStartedAtUTC uint64) { // startRoundTimer starts the round timer func (pm *PollManager) startRoundTimer(roundTimesOutAt uint64) { - log := pm.logger.With( + log := logger.With(pm.logger, "pollFrequency", pm.cfg.PollTickerInterval, "idleDuration", pm.cfg.IdleTimerPeriod, "timesOutAt", roundTimesOutAt, diff --git a/core/services/nurse.go b/core/services/nurse.go index a9069b5181d..7f3cad13e71 100644 --- a/core/services/nurse.go +++ b/core/services/nurse.go @@ -3,6 +3,7 @@ package services import ( "bytes" "compress/gzip" + "context" "fmt" "io/fs" "os" @@ -19,22 +20,21 @@ import ( commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/utils" ) type Nurse struct { - services.StateMachine + services.Service + eng *services.Engine cfg Config - log logger.Logger checks map[string]CheckFunc checksMu sync.RWMutex chGather chan gatherRequest - chStop chan struct{} - wgDone sync.WaitGroup } type Config interface { @@ -66,85 +66,63 @@ const ( ) func NewNurse(cfg Config, log logger.Logger) *Nurse { - return &Nurse{ + n := &Nurse{ cfg: cfg, - log: log.Named("Nurse"), checks: make(map[string]CheckFunc), chGather: make(chan gatherRequest, 1), - chStop: make(chan struct{}), } + n.Service, n.eng = services.Config{ + Name: "Nurse", + Start: n.start, + }.NewServiceEngine(log) + + return n } -func (n *Nurse) Start() error { - return n.StartOnce("Nurse", func() error { - // This must be set *once*, and it must occur as early as possible - if n.cfg.MemProfileRate() != runtime.MemProfileRate { - runtime.MemProfileRate = n.cfg.BlockProfileRate() - } +func (n *Nurse) start(_ context.Context) error { + // This must be set *once*, and it must occur as early as possible + if n.cfg.MemProfileRate() != runtime.MemProfileRate { + runtime.MemProfileRate = n.cfg.BlockProfileRate() + } - n.log.Debugf("Starting nurse with config %+v", n.cfg) - runtime.SetCPUProfileRate(n.cfg.CPUProfileRate()) - runtime.SetBlockProfileRate(n.cfg.BlockProfileRate()) - runtime.SetMutexProfileFraction(n.cfg.MutexProfileFraction()) + n.eng.Debugf("Starting nurse with config %+v", n.cfg) + runtime.SetCPUProfileRate(n.cfg.CPUProfileRate()) + runtime.SetBlockProfileRate(n.cfg.BlockProfileRate()) + runtime.SetMutexProfileFraction(n.cfg.MutexProfileFraction()) - err := utils.EnsureDirAndMaxPerms(n.cfg.ProfileRoot(), 0744) - if err != nil { - return err - } + err := utils.EnsureDirAndMaxPerms(n.cfg.ProfileRoot(), 0744) + if err != nil { + return err + } - n.AddCheck("mem", n.checkMem) - n.AddCheck("goroutines", n.checkGoroutines) - - n.wgDone.Add(1) - // Checker - go func() { - defer n.wgDone.Done() - for { - select { - case <-n.chStop: - return - case <-time.After(n.cfg.PollInterval().Duration()): - } - - func() { - n.checksMu.RLock() - defer n.checksMu.RUnlock() - for reason, checkFunc := range n.checks { - if unwell, meta := checkFunc(); unwell { - n.GatherVitals(reason, meta) - break - } - } - }() - } - }() - - n.wgDone.Add(1) - // Responder - go func() { - defer n.wgDone.Done() - for { - select { - case <-n.chStop: - return - case req := <-n.chGather: - n.gatherVitals(req.reason, req.meta) - } - } - }() + n.AddCheck("mem", n.checkMem) + n.AddCheck("goroutines", n.checkGoroutines) - return nil + // Checker + 
n.eng.GoTick(timeutil.NewTicker(n.cfg.PollInterval().Duration), func(ctx context.Context) { + n.checksMu.RLock() + defer n.checksMu.RUnlock() + for reason, checkFunc := range n.checks { + if unwell, meta := checkFunc(); unwell { + n.GatherVitals(ctx, reason, meta) + break + } + } }) -} -func (n *Nurse) Close() error { - return n.StopOnce("Nurse", func() error { - n.log.Debug("Nurse closing...") - defer n.log.Debug("Nurse closed") - close(n.chStop) - n.wgDone.Wait() - return nil + // Responder + n.eng.Go(func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case req := <-n.chGather: + n.gatherVitals(req.reason, req.meta) + } + } }) + + return nil } func (n *Nurse) AddCheck(reason string, checkFunc CheckFunc) { @@ -153,9 +131,9 @@ func (n *Nurse) AddCheck(reason string, checkFunc CheckFunc) { n.checks[reason] = checkFunc } -func (n *Nurse) GatherVitals(reason string, meta Meta) { +func (n *Nurse) GatherVitals(ctx context.Context, reason string, meta Meta) { select { - case <-n.chStop: + case <-ctx.Done(): case n.chGather <- gatherRequest{reason, meta}: default: } @@ -189,14 +167,14 @@ func (n *Nurse) checkGoroutines() (bool, Meta) { func (n *Nurse) gatherVitals(reason string, meta Meta) { loggerFields := (logger.Fields{"reason": reason}).Merge(logger.Fields(meta)) - n.log.Debugw("Nurse is gathering vitals", loggerFields.Slice()...) + n.eng.Debugw("Nurse is gathering vitals", loggerFields.Slice()...) size, err := n.totalProfileBytes() if err != nil { - n.log.Errorw("could not fetch total profile bytes", loggerFields.With("err", err).Slice()...) + n.eng.Errorw("could not fetch total profile bytes", loggerFields.With("err", err).Slice()...) return } else if size >= uint64(n.cfg.MaxProfileSize()) { - n.log.Warnw("cannot write pprof profile, total profile size exceeds configured PPROF_MAX_PROFILE_SIZE", + n.eng.Warnw("cannot write pprof profile, total profile size exceeds configured PPROF_MAX_PROFILE_SIZE", loggerFields.With("total", size, "max", n.cfg.MaxProfileSize()).Slice()..., ) return @@ -206,7 +184,7 @@ func (n *Nurse) gatherVitals(reason string, meta Meta) { err = n.appendLog(now, reason, meta) if err != nil { - n.log.Warnw("cannot write pprof profile", loggerFields.With("err", err).Slice()...) + n.eng.Warnw("cannot write pprof profile", loggerFields.With("err", err).Slice()...) 
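Note: the Checker/Responder rewrite above is an instance of the services.Config.NewServiceEngine pattern that this change applies throughout. The following is a minimal, hedged sketch of that pattern using only the calls already exercised in this diff; pollingService, newPollingService, and poll are illustrative names, not part of the change.

package example

import (
	"context"
	"time"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/services"
	"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// pollingService is a hypothetical service wired the same way as Nurse above:
// services.Config yields both the Service (Start/Close/HealthReport) and an
// *Engine that owns goroutine lifecycles, replacing hand-rolled chStop/WaitGroup.
type pollingService struct {
	services.Service
	eng *services.Engine
}

func newPollingService(lggr logger.Logger) *pollingService {
	p := &pollingService{}
	p.Service, p.eng = services.Config{
		Name:  "PollingService",
		Start: p.start,
	}.NewServiceEngine(lggr)
	return p
}

func (p *pollingService) start(context.Context) error {
	// GoTick calls poll on every tick until the service closes (the Checker shape above).
	p.eng.GoTick(timeutil.NewTicker(func() time.Duration { return 10 * time.Second }), p.poll)
	// Go runs a long-lived goroutine whose ctx is cancelled on Close (the Responder shape above).
	p.eng.Go(func(ctx context.Context) {
		<-ctx.Done()
	})
	return nil
}

func (p *pollingService) poll(ctx context.Context) {
	// one unit of work, honoring ctx cancellation
}

The payoff, visible in the converted code above, is that shutdown plumbing (stop channel, wait group, context creation) lives in the engine rather than being repeated in every service.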
return } var wg sync.WaitGroup @@ -227,7 +205,7 @@ func (n *Nurse) gatherVitals(reason string, meta Meta) { wg.Add(1) go n.gather("heap", now, &wg) } else { - n.log.Info("skipping heap collection because runtime.MemProfileRate = 0") + n.eng.Info("skipping heap collection because runtime.MemProfileRate = 0") } wg.Add(1) @@ -236,15 +214,13 @@ func (n *Nurse) gatherVitals(reason string, meta Meta) { go n.gather("threadcreate", now, &wg) ch := make(chan struct{}) - n.wgDone.Add(1) - go func() { - defer n.wgDone.Done() + n.eng.Go(func(ctx context.Context) { defer close(ch) wg.Wait() - }() + }) select { - case <-n.chStop: + case <-n.eng.StopChan: case <-ch: } } @@ -252,7 +228,7 @@ func (n *Nurse) gatherVitals(reason string, meta Meta) { func (n *Nurse) appendLog(now time.Time, reason string, meta Meta) error { filename := filepath.Join(n.cfg.ProfileRoot(), "nurse.log") - n.log.Debugf("creating nurse log %s", filename) + n.eng.Debugf("creating nurse log %s", filename) file, err := os.Create(filename) if err != nil { @@ -288,34 +264,34 @@ func (n *Nurse) appendLog(now time.Time, reason string, meta Meta) error { func (n *Nurse) gatherCPU(now time.Time, wg *sync.WaitGroup) { defer wg.Done() - n.log.Debugf("gather cpu %d ...", now.UnixMicro()) - defer n.log.Debugf("gather cpu %d done", now.UnixMicro()) + n.eng.Debugf("gather cpu %d ...", now.UnixMicro()) + defer n.eng.Debugf("gather cpu %d done", now.UnixMicro()) wc, err := n.createFile(now, cpuProfName, false) if err != nil { - n.log.Errorw("could not write cpu profile", "err", err) + n.eng.Errorw("could not write cpu profile", "err", err) return } defer wc.Close() err = pprof.StartCPUProfile(wc) if err != nil { - n.log.Errorw("could not start cpu profile", "err", err) + n.eng.Errorw("could not start cpu profile", "err", err) return } select { - case <-n.chStop: - n.log.Debug("gather cpu received stop") + case <-n.eng.StopChan: + n.eng.Debug("gather cpu received stop") case <-time.After(n.cfg.GatherDuration().Duration()): - n.log.Debugf("gather cpu duration elapsed %s. stoping profiling.", n.cfg.GatherDuration().Duration().String()) + n.eng.Debugf("gather cpu duration elapsed %s. 
stoping profiling.", n.cfg.GatherDuration().Duration().String()) } pprof.StopCPUProfile() err = wc.Close() if err != nil { - n.log.Errorw("could not close cpu profile", "err", err) + n.eng.Errorw("could not close cpu profile", "err", err) return } } @@ -323,23 +299,23 @@ func (n *Nurse) gatherCPU(now time.Time, wg *sync.WaitGroup) { func (n *Nurse) gatherTrace(now time.Time, wg *sync.WaitGroup) { defer wg.Done() - n.log.Debugf("gather trace %d ...", now.UnixMicro()) - defer n.log.Debugf("gather trace %d done", now.UnixMicro()) + n.eng.Debugf("gather trace %d ...", now.UnixMicro()) + defer n.eng.Debugf("gather trace %d done", now.UnixMicro()) wc, err := n.createFile(now, traceProfName, true) if err != nil { - n.log.Errorw("could not write trace profile", "err", err) + n.eng.Errorw("could not write trace profile", "err", err) return } defer wc.Close() err = trace.Start(wc) if err != nil { - n.log.Errorw("could not start trace profile", "err", err) + n.eng.Errorw("could not start trace profile", "err", err) return } select { - case <-n.chStop: + case <-n.eng.StopChan: case <-time.After(n.cfg.GatherTraceDuration().Duration()): } @@ -347,7 +323,7 @@ func (n *Nurse) gatherTrace(now time.Time, wg *sync.WaitGroup) { err = wc.Close() if err != nil { - n.log.Errorw("could not close trace profile", "err", err) + n.eng.Errorw("could not close trace profile", "err", err) return } } @@ -355,18 +331,18 @@ func (n *Nurse) gatherTrace(now time.Time, wg *sync.WaitGroup) { func (n *Nurse) gather(typ string, now time.Time, wg *sync.WaitGroup) { defer wg.Done() - n.log.Debugf("gather %s %d ...", typ, now.UnixMicro()) - n.log.Debugf("gather %s %d done", typ, now.UnixMicro()) + n.eng.Debugf("gather %s %d ...", typ, now.UnixMicro()) + n.eng.Debugf("gather %s %d done", typ, now.UnixMicro()) p := pprof.Lookup(typ) if p == nil { - n.log.Errorf("Invariant violation: pprof type '%v' does not exist", typ) + n.eng.Errorf("Invariant violation: pprof type '%v' does not exist", typ) return } p0, err := collectProfile(p) if err != nil { - n.log.Errorw(fmt.Sprintf("could not collect %v profile", typ), "err", err) + n.eng.Errorw(fmt.Sprintf("could not collect %v profile", typ), "err", err) return } @@ -374,14 +350,14 @@ func (n *Nurse) gather(typ string, now time.Time, wg *sync.WaitGroup) { defer t.Stop() select { - case <-n.chStop: + case <-n.eng.StopChan: return case <-t.C: } p1, err := collectProfile(p) if err != nil { - n.log.Errorw(fmt.Sprintf("could not collect %v profile", typ), "err", err) + n.eng.Errorw(fmt.Sprintf("could not collect %v profile", typ), "err", err) return } ts := p1.TimeNanos @@ -391,7 +367,7 @@ func (n *Nurse) gather(typ string, now time.Time, wg *sync.WaitGroup) { p1, err = profile.Merge([]*profile.Profile{p0, p1}) if err != nil { - n.log.Errorw(fmt.Sprintf("could not compute delta for %v profile", typ), "err", err) + n.eng.Errorw(fmt.Sprintf("could not compute delta for %v profile", typ), "err", err) return } @@ -400,19 +376,19 @@ func (n *Nurse) gather(typ string, now time.Time, wg *sync.WaitGroup) { wc, err := n.createFile(now, typ, false) if err != nil { - n.log.Errorw(fmt.Sprintf("could not write %v profile", typ), "err", err) + n.eng.Errorw(fmt.Sprintf("could not write %v profile", typ), "err", err) return } defer wc.Close() err = p1.Write(wc) if err != nil { - n.log.Errorw(fmt.Sprintf("could not write %v profile", typ), "err", err) + n.eng.Errorw(fmt.Sprintf("could not write %v profile", typ), "err", err) return } err = wc.Close() if err != nil { - n.log.Errorw(fmt.Sprintf("could not close 
file for %v profile", typ), "err", err) + n.eng.Errorw(fmt.Sprintf("could not close file for %v profile", typ), "err", err) return } } @@ -437,7 +413,7 @@ func (n *Nurse) createFile(now time.Time, typ string, shouldGzip bool) (*utils.D filename += ".gz" } fullpath := filepath.Join(n.cfg.ProfileRoot(), filename) - n.log.Debugf("creating file %s", fullpath) + n.eng.Debugf("creating file %s", fullpath) file, err := os.Create(fullpath) if err != nil { diff --git a/core/services/nurse_test.go b/core/services/nurse_test.go index 4597eeb456b..ed6f6872dc9 100644 --- a/core/services/nurse_test.go +++ b/core/services/nurse_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -102,7 +103,7 @@ func TestNurse(t *testing.T) { nrse := NewNurse(newMockConfig(t), l) nrse.AddCheck("test", func() (bool, Meta) { return true, Meta{} }) - require.NoError(t, nrse.Start()) + require.NoError(t, nrse.Start(tests.Context(t))) defer func() { require.NoError(t, nrse.Close()) }() require.NoError(t, nrse.appendLog(time.Now(), "test", Meta{})) diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index 559b1ec33f5..b0d04b11871 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -22,7 +22,8 @@ import ( ) type logPollerWrapper struct { - services.StateMachine + services.Service + eng *services.Engine routerContract *functions_router.FunctionsRouter pluginConfig config.PluginConfig @@ -38,9 +39,6 @@ type logPollerWrapper struct { detectedRequests detectedEvents detectedResponses detectedEvents mu sync.Mutex - closeWait sync.WaitGroup - stopCh services.StopChan - lggr logger.Logger } type detectedEvent struct { @@ -94,7 +92,7 @@ func NewLogPollerWrapper(routerContractAddress common.Address, pluginConfig conf return nil, errors.Errorf("invalid config: number of required confirmation blocks >= pastBlocksToPoll") } - return &logPollerWrapper{ + w := &logPollerWrapper{ routerContract: routerContract, pluginConfig: pluginConfig, requestBlockOffset: requestBlockOffset, @@ -106,40 +104,25 @@ func NewLogPollerWrapper(routerContractAddress common.Address, pluginConfig conf logPoller: logPoller, client: client, subscribers: make(map[string]evmRelayTypes.RouteUpdateSubscriber), - stopCh: make(services.StopChan), - lggr: lggr.Named("LogPollerWrapper"), - }, nil -} - -func (l *logPollerWrapper) Start(context.Context) error { - return l.StartOnce("LogPollerWrapper", func() error { - l.lggr.Infow("starting LogPollerWrapper", "routerContract", l.routerContract.Address().Hex(), "contractVersion", l.pluginConfig.ContractVersion) - l.mu.Lock() - defer l.mu.Unlock() - if l.pluginConfig.ContractVersion != 1 { - return errors.New("only contract version 1 is supported") - } - l.closeWait.Add(1) - go l.checkForRouteUpdates() - return nil - }) -} - -func (l *logPollerWrapper) Close() error { - return l.StopOnce("LogPollerWrapper", func() (err error) { - l.lggr.Info("closing LogPollerWrapper") - close(l.stopCh) - l.closeWait.Wait() - return nil - }) + } + w.Service, w.eng = services.Config{ + Name: "LogPollerWrapper", + Start: w.start, + }.NewServiceEngine(lggr) + return 
w, nil } -func (l *logPollerWrapper) HealthReport() map[string]error { - return map[string]error{l.Name(): l.Ready()} +func (l *logPollerWrapper) start(context.Context) error { + l.eng.Infow("starting LogPollerWrapper", "routerContract", l.routerContract.Address().Hex(), "contractVersion", l.pluginConfig.ContractVersion) + l.mu.Lock() + defer l.mu.Unlock() + if l.pluginConfig.ContractVersion != 1 { + return errors.New("only contract version 1 is supported") + } + l.eng.Go(l.checkForRouteUpdates) + return nil } -func (l *logPollerWrapper) Name() string { return l.lggr.Name() } - // methods of LogPollerWrapper func (l *logPollerWrapper) LatestEvents(ctx context.Context) ([]evmRelayTypes.OracleRequest, []evmRelayTypes.OracleResponse, error) { l.mu.Lock() @@ -166,7 +149,7 @@ func (l *logPollerWrapper) LatestEvents(ctx context.Context) ([]evmRelayTypes.Or resultsReq := []evmRelayTypes.OracleRequest{} resultsResp := []evmRelayTypes.OracleResponse{} if len(coordinators) == 0 { - l.lggr.Debug("LatestEvents: no non-zero coordinators to check") + l.eng.Debug("LatestEvents: no non-zero coordinators to check") return resultsReq, resultsResp, errors.New("no non-zero coordinators to check") } @@ -174,32 +157,32 @@ func (l *logPollerWrapper) LatestEvents(ctx context.Context) ([]evmRelayTypes.Or requestEndBlock := latestBlockNum - l.requestBlockOffset requestLogs, err := l.logPoller.Logs(ctx, startBlockNum, requestEndBlock, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), coordinator) if err != nil { - l.lggr.Errorw("LatestEvents: fetching request logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", requestEndBlock) + l.eng.Errorw("LatestEvents: fetching request logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", requestEndBlock) return nil, nil, err } - l.lggr.Debugw("LatestEvents: fetched request logs", "nRequestLogs", len(requestLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", requestEndBlock) + l.eng.Debugw("LatestEvents: fetched request logs", "nRequestLogs", len(requestLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", requestEndBlock) requestLogs = l.filterPreviouslyDetectedEvents(requestLogs, &l.detectedRequests, "requests") responseEndBlock := latestBlockNum - l.responseBlockOffset responseLogs, err := l.logPoller.Logs(ctx, startBlockNum, responseEndBlock, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), coordinator) if err != nil { - l.lggr.Errorw("LatestEvents: fetching response logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", responseEndBlock) + l.eng.Errorw("LatestEvents: fetching response logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", responseEndBlock) return nil, nil, err } - l.lggr.Debugw("LatestEvents: fetched request logs", "nResponseLogs", len(responseLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", responseEndBlock) + l.eng.Debugw("LatestEvents: fetched request logs", "nResponseLogs", len(responseLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", responseEndBlock) responseLogs = l.filterPreviouslyDetectedEvents(responseLogs, &l.detectedResponses, "responses") parsingContract, err := functions_coordinator.NewFunctionsCoordinator(coordinator, l.client) if err != nil { - l.lggr.Error("LatestEvents: creating a contract instance for parsing failed") + l.eng.Error("LatestEvents: creating a contract instance for parsing failed") return nil, nil, err } - l.lggr.Debugw("LatestEvents: parsing logs", 
"nRequestLogs", len(requestLogs), "nResponseLogs", len(responseLogs), "coordinatorAddress", coordinator.Hex()) + l.eng.Debugw("LatestEvents: parsing logs", "nRequestLogs", len(requestLogs), "nResponseLogs", len(responseLogs), "coordinatorAddress", coordinator.Hex()) for _, log := range requestLogs { gethLog := log.ToGethLog() oracleRequest, err := parsingContract.ParseOracleRequest(gethLog) if err != nil { - l.lggr.Errorw("LatestEvents: failed to parse a request log, skipping", "err", err) + l.eng.Errorw("LatestEvents: failed to parse a request log, skipping", "err", err) continue } @@ -212,7 +195,7 @@ func (l *logPollerWrapper) LatestEvents(ctx context.Context) ([]evmRelayTypes.Or bytes32Type, errType7 := abi.NewType("bytes32", "bytes32", nil) if errType1 != nil || errType2 != nil || errType3 != nil || errType4 != nil || errType5 != nil || errType6 != nil || errType7 != nil { - l.lggr.Errorw("LatestEvents: failed to initialize types", "errType1", errType1, + l.eng.Errorw("LatestEvents: failed to initialize types", "errType1", errType1, "errType2", errType2, "errType3", errType3, "errType4", errType4, "errType5", errType5, "errType6", errType6, "errType7", errType7, ) continue @@ -244,7 +227,7 @@ func (l *logPollerWrapper) LatestEvents(ctx context.Context) ([]evmRelayTypes.Or oracleRequest.Commitment.TimeoutTimestamp, ) if err != nil { - l.lggr.Errorw("LatestEvents: failed to pack commitment bytes, skipping", "err", err) + l.eng.Errorw("LatestEvents: failed to pack commitment bytes, skipping", "err", err) } resultsReq = append(resultsReq, evmRelayTypes.OracleRequest{ @@ -266,7 +249,7 @@ func (l *logPollerWrapper) LatestEvents(ctx context.Context) ([]evmRelayTypes.Or gethLog := log.ToGethLog() oracleResponse, err := parsingContract.ParseOracleResponse(gethLog) if err != nil { - l.lggr.Errorw("LatestEvents: failed to parse a response log, skipping") + l.eng.Errorw("LatestEvents: failed to parse a response log, skipping") continue } resultsResp = append(resultsResp, evmRelayTypes.OracleResponse{ @@ -275,13 +258,13 @@ func (l *logPollerWrapper) LatestEvents(ctx context.Context) ([]evmRelayTypes.Or } } - l.lggr.Debugw("LatestEvents: done", "nRequestLogs", len(resultsReq), "nResponseLogs", len(resultsResp), "startBlock", startBlockNum, "endBlock", latestBlockNum) + l.eng.Debugw("LatestEvents: done", "nRequestLogs", len(resultsReq), "nResponseLogs", len(resultsResp), "startBlock", startBlockNum, "endBlock", latestBlockNum) return resultsReq, resultsResp, nil } func (l *logPollerWrapper) filterPreviouslyDetectedEvents(logs []logpoller.Log, detectedEvents *detectedEvents, filterType string) []logpoller.Log { if len(logs) > maxLogsToProcess { - l.lggr.Errorw("filterPreviouslyDetectedEvents: too many logs to process, only processing latest maxLogsToProcess logs", "filterType", filterType, "nLogs", len(logs), "maxLogsToProcess", maxLogsToProcess) + l.eng.Errorw("filterPreviouslyDetectedEvents: too many logs to process, only processing latest maxLogsToProcess logs", "filterType", filterType, "nLogs", len(logs), "maxLogsToProcess", maxLogsToProcess) logs = logs[len(logs)-maxLogsToProcess:] } l.mu.Lock() @@ -290,7 +273,7 @@ func (l *logPollerWrapper) filterPreviouslyDetectedEvents(logs []logpoller.Log, for _, log := range logs { var requestId [32]byte if len(log.Topics) < 2 || len(log.Topics[1]) != 32 { - l.lggr.Errorw("filterPreviouslyDetectedEvents: invalid log, skipping", "filterType", filterType, "log", log) + l.eng.Errorw("filterPreviouslyDetectedEvents: invalid log, skipping", "filterType", 
filterType, "log", log) continue } copy(requestId[:], log.Topics[1]) // requestId is the second topic (1st topic is the event signature) @@ -310,7 +293,7 @@ func (l *logPollerWrapper) filterPreviouslyDetectedEvents(logs []logpoller.Log, expiredRequests++ } detectedEvents.detectedEventsOrdered = detectedEvents.detectedEventsOrdered[expiredRequests:] - l.lggr.Debugw("filterPreviouslyDetectedEvents: done", "filterType", filterType, "nLogs", len(logs), "nFilteredLogs", len(filteredLogs), "nExpiredRequests", expiredRequests, "previouslyDetectedCacheSize", len(detectedEvents.detectedEventsOrdered)) + l.eng.Debugw("filterPreviouslyDetectedEvents: done", "filterType", filterType, "nLogs", len(logs), "nFilteredLogs", len(filteredLogs), "nExpiredRequests", expiredRequests, "previouslyDetectedCacheSize", len(detectedEvents.detectedEventsOrdered)) return filteredLogs } @@ -319,7 +302,7 @@ func (l *logPollerWrapper) SubscribeToUpdates(ctx context.Context, subscriberNam if l.pluginConfig.ContractVersion == 0 { // in V0, immediately set contract address to Oracle contract and never update again if err := subscriber.UpdateRoutes(ctx, l.routerContract.Address(), l.routerContract.Address()); err != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "subscriberName", subscriberName, "err", err) + l.eng.Errorw("LogPollerWrapper: Failed to update routes", "subscriberName", subscriberName, "err", err) } } else if l.pluginConfig.ContractVersion == 1 { l.mu.Lock() @@ -328,37 +311,36 @@ func (l *logPollerWrapper) SubscribeToUpdates(ctx context.Context, subscriberNam } } -func (l *logPollerWrapper) checkForRouteUpdates() { - defer l.closeWait.Done() +func (l *logPollerWrapper) checkForRouteUpdates(ctx context.Context) { freqSec := l.pluginConfig.ContractUpdateCheckFrequencySec if freqSec == 0 { - l.lggr.Errorw("LogPollerWrapper: ContractUpdateCheckFrequencySec is zero - route update checks disabled") + l.eng.Errorw("LogPollerWrapper: ContractUpdateCheckFrequencySec is zero - route update checks disabled") return } - updateOnce := func() { + updateOnce := func(ctx context.Context) { // NOTE: timeout == frequency here, could be changed to a separate config value timeout := time.Duration(l.pluginConfig.ContractUpdateCheckFrequencySec) * time.Second - ctx, cancel := l.stopCh.CtxCancel(context.WithTimeout(context.Background(), timeout)) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() active, proposed, err := l.getCurrentCoordinators(ctx) if err != nil { - l.lggr.Errorw("LogPollerWrapper: error calling getCurrentCoordinators", "err", err) + l.eng.Errorw("LogPollerWrapper: error calling getCurrentCoordinators", "err", err) return } l.handleRouteUpdate(ctx, active, proposed) } - updateOnce() // update once right away + updateOnce(ctx) // update once right away ticker := time.NewTicker(time.Duration(freqSec) * time.Second) defer ticker.Stop() for { select { - case <-l.stopCh: + case <-ctx.Done(): return case <-ticker.C: - updateOnce() + updateOnce(ctx) } } } @@ -394,22 +376,22 @@ func (l *logPollerWrapper) handleRouteUpdate(ctx context.Context, activeCoordina defer l.mu.Unlock() if activeCoordinator == (common.Address{}) { - l.lggr.Error("LogPollerWrapper: cannot update activeCoordinator to zero address") + l.eng.Error("LogPollerWrapper: cannot update activeCoordinator to zero address") return } if activeCoordinator == l.activeCoordinator && proposedCoordinator == l.proposedCoordinator { - l.lggr.Debug("LogPollerWrapper: no changes to routes") + l.eng.Debug("LogPollerWrapper: no changes to 
routes") return } errActive := l.registerFilters(ctx, activeCoordinator) errProposed := l.registerFilters(ctx, proposedCoordinator) if errActive != nil || errProposed != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to register filters", "errorActive", errActive, "errorProposed", errProposed) + l.eng.Errorw("LogPollerWrapper: Failed to register filters", "errorActive", errActive, "errorProposed", errProposed) return } - l.lggr.Debugw("LogPollerWrapper: new routes", "activeCoordinator", activeCoordinator.Hex(), "proposedCoordinator", proposedCoordinator.Hex()) + l.eng.Debugw("LogPollerWrapper: new routes", "activeCoordinator", activeCoordinator.Hex(), "proposedCoordinator", proposedCoordinator.Hex()) l.activeCoordinator = activeCoordinator l.proposedCoordinator = proposedCoordinator @@ -417,7 +399,7 @@ func (l *logPollerWrapper) handleRouteUpdate(ctx context.Context, activeCoordina for _, subscriber := range l.subscribers { err := subscriber.UpdateRoutes(ctx, activeCoordinator, proposedCoordinator) if err != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "err", err) + l.eng.Errorw("LogPollerWrapper: Failed to update routes", "err", err) } } @@ -430,9 +412,9 @@ func (l *logPollerWrapper) handleRouteUpdate(ctx context.Context, activeCoordina continue } if err := l.logPoller.UnregisterFilter(ctx, filter.Name); err != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to unregister filter", "filterName", filter.Name, "err", err) + l.eng.Errorw("LogPollerWrapper: Failed to unregister filter", "filterName", filter.Name, "err", err) } - l.lggr.Debugw("LogPollerWrapper: Successfully unregistered filter", "filterName", filter.Name) + l.eng.Debugw("LogPollerWrapper: Successfully unregistered filter", "filterName", filter.Name) } } diff --git a/core/services/synchronization/helpers_test.go b/core/services/synchronization/helpers_test.go index 7bb2dde7633..aea9bf77f49 100644 --- a/core/services/synchronization/helpers_test.go +++ b/core/services/synchronization/helpers_test.go @@ -12,15 +12,15 @@ import ( // NewTestTelemetryIngressClient calls NewTelemetryIngressClient and injects telemClient. func NewTestTelemetryIngressClient(t *testing.T, url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, telemClient telemPb.TelemClient) TelemetryService { - tc := NewTelemetryIngressClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, "test", "test") + tc := NewTelemetryIngressClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100) tc.(*telemetryIngressClient).telemClient = telemClient return tc } // NewTestTelemetryIngressBatchClient calls NewTelemetryIngressBatchClient and injects telemClient. 
func NewTestTelemetryIngressBatchClient(t *testing.T, url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, telemClient telemPb.TelemClient, sendInterval time.Duration, uniconn bool) TelemetryService { - tc := NewTelemetryIngressBatchClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, 50, sendInterval, time.Second, uniconn, "test", "test") - tc.(*telemetryIngressBatchClient).close = func() error { return nil } + tc := NewTelemetryIngressBatchClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, 50, sendInterval, time.Second, uniconn) + tc.(*telemetryIngressBatchClient).closeFn = func() error { return nil } tc.(*telemetryIngressBatchClient).telemClient = telemClient return tc } diff --git a/core/services/synchronization/telemetry_ingress_batch_client.go b/core/services/synchronization/telemetry_ingress_batch_client.go index cade98cf606..26ce1e3066a 100644 --- a/core/services/synchronization/telemetry_ingress_batch_client.go +++ b/core/services/synchronization/telemetry_ingress_batch_client.go @@ -12,8 +12,9 @@ import ( "github.com/smartcontractkit/wsrpc" "github.com/smartcontractkit/wsrpc/examples/simple/keys" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" telemPb "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" ) @@ -37,21 +38,18 @@ func (NoopTelemetryIngressBatchClient) Name() string { return func (NoopTelemetryIngressBatchClient) Ready() error { return nil } type telemetryIngressBatchClient struct { - services.StateMachine + services.Service + eng *services.Engine + url *url.URL ks keystore.CSA serverPubKeyHex string connected atomic.Bool telemClient telemPb.TelemClient - close func() error - - globalLogger logger.Logger - logging bool - lggr logger.Logger + closeFn func() error - wgDone sync.WaitGroup - chDone services.StopChan + logging bool telemBufferSize uint telemMaxBatchSize uint @@ -66,8 +64,8 @@ type telemetryIngressBatchClient struct { // NewTelemetryIngressBatchClient returns a client backed by wsrpc that // can send telemetry to the telemetry ingress server -func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration, telemSendTimeout time.Duration, useUniconn bool, network string, chainID string) TelemetryService { - return &telemetryIngressBatchClient{ +func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration, telemSendTimeout time.Duration, useUniconn bool) TelemetryService { + c := &telemetryIngressBatchClient{ telemBufferSize: telemBufferSize, telemMaxBatchSize: telemMaxBatchSize, telemSendInterval: telemSendInterval, @@ -75,13 +73,17 @@ func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks key url: url, ks: ks, serverPubKeyHex: serverPubKeyHex, - globalLogger: lggr, logging: logging, - lggr: lggr.Named("TelemetryIngressBatchClient").Named(network).Named(chainID), - chDone: make(services.StopChan), workers: make(map[string]*telemetryIngressBatchWorker), useUniConn: useUniconn, } + c.Service, c.eng = services.Config{ + Name: 
"TelemetryIngressBatchClient", + Start: c.start, + Close: c.close, + }.NewServiceEngine(lggr) + + return c } // Start connects the wsrpc client to the telemetry ingress server @@ -90,71 +92,53 @@ func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks key // an error and wsrpc will continue to retry the connection. Eventually when the ingress // server does come back up, wsrpc will establish the connection without any interaction // on behalf of the node operator. -func (tc *telemetryIngressBatchClient) Start(ctx context.Context) error { - return tc.StartOnce("TelemetryIngressBatchClient", func() error { - clientPrivKey, err := tc.getCSAPrivateKey() - if err != nil { - return err - } +func (tc *telemetryIngressBatchClient) start(ctx context.Context) error { + clientPrivKey, err := tc.getCSAPrivateKey() + if err != nil { + return err + } - serverPubKey := keys.FromHex(tc.serverPubKeyHex) - - // Initialize a new wsrpc client caller - // This is used to call RPC methods on the server - if tc.telemClient == nil { // only preset for tests - if tc.useUniConn { - tc.wgDone.Add(1) - go func() { - defer tc.wgDone.Done() - ctx2, cancel := tc.chDone.NewCtx() - defer cancel() - conn, err := wsrpc.DialUniWithContext(ctx2, tc.lggr, tc.url.String(), clientPrivKey, serverPubKey) - if err != nil { - if ctx2.Err() != nil { - tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err) - } else { - tc.lggr.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err, "server pubkey", tc.serverPubKeyHex) - tc.SvcErrBuffer.Append(err) - } - return - } - tc.telemClient = telemPb.NewTelemClient(conn) - tc.close = conn.Close - tc.connected.Store(true) - }() - } else { - // Spawns a goroutine that will eventually connect - conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.lggr)) + serverPubKey := keys.FromHex(tc.serverPubKeyHex) + + // Initialize a new wsrpc client caller + // This is used to call RPC methods on the server + if tc.telemClient == nil { // only preset for tests + if tc.useUniConn { + tc.eng.Go(func(ctx context.Context) { + conn, err := wsrpc.DialUniWithContext(ctx, tc.eng, tc.url.String(), clientPrivKey, serverPubKey) if err != nil { - return fmt.Errorf("could not start TelemIngressBatchClient, Dial returned error: %v", err) + if ctx.Err() != nil { + tc.eng.Warnw("gave up connecting to telemetry endpoint", "err", err) + } else { + tc.eng.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err, "server pubkey", tc.serverPubKeyHex) + tc.eng.EmitHealthErr(err) + } + return } tc.telemClient = telemPb.NewTelemClient(conn) - tc.close = func() error { conn.Close(); return nil } + tc.closeFn = conn.Close + tc.connected.Store(true) + }) + } else { + // Spawns a goroutine that will eventually connect + conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.eng)) + if err != nil { + return fmt.Errorf("could not start TelemIngressBatchClient, Dial returned error: %v", err) } + tc.telemClient = telemPb.NewTelemClient(conn) + tc.closeFn = func() error { conn.Close(); return nil } } + } - return nil - }) + return nil } // Close disconnects the wsrpc client from the ingress server and waits for all workers to exit -func (tc *telemetryIngressBatchClient) Close() error { - return tc.StopOnce("TelemetryIngressBatchClient", func() error { - close(tc.chDone) - tc.wgDone.Wait() - if (tc.useUniConn && 
tc.connected.Load()) || !tc.useUniConn { - return tc.close() - } - return nil - }) -} - -func (tc *telemetryIngressBatchClient) Name() string { - return tc.lggr.Name() -} - -func (tc *telemetryIngressBatchClient) HealthReport() map[string]error { - return map[string]error{tc.Name(): tc.Healthy()} +func (tc *telemetryIngressBatchClient) close() error { + if (tc.useUniConn && tc.connected.Load()) || !tc.useUniConn { + return tc.closeFn() + } + return nil } // getCSAPrivateKey gets the client's CSA private key @@ -175,7 +159,7 @@ func (tc *telemetryIngressBatchClient) getCSAPrivateKey() (privkey []byte, err e // and a warning is logged. func (tc *telemetryIngressBatchClient) Send(ctx context.Context, telemData []byte, contractID string, telemType TelemetryType) { if tc.useUniConn && !tc.connected.Load() { - tc.lggr.Warnw("not connected to telemetry endpoint", "endpoint", tc.url.String()) + tc.eng.Warnw("not connected to telemetry endpoint", "endpoint", tc.url.String()) return } payload := TelemPayload{ @@ -206,18 +190,17 @@ func (tc *telemetryIngressBatchClient) findOrCreateWorker(payload TelemPayload) if !found { worker = NewTelemetryIngressBatchWorker( tc.telemMaxBatchSize, - tc.telemSendInterval, tc.telemSendTimeout, tc.telemClient, - &tc.wgDone, - tc.chDone, make(chan TelemPayload, tc.telemBufferSize), payload.ContractID, payload.TelemType, - tc.globalLogger, + tc.eng, tc.logging, ) - worker.Start() + tc.eng.GoTick(timeutil.NewTicker(func() time.Duration { + return tc.telemSendInterval + }), worker.Send) tc.workers[workerKey] = worker } diff --git a/core/services/synchronization/telemetry_ingress_batch_worker.go b/core/services/synchronization/telemetry_ingress_batch_worker.go index e7ea6595811..7eca26f02c9 100644 --- a/core/services/synchronization/telemetry_ingress_batch_worker.go +++ b/core/services/synchronization/telemetry_ingress_batch_worker.go @@ -2,13 +2,12 @@ package synchronization import ( "context" - "sync" "sync/atomic" "time" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink-common/pkg/logger" telemPb "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" ) @@ -18,11 +17,8 @@ type telemetryIngressBatchWorker struct { services.Service telemMaxBatchSize uint - telemSendInterval time.Duration telemSendTimeout time.Duration telemClient telemPb.TelemClient - wgDone *sync.WaitGroup - chDone services.StopChan chTelemetry chan TelemPayload contractID string telemType TelemetryType @@ -35,65 +31,45 @@ type telemetryIngressBatchWorker struct { // telemetry to the ingress server via WSRPC func NewTelemetryIngressBatchWorker( telemMaxBatchSize uint, - telemSendInterval time.Duration, telemSendTimeout time.Duration, telemClient telemPb.TelemClient, - wgDone *sync.WaitGroup, - chDone chan struct{}, chTelemetry chan TelemPayload, contractID string, telemType TelemetryType, - globalLogger logger.Logger, + lggr logger.Logger, logging bool, ) *telemetryIngressBatchWorker { return &telemetryIngressBatchWorker{ - telemSendInterval: telemSendInterval, telemSendTimeout: telemSendTimeout, telemMaxBatchSize: telemMaxBatchSize, telemClient: telemClient, - wgDone: wgDone, - chDone: chDone, chTelemetry: chTelemetry, contractID: contractID, telemType: telemType, logging: logging, - lggr: globalLogger.Named("TelemetryIngressBatchWorker"), + lggr: logger.Named(lggr, "TelemetryIngressBatchWorker"), } } -// Start sends batched telemetry to the ingress server on an interval 
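Note: the batch worker no longer owns a ticker goroutine; the client schedules worker.Send through eng.GoTick (see findOrCreateWorker above), and Send becomes a single context-bounded step. A sketch of that ownership split, with batchSender, flush, and scheduleSends as hypothetical stand-ins rather than the real types:

package example

import (
	"context"
	"time"

	"github.com/smartcontractkit/chainlink-common/pkg/services"
	"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// batchSender stands in for the batch worker: it exposes one Send(ctx) step and
// leaves all scheduling to the owning service's engine.
type batchSender struct {
	timeout time.Duration
	flush   func(ctx context.Context) error // e.g. the RPC call the real worker makes
}

func (b *batchSender) Send(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, b.timeout)
	defer cancel()
	if err := b.flush(ctx); err != nil {
		return // the real worker logs a warning and waits for the next tick
	}
}

// scheduleSends mirrors the wiring in findOrCreateWorker above: GoTick owns the
// ticker and the goroutine, so closing the parent service stops every worker.
func scheduleSends(eng *services.Engine, b *batchSender, interval func() time.Duration) {
	eng.GoTick(timeutil.NewTicker(interval), b.Send)
}

This is why the wgDone and chDone parameters could be dropped from NewTelemetryIngressBatchWorker above.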
-func (tw *telemetryIngressBatchWorker) Start() { - tw.wgDone.Add(1) - sendTicker := time.NewTicker(tw.telemSendInterval) - - go func() { - defer tw.wgDone.Done() - - for { - select { - case <-sendTicker.C: - if len(tw.chTelemetry) == 0 { - continue - } +// Send sends batched telemetry to the ingress server on an interval +func (tw *telemetryIngressBatchWorker) Send(ctx context.Context) { + if len(tw.chTelemetry) == 0 { + return + } - // Send batched telemetry to the ingress server, log any errors - telemBatchReq := tw.BuildTelemBatchReq() - ctx, cancel := tw.chDone.CtxCancel(context.WithTimeout(context.Background(), tw.telemSendTimeout)) - _, err := tw.telemClient.TelemBatch(ctx, telemBatchReq) - cancel() + // Send batched telemetry to the ingress server, log any errors + telemBatchReq := tw.BuildTelemBatchReq() + ctx, cancel := context.WithTimeout(ctx, tw.telemSendTimeout) + _, err := tw.telemClient.TelemBatch(ctx, telemBatchReq) + cancel() - if err != nil { - tw.lggr.Warnf("Could not send telemetry: %v", err) - continue - } - if tw.logging { - tw.lggr.Debugw("Successfully sent telemetry to ingress server", "contractID", telemBatchReq.ContractId, "telemType", telemBatchReq.TelemetryType, "telemetry", telemBatchReq.Telemetry) - } - case <-tw.chDone: - return - } - } - }() + if err != nil { + tw.lggr.Warnf("Could not send telemetry: %v", err) + return + } + if tw.logging { + tw.lggr.Debugw("Successfully sent telemetry to ingress server", "contractID", telemBatchReq.ContractId, "telemType", telemBatchReq.TelemetryType, "telemetry", telemBatchReq.Telemetry) + } } // logBufferFullWithExpBackoff logs messages at diff --git a/core/services/synchronization/telemetry_ingress_batch_worker_test.go b/core/services/synchronization/telemetry_ingress_batch_worker_test.go index 109022c7135..bf44ee9195a 100644 --- a/core/services/synchronization/telemetry_ingress_batch_worker_test.go +++ b/core/services/synchronization/telemetry_ingress_batch_worker_test.go @@ -1,7 +1,6 @@ package synchronization_test import ( - "sync" "testing" "time" @@ -22,11 +21,8 @@ func TestTelemetryIngressWorker_BuildTelemBatchReq(t *testing.T) { chTelemetry := make(chan synchronization.TelemPayload, 10) worker := synchronization.NewTelemetryIngressBatchWorker( uint(maxTelemBatchSize), - time.Millisecond*1, time.Second, mocks.NewTelemClient(t), - &sync.WaitGroup{}, - make(chan struct{}), chTelemetry, "0xa", synchronization.OCR, diff --git a/core/services/synchronization/telemetry_ingress_client.go b/core/services/synchronization/telemetry_ingress_client.go index dc4ced31d09..1ed55bb5468 100644 --- a/core/services/synchronization/telemetry_ingress_client.go +++ b/core/services/synchronization/telemetry_ingress_client.go @@ -4,15 +4,14 @@ import ( "context" "errors" "net/url" - "sync" "sync/atomic" "time" "github.com/smartcontractkit/wsrpc" "github.com/smartcontractkit/wsrpc/examples/simple/keys" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" telemPb "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" ) @@ -35,82 +34,59 @@ func (NoopTelemetryIngressClient) Name() string { return "Noop func (NoopTelemetryIngressClient) Ready() error { return nil } type telemetryIngressClient struct { - services.StateMachine + services.Service + eng *services.Engine + url *url.URL ks keystore.CSA serverPubKeyHex string telemClient 
telemPb.TelemClient logging bool - lggr logger.Logger - wgDone sync.WaitGroup - chDone services.StopChan dropMessageCount atomic.Uint32 chTelemetry chan TelemPayload } // NewTelemetryIngressClient returns a client backed by wsrpc that // can send telemetry to the telemetry ingress server -func NewTelemetryIngressClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, network string, chainID string) TelemetryService { - return &telemetryIngressClient{ +func NewTelemetryIngressClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint) TelemetryService { + c := &telemetryIngressClient{ url: url, ks: ks, serverPubKeyHex: serverPubKeyHex, logging: logging, - lggr: lggr.Named("TelemetryIngressClient").Named(network).Named(chainID), chTelemetry: make(chan TelemPayload, telemBufferSize), - chDone: make(services.StopChan), } + c.Service, c.eng = services.Config{ + Name: "TelemetryIngressClient", + Start: c.start, + }.NewServiceEngine(lggr) + return c } // Start connects the wsrpc client to the telemetry ingress server -func (tc *telemetryIngressClient) Start(context.Context) error { - return tc.StartOnce("TelemetryIngressClient", func() error { - privkey, err := tc.getCSAPrivateKey() - if err != nil { - return err - } - - tc.connect(privkey) - - return nil - }) -} - -// Close disconnects the wsrpc client from the ingress server -func (tc *telemetryIngressClient) Close() error { - return tc.StopOnce("TelemetryIngressClient", func() error { - close(tc.chDone) - tc.wgDone.Wait() - return nil - }) -} +func (tc *telemetryIngressClient) start(context.Context) error { + privkey, err := tc.getCSAPrivateKey() + if err != nil { + return err + } -func (tc *telemetryIngressClient) Name() string { - return tc.lggr.Name() -} + tc.connect(privkey) -func (tc *telemetryIngressClient) HealthReport() map[string]error { - return map[string]error{tc.Name(): tc.Healthy()} + return nil } func (tc *telemetryIngressClient) connect(clientPrivKey []byte) { - tc.wgDone.Add(1) - - go func() { - defer tc.wgDone.Done() - ctx, cancel := tc.chDone.NewCtx() - defer cancel() - + tc.eng.Go(func(ctx context.Context) { serverPubKey := keys.FromHex(tc.serverPubKeyHex) - conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.lggr)) + conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.eng)) if err != nil { if ctx.Err() != nil { - tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err) + tc.eng.Warnw("gave up connecting to telemetry endpoint", "err", err) } else { - tc.lggr.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err) - tc.SvcErrBuffer.Append(err) + tc.eng.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err) + tc.eng.EmitHealthErr(err) } return } @@ -126,16 +102,12 @@ func (tc *telemetryIngressClient) connect(clientPrivKey []byte) { tc.handleTelemetry() // Wait for close - <-tc.chDone - }() + <-ctx.Done() + }) } func (tc *telemetryIngressClient) handleTelemetry() { - tc.wgDone.Add(1) - go func() { - defer tc.wgDone.Done() - ctx, cancel := tc.chDone.NewCtx() - defer cancel() + tc.eng.Go(func(ctx context.Context) { for { select { case p := <-tc.chTelemetry: @@ -148,17 +120,17 @@ func (tc *telemetryIngressClient) handleTelemetry() { } _, err := tc.telemClient.Telem(ctx, telemReq) if err != nil { - 
tc.lggr.Errorf("Could not send telemetry: %v", err) + tc.eng.Errorf("Could not send telemetry: %v", err) continue } if tc.logging { - tc.lggr.Debugw("successfully sent telemetry to ingress server", "contractID", p.ContractID, "telemetry", p.Telemetry) + tc.eng.Debugw("successfully sent telemetry to ingress server", "contractID", p.ContractID, "telemetry", p.Telemetry) } - case <-tc.chDone: + case <-ctx.Done(): return } } - }() + }) } // logBufferFullWithExpBackoff logs messages at @@ -176,7 +148,7 @@ func (tc *telemetryIngressClient) handleTelemetry() { func (tc *telemetryIngressClient) logBufferFullWithExpBackoff(payload TelemPayload) { count := tc.dropMessageCount.Add(1) if count > 0 && (count%100 == 0 || count&(count-1) == 0) { - tc.lggr.Warnw("telemetry ingress client buffer full, dropping message", "telemetry", payload.Telemetry, "droppedCount", count) + tc.eng.Warnw("telemetry ingress client buffer full, dropping message", "telemetry", payload.Telemetry, "droppedCount", count) } } diff --git a/core/services/telemetry/manager.go b/core/services/telemetry/manager.go index a65759a5c62..73a94b4b127 100644 --- a/core/services/telemetry/manager.go +++ b/core/services/telemetry/manager.go @@ -1,29 +1,29 @@ package telemetry import ( - "context" "net/url" "strings" "time" "github.com/pkg/errors" - "go.uber.org/multierr" - "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + common "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/config" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" ) type Manager struct { - services.StateMachine - bufferSize uint - endpoints []*telemetryEndpoint - ks keystore.CSA - lggr logger.Logger + services.Service + eng *services.Engine + + bufferSize uint + endpoints []*telemetryEndpoint + ks keystore.CSA + logging bool maxBatchSize uint sendInterval time.Duration @@ -45,9 +45,7 @@ type telemetryEndpoint struct { func NewManager(cfg config.TelemetryIngress, csaKeyStore keystore.CSA, lggr logger.Logger) *Manager { m := &Manager{ bufferSize: cfg.BufferSize(), - endpoints: nil, ks: csaKeyStore, - lggr: lggr.Named("TelemetryManager"), logging: cfg.Logging(), maxBatchSize: cfg.MaxBatchSize(), sendInterval: cfg.SendInterval(), @@ -55,44 +53,21 @@ func NewManager(cfg config.TelemetryIngress, csaKeyStore keystore.CSA, lggr logg uniConn: cfg.UniConn(), useBatchSend: cfg.UseBatchSend(), } - for _, e := range cfg.Endpoints() { - if err := m.addEndpoint(e); err != nil { - m.lggr.Error(err) - } - } - return m -} - -func (m *Manager) Start(ctx context.Context) error { - return m.StartOnce("TelemetryManager", func() error { - var err error - for _, e := range m.endpoints { - err = multierr.Append(err, e.client.Start(ctx)) - } - return err - }) -} -func (m *Manager) Close() error { - return m.StopOnce("TelemetryManager", func() error { - var err error - for _, e := range m.endpoints { - err = multierr.Append(err, e.client.Close()) - } - return err - }) -} - -func (m *Manager) Name() string { - return m.lggr.Name() -} + m.Service, m.eng = services.Config{ + Name: "TelemetryManager", + NewSubServices: func(lggr common.Logger) (subs []services.Service) { + for _, e := range cfg.Endpoints() { + if sub, err := m.newEndpoint(e, lggr, cfg); err != nil { + lggr.Error(err) + 
} else { + subs = append(subs, sub) + } + } + return + }, + }.NewServiceEngine(lggr) -func (m *Manager) HealthReport() map[string]error { - hr := map[string]error{m.Name(): m.Healthy()} - - for _, e := range m.endpoints { - services.CopyHealth(hr, e.client.HealthReport()) - } - return hr + return m } // GenMonitoringEndpoint creates a new monitoring endpoints based on the existing available endpoints defined in the core config TOML, if no endpoint for the network and chainID exists, a NOOP agent will be used and the telemetry will not be sent @@ -100,7 +75,7 @@ func (m *Manager) GenMonitoringEndpoint(network string, chainID string, contract e, found := m.getEndpoint(network, chainID) if !found { - m.lggr.Warnf("no telemetry endpoint found for network %q chainID %q, telemetry %q for contactID %q will NOT be sent", network, chainID, telemType, contractID) + m.eng.Warnf("no telemetry endpoint found for network %q chainID %q, telemetry %q for contactID %q will NOT be sent", network, chainID, telemType, contractID) return &NoopAgent{} } @@ -111,32 +86,33 @@ func (m *Manager) GenMonitoringEndpoint(network string, chainID string, contract return NewIngressAgent(e.client, network, chainID, contractID, telemType) } -func (m *Manager) addEndpoint(e config.TelemetryIngressEndpoint) error { +func (m *Manager) newEndpoint(e config.TelemetryIngressEndpoint, lggr logger.Logger, cfg config.TelemetryIngress) (services.Service, error) { if e.Network() == "" { - return errors.New("cannot add telemetry endpoint, network cannot be empty") + return nil, errors.New("cannot add telemetry endpoint, network cannot be empty") } if e.ChainID() == "" { - return errors.New("cannot add telemetry endpoint, chainID cannot be empty") + return nil, errors.New("cannot add telemetry endpoint, chainID cannot be empty") } if e.URL() == nil { - return errors.New("cannot add telemetry endpoint, URL cannot be empty") + return nil, errors.New("cannot add telemetry endpoint, URL cannot be empty") } if e.ServerPubKey() == "" { - return errors.New("cannot add telemetry endpoint, ServerPubKey cannot be empty") + return nil, errors.New("cannot add telemetry endpoint, ServerPubKey cannot be empty") } if _, found := m.getEndpoint(e.Network(), e.ChainID()); found { - return errors.Errorf("cannot add telemetry endpoint for network %q and chainID %q, endpoint already exists", e.Network(), e.ChainID()) + return nil, errors.Errorf("cannot add telemetry endpoint for network %q and chainID %q, endpoint already exists", e.Network(), e.ChainID()) } + lggr = logger.Sugared(lggr).Named(e.Network()).Named(e.ChainID()) var tClient synchronization.TelemetryService if m.useBatchSend { - tClient = synchronization.NewTelemetryIngressBatchClient(e.URL(), e.ServerPubKey(), m.ks, m.logging, m.lggr, m.bufferSize, m.maxBatchSize, m.sendInterval, m.sendTimeout, m.uniConn, e.Network(), e.ChainID()) + tClient = synchronization.NewTelemetryIngressBatchClient(e.URL(), e.ServerPubKey(), m.ks, cfg.Logging(), lggr, cfg.BufferSize(), cfg.MaxBatchSize(), cfg.SendInterval(), cfg.SendTimeout(), cfg.UniConn()) } else { - tClient = synchronization.NewTelemetryIngressClient(e.URL(), e.ServerPubKey(), m.ks, m.logging, m.lggr, m.bufferSize, e.Network(), e.ChainID()) + tClient = synchronization.NewTelemetryIngressClient(e.URL(), e.ServerPubKey(), m.ks, cfg.Logging(), lggr, cfg.BufferSize()) } te := telemetryEndpoint{ @@ -148,7 +124,7 @@ func (m *Manager) addEndpoint(e config.TelemetryIngressEndpoint) error { } m.endpoints = append(m.endpoints, &te) - return nil + return 
te.client, nil } func (m *Manager) getEndpoint(network string, chainID string) (*telemetryEndpoint, bool) { diff --git a/core/services/telemetry/manager_test.go b/core/services/telemetry/manager_test.go index 4e55cb75752..fef065b572c 100644 --- a/core/services/telemetry/manager_test.go +++ b/core/services/telemetry/manager_test.go @@ -156,7 +156,7 @@ func TestNewManager(t *testing.T) { require.Equal(t, uint(123), m.bufferSize) require.Equal(t, ks, m.ks) - require.Equal(t, "TelemetryManager", m.lggr.Name()) + require.Equal(t, "TelemetryManager", m.Name()) require.Equal(t, true, m.logging) require.Equal(t, uint(51), m.maxBatchSize) require.Equal(t, time.Millisecond*512, m.sendInterval)
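Note: the Manager rewrite above relies on services.Config.NewSubServices, so the engine starts, closes, and health-reports the per-endpoint clients that the deleted multierr loops used to manage by hand. A minimal sketch of that pattern, assuming only the services.Config fields already used in this diff; parentService and newChild are illustrative names.

package example

import (
	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/services"
)

// parentService sketches the NewSubServices shape used by the telemetry Manager:
// sub-services are constructed up front and the engine owns their lifecycle and
// health aggregation.
type parentService struct {
	services.Service
	eng *services.Engine
}

func newParentService(lggr logger.Logger, newChild func(logger.Logger) services.Service) *parentService {
	p := &parentService{}
	p.Service, p.eng = services.Config{
		Name: "ParentService",
		NewSubServices: func(lggr logger.Logger) []services.Service {
			// children inherit the parent's named logger, like the per-endpoint
			// telemetry clients above (further Named with network and chain ID)
			return []services.Service{newChild(lggr)}
		},
	}.NewServiceEngine(lggr)
	return p
}

With this shape, Name comes from the engine rather than a stored logger, which is consistent with the updated manager_test assertion on m.Name().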