Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Head Tracker Finality Violation Detection #28

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
46 changes: 39 additions & 7 deletions chains/heads/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

"github.com/smartcontractkit/chainlink-framework/chains"
Expand All @@ -36,7 +37,7 @@ const HeadsBufferSize = 10
type Tracker[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] interface {
services.Service
// Backfill given a head will fill in any missing heads up to latestFinalized
Backfill(ctx context.Context, headWithChain H) (err error)
Backfill(ctx context.Context, headWithChain H, prevHeadWithChain H) (err error)
LatestChain() H
// LatestAndFinalizedBlock - returns latest and latest finalized blocks.
// NOTE: Returns latest finalized block as is, ignoring the FinalityTagBypass feature flag.
Expand All @@ -59,6 +60,11 @@ type TrackerConfig interface {
PersistenceEnabled() bool
}

type headPair[HTH any] struct {
head HTH
prevHead HTH
}

type tracker[
HTH Head[BLOCK_HASH, ID],
S chains.Subscription,
Expand All @@ -77,7 +83,7 @@ type tracker[
config ChainConfig
htConfig TrackerConfig

backfillMB *mailbox.Mailbox[HTH]
backfillMB *mailbox.Mailbox[headPair[HTH]]
broadcastMB *mailbox.Mailbox[HTH]
headListener Listener[HTH, BLOCK_HASH]
getNilHead func() HTH
Expand Down Expand Up @@ -105,7 +111,7 @@ func NewTracker[
chainID: client.ConfiguredChainID(),
config: config,
htConfig: htConfig,
backfillMB: mailbox.NewSingle[HTH](),
backfillMB: mailbox.NewSingle[headPair[HTH]](),
broadcastMB: mailbox.New[HTH](HeadsBufferSize),
headSaver: headSaver,
mailMon: mailMon,
Expand Down Expand Up @@ -194,7 +200,28 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) close() error {
return t.broadcastMB.Close()
}

func (t *tracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH) (err error) {
// verifyFinalizedBlockHashes returns finality violated error if a mismatch is found in finalized block hashes
func (t *tracker[HTH, S, ID, BLOCK_HASH]) verifyFinalizedBlockHashes(latestFinalizedHeadWithChain HTH, prevHeadWithChain HTH) error {
if !t.config.FinalityTagEnabled() {
return nil // Bypass if using finality depth
}

latestFinalizedBlockNum := latestFinalizedHeadWithChain.BlockNumber()
prevFinalizedBlockNum := prevHeadWithChain.LatestFinalizedHead().BlockNumber()
if latestFinalizedBlockNum < prevFinalizedBlockNum {
return fmt.Errorf("latest finalized block (%d) is behind previously seen finalized block (%d): %w",
latestFinalizedBlockNum, prevFinalizedBlockNum, types.ErrFinalityViolated)
}

for blockNum := prevFinalizedBlockNum; blockNum >= 0; blockNum-- {
if latestFinalizedHeadWithChain.HashAtHeight(blockNum) != prevHeadWithChain.HashAtHeight(blockNum) {
return fmt.Errorf("block hash mismatch at height %d: %w", blockNum, types.ErrFinalityViolated)
}
}
return nil
}

func (t *tracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH, prevHeadWithChain HTH) (err error) {
latestFinalized, err := t.calculateLatestFinalized(ctx, headWithChain, t.htConfig.FinalityTagBypass())
if err != nil {
return fmt.Errorf("failed to calculate finalized block: %w", err)
Expand All @@ -217,6 +244,10 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWith
latestFinalized.BlockNumber(), headWithChain.BlockNumber(), t.htConfig.MaxAllowedFinalityDepth())
}

if err = t.verifyFinalizedBlockHashes(latestFinalized, prevHeadWithChain); err != nil {
return err
}

return t.backfill(ctx, headWithChain, latestFinalized)
}

Expand Down Expand Up @@ -248,7 +279,7 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, hea
if !headWithChain.IsValid() {
return fmt.Errorf("heads.tracker#handleNewHighestHead headWithChain was unexpectedly nil")
}
t.backfillMB.Deliver(headWithChain)
t.backfillMB.Deliver(headPair[HTH]{headWithChain, prevHead})
t.broadcastMB.Deliver(headWithChain)
} else if head.BlockNumber() == prevHead.BlockNumber() {
if head.BlockHash() != prevHead.BlockHash() {
Expand All @@ -265,6 +296,7 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, hea
err := fmt.Errorf("got very old block with number %d (highest seen was %d)", head.BlockNumber(), prevHead.BlockNumber())
t.log.Critical("Got very old block. Either a very deep re-org occurred, one of the RPC nodes has gotten far out of sync, or the chain went backwards in block numbers. This node may not function correctly without manual intervention.", "err", err)
t.eng.EmitHealthErr(err)
return types.ErrFinalityViolated
}
}
return nil
Expand Down Expand Up @@ -314,12 +346,12 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) backfillLoop(ctx context.Context) {
return
case <-t.backfillMB.Notify():
for {
head, exists := t.backfillMB.Retrieve()
backfillHeadPair, exists := t.backfillMB.Retrieve()
if !exists {
break
}
{
err := t.Backfill(ctx, head)
err := t.Backfill(ctx, backfillHeadPair.head, backfillHeadPair.prevHead)
if err != nil {
t.log.Warnw("Unexpected error while backfilling heads", "err", err)
} else if ctx.Err() != nil {
Expand Down
Loading