From aff06135ae095821ab7eb2ddc4ac6a9b14742daa Mon Sep 17 00:00:00 2001
From: Chunkai Yang
Date: Tue, 2 Apr 2024 16:04:18 -0400
Subject: [PATCH 1/2] Do not read res if http errors (#676)

## Motivation

## Solution
---
 .changeset/light-suns-build.md                               | 5 +++++
 .../services/ocr2/plugins/ccip/tokendata/http/http_client.go | 2 +-
 2 files changed, 6 insertions(+), 1 deletion(-)
 create mode 100644 .changeset/light-suns-build.md

diff --git a/.changeset/light-suns-build.md b/.changeset/light-suns-build.md
new file mode 100644
index 0000000000..4adbc370fc
--- /dev/null
+++ b/.changeset/light-suns-build.md
@@ -0,0 +1,5 @@
+---
+"ccip": patch
+---
+
+Do not read res if http errors
diff --git a/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go b/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go
index 2ab00e4384..79ec21b1b8 100644
--- a/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go
+++ b/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go
@@ -34,7 +34,7 @@ func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration)
 			return nil, http.StatusRequestTimeout, nil, tokendata.ErrTimeout
 		}
 		// On error, res is nil in most cases, do not read res.StatusCode, return BadRequest
-		return nil, http.StatusBadRequest, res.Header, err
+		return nil, http.StatusBadRequest, nil, err
 	}
 	defer res.Body.Close()
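Context for the one-line fix above: Go's net/http client returns a nil *http.Response whenever the error it returns is non-nil (for any client-side failure, not only timeouts), so reading res.Header or res.StatusCode on that path can panic with a nil-pointer dereference. Below is a minimal, self-contained sketch of the safe pattern; the fetch helper and its four-value signature merely mirror the shape of HttpClient.Get and are illustrative, not the actual CCIP code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"time"
)

// fetch mimics the shape of HttpClient.Get: body, status code, headers, error.
func fetch(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, http.StatusBadRequest, nil, err
	}

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return nil, http.StatusRequestTimeout, nil, err
		}
		// res is nil here: return nothing derived from it.
		return nil, http.StatusBadRequest, nil, err
	}
	defer res.Body.Close()

	body, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, res.StatusCode, res.Header, err
	}
	return body, res.StatusCode, res.Header, nil
}

func main() {
	body, code, _, err := fetch(context.Background(), "https://example.com", 5*time.Second)
	fmt.Println(code, len(body), err)
}
```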
From 9ed19c6d93404837c233d9210b456814ca8a23d0 Mon Sep 17 00:00:00 2001
From: Mateusz Sekara
Date: Thu, 21 Mar 2024 18:17:34 +0100
Subject: [PATCH 2/2] BCF-3059 Hardening LogPoller replay (#12484)

* Hardening LogPoller replay
* Fixing tests
* Linter fixes
* Post review fixes
* Returning ErrReplayRequestAborted when context is cancelled
---
 .changeset/small-beers-perform.md             |  5 ++
 core/chains/evm/logpoller/log_poller.go       | 43 +++++++++++++++-
 .../evm/logpoller/log_poller_internal_test.go | 49 ++++++++++++++-----
 core/chains/evm/logpoller/log_poller_test.go  |  4 +-
 4 files changed, 85 insertions(+), 16 deletions(-)
 create mode 100644 .changeset/small-beers-perform.md

diff --git a/.changeset/small-beers-perform.md b/.changeset/small-beers-perform.md
new file mode 100644
index 0000000000..a420116a44
--- /dev/null
+++ b/.changeset/small-beers-perform.md
@@ -0,0 +1,5 @@
+---
+"chainlink": patch
+---
+
+Making LogPoller's replay more robust by backfilling up to the finalized block and processing the rest in the main loop
diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go
index 922762467e..0cdf08a7ae 100644
--- a/core/chains/evm/logpoller/log_poller.go
+++ b/core/chains/evm/logpoller/log_poller.go
@@ -357,7 +357,13 @@ func (lp *logPoller) Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQ
 // If ctx is cancelled before the replay request has been initiated, ErrReplayRequestAborted is returned. If the replay
 // is already in progress, the replay will continue and ErrReplayInProgress will be returned. If the client needs a
 // guarantee that the replay is complete before proceeding, it should either avoid cancelling or retry until nil is returned
-func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
+func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) {
+	defer func() {
+		if errors.Is(err, context.Canceled) {
+			err = ErrReplayRequestAborted
+		}
+	}()
+
 	lp.lggr.Debugf("Replaying from block %d", fromBlock)
 	latest, err := lp.ec.HeadByNumber(ctx, nil)
 	if err != nil {
@@ -366,6 +372,27 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
 	if fromBlock < 1 || fromBlock > latest.Number {
 		return errors.Errorf("Invalid replay block number %v, acceptable range [1, %v]", fromBlock, latest.Number)
 	}
+
+	// Backfill all logs up to the latest saved finalized block, outside the LogPoller's main loop.
+	// This is safe because the chain cannot be rewound deeper than that, so there can be no race conditions.
+	savedFinalizedBlockNumber, err := lp.savedFinalizedBlockNumber(ctx)
+	if err != nil {
+		return err
+	}
+	if fromBlock <= savedFinalizedBlockNumber {
+		err = lp.backfill(ctx, fromBlock, savedFinalizedBlockNumber)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Poll everything after the latest finalized block in the main loop to avoid concurrent writes during a reorg.
+	// We assume the number of logs between the saved finalized block and the current head is small enough to be processed in the main loop.
+	fromBlock = mathutil.Max(fromBlock, savedFinalizedBlockNumber+1)
+	// Don't continue if the latest block number is the same as the saved finalized block number
+	if fromBlock > latest.Number {
+		return nil
+	}
 	// Block until replay notification accepted or cancelled.
 	select {
 	case lp.replayStart <- fromBlock:
@@ -384,6 +411,20 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
 	}
 }
 
+// savedFinalizedBlockNumber returns the FinalizedBlockNumber saved with the last processed block in the db
+// (latestFinalizedBlock at the time the last processed block was saved).
+// If this is the first poll and no blocks are in the db, it returns 0.
+func (lp *logPoller) savedFinalizedBlockNumber(ctx context.Context) (int64, error) {
+	latestProcessed, err := lp.LatestBlock(pg.WithParentCtx(ctx))
+	if err == nil {
+		return latestProcessed.FinalizedBlockNumber, nil
+	}
+	if errors.Is(err, sql.ErrNoRows) {
+		return 0, nil
+	}
+	return 0, err
+}
+
 func (lp *logPoller) recvReplayComplete() {
 	err := <-lp.replayComplete
 	if err != nil {
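The heart of the hardening above: blocks at or below the saved finalized block can never be reorged, so they can be backfilled synchronously outside the run loop, while everything newer is handed to the single poller goroutine to avoid concurrent writes during a reorg; a deferred errors.Is check on the named return translates context.Canceled into the ErrReplayRequestAborted sentinel. The sketch below condenses that control flow into a toy replayer type; backfill, the field names, and the buffered channel in main are stand-ins for the real logPoller internals, and the built-in max requires Go 1.21+.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var ErrReplayRequestAborted = errors.New("replay request aborted")

// replayer is a toy stand-in for logPoller: it knows the latest head,
// the saved finalized block, and a channel feeding the main run loop.
type replayer struct {
	latestBlock    int64
	savedFinalized int64
	replayStart    chan int64
}

func (r *replayer) backfill(ctx context.Context, from, to int64) error {
	fmt.Printf("backfilling [%d, %d] outside the main loop\n", from, to)
	return nil // real code would fetch and persist logs here
}

// Replay mirrors the hardened flow: map context.Canceled to a sentinel,
// backfill the finalized range synchronously, then hand the rest to the
// main loop through replayStart.
func (r *replayer) Replay(ctx context.Context, fromBlock int64) (err error) {
	defer func() {
		if errors.Is(err, context.Canceled) {
			err = ErrReplayRequestAborted
		}
	}()

	if fromBlock < 1 || fromBlock > r.latestBlock {
		return fmt.Errorf("invalid replay block %d, acceptable range [1, %d]", fromBlock, r.latestBlock)
	}
	// Finalized blocks cannot be reorged, so synchronous writes are safe here.
	if fromBlock <= r.savedFinalized {
		if err = r.backfill(ctx, fromBlock, r.savedFinalized); err != nil {
			return err
		}
	}
	fromBlock = max(r.savedFinalized+1, fromBlock)
	if fromBlock > r.latestBlock {
		return nil // everything requested was already finalized and backfilled
	}
	// Block until the main loop accepts the request or the caller gives up.
	select {
	case r.replayStart <- fromBlock:
		return nil
	case <-ctx.Done():
		return ctx.Err() // rewritten to ErrReplayRequestAborted by the defer
	}
}

func main() {
	r := &replayer{latestBlock: 10, savedFinalized: 6, replayStart: make(chan int64, 1)}
	fmt.Println(r.Replay(context.Background(), 3))
	fmt.Println("main loop picks up from block:", <-r.replayStart)
}
```

Running this backfills [3, 6] synchronously and hands block 7 to the run loop, which is exactly the split the patch makes.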
diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go
index 863ab0fdde..cb1d4f7fbe 100644
--- a/core/chains/evm/logpoller/log_poller_internal_test.go
+++ b/core/chains/evm/logpoller/log_poller_internal_test.go
@@ -27,6 +27,7 @@ import (
 	evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
 	evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
 	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
+	ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
 	"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter"
 	"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
 	"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
@@ -241,6 +242,7 @@ func TestLogPoller_Replay(t *testing.T) {
 	chainID := testutils.FixtureChainID
 	db := pgtest.NewSqlxDB(t)
 	orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
+	ctx := testutils.Context(t)
 
 	head := evmtypes.Head{Number: 4}
 	events := []common.Hash{EmitterABI.Events["Log1"].ID}
@@ -256,7 +258,7 @@ func TestLogPoller_Replay(t *testing.T) {
 
 	ec := evmclimocks.NewClient(t)
 	ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil)
-	ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once()
+	ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Twice()
 	ec.On("ConfiguredChainID").Return(chainID, nil)
 
 	lp := NewLogPoller(orm, ec, lggr, time.Hour, false, 3, 3, 3, 20)
@@ -265,12 +267,13 @@ func TestLogPoller_Replay(t *testing.T) {
 	latest, err := lp.LatestBlock()
 	require.NoError(t, err)
 	require.Equal(t, int64(4), latest.BlockNumber)
+	require.Equal(t, int64(1), latest.FinalizedBlockNumber)
 
 	t.Run("abort before replayStart received", func(t *testing.T) {
 		// Replay() should abort immediately if caller's context is cancelled before request signal is read
-		ctx, cancel := context.WithCancel(testutils.Context(t))
+		cancelCtx, cancel := context.WithCancel(testutils.Context(t))
 		cancel()
-		err = lp.Replay(ctx, 3)
+		err = lp.Replay(cancelCtx, 3)
 		assert.ErrorIs(t, err, ErrReplayRequestAborted)
 	})
 
@@ -285,12 +288,11 @@ func TestLogPoller_Replay(t *testing.T) {
 
 	// Replay() should return error code received from replayComplete
 	t.Run("returns error code on replay complete", func(t *testing.T) {
-		ctx := testutils.Context(t)
 		anyErr := errors.New("any error")
 		done := make(chan struct{})
 		go func() {
 			defer close(done)
-			recvStartReplay(ctx, 1)
+			recvStartReplay(ctx, 2)
 			lp.replayComplete <- anyErr
 		}()
 		assert.ErrorIs(t, lp.Replay(ctx, 1), anyErr)
@@ -299,14 +301,14 @@ func TestLogPoller_Replay(t *testing.T) {
 
 	// Replay() should return ErrReplayInProgress if caller's context is cancelled after replay has begun
 	t.Run("late abort returns ErrReplayInProgress", func(t *testing.T) {
-		ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second) // Intentionally abort replay after 1s
+		cancelCtx, cancel := context.WithTimeout(testutils.Context(t), time.Second) // Intentionally abort replay after 1s
 		done := make(chan struct{})
 		go func() {
 			defer close(done)
-			recvStartReplay(ctx, 4)
+			recvStartReplay(cancelCtx, 4)
 			cancel()
 		}()
-		assert.ErrorIs(t, lp.Replay(ctx, 4), ErrReplayInProgress)
+		assert.ErrorIs(t, lp.Replay(cancelCtx, 4), ErrReplayInProgress)
 		<-done
 		lp.replayComplete <- nil
 		lp.wg.Wait()
@@ -316,8 +318,6 @@ func TestLogPoller_Replay(t *testing.T) {
 	t.Run("client abort doesnt hang run loop", func(t *testing.T) {
 		lp.backupPollerNextBlock = 0
 
-		ctx := testutils.Context(t)
-
 		pass := make(chan struct{})
 		cancelled := make(chan struct{})
@@ -372,7 +372,6 @@ func TestLogPoller_Replay(t *testing.T) {
 		done := make(chan struct{})
 		defer func() { <-done }()
 
-		ctx := testutils.Context(t)
 		ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) {
 			go func() {
 				defer close(done)
@@ -405,7 +404,7 @@ func TestLogPoller_Replay(t *testing.T) {
 
 		lp.ReplayAsync(1)
 
-		recvStartReplay(testutils.Context(t), 1)
+		recvStartReplay(testutils.Context(t), 2)
 	})
 
 	t.Run("ReplayAsync error", func(t *testing.T) {
@@ -427,6 +426,32 @@ func TestLogPoller_Replay(t *testing.T) {
 		require.Equal(t, 1, observedLogs.Len())
 		assert.Equal(t, observedLogs.All()[0].Message, anyErr.Error())
 	})
+
+	t.Run("run regular replay when there are no blocks in db", func(t *testing.T) {
+		err := lp.orm.DeleteLogsAndBlocksAfter(0)
+		require.NoError(t, err)
+
+		lp.ReplayAsync(1)
+		recvStartReplay(testutils.Context(t), 1)
+	})
+
+	t.Run("run only backfill when everything is finalized", func(t *testing.T) {
+		err := lp.orm.DeleteLogsAndBlocksAfter(0)
+		require.NoError(t, err)
+
+		err = lp.orm.InsertLogsWithBlock([]Log{}, LogPollerBlock{
+			EvmChainId:           ubig.New(chainID),
+			BlockHash:            head.Hash,
+			BlockNumber:          head.Number,
+			BlockTimestamp:       head.Timestamp,
+			FinalizedBlockNumber: head.Number,
+			CreatedAt:            time.Time{},
+		})
+		require.NoError(t, err)
+
+		err = lp.Replay(ctx, 1)
+		require.NoError(t, err)
+	})
 }
 
 func (lp *logPoller) reset() {
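A pattern worth noting in the test changes above: the value expected over replayStart moves from 1 to 2 wherever the requested range starts at or below the saved finalized block (1 here), because that prefix is now backfilled synchronously and the run loop only ever sees fromBlock = savedFinalized+1. The condensed test below, reusing the hypothetical replayer from the earlier sketch (so it is not self-contained on its own), shows both that shift and the channel-handshake technique the suite uses to drive the blocking Replay call.

```go
package main

import (
	"context"
	"errors"
	"testing"
)

// TestReplayAbort condenses the suite's two cancellation outcomes: cancelling
// before the request is accepted yields ErrReplayRequestAborted, while an
// accepted request reaches the run loop with the post-backfill fromBlock.
func TestReplayAbort(t *testing.T) {
	r := &replayer{latestBlock: 4, savedFinalized: 1, replayStart: make(chan int64)}

	// Abort before the request signal is read: no one receives on
	// replayStart, so Replay blocks until the cancelled context fires.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	if err := r.Replay(ctx, 3); !errors.Is(err, ErrReplayRequestAborted) {
		t.Fatalf("want ErrReplayRequestAborted, got %v", err)
	}

	// Happy path: a goroutine plays the run loop's side of the handshake.
	// Blocks <= savedFinalized are backfilled synchronously first, so the
	// run loop sees fromBlock = savedFinalized+1 = 2 even though we ask for 1.
	done := make(chan struct{})
	go func() {
		defer close(done)
		if got := <-r.replayStart; got != 2 {
			t.Errorf("want fromBlock 2, got %d", got)
		}
	}()
	if err := r.Replay(context.Background(), 1); err != nil {
		t.Fatal(err)
	}
	<-done
}
```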
diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go
index 4fec4b4fa0..7987c01619 100644
--- a/core/chains/evm/logpoller/log_poller_test.go
+++ b/core/chains/evm/logpoller/log_poller_test.go
@@ -1385,7 +1385,7 @@ func TestLogPoller_DBErrorHandling(t *testing.T) {
 	time.Sleep(100 * time.Millisecond)
 	require.NoError(t, lp.Start(ctx))
 	require.Eventually(t, func() bool {
-		return observedLogs.Len() >= 5
+		return observedLogs.Len() >= 1
 	}, 2*time.Second, 20*time.Millisecond)
 	lp.Close()
 
@@ -1401,8 +1401,6 @@ func TestLogPoller_DBErrorHandling(t *testing.T) {
 
 	assert.Contains(t, logMsgs, "SQL ERROR")
 	assert.Contains(t, logMsgs, "Failed loading filters in main logpoller loop, retrying later")
-	assert.Contains(t, logMsgs, "Error executing replay, could not get fromBlock")
-	assert.Contains(t, logMsgs, "Backup log poller ran before filters loaded, skipping")
 }
 
 type getLogErrData struct {
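The relaxed assertions in TestLogPoller_DBErrorHandling follow from the new flow: with the database unavailable, Replay now apparently fails before the run loop emits the replay and backup-poller messages, so only the first error lines are guaranteed to appear and the test settles for observedLogs.Len() >= 1. For reference, the observed-logs technique the test relies on looks roughly like this in plain zap; the messages are copied from the test, while chainlink's own test-logger wrapper is simplified away.

```go
package main

import (
	"fmt"

	"go.uber.org/zap"
	"go.uber.org/zap/zaptest/observer"
)

func main() {
	// An observer core records every entry so assertions can inspect messages.
	core, observed := observer.New(zap.DebugLevel)
	lggr := zap.New(core)

	lggr.Error("SQL ERROR")
	lggr.Warn("Failed loading filters in main logpoller loop, retrying later")

	// Mirror the test's map-of-messages bookkeeping.
	logMsgs := make(map[string]int)
	for _, entry := range observed.All() {
		logMsgs[entry.Message]++
	}
	fmt.Println(observed.Len() >= 1, logMsgs["SQL ERROR"])
}
```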