From f4b5c71b100b96292eade09ac232785e9529725d Mon Sep 17 00:00:00 2001 From: Chia Yong Kang Date: Fri, 16 Jun 2023 15:08:48 -0700 Subject: [PATCH] [BCI-1402] Moved Generalised HeadTracker components into common folder (#9510) * Generalised HeadTracker * added mocks * Removed the use of Big.int * added head mocks * Generalised HeadTracker * Updated BlockNumber() return type to big.Int * removed BlockNumberInt64 * updated naming for headNumberInt64 * Updated mocks * fixed pointer comparison of big.Int * fix tests due to big.int changes * moved headbroadcaster to common * moved headlistener * moved HT * merge fixes * added mock files * removed EVM prefix * fixed linting for headlistener * cleanup * fixed merge changes * updated models * updated mock head files * renamed variable * removed eth terminology from generic headtracker * removed headbyhash * fixed merge conflict * fixed merge conflict --- common/headtracker/head_broadcaster.go | 163 ++++++++ common/headtracker/head_listener.go | 219 ++++++++++ common/headtracker/head_tracker.go | 363 +++++++++++++++++ common/headtracker/types/client.go | 1 - core/chains/evm/chain.go | 4 +- .../evm/headtracker/head_broadcaster.go | 167 +------- .../evm/headtracker/head_broadcaster_test.go | 13 +- core/chains/evm/headtracker/head_listener.go | 222 +--------- .../evm/headtracker/head_listener_test.go | 6 +- core/chains/evm/headtracker/head_tracker.go | 382 +----------------- .../evm/headtracker/head_tracker_test.go | 12 +- core/services/vrf/delegate_test.go | 2 +- 12 files changed, 789 insertions(+), 765 deletions(-) create mode 100644 common/headtracker/head_broadcaster.go create mode 100644 common/headtracker/head_listener.go create mode 100644 common/headtracker/head_tracker.go diff --git a/common/headtracker/head_broadcaster.go b/common/headtracker/head_broadcaster.go new file mode 100644 index 00000000000..d31ff4f2008 --- /dev/null +++ b/common/headtracker/head_broadcaster.go @@ -0,0 +1,163 @@ +package headtracker + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + commontypes "github.com/smartcontractkit/chainlink/v2/common/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +const TrackableCallbackTimeout = 2 * time.Second + +type callbackSet[H commontypes.Head[BLOCK_HASH], BLOCK_HASH commontypes.Hashable] map[int]commontypes.HeadTrackable[H, BLOCK_HASH] + +func (set callbackSet[H, BLOCK_HASH]) values() []commontypes.HeadTrackable[H, BLOCK_HASH] { + var values []commontypes.HeadTrackable[H, BLOCK_HASH] + for _, callback := range set { + values = append(values, callback) + } + return values +} + +type HeadBroadcaster[H commontypes.Head[BLOCK_HASH], BLOCK_HASH commontypes.Hashable] struct { + logger logger.Logger + callbacks callbackSet[H, BLOCK_HASH] + mailbox *utils.Mailbox[H] + mutex *sync.Mutex + chClose utils.StopChan + wgDone sync.WaitGroup + utils.StartStopOnce + latest H + lastCallbackID int +} + +// NewHeadBroadcaster creates a new HeadBroadcaster +func NewHeadBroadcaster[ + H commontypes.Head[BLOCK_HASH], + BLOCK_HASH commontypes.Hashable, +]( + lggr logger.Logger, +) *HeadBroadcaster[H, BLOCK_HASH] { + return &HeadBroadcaster[H, BLOCK_HASH]{ + logger: lggr.Named("HeadBroadcaster"), + callbacks: make(callbackSet[H, BLOCK_HASH]), + mailbox: utils.NewSingleMailbox[H](), + mutex: &sync.Mutex{}, + chClose: make(chan struct{}), + wgDone: sync.WaitGroup{}, + StartStopOnce: utils.StartStopOnce{}, + } +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) 
Start(context.Context) error { + return hb.StartOnce("HeadBroadcaster", func() error { + hb.wgDone.Add(1) + go hb.run() + return nil + }) +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) Close() error { + return hb.StopOnce("HeadBroadcaster", func() error { + hb.mutex.Lock() + // clear all callbacks + hb.callbacks = make(callbackSet[H, BLOCK_HASH]) + hb.mutex.Unlock() + + close(hb.chClose) + hb.wgDone.Wait() + return nil + }) +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) Name() string { + return hb.logger.Name() +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) HealthReport() map[string]error { + return map[string]error{hb.Name(): hb.StartStopOnce.Healthy()} +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) { + hb.mailbox.Deliver(head) +} + +// Subscribe subscribes to OnNewLongestChain and Connect until the HeadBroadcaster is closed, +// or the unsubscribe callback is called explicitly +func (hb *HeadBroadcaster[H, BLOCK_HASH]) Subscribe(callback commontypes.HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func()) { + hb.mutex.Lock() + defer hb.mutex.Unlock() + + currentLongestChain = hb.latest + + hb.lastCallbackID++ + callbackID := hb.lastCallbackID + hb.callbacks[callbackID] = callback + unsubscribe = func() { + hb.mutex.Lock() + defer hb.mutex.Unlock() + delete(hb.callbacks, callbackID) + } + + return +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) run() { + defer hb.wgDone.Done() + + for { + select { + case <-hb.chClose: + return + case <-hb.mailbox.Notify(): + hb.executeCallbacks() + } + } +} + +// DEV: the head relayer makes no promises about head delivery! Subscribing +// Jobs should expect the relayer to skip heads if there is a large number of listeners +// and all callbacks cannot be completed in the allotted time. +func (hb *HeadBroadcaster[H, BLOCK_HASH]) executeCallbacks() { + head, exists := hb.mailbox.Retrieve() + if !exists { + hb.logger.Info("No head to retrieve.
It might have been skipped") + return + } + + hb.mutex.Lock() + callbacks := hb.callbacks.values() + hb.latest = head + hb.mutex.Unlock() + + hb.logger.Debugw("Initiating callbacks", + "headNum", head.BlockNumber(), + "numCallbacks", len(callbacks), + ) + + wg := sync.WaitGroup{} + wg.Add(len(callbacks)) + + ctx, cancel := hb.chClose.NewCtx() + defer cancel() + + for _, callback := range callbacks { + go func(trackable commontypes.HeadTrackable[H, BLOCK_HASH]) { + defer wg.Done() + start := time.Now() + cctx, cancel := context.WithTimeout(ctx, TrackableCallbackTimeout) + defer cancel() + trackable.OnNewLongestChain(cctx, head) + elapsed := time.Since(start) + hb.logger.Debugw(fmt.Sprintf("Finished callback in %s", elapsed), + "callbackType", reflect.TypeOf(trackable), "blockNumber", head.BlockNumber(), "time", elapsed) + }(callback) + } + + wg.Wait() +} diff --git a/common/headtracker/head_listener.go b/common/headtracker/head_listener.go new file mode 100644 index 00000000000..a2d121911a9 --- /dev/null +++ b/common/headtracker/head_listener.go @@ -0,0 +1,219 @@ +package headtracker + +import ( + "context" + "sync/atomic" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/common/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +var ( + promNumHeadsReceived = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "head_tracker_heads_received", + Help: "The total number of heads seen", + }, []string{"ChainID"}) + promEthConnectionErrors = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "head_tracker_connection_errors", + Help: "The total number of node connection errors", + }, []string{"ChainID"}) +) + +type HeadListener[ + HTH htrktypes.Head[BLOCK_HASH, ID], + S types.Subscription, + ID txmgrtypes.ID, + BLOCK_HASH types.Hashable, +] struct { + config htrktypes.Config + client htrktypes.Client[HTH, S, ID, BLOCK_HASH] + logger logger.Logger + chStop utils.StopChan + chHeaders chan HTH + headSubscription types.Subscription + connected atomic.Bool + receivingHeads atomic.Bool +} + +func NewHeadListener[ + HTH htrktypes.Head[BLOCK_HASH, ID], + S types.Subscription, + ID txmgrtypes.ID, + BLOCK_HASH types.Hashable, + CLIENT htrktypes.Client[HTH, S, ID, BLOCK_HASH], +]( + lggr logger.Logger, + client CLIENT, + config htrktypes.Config, + chStop chan struct{}, +) *HeadListener[HTH, S, ID, BLOCK_HASH] { + return &HeadListener[HTH, S, ID, BLOCK_HASH]{ + config: config, + client: client, + logger: lggr.Named("HeadListener"), + chStop: chStop, + } +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) Name() string { + return hl.logger.Name() +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(handleNewHead types.NewHeadHandler[HTH, BLOCK_HASH], done func()) { + defer done() + defer hl.unsubscribe() + + ctx, cancel := hl.chStop.NewCtx() + defer cancel() + + for { + if !hl.subscribe(ctx) { + break + } + err := hl.receiveHeaders(ctx, handleNewHead) + if ctx.Err() != nil { + break + } else if err != nil { + hl.logger.Errorw("Error in new head subscription, unsubscribed", "err", err) + continue + } else { + break + } + } +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ReceivingHeads() bool { + return 
hl.receivingHeads.Load() +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) Connected() bool { + return hl.connected.Load() +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error { + var err error + if !hl.ReceivingHeads() { + err = errors.New("Listener is not receiving heads") + } + if !hl.Connected() { + err = errors.New("Listener is not connected") + } + return map[string]error{hl.Name(): err} +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead types.NewHeadHandler[HTH, BLOCK_HASH]) error { + var noHeadsAlarmC <-chan time.Time + var noHeadsAlarmT *time.Ticker + noHeadsAlarmDuration := hl.config.BlockEmissionIdleWarningThreshold() + if noHeadsAlarmDuration > 0 { + noHeadsAlarmT = time.NewTicker(noHeadsAlarmDuration) + noHeadsAlarmC = noHeadsAlarmT.C + } + + for { + select { + case <-hl.chStop: + return nil + + case blockHeader, open := <-hl.chHeaders: + chainId := hl.client.ConfiguredChainID() + if noHeadsAlarmT != nil { + // We've received a head, reset the no heads alarm + noHeadsAlarmT.Stop() + noHeadsAlarmT = time.NewTicker(noHeadsAlarmDuration) + noHeadsAlarmC = noHeadsAlarmT.C + } + hl.receivingHeads.Store(true) + if !open { + return errors.New("head listener: chHeaders prematurely closed") + } + if !blockHeader.IsValid() { + hl.logger.Error("got nil block header") + continue + } + + // Compare the chain ID of the block header to the chain ID of the client + if !blockHeader.HasChainID() || blockHeader.ChainID().String() != chainId.String() { + hl.logger.Panicf("head listener for %s received block header for %s", chainId, blockHeader.ChainID()) + } + promNumHeadsReceived.WithLabelValues(chainId.String()).Inc() + + err := handleNewHead(ctx, blockHeader) + if ctx.Err() != nil { + return nil + } else if err != nil { + return err + } + + case err, open := <-hl.headSubscription.Err(): + // err can be nil, because of using chainIDSubForwarder + if !open || err == nil { + return errors.New("head listener: subscription Err channel prematurely closed") + } + return err + + case <-noHeadsAlarmC: + // We haven't received a head on the channel for a long time, log a warning + hl.logger.Warnf("have not received a head for %v", noHeadsAlarmDuration) + hl.receivingHeads.Store(false) + } + } +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribe(ctx context.Context) bool { + subscribeRetryBackoff := utils.NewRedialBackoff() + + chainId := hl.client.ConfiguredChainID() + + for { + hl.unsubscribe() + + hl.logger.Debugf("Subscribing to new heads on chain %s", chainId.String()) + + select { + case <-hl.chStop: + return false + + case <-time.After(subscribeRetryBackoff.Duration()): + err := hl.subscribeToHead(ctx) + if err != nil { + promEthConnectionErrors.WithLabelValues(chainId.String()).Inc() + hl.logger.Warnw("Failed to subscribe to heads on chain", "chainID", chainId.String(), "err", err) + } else { + hl.logger.Debugf("Subscribed to heads on chain %s", chainId.String()) + return true + } + } + } +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribeToHead(ctx context.Context) error { + hl.chHeaders = make(chan HTH) + + var err error + hl.headSubscription, err = hl.client.SubscribeNewHead(ctx, hl.chHeaders) + if err != nil { + close(hl.chHeaders) + return errors.Wrap(err, "Client#SubscribeNewHead") + } + + hl.connected.Store(true) + + return nil +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) unsubscribe() { + if hl.headSubscription != nil { + hl.connected.Store(false) + 
hl.headSubscription.Unsubscribe() + hl.headSubscription = nil + } +} diff --git a/common/headtracker/head_tracker.go b/common/headtracker/head_tracker.go new file mode 100644 index 00000000000..d0fec8adef9 --- /dev/null +++ b/common/headtracker/head_tracker.go @@ -0,0 +1,363 @@ +package headtracker + +import ( + "context" + "fmt" + "math/big" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/exp/maps" + + htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + commontypes "github.com/smartcontractkit/chainlink/v2/common/types" + "github.com/smartcontractkit/chainlink/v2/core/config" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +var ( + promCurrentHead = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "head_tracker_current_head", + Help: "The highest seen head number", + }, []string{"evmChainID"}) + + promOldHead = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "head_tracker_very_old_head", + Help: "Counter is incremented every time we get a head that is much lower than the highest seen head ('much lower' is defined as a block that is EVM.FinalityDepth or greater below the highest seen head)", + }, []string{"evmChainID"}) +) + +// HeadsBufferSize - The buffer is used when heads sampling is disabled, to ensure the callback is run for every head +const HeadsBufferSize = 10 + +type HeadTracker[ + HTH htrktypes.Head[BLOCK_HASH, ID], + S commontypes.Subscription, + ID txmgrtypes.ID, + BLOCK_HASH commontypes.Hashable, +] struct { + log logger.Logger + headBroadcaster commontypes.HeadBroadcaster[HTH, BLOCK_HASH] + headSaver commontypes.HeadSaver[HTH, BLOCK_HASH] + mailMon *utils.MailboxMonitor + client htrktypes.Client[HTH, S, ID, BLOCK_HASH] + chainID ID + config htrktypes.Config + htConfig htrktypes.HeadTrackerConfig + + backfillMB *utils.Mailbox[HTH] + broadcastMB *utils.Mailbox[HTH] + headListener commontypes.HeadListener[HTH, BLOCK_HASH] + chStop utils.StopChan + wgDone sync.WaitGroup + utils.StartStopOnce + getNilHead func() HTH +} + +// NewHeadTracker instantiates a new HeadTracker using HeadSaver to persist new block numbers. +func NewHeadTracker[ + HTH htrktypes.Head[BLOCK_HASH, ID], + S commontypes.Subscription, + ID txmgrtypes.ID, + BLOCK_HASH commontypes.Hashable, +]( + lggr logger.Logger, + client htrktypes.Client[HTH, S, ID, BLOCK_HASH], + config htrktypes.Config, + htConfig htrktypes.HeadTrackerConfig, + headBroadcaster commontypes.HeadBroadcaster[HTH, BLOCK_HASH], + headSaver commontypes.HeadSaver[HTH, BLOCK_HASH], + mailMon *utils.MailboxMonitor, + getNilHead func() HTH, +) commontypes.HeadTracker[HTH, BLOCK_HASH] { + chStop := make(chan struct{}) + lggr = lggr.Named("HeadTracker") + return &HeadTracker[HTH, S, ID, BLOCK_HASH]{ + headBroadcaster: headBroadcaster, + client: client, + chainID: client.ConfiguredChainID(), + config: config, + htConfig: htConfig, + log: lggr, + backfillMB: utils.NewSingleMailbox[HTH](), + broadcastMB: utils.NewMailbox[HTH](HeadsBufferSize), + chStop: chStop, + headListener: NewHeadListener[HTH, S, ID, BLOCK_HASH](lggr, client, config, chStop), + headSaver: headSaver, + mailMon: mailMon, + getNilHead: getNilHead, + } +} + +// Start starts HeadTracker service. 
+func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error { + return ht.StartOnce("HeadTracker", func() error { + ht.log.Debugw("Starting HeadTracker", "chainID", ht.chainID) + latestChain, err := ht.headSaver.Load(ctx) + if err != nil { + return err + } + if latestChain.IsValid() { + ht.log.Debugw( + fmt.Sprintf("HeadTracker: Tracking logs from last block %v with hash %s", config.FriendlyNumber(latestChain.BlockNumber()), latestChain.BlockHash()), + "blockNumber", latestChain.BlockNumber(), + "blockHash", latestChain.BlockHash(), + ) + } + + // NOTE: Always try to start the head tracker off with whatever the + // latest head is, without waiting for the subscription to send us one. + // + // In some cases the subscription will send us the most recent head + // anyway when we connect (but we should not rely on this because it is + // not specced). If it happens this is fine, and the head will be + // ignored as a duplicate. + initialHead, err := ht.getInitialHead(ctx) + if err != nil { + if errors.Is(err, ctx.Err()) { + return nil + } + ht.log.Errorw("Error getting initial head", "err", err) + } else if initialHead.IsValid() { + if err := ht.handleNewHead(ctx, initialHead); err != nil { + return errors.Wrap(err, "error handling initial head") + } + } else { + ht.log.Debug("Got nil initial head") + } + + ht.wgDone.Add(3) + go ht.headListener.ListenForNewHeads(ht.handleNewHead, ht.wgDone.Done) + go ht.backfillLoop() + go ht.broadcastLoop() + + ht.mailMon.Monitor(ht.broadcastMB, "HeadTracker", "Broadcast", ht.chainID.String()) + + return nil + }) +} + +// Close stops HeadTracker service. +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Close() error { + return ht.StopOnce("HeadTracker", func() error { + close(ht.chStop) + ht.wgDone.Wait() + return ht.broadcastMB.Close() + }) +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Name() string { + return ht.log.Name() +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error { + report := map[string]error{ + ht.Name(): ht.StartStopOnce.Healthy(), + } + maps.Copy(report, ht.headListener.HealthReport()) + return report +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH, depth uint) (err error) { + if uint(headWithChain.ChainLength()) >= depth { + return nil + } + + baseHeight := headWithChain.BlockNumber() - int64(depth-1) + if baseHeight < 0 { + baseHeight = 0 + } + + return ht.backfill(ctx, headWithChain.EarliestHeadInChain(), baseHeight) +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) LatestChain() HTH { + return ht.headSaver.LatestChain() +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) getInitialHead(ctx context.Context) (HTH, error) { + head, err := ht.client.HeadByNumber(ctx, nil) + if err != nil { + return ht.getNilHead(), errors.Wrap(err, "failed to fetch initial head") + } + loggerFields := []interface{}{"head", head} + if head.IsValid() { + loggerFields = append(loggerFields, "blockNumber", head.BlockNumber(), "blockHash", head.BlockHash()) + } + ht.log.Debugw("Got initial head", loggerFields...) 
+ return head, nil +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, head HTH) error { + prevHead := ht.headSaver.LatestChain() + + ht.log.Debugw(fmt.Sprintf("Received new head %v", config.FriendlyNumber(head.BlockNumber())), + "blockHeight", head.BlockNumber(), + "blockHash", head.BlockHash(), + "parentHeadHash", head.GetParentHash(), + ) + + err := ht.headSaver.Save(ctx, head) + if ctx.Err() != nil { + return nil + } else if err != nil { + return errors.Wrapf(err, "failed to save head: %#v", head) + } + + if !prevHead.IsValid() || head.BlockNumber() > prevHead.BlockNumber() { + promCurrentHead.WithLabelValues(ht.chainID.String()).Set(float64(head.BlockNumber())) + + headWithChain := ht.headSaver.Chain(head.BlockHash()) + if !headWithChain.IsValid() { + return errors.Errorf("HeadTracker#handleNewHead headWithChain was unexpectedly nil") + } + ht.backfillMB.Deliver(headWithChain) + ht.broadcastMB.Deliver(headWithChain) + } else if head.BlockNumber() == prevHead.BlockNumber() { + if head.BlockHash() != prevHead.BlockHash() { + ht.log.Debugw("Got duplicate head", "blockNum", head.BlockNumber(), "head", head.BlockHash(), "prevHead", prevHead.BlockHash()) + } else { + ht.log.Debugw("Head already in the database", "head", head.BlockHash()) + } + } else { + ht.log.Debugw("Got out of order head", "blockNum", head.BlockNumber(), "head", head.BlockHash(), "prevHead", prevHead.BlockNumber()) + prevUnFinalizedHead := prevHead.BlockNumber() - int64(ht.config.FinalityDepth()) + if head.BlockNumber() < prevUnFinalizedHead { + promOldHead.WithLabelValues(ht.chainID.String()).Inc() + ht.log.Criticalf("Got very old block with number %d (highest seen was %d). This is a problem and either means a very deep re-org occurred, one of the RPC nodes has gotten far out of sync, or the chain went backwards in block numbers.
This node may not function correctly without manual intervention.", head.BlockNumber(), prevHead.BlockNumber()) + ht.SvcErrBuffer.Append(errors.New("got very old block")) + } + } + return nil +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) broadcastLoop() { + defer ht.wgDone.Done() + + samplingInterval := ht.htConfig.SamplingInterval() + if samplingInterval > 0 { + ht.log.Debugf("Head sampling is enabled - sampling interval is set to: %v", samplingInterval) + debounceHead := time.NewTicker(samplingInterval) + defer debounceHead.Stop() + for { + select { + case <-ht.chStop: + return + case <-debounceHead.C: + item := ht.broadcastMB.RetrieveLatestAndClear() + if !item.IsValid() { + continue + } + ht.headBroadcaster.BroadcastNewLongestChain(item) + } + } + } else { + ht.log.Info("Head sampling is disabled - callback will be called on every head") + for { + select { + case <-ht.chStop: + return + case <-ht.broadcastMB.Notify(): + for { + item, exists := ht.broadcastMB.Retrieve() + if !exists { + break + } + ht.headBroadcaster.BroadcastNewLongestChain(item) + } + } + } + } +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop() { + defer ht.wgDone.Done() + + ctx, cancel := ht.chStop.NewCtx() + defer cancel() + + for { + select { + case <-ht.chStop: + return + case <-ht.backfillMB.Notify(): + for { + head, exists := ht.backfillMB.Retrieve() + if !exists { + break + } + { + err := ht.Backfill(ctx, head, uint(ht.config.FinalityDepth())) + if err != nil { + ht.log.Warnw("Unexpected error while backfilling heads", "err", err) + } else if ctx.Err() != nil { + break + } + } + } + } + } +} + +// backfill fetches all missing heads up until the base height +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, head commontypes.Head[BLOCK_HASH], baseHeight int64) (err error) { + headBlockNumber := head.BlockNumber() + if headBlockNumber <= baseHeight { + return nil + } + mark := time.Now() + fetched := 0 + l := ht.log.With("blockNumber", headBlockNumber, + "n", headBlockNumber-baseHeight, + "fromBlockHeight", baseHeight, + "toBlockHeight", headBlockNumber-1) + l.Debug("Starting backfill") + defer func() { + if ctx.Err() != nil { + l.Warnw("Backfill context error", "err", ctx.Err()) + return + } + l.Debugw("Finished backfill", + "fetched", fetched, + "time", time.Since(mark), + "err", err) + }() + + for i := head.BlockNumber() - 1; i >= baseHeight; i-- { + // NOTE: Sequential requests here mean it's a potential performance bottleneck, be aware! 
+ existingHead := ht.headSaver.Chain(head.GetParentHash()) + if existingHead.IsValid() { + head = existingHead + continue + } + head, err = ht.fetchAndSaveHead(ctx, i) + fetched++ + if ctx.Err() != nil { + ht.log.Debugw("context canceled, aborting backfill", "err", err, "ctx.Err", ctx.Err()) + break + } else if err != nil { + return errors.Wrap(err, "fetchAndSaveHead failed") + } + } + return +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) fetchAndSaveHead(ctx context.Context, n int64) (HTH, error) { + ht.log.Debugw("Fetching head", "blockHeight", n) + head, err := ht.client.HeadByNumber(ctx, big.NewInt(n)) + if err != nil { + return ht.getNilHead(), err + } else if !head.IsValid() { + return ht.getNilHead(), errors.New("got nil head") + } + err = ht.headSaver.Save(ctx, head) + if err != nil { + return ht.getNilHead(), err + } + return head, nil +} diff --git a/common/headtracker/types/client.go b/common/headtracker/types/client.go index 48648144fb6..3c49dc8cacc 100644 --- a/common/headtracker/types/client.go +++ b/common/headtracker/types/client.go @@ -10,7 +10,6 @@ import ( type Client[H commontypes.Head[BLOCK_HASH], S commontypes.Subscription, ID txmgrtypes.ID, BLOCK_HASH commontypes.Hashable] interface { HeadByNumber(ctx context.Context, number *big.Int) (head H, err error) - HeadByHash(ctx context.Context, hash BLOCK_HASH) (head H, err error) // ConfiguredChainID returns the chain ID that the node is configured to connect to ConfiguredChainID() (id ID) // SubscribeNewHead is the method in which the client receives new Head. diff --git a/core/chains/evm/chain.go b/core/chains/evm/chain.go index 6921af7e61b..07d192d36c2 100644 --- a/core/chains/evm/chain.go +++ b/core/chains/evm/chain.go @@ -102,7 +102,7 @@ func newChain(ctx context.Context, cfg evmconfig.ChainScopedConfig, nodes []*v2. } db := opts.DB - headBroadcaster := headtracker.NewEVMHeadBroadcaster(l) + headBroadcaster := headtracker.NewHeadBroadcaster(l) headSaver := headtracker.NullSaver var headTracker httypes.HeadTracker if !cfg.EVMRPCEnabled() { @@ -110,7 +110,7 @@ func newChain(ctx context.Context, cfg evmconfig.ChainScopedConfig, nodes []*v2. 
} else if opts.GenHeadTracker == nil { orm := headtracker.NewORM(db, l, cfg.Database(), *chainID) headSaver = headtracker.NewHeadSaver(l, orm, cfg, cfg.EVM().HeadTracker()) - headTracker = headtracker.NewEVMHeadTracker(l, client, headtracker.NewWrappedConfig(cfg), cfg.EVM().HeadTracker(), headBroadcaster, headSaver, opts.MailMon) + headTracker = headtracker.NewHeadTracker(l, client, headtracker.NewWrappedConfig(cfg), cfg.EVM().HeadTracker(), headBroadcaster, headSaver, opts.MailMon) } else { headTracker = opts.GenHeadTracker(chainID, headBroadcaster) } diff --git a/core/chains/evm/headtracker/head_broadcaster.go b/core/chains/evm/headtracker/head_broadcaster.go index d4c397a2860..b47fbc2726f 100644 --- a/core/chains/evm/headtracker/head_broadcaster.go +++ b/core/chains/evm/headtracker/head_broadcaster.go @@ -1,175 +1,20 @@ package headtracker import ( - "context" - "fmt" - "reflect" - "sync" - "time" - "github.com/ethereum/go-ethereum/common" + "github.com/smartcontractkit/chainlink/v2/common/headtracker" commontypes "github.com/smartcontractkit/chainlink/v2/common/types" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) -const TrackableCallbackTimeout = 2 * time.Second - -type callbackSet[H commontypes.Head[BLOCK_HASH], BLOCK_HASH commontypes.Hashable] map[int]commontypes.HeadTrackable[H, BLOCK_HASH] +type headBroadcaster = headtracker.HeadBroadcaster[*evmtypes.Head, common.Hash] -func (set callbackSet[H, BLOCK_HASH]) values() []commontypes.HeadTrackable[H, BLOCK_HASH] { - var values []commontypes.HeadTrackable[H, BLOCK_HASH] - for _, callback := range set { - values = append(values, callback) - } - return values -} - -type headBroadcaster[H commontypes.Head[BLOCK_HASH], BLOCK_HASH commontypes.Hashable] struct { - logger logger.Logger - callbacks callbackSet[H, BLOCK_HASH] - mailbox *utils.Mailbox[H] - mutex *sync.Mutex - chClose utils.StopChan - wgDone sync.WaitGroup - utils.StartStopOnce - latest H - lastCallbackID int -} -type evmHeadBroadcaster = headBroadcaster[*evmtypes.Head, common.Hash] - -var _ commontypes.HeadBroadcaster[*evmtypes.Head, common.Hash] = &evmHeadBroadcaster{} - -// NewHeadBroadcaster creates a new HeadBroadcaster -func NewHeadBroadcaster[ - H commontypes.Head[BLOCK_HASH], - BLOCK_HASH commontypes.Hashable, -]( - lggr logger.Logger, -) *headBroadcaster[H, BLOCK_HASH] { - return &headBroadcaster[H, BLOCK_HASH]{ - logger: lggr.Named("HeadBroadcaster"), - callbacks: make(callbackSet[H, BLOCK_HASH]), - mailbox: utils.NewSingleMailbox[H](), - mutex: &sync.Mutex{}, - chClose: make(chan struct{}), - wgDone: sync.WaitGroup{}, - StartStopOnce: utils.StartStopOnce{}, - } -} +var _ commontypes.HeadBroadcaster[*evmtypes.Head, common.Hash] = &headBroadcaster{} -func NewEVMHeadBroadcaster( +func NewHeadBroadcaster( lggr logger.Logger, -) *evmHeadBroadcaster { - return NewHeadBroadcaster[*evmtypes.Head, common.Hash](lggr) -} - -func (hb *headBroadcaster[H, BLOCK_HASH]) Start(context.Context) error { - return hb.StartOnce("HeadBroadcaster", func() error { - hb.wgDone.Add(1) - go hb.run() - return nil - }) -} - -func (hb *headBroadcaster[H, BLOCK_HASH]) Close() error { - return hb.StopOnce("HeadBroadcaster", func() error { - hb.mutex.Lock() - // clear all callbacks - hb.callbacks = make(callbackSet[H, BLOCK_HASH]) - hb.mutex.Unlock() - - close(hb.chClose) - hb.wgDone.Wait() - return nil - }) -} - -func (hb *headBroadcaster[H, BLOCK_HASH]) Name() 
string { - return hb.logger.Name() -} - -func (hb *headBroadcaster[H, BLOCK_HASH]) HealthReport() map[string]error { - return map[string]error{hb.Name(): hb.StartStopOnce.Healthy()} -} - -func (hb *headBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) { - hb.mailbox.Deliver(head) -} - -// Subscribe subscribes to OnNewLongestChain and Connect until HeadBroadcaster is closed, -// or unsubscribe callback is called explicitly -func (hb *headBroadcaster[H, BLOCK_HASH]) Subscribe(callback commontypes.HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func()) { - hb.mutex.Lock() - defer hb.mutex.Unlock() - - currentLongestChain = hb.latest - - hb.lastCallbackID++ - callbackID := hb.lastCallbackID - hb.callbacks[callbackID] = callback - unsubscribe = func() { - hb.mutex.Lock() - defer hb.mutex.Unlock() - delete(hb.callbacks, callbackID) - } - - return -} - -func (hb *headBroadcaster[H, BLOCK_HASH]) run() { - defer hb.wgDone.Done() - - for { - select { - case <-hb.chClose: - return - case <-hb.mailbox.Notify(): - hb.executeCallbacks() - } - } -} - -// DEV: the head relayer makes no promises about head delivery! Subscribing -// Jobs should expect to the relayer to skip heads if there is a large number of listeners -// and all callbacks cannot be completed in the allotted time. -func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks() { - head, exists := hb.mailbox.Retrieve() - if !exists { - hb.logger.Info("No head to retrieve. It might have been skipped") - return - } - - hb.mutex.Lock() - callbacks := hb.callbacks.values() - hb.latest = head - hb.mutex.Unlock() - - hb.logger.Debugw("Initiating callbacks", - "headNum", head.BlockNumber(), - "numCallbacks", len(callbacks), - ) - - wg := sync.WaitGroup{} - wg.Add(len(callbacks)) - - ctx, cancel := hb.chClose.NewCtx() - defer cancel() - - for _, callback := range callbacks { - go func(trackable commontypes.HeadTrackable[H, BLOCK_HASH]) { - defer wg.Done() - start := time.Now() - cctx, cancel := context.WithTimeout(ctx, TrackableCallbackTimeout) - defer cancel() - trackable.OnNewLongestChain(cctx, head) - elapsed := time.Since(start) - hb.logger.Debugw(fmt.Sprintf("Finished callback in %s", elapsed), - "callbackType", reflect.TypeOf(trackable), "blockNumber", head.BlockNumber(), "time", elapsed) - }(callback) - } - - wg.Wait() +) *headBroadcaster { + return headtracker.NewHeadBroadcaster[*evmtypes.Head, common.Hash](lggr) } diff --git a/core/chains/evm/headtracker/head_broadcaster_test.go b/core/chains/evm/headtracker/head_broadcaster_test.go index 9bf8b330ba0..98bb2f5fc9b 100644 --- a/core/chains/evm/headtracker/head_broadcaster_test.go +++ b/core/chains/evm/headtracker/head_broadcaster_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + commonhtrk "github.com/smartcontractkit/chainlink/v2/common/headtracker" commonmocks "github.com/smartcontractkit/chainlink/v2/common/types/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" @@ -66,11 +67,11 @@ func TestHeadBroadcaster_Subscribe(t *testing.T) { checker1 := &cltest.MockHeadTrackable{} checker2 := &cltest.MockHeadTrackable{} - hb := headtracker.NewEVMHeadBroadcaster(logger) + hb := headtracker.NewHeadBroadcaster(logger) orm := headtracker.NewORM(db, logger, cfg.Database(), *ethClient.ConfiguredChainID()) hs := headtracker.NewHeadSaver(logger, orm, evmCfg, evmCfg.EVM().HeadTracker()) mailMon := 
utils.NewMailboxMonitor(t.Name()) - ht := headtracker.NewEVMHeadTracker(logger, ethClient, headtracker.NewWrappedConfig(evmCfg), evmCfg.EVM().HeadTracker(), hb, hs, mailMon) + ht := headtracker.NewHeadTracker(logger, ethClient, headtracker.NewWrappedConfig(evmCfg), evmCfg.EVM().HeadTracker(), hb, hs, mailMon) var ms services.MultiStart require.NoError(t, ms.Start(testutils.Context(t), mailMon, hb, ht)) t.Cleanup(func() { require.NoError(t, services.CloseAll(mailMon, hb, ht)) }) @@ -100,7 +101,7 @@ func TestHeadBroadcaster_BroadcastNewLongestChain(t *testing.T) { g := gomega.NewWithT(t) lggr := logger.TestLogger(t) - broadcaster := headtracker.NewEVMHeadBroadcaster(lggr) + broadcaster := headtracker.NewHeadBroadcaster(lggr) err := broadcaster.Start(testutils.Context(t)) require.NoError(t, err) @@ -142,7 +143,7 @@ func TestHeadBroadcaster_TrackableCallbackTimeout(t *testing.T) { t.Parallel() lggr := logger.TestLogger(t) - broadcaster := headtracker.NewEVMHeadBroadcaster(lggr) + broadcaster := headtracker.NewHeadBroadcaster(lggr) err := broadcaster.Start(testutils.Context(t)) require.NoError(t, err) @@ -151,8 +152,8 @@ func TestHeadBroadcaster_TrackableCallbackTimeout(t *testing.T) { slowAwaiter := cltest.NewAwaiter() fastAwaiter := cltest.NewAwaiter() - slow := &sleepySubscriber{awaiter: slowAwaiter, delay: headtracker.TrackableCallbackTimeout * 2} - fast := &sleepySubscriber{awaiter: fastAwaiter, delay: headtracker.TrackableCallbackTimeout / 2} + slow := &sleepySubscriber{awaiter: slowAwaiter, delay: commonhtrk.TrackableCallbackTimeout * 2} + fast := &sleepySubscriber{awaiter: fastAwaiter, delay: commonhtrk.TrackableCallbackTimeout / 2} _, unsubscribe1 := broadcaster.Subscribe(slow) _, unsubscribe2 := broadcaster.Subscribe(fast) diff --git a/core/chains/evm/headtracker/head_listener.go b/core/chains/evm/headtracker/head_listener.go index c15ecd7b5ed..a311515b824 100644 --- a/core/chains/evm/headtracker/head_listener.go +++ b/core/chains/evm/headtracker/head_listener.go @@ -1,240 +1,30 @@ package headtracker import ( - "context" "math/big" - "sync/atomic" - "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" - txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/common/headtracker" commontypes "github.com/smartcontractkit/chainlink/v2/common/types" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) -var ( - promNumHeadsReceived = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "head_tracker_heads_received", - Help: "The total number of heads seen", - }, []string{"evmChainID"}) - promEthConnectionErrors = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "head_tracker_eth_connection_errors", - Help: "The total number of eth node connection errors", - }, []string{"evmChainID"}) -) - -type headListener[ - HTH htrktypes.Head[BLOCK_HASH, ID], - S commontypes.Subscription, - ID txmgrtypes.ID, - BLOCK_HASH commontypes.Hashable, -] struct { - config htrktypes.Config - client htrktypes.Client[HTH, S, ID, BLOCK_HASH] - logger logger.Logger - chStop utils.StopChan - chHeaders 
chan HTH - headSubscription commontypes.Subscription - connected atomic.Bool - receivingHeads atomic.Bool -} +type headListener = headtracker.HeadListener[*evmtypes.Head, ethereum.Subscription, *big.Int, common.Hash] -type evmHeadListener = headListener[*evmtypes.Head, ethereum.Subscription, *big.Int, common.Hash] +var _ commontypes.HeadListener[*evmtypes.Head, common.Hash] = (*headListener)(nil) -var _ commontypes.HeadListener[*evmtypes.Head, common.Hash] = &evmHeadListener{} - -func NewHeadListener[ - HTH htrktypes.Head[BLOCK_HASH, ID], - S commontypes.Subscription, - ID txmgrtypes.ID, - BLOCK_HASH commontypes.Hashable, - CLIENT htrktypes.Client[HTH, S, ID, BLOCK_HASH], -]( - lggr logger.Logger, - client CLIENT, - config htrktypes.Config, - chStop chan struct{}, -) *headListener[HTH, S, ID, BLOCK_HASH] { - return &headListener[HTH, S, ID, BLOCK_HASH]{ - config: config, - client: client, - logger: lggr.Named("HeadListener"), - chStop: chStop, - } -} - -func NewEVMHeadListener( +func NewHeadListener( lggr logger.Logger, ethClient evmclient.Client, config Config, chStop chan struct{}, -) *evmHeadListener { +) *headListener { wrappedConfig := NewWrappedConfig(config) - return NewHeadListener[ + return headtracker.NewHeadListener[ *evmtypes.Head, ethereum.Subscription, *big.Int, common.Hash, ](lggr, ethClient, wrappedConfig, chStop) } - -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) Name() string { - return hl.logger.Name() -} - -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(handleNewHead commontypes.NewHeadHandler[HTH, BLOCK_HASH], done func()) { - defer done() - defer hl.unsubscribe() - - ctx, cancel := hl.chStop.NewCtx() - defer cancel() - - for { - if !hl.subscribe(ctx) { - break - } - err := hl.receiveHeaders(ctx, handleNewHead) - if ctx.Err() != nil { - break - } else if err != nil { - hl.logger.Errorw("Error in new head subscription, unsubscribed", "err", err) - continue - } else { - break - } - } -} - -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) ReceivingHeads() bool { - return hl.receivingHeads.Load() -} - -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) Connected() bool { - return hl.connected.Load() -} - -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error { - var err error - if !hl.ReceivingHeads() { - err = errors.New("Listener is not receiving heads") - } - if !hl.Connected() { - err = errors.New("Listener is not connected") - } - return map[string]error{hl.Name(): err} -} - -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead commontypes.NewHeadHandler[HTH, BLOCK_HASH]) error { - var noHeadsAlarmC <-chan time.Time - var noHeadsAlarmT *time.Ticker - noHeadsAlarmDuration := hl.config.BlockEmissionIdleWarningThreshold() - if noHeadsAlarmDuration > 0 { - noHeadsAlarmT = time.NewTicker(noHeadsAlarmDuration) - noHeadsAlarmC = noHeadsAlarmT.C - } - - for { - select { - case <-hl.chStop: - return nil - - case blockHeader, open := <-hl.chHeaders: - chainId := hl.client.ConfiguredChainID() - if noHeadsAlarmT != nil { - // We've received a head, reset the no heads alarm - noHeadsAlarmT.Stop() - noHeadsAlarmT = time.NewTicker(noHeadsAlarmDuration) - noHeadsAlarmC = noHeadsAlarmT.C - } - hl.receivingHeads.Store(true) - if !open { - return errors.New("head listener: chHeaders prematurely closed") - } - if !blockHeader.IsValid() { - hl.logger.Error("got nil block header") - continue - } - - // Compare the chain ID of the block header to the chain ID of the client - if !blockHeader.HasChainID() 
|| blockHeader.ChainID().String() != chainId.String() { - hl.logger.Panicf("head listener for %s received block header for %s", chainId, blockHeader.ChainID()) - } - promNumHeadsReceived.WithLabelValues(chainId.String()).Inc() - - err := handleNewHead(ctx, blockHeader) - if ctx.Err() != nil { - return nil - } else if err != nil { - return err - } - - case err, open := <-hl.headSubscription.Err(): - // err can be nil, because of using chainIDSubForwarder - if !open || err == nil { - return errors.New("head listener: subscription Err channel prematurely closed") - } - return err - - case <-noHeadsAlarmC: - // We haven't received a head on the channel for a long time, log a warning - hl.logger.Warnf("have not received a head for %v", noHeadsAlarmDuration) - hl.receivingHeads.Store(false) - } - } -} - -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) subscribe(ctx context.Context) bool { - subscribeRetryBackoff := utils.NewRedialBackoff() - - chainId := hl.client.ConfiguredChainID() - - for { - hl.unsubscribe() - - hl.logger.Debugf("Subscribing to new heads on chain %s", chainId.String()) - - select { - case <-hl.chStop: - return false - - case <-time.After(subscribeRetryBackoff.Duration()): - err := hl.subscribeToHead(ctx) - if err != nil { - promEthConnectionErrors.WithLabelValues(chainId.String()).Inc() - hl.logger.Warnw("Failed to subscribe to heads on chain", "chainID", chainId.String(), "err", err) - } else { - hl.logger.Debugf("Subscribed to heads on chain %s", chainId.String()) - return true - } - } - } -} - -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) subscribeToHead(ctx context.Context) error { - hl.chHeaders = make(chan HTH) - - var err error - hl.headSubscription, err = hl.client.SubscribeNewHead(ctx, hl.chHeaders) - if err != nil { - close(hl.chHeaders) - return errors.Wrap(err, "EthClient#SubscribeNewHead") - } - - hl.connected.Store(true) - - return nil -} - -func (hl *headListener[HTH, S, ID, BLOCK_HASH]) unsubscribe() { - if hl.headSubscription != nil { - hl.connected.Store(false) - hl.headSubscription.Unsubscribe() - hl.headSubscription = nil - } -} diff --git a/core/chains/evm/headtracker/head_listener_test.go b/core/chains/evm/headtracker/head_listener_test.go index f681ee65a45..6204a75d9fc 100644 --- a/core/chains/evm/headtracker/head_listener_test.go +++ b/core/chains/evm/headtracker/head_listener_test.go @@ -43,7 +43,7 @@ func Test_HeadListener_HappyPath(t *testing.T) { }) evmcfg := evmtest.NewChainScopedConfig(t, cfg) chStop := make(chan struct{}) - hl := headtracker.NewEVMHeadListener(lggr, ethClient, headtracker.NewWrappedConfig(evmcfg), chStop) + hl := headtracker.NewHeadListener(lggr, ethClient, headtracker.NewWrappedConfig(evmcfg), chStop) var headCount atomic.Int32 handler := func(context.Context, *evmtypes.Head) error { @@ -105,7 +105,7 @@ func Test_HeadListener_NotReceivingHeads(t *testing.T) { evmcfg := evmtest.NewChainScopedConfig(t, cfg) evmcfg.BlockEmissionIdleWarningThreshold() chStop := make(chan struct{}) - hl := headtracker.NewEVMHeadListener(lggr, ethClient, headtracker.NewWrappedConfig(evmcfg), chStop) + hl := headtracker.NewHeadListener(lggr, ethClient, headtracker.NewWrappedConfig(evmcfg), chStop) firstHeadAwaiter := cltest.NewAwaiter() handler := func(context.Context, *evmtypes.Head) error { @@ -168,7 +168,7 @@ func Test_HeadListener_SubscriptionErr(t *testing.T) { cfg := configtest.NewGeneralConfig(t, nil) evmcfg := evmtest.NewChainScopedConfig(t, cfg) chStop := make(chan struct{}) - hl := headtracker.NewEVMHeadListener(l, ethClient, 
headtracker.NewWrappedConfig(evmcfg), chStop) + hl := headtracker.NewHeadListener(l, ethClient, headtracker.NewWrappedConfig(evmcfg), chStop) hnhCalled := make(chan *evmtypes.Head) hnh := func(_ context.Context, header *evmtypes.Head) error { diff --git a/core/chains/evm/headtracker/head_tracker.go b/core/chains/evm/headtracker/head_tracker.go index 92a5c083352..482bebc28e0 100644 --- a/core/chains/evm/headtracker/head_tracker.go +++ b/core/chains/evm/headtracker/head_tracker.go @@ -2,109 +2,26 @@ package headtracker import ( "context" - "fmt" "math/big" - "sync" - "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/zap/zapcore" - "golang.org/x/exp/maps" - htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" - txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/common/headtracker" commontypes "github.com/smartcontractkit/chainlink/v2/common/types" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/utils" ) -var ( - promCurrentHead = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "head_tracker_current_head", - Help: "The highest seen head number", - }, []string{"evmChainID"}) +type headTracker = headtracker.HeadTracker[*evmtypes.Head, ethereum.Subscription, *big.Int, common.Hash] - promOldHead = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "head_tracker_very_old_head", - Help: "Counter is incremented every time we get a head that is much lower than the highest seen head ('much lower' is defined as a block that is EVM.FinalityDepth or greater below the highest seen head)", - }, []string{"evmChainID"}) -) - -// HeadsBufferSize - The buffer is used when heads sampling is disabled, to ensure the callback is run for every head -const HeadsBufferSize = 10 - -type headTracker[ - HTH htrktypes.Head[BLOCK_HASH, ID], - S commontypes.Subscription, - ID txmgrtypes.ID, - BLOCK_HASH commontypes.Hashable, -] struct { - log logger.Logger - headBroadcaster commontypes.HeadBroadcaster[HTH, BLOCK_HASH] - headSaver commontypes.HeadSaver[HTH, BLOCK_HASH] - mailMon *utils.MailboxMonitor - ethClient htrktypes.Client[HTH, S, ID, BLOCK_HASH] - chainID ID - config htrktypes.Config - htConfig htrktypes.HeadTrackerConfig - - backfillMB *utils.Mailbox[HTH] - broadcastMB *utils.Mailbox[HTH] - headListener commontypes.HeadListener[HTH, BLOCK_HASH] - chStop utils.StopChan - wgDone sync.WaitGroup - utils.StartStopOnce - getNilHead func() HTH -} - -type evmHeadTracker = headTracker[*evmtypes.Head, ethereum.Subscription, *big.Int, common.Hash] - -var _ commontypes.HeadTracker[*evmtypes.Head, common.Hash] = (*evmHeadTracker)(nil) +var _ commontypes.HeadTracker[*evmtypes.Head, common.Hash] = (*headTracker)(nil) -// NewHeadTracker instantiates a new HeadTracker using HeadSaver to persist new block numbers. 
-func NewHeadTracker[ - HTH htrktypes.Head[BLOCK_HASH, ID], - S commontypes.Subscription, - ID txmgrtypes.ID, - BLOCK_HASH commontypes.Hashable, -]( - lggr logger.Logger, - client htrktypes.Client[HTH, S, ID, BLOCK_HASH], - config htrktypes.Config, - htConfig htrktypes.HeadTrackerConfig, - headBroadcaster commontypes.HeadBroadcaster[HTH, BLOCK_HASH], - headSaver commontypes.HeadSaver[HTH, BLOCK_HASH], - mailMon *utils.MailboxMonitor, - getNilHead func() HTH, -) commontypes.HeadTracker[HTH, BLOCK_HASH] { - chStop := make(chan struct{}) - lggr = lggr.Named("HeadTracker") - return &headTracker[HTH, S, ID, BLOCK_HASH]{ - headBroadcaster: headBroadcaster, - ethClient: client, - chainID: client.ConfiguredChainID(), - config: config, - htConfig: htConfig, - log: lggr, - backfillMB: utils.NewSingleMailbox[HTH](), - broadcastMB: utils.NewMailbox[HTH](HeadsBufferSize), - chStop: chStop, - headListener: NewHeadListener[HTH, S, ID, BLOCK_HASH](lggr, client, config, chStop), - headSaver: headSaver, - mailMon: mailMon, - getNilHead: getNilHead, - } -} - -func NewEVMHeadTracker( +func NewHeadTracker( lggr logger.Logger, ethClient evmclient.Client, config Config, @@ -113,289 +30,16 @@ func NewEVMHeadTracker( headSaver httypes.HeadSaver, mailMon *utils.MailboxMonitor, ) httypes.HeadTracker { - chStop := make(chan struct{}) - lggr = lggr.Named("HeadTracker") - return &evmHeadTracker{ - headBroadcaster: headBroadcaster, - ethClient: ethClient, - chainID: ethClient.ConfiguredChainID(), - config: NewWrappedConfig(config), - htConfig: htConfig, - log: lggr, - backfillMB: utils.NewSingleMailbox[*evmtypes.Head](), - broadcastMB: utils.NewMailbox[*evmtypes.Head](HeadsBufferSize), - chStop: chStop, - headListener: NewEVMHeadListener(lggr, ethClient, config, chStop), - headSaver: headSaver, - mailMon: mailMon, - getNilHead: func() *evmtypes.Head { return nil }, - } -} - -// Start starts HeadTracker service. -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error { - return ht.StartOnce("HeadTracker", func() error { - ht.log.Debugw("Starting HeadTracker", "chainID", ht.chainID) - latestChain, err := ht.headSaver.Load(ctx) - if err != nil { - return err - } - if latestChain.IsValid() { - ht.log.Debugw( - fmt.Sprintf("HeadTracker: Tracking logs from last block %v with hash %s", config.FriendlyNumber(latestChain.BlockNumber()), latestChain.BlockHash()), - "blockNumber", latestChain.BlockNumber(), - "blockHash", latestChain.BlockHash(), - ) - } - - // NOTE: Always try to start the head tracker off with whatever the - // latest head is, without waiting for the subscription to send us one. - // - // In some cases the subscription will send us the most recent head - // anyway when we connect (but we should not rely on this because it is - // not specced). If it happens this is fine, and the head will be - // ignored as a duplicate. - initialHead, err := ht.getInitialHead(ctx) - if err != nil { - if errors.Is(err, ctx.Err()) { - return nil - } - ht.log.Errorw("Error getting initial head", "err", err) - } else if initialHead.IsValid() { - if err := ht.handleNewHead(ctx, initialHead); err != nil { - return errors.Wrap(err, "error handling initial head") - } - } else { - ht.log.Debug("Got nil initial head") - } - - ht.wgDone.Add(3) - go ht.headListener.ListenForNewHeads(ht.handleNewHead, ht.wgDone.Done) - go ht.backfillLoop() - go ht.broadcastLoop() - - ht.mailMon.Monitor(ht.broadcastMB, "HeadTracker", "Broadcast", ht.chainID.String()) - - return nil - }) -} - -// Close stops HeadTracker service. 
-func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) Close() error { - return ht.StopOnce("HeadTracker", func() error { - close(ht.chStop) - ht.wgDone.Wait() - return ht.broadcastMB.Close() - }) -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) Name() string { - return ht.log.Name() -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error { - report := map[string]error{ - ht.Name(): ht.StartStopOnce.Healthy(), - } - maps.Copy(report, ht.headListener.HealthReport()) - return report -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH, depth uint) (err error) { - if uint(headWithChain.ChainLength()) >= depth { - return nil - } - - baseHeight := headWithChain.BlockNumber() - int64(depth-1) - if baseHeight < 0 { - baseHeight = 0 - } - - return ht.backfill(ctx, headWithChain.EarliestHeadInChain(), baseHeight) -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) LatestChain() HTH { - return ht.headSaver.LatestChain() -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) getInitialHead(ctx context.Context) (HTH, error) { - head, err := ht.ethClient.HeadByNumber(ctx, nil) - if err != nil { - return ht.getNilHead(), errors.Wrap(err, "failed to fetch initial head") - } - loggerFields := []interface{}{"head", head} - if head.IsValid() { - loggerFields = append(loggerFields, "blockNumber", head.BlockNumber(), "blockHash", head.BlockHash()) - } - ht.log.Debugw("Got initial head", loggerFields...) - return head, nil -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, head HTH) error { - prevHead := ht.headSaver.LatestChain() - - ht.log.Debugw(fmt.Sprintf("Received new head %v", config.FriendlyNumber(head.BlockNumber())), - "blockHeight", head, - "blockHash", head.BlockHash(), - "parentHeadHash", head.GetParentHash(), + return headtracker.NewHeadTracker[*evmtypes.Head, ethereum.Subscription, *big.Int, common.Hash]( + lggr, + ethClient, + NewWrappedConfig(config), + htConfig, + headBroadcaster, + headSaver, + mailMon, + func() *evmtypes.Head { return nil }, ) - - err := ht.headSaver.Save(ctx, head) - if ctx.Err() != nil { - return nil - } else if err != nil { - return errors.Wrapf(err, "failed to save head: %#v", head) - } - - if !prevHead.IsValid() || head.BlockNumber() > prevHead.BlockNumber() { - promCurrentHead.WithLabelValues(ht.chainID.String()).Set(float64(head.BlockNumber())) - - headWithChain := ht.headSaver.Chain(head.BlockHash()) - if !headWithChain.IsValid() { - return errors.Errorf("HeadTracker#handleNewHighestHead headWithChain was unexpectedly nil") - } - ht.backfillMB.Deliver(headWithChain) - ht.broadcastMB.Deliver(headWithChain) - } else if head.BlockNumber() == prevHead.BlockNumber() { - if head.BlockHash() != prevHead.BlockHash() { - ht.log.Debugw("Got duplicate head", "blockNum", head.BlockNumber(), "head", head.BlockHash(), "prevHead", prevHead.BlockHash()) - } else { - ht.log.Debugw("Head already in the database", "head", head.BlockHash()) - } - } else { - ht.log.Debugw("Got out of order head", "blockNum", head.BlockNumber(), "head", head.BlockHash(), "prevHead", prevHead.BlockNumber()) - if head.BlockNumber() < prevHead.BlockNumber()-int64(ht.config.FinalityDepth()) { - promOldHead.WithLabelValues(ht.chainID.String()).Inc() - ht.log.Criticalf("Got very old block with number %d (highest seen was %d). This is a problem and either means a very deep re-org occurred, one of the RPC nodes has gotten far out of sync, or the chain went backwards in block numbers. 
This node may not function correctly without manual intervention.", head.BlockNumber(), prevHead.BlockNumber()) - ht.SvcErrBuffer.Append(errors.New("got very old block")) - } - } - return nil -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) broadcastLoop() { - defer ht.wgDone.Done() - - samplingInterval := ht.htConfig.SamplingInterval() - if samplingInterval > 0 { - ht.log.Debugf("Head sampling is enabled - sampling interval is set to: %v", samplingInterval) - debounceHead := time.NewTicker(samplingInterval) - defer debounceHead.Stop() - for { - select { - case <-ht.chStop: - return - case <-debounceHead.C: - item := ht.broadcastMB.RetrieveLatestAndClear() - if !item.IsValid() { - continue - } - ht.headBroadcaster.BroadcastNewLongestChain(item) - } - } - } else { - ht.log.Info("Head sampling is disabled - callback will be called on every head") - for { - select { - case <-ht.chStop: - return - case <-ht.broadcastMB.Notify(): - for { - item, exists := ht.broadcastMB.Retrieve() - if !exists { - break - } - ht.headBroadcaster.BroadcastNewLongestChain(item) - } - } - } - } -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop() { - defer ht.wgDone.Done() - - ctx, cancel := ht.chStop.NewCtx() - defer cancel() - - for { - select { - case <-ht.chStop: - return - case <-ht.backfillMB.Notify(): - for { - head, exists := ht.backfillMB.Retrieve() - if !exists { - break - } - { - err := ht.Backfill(ctx, head, uint(ht.config.FinalityDepth())) - if err != nil { - ht.log.Warnw("Unexpected error while backfilling heads", "err", err) - } else if ctx.Err() != nil { - break - } - } - } - } - } -} - -// backfill fetches all missing heads up until the base height -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, head commontypes.Head[BLOCK_HASH], baseHeight int64) (err error) { - if head.BlockNumber() <= baseHeight { - return nil - } - mark := time.Now() - fetched := 0 - l := ht.log.With("blockNumber", head.BlockNumber(), - "n", head.BlockNumber()-baseHeight, - "fromBlockHeight", baseHeight, - "toBlockHeight", head.BlockNumber()-1) - l.Debug("Starting backfill") - defer func() { - if ctx.Err() != nil { - l.Warnw("Backfill context error", "err", ctx.Err()) - return - } - l.Debugw("Finished backfill", - "fetched", fetched, - "time", time.Since(mark), - "err", err) - }() - - for i := head.BlockNumber() - 1; i >= baseHeight; i-- { - // NOTE: Sequential requests here mean it's a potential performance bottleneck, be aware! 
- existingHead := ht.headSaver.Chain(head.GetParentHash()) - if existingHead.IsValid() { - head = existingHead - continue - } - head, err = ht.fetchAndSaveHead(ctx, i) - fetched++ - if ctx.Err() != nil { - ht.log.Debugw("context canceled, aborting backfill", "err", err, "ctx.Err", ctx.Err()) - break - } else if err != nil { - return errors.Wrap(err, "fetchAndSaveHead failed") - } - } - return -} - -func (ht *headTracker[HTH, S, ID, BLOCK_HASH]) fetchAndSaveHead(ctx context.Context, n int64) (HTH, error) { - ht.log.Debugw("Fetching head", "blockHeight", n) - head, err := ht.ethClient.HeadByNumber(ctx, big.NewInt(n)) - if err != nil { - return ht.getNilHead(), err - } else if !head.IsValid() { - return ht.getNilHead(), errors.New("got nil head") - } - err = ht.headSaver.Save(ctx, head) - if err != nil { - return ht.getNilHead(), err - } - return head, nil } var NullTracker httypes.HeadTracker = &nullTracker{} diff --git a/core/chains/evm/headtracker/head_tracker_test.go b/core/chains/evm/headtracker/head_tracker_test.go index d49d97c9771..ca8141d98fe 100644 --- a/core/chains/evm/headtracker/head_tracker_test.go +++ b/core/chains/evm/headtracker/head_tracker_test.go @@ -970,12 +970,12 @@ func TestHeadTracker_Backfill(t *testing.T) { func createHeadTracker(t *testing.T, ethClient evmclient.Client, config headtracker.Config, htConfig headtracker.HeadTrackerConfig, orm headtracker.ORM) *headTrackerUniverse { lggr := logger.TestLogger(t) - hb := headtracker.NewEVMHeadBroadcaster(lggr) + hb := headtracker.NewHeadBroadcaster(lggr) hs := headtracker.NewHeadSaver(lggr, orm, config, htConfig) mailMon := utils.NewMailboxMonitor(t.Name()) return &headTrackerUniverse{ mu: new(sync.Mutex), - headTracker: headtracker.NewEVMHeadTracker(lggr, ethClient, headtracker.NewWrappedConfig(config), htConfig, hb, hs, mailMon), + headTracker: headtracker.NewHeadTracker(lggr, ethClient, headtracker.NewWrappedConfig(config), htConfig, hb, hs, mailMon), headBroadcaster: hb, headSaver: hs, mailMon: mailMon, @@ -985,10 +985,10 @@ func createHeadTracker(t *testing.T, ethClient evmclient.Client, config headtrac func createHeadTrackerWithNeverSleeper(t *testing.T, ethClient evmclient.Client, cfg chainlink.GeneralConfig, orm headtracker.ORM) *headTrackerUniverse { evmcfg := evmtest.NewChainScopedConfig(t, cfg) lggr := logger.TestLogger(t) - hb := headtracker.NewEVMHeadBroadcaster(lggr) + hb := headtracker.NewHeadBroadcaster(lggr) hs := headtracker.NewHeadSaver(lggr, orm, evmcfg, evmcfg.EVM().HeadTracker()) mailMon := utils.NewMailboxMonitor(t.Name()) - ht := headtracker.NewEVMHeadTracker(lggr, ethClient, headtracker.NewWrappedConfig(evmcfg), evmcfg.EVM().HeadTracker(), hb, hs, mailMon) + ht := headtracker.NewHeadTracker(lggr, ethClient, headtracker.NewWrappedConfig(evmcfg), evmcfg.EVM().HeadTracker(), hb, hs, mailMon) _, err := hs.Load(testutils.Context(t)) require.NoError(t, err) return &headTrackerUniverse{ @@ -1002,11 +1002,11 @@ func createHeadTrackerWithNeverSleeper(t *testing.T, ethClient evmclient.Client, func createHeadTrackerWithChecker(t *testing.T, ethClient evmclient.Client, config headtracker.Config, htConfig headtracker.HeadTrackerConfig, orm headtracker.ORM, checker httypes.HeadTrackable) *headTrackerUniverse { lggr := logger.TestLogger(t) - hb := headtracker.NewEVMHeadBroadcaster(lggr) + hb := headtracker.NewHeadBroadcaster(lggr) hs := headtracker.NewHeadSaver(lggr, orm, config, htConfig) hb.Subscribe(checker) mailMon := utils.NewMailboxMonitor(t.Name()) - ht := headtracker.NewEVMHeadTracker(lggr, ethClient, 
headtracker.NewWrappedConfig(config), htConfig, hb, hs, mailMon) + ht := headtracker.NewHeadTracker(lggr, ethClient, headtracker.NewWrappedConfig(config), htConfig, hb, hs, mailMon) return &headTrackerUniverse{ mu: new(sync.Mutex), headTracker: ht, diff --git a/core/services/vrf/delegate_test.go b/core/services/vrf/delegate_test.go index 4e423eb3177..dac06fa6be3 100644 --- a/core/services/vrf/delegate_test.go +++ b/core/services/vrf/delegate_test.go @@ -67,7 +67,7 @@ func buildVrfUni(t *testing.T, db *sqlx.DB, cfg chainlink.GeneralConfig) vrfUniv ec := evmclimocks.NewClient(t) ec.On("ConfiguredChainID").Return(testutils.FixtureChainID) lggr := logger.TestLogger(t) - hb := headtracker.NewEVMHeadBroadcaster(lggr) + hb := headtracker.NewHeadBroadcaster(lggr) // Don't mock db interactions prm := pipeline.NewORM(db, lggr, cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns())
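
Usage sketch (not part of the patch): how a subscriber attaches to the generalised HeadBroadcaster now that it lives in common/headtracker, pinned to the EVM types this patch wires up. The constructor, Start/Subscribe/Close signatures, and the 2-second TrackableCallbackTimeout come from common/headtracker/head_broadcaster.go above; the package name, exampleTrackable, and subscribeExample are hypothetical, illustration-only names.

package headtrackerexample

import (
	"context"

	"github.com/ethereum/go-ethereum/common"

	"github.com/smartcontractkit/chainlink/v2/common/headtracker"
	evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
	"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// exampleTrackable is a hypothetical subscriber. It satisfies
// commontypes.HeadTrackable[*evmtypes.Head, common.Hash] by implementing the
// OnNewLongestChain callback that executeCallbacks invokes.
type exampleTrackable struct{}

// OnNewLongestChain runs under a context the broadcaster cancels after
// TrackableCallbackTimeout (2s); slow subscribers can cause heads to be skipped.
func (exampleTrackable) OnNewLongestChain(ctx context.Context, head *evmtypes.Head) {
	// React to the new longest chain here.
}

// subscribeExample shows the lifecycle: Start, Subscribe, teardown. The caller
// supplies any logger.Logger (e.g. logger.TestLogger(t) in tests).
func subscribeExample(lggr logger.Logger) error {
	hb := headtracker.NewHeadBroadcaster[*evmtypes.Head, common.Hash](lggr)
	if err := hb.Start(context.Background()); err != nil {
		return err
	}
	defer func() { _ = hb.Close() }()

	// Subscribe returns the latest head seen so far (nil until one has been
	// broadcast) and an unsubscribe func that removes the callback again.
	latest, unsubscribe := hb.Subscribe(exampleTrackable{})
	defer unsubscribe()
	_ = latest

	// Producers (e.g. the HeadTracker broadcastLoop) push heads with:
	//   hb.BroadcastNewLongestChain(head)
	return nil
}

The same pattern holds for HeadListener and HeadTracker: a chain integration pins HTH, S, ID, and BLOCK_HASH once (as the new evm-side aliases in this patch do) and reuses the common implementations unchanged.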