From aec7ffe804fa54bacb3e44917d150885ec3e39e8 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 22 May 2024 10:01:06 -0700 Subject: [PATCH] Update BackupLogPoller test Add RegisterHeadNumberCallback() to SimulatedBackendClient, so we can trigger an rpc failover event just after reading a particular block, but before the logs get read for that block. This is the race condition that can happen on optimism chain which BackupPoller was designed to address --- 2 | 7 + .../evm/client/simulated_backend_client.go | 27 +++- core/chains/evm/logpoller/helper_test.go | 5 +- core/chains/evm/logpoller/log_poller.go | 1 + core/chains/evm/logpoller/log_poller_test.go | 133 +++++++++++------- core/chains/evm/types/types.go | 14 ++ 6 files changed, 128 insertions(+), 59 deletions(-) create mode 100644 2 diff --git a/2 b/2 new file mode 100644 index 00000000000..be07b572723 --- /dev/null +++ b/2 @@ -0,0 +1,7 @@ +SELECT FROM ( + SELECT id, block_number, block_hash, log_index + FROM evm.log_poller_filters f JOIN evm.logs l ON + f.evm_chain_id = l.evm_chain_id AND f.address = l.address AND f.event = l.event_sig + WHERE f.evm_chain_id=5 + GROUP BY f.id, l.block_number, l.block_hash, l.log_index +) x order by block_number, log_index; diff --git a/core/chains/evm/client/simulated_backend_client.go b/core/chains/evm/client/simulated_backend_client.go index 7ff4f0ec6ac..7ba2c045ce8 100644 --- a/core/chains/evm/client/simulated_backend_client.go +++ b/core/chains/evm/client/simulated_backend_client.go @@ -66,11 +66,12 @@ var ( // SimulatedBackendClient is an Client implementation using a simulated // blockchain backend. Note that not all RPC methods are implemented here. 
type SimulatedBackendClient struct { - b *simulated.Backend - client simulated.Client - t testing.TB - chainId *big.Int - optimismMode bool + b evmtypes.Backend // *simulated.Backend, or something satisfying same interface + client simulated.Client + t testing.TB + chainId *big.Int + optimismMode bool + headByNumberCallback func(ctx context.Context, c *SimulatedBackendClient, n *big.Int) error } // NewSimulatedBackendClient creates an eth client backed by a simulated backend. @@ -88,7 +89,7 @@ func NewSimulatedBackendClient(t testing.TB, b *simulated.Backend, chainId *big. // where success rather than an error code is returned when a call to FilterLogs() fails to find the block hash // requested. This combined with a failover event can lead to the "eventual consistency" behavior that Backup LogPoller // and other solutions were designed to recover from. -func (c *SimulatedBackendClient) SetBackend(backend *simulated.Backend, optimismMode bool) { +func (c *SimulatedBackendClient) SetBackend(backend evmtypes.Backend, optimismMode bool) { c.optimismMode = optimismMode c.b = backend c.client = backend.Client() @@ -229,6 +230,10 @@ func (c *SimulatedBackendClient) blockNumber(ctx context.Context, number interfa } } +func (c *SimulatedBackendClient) RegisterHeadByNumberCallback(cb func(ctx context.Context, c *SimulatedBackendClient, n *big.Int) error) { + c.headByNumberCallback = cb +} + // HeadByNumber returns our own header type. 
func (c *SimulatedBackendClient) HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error) { if n == nil { @@ -240,6 +245,14 @@ func (c *SimulatedBackendClient) HeadByNumber(ctx context.Context, n *big.Int) ( } else if header == nil { return nil, ethereum.NotFound } + + if c.headByNumberCallback != nil { + err = c.headByNumberCallback(ctx, c, n) + if err != nil { + return nil, err + } + } + return &evmtypes.Head{ EVMChainID: ubig.NewI(c.chainId.Int64()), Hash: header.Hash(), @@ -530,7 +543,7 @@ func (c *SimulatedBackendClient) SuggestGasTipCap(ctx context.Context) (tipCap * return c.client.SuggestGasTipCap(ctx) } -func (c *SimulatedBackendClient) Backend() *simulated.Backend { +func (c *SimulatedBackendClient) Backend() evmtypes.Backend { return c.b } diff --git a/core/chains/evm/logpoller/helper_test.go b/core/chains/evm/logpoller/helper_test.go index 6da7d20dcdb..652ddf65dd1 100644 --- a/core/chains/evm/logpoller/helper_test.go +++ b/core/chains/evm/logpoller/helper_test.go @@ -22,6 +22,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" @@ -38,7 +39,7 @@ type TestHarness struct { ORM, ORM2 logpoller.ORM LogPoller logpoller.LogPollerTest Client *client.SimulatedBackendClient - Backend *simulated.Backend + Backend evmtypes.Backend Owner *bind.TransactOpts Emitter1, Emitter2 *log_emitter.LogEmitter EmitterAddress1, EmitterAddress2 common.Address @@ -117,7 +118,7 @@ func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) { // Simulates an RPC failover event to an alternate rpc server. 
This can also be used to // simulate switching back to the primary rpc after it recovers. -func (th *TestHarness) SetActiveClient(backend *simulated.Backend, optimismMode bool) { +func (th *TestHarness) SetActiveClient(backend evmtypes.Backend, optimismMode bool) { th.Backend = backend th.Client.SetBackend(backend, optimismMode) } diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index b1d7d1da623..19115cd75f0 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -27,6 +27,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 395ecfc646b..fd808a50424 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient/simulated" "github.com/leanovate/gopter" @@ -224,6 +223,49 @@ func TestLogPoller_Integration(t *testing.T) { assert.ErrorIs(t, th.LogPoller.Replay(ctx, 4), logpoller.ErrReplayRequestAborted) } +type backendWrapper struct { + b *simulated.Backend +} + +func (b backendWrapper) Client() simulated.Client { + return b.b.Client() +} + +func (b backendWrapper) Close() error { + return b.b.Close() +} + +func (b backendWrapper) Commit() common.Hash { + return b.b.Commit() +} + +func (b 
backendWrapper) Rollback() { + b.b.Rollback() +} + +func (b backendWrapper) Fork(parentHash common.Hash) error { + return b.b.Fork(parentHash) +} + +func (b backendWrapper) AdjustTime(adjustment time.Duration) error { + return b.b.AdjustTime(adjustment) +} + +// Behaves as primary simulated.Client until failoverBlockNumber is requested (via HeaderByNumber), then +// fails over to secondary simulated.Client. +type failoverClient struct { + primary simulated.Client + secondary simulated.Client + failoverBlockNumber *big.Int +} + +func (f failoverClient) HeaderByNumber(ctx context.Context, n *big.Int) (*types.Header, error) { + if n.Cmp(f.failoverBlockNumber) < 0 { + return f.primary.HeaderByNumber(ctx, n) + } + return f.secondary.HeaderByNumber(ctx, n) +} + // Simulate an rpc failover event on optimism, where logs are requested from a block hash which doesn't // exist on the new rpc server, but a successful error code is returned. This is bad/buggy behavior on the // part of the rpc server, but we should be able to handle this without missing any logs, as @@ -258,15 +300,6 @@ func Test_BackupLogPoller(t *testing.T) { BackupPollerBlockDelay: 100, }, ) - // later, we will need at least 32 blocks filled with logs for cache invalidation - for i := int64(0); i < 32; i++ { - // to invalidate geth's internal read-cache, a matching log must be found in the bloom Filter - // for each of the 32 blocks - tx, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(i + 7)}) - require.NoError(t, err) - require.NotNil(t, tx) - th.Backend.Commit() - } ctx := testutils.Context(t) @@ -300,6 +333,11 @@ func Test_BackupLogPoller(t *testing.T) { assert.NoError(t, th.LogPoller.UnregisterFilter(ctx, "filter2")) }() + for n := 1; n < 31; n++ { + h := th.Backend.Commit() + require.Len(t, h, 32) + } + // generate some tx's with logs tx1, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)}) require.NoError(t, err) @@ -313,14 +351,14 @@ func Test_BackupLogPoller(t *testing.T) { 
require.NoError(t, err) require.NotNil(t, tx3) - th.Backend.Commit() // commit block 34 with 3 tx's included + th.Backend.Commit() // commit block 32 with 3 tx's included - block34, err := th.Client.BlockByNumber(ctx, nil) + block32, err := th.Client.BlockByNumber(ctx, nil) require.NoError(t, err) - require.Equal(t, uint64(34), block34.Number().Uint64()) + require.Equal(t, uint64(32), block32.Number().Uint64()) // Ensure that the logs have been included in this rpc server's view of the blockchain - txs := block34.Body().Transactions + txs := block32.Body().Transactions require.Len(t, txs, 3) receipt, err := th.Client.TransactionReceipt(ctx, txs[0].Hash()) require.NoError(t, err) @@ -336,64 +374,59 @@ func Test_BackupLogPoller(t *testing.T) { primaryRpc := th.Backend // save primaryRpc for later - // Failover to backup rpc - th.SetActiveClient(backupRpc, true) - - // Ensure that the client LogPoller is connected to no longer remembers block 34 - _, err = th.Client.BlockByHash(ctx, block34.Hash()) - require.ErrorIs(t, err, ethereum.NotFound) + // Failover to simulated optimism rpc on block 30 + th.Client.RegisterHeadByNumberCallback(func(ctx context.Context, c *client.SimulatedBackendClient, n *big.Int) error { + if n.Int64() != 32 { + return nil + } + th.SetActiveClient(backupRpc, true) + return nil + }) currentBlockNumber := th.PollAndSaveLogs(ctx, 1) - assert.Equal(t, int64(1), currentBlockNumber) - - // flush out cached block 34 by reading logs from first 32 blocks - //query := ethereum.FilterQuery{ - // FromBlock: big.NewInt(int64(2)), - // ToBlock: big.NewInt(int64(33)), - // Addresses: []common.Address{th.EmitterAddress1}, - // Topics: [][]common.Hash{{EmitterABI.Events["Log1"].ID}}, - //} - //fLogs, err := th.Client.FilterLogs(ctx, query) - //require.NoError(t, err) - //require.Equal(t, 32, len(fLogs)) + require.Equal(t, int64(33), currentBlockNumber) // logs shouldn't show up yet - logs, err := th.LogPoller.Logs(ctx, 34, 34, EmitterABI.Events["Log1"].ID, 
th.EmitterAddress1) + logs, err := th.LogPoller.Logs(ctx, 32, 32, EmitterABI.Events["Log1"].ID, th.EmitterAddress1) require.NoError(t, err) - assert.Equal(t, 0, len(logs)) + require.Equal(t, 0, len(logs)) - finalizeThroughBlock(t, th, 34) + finalizeThroughBlock(t, th, 32) - th.SetActiveClient(primaryRpc, true) // restore primary rpc + b, ok := primaryRpc.(*simulated.Backend) + require.True(t, ok) + th.SetActiveClient(backendWrapper{b}, true) // restore primary rpc // Run ordinary poller + backup poller at least once - currentBlock, _ := th.LogPoller.LatestBlock(ctx) - th.LogPoller.PollAndSaveLogs(ctx, currentBlock.BlockNumber+1) + require.NoError(t, err) + currentBlockNumber = th.PollAndSaveLogs(ctx, currentBlockNumber) + require.Equal(t, int64(33), currentBlockNumber) th.LogPoller.BackupPollAndSaveLogs(ctx) - currentBlock, _ = th.LogPoller.LatestBlock(ctx) - - //require.Equal(t, int64(37), currentBlock.BlockNumber+1) + latestBlock, err := th.LogPoller.LatestBlock(ctx) + require.NoError(t, err) + require.Equal(t, currentBlockNumber-1, latestBlock.BlockNumber) // shouldn't change // logs still shouldn't show up, because we don't want to backfill the last finalized log // to help with reorg detection - logs, err = th.LogPoller.Logs(ctx, 34, 34, EmitterABI.Events["Log1"].ID, th.EmitterAddress1) + logs, err = th.LogPoller.Logs(ctx, 32, 32, EmitterABI.Events["Log1"].ID, th.EmitterAddress1) require.NoError(t, err) - assert.Equal(t, 0, len(logs)) + require.Equal(t, 0, len(logs)) th.Backend.Commit() - finalizeThroughBlock(t, th, 35) + finalizeThroughBlock(t, th, 64) // Run ordinary poller + backup poller at least once more - th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber+1) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) th.LogPoller.BackupPollAndSaveLogs(ctx) - currentBlock, _ = th.LogPoller.LatestBlock(ctx) + currentBlock, err := th.LogPoller.LatestBlock(ctx) + require.NoError(t, err) - require.Equal(t, int64(38), currentBlock.BlockNumber+1) + 
require.Equal(t, int64(64), currentBlock.BlockNumber) // all 3 logs in block 34 should show up now, thanks to backup logger logs, err = th.LogPoller.Logs(ctx, 30, 37, EmitterABI.Events["Log1"].ID, th.EmitterAddress1) require.NoError(t, err) - assert.Equal(t, 5, len(logs)) - logs, err = th.LogPoller.Logs(ctx, 34, 34, EmitterABI.Events["Log2"].ID, th.EmitterAddress1) + assert.Equal(t, 1, len(logs)) + logs, err = th.LogPoller.Logs(ctx, 32, 32, EmitterABI.Events["Log2"].ID, th.EmitterAddress1) require.NoError(t, err) assert.Equal(t, 1, len(logs)) logs, err = th.LogPoller.Logs(ctx, 32, 36, EmitterABI.Events["Log1"].ID, th.EmitterAddress2) @@ -708,7 +741,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { // Set up a test chain with a log emitting contract deployed. orm := logpoller.NewORM(chainID, db, lggr) // Note this property test is run concurrently and the sim is not threadsafe. - backend := simulated.NewBackend(map[common.Address]core.GenesisAccount{ + backend := simulated.NewBackend(types.GenesisAlloc{ owner.From: { Balance: big.NewInt(0).Mul(big.NewInt(10), big.NewInt(1e18)), }, diff --git a/core/chains/evm/types/types.go b/core/chains/evm/types/types.go index 57a53bce67a..94bb22e879e 100644 --- a/core/chains/evm/types/types.go +++ b/core/chains/evm/types/types.go @@ -6,10 +6,12 @@ import ( "log/slog" "math/big" "os" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" gethTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient/simulated" "github.com/jackc/pgtype" pkgerrors "github.com/pkg/errors" "gopkg.in/guregu/null.v4" @@ -400,3 +402,15 @@ func (h *HashArray) Scan(src interface{}) error { } return err } + +// Interface which is satisfied by simulated.Backend. Defined here so that default geth behavior can be +// overridden in tests, and injected into our SimulatedBackend wrapper. 
This can be used to simulate RPC +servers with quirky behavior that differs from geth. +type Backend interface { + Close() error + Commit() common.Hash + Rollback() + Fork(parentHash common.Hash) error + AdjustTime(adjustment time.Duration) error + Client() simulated.Client +}