Allow customizing BackupPollerBlockDelay or disabling Backup LogPoller entirely for tests #11850

Merged: 11 commits, Feb 28, 2024
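This PR replaces NewLogPoller's long positional argument list with a logpoller.Opts struct and adds a BackupPollerBlockDelay option: it sets how many blocks behind the primary poller the backup LogPoller runs, and 0 disables the backup poller entirely. A minimal sketch of how a test might construct a poller with the backup poller switched off (the orm, evmClient, and lggr setup is assumed to match the existing test helpers shown in the diff below):

lpOpts := logpoller.Opts{
	PollPeriod:               100 * time.Millisecond,
	FinalityDepth:            2,
	BackfillBatchSize:        3,
	RpcBatchSize:             2,
	KeepFinalizedBlocksDepth: 1000,
	BackupPollerBlockDelay:   0, // 0 disables the backup poller for this test
}
lp := logpoller.NewLogPoller(orm, evmClient, lggr, lpOpts)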
4 changes: 4 additions & 0 deletions core/chains/evm/config/chain_scoped.go
@@ -134,6 +134,10 @@ func (e *evmConfig) LogKeepBlocksDepth() uint32 {
return *e.c.LogKeepBlocksDepth
}

func (e *evmConfig) BackupLogPollerBlockDelay() uint64 {
return *e.c.BackupLogPollerBlockDelay
}

func (e *evmConfig) NonceAutoSync() bool {
return *e.c.NonceAutoSync
}
1 change: 1 addition & 0 deletions core/chains/evm/config/config.go
@@ -36,6 +36,7 @@ type EVM interface {
LinkContractAddress() string
LogBackfillBatchSize() uint32
LogKeepBlocksDepth() uint32
BackupLogPollerBlockDelay() uint64
LogPollInterval() time.Duration
LogPrunePageSize() uint32
MinContractPayment() *commonassets.Link
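Wherever the poller is built from chain config rather than hard-coded test values, the new accessor feeds straight into Opts. A hedged wiring sketch (cfg stands for a chain-scoped config.EVM value, e.g. the result of evmcfg.EVM() used in the tests below; only accessors visible in this diff are used, and the int64 casts are assumptions):

lpOpts := logpoller.Opts{
	PollPeriod:             cfg.LogPollInterval(),
	BackfillBatchSize:      int64(cfg.LogBackfillBatchSize()),
	LogPrunePageSize:       int64(cfg.LogPrunePageSize()),
	BackupPollerBlockDelay: int64(cfg.BackupLogPollerBlockDelay()),
}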
39 changes: 20 additions & 19 deletions core/chains/evm/config/toml/config.go
@@ -341,25 +341,26 @@ func (c *EVMConfig) TOMLString() (string, error) {
}

type Chain struct {
AutoCreateKey *bool
BlockBackfillDepth *uint32
BlockBackfillSkip *bool
ChainType *string
FinalityDepth *uint32
FinalityTagEnabled *bool
FlagsContractAddress *ethkey.EIP55Address
LinkContractAddress *ethkey.EIP55Address
LogBackfillBatchSize *uint32
LogPollInterval *commonconfig.Duration
LogKeepBlocksDepth *uint32
LogPrunePageSize *uint32
MinIncomingConfirmations *uint32
MinContractPayment *commonassets.Link
NonceAutoSync *bool
NoNewHeadsThreshold *commonconfig.Duration
OperatorFactoryAddress *ethkey.EIP55Address
RPCDefaultBatchSize *uint32
RPCBlockQueryDelay *uint16
AutoCreateKey *bool
BlockBackfillDepth *uint32
BlockBackfillSkip *bool
ChainType *string
FinalityDepth *uint32
FinalityTagEnabled *bool
FlagsContractAddress *ethkey.EIP55Address
LinkContractAddress *ethkey.EIP55Address
LogBackfillBatchSize *uint32
LogPollInterval *commonconfig.Duration
LogKeepBlocksDepth *uint32
LogPrunePageSize *uint32
BackupLogPollerBlockDelay *uint64
MinIncomingConfirmations *uint32
MinContractPayment *commonassets.Link
NonceAutoSync *bool
NoNewHeadsThreshold *commonconfig.Duration
OperatorFactoryAddress *ethkey.EIP55Address
RPCDefaultBatchSize *uint32
RPCBlockQueryDelay *uint16

Transactions Transactions `toml:",omitempty"`
BalanceMonitor BalanceMonitor `toml:",omitempty"`
3 changes: 3 additions & 0 deletions core/chains/evm/config/toml/defaults.go
@@ -140,6 +140,9 @@ func (c *Chain) SetFrom(f *Chain) {
if v := f.LogPrunePageSize; v != nil {
c.LogPrunePageSize = v
}
if v := f.BackupLogPollerBlockDelay; v != nil {
c.BackupLogPollerBlockDelay = v
}
if v := f.MinIncomingConfirmations; v != nil {
c.MinIncomingConfirmations = v
}
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/defaults/fallback.toml
@@ -7,6 +7,7 @@ LogBackfillBatchSize = 1000
LogPollInterval = '15s'
LogKeepBlocksDepth = 100000
LogPrunePageSize = 0
BackupLogPollerBlockDelay = 100
MinContractPayment = '.00001 link'
MinIncomingConfirmations = 3
NonceAutoSync = true
19 changes: 17 additions & 2 deletions core/chains/evm/forwarders/forwarder_manager_test.go
@@ -60,7 +60,15 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {
t.Log(authorized)

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0)

lpOpts := logpoller.Opts{
PollPeriod: 100 * time.Millisecond,
FinalityDepth: 2,
BackfillBatchSize: 3,
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
}
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, lpOpts)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database())
fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database())

@@ -113,7 +121,14 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {
ec.Commit()

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0)
lpOpts := logpoller.Opts{
PollPeriod: 100 * time.Millisecond,
FinalityDepth: 2,
BackfillBatchSize: 3,
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
}
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, lpOpts)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database())
fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database())

8 changes: 6 additions & 2 deletions core/chains/evm/logpoller/helper_test.go
@@ -46,7 +46,7 @@ type TestHarness struct {
EthDB ethdb.Database
}

func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth int64) TestHarness {
func SetupTH(t testing.TB, opts logpoller.Opts) TestHarness {
lggr := logger.Test(t)
chainID := testutils.NewRandomEVMChainID()
chainID2 := testutils.NewRandomEVMChainID()
@@ -67,7 +67,11 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize
// Mark genesis block as finalized to avoid any nulls in the tests
head := esc.Backend().Blockchain().CurrentHeader()
esc.Backend().Blockchain().SetFinalized(head)
lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth, 0)

if opts.PollPeriod == 0 {
opts.PollPeriod = 1 * time.Hour
}
lp := logpoller.NewLogPoller(o, esc, lggr, opts)
emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
emitterAddress2, _, emitter2, err := log_emitter.DeployLogEmitter(owner, ec)
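Call sites of SetupTH now pass logpoller.Opts directly; an illustrative call (values mirror the other updated tests, and PollPeriod is deliberately left at zero so the 1h default above kicks in):

th := SetupTH(t, logpoller.Opts{
	UseFinalityTag:           false,
	FinalityDepth:            2,
	BackfillBatchSize:        3,
	RpcBatchSize:             2,
	KeepFinalizedBlocksDepth: 1000,
	BackupPollerBlockDelay:   100,
})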
144 changes: 81 additions & 63 deletions core/chains/evm/logpoller/log_poller.go
@@ -73,7 +73,7 @@ const (
type LogPollerTest interface {
LogPoller
PollAndSaveLogs(ctx context.Context, currentBlockNumber int64)
BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64)
BackupPollAndSaveLogs(ctx context.Context)
Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
PruneOldBlocks(ctx context.Context) (bool, error)
@@ -105,8 +105,9 @@ type logPoller struct {
keepFinalizedBlocksDepth int64 // the number of blocks behind the last finalized block we keep in database
backfillBatchSize int64 // batch size to use when backfilling finalized logs
rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks
backupPollerNextBlock int64
logPrunePageSize int64
backupPollerNextBlock int64 // next block to be processed by Backup LogPoller
backupPollerBlockDelay int64 // how far behind regular LogPoller should BackupLogPoller run. 0 = disabled

filterMu sync.RWMutex
filters map[string]Filter
Expand All @@ -121,6 +122,17 @@ type logPoller struct {
wg sync.WaitGroup
}

type Opts struct {
PollPeriod time.Duration
UseFinalityTag bool
FinalityDepth int64
BackfillBatchSize int64
RpcBatchSize int64
KeepFinalizedBlocksDepth int64
BackupPollerBlockDelay int64
LogPrunePageSize int64
}

// NewLogPoller creates a log poller. Note there is an assumption
// that blocks can be processed faster than they are produced for the given chain, or the poller will fall behind.
// Block processing involves the following calls in steady state (without reorgs):
@@ -131,7 +143,7 @@ type logPoller struct {
//
// How fast that can be done depends largely on network speed and DB, but even for the fastest
// support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, useFinalityTag bool, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepFinalizedBlocksDepth int64, logsPrunePageSize int64) *logPoller {
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, opts Opts) *logPoller {
ctx, cancel := context.WithCancel(context.Background())
return &logPoller{
ctx: ctx,
@@ -141,13 +153,14 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),
replayStart: make(chan int64),
replayComplete: make(chan error),
pollPeriod: pollPeriod,
finalityDepth: finalityDepth,
useFinalityTag: useFinalityTag,
backfillBatchSize: backfillBatchSize,
rpcBatchSize: rpcBatchSize,
keepFinalizedBlocksDepth: keepFinalizedBlocksDepth,
logPrunePageSize: logsPrunePageSize,
pollPeriod: opts.PollPeriod,
backupPollerBlockDelay: opts.BackupPollerBlockDelay,
finalityDepth: opts.FinalityDepth,
useFinalityTag: opts.UseFinalityTag,
backfillBatchSize: opts.BackfillBatchSize,
rpcBatchSize: opts.RpcBatchSize,
keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth,
logPrunePageSize: opts.LogPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
}
@@ -436,67 +449,41 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i
return mathutil.Min(requested, lastProcessed.BlockNumber), nil
}

func (lp *logPoller) loadFilters() error {
lp.filterMu.Lock()
defer lp.filterMu.Unlock()
filters, err := lp.orm.LoadFilters(pg.WithParentCtx(lp.ctx))

if err != nil {
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
}

lp.filters = filters
lp.filterDirty = true
return nil
}

func (lp *logPoller) run() {
defer lp.wg.Done()
logPollTick := time.After(0)
// stagger these somewhat, so they don't all run back-to-back
backupLogPollTick := time.After(100 * time.Millisecond)
filtersLoaded := false

loadFilters := func() error {
lp.filterMu.Lock()
defer lp.filterMu.Unlock()
filters, err := lp.orm.LoadFilters(pg.WithParentCtx(lp.ctx))

if err != nil {
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
}

lp.filters = filters
lp.filterDirty = true
filtersLoaded = true
return nil
}

for {
select {
case <-lp.ctx.Done():
return
case fromBlockReq := <-lp.replayStart:
fromBlock, err := lp.GetReplayFromBlock(lp.ctx, fromBlockReq)
if err == nil {
if !filtersLoaded {
lp.lggr.Warnw("Received replayReq before filters loaded", "fromBlock", fromBlock, "requested", fromBlockReq)
if err = loadFilters(); err != nil {
lp.lggr.Errorw("Failed loading filters during Replay", "err", err, "fromBlock", fromBlock)
}
}
if err == nil {
// Serially process replay requests.
lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq)
lp.PollAndSaveLogs(lp.ctx, fromBlock)
lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq)
}
} else {
lp.lggr.Errorw("Error executing replay, could not get fromBlock", "err", err)
}
select {
case <-lp.ctx.Done():
// We're shutting down, notify client and exit
select {
case lp.replayComplete <- ErrReplayRequestAborted:
default:
}
return
case lp.replayComplete <- err:
}
lp.handleReplayRequest(fromBlockReq, filtersLoaded)
case <-logPollTick:
logPollTick = time.After(utils.WithJitter(lp.pollPeriod))
if !filtersLoaded {
if err := loadFilters(); err != nil {
if err := lp.loadFilters(); err != nil {
lp.lggr.Errorw("Failed loading filters in main logpoller loop, retrying later", "err", err)
continue
}
filtersLoaded = true
}

// Always start from the latest block in the db.
@@ -529,22 +516,23 @@ func (lp *logPoller) run() {
}
lp.PollAndSaveLogs(lp.ctx, start)
case <-backupLogPollTick:
if lp.backupPollerBlockDelay == 0 {
continue // backup poller is disabled
}
// Backup log poller: this serves as an emergency backup to protect against eventual-consistency behavior
// of an rpc node (seen occasionally on optimism, but possibly could happen on other chains?). If the first
// time we request a block, no logs or incomplete logs come back, this ensures that every log is eventually
// re-requested after it is finalized. This doesn't add much overhead, because we can request all of them
// in one shot, since we don't need to worry about re-orgs after finality depth, and it runs 100x less
// frequently than the primary log poller.

// If pollPeriod is set to 1 block time, backup log poller will run once every 100 blocks
const backupPollerBlockDelay = 100
// re-requested after it is finalized. This doesn't add much overhead, because we can request all of them
// in one shot, since we don't need to worry about re-orgs after finality depth, and it runs far less
// frequently than the primary log poller (instead of roughly once per block it runs roughly once every
// lp.backupPollerBlockDelay blocks--with default settings about 100x less frequently).

backupLogPollTick = time.After(utils.WithJitter(backupPollerBlockDelay * lp.pollPeriod))
backupLogPollTick = time.After(utils.WithJitter(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod))
if !filtersLoaded {
lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping")
continue
}
lp.BackupPollAndSaveLogs(lp.ctx, backupPollerBlockDelay)
lp.BackupPollAndSaveLogs(lp.ctx)
}
}
}
@@ -582,7 +570,37 @@ func (lp *logPoller) backgroundWorkerRun() {
}
}

func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64) {
func (lp *logPoller) handleReplayRequest(fromBlockReq int64, filtersLoaded bool) {
fromBlock, err := lp.GetReplayFromBlock(lp.ctx, fromBlockReq)
if err == nil {
if !filtersLoaded {
lp.lggr.Warnw("Received replayReq before filters loaded", "fromBlock", fromBlock, "requested", fromBlockReq)
if err = lp.loadFilters(); err != nil {
lp.lggr.Errorw("Failed loading filters during Replay", "err", err, "fromBlock", fromBlock)
}
}
if err == nil {
// Serially process replay requests.
lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq)
lp.PollAndSaveLogs(lp.ctx, fromBlock)
lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq)
}
} else {
lp.lggr.Errorw("Error executing replay, could not get fromBlock", "err", err)
}
select {
case <-lp.ctx.Done():
// We're shutting down, notify client and exit
select {
case lp.replayComplete <- ErrReplayRequestAborted:
default:
}
return
case lp.replayComplete <- err:
}
}

func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) {
if lp.backupPollerNextBlock == 0 {
lastProcessed, err := lp.orm.SelectLatestBlock(pg.WithParentCtx(ctx))
if err != nil {
@@ -594,7 +612,7 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context, backupPollerBloc
return
}
// If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay)
backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay)
backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-lp.backupPollerBlockDelay)
// (or at block 0 if whole blockchain is too short)
lp.backupPollerNextBlock = mathutil.Max(backupStartBlock, 0)
}
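For reference, the backup pass interval is simply the product of the new setting and the poll period: with the fallback default of 100 and, say, a 2s poll period, the backup poller wakes roughly every 200s (plus jitter), while a value of 0 skips it entirely. A small illustrative helper mirroring the tick computation in run(), not part of this PR and assuming the usual time import:

// backupInterval returns how often the backup poller would tick; 0 means it never runs.
func backupInterval(pollPeriod time.Duration, backupPollerBlockDelay int64) time.Duration {
	if backupPollerBlockDelay == 0 {
		return 0 // backup poller disabled
	}
	return time.Duration(backupPollerBlockDelay) * pollPeriod // e.g. 100 * 2s = 200s
}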