Skip to content

Commit

Permalink
Node/EVM: Scroll finality
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Nov 10, 2023
1 parent 22bf5f6 commit 52210f8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 30 deletions.
33 changes: 22 additions & 11 deletions node/pkg/watchers/evm/connectors/batch_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
// BatchPollConnector uses batch requests to poll for latest, safe and finalized blocks.
type BatchPollConnector struct {
Connector
logger *zap.Logger
Delay time.Duration
blockFeed ethEvent.Feed
errFeed ethEvent.Feed
batchData []BatchEntry
logger *zap.Logger
Delay time.Duration
blockFeed ethEvent.Feed
errFeed ethEvent.Feed
batchData []BatchEntry
generateSafe bool
}

type (
Expand All @@ -43,18 +44,22 @@ type (

const MAX_GAP_BATCH_SIZE uint64 = 5

func NewBatchPollConnector(ctx context.Context, logger *zap.Logger, baseConnector Connector, delay time.Duration) (*BatchPollConnector, error) {
func NewBatchPollConnector(ctx context.Context, logger *zap.Logger, baseConnector Connector, safeSupported bool, delay time.Duration) (*BatchPollConnector, error) {
// Create the batch data in the order we want to report them to the watcher, so finalized is most important, latest is least.
batchData := []BatchEntry{
{tag: "finalized", finality: Finalized},
{tag: "safe", finality: Safe},
}

if safeSupported {
batchData = append(batchData, BatchEntry{tag: "safe", finality: Safe})
}

connector := &BatchPollConnector{
Connector: baseConnector,
logger: logger,
Delay: delay,
batchData: batchData,
Connector: baseConnector,
logger: logger,
Delay: delay,
batchData: batchData,
generateSafe: !safeSupported,
}
err := supervisor.Run(ctx, "batchPoller", common.WrapWithScissors(connector.runFromSupervisor, "batchPoller"))
if err != nil {
Expand Down Expand Up @@ -195,6 +200,9 @@ func (b *BatchPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,
}

b.blockFeed.Send(block)
if b.generateSafe && b.batchData[idx].finality == Finalized {
b.blockFeed.Send(block.Copy(Safe))
}
lastPublishedBlock = block
}
}
Expand All @@ -205,6 +213,9 @@ func (b *BatchPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,
if !errorFound {
// The original value of newBlocks is still good.
b.blockFeed.Send(newBlock)
if b.generateSafe && b.batchData[idx].finality == Finalized {
b.blockFeed.Send(newBlock.Copy(Safe))
}
} else {
newBlocks[idx] = lastPublishedBlock
}
Expand Down
47 changes: 28 additions & 19 deletions node/pkg/watchers/evm/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
ctx, watcherContextCancelFunc := context.WithCancel(parentCtx)
defer watcherContextCancelFunc()

var useFinalizedBlocks bool
useFinalizedBlocks, err = w.getFinality(ctx)
if err != nil {
return fmt.Errorf("failed to determine finality: %w", err)
}

// Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
ContractAddress: w.contract.Hex(),
Expand All @@ -233,15 +227,24 @@ func (w *Watcher) Run(parentCtx context.Context) error {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

if useFinalizedBlocks {
logger.Info("using finalized blocks")
finalizedPollingSupported, safePollingSupported, err := w.getFinality(ctx)
if err != nil {
return fmt.Errorf("failed to determine finality: %w", err)
}

if finalizedPollingSupported {
if safePollingSupported {
logger.Info("polling for finalized and safe blocks")
} else {
logger.Info("polling for finalized blocks, will generate safe blocks")
}
baseConnector, err := connectors.NewEthereumBaseConnector(timeout, w.networkName, w.url, w.contract, logger)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
w.ethConn, err = connectors.NewBatchPollConnector(ctx, logger, baseConnector, 1000*time.Millisecond)
w.ethConn, err = connectors.NewBatchPollConnector(ctx, logger, baseConnector, safePollingSupported, 1000*time.Millisecond)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
Expand Down Expand Up @@ -848,9 +851,10 @@ func fetchCurrentGuardianSet(ctx context.Context, ethConn connectors.Connector)
}

// getFinality determines if the chain supports "finalized" and "safe". This is hard coded so it requires thought to change something. However, it also reads the RPC
// to make sure the node actually supports the expected values, and returns an error if it doesn't. Note that we do not support using finalized mode but not safe mode.
func (w *Watcher) getFinality(ctx context.Context) (bool, error) {
// to make sure the node actually supports the expected values, and returns an error if it doesn't. Note that we do not support using safe mode but not finalized mode.
func (w *Watcher) getFinality(ctx context.Context) (bool, bool, error) {
finalized := false
safe := false
if w.unsafeDevMode {
// Devnet supports finalized and safe (although they returns the same value as latest).
finalized = true
Expand All @@ -862,9 +866,12 @@ func (w *Watcher) getFinality(ctx context.Context) (bool, error) {
w.chainID == vaa.ChainIDKarura ||
w.chainID == vaa.ChainIDMoonbeam ||
w.chainID == vaa.ChainIDOptimism ||
w.chainID == vaa.ChainIDScroll ||
w.chainID == vaa.ChainIDSepolia {
finalized = true
safe = true
} else if w.chainID == vaa.ChainIDScroll {
// As of 11/10/2023 Scroll supports polling for finalized but not safe.
finalized = true
}

// If finalized / safe should be supported, read the RPC to make sure they actually are.
Expand All @@ -874,7 +881,7 @@ func (w *Watcher) getFinality(ctx context.Context) (bool, error) {

c, err := rpc.DialContext(timeout, w.url)
if err != nil {
return false, fmt.Errorf("failed to connect to endpoint: %w", err)
return false, false, fmt.Errorf("failed to connect to endpoint: %w", err)
}

type Marshaller struct {
Expand All @@ -883,17 +890,19 @@ func (w *Watcher) getFinality(ctx context.Context) (bool, error) {
var m Marshaller

err = c.CallContext(ctx, &m, "eth_getBlockByNumber", "finalized", false)
if err != nil {
return false, fmt.Errorf("finalized not supported by the node when it should be: %w", err)
if err != nil || m.Number == nil {
return false, false, fmt.Errorf("finalized not supported by the node when it should be")
}

err = c.CallContext(ctx, &m, "eth_getBlockByNumber", "safe", false)
if err != nil {
return false, fmt.Errorf("safe not supported by the node when it should be: %w", err)
if safe {
err = c.CallContext(ctx, &m, "eth_getBlockByNumber", "safe", false)
if err != nil || m.Number == nil {
return false, false, fmt.Errorf("safe not supported by the node when it should be")
}
}
}

return finalized, nil
return finalized, safe, nil
}

// SetL1Finalizer is used to set the layer one finalizer.
Expand Down

0 comments on commit 52210f8

Please sign in to comment.