fix: (log_poller backfill) exit on non-rpc err #14114

Closed · wants to merge 4 commits
Changes from 2 commits
5 changes: 5 additions & 0 deletions .changeset/gorgeous-carpets-grab.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix backfill error detection
10 changes: 4 additions & 6 deletions core/chains/evm/logpoller/log_poller.go
@@ -807,12 +807,10 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {

gethLogs, err := lp.ec.FilterLogs(ctx, lp.Filter(big.NewInt(from), big.NewInt(to), nil))
if err != nil {
var rpcErr client.JsonError
if pkgerrors.As(err, &rpcErr) {
if rpcErr.Code != jsonRpcLimitExceeded {
lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to)
return err
}
var rpcErr *client.JsonError
if !pkgerrors.As(err, &rpcErr) || rpcErr.Code != jsonRpcLimitExceeded {
lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to)
return err
@reductionista (Contributor) commented on Aug 14, 2024:
I initially tried to make this change in Dec 2023, but during review it became clear that it's a somewhat risky change and there was more to investigate about the right way to fix it. I re-opened it a few weeks ago and got a more complex fix working that actually detects the error correctly (at least on some RPC servers). This change by itself will not, because it compares against a concrete type that will only match during testing, not in production.

I thought that was ready to merge last week, but unfortunately during review it again became clear that things are more complicated. There are so many different error codes and string formats returned by different RPC servers for the same condition that we'll have to rely mostly on string parsing to differentiate them.

Leaving it as-is at least errs on the side of caution: even though the logic is flawed, it's just taking preventative measures when it doesn't actually need to in most cases, and reporting a critical error in some cases when it shouldn't. Not taking those preventative measures, or not reporting a critical condition, is riskier in production.

Is there a way to fix the flaky test without making this change?

Here is the current PR as it stands:
#11654
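
To make the concrete-type concern above easier to see, here is a minimal, hedged sketch (not the actual client code: `jsonError` and the error-code constant are stand-ins for `client.JsonError` and `jsonRpcLimitExceeded`, and stdlib `errors` stands in for `pkgerrors`). `errors.As` only succeeds when the typed error is actually in the chain, which is exactly what the test mock returns; a node whose "too many results" condition arrives as a wrapped string error fails the type check and, under the new condition, is treated as a fatal non-RPC error:

```go
package main

import (
	"errors"
	"fmt"
)

// jsonError is a simplified stand-in for client.JsonError.
type jsonError struct {
	Code    int
	Message string
}

func (e *jsonError) Error() string { return e.Message }

// Illustrative value only; real servers return a variety of codes.
const jsonRpcLimitExceeded = -32005

// isLimitExceeded mirrors the shape of the check in backfill after this PR.
func isLimitExceeded(err error) bool {
	var rpcErr *jsonError
	return errors.As(err, &rpcErr) && rpcErr.Code == jsonRpcLimitExceeded
}

func main() {
	// What the test mock returns: the concrete typed error. Matches.
	fromMock := &jsonError{Code: jsonRpcLimitExceeded, Message: "query returned more than 10000 results"}
	fmt.Println(isLimitExceeded(fromMock)) // true

	// What a production node may surface after wrapping/transport layers:
	// the same condition flattened into a plain string. No type match.
	fromNode := fmt.Errorf("eth_getLogs: %w", errors.New("query returned more than 10000 results"))
	fmt.Println(isLimitExceeded(fromNode)) // false -> handled as a non-RPC error
}
```

Which is why the comment suggests that, in practice, string parsing is likely needed to recognize the condition consistently across providers.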

}
if batchSize == 1 {
lp.lggr.Criticalw("Too many log results in a single block, failed to retrieve logs! Node may be running in a degraded state.", "err", err, "from", from, "to", to, "LogBackfillBatchSize", lp.backfillBatchSize)
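
For context on what the hunk above changes behaviorally, here is a condensed, runnable sketch (my own stand-ins, not the production code: `jsonError` approximates `client.JsonError`, and the two functions only model the "return early vs. keep halving the batch" decision; the value-vs-pointer change in the hunk is orthogonal and omitted). Under the old condition, an error that fails the type assertion, such as a canceled context, fell through to the batch-halving retry path; under the new condition it exits immediately:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// jsonError approximates client.JsonError for illustration only.
type jsonError struct{ Code int }

func (e *jsonError) Error() string { return fmt.Sprintf("json-rpc error %d", e.Code) }

const jsonRpcLimitExceeded = -32005 // illustrative value

// abortsOld models the pre-PR decision: only a typed RPC error with a
// non-limit code returned early; everything else kept halving the batch.
func abortsOld(err error) bool {
	var rpcErr *jsonError
	if errors.As(err, &rpcErr) {
		return rpcErr.Code != jsonRpcLimitExceeded
	}
	return false
}

// abortsNew models the post-PR decision: anything that is not a typed
// "limit exceeded" RPC error returns early.
func abortsNew(err error) bool {
	var rpcErr *jsonError
	return !errors.As(err, &rpcErr) || rpcErr.Code != jsonRpcLimitExceeded
}

func main() {
	limit := &jsonError{Code: jsonRpcLimitExceeded}
	fmt.Println(abortsOld(limit), abortsNew(limit)) // false false: both halve and retry

	canceled := context.Canceled
	fmt.Println(abortsOld(canceled), abortsNew(canceled)) // false true: only the fix exits early
}
```

This matches the PR title: a non-RPC error during backfill now exits instead of looping on ever-smaller batch sizes.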
158 changes: 91 additions & 67 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -2,6 +2,7 @@ package logpoller_test

import (
"context"
"errors"
"fmt"
"math"
"math/big"
@@ -1541,15 +1542,12 @@ type getLogErrData struct {
Limit int
}

func TestTooManyLogResults(t *testing.T) {
func TestLogPoller_PollAndSaveLogsErrors(t *testing.T) {
ctx := testutils.Context(t)
ec := evmtest.NewEthClientMockWithDefaultChain(t)
lggr, obs := logger.TestObserved(t, zapcore.DebugLevel)
chainID := testutils.NewRandomEVMChainID()
db := pgtest.NewSqlxDB(t)

o := logpoller.NewORM(chainID, db, lggr)

lpOpts := logpoller.Opts{
PollPeriod: time.Hour,
FinalityDepth: 2,
@@ -1558,7 +1556,6 @@ func TestTooManyLogResults(t *testing.T) {
KeepFinalizedBlocksDepth: 1000,
}
headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts)
expected := []int64{10, 5, 2, 1}

clientErr := client.JsonError{
@@ -1567,83 +1564,110 @@
Message: "query returned more than 10000 results. Try with this block range [0x100E698, 0x100E6D4].",
}

// Simulate currentBlock = 300
head := &evmtypes.Head{Number: 300}
finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth}
headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()
call1 := ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) {
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) {
if blockNumber == nil {
require.FailNow(t, "unexpected call to get current head")
}
return &evmtypes.Head{Number: blockNumber.Int64()}, nil
})

call2 := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) {
if fq.BlockHash != nil {
return []types.Log{}, nil // succeed when single block requested
}
from := fq.FromBlock.Uint64()
to := fq.ToBlock.Uint64()
if to-from >= 4 {
return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks
}
return logs, err
})
t.Run("Too many logs until batchSize=1", func(t *testing.T) {
lggr, obs := logger.TestObserved(t, zapcore.DebugLevel)
o := logpoller.NewORM(chainID, db, lggr)
lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts)

addr := testutils.NewAddress()
err := lp.RegisterFilter(ctx, logpoller.Filter{
Name: "Integration test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{addr},
})
require.NoError(t, err)
lp.PollAndSaveLogs(ctx, 5)
block, err2 := o.SelectLatestBlock(ctx)
require.NoError(t, err2)
assert.Equal(t, int64(298), block.BlockNumber)

logs := obs.FilterLevelExact(zapcore.WarnLevel).FilterMessageSnippet("halving block range batch size").FilterFieldKey("newBatchSize").All()
// Should have tried again 3 times--first reducing batch size to 10, then 5, then 2
require.Len(t, logs, 3)
for i, s := range expected[:3] {
assert.Equal(t, s, logs[i].ContextMap()["newBatchSize"])
}
// Simulate currentBlock = 300
head := &evmtypes.Head{Number: 300}
finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth}
headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()

obs.TakeAll()
call1.Unset()
call2.Unset()
filterCall := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) {
if fq.BlockHash != nil {
return []types.Log{}, nil // succeed when single block requested
}
from := fq.FromBlock.Uint64()
to := fq.ToBlock.Uint64()
if to-from >= 4 {
return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks
}
return logs, err
})
defer filterCall.Unset()

// Now jump to block 500, but return error no matter how small the block range gets.
// Should exit the loop with a critical error instead of hanging.
head = &evmtypes.Head{Number: 500}
finalized = &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth}
headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()
call1.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) {
if blockNumber == nil {
require.FailNow(t, "unexpected call to get current head")
addr := testutils.NewAddress()
err := lp.RegisterFilter(ctx, logpoller.Filter{
Name: "Integration test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{addr},
})
require.NoError(t, err)
lp.PollAndSaveLogs(ctx, 5)
block, err2 := o.SelectLatestBlock(ctx)
require.NoError(t, err2)
assert.Equal(t, int64(298), block.BlockNumber)

logs := obs.FilterLevelExact(zapcore.WarnLevel).FilterMessageSnippet("halving block range batch size").FilterFieldKey("newBatchSize").All()
// Should have tried again 3 times--first reducing batch size to 10, then 5, then 2
require.Len(t, logs, 3)
for i, s := range expected[:3] {
assert.Equal(t, s, logs[i].ContextMap()["newBatchSize"])
}
return &evmtypes.Head{Number: blockNumber.Int64()}, nil
})
call2.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) {
if fq.BlockHash != nil {
return []types.Log{}, nil // succeed when single block requested

t.Run("Too many logs always", func(t *testing.T) {
lggr, obs := logger.TestObserved(t, zapcore.DebugLevel)
o := logpoller.NewORM(chainID, db, lggr)
lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts)

// Now jump to block 500, but return error no matter how small the block range gets.
// Should exit the loop with a critical error instead of hanging.
head := &evmtypes.Head{Number: 500}
finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth}
headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()
filterCall := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) {
if fq.BlockHash != nil {
return []types.Log{}, nil // succeed when single block requested
}
return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks
})
defer filterCall.Unset()

lp.PollAndSaveLogs(ctx, 298)
block, err := o.SelectLatestBlock(ctx)
require.NoError(t, err)
assert.Equal(t, int64(298), block.BlockNumber)
warns := obs.FilterMessageSnippet("halving block range").FilterLevelExact(zapcore.WarnLevel).All()
crit := obs.FilterMessageSnippet("failed to retrieve logs").FilterLevelExact(zapcore.DPanicLevel).All()
require.Len(t, warns, 4)
for i, s := range expected {
assert.Equal(t, s, warns[i].ContextMap()["newBatchSize"])
}
return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks

require.Len(t, crit, 1)
assert.Contains(t, crit[0].Message, "Too many log results in a single block")
})

lp.PollAndSaveLogs(ctx, 298)
block, err2 = o.SelectLatestBlock(ctx)
require.NoError(t, err2)
assert.Equal(t, int64(298), block.BlockNumber)
warns := obs.FilterMessageSnippet("halving block range").FilterLevelExact(zapcore.WarnLevel).All()
crit := obs.FilterMessageSnippet("failed to retrieve logs").FilterLevelExact(zapcore.DPanicLevel).All()
require.Len(t, warns, 4)
for i, s := range expected {
assert.Equal(t, s, warns[i].ContextMap()["newBatchSize"])
}
t.Run("Context canceled return err", func(t *testing.T) {
lggr, obs := logger.TestObserved(t, zapcore.DebugLevel)
o := logpoller.NewORM(chainID, db, lggr)
lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts)

head := &evmtypes.Head{Number: 300}
finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth}
headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()
filterCall := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) {
if fq.ToBlock.Int64() >= 297 {
return nil, errors.New("context canceled")
}
return []types.Log{}, err
})
defer filterCall.Unset()

require.Len(t, crit, 1)
assert.Contains(t, crit[0].Message, "Too many log results in a single block")
lp.PollAndSaveLogs(ctx, 5)
errs := obs.FilterMessageSnippet("Unable to query for logs").FilterLevelExact(zapcore.ErrorLevel).All()
require.Len(t, errs, 1)
assert.Equal(t, "context canceled", errs[0].ContextMap()["err"])
})
}

func Test_PollAndQueryFinalizedBlocks(t *testing.T) {