Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HeadTracker finalization support #12082

Merged
merged 42 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
af480ab
Add LatestFinalizedBlock to HeadTracker
dhaidashenko Feb 19, 2024
9da949b
Added LatestFinalizedHead to Head
dhaidashenko Feb 19, 2024
9e580cc
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Feb 19, 2024
112679e
remove unused func
dhaidashenko Feb 20, 2024
334a4d1
fix flakey nil pointer
dhaidashenko Feb 20, 2024
83cf834
improve logs & address lint issue
dhaidashenko Feb 20, 2024
26e8d50
nitpicks
dhaidashenko Feb 20, 2024
962464c
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Feb 20, 2024
b86c872
fixed copy on heads on MarkFinalized
dhaidashenko Feb 21, 2024
00769f0
Merge branch 'feature/BCI-2649-latest-finalized-block' of github.com:…
dhaidashenko Feb 21, 2024
81774b4
error instead of panic
dhaidashenko Feb 21, 2024
c942663
return error instead of panic
dhaidashenko Feb 21, 2024
72a2380
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Feb 21, 2024
a01fb86
nitpicks
dhaidashenko Feb 21, 2024
d7a9d4e
Merge branch 'feature/BCI-2649-latest-finalized-block' of github.com:…
dhaidashenko Feb 21, 2024
faf61d9
Finalized block based history depth
dhaidashenko Feb 23, 2024
89a75b3
simplify trimming
dhaidashenko Feb 23, 2024
f7ab489
nit fixes
dhaidashenko Feb 23, 2024
d9d422c
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Feb 23, 2024
e77f529
fix build issues caused by merge
dhaidashenko Feb 23, 2024
908acf7
regen
dhaidashenko Feb 23, 2024
93b835d
FIx rpc client mock generation
dhaidashenko Feb 23, 2024
2f55403
nit fixes
dhaidashenko Feb 26, 2024
9f26066
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Feb 26, 2024
71a0803
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Feb 26, 2024
35c3302
nit fixes
dhaidashenko Feb 26, 2024
bd1ea1e
update comments
dhaidashenko Feb 27, 2024
6cc4fec
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Feb 27, 2024
83ea5d1
ensure that we trim redundant blocks both in slice and in chain in Heads
dhaidashenko Feb 28, 2024
2d5ae65
nit fix
dhaidashenko Feb 28, 2024
c99cea6
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Feb 28, 2024
f77a8ab
Update common/headtracker/head_tracker.go
dhaidashenko Feb 28, 2024
f7c786f
HeadTracker backfill test with 0 finality depth
dhaidashenko Feb 29, 2024
dee11fc
Merge branch 'feature/BCI-2649-latest-finalized-block' of github.com:…
dhaidashenko Feb 29, 2024
03b07d2
docs
dhaidashenko Mar 5, 2024
9f6e5e4
Update docs/CHANGELOG.md
dhaidashenko Mar 5, 2024
924df1b
ensure latest finalized block is valid on startup
dhaidashenko Mar 6, 2024
e4fe585
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Mar 8, 2024
03489c0
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Mar 11, 2024
4a117c4
changeset
dhaidashenko Mar 11, 2024
00cbb2e
Merge branch 'develop' into feature/BCI-2649-latest-finalized-block
dhaidashenko Mar 12, 2024
7abea66
switch from warn to debug level when we failed to makr block as final…
dhaidashenko Mar 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions common/client/mock_rpc_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,3 +819,12 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
}
return n.RPC().TransactionReceipt(ctx, txHash)
}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT, BATCH_ELEM]) LatestFinalizedBlock(ctx context.Context) (head HEAD, err error) {
n, err := c.selectNode()
if err != nil {
return head, err
}

return n.RPC().LatestFinalizedBlock(ctx)
}
1 change: 1 addition & 0 deletions common/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type clientAPI[
BlockByNumber(ctx context.Context, number *big.Int) (HEAD, error)
BlockByHash(ctx context.Context, hash BLOCK_HASH) (HEAD, error)
LatestBlockHeight(context.Context) (*big.Int, error)
LatestFinalizedBlock(ctx context.Context) (HEAD, error)

// Events
FilterEvents(ctx context.Context, query EVENT_OPS) ([]EVENT, error)
Expand Down
148 changes: 99 additions & 49 deletions common/headtracker/head_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"

Expand Down Expand Up @@ -96,37 +97,19 @@ func NewHeadTracker[
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error {
return ht.StartOnce("HeadTracker", func() error {
ht.log.Debugw("Starting HeadTracker", "chainID", ht.chainID)
latestChain, err := ht.headSaver.Load(ctx)
if err != nil {
return err
}
if latestChain.IsValid() {
ht.log.Debugw(
fmt.Sprintf("HeadTracker: Tracking logs from last block %v with hash %s", latestChain.BlockNumber(), latestChain.BlockHash()),
"blockNumber", latestChain.BlockNumber(),
"blockHash", latestChain.BlockHash(),
)
}

// NOTE: Always try to start the head tracker off with whatever the
// latest head is, without waiting for the subscription to send us one.
//
// In some cases the subscription will send us the most recent head
// anyway when we connect (but we should not rely on this because it is
// not specced). If it happens this is fine, and the head will be
// ignored as a duplicate.
initialHead, err := ht.getInitialHead(ctx)
err := ht.handleInitialHead(ctx)
if err != nil {
if errors.Is(err, ctx.Err()) {
return nil
}
ht.log.Errorw("Error getting initial head", "err", err)
} else if initialHead.IsValid() {
if err := ht.handleNewHead(ctx, initialHead); err != nil {
return fmt.Errorf("error handling initial head: %w", err)
if ctx.Err() != nil {
return ctx.Err()
}
} else {
ht.log.Debug("Got nil initial head")
ht.log.Errorw("Error handling initial head", "err", err)
}

ht.wgDone.Add(3)
Expand All @@ -140,6 +123,45 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error
})
}

func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) handleInitialHead(ctx context.Context) error {
initialHead, err := ht.client.HeadByNumber(ctx, nil)
if err != nil {
return fmt.Errorf("failed to fetch initial head: %w", err)
}

if !initialHead.IsValid() {
ht.log.Warnw("Got nil initial head", "head", initialHead)
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
ht.log.Debugw("Got initial head", "head", initialHead, "blockNumber", initialHead.BlockNumber(), "blockHash", initialHead.BlockHash())

latestFinalized, err := ht.calculateLatestFinalized(ctx, initialHead)
if err != nil {
return fmt.Errorf("failed to calculate latest finalized head: %w", err)
}

latestChain, err := ht.headSaver.Load(ctx, latestFinalized.BlockNumber())
if err != nil {
return fmt.Errorf("failed to initialized headSaver: %w", err)
}

if latestChain.IsValid() {
earliest := latestChain.EarliestHeadInChain()
ht.log.Debugw(
"Loaded chain from DB",
"latest_blockNumber", latestChain.BlockNumber(),
"latest_blockHash", latestChain.BlockHash(),
"earliest_blockNumber", earliest.BlockNumber(),
"earliest_blockHash", earliest.BlockHash(),
)
}
if err := ht.handleNewHead(ctx, initialHead); err != nil {
return fmt.Errorf("error handling initial head: %w", err)
}

return nil
}

// Close stops HeadTracker service.
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Close() error {
return ht.StopOnce("HeadTracker", func() error {
Expand All @@ -159,36 +181,26 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error {
return report
}

func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH, depth uint) (err error) {
if uint(headWithChain.ChainLength()) >= depth {
return nil
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain, latestFinalized HTH) (err error) {
if !latestFinalized.IsValid() {
return errors.New("can not perform backfill without a valid latestFinalized head")
}

baseHeight := headWithChain.BlockNumber() - int64(depth-1)
if baseHeight < 0 {
baseHeight = 0
if headWithChain.BlockNumber() < latestFinalized.BlockNumber() {
const errMsg = "invariant violation: expected head of canonical chain to be ahead of the latestFinalized"
ht.log.With("head_block_num", headWithChain.BlockNumber(),
"latest_finalized_block_number", latestFinalized.BlockNumber()).
Criticalf(errMsg)
return errors.New(errMsg)
}

return ht.backfill(ctx, headWithChain.EarliestHeadInChain(), baseHeight)
return ht.backfill(ctx, headWithChain, latestFinalized)
dimriou marked this conversation as resolved.
Show resolved Hide resolved
}

func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) LatestChain() HTH {
return ht.headSaver.LatestChain()
}

func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) getInitialHead(ctx context.Context) (HTH, error) {
head, err := ht.client.HeadByNumber(ctx, nil)
if err != nil {
return ht.getNilHead(), fmt.Errorf("failed to fetch initial head: %w", err)
}
loggerFields := []interface{}{"head", head}
if head.IsValid() {
loggerFields = append(loggerFields, "blockNumber", head.BlockNumber(), "blockHash", head.BlockHash())
}
ht.log.Debugw("Got initial head", loggerFields...)
return head, nil
}

func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, head HTH) error {
prevHead := ht.headSaver.LatestChain()

Expand Down Expand Up @@ -290,7 +302,13 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop() {
break
}
{
err := ht.Backfill(ctx, head, uint(ht.config.FinalityDepth()))
latestFinalized, err := ht.calculateLatestFinalized(ctx, head)
if err != nil {
ht.log.Warnw("Failed to calculate finalized block", "err", err)
continue
}

err = ht.Backfill(ctx, head, latestFinalized)
if err != nil {
ht.log.Warnw("Unexpected error while backfilling heads", "err", err)
} else if ctx.Err() != nil {
Expand All @@ -302,14 +320,30 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop() {
}
}

// backfill fetches all missing heads up until the base height
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, head types.Head[BLOCK_HASH], baseHeight int64) (err error) {
headBlockNumber := head.BlockNumber()
if headBlockNumber <= baseHeight {
return nil
// calculateLatestFinalized - returns latest finalized block. It's expected that currentHeadNumber - is the head of
// canonical chain. There is no guaranties that returned block belongs to the canonical chain. Additional verification
// must be performed before usage.
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) calculateLatestFinalized(ctx context.Context, currentHead HTH) (h HTH, err error) {
if ht.config.FinalityTagEnabled() {
return ht.client.LatestFinalizedBlock(ctx)
}
// no need to make an additional RPC call on chains with instant finality
if ht.config.FinalityDepth() == 0 {
return currentHead, nil
}
finalizedBlockNumber := currentHead.BlockNumber() - int64(ht.config.FinalityDepth())
if finalizedBlockNumber <= 0 {
finalizedBlockNumber = 0
}
return ht.client.HeadByNumber(ctx, big.NewInt(finalizedBlockNumber))
}

// backfill fetches all missing heads up until the latestFinalizedHead
func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, head, latestFinalizedHead HTH) (err error) {
headBlockNumber := head.BlockNumber()
mark := time.Now()
fetched := 0
baseHeight := latestFinalizedHead.BlockNumber()
l := ht.log.With("blockNumber", headBlockNumber,
"n", headBlockNumber-baseHeight,
"fromBlockHeight", baseHeight,
Expand Down Expand Up @@ -337,11 +371,27 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, hea
fetched++
if ctx.Err() != nil {
ht.log.Debugw("context canceled, aborting backfill", "err", err, "ctx.Err", ctx.Err())
break
return fmt.Errorf("fetchAndSaveHead failed: %w", ctx.Err())
} else if err != nil {
return fmt.Errorf("fetchAndSaveHead failed: %w", err)
}
}

if head.BlockHash() != latestFinalizedHead.BlockHash() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reaches up to finalized block. Shouldn't it be up to finalized block + history depth? Otherwise HistoryDepth() has no use.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've kept this logic as is. We actively try to backfill up to finality and keep blocks in cache up to history depth

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certain product teams have expressed their interest in the past to have more than FinalityDepth blocks stored in Head Tracker. I figured HistoryDepth could play that role, and we can actually actively fetch more blocks instead of avoiding trimming, otherwise it won't be useful to users. That's why initially I proposed changing the HistoryDepth name to something more representative.
But if it requires additional changes, perhaps it might get out of scope. Is there anything else we would have to change besides this part to make it work with FinalityDepth + HistoryDepth

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the behaviour to backfill down to HistoryDepth is trivial. But It would be nice to do some testing to check how long would a fresh start take for chains like Polygon and maybe do optimisation if needed.

To keep this PR's size manageable I'd rather introduce this feature separately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this PR, but we could do a large backfill by batched RPC calls right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, batching should improve performance for large block ranges, but it would be nice to measure actual improvement before introducing second way of backfilling

const errMsg = "expected finalized block to be present in canonical chain"
ht.log.With("finalized_block_number", latestFinalizedHead.BlockNumber(), "finalized_hash", latestFinalizedHead.BlockHash(),
"canonical_chain_block_number", head.BlockNumber(), "canonical_chain_hash", head.BlockHash()).Criticalf(errMsg)
return fmt.Errorf(errMsg)
}

err = ht.headSaver.MarkFinalized(ctx, latestFinalizedHead)
if err != nil {
return fmt.Errorf("failed to mark head as finalized: %w", err)
}

l.Debugw("marked block as finalized", "block_hash", latestFinalizedHead.BlockHash(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might end up spamming unnecessary logs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log has the same frequency and log level as existing Starting backfill and Finished backfill.
It seems helpful for debugging purposes to know which block HeadTracker considers finalized

Copy link
Collaborator

@dimriou dimriou Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starting backfill and finished backfill are also noisy and don't provide that much information. The general problem with our current logs is that we log everything and then there is a huge amount of logs to go through. In principle, we should try to log when things go wrong and only add debug messages when they change the state meaningfully. Let's keep this as it is for now, and tackle it in a different PR.

"block_number", latestFinalizedHead.BlockNumber())

return
}

Expand Down
2 changes: 2 additions & 0 deletions common/headtracker/types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ type Client[H types.Head[BLOCK_HASH], S types.Subscription, ID types.ID, BLOCK_H
// SubscribeNewHead is the method in which the client receives new Head.
// It can be implemented differently for each chain i.e websocket, polling, etc
SubscribeNewHead(ctx context.Context, ch chan<- H) (S, error)
// LatestFinalizedBlock - returns the latest block that was marked as finalized
LatestFinalizedBlock(ctx context.Context) (head H, err error)
}
1 change: 1 addition & 0 deletions common/headtracker/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import "time"
type Config interface {
BlockEmissionIdleWarningThreshold() time.Duration
FinalityDepth() uint32
FinalityTagEnabled() bool
}

type HeadTrackerConfig interface {
Expand Down
10 changes: 5 additions & 5 deletions common/mocks/head_tracker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions common/types/head_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (
//go:generate mockery --quiet --name HeadTracker --output ../mocks/ --case=underscore
type HeadTracker[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface {
services.Service
// Backfill given a head will fill in any missing heads up to the given depth
// (used for testing)
Backfill(ctx context.Context, headWithChain H, depth uint) (err error)
// Backfill given a head will fill in any missing heads up to latestFinalized
Backfill(ctx context.Context, headWithChain, latestFinalized H) (err error)
LatestChain() H
}

Expand All @@ -37,12 +36,14 @@ type HeadSaver[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface {
// Save updates the latest block number, if indeed the latest, and persists
// this number in case of reboot.
Save(ctx context.Context, head H) error
// Load loads latest EvmHeadTrackerHistoryDepth heads, returns the latest chain.
Load(ctx context.Context) (H, error)
// Load loads latest heads up to latestFinalized - historyDepth, returns the latest chain.
Load(ctx context.Context, latestFinalized int64) (H, error)
// LatestChain returns the block header with the highest number that has been seen, or nil.
LatestChain() H
// Chain returns a head for the specified hash, or nil.
Chain(hash BLOCK_HASH) H
// MarkFinalized - marks matching block and all it's direct ancestors as finalized
MarkFinalized(ctx context.Context, latestFinalized H) error
}

// HeadListener is a chain agnostic interface that manages connection of Client that receives heads from the blockchain node
Expand Down
4 changes: 4 additions & 0 deletions core/chains/evm/client/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,7 @@ func (c *chainClient) TransactionReceipt(ctx context.Context, txHash common.Hash
//return rpc.TransactionReceipt(ctx, txHash)
return rpc.TransactionReceiptGeth(ctx, txHash)
}

func (c *chainClient) LatestFinalizedBlock(ctx context.Context) (*evmtypes.Head, error) {
return c.multiNode.LatestFinalizedBlock(ctx)
}
5 changes: 5 additions & 0 deletions core/chains/evm/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Client interface {
HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error)
HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error)
SubscribeNewHead(ctx context.Context, ch chan<- *evmtypes.Head) (ethereum.Subscription, error)
LatestFinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error)

SendTransactionReturnCode(ctx context.Context, tx *types.Transaction, fromAddress common.Address) (commonclient.SendTxReturnCode, error)

Expand Down Expand Up @@ -366,3 +367,7 @@ func (client *client) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, er
func (client *client) IsL2() bool {
return client.pool.ChainType().IsL2()
}

func (client *client) LatestFinalizedBlock(_ context.Context) (*evmtypes.Head, error) {
return nil, pkgerrors.New("not implemented. client was deprecated. New methods are added only to satisfy type constraints while we are migrating to new alternatives")
}
Loading
Loading