Commit
[BCI-1402] Moved Generalised HeadTracker components into common folder (#9510)

* Generalised HeadTracker

* added mocks

* Removed the use of Big.int

* added head mocks

* Generalised HeadTracker

* Updated BlockNumber() return type to big.Int

* removed BlockNumberInt64

* updated naming for headNumberInt64

* Updated mocks

* fixed pointer comparison of big.Int

* fix tests due to big.int changes

* moved headbroadcaster to common

* moved headlistener

* moved HT

* merge fixes

* added mock files

* removed EVM prefix

* fixed linting for headlistener

* cleanup

* fixed merge changes

* updated models

* updated mock head files

* renamed variable

* removed eth terminology from generic headtracker

* removed headbyhash

* fixed merge conflict

* fixed merge conflict
yongkangc authored Jun 16, 2023
1 parent 68350f2 commit f4b5c71
Showing 12 changed files with 789 additions and 765 deletions.
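
Several of the squashed commits above deal with moving block numbers from int64 to *big.Int ("Updated BlockNumber() return type to big.Int", "fixed pointer comparison of big.Int"). A minimal sketch of the pitfall that second commit refers to (the helper below is hypothetical and not part of this diff): comparing *big.Int values with == only compares pointers, so Cmp is needed for value equality.

package headtracker

import "math/big"

// sameBlockNumber is an illustrative helper (not in this commit): value-compare
// two *big.Int block numbers, treating nil as equal only to nil.
func sameBlockNumber(a, b *big.Int) bool {
	if a == nil || b == nil {
		return a == b
	}
	return a.Cmp(b) == 0
}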
163 changes: 163 additions & 0 deletions common/headtracker/head_broadcaster.go
@@ -0,0 +1,163 @@
package headtracker

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"time"

	commontypes "github.com/smartcontractkit/chainlink/v2/common/types"
	"github.com/smartcontractkit/chainlink/v2/core/logger"
	"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const TrackableCallbackTimeout = 2 * time.Second

type callbackSet[H commontypes.Head[BLOCK_HASH], BLOCK_HASH commontypes.Hashable] map[int]commontypes.HeadTrackable[H, BLOCK_HASH]

func (set callbackSet[H, BLOCK_HASH]) values() []commontypes.HeadTrackable[H, BLOCK_HASH] {
	var values []commontypes.HeadTrackable[H, BLOCK_HASH]
	for _, callback := range set {
		values = append(values, callback)
	}
	return values
}

type HeadBroadcaster[H commontypes.Head[BLOCK_HASH], BLOCK_HASH commontypes.Hashable] struct {
	logger    logger.Logger
	callbacks callbackSet[H, BLOCK_HASH]
	mailbox   *utils.Mailbox[H]
	mutex     *sync.Mutex
	chClose   utils.StopChan
	wgDone    sync.WaitGroup
	utils.StartStopOnce
	latest         H
	lastCallbackID int
}

// NewHeadBroadcaster creates a new HeadBroadcaster
func NewHeadBroadcaster[
	H commontypes.Head[BLOCK_HASH],
	BLOCK_HASH commontypes.Hashable,
](
	lggr logger.Logger,
) *HeadBroadcaster[H, BLOCK_HASH] {
	return &HeadBroadcaster[H, BLOCK_HASH]{
		logger:        lggr.Named("HeadBroadcaster"),
		callbacks:     make(callbackSet[H, BLOCK_HASH]),
		mailbox:       utils.NewSingleMailbox[H](),
		mutex:         &sync.Mutex{},
		chClose:       make(chan struct{}),
		wgDone:        sync.WaitGroup{},
		StartStopOnce: utils.StartStopOnce{},
	}
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) Start(context.Context) error {
	return hb.StartOnce("HeadBroadcaster", func() error {
		hb.wgDone.Add(1)
		go hb.run()
		return nil
	})
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) Close() error {
	return hb.StopOnce("HeadBroadcaster", func() error {
		hb.mutex.Lock()
		// clear all callbacks
		hb.callbacks = make(callbackSet[H, BLOCK_HASH])
		hb.mutex.Unlock()

		close(hb.chClose)
		hb.wgDone.Wait()
		return nil
	})
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) Name() string {
	return hb.logger.Name()
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) HealthReport() map[string]error {
	return map[string]error{hb.Name(): hb.StartStopOnce.Healthy()}
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) {
	hb.mailbox.Deliver(head)
}

// Subscribe subscribes callback to OnNewLongestChain notifications until HeadBroadcaster is closed,
// or until the returned unsubscribe callback is called explicitly
func (hb *HeadBroadcaster[H, BLOCK_HASH]) Subscribe(callback commontypes.HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func()) {
	hb.mutex.Lock()
	defer hb.mutex.Unlock()

	currentLongestChain = hb.latest

	hb.lastCallbackID++
	callbackID := hb.lastCallbackID
	hb.callbacks[callbackID] = callback
	unsubscribe = func() {
		hb.mutex.Lock()
		defer hb.mutex.Unlock()
		delete(hb.callbacks, callbackID)
	}

	return
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) run() {
	defer hb.wgDone.Done()

	for {
		select {
		case <-hb.chClose:
			return
		case <-hb.mailbox.Notify():
			hb.executeCallbacks()
		}
	}
}

// DEV: the head relayer makes no promises about head delivery! Subscribing
// jobs should expect the relayer to skip heads if there is a large number of listeners
// and all callbacks cannot be completed in the allotted time.
func (hb *HeadBroadcaster[H, BLOCK_HASH]) executeCallbacks() {
	head, exists := hb.mailbox.Retrieve()
	if !exists {
		hb.logger.Info("No head to retrieve. It might have been skipped")
		return
	}

	hb.mutex.Lock()
	callbacks := hb.callbacks.values()
	hb.latest = head
	hb.mutex.Unlock()

	hb.logger.Debugw("Initiating callbacks",
		"headNum", head.BlockNumber(),
		"numCallbacks", len(callbacks),
	)

	wg := sync.WaitGroup{}
	wg.Add(len(callbacks))

	ctx, cancel := hb.chClose.NewCtx()
	defer cancel()

	for _, callback := range callbacks {
		go func(trackable commontypes.HeadTrackable[H, BLOCK_HASH]) {
			defer wg.Done()
			start := time.Now()
			cctx, cancel := context.WithTimeout(ctx, TrackableCallbackTimeout)
			defer cancel()
			trackable.OnNewLongestChain(cctx, head)
			elapsed := time.Since(start)
			hb.logger.Debugw(fmt.Sprintf("Finished callback in %s", elapsed),
				"callbackType", reflect.TypeOf(trackable), "blockNumber", head.BlockNumber(), "time", elapsed)
		}(callback)
	}

	wg.Wait()
}
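
As a rough orientation for how the generic broadcaster above is meant to be used, here is a hypothetical sketch. The names loggingTrackable and subscribeAndBroadcast are illustrative only, and it assumes commontypes.HeadTrackable requires just the OnNewLongestChain method that executeCallbacks invokes; it would sit alongside the file above in the headtracker package.

// loggingTrackable is a hypothetical subscriber used purely for illustration.
type loggingTrackable[H commontypes.Head[BLOCK_HASH], BLOCK_HASH commontypes.Hashable] struct {
	lggr logger.Logger
}

func (t *loggingTrackable[H, BLOCK_HASH]) OnNewLongestChain(ctx context.Context, head H) {
	t.lggr.Debugw("saw new longest chain", "blockNumber", head.BlockNumber())
}

// subscribeAndBroadcast sketches the lifecycle: Start, Subscribe, deliver one head, Close.
func subscribeAndBroadcast[H commontypes.Head[BLOCK_HASH], BLOCK_HASH commontypes.Hashable](
	ctx context.Context, lggr logger.Logger, head H,
) error {
	hb := NewHeadBroadcaster[H, BLOCK_HASH](lggr)
	if err := hb.Start(ctx); err != nil {
		return err
	}
	defer func() { _ = hb.Close() }()

	// Subscribe returns the most recently broadcast head (zero value here) and an unsubscribe func.
	_, unsubscribe := hb.Subscribe(&loggingTrackable[H, BLOCK_HASH]{lggr: lggr})
	defer unsubscribe()

	// Delivery is asynchronous via the single-slot mailbox, so heads broadcast in
	// quick succession may be coalesced before callbacks run.
	hb.BroadcastNewLongestChain(head)
	return nil
}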
219 changes: 219 additions & 0 deletions common/headtracker/head_listener.go
@@ -0,0 +1,219 @@
package headtracker

import (
	"context"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types"
	txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
	"github.com/smartcontractkit/chainlink/v2/common/types"
	"github.com/smartcontractkit/chainlink/v2/core/logger"
	"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
	promNumHeadsReceived = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "head_tracker_heads_received",
		Help: "The total number of heads seen",
	}, []string{"ChainID"})
	promEthConnectionErrors = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "head_tracker_connection_errors",
		Help: "The total number of node connection errors",
	}, []string{"ChainID"})
)

type HeadListener[
	HTH htrktypes.Head[BLOCK_HASH, ID],
	S types.Subscription,
	ID txmgrtypes.ID,
	BLOCK_HASH types.Hashable,
] struct {
	config           htrktypes.Config
	client           htrktypes.Client[HTH, S, ID, BLOCK_HASH]
	logger           logger.Logger
	chStop           utils.StopChan
	chHeaders        chan HTH
	headSubscription types.Subscription
	connected        atomic.Bool
	receivingHeads   atomic.Bool
}

func NewHeadListener[
	HTH htrktypes.Head[BLOCK_HASH, ID],
	S types.Subscription,
	ID txmgrtypes.ID,
	BLOCK_HASH types.Hashable,
	CLIENT htrktypes.Client[HTH, S, ID, BLOCK_HASH],
](
	lggr logger.Logger,
	client CLIENT,
	config htrktypes.Config,
	chStop chan struct{},
) *HeadListener[HTH, S, ID, BLOCK_HASH] {
	return &HeadListener[HTH, S, ID, BLOCK_HASH]{
		config: config,
		client: client,
		logger: lggr.Named("HeadListener"),
		chStop: chStop,
	}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) Name() string {
	return hl.logger.Name()
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(handleNewHead types.NewHeadHandler[HTH, BLOCK_HASH], done func()) {
	defer done()
	defer hl.unsubscribe()

	ctx, cancel := hl.chStop.NewCtx()
	defer cancel()

	for {
		if !hl.subscribe(ctx) {
			break
		}
		err := hl.receiveHeaders(ctx, handleNewHead)
		if ctx.Err() != nil {
			break
		} else if err != nil {
			hl.logger.Errorw("Error in new head subscription, unsubscribed", "err", err)
			continue
		} else {
			break
		}
	}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ReceivingHeads() bool {
	return hl.receivingHeads.Load()
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) Connected() bool {
	return hl.connected.Load()
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error {
	var err error
	if !hl.ReceivingHeads() {
		err = errors.New("Listener is not receiving heads")
	}
	if !hl.Connected() {
		err = errors.New("Listener is not connected")
	}
	return map[string]error{hl.Name(): err}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead types.NewHeadHandler[HTH, BLOCK_HASH]) error {
	var noHeadsAlarmC <-chan time.Time
	var noHeadsAlarmT *time.Ticker
	noHeadsAlarmDuration := hl.config.BlockEmissionIdleWarningThreshold()
	if noHeadsAlarmDuration > 0 {
		noHeadsAlarmT = time.NewTicker(noHeadsAlarmDuration)
		noHeadsAlarmC = noHeadsAlarmT.C
	}

	for {
		select {
		case <-hl.chStop:
			return nil

		case blockHeader, open := <-hl.chHeaders:
			chainId := hl.client.ConfiguredChainID()
			if noHeadsAlarmT != nil {
				// We've received a head, reset the no heads alarm
				noHeadsAlarmT.Stop()
				noHeadsAlarmT = time.NewTicker(noHeadsAlarmDuration)
				noHeadsAlarmC = noHeadsAlarmT.C
			}
			hl.receivingHeads.Store(true)
			if !open {
				return errors.New("head listener: chHeaders prematurely closed")
			}
			if !blockHeader.IsValid() {
				hl.logger.Error("got nil block header")
				continue
			}

			// Compare the chain ID of the block header to the chain ID of the client
			if !blockHeader.HasChainID() || blockHeader.ChainID().String() != chainId.String() {
				hl.logger.Panicf("head listener for %s received block header for %s", chainId, blockHeader.ChainID())
			}
			promNumHeadsReceived.WithLabelValues(chainId.String()).Inc()

			err := handleNewHead(ctx, blockHeader)
			if ctx.Err() != nil {
				return nil
			} else if err != nil {
				return err
			}

		case err, open := <-hl.headSubscription.Err():
			// err can be nil, because of using chainIDSubForwarder
			if !open || err == nil {
				return errors.New("head listener: subscription Err channel prematurely closed")
			}
			return err

		case <-noHeadsAlarmC:
			// We haven't received a head on the channel for a long time, log a warning
			hl.logger.Warnf("have not received a head for %v", noHeadsAlarmDuration)
			hl.receivingHeads.Store(false)
		}
	}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribe(ctx context.Context) bool {
	subscribeRetryBackoff := utils.NewRedialBackoff()

	chainId := hl.client.ConfiguredChainID()

	for {
		hl.unsubscribe()

		hl.logger.Debugf("Subscribing to new heads on chain %s", chainId.String())

		select {
		case <-hl.chStop:
			return false

		case <-time.After(subscribeRetryBackoff.Duration()):
			err := hl.subscribeToHead(ctx)
			if err != nil {
				promEthConnectionErrors.WithLabelValues(chainId.String()).Inc()
				hl.logger.Warnw("Failed to subscribe to heads on chain", "chainID", chainId.String(), "err", err)
			} else {
				hl.logger.Debugf("Subscribed to heads on chain %s", chainId.String())
				return true
			}
		}
	}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribeToHead(ctx context.Context) error {
	hl.chHeaders = make(chan HTH)

	var err error
	hl.headSubscription, err = hl.client.SubscribeNewHead(ctx, hl.chHeaders)
	if err != nil {
		close(hl.chHeaders)
		return errors.Wrap(err, "Client#SubscribeNewHead")
	}

	hl.connected.Store(true)

	return nil
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) unsubscribe() {
	if hl.headSubscription != nil {
		hl.connected.Store(false)
		hl.headSubscription.Unsubscribe()
		hl.headSubscription = nil
	}
}
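
Finally, a hypothetical wiring sketch for the listener. runListener and its inline handler are illustrative and not part of this diff; the sketch assumes types.NewHeadHandler is a function type of the form func(context.Context, HTH) error, which matches how receiveHeaders invokes handleNewHead above.

// runListener is an illustrative example of driving the listener from the same package.
func runListener[
	HTH htrktypes.Head[BLOCK_HASH, ID],
	S types.Subscription,
	ID txmgrtypes.ID,
	BLOCK_HASH types.Hashable,
](
	lggr logger.Logger,
	client htrktypes.Client[HTH, S, ID, BLOCK_HASH],
	config htrktypes.Config,
) (stop func()) {
	chStop := make(chan struct{})
	done := make(chan struct{})

	hl := NewHeadListener[HTH, S, ID, BLOCK_HASH](lggr, client, config, chStop)

	// The handler runs once per received head; a non-nil error makes
	// ListenForNewHeads drop the subscription and resubscribe.
	handle := func(ctx context.Context, head HTH) error {
		lggr.Debugw("received head", "head", head)
		return nil
	}

	go hl.ListenForNewHeads(handle, func() { close(done) })

	// The returned stop function closes chStop and waits for the listener goroutine to exit.
	return func() {
		close(chStop)
		<-done
	}
}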