From 389875c0374becfb01023723414965c673391b71 Mon Sep 17 00:00:00 2001 From: Aleksandr Bukata Date: Wed, 14 Aug 2024 12:51:21 +0100 Subject: [PATCH 1/4] fix: (log_poller backfill) exit on non-rpc err --- .changeset/gorgeous-carpets-grab.md | 5 + core/chains/evm/logpoller/log_poller.go | 10 +- core/chains/evm/logpoller/log_poller_test.go | 158 +++++++++++-------- 3 files changed, 100 insertions(+), 73 deletions(-) create mode 100644 .changeset/gorgeous-carpets-grab.md diff --git a/.changeset/gorgeous-carpets-grab.md b/.changeset/gorgeous-carpets-grab.md new file mode 100644 index 00000000000..324b9f98149 --- /dev/null +++ b/.changeset/gorgeous-carpets-grab.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fix backfill error detection diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index dee5d1d1a5d..59538f3812a 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -807,12 +807,10 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { gethLogs, err := lp.ec.FilterLogs(ctx, lp.Filter(big.NewInt(from), big.NewInt(to), nil)) if err != nil { - var rpcErr client.JsonError - if pkgerrors.As(err, &rpcErr) { - if rpcErr.Code != jsonRpcLimitExceeded { - lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to) - return err - } + var rpcErr *client.JsonError + if !pkgerrors.As(err, &rpcErr) || rpcErr.Code != jsonRpcLimitExceeded { + lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to) + return err } if batchSize == 1 { lp.lggr.Criticalw("Too many log results in a single block, failed to retrieve logs! Node may be running in a degraded state.", "err", err, "from", from, "to", to, "LogBackfillBatchSize", lp.backfillBatchSize) diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 860b588c771..db1bfd5d31e 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -2,6 +2,7 @@ package logpoller_test import ( "context" + "errors" "fmt" "math" "math/big" @@ -1541,15 +1542,12 @@ type getLogErrData struct { Limit int } -func TestTooManyLogResults(t *testing.T) { +func TestLogPoller_PollAndSaveLogsErrors(t *testing.T) { ctx := testutils.Context(t) ec := evmtest.NewEthClientMockWithDefaultChain(t) - lggr, obs := logger.TestObserved(t, zapcore.DebugLevel) chainID := testutils.NewRandomEVMChainID() db := pgtest.NewSqlxDB(t) - o := logpoller.NewORM(chainID, db, lggr) - lpOpts := logpoller.Opts{ PollPeriod: time.Hour, FinalityDepth: 2, @@ -1558,7 +1556,6 @@ func TestTooManyLogResults(t *testing.T) { KeepFinalizedBlocksDepth: 1000, } headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t) - lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts) expected := []int64{10, 5, 2, 1} clientErr := client.JsonError{ @@ -1567,83 +1564,110 @@ func TestTooManyLogResults(t *testing.T) { Message: "query returned more than 10000 results. Try with this block range [0x100E698, 0x100E6D4].", } - // Simulate currentBlock = 300 - head := &evmtypes.Head{Number: 300} - finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth} - headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() - call1 := ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) { + ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) { if blockNumber == nil { require.FailNow(t, "unexpected call to get current head") } return &evmtypes.Head{Number: blockNumber.Int64()}, nil }) - call2 := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { - if fq.BlockHash != nil { - return []types.Log{}, nil // succeed when single block requested - } - from := fq.FromBlock.Uint64() - to := fq.ToBlock.Uint64() - if to-from >= 4 { - return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks - } - return logs, err - }) + t.Run("Too many logs until batchSize=1", func(t *testing.T) { + lggr, obs := logger.TestObserved(t, zapcore.DebugLevel) + o := logpoller.NewORM(chainID, db, lggr) + lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts) - addr := testutils.NewAddress() - err := lp.RegisterFilter(ctx, logpoller.Filter{ - Name: "Integration test", - EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, - Addresses: []common.Address{addr}, - }) - require.NoError(t, err) - lp.PollAndSaveLogs(ctx, 5) - block, err2 := o.SelectLatestBlock(ctx) - require.NoError(t, err2) - assert.Equal(t, int64(298), block.BlockNumber) - - logs := obs.FilterLevelExact(zapcore.WarnLevel).FilterMessageSnippet("halving block range batch size").FilterFieldKey("newBatchSize").All() - // Should have tried again 3 times--first reducing batch size to 10, then 5, then 2 - require.Len(t, logs, 3) - for i, s := range expected[:3] { - assert.Equal(t, s, logs[i].ContextMap()["newBatchSize"]) - } + // Simulate currentBlock = 300 + head := &evmtypes.Head{Number: 300} + finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth} + headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() - obs.TakeAll() - call1.Unset() - call2.Unset() + filterCall := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { + if fq.BlockHash != nil { + return []types.Log{}, nil // succeed when single block requested + } + from := fq.FromBlock.Uint64() + to := fq.ToBlock.Uint64() + if to-from >= 4 { + return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks + } + return logs, err + }) + defer filterCall.Unset() - // Now jump to block 500, but return error no matter how small the block range gets. - // Should exit the loop with a critical error instead of hanging. - head = &evmtypes.Head{Number: 500} - finalized = &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth} - headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() - call1.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) { - if blockNumber == nil { - require.FailNow(t, "unexpected call to get current head") + addr := testutils.NewAddress() + err := lp.RegisterFilter(ctx, logpoller.Filter{ + Name: "Integration test", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, + Addresses: []common.Address{addr}, + }) + require.NoError(t, err) + lp.PollAndSaveLogs(ctx, 5) + block, err2 := o.SelectLatestBlock(ctx) + require.NoError(t, err2) + assert.Equal(t, int64(298), block.BlockNumber) + + logs := obs.FilterLevelExact(zapcore.WarnLevel).FilterMessageSnippet("halving block range batch size").FilterFieldKey("newBatchSize").All() + // Should have tried again 3 times--first reducing batch size to 10, then 5, then 2 + require.Len(t, logs, 3) + for i, s := range expected[:3] { + assert.Equal(t, s, logs[i].ContextMap()["newBatchSize"]) } - return &evmtypes.Head{Number: blockNumber.Int64()}, nil }) - call2.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { - if fq.BlockHash != nil { - return []types.Log{}, nil // succeed when single block requested + + t.Run("Too many logs always", func(t *testing.T) { + lggr, obs := logger.TestObserved(t, zapcore.DebugLevel) + o := logpoller.NewORM(chainID, db, lggr) + lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts) + + // Now jump to block 500, but return error no matter how small the block range gets. + // Should exit the loop with a critical error instead of hanging. + head := &evmtypes.Head{Number: 500} + finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth} + headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() + filterCall := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { + if fq.BlockHash != nil { + return []types.Log{}, nil // succeed when single block requested + } + return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks + }) + defer filterCall.Unset() + + lp.PollAndSaveLogs(ctx, 298) + block, err := o.SelectLatestBlock(ctx) + require.NoError(t, err) + assert.Equal(t, int64(298), block.BlockNumber) + warns := obs.FilterMessageSnippet("halving block range").FilterLevelExact(zapcore.WarnLevel).All() + crit := obs.FilterMessageSnippet("failed to retrieve logs").FilterLevelExact(zapcore.DPanicLevel).All() + require.Len(t, warns, 4) + for i, s := range expected { + assert.Equal(t, s, warns[i].ContextMap()["newBatchSize"]) } - return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks + + require.Len(t, crit, 1) + assert.Contains(t, crit[0].Message, "Too many log results in a single block") }) - lp.PollAndSaveLogs(ctx, 298) - block, err2 = o.SelectLatestBlock(ctx) - require.NoError(t, err2) - assert.Equal(t, int64(298), block.BlockNumber) - warns := obs.FilterMessageSnippet("halving block range").FilterLevelExact(zapcore.WarnLevel).All() - crit := obs.FilterMessageSnippet("failed to retrieve logs").FilterLevelExact(zapcore.DPanicLevel).All() - require.Len(t, warns, 4) - for i, s := range expected { - assert.Equal(t, s, warns[i].ContextMap()["newBatchSize"]) - } + t.Run("Context canceled return err", func(t *testing.T) { + lggr, obs := logger.TestObserved(t, zapcore.DebugLevel) + o := logpoller.NewORM(chainID, db, lggr) + lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts) + + head := &evmtypes.Head{Number: 300} + finalized := &evmtypes.Head{Number: head.Number - lpOpts.FinalityDepth} + headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() + filterCall := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { + if fq.ToBlock.Int64() >= 297 { + return nil, errors.New("context canceled") + } + return []types.Log{}, err + }) + defer filterCall.Unset() - require.Len(t, crit, 1) - assert.Contains(t, crit[0].Message, "Too many log results in a single block") + lp.PollAndSaveLogs(ctx, 5) + errs := obs.FilterMessageSnippet("Unable to query for logs").FilterLevelExact(zapcore.ErrorLevel).All() + require.Len(t, errs, 1) + assert.Equal(t, "context canceled", errs[0].ContextMap()["err"]) + }) } func Test_PollAndQueryFinalizedBlocks(t *testing.T) { From 3333386ce682920972c26a8a42dae9d01c568d1b Mon Sep 17 00:00:00 2001 From: Aleksandr Bukata <96521086+bukata-sa@users.noreply.github.com> Date: Wed, 14 Aug 2024 19:19:14 +0100 Subject: [PATCH 2/4] Update gorgeous-carpets-grab.md --- .changeset/gorgeous-carpets-grab.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/gorgeous-carpets-grab.md b/.changeset/gorgeous-carpets-grab.md index 324b9f98149..0f72458e204 100644 --- a/.changeset/gorgeous-carpets-grab.md +++ b/.changeset/gorgeous-carpets-grab.md @@ -2,4 +2,4 @@ "chainlink": patch --- -Fix backfill error detection +#bugfix backfill error detection From 468b2b7a7979f33ae9341fd4432d8565efeadaf5 Mon Sep 17 00:00:00 2001 From: Aleksandr Bukata Date: Thu, 15 Aug 2024 10:49:24 +0100 Subject: [PATCH 3/4] fix --- core/chains/evm/logpoller/log_poller.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 59538f3812a..9906743e318 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -808,18 +808,19 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { gethLogs, err := lp.ec.FilterLogs(ctx, lp.Filter(big.NewInt(from), big.NewInt(to), nil)) if err != nil { var rpcErr *client.JsonError - if !pkgerrors.As(err, &rpcErr) || rpcErr.Code != jsonRpcLimitExceeded { + if pkgerrors.As(err, &rpcErr) { + if batchSize == 1 { + lp.lggr.Criticalw("Too many log results in a single block, failed to retrieve logs! Node may be running in a degraded state.", "err", err, "from", from, "to", to, "LogBackfillBatchSize", lp.backfillBatchSize) + return err + } + batchSize /= 2 + lp.lggr.Warnw("Too many log results, halving block range batch size. Consider increasing LogBackfillBatchSize if this happens frequently", "err", err, "from", from, "to", to, "newBatchSize", batchSize, "LogBackfillBatchSize", lp.backfillBatchSize) + from -= batchSize // counteract +=batchSize on next loop iteration, so starting block does not change + continue + } else { lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to) return err } - if batchSize == 1 { - lp.lggr.Criticalw("Too many log results in a single block, failed to retrieve logs! Node may be running in a degraded state.", "err", err, "from", from, "to", to, "LogBackfillBatchSize", lp.backfillBatchSize) - return err - } - batchSize /= 2 - lp.lggr.Warnw("Too many log results, halving block range batch size. Consider increasing LogBackfillBatchSize if this happens frequently", "err", err, "from", from, "to", to, "newBatchSize", batchSize, "LogBackfillBatchSize", lp.backfillBatchSize) - from -= batchSize // counteract +=batchSize on next loop iteration, so starting block does not change - continue } if len(gethLogs) == 0 { continue From c06aaa8c7808c4fdfbd20be2a7cee3dc3947c1c9 Mon Sep 17 00:00:00 2001 From: Aleksandr Bukata Date: Thu, 15 Aug 2024 12:16:41 +0100 Subject: [PATCH 4/4] lint --- core/chains/evm/logpoller/log_poller.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 9906743e318..9d59ac08a3b 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -794,8 +794,6 @@ func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log, endBl return lp.GetBlocksRange(ctx, numbers) } -const jsonRpcLimitExceeded = -32005 // See https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1474.md - // backfill will query FilterLogs in batches for logs in the // block range [start, end] and save them to the db. // Retries until ctx cancelled. Will return an error if cancelled @@ -817,10 +815,9 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { lp.lggr.Warnw("Too many log results, halving block range batch size. Consider increasing LogBackfillBatchSize if this happens frequently", "err", err, "from", from, "to", to, "newBatchSize", batchSize, "LogBackfillBatchSize", lp.backfillBatchSize) from -= batchSize // counteract +=batchSize on next loop iteration, so starting block does not change continue - } else { - lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to) - return err } + lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to) + return err } if len(gethLogs) == 0 { continue