Update BackupLogPoller test
Add RegisterHeadByNumberCallback() to SimulatedBackendClient, so we can
trigger an rpc failover event just after reading a particular block, but
before the logs get read for that block. This is the race condition that
can happen on the Optimism chain, which BackupPoller was designed to address.
reductionista committed May 24, 2024
1 parent 459aa1d commit aec7ffe
Showing 6 changed files with 128 additions and 59 deletions.
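
For context, here is a minimal sketch of the race this new hook makes reproducible, assembled from the pieces in the diff below (th, backupRpc, and th.Client refer to the test harness, alternate backend, and SimulatedBackendClient used in log_poller_test.go; the control flow is illustrative, not the poller's exact implementation):

	// Register a callback that fires inside HeadByNumber, after the header has
	// been read but before the poller can fetch that block's logs.
	th.Client.RegisterHeadByNumberCallback(func(ctx context.Context, c *client.SimulatedBackendClient, n *big.Int) error {
		if n.Int64() == 32 {
			th.SetActiveClient(backupRpc, true) // swap rpc servers; optimismMode=true
		}
		return nil
	})

	// The poller reads the head of block 32 from the primary rpc; the callback
	// then fails over to the backup rpc.
	head, err := th.Client.HeadByNumber(ctx, big.NewInt(32))

	// Logs are then requested by the hash read above. The backup rpc has never
	// seen that hash, and in optimismMode it reports success with zero logs
	// instead of ethereum.NotFound, so without BackupPoller the block would be
	// silently skipped.
	logs, err := th.Client.FilterLogs(ctx, ethereum.FilterQuery{BlockHash: &head.Hash})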
7 changes: 7 additions & 0 deletions 2
@@ -0,0 +1,7 @@
SELECT * FROM (
SELECT id, block_number, block_hash, log_index
FROM evm.log_poller_filters f JOIN evm.logs l ON
f.evm_chain_id = l.evm_chain_id AND f.address = l.address AND f.event = l.event_sig
WHERE f.evm_chain_id = 5
GROUP BY f.id, l.block_number, l.block_hash, l.log_index
) x ORDER BY block_number, log_index;
27 changes: 20 additions & 7 deletions core/chains/evm/client/simulated_backend_client.go
@@ -66,11 +66,12 @@ var (
// SimulatedBackendClient is a Client implementation using a simulated
// blockchain backend. Note that not all RPC methods are implemented here.
type SimulatedBackendClient struct {
b *simulated.Backend
client simulated.Client
t testing.TB
chainId *big.Int
optimismMode bool
b evmtypes.Backend // *simulated.Backend, or something satisfying the same interface
client simulated.Client
t testing.TB
chainId *big.Int
optimismMode bool
headByNumberCallback func(ctx context.Context, c *SimulatedBackendClient, n *big.Int) error
}

// NewSimulatedBackendClient creates an eth client backed by a simulated backend.
@@ -88,7 +89,7 @@ func NewSimulatedBackendClient(t testing.TB, b *simulated.Backend, chainId *big.
// where success rather than an error code is returned when a call to FilterLogs() fails to find the block hash
// requested. This, combined with a failover event, can lead to the "eventual consistency" behavior that Backup LogPoller
// and other solutions were designed to recover from.
func (c *SimulatedBackendClient) SetBackend(backend *simulated.Backend, optimismMode bool) {
func (c *SimulatedBackendClient) SetBackend(backend evmtypes.Backend, optimismMode bool) {
c.optimismMode = optimismMode
c.b = backend
c.client = backend.Client()
@@ -229,6 +230,10 @@ func (c *SimulatedBackendClient) blockNumber(ctx context.Context, number interfa
}
}

func (c *SimulatedBackendClient) RegisterHeadByNumberCallback(cb func(ctx context.Context, c *SimulatedBackendClient, n *big.Int) error) {
c.headByNumberCallback = cb
}

// HeadByNumber returns our own header type.
func (c *SimulatedBackendClient) HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error) {
if n == nil {
@@ -240,6 +245,14 @@ func (c *SimulatedBackendClient) HeadByNumber(ctx context.Context, n *big.Int) (
} else if header == nil {
return nil, ethereum.NotFound
}

if c.headByNumberCallback != nil {
err = c.headByNumberCallback(ctx, c, n)
if err != nil {
return nil, err
}
}

return &evmtypes.Head{
EVMChainID: ubig.NewI(c.chainId.Int64()),
Hash: header.Hash(),
@@ -530,7 +543,7 @@ func (c *SimulatedBackendClient) SuggestGasTipCap(ctx context.Context) (tipCap *
return c.client.SuggestGasTipCap(ctx)
}

func (c *SimulatedBackendClient) Backend() *simulated.Backend {
func (c *SimulatedBackendClient) Backend() evmtypes.Backend {
return c.b
}
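
The optimismMode contract described in the SetBackend comment above, as a behavioral sketch (assumed semantics paraphrasing that comment, with unknownHash standing in for a block hash the active backend has never seen, and c the SimulatedBackendClient):

	q := ethereum.FilterQuery{BlockHash: &unknownHash}
	logs, err := c.FilterLogs(ctx, q)
	// optimismMode == false: err != nil, the missing hash fails loudly
	// optimismMode == true:  err == nil and len(logs) == 0, mimicking the buggy
	// "success" response some optimism rpc servers return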

5 changes: 3 additions & 2 deletions core/chains/evm/logpoller/helper_test.go
@@ -22,6 +22,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
@@ -38,7 +39,7 @@ type TestHarness struct {
ORM, ORM2 logpoller.ORM
LogPoller logpoller.LogPollerTest
Client *client.SimulatedBackendClient
Backend *simulated.Backend
Backend evmtypes.Backend
Owner *bind.TransactOpts
Emitter1, Emitter2 *log_emitter.LogEmitter
EmitterAddress1, EmitterAddress2 common.Address
@@ -117,7 +118,7 @@ func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) {

// Simulates an RPC failover event to an alternate rpc server. This can also be used to
// simulate switching back to the primary rpc after it recovers.
func (th *TestHarness) SetActiveClient(backend *simulated.Backend, optimismMode bool) {
func (th *TestHarness) SetActiveClient(backend evmtypes.Backend, optimismMode bool) {
th.Backend = backend
th.Client.SetBackend(backend, optimismMode)
}
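
In Test_BackupLogPoller below, this amounts to a simple flip between the two backends (simplified snippet; variable names as in that test):

	th.SetActiveClient(backupRpc, true)  // fail over to the optimism-style rpc
	// ... poll, and observe the silently missed block ...
	th.SetActiveClient(primaryRpc, true) // primary recovers; switch back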
1 change: 1 addition & 0 deletions core/chains/evm/logpoller/log_poller.go
@@ -27,6 +27,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
133 changes: 83 additions & 50 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -12,7 +12,6 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient/simulated"
"github.com/leanovate/gopter"
@@ -224,6 +223,49 @@ func TestLogPoller_Integration(t *testing.T) {
assert.ErrorIs(t, th.LogPoller.Replay(ctx, 4), logpoller.ErrReplayRequestAborted)
}

type backendWrapper struct {
b *simulated.Backend
}

func (b backendWrapper) Client() simulated.Client {
return b.b.Client()
}

func (b backendWrapper) Close() error {
return b.b.Close()
}

func (b backendWrapper) Commit() common.Hash {
return b.b.Commit()
}

func (b backendWrapper) Rollback() {
b.b.Rollback()
}

func (b backendWrapper) Fork(parentHash common.Hash) error {
return b.b.Fork(parentHash)
}

func (b backendWrapper) AdjustTime(adjustment time.Duration) error {
return b.b.AdjustTime(adjustment)
}

// Behaves as the primary simulated.Client until failoverBlockNumber is requested (via HeaderByNumber), then
// fails over to the secondary simulated.Client.
type failoverClient struct {
primary simulated.Client
secondary simulated.Client
failoverBlockNumber *big.Int
}

func (f failoverClient) HeaderByNumber(ctx context.Context, n *big.Int) (*types.Header, error) {
if n.Cmp(f.failoverBlockNumber) < 0 {
return f.primary.HeaderByNumber(ctx, n)
}
return f.secondary.HeaderByNumber(ctx, n)
}
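
Only HeaderByNumber is overridden in the visible hunk; for failoverClient to be usable wherever a full simulated.Client is expected, the rest of the method set must exist too. A common pattern (an assumption here; the collapsed lines may handle this differently) is to embed the primary client so every other method delegates to it, with f.primary becoming the embedded f.Client:

	type failoverClient struct {
		simulated.Client    // embedded primary; all methods not overridden delegate here
		secondary           simulated.Client
		failoverBlockNumber *big.Int
	}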

// Simulates an rpc failover event on optimism, where logs are requested for a block hash which doesn't
// exist on the new rpc server, but success rather than an error is returned. This is bad/buggy behavior on the
// part of the rpc server, but we should be able to handle this without missing any logs, as
@@ -258,15 +300,6 @@ func Test_BackupLogPoller(t *testing.T) {
BackupPollerBlockDelay: 100,
},
)
// later, we will need at least 32 blocks filled with logs for cache invalidation
for i := int64(0); i < 32; i++ {
// to invalidate geth's internal read-cache, a matching log must be found in the bloom Filter
// for each of the 32 blocks
tx, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(i + 7)})
require.NoError(t, err)
require.NotNil(t, tx)
th.Backend.Commit()
}

ctx := testutils.Context(t)

@@ -300,6 +333,11 @@
assert.NoError(t, th.LogPoller.UnregisterFilter(ctx, "filter2"))
}()

for n := 1; n < 31; n++ {
h := th.Backend.Commit()
require.Len(t, h, 32)
}

// generate some tx's with logs
tx1, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)})
require.NoError(t, err)
@@ -313,14 +351,14 @@
require.NoError(t, err)
require.NotNil(t, tx3)

th.Backend.Commit() // commit block 34 with 3 tx's included
th.Backend.Commit() // commit block 32 with 3 tx's included

block34, err := th.Client.BlockByNumber(ctx, nil)
block32, err := th.Client.BlockByNumber(ctx, nil)
require.NoError(t, err)
require.Equal(t, uint64(34), block34.Number().Uint64())
require.Equal(t, uint64(32), block32.Number().Uint64())

// Ensure that the logs have been included in this rpc server's view of the blockchain
txs := block34.Body().Transactions
txs := block32.Body().Transactions
require.Len(t, txs, 3)
receipt, err := th.Client.TransactionReceipt(ctx, txs[0].Hash())
require.NoError(t, err)
@@ -336,64 +374,59 @@

primaryRpc := th.Backend // save primaryRpc for later

// Failover to backup rpc
th.SetActiveClient(backupRpc, true)

// Ensure that the client the LogPoller is connected to no longer remembers block 34
_, err = th.Client.BlockByHash(ctx, block34.Hash())
require.ErrorIs(t, err, ethereum.NotFound)
// Fail over to the simulated optimism rpc when block 32 is requested
th.Client.RegisterHeadByNumberCallback(func(ctx context.Context, c *client.SimulatedBackendClient, n *big.Int) error {
if n.Int64() != 32 {
return nil
}
th.SetActiveClient(backupRpc, true)
return nil
})

currentBlockNumber := th.PollAndSaveLogs(ctx, 1)
assert.Equal(t, int64(1), currentBlockNumber)

// flush out cached block 34 by reading logs from first 32 blocks
//query := ethereum.FilterQuery{
// FromBlock: big.NewInt(int64(2)),
// ToBlock: big.NewInt(int64(33)),
// Addresses: []common.Address{th.EmitterAddress1},
// Topics: [][]common.Hash{{EmitterABI.Events["Log1"].ID}},
//}
//fLogs, err := th.Client.FilterLogs(ctx, query)
//require.NoError(t, err)
//require.Equal(t, 32, len(fLogs))
require.Equal(t, int64(33), currentBlockNumber)

// logs shouldn't show up yet
logs, err := th.LogPoller.Logs(ctx, 34, 34, EmitterABI.Events["Log1"].ID, th.EmitterAddress1)
logs, err := th.LogPoller.Logs(ctx, 32, 32, EmitterABI.Events["Log1"].ID, th.EmitterAddress1)
require.NoError(t, err)
assert.Equal(t, 0, len(logs))
require.Equal(t, 0, len(logs))

finalizeThroughBlock(t, th, 34)
finalizeThroughBlock(t, th, 32)

th.SetActiveClient(primaryRpc, true) // restore primary rpc
b, ok := primaryRpc.(*simulated.Backend)
require.True(t, ok)
th.SetActiveClient(backendWrapper{b}, true) // restore primary rpc

// Run ordinary poller + backup poller at least once
currentBlock, _ := th.LogPoller.LatestBlock(ctx)
th.LogPoller.PollAndSaveLogs(ctx, currentBlock.BlockNumber+1)
require.NoError(t, err)
currentBlockNumber = th.PollAndSaveLogs(ctx, currentBlockNumber)
require.Equal(t, int64(33), currentBlockNumber)
th.LogPoller.BackupPollAndSaveLogs(ctx)
currentBlock, _ = th.LogPoller.LatestBlock(ctx)

//require.Equal(t, int64(37), currentBlock.BlockNumber+1)
latestBlock, err := th.LogPoller.LatestBlock(ctx)
require.NoError(t, err)
require.Equal(t, currentBlockNumber-1, latestBlock.BlockNumber) // shouldn't change

// logs still shouldn't show up, because we don't want to backfill the last finalized log
// to help with reorg detection
logs, err = th.LogPoller.Logs(ctx, 34, 34, EmitterABI.Events["Log1"].ID, th.EmitterAddress1)
logs, err = th.LogPoller.Logs(ctx, 32, 32, EmitterABI.Events["Log1"].ID, th.EmitterAddress1)
require.NoError(t, err)
assert.Equal(t, 0, len(logs))
require.Equal(t, 0, len(logs))
th.Backend.Commit()
finalizeThroughBlock(t, th, 35)
finalizeThroughBlock(t, th, 64)

// Run ordinary poller + backup poller at least once more
th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber+1)
th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber)
th.LogPoller.BackupPollAndSaveLogs(ctx)
currentBlock, _ = th.LogPoller.LatestBlock(ctx)
currentBlock, err := th.LogPoller.LatestBlock(ctx)
require.NoError(t, err)

require.Equal(t, int64(38), currentBlock.BlockNumber+1)
require.Equal(t, int64(64), currentBlock.BlockNumber)

// all 3 logs in block 32 should show up now, thanks to the backup poller
logs, err = th.LogPoller.Logs(ctx, 30, 37, EmitterABI.Events["Log1"].ID, th.EmitterAddress1)
require.NoError(t, err)
assert.Equal(t, 5, len(logs))
logs, err = th.LogPoller.Logs(ctx, 34, 34, EmitterABI.Events["Log2"].ID, th.EmitterAddress1)
assert.Equal(t, 1, len(logs))
logs, err = th.LogPoller.Logs(ctx, 32, 32, EmitterABI.Events["Log2"].ID, th.EmitterAddress1)
require.NoError(t, err)
assert.Equal(t, 1, len(logs))
logs, err = th.LogPoller.Logs(ctx, 32, 36, EmitterABI.Events["Log1"].ID, th.EmitterAddress2)
@@ -708,7 +741,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
// Set up a test chain with a log emitting contract deployed.
orm := logpoller.NewORM(chainID, db, lggr)
// Note this property test is run concurrently and the sim is not threadsafe.
backend := simulated.NewBackend(map[common.Address]core.GenesisAccount{
backend := simulated.NewBackend(types.GenesisAlloc{
owner.From: {
Balance: big.NewInt(0).Mul(big.NewInt(10), big.NewInt(1e18)),
},
14 changes: 14 additions & 0 deletions core/chains/evm/types/types.go
@@ -6,10 +6,12 @@ import (
"log/slog"
"math/big"
"os"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient/simulated"
"github.com/jackc/pgtype"
pkgerrors "github.com/pkg/errors"
"gopkg.in/guregu/null.v4"
@@ -400,3 +402,15 @@ func (h *HashArray) Scan(src interface{}) error {
}
return err
}

// Interface which is satisfied by simulated.Backend. Defined here so that default geth behavior can be
// overridden in tests, and injected into our SimulatedBackendClient wrapper. This can be used to simulate
// rpc servers with quirky behavior that differs from geth's.
type Backend interface {
Close() error
Commit() common.Hash
Rollback()
Fork(parentHash common.Hash) error
AdjustTime(adjustment time.Duration) error
Client() simulated.Client
}
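
A minimal sketch of the kind of quirky test double this interface enables (hypothetical; the backendWrapper in log_poller_test.go above is the real in-tree pass-through example):

	// countingBackend wraps a real backend and records every mined block hash,
	// letting a test assert exactly which blocks were produced on this "server".
	type countingBackend struct {
		Backend             // embedded; every method except Commit delegates
		mined []common.Hash
	}

	func (b *countingBackend) Commit() common.Hash {
		h := b.Backend.Commit()
		b.mined = append(b.mined, h)
		return h
	}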
