From 27d941328655e0cde608c1eff47de736c11e2e58 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Fri, 26 Apr 2024 16:28:55 +0200 Subject: [PATCH] LogPoller CLI command to resolve reorg greater than finality depth (#12867) * find lca and remove block after CLI * fix sort.Find typo * make RemoveBlocks local cmd * tests * added changeset * added tags to the changeset * fixed tests * make cmds, vars cases consistent --- .changeset/brave-dots-breathe.md | 7 ++ core/chains/evm/logpoller/disabled.go | 8 ++ core/chains/evm/logpoller/log_poller.go | 99 +++++++++++++++ core/chains/evm/logpoller/log_poller_test.go | 116 ++++++++++++++++++ core/chains/evm/logpoller/mocks/log_poller.go | 48 ++++++++ core/chains/evm/logpoller/observability.go | 6 + core/chains/evm/logpoller/orm.go | 9 ++ core/chains/evm/logpoller/orm_test.go | 30 +++++ core/cmd/blocks_commands.go | 58 +++++++++ core/cmd/blocks_commands_test.go | 25 ++++ core/cmd/shell_local.go | 79 ++++++++++++ core/cmd/shell_local_test.go | 56 +++++++++ core/internal/mocks/application.go | 50 ++++++++ core/services/chainlink/application.go | 43 +++++++ core/web/api.go | 2 +- core/web/lca_controller.go | 74 +++++++++++ core/web/lca_controller_test.go | 29 +++++ core/web/router.go | 2 + testdata/scripts/blocks/help.txtar | 3 +- testdata/scripts/help-all/help-all.txtar | 2 + testdata/scripts/node/help.txtar | 1 + 21 files changed, 745 insertions(+), 2 deletions(-) create mode 100644 .changeset/brave-dots-breathe.md create mode 100644 core/web/lca_controller.go create mode 100644 core/web/lca_controller_test.go diff --git a/.changeset/brave-dots-breathe.md b/.changeset/brave-dots-breathe.md new file mode 100644 index 00000000000..f1ae4f4d21e --- /dev/null +++ b/.changeset/brave-dots-breathe.md @@ -0,0 +1,7 @@ +--- +"chainlink": minor +--- + +Added a new CLI command, `blocks find-lca,` which finds the latest block that is available in both the database and on the chain for the specified chain. +Added a new CLI command, `node remove-blocks,` which removes all blocks and logs greater than or equal to the specified block number. +#nops #added diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index f3e64378384..6f95b9c55da 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -114,3 +114,11 @@ func (d disabled) LatestBlockByEventSigsAddrsWithConfs(ctx context.Context, from func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations) ([]Log, error) { return nil, ErrDisabled } + +func (d disabled) FindLCA(ctx context.Context) (*LogPollerBlock, error) { + return nil, ErrDisabled +} + +func (d disabled) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { + return ErrDisabled +} diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 7592ec104c4..cd26889627f 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -44,6 +44,8 @@ type LogPoller interface { GetFilters() map[string]Filter LatestBlock(ctx context.Context) (LogPollerBlock, error) GetBlocksRange(ctx context.Context, numbers []uint64) ([]LogPollerBlock, error) + FindLCA(ctx context.Context) (*LogPollerBlock, error) + DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error // General querying Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) @@ -1422,6 +1424,103 @@ func (lp *logPoller) IndexedLogsWithSigsExcluding(ctx context.Context, address c return lp.orm.SelectIndexedLogsWithSigsExcluding(ctx, eventSigA, eventSigB, topicIndex, address, fromBlock, toBlock, confs) } +// DeleteLogsAndBlocksAfter - removes blocks and logs starting from the specified block +func (lp *logPoller) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { + return lp.orm.DeleteLogsAndBlocksAfter(ctx, start) +} + +func (lp *logPoller) FindLCA(ctx context.Context) (*LogPollerBlock, error) { + latest, err := lp.orm.SelectLatestBlock(ctx) + if err != nil { + return nil, fmt.Errorf("failed to select the latest block: %w", err) + } + + oldest, err := lp.orm.SelectOldestBlock(ctx, 0) + if err != nil { + return nil, fmt.Errorf("failed to select the oldest block: %w", err) + } + + if latest == nil || oldest == nil { + return nil, fmt.Errorf("expected at least one block to be present in DB") + } + + lp.lggr.Debugf("Received request to find LCA. Searching in range [%d, %d]", oldest.BlockNumber, latest.BlockNumber) + + // Find the largest block number for which block hash stored in the DB matches one that we get from the RPC. + // `sort.Find` expects slice of following format s = [1, 0, -1] and returns smallest index i for which s[i] = 0. + // To utilise `sort.Find` we represent range of blocks as slice [latestBlock, latestBlock-1, ..., olderBlock+1, oldestBlock] + // and return 1 if DB block was reorged or 0 if it's still present on chain. + lcaI, found := sort.Find(int(latest.BlockNumber-oldest.BlockNumber)+1, func(i int) int { + const notFound = 1 + const found = 0 + // if there is an error - stop the search + if err != nil { + return notFound + } + + // canceled search + if ctx.Err() != nil { + err = fmt.Errorf("aborted, FindLCA request cancelled: %w", ctx.Err()) + return notFound + } + iBlockNumber := latest.BlockNumber - int64(i) + var dbBlock *LogPollerBlock + // Block with specified block number might not exist in the database, to address that we check closest child + // of the iBlockNumber. If the child is present on chain, it's safe to assume that iBlockNumber is present too + dbBlock, err = lp.orm.SelectOldestBlock(ctx, iBlockNumber) + if err != nil { + err = fmt.Errorf("failed to select block %d by number: %w", iBlockNumber, err) + return notFound + } + + if dbBlock == nil { + err = fmt.Errorf("expected block to exist with blockNumber >= %d as observed block with number %d", iBlockNumber, latest.BlockNumber) + return notFound + } + + lp.lggr.Debugf("Looking for matching block on chain blockNumber: %d blockHash: %s", + dbBlock.BlockNumber, dbBlock.BlockHash) + var chainBlock *evmtypes.Head + chainBlock, err = lp.ec.HeadByHash(ctx, dbBlock.BlockHash) + // our block in DB does not exist on chain + if (chainBlock == nil && err == nil) || errors.Is(err, ethereum.NotFound) { + err = nil + return notFound + } + if err != nil { + err = fmt.Errorf("failed to get block %s from RPC: %w", dbBlock.BlockHash, err) + return notFound + } + + if chainBlock.BlockNumber() != dbBlock.BlockNumber { + err = fmt.Errorf("expected block numbers to match (db: %d, chain: %d), if block hashes match "+ + "(db: %s, chain: %s)", dbBlock.BlockNumber, chainBlock.BlockNumber(), dbBlock.BlockHash, chainBlock.Hash) + return notFound + } + + return found + }) + if err != nil { + return nil, fmt.Errorf("failed to find: %w", err) + } + + if !found { + return nil, fmt.Errorf("failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured") + } + + lcaBlockNumber := latest.BlockNumber - int64(lcaI) + lca, err := lp.orm.SelectBlockByNumber(ctx, lcaBlockNumber) + if err != nil { + return nil, fmt.Errorf("failed to select lca from db: %w", err) + } + + if lca == nil { + return nil, fmt.Errorf("expected lca (blockNum: %d) to exist in DB", lcaBlockNumber) + } + + return lca, nil +} + func EvmWord(i uint64) common.Hash { var b = make([]byte, 8) binary.BigEndian.PutUint64(b, i) diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 74ec41fa85a..cb211043a4c 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -1921,3 +1921,119 @@ func markBlockAsFinalizedByHash(t *testing.T, th TestHarness, blockHash common.H require.NoError(t, err) th.Client.Blockchain().SetFinalized(b.Header()) } + +func TestFindLCA(t *testing.T) { + ctx := testutils.Context(t) + ec := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.Test(t) + chainID := testutils.NewRandomEVMChainID() + db := pgtest.NewSqlxDB(t) + + orm := logpoller.NewORM(chainID, db, lggr) + + lpOpts := logpoller.Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 20, + RpcBatchSize: 10, + KeepFinalizedBlocksDepth: 1000, + } + + lp := logpoller.NewLogPoller(orm, ec, lggr, lpOpts) + t.Run("Fails, if failed to select oldest block", func(t *testing.T) { + _, err := lp.FindLCA(ctx) + require.ErrorContains(t, err, "failed to select the latest block") + }) + // oldest + require.NoError(t, orm.InsertBlock(ctx, common.HexToHash("0x123"), 10, time.Now(), 0)) + // latest + latestBlockHash := common.HexToHash("0x124") + require.NoError(t, orm.InsertBlock(ctx, latestBlockHash, 16, time.Now(), 0)) + t.Run("Fails, if caller's context canceled", func(t *testing.T) { + lCtx, cancel := context.WithCancel(ctx) + ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(nil, nil).Run(func(_ mock.Arguments) { + cancel() + }).Once() + _, err := lp.FindLCA(lCtx) + require.ErrorContains(t, err, "aborted, FindLCA request cancelled") + + }) + t.Run("Fails, if RPC returns an error", func(t *testing.T) { + expectedError := fmt.Errorf("failed to call RPC") + ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(nil, expectedError).Once() + _, err := lp.FindLCA(ctx) + require.ErrorContains(t, err, expectedError.Error()) + }) + t.Run("Fails, if block numbers do not match", func(t *testing.T) { + ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(&evmtypes.Head{ + Number: 123, + }, nil).Once() + _, err := lp.FindLCA(ctx) + require.ErrorContains(t, err, "expected block numbers to match") + }) + t.Run("Fails, if none of the blocks in db matches on chain", func(t *testing.T) { + ec.On("HeadByHash", mock.Anything, mock.Anything).Return(nil, nil).Times(3) + _, err := lp.FindLCA(ctx) + require.ErrorContains(t, err, "failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured") + }) + + type block struct { + BN int + Exists bool + } + testCases := []struct { + Name string + Blocks []block + ExpectedBlockNumber int + ExpectedError error + }{ + { + Name: "All of the blocks are present on chain - returns the latest", + Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: true}, {BN: 3, Exists: true}, {BN: 4, Exists: true}}, + ExpectedBlockNumber: 4, + }, + { + Name: "None of the blocks exists on chain - returns an erro", + Blocks: []block{{BN: 1, Exists: false}, {BN: 2, Exists: false}, {BN: 3, Exists: false}, {BN: 4, Exists: false}}, + ExpectedBlockNumber: 0, + ExpectedError: fmt.Errorf("failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured"), + }, + { + Name: "Only latest block does not exist", + Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: true}, {BN: 3, Exists: true}, {BN: 4, Exists: false}}, + ExpectedBlockNumber: 3, + }, + { + Name: "Only oldest block exists on chain", + Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: false}, {BN: 3, Exists: false}, {BN: 4, Exists: false}}, + ExpectedBlockNumber: 1, + }, + } + + blockHashI := int64(0) + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + // reset the database + require.NoError(t, orm.DeleteLogsAndBlocksAfter(ctx, 0)) + for _, b := range tc.Blocks { + blockHashI++ + hash := common.BigToHash(big.NewInt(blockHashI)) + require.NoError(t, orm.InsertBlock(ctx, hash, int64(b.BN), time.Now(), 0)) + // Hashes are unique for all test cases + var onChainBlock *evmtypes.Head + if b.Exists { + onChainBlock = &evmtypes.Head{Number: int64(b.BN)} + } + ec.On("HeadByHash", mock.Anything, hash).Return(onChainBlock, nil).Maybe() + } + + result, err := lp.FindLCA(ctx) + if tc.ExpectedError != nil { + require.ErrorContains(t, err, tc.ExpectedError.Error()) + } else { + require.NotNil(t, result) + require.Equal(t, result.BlockNumber, int64(tc.ExpectedBlockNumber), "expected block numbers to match") + } + }) + } +} diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index 548e9ca3b90..ef3f4dbd428 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -37,6 +37,54 @@ func (_m *LogPoller) Close() error { return r0 } +// DeleteLogsAndBlocksAfter provides a mock function with given fields: ctx, start +func (_m *LogPoller) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { + ret := _m.Called(ctx, start) + + if len(ret) == 0 { + panic("no return value specified for DeleteLogsAndBlocksAfter") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, start) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// FindLCA provides a mock function with given fields: ctx +func (_m *LogPoller) FindLCA(ctx context.Context) (*logpoller.LogPollerBlock, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for FindLCA") + } + + var r0 *logpoller.LogPollerBlock + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*logpoller.LogPollerBlock, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *logpoller.LogPollerBlock); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*logpoller.LogPollerBlock) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetBlocksRange provides a mock function with given fields: ctx, numbers func (_m *LogPoller) GetBlocksRange(ctx context.Context, numbers []uint64) ([]logpoller.LogPollerBlock, error) { ret := _m.Called(ctx, numbers) diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 14dec5274ad..8f3cdfe185e 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -151,6 +151,12 @@ func (o *ObservedORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, e }) } +func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) { + return withObservedQuery(o, "SelectOldestBlock", func() (*LogPollerBlock, error) { + return o.ORM.SelectOldestBlock(ctx, minAllowedBlockNumber) + }) +} + func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs Confirmations) (*Log, error) { return withObservedQuery(o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) { return o.ORM.SelectLatestLogByEventSigWithConfs(ctx, eventSig, address, confs) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 838a38c8ebb..5e0a74a9183 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -38,6 +38,7 @@ type ORM interface { SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) + SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error) SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error) @@ -202,6 +203,14 @@ func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) return &b, nil } +func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) { + var b LogPollerBlock + if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`, ubig.New(o.chainID), minAllowedBlockNumber); err != nil { + return nil, err + } + return &b, nil +} + func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs Confirmations) (*Log, error) { args, err := newQueryArgsForEvent(o.chainID, address, eventSig). withConfs(confs). diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 8a45ff2f1c5..2a1be62dd5b 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -1759,3 +1759,33 @@ func Benchmark_DeleteExpiredLogs(b *testing.B) { assert.NoError(b, err1) } } + +func TestSelectOldestBlock(t *testing.T) { + th := SetupTH(t, lpOpts) + o1 := th.ORM + o2 := th.ORM2 + ctx := testutils.Context(t) + t.Run("Selects oldest within given chain", func(t *testing.T) { + // insert blocks + require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1231"), 11, time.Now(), 0)) + require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1232"), 12, time.Now(), 0)) + // insert newer block from different chain + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 13, time.Now(), 0)) + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1231"), 14, time.Now(), 0)) + block, err := o1.SelectOldestBlock(ctx, 0) + require.NoError(t, err) + require.NotNil(t, block) + require.Equal(t, block.BlockNumber, int64(13)) + require.Equal(t, block.BlockHash, common.HexToHash("0x1233")) + }) + t.Run("Does not select blocks older than specified limit", func(t *testing.T) { + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1232"), 11, time.Now(), 0)) + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 13, time.Now(), 0)) + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1234"), 15, time.Now(), 0)) + block, err := o1.SelectOldestBlock(ctx, 12) + require.NoError(t, err) + require.NotNil(t, block) + require.Equal(t, block.BlockNumber, int64(13)) + require.Equal(t, block.BlockHash, common.HexToHash("0x1233")) + }) +} diff --git a/core/cmd/blocks_commands.go b/core/cmd/blocks_commands.go index 72b0523e18d..158caf253ab 100644 --- a/core/cmd/blocks_commands.go +++ b/core/cmd/blocks_commands.go @@ -9,6 +9,8 @@ import ( "github.com/pkg/errors" "github.com/urfave/cli" "go.uber.org/multierr" + + "github.com/smartcontractkit/chainlink/v2/core/web" ) func initBlocksSubCmds(s *Shell) []cli.Command { @@ -34,6 +36,18 @@ func initBlocksSubCmds(s *Shell) []cli.Command { }, }, }, + { + Name: "find-lca", + Usage: "Find latest common block stored in DB and on chain", + Action: s.FindLCA, + Flags: []cli.Flag{ + cli.Int64Flag{ + Name: "evm-chain-id", + Usage: "Chain ID of the EVM-based blockchain", + Required: true, + }, + }, + }, } } @@ -75,3 +89,47 @@ func (s *Shell) ReplayFromBlock(c *cli.Context) (err error) { fmt.Println("Replay started") return nil } + +// LCAPresenter implements TableRenderer for an LCAResponse. +type LCAPresenter struct { + web.LCAResponse +} + +// ToRow presents the EVMChainResource as a slice of strings. +func (p *LCAPresenter) ToRow() []string { + return []string{p.EVMChainID.String(), p.Hash, strconv.FormatInt(p.BlockNumber, 10)} +} + +// RenderTable implements TableRenderer +// Just renders a single row +func (p LCAPresenter) RenderTable(rt RendererTable) error { + renderList([]string{"ChainID", "Block Hash", "Block Number"}, [][]string{p.ToRow()}, rt.Writer) + + return nil +} + +// FindLCA finds last common block stored in DB and on chain. +func (s *Shell) FindLCA(c *cli.Context) (err error) { + v := url.Values{} + + if c.IsSet("evm-chain-id") { + v.Add("evmChainID", fmt.Sprintf("%d", c.Int64("evm-chain-id"))) + } + + resp, err := s.HTTP.Get(s.ctx(), + fmt.Sprintf( + "/v2/find_lca?%s", + v.Encode(), + )) + if err != nil { + return s.errorOut(err) + } + + defer func() { + if cerr := resp.Body.Close(); cerr != nil { + err = multierr.Append(err, cerr) + } + }() + + return s.renderAPIResponse(resp, &LCAPresenter{}, "Last Common Ancestor") +} diff --git a/core/cmd/blocks_commands_test.go b/core/cmd/blocks_commands_test.go index 30540748cb1..f7656b94ae1 100644 --- a/core/cmd/blocks_commands_test.go +++ b/core/cmd/blocks_commands_test.go @@ -41,3 +41,28 @@ func Test_ReplayFromBlock(t *testing.T) { c = cli.NewContext(nil, set, nil) require.NoError(t, client.ReplayFromBlock(c)) } + +func Test_FindLCA(t *testing.T) { + t.Parallel() + + //ethClient.On("BalanceAt", mock.Anything, mock.Anything, mock.Anything).Return(big.NewInt(42), nil) + app := startNewApplicationV2(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0].ChainID = (*ubig.Big)(big.NewInt(5)) + c.EVM[0].Enabled = ptr(true) + }) + + client, _ := app.NewShellAndRenderer() + + set := flag.NewFlagSet("test", 0) + flagSetApplyFromAction(client.FindLCA, set, "") + + //Incorrect chain ID + require.NoError(t, set.Set("evm-chain-id", "1")) + c := cli.NewContext(nil, set, nil) + require.ErrorContains(t, client.FindLCA(c), "does not match any local chains") + + //Correct chain ID + require.NoError(t, set.Set("evm-chain-id", "5")) + c = cli.NewContext(nil, set, nil) + require.ErrorContains(t, client.FindLCA(c), "FindLCA is only available if LogPoller is enabled") +} diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index 24cb43e2090..7c9c025d4be 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -34,6 +34,7 @@ import ( "github.com/jmoiron/sqlx" cutils "github.com/smartcontractkit/chainlink-common/pkg/utils" + "github.com/smartcontractkit/chainlink/v2/core/build" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" @@ -253,6 +254,23 @@ func initLocalSubCmds(s *Shell, safe bool) []cli.Command { }, }, }, + { + Name: "remove-blocks", + Usage: "Deletes block range and all associated data", + Action: s.RemoveBlocks, + Flags: []cli.Flag{ + cli.IntFlag{ + Name: "start", + Usage: "Beginning of block range to be deleted", + Required: true, + }, + cli.Int64Flag{ + Name: "evm-chain-id", + Usage: "Chain ID of the EVM-based blockchain", + Required: true, + }, + }, + }, } } @@ -1180,3 +1198,64 @@ func insertFixtures(dbURL url.URL, pathToFixtures string) (err error) { _, err = db.Exec(string(fixturesSQL)) return err } + +// RemoveBlocks - removes blocks after the specified blocks number +func (s *Shell) RemoveBlocks(c *cli.Context) error { + start := c.Int64("start") + if start <= 0 { + return s.errorOut(errors.New("Must pass a positive value in '--start' parameter")) + } + + chainID := big.NewInt(0) + if c.IsSet("evm-chain-id") { + err := chainID.UnmarshalText([]byte(c.String("evm-chain-id"))) + if err != nil { + return s.errorOut(err) + } + } + + cfg := s.Config + err := cfg.Validate() + if err != nil { + return s.errorOut(fmt.Errorf("error validating configuration: %+v", err)) + } + + lggr := logger.Sugared(s.Logger.Named("RemoveBlocks")) + ldb := pg.NewLockedDB(cfg.AppID(), cfg.Database(), cfg.Database().Lock(), lggr) + ctx, cancel := context.WithCancel(context.Background()) + go shutdown.HandleShutdown(func(sig string) { + cancel() + lggr.Info("received signal to stop - closing the database and releasing lock") + + if cErr := ldb.Close(); cErr != nil { + lggr.Criticalf("Failed to close LockedDB: %v", cErr) + } + + if cErr := s.CloseLogger(); cErr != nil { + log.Printf("Failed to close Logger: %v", cErr) + } + }) + + if err = ldb.Open(ctx); err != nil { + // If not successful, we know neither locks nor connection remains opened + return s.errorOut(errors.Wrap(err, "opening db")) + } + defer lggr.ErrorIfFn(ldb.Close, "Error closing db") + + // From now on, DB locks and DB connection will be released on every return. + // Keep watching on logger.Fatal* calls and os.Exit(), because defer will not be executed. + + app, err := s.AppFactory.NewApplication(ctx, s.Config, s.Logger, ldb.DB()) + if err != nil { + return s.errorOut(errors.Wrap(err, "fatal error instantiating application")) + } + + err = app.DeleteLogPollerDataAfter(ctx, chainID, start) + if err != nil { + return s.errorOut(err) + } + + lggr.Infof("RemoveBlocks: successfully removed blocks") + + return nil +} diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index 7427e6caedb..e7322e513ae 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -2,6 +2,7 @@ package cmd_test import ( "flag" + "fmt" "math/big" "os" "strconv" @@ -514,3 +515,58 @@ func TestShell_CleanupChainTables(t *testing.T) { c := cli.NewContext(nil, set, nil) require.NoError(t, client.CleanupChainTables(c)) } + +func TestShell_RemoveBlocks(t *testing.T) { + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + s.Password.Keystore = models.NewSecret("dummy") + c.EVM[0].Nodes[0].Name = ptr("fake") + c.EVM[0].Nodes[0].HTTPURL = commonconfig.MustParseURL("http://fake.com") + c.EVM[0].Nodes[0].WSURL = commonconfig.MustParseURL("WSS://fake.com/ws") + // seems to be needed for config validate + c.Insecure.OCRDevelopmentMode = nil + }) + + lggr := logger.TestLogger(t) + + app := mocks.NewApplication(t) + app.On("GetSqlxDB").Maybe().Return(db) + shell := cmd.Shell{ + Config: cfg, + AppFactory: cltest.InstanceAppFactory{App: app}, + FallbackAPIInitializer: cltest.NewMockAPIInitializer(t), + Runner: cltest.EmptyRunner{}, + Logger: lggr, + } + + t.Run("Returns error, if --start is not positive", func(t *testing.T) { + set := flag.NewFlagSet("test", 0) + flagSetApplyFromAction(shell.RemoveBlocks, set, "") + require.NoError(t, set.Set("start", "0")) + require.NoError(t, set.Set("evm-chain-id", "12")) + c := cli.NewContext(nil, set, nil) + err := shell.RemoveBlocks(c) + require.ErrorContains(t, err, "Must pass a positive value in '--start' parameter") + }) + t.Run("Returns error, if removal fails", func(t *testing.T) { + set := flag.NewFlagSet("test", 0) + flagSetApplyFromAction(shell.RemoveBlocks, set, "") + require.NoError(t, set.Set("start", "10000")) + require.NoError(t, set.Set("evm-chain-id", "12")) + expectedError := fmt.Errorf("failed to delete log poller's data") + app.On("DeleteLogPollerDataAfter", mock.Anything, big.NewInt(12), int64(10000)).Return(expectedError).Once() + c := cli.NewContext(nil, set, nil) + err := shell.RemoveBlocks(c) + require.ErrorContains(t, err, expectedError.Error()) + }) + t.Run("Happy path", func(t *testing.T) { + set := flag.NewFlagSet("test", 0) + flagSetApplyFromAction(shell.RemoveBlocks, set, "") + require.NoError(t, set.Set("start", "10000")) + require.NoError(t, set.Set("evm-chain-id", "12")) + app.On("DeleteLogPollerDataAfter", mock.Anything, big.NewInt(12), int64(10000)).Return(nil).Once() + c := cli.NewContext(nil, set, nil) + err := shell.RemoveBlocks(c) + require.NoError(t, err) + }) +} diff --git a/core/internal/mocks/application.go b/core/internal/mocks/application.go index c83b37a0e5d..f845d46ca8d 100644 --- a/core/internal/mocks/application.go +++ b/core/internal/mocks/application.go @@ -23,6 +23,8 @@ import ( logger "github.com/smartcontractkit/chainlink/v2/core/logger" + logpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + mock "github.com/stretchr/testify/mock" pipeline "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" @@ -147,6 +149,24 @@ func (_m *Application) DeleteJob(ctx context.Context, jobID int32) error { return r0 } +// DeleteLogPollerDataAfter provides a mock function with given fields: ctx, chainID, start +func (_m *Application) DeleteLogPollerDataAfter(ctx context.Context, chainID *big.Int, start int64) error { + ret := _m.Called(ctx, chainID, start) + + if len(ret) == 0 { + panic("no return value specified for DeleteLogPollerDataAfter") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int, int64) error); ok { + r0 = rf(ctx, chainID, start) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // EVMORM provides a mock function with given fields: func (_m *Application) EVMORM() types.Configs { ret := _m.Called() @@ -167,6 +187,36 @@ func (_m *Application) EVMORM() types.Configs { return r0 } +// FindLCA provides a mock function with given fields: ctx, chainID +func (_m *Application) FindLCA(ctx context.Context, chainID *big.Int) (*logpoller.LogPollerBlock, error) { + ret := _m.Called(ctx, chainID) + + if len(ret) == 0 { + panic("no return value specified for FindLCA") + } + + var r0 *logpoller.LogPollerBlock + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*logpoller.LogPollerBlock, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *logpoller.LogPollerBlock); ok { + r0 = rf(ctx, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*logpoller.LogPollerBlock) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetAuditLogger provides a mock function with given fields: func (_m *Application) GetAuditLogger() audit.AuditLogger { ret := _m.Called() diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 2aebef3f8f7..ae3db2e7a73 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -22,7 +22,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + "github.com/smartcontractkit/chainlink/v2/core/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/static" "github.com/smartcontractkit/chainlink/v2/core/bridges" @@ -115,6 +117,11 @@ type Application interface { ID() uuid.UUID SecretGenerator() SecretGenerator + + // FindLCA - finds last common ancestor for LogPoller's chain available in the database and RPC chain + FindLCA(ctx context.Context, chainID *big.Int) (*logpoller.LogPollerBlock, error) + // DeleteLogPollerDataAfter - delete LogPoller state starting from the specified block + DeleteLogPollerDataAfter(ctx context.Context, chainID *big.Int, start int64) error } // ChainlinkApplication contains fields for the JobSubscriber, Scheduler, @@ -886,3 +893,39 @@ func (app *ChainlinkApplication) GetWebAuthnConfiguration() sessions.WebAuthnCon func (app *ChainlinkApplication) ID() uuid.UUID { return app.Config.AppID() } + +// FindLCA - finds last common ancestor +func (app *ChainlinkApplication) FindLCA(ctx context.Context, chainID *big.Int) (*logpoller.LogPollerBlock, error) { + chain, err := app.GetRelayers().LegacyEVMChains().Get(chainID.String()) + if err != nil { + return nil, err + } + if !app.Config.Feature().LogPoller() { + return nil, fmt.Errorf("FindLCA is only available if LogPoller is enabled") + } + + lca, err := chain.LogPoller().FindLCA(ctx) + if err != nil { + return nil, fmt.Errorf("failed to find lca: %w", err) + } + + return lca, nil +} + +// DeleteLogPollerDataAfter - delete LogPoller state starting from the specified block +func (app *ChainlinkApplication) DeleteLogPollerDataAfter(ctx context.Context, chainID *big.Int, start int64) error { + chain, err := app.GetRelayers().LegacyEVMChains().Get(chainID.String()) + if err != nil { + return err + } + if !app.Config.Feature().LogPoller() { + return fmt.Errorf("DeleteLogPollerDataAfter is only available if LogPoller is enabled") + } + + err = chain.LogPoller().DeleteLogsAndBlocksAfter(ctx, start) + if err != nil { + return fmt.Errorf("failed to recover LogPoller: %w", err) + } + + return nil +} diff --git a/core/web/api.go b/core/web/api.go index 1f97d59c77d..51f7b855cd5 100644 --- a/core/web/api.go +++ b/core/web/api.go @@ -120,7 +120,7 @@ func ParsePaginatedResponse(input []byte, resource interface{}, links *jsonapi.L func parsePaginatedResponseToDocument(input []byte, resource interface{}, document *jsonapi.Document) error { err := ParseJSONAPIResponse(input, resource) if err != nil { - return errors.Wrap(err, "ParseJSONAPIResponse error") + return errors.Wrapf(err, "ParseJSONAPIResponse error body: %s", string(input)) } // Unmarshal using the stdlib Unmarshal to extract the links part of the document diff --git a/core/web/lca_controller.go b/core/web/lca_controller.go new file mode 100644 index 00000000000..bb4866c3d08 --- /dev/null +++ b/core/web/lca_controller.go @@ -0,0 +1,74 @@ +package web + +import ( + "errors" + "fmt" + "net/http" + + "github.com/gin-gonic/gin" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" +) + +type LCAController struct { + App chainlink.Application +} + +// FindLCA compares chain of blocks available in the DB with chain provided by an RPC and returns last common ancestor +// Example: +// +// "/v2/find_lca" +func (bdc *LCAController) FindLCA(c *gin.Context) { + chain, err := getChain(bdc.App.GetRelayers().LegacyEVMChains(), c.Query("evmChainID")) + if err != nil { + if errors.Is(err, ErrInvalidChainID) || errors.Is(err, ErrMultipleChains) || errors.Is(err, ErrMissingChainID) { + jsonAPIError(c, http.StatusUnprocessableEntity, err) + return + } + jsonAPIError(c, http.StatusInternalServerError, err) + return + } + chainID := chain.ID() + + lca, err := bdc.App.FindLCA(c.Request.Context(), chainID) + if err != nil { + jsonAPIError(c, http.StatusInternalServerError, err) + return + } + + if lca == nil { + jsonAPIError(c, http.StatusNotFound, fmt.Errorf("failed to find last common ancestor")) + return + } + + response := LCAResponse{ + BlockNumber: lca.BlockNumber, + Hash: lca.BlockHash.String(), + EVMChainID: big.New(chainID), + } + jsonAPIResponse(c, &response, "response") + +} + +type LCAResponse struct { + BlockNumber int64 `json:"blockNumber"` + Hash string `json:"hash"` + EVMChainID *big.Big `json:"evmChainID"` +} + +// GetID returns the jsonapi ID. +func (s LCAResponse) GetID() string { + return "LCAResponseID" +} + +// GetName returns the collection name for jsonapi. +func (LCAResponse) GetName() string { + return "lca_response" +} + +// SetID is used to conform to the UnmarshallIdentifier interface for +// deserializing from jsonapi documents. +func (*LCAResponse) SetID(string) error { + return nil +} diff --git a/core/web/lca_controller_test.go b/core/web/lca_controller_test.go new file mode 100644 index 00000000000..7ec476e8eca --- /dev/null +++ b/core/web/lca_controller_test.go @@ -0,0 +1,29 @@ +package web_test + +import ( + _ "embed" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" +) + +func TestLCAController_FindLCA(t *testing.T) { + cfg := configtest.NewTestGeneralConfig(t) + ec := setupEthClientForControllerTests(t) + app := cltest.NewApplicationWithConfigAndKey(t, cfg, cltest.DefaultP2PKey, ec) + require.NoError(t, app.Start(testutils.Context(t))) + client := app.NewHTTPClient(nil) + resp, cleanup := client.Get("/v2/find_lca?evmChainID=1") + t.Cleanup(cleanup) + assert.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + b, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Contains(t, string(b), "chain id does not match any local chains") +} diff --git a/core/web/router.go b/core/web/router.go index c327583a005..158ea4b411f 100644 --- a/core/web/router.go +++ b/core/web/router.go @@ -292,6 +292,8 @@ func v2Routes(app chainlink.Application, r *gin.RouterGroup) { rc := ReplayController{app} authv2.POST("/replay_from_block/:number", auth.RequiresRunRole(rc.ReplayFromBlock)) + lcaC := LCAController{app} + authv2.GET("/find_lca", auth.RequiresRunRole(lcaC.FindLCA)) csakc := CSAKeysController{app} authv2.GET("/keys/csa", csakc.Index) diff --git a/testdata/scripts/blocks/help.txtar b/testdata/scripts/blocks/help.txtar index 55aaf71858d..5d362a082fd 100644 --- a/testdata/scripts/blocks/help.txtar +++ b/testdata/scripts/blocks/help.txtar @@ -9,7 +9,8 @@ USAGE: chainlink blocks command [command options] [arguments...] COMMANDS: - replay Replays block data from the given number + replay Replays block data from the given number + find-lca Find latest common block stored in DB and on chain OPTIONS: --help, -h show help diff --git a/testdata/scripts/help-all/help-all.txtar b/testdata/scripts/help-all/help-all.txtar index eeaf0da98d1..e111295abb4 100644 --- a/testdata/scripts/help-all/help-all.txtar +++ b/testdata/scripts/help-all/help-all.txtar @@ -16,6 +16,7 @@ admin users list # Lists all API users and their roles attempts # Commands for managing Ethereum Transaction Attempts attempts list # List the Transaction Attempts in descending order blocks # Commands for managing blocks +blocks find-lca # Find latest common block stored in DB and on chain blocks replay # Replays block data from the given number bridges # Commands for Bridges communicating with External Adapters bridges create # Create a new Bridge to an External Adapter @@ -132,6 +133,7 @@ node db status # Display the current database migration status. node db version # Display the current database version. node profile # Collects profile metrics from the node. node rebroadcast-transactions # Manually rebroadcast txs matching nonce range with the specified gas price. This is useful in emergencies e.g. high gas prices and/or network congestion to forcibly clear out the pending TX queue +node remove-blocks # Deletes block range and all associated data node start # Run the Chainlink node node status # Displays the health of various services running inside the node. node validate # Validate the TOML configuration and secrets that are passed as flags to the `node` command. Prints the full effective configuration, with defaults included diff --git a/testdata/scripts/node/help.txtar b/testdata/scripts/node/help.txtar index 33e1fdc90bc..875500b13df 100644 --- a/testdata/scripts/node/help.txtar +++ b/testdata/scripts/node/help.txtar @@ -13,6 +13,7 @@ COMMANDS: rebroadcast-transactions Manually rebroadcast txs matching nonce range with the specified gas price. This is useful in emergencies e.g. high gas prices and/or network congestion to forcibly clear out the pending TX queue validate Validate the TOML configuration and secrets that are passed as flags to the `node` command. Prints the full effective configuration, with defaults included db Commands for managing the database. + remove-blocks Deletes block range and all associated data OPTIONS: --config value, -c value TOML configuration file(s) via flag, or raw TOML via env var. If used, legacy env vars must not be set. Multiple files can be used (-c configA.toml -c configB.toml), and they are applied in order with duplicated fields overriding any earlier values. If the 'CL_CONFIG' env var is specified, it is always processed last with the effect of being the final override. [$CL_CONFIG]