Skip to content

Commit

Permalink
Support for finality tags in LogPoller (#10762)
Browse files Browse the repository at this point in the history
* Support for finality in the LogPoller

* Post review fixes

* Post rebase fixes
  • Loading branch information
mateusz-sekara authored Oct 16, 2023
1 parent fd1369c commit 4519370
Show file tree
Hide file tree
Showing 24 changed files with 1,440 additions and 714 deletions.
11 changes: 10 additions & 1 deletion core/chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,16 @@ func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Nod
if opts.GenLogPoller != nil {
logPoller = opts.GenLogPoller(chainID)
} else {
logPoller = logpoller.NewLogPoller(logpoller.NewObservedORM(chainID, db, l, cfg.Database()), client, l, cfg.EVM().LogPollInterval(), int64(cfg.EVM().FinalityDepth()), int64(cfg.EVM().LogBackfillBatchSize()), int64(cfg.EVM().RPCDefaultBatchSize()), int64(cfg.EVM().LogKeepBlocksDepth()))
logPoller = logpoller.NewLogPoller(
logpoller.NewObservedORM(chainID, db, l, cfg.Database()),
client,
l,
cfg.EVM().LogPollInterval(),
cfg.EVM().FinalityTagEnabled(),
int64(cfg.EVM().FinalityDepth()),
int64(cfg.EVM().LogBackfillBatchSize()),
int64(cfg.EVM().RPCDefaultBatchSize()),
int64(cfg.EVM().LogKeepBlocksDepth()))
}
}

Expand Down
48 changes: 29 additions & 19 deletions core/chains/evm/client/simulated_backend_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,39 +545,32 @@ func (c *SimulatedBackendClient) BatchCallContext(ctx context.Context, b []rpc.B
if len(elem.Args) != 2 {
return fmt.Errorf("SimulatedBackendClient expected 2 args, got %d for eth_getBlockByNumber", len(elem.Args))
}
blockNum, is := elem.Args[0].(string)
blockNumOrTag, is := elem.Args[0].(string)
if !is {
return fmt.Errorf("SimulatedBackendClient expected first arg to be a string for eth_getBlockByNumber, got: %T", elem.Args[0])
}
_, is = elem.Args[1].(bool)
if !is {
return fmt.Errorf("SimulatedBackendClient expected second arg to be a boolean for eth_getBlockByNumber, got: %T", elem.Args[1])
}
n, ok := new(big.Int).SetString(blockNum, 0)
if !ok {
return fmt.Errorf("error while converting block number string: %s to big.Int ", blockNum)
}
header, err := c.b.HeaderByNumber(ctx, n)
header, err := c.fetchHeader(ctx, blockNumOrTag)
if err != nil {
return err
}
switch v := elem.Result.(type) {
switch res := elem.Result.(type) {
case *evmtypes.Head:
b[i].Result = &evmtypes.Head{
Number: header.Number.Int64(),
Hash: header.Hash(),
Timestamp: time.Unix(int64(header.Time), 0).UTC(),
}
res.Number = header.Number.Int64()
res.Hash = header.Hash()
res.ParentHash = header.ParentHash
res.Timestamp = time.Unix(int64(header.Time), 0).UTC()
case *evmtypes.Block:
b[i].Result = &evmtypes.Block{
Number: header.Number.Int64(),
Hash: header.Hash(),
Timestamp: time.Unix(int64(header.Time), 0),
}
res.Number = header.Number.Int64()
res.Hash = header.Hash()
res.ParentHash = header.ParentHash
res.Timestamp = time.Unix(int64(header.Time), 0).UTC()
default:
return fmt.Errorf("SimulatedBackendClient Unexpected Type %T", v)
return fmt.Errorf("SimulatedBackendClient Unexpected Type %T", elem.Result)
}

b[i].Error = err
case "eth_call":
if len(elem.Args) != 2 {
Expand Down Expand Up @@ -718,3 +711,20 @@ func toCallMsg(params map[string]interface{}) ethereum.CallMsg {
func (c *SimulatedBackendClient) IsL2() bool {
return false
}

func (c *SimulatedBackendClient) fetchHeader(ctx context.Context, blockNumOrTag string) (*types.Header, error) {
switch blockNumOrTag {
case rpc.SafeBlockNumber.String():
return c.b.Blockchain().CurrentSafeBlock(), nil
case rpc.LatestBlockNumber.String():
return c.b.Blockchain().CurrentHeader(), nil
case rpc.FinalizedBlockNumber.String():
return c.b.Blockchain().CurrentFinalBlock(), nil
default:
blockNum, ok := new(big.Int).SetString(blockNumOrTag, 0)
if !ok {
return nil, fmt.Errorf("error while converting block number string: %s to big.Int ", blockNumOrTag)
}
return c.b.HeaderByNumber(ctx, blockNum)
}
}
4 changes: 2 additions & 2 deletions core/chains/evm/forwarders/forwarder_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {
t.Log(authorized)

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, 2, 3, 2, 1000)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database())
fwdMgr.ORM = forwarders.NewORM(db, logger.TestLogger(t), cfg.Database())

Expand Down Expand Up @@ -111,7 +111,7 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {
ec.Commit()

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, 2, 3, 2, 1000)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database())
fwdMgr.ORM = forwarders.NewORM(db, logger.TestLogger(t), cfg.Database())

Expand Down
4 changes: 0 additions & 4 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,3 @@ func (d disabled) IndexedLogsCreatedAfter(eventSig common.Hash, address common.A
func (d disabled) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs Confirmations, qopts ...pg.QOpt) (int64, error) {
return 0, ErrDisabled
}

func (d disabled) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}
7 changes: 5 additions & 2 deletions core/chains/evm/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type TestHarness struct {
EthDB ethdb.Database
}

func SetupTH(t testing.TB, finalityDepth, backfillBatchSize, rpcBatchSize int64) TestHarness {
func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize, rpcBatchSize int64) TestHarness {
lggr := logger.TestLogger(t)
chainID := testutils.NewRandomEVMChainID()
chainID2 := testutils.NewRandomEVMChainID()
Expand All @@ -63,7 +63,10 @@ func SetupTH(t testing.TB, finalityDepth, backfillBatchSize, rpcBatchSize int64)
// Poll period doesn't matter, we intend to call poll and save logs directly in the test.
// Set it to some insanely high value to not interfere with any tests.
esc := client.NewSimulatedBackendClient(t, ec, chainID)
lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, finalityDepth, backfillBatchSize, rpcBatchSize, 1000)
// Mark genesis block as finalized to avoid any nulls in the tests
head := esc.Backend().Blockchain().CurrentHeader()
esc.Backend().Blockchain().SetFinalized(head)
lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, 1000)
emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
emitterAddress2, _, emitter2, err := log_emitter.DeployLogEmitter(owner, ec)
Expand Down
Loading

0 comments on commit 4519370

Please sign in to comment.