Skip to content

Commit

Permalink
Raising a flag when finality is violated (#12564)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara authored Mar 26, 2024
1 parent 34dd367 commit 246762c
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/silent-pets-sip.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Exposing information about LogPoller finality violation via Healthy method. It's raised whenever LogPoller sees reorg deeper than the finality
4 changes: 4 additions & 0 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func (disabled) Start(ctx context.Context) error { return ErrDisabled }

func (disabled) Close() error { return ErrDisabled }

func (disabled) Healthy() error {
return ErrDisabled
}

func (disabled) Ready() error { return ErrDisabled }

func (disabled) HealthReport() map[string]error {
Expand Down
26 changes: 25 additions & 1 deletion core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum"
Expand All @@ -34,6 +35,7 @@ import (
//go:generate mockery --quiet --name LogPoller --output ./mocks/ --case=underscore --structname LogPoller --filename log_poller.go
type LogPoller interface {
services.Service
Healthy() error
Replay(ctx context.Context, fromBlock int64) error
ReplayAsync(fromBlock int64)
RegisterFilter(ctx context.Context, filter Filter) error
Expand Down Expand Up @@ -92,6 +94,7 @@ var (
ErrReplayRequestAborted = pkgerrors.New("aborted, replay request cancelled")
ErrReplayInProgress = pkgerrors.New("replay request cancelled, but replay is already in progress")
ErrLogPollerShutdown = pkgerrors.New("replay aborted due to log poller shutdown")
ErrFinalityViolated = pkgerrors.New("finality violated")
)

type logPoller struct {
Expand Down Expand Up @@ -120,6 +123,12 @@ type logPoller struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// This flag is raised whenever the log poller detects that the chain's finality has been violated.
// It can happen when reorg is deeper than the latest finalized block that LogPoller saw in a previous PollAndSave tick.
// Usually the only way to recover is to manually remove the offending logs and block from the database.
// LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should
// recover automatically without needing to restart the LogPoller.
finalityViolated *atomic.Bool
}

type Opts struct {
Expand Down Expand Up @@ -163,6 +172,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, opts Opts) *logPoller
logPrunePageSize: opts.LogPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
}
}

Expand Down Expand Up @@ -466,6 +476,13 @@ func (lp *logPoller) Close() error {
})
}

func (lp *logPoller) Healthy() error {
if lp.finalityViolated.Load() {
return ErrFinalityViolated
}
return nil
}

func (lp *logPoller) Name() string {
return lp.lggr.Name()
}
Expand Down Expand Up @@ -786,7 +803,13 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
// 1. Find the LCA by following parent hashes.
// 2. Delete all logs and blocks after the LCA
// 3. Return the LCA+1, i.e. our new current (unprocessed) block.
func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (*evmtypes.Head, error) {
func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) {
defer func() {
if err == nil {
lp.finalityViolated.Store(false)
}
}()

var err1 error
if currentBlock == nil {
// If we don't have the current block already, lets get it.
Expand Down Expand Up @@ -1012,6 +1035,7 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber)
rerr := pkgerrors.New("Reorg greater than finality depth")
lp.SvcErrBuffer.Append(rerr)
lp.finalityViolated.Store(true)
return nil, rerr
}

Expand Down
89 changes: 89 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,93 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) {
}
}

func TestLogPoller_ReorgDeeperThanFinality(t *testing.T) {
tests := []struct {
name string
finalityDepth int64
finalityTag bool
}{
{
name: "fixed finality depth without finality tag",
finalityDepth: 1,
finalityTag: false,
},
{
name: "chain finality in use",
finalityDepth: 0,
finalityTag: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, logpoller.Opts{
UseFinalityTag: tt.finalityTag,
FinalityDepth: tt.finalityDepth,
BackfillBatchSize: 3,
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
BackupPollerBlockDelay: 100,
})
// Set up a log poller listening for log emitter logs.
err := th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{
Name: "Test Emitter",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress1},
})
require.NoError(t, err)

// Test scenario
// Chain gen <- 1 <- 2 <- 3 (finalized) <- 4 (L1_1)
_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)})
require.NoError(t, err)
th.Client.Commit()
th.Client.Commit()
th.Client.Commit()
markBlockAsFinalized(t, th, 3)

// Polling should get us the L1 log.
firstPoll := th.PollAndSaveLogs(testutils.Context(t), 1)
assert.Equal(t, int64(5), firstPoll)
assert.NoError(t, th.LogPoller.Healthy())

// Fork deeper than finality depth
// Chain gen <- 1 <- 2 <- 3 (finalized) <- 4 (L1_1)
// \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2)
lca, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(1))
require.NoError(t, err)
require.NoError(t, th.Client.Fork(testutils.Context(t), lca.Hash()))

// Create 2'
_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(2)})
require.NoError(t, err)
th.Client.Commit()

// Create 3-10
for i := 3; i < 10; i++ {
_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))})
require.NoError(t, err)
th.Client.Commit()
}
markBlockAsFinalized(t, th, 6)

secondPoll := th.PollAndSaveLogs(testutils.Context(t), firstPoll)
assert.Equal(t, firstPoll, secondPoll)
assert.Equal(t, logpoller.ErrFinalityViolated, th.LogPoller.Healthy())

// Manually remove latest block from the log poller to bring it back to life
// LogPoller should be healthy again after first poll
// Chain gen <- 1
// \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2)
require.NoError(t, th.ORM.DeleteLogsAndBlocksAfter(testutils.Context(t), 2))
// Poll from latest
recoveryPoll := th.PollAndSaveLogs(testutils.Context(t), 1)
assert.Equal(t, int64(10), recoveryPoll)
assert.NoError(t, th.LogPoller.Healthy())
})
}
}

func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1089,6 +1176,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {

// Polling should get us the L1 log.
newStart := th.PollAndSaveLogs(testutils.Context(t), 1)
assert.NoError(t, th.LogPoller.Healthy())
assert.Equal(t, int64(3), newStart)
// Check that L1_1 has a proper data payload
lgs, err := th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2)
Expand All @@ -1115,6 +1203,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {

newStart = th.PollAndSaveLogs(testutils.Context(t), newStart)
assert.Equal(t, int64(10), newStart)
assert.NoError(t, th.LogPoller.Healthy())

// Expect L1_2 to be properly updated
lgs, err = th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2)
Expand Down
18 changes: 18 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 246762c

Please sign in to comment.