Skip to content

Commit

Permalink
Remove unused mocks (#12769)
Browse files Browse the repository at this point in the history
* Remove unused mocks

* Remove unused HeadBroadcasterRegistry

* Cleanup head tracker structures

* Fix lint & cleanup

* Move HeadSaver under headtracker package
  • Loading branch information
dimriou authored Apr 12, 2024
1 parent 3538db0 commit 69ebfbb
Show file tree
Hide file tree
Showing 29 changed files with 179 additions and 501 deletions.
47 changes: 32 additions & 15 deletions common/headtracker/head_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,34 @@ import (

const TrackableCallbackTimeout = 2 * time.Second

type callbackSet[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] map[int]types.HeadTrackable[H, BLOCK_HASH]
type callbackSet[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] map[int]HeadTrackable[H, BLOCK_HASH]

func (set callbackSet[H, BLOCK_HASH]) values() []types.HeadTrackable[H, BLOCK_HASH] {
var values []types.HeadTrackable[H, BLOCK_HASH]
func (set callbackSet[H, BLOCK_HASH]) values() []HeadTrackable[H, BLOCK_HASH] {
var values []HeadTrackable[H, BLOCK_HASH]
for _, callback := range set {
values = append(values, callback)
}
return values
}

type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct {
// HeadTrackable is implemented by the core txm to be able to receive head events from any chain.
// Chain implementations should notify head events to the core txm via this interface.
//
//go:generate mockery --quiet --name HeadTrackable --output ./mocks/ --case=underscore
type HeadTrackable[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] interface {
// OnNewLongestChain sends a new head when it becomes available. Subscribers can recursively trace the parent
// of the head to the finalized block back.
OnNewLongestChain(ctx context.Context, head H)
}

// HeadBroadcaster relays new Heads to all subscribers.
type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] interface {
services.Service
BroadcastNewLongestChain(H)
Subscribe(callback HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func())
}

type headBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct {
services.StateMachine
logger logger.Logger
callbacks callbackSet[H, BLOCK_HASH]
Expand All @@ -44,24 +61,24 @@ func NewHeadBroadcaster[
BLOCK_HASH types.Hashable,
](
lggr logger.Logger,
) *HeadBroadcaster[H, BLOCK_HASH] {
return &HeadBroadcaster[H, BLOCK_HASH]{
) HeadBroadcaster[H, BLOCK_HASH] {
return &headBroadcaster[H, BLOCK_HASH]{
logger: logger.Named(lggr, "HeadBroadcaster"),
callbacks: make(callbackSet[H, BLOCK_HASH]),
mailbox: mailbox.NewSingle[H](),
chClose: make(chan struct{}),
}
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) Start(context.Context) error {
func (hb *headBroadcaster[H, BLOCK_HASH]) Start(context.Context) error {
return hb.StartOnce("HeadBroadcaster", func() error {
hb.wgDone.Add(1)
go hb.run()
return nil
})
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) Close() error {
func (hb *headBroadcaster[H, BLOCK_HASH]) Close() error {
return hb.StopOnce("HeadBroadcaster", func() error {
hb.mutex.Lock()
// clear all callbacks
Expand All @@ -74,21 +91,21 @@ func (hb *HeadBroadcaster[H, BLOCK_HASH]) Close() error {
})
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) Name() string {
func (hb *headBroadcaster[H, BLOCK_HASH]) Name() string {
return hb.logger.Name()
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) HealthReport() map[string]error {
func (hb *headBroadcaster[H, BLOCK_HASH]) HealthReport() map[string]error {
return map[string]error{hb.Name(): hb.Healthy()}
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) {
func (hb *headBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) {
hb.mailbox.Deliver(head)
}

// Subscribe subscribes to OnNewLongestChain and Connect until HeadBroadcaster is closed,
// or unsubscribe callback is called explicitly
func (hb *HeadBroadcaster[H, BLOCK_HASH]) Subscribe(callback types.HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func()) {
func (hb *headBroadcaster[H, BLOCK_HASH]) Subscribe(callback HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func()) {
hb.mutex.Lock()
defer hb.mutex.Unlock()

Expand All @@ -106,7 +123,7 @@ func (hb *HeadBroadcaster[H, BLOCK_HASH]) Subscribe(callback types.HeadTrackable
return
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) run() {
func (hb *headBroadcaster[H, BLOCK_HASH]) run() {
defer hb.wgDone.Done()

for {
Expand All @@ -122,7 +139,7 @@ func (hb *HeadBroadcaster[H, BLOCK_HASH]) run() {
// DEV: the head relayer makes no promises about head delivery! Subscribing
// Jobs should expect to the relayer to skip heads if there is a large number of listeners
// and all callbacks cannot be completed in the allotted time.
func (hb *HeadBroadcaster[H, BLOCK_HASH]) executeCallbacks() {
func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks() {
head, exists := hb.mailbox.Retrieve()
if !exists {
hb.logger.Info("No head to retrieve. It might have been skipped")
Expand All @@ -146,7 +163,7 @@ func (hb *HeadBroadcaster[H, BLOCK_HASH]) executeCallbacks() {
defer cancel()

for _, callback := range callbacks {
go func(trackable types.HeadTrackable[H, BLOCK_HASH]) {
go func(trackable HeadTrackable[H, BLOCK_HASH]) {
defer wg.Done()
start := time.Now()
cctx, cancel := context.WithTimeout(ctx, TrackableCallbackTimeout)
Expand Down
43 changes: 31 additions & 12 deletions common/headtracker/head_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,26 @@ var (
}, []string{"ChainID"})
)

type HeadListener[
// headHandler is a callback that handles incoming heads
type headHandler[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] func(ctx context.Context, header H) error

// HeadListener is a chain agnostic interface that manages connection of Client that receives heads from the blockchain node
type HeadListener[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] interface {
// ListenForNewHeads kicks off the listen loop (not thread safe)
// done() must be executed upon leaving ListenForNewHeads()
ListenForNewHeads(handleNewHead headHandler[H, BLOCK_HASH], done func())

// ReceivingHeads returns true if the listener is receiving heads (thread safe)
ReceivingHeads() bool

// Connected returns true if the listener is connected (thread safe)
Connected() bool

// HealthReport returns report of errors within HeadListener
HealthReport() map[string]error
}

type headListener[
HTH htrktypes.Head[BLOCK_HASH, ID],
S types.Subscription,
ID types.ID,
Expand All @@ -56,20 +75,20 @@ func NewHeadListener[
client CLIENT,
config htrktypes.Config,
chStop chan struct{},
) *HeadListener[HTH, S, ID, BLOCK_HASH] {
return &HeadListener[HTH, S, ID, BLOCK_HASH]{
) HeadListener[HTH, BLOCK_HASH] {
return &headListener[HTH, S, ID, BLOCK_HASH]{
config: config,
client: client,
logger: logger.Named(lggr, "HeadListener"),
chStop: chStop,
}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) Name() string {
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) Name() string {
return hl.logger.Name()
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(handleNewHead types.NewHeadHandler[HTH, BLOCK_HASH], done func()) {
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(handleNewHead headHandler[HTH, BLOCK_HASH], done func()) {
defer done()
defer hl.unsubscribe()

Expand All @@ -91,15 +110,15 @@ func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(handleNewHead
}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ReceivingHeads() bool {
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) ReceivingHeads() bool {
return hl.receivingHeads.Load()
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) Connected() bool {
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) Connected() bool {
return hl.connected.Load()
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error {
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error {
var err error
if !hl.ReceivingHeads() {
err = errors.New("Listener is not receiving heads")
Expand All @@ -110,7 +129,7 @@ func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error
return map[string]error{hl.Name(): err}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead types.NewHeadHandler[HTH, BLOCK_HASH]) error {
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead headHandler[HTH, BLOCK_HASH]) error {
var noHeadsAlarmC <-chan time.Time
var noHeadsAlarmT *time.Ticker
noHeadsAlarmDuration := hl.config.BlockEmissionIdleWarningThreshold()
Expand Down Expand Up @@ -169,7 +188,7 @@ func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Conte
}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribe(ctx context.Context) bool {
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) subscribe(ctx context.Context) bool {
subscribeRetryBackoff := utils.NewRedialBackoff()

chainId := hl.client.ConfiguredChainID()
Expand All @@ -196,7 +215,7 @@ func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribe(ctx context.Context) b
}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribeToHead(ctx context.Context) error {
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) subscribeToHead(ctx context.Context) error {
hl.chHeaders = make(chan HTH)

var err error
Expand All @@ -211,7 +230,7 @@ func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribeToHead(ctx context.Cont
return nil
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) unsubscribe() {
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) unsubscribe() {
if hl.headSubscription != nil {
hl.connected.Store(false)
hl.headSubscription.Unsubscribe()
Expand Down
23 changes: 23 additions & 0 deletions common/headtracker/head_saver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package headtracker

import (
"context"

"github.com/smartcontractkit/chainlink/v2/common/types"
)

// HeadSaver is an chain agnostic interface for saving and loading heads
// Different chains will instantiate generic HeadSaver type with their native Head and BlockHash types.
type HeadSaver[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] interface {
// Save updates the latest block number, if indeed the latest, and persists
// this number in case of reboot.
Save(ctx context.Context, head H) error
// Load loads latest heads up to latestFinalized - historyDepth, returns the latest chain.
Load(ctx context.Context, latestFinalized int64) (H, error)
// LatestChain returns the block header with the highest number that has been seen, or nil.
LatestChain() H
// Chain returns a head for the specified hash, or nil.
Chain(hash BLOCK_HASH) H
// MarkFinalized - marks matching block and all it's direct ancestors as finalized
MarkFinalized(ctx context.Context, latestFinalized H) error
}
Loading

0 comments on commit 69ebfbb

Please sign in to comment.