Skip to content

Commit

Permalink
Upkeep filters life cycle: avoid replying logs for existing filters (#…
Browse files Browse the repository at this point in the history
…10470)

* filters life cycle: avoid replying logs for existing filters

TODO: tests

* fix test

* use config update block if not too old

* small fix + align tests and logs

* added mocks for HasFilter
  • Loading branch information
amirylm authored Sep 6, 2023
1 parent 37f3880 commit 1176a8c
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,6 @@ func (p *logEventProvider) RegisterFilter(ctx context.Context, opts FilterOption
p.lggr.Debugf("filter for upkeep with id %s already registered with the same config", upkeepID.String())
return nil
}
// removing filter so we can recreate it with updated values
err := p.poller.UnregisterFilter(p.filterName(currentFilter.upkeepID))
if err != nil {
return fmt.Errorf("failed to unregister upkeep filter %s for update: %w", upkeepID.String(), err)
}
filter = *currentFilter
} else { // new filter
filter = upkeepFilter{
Expand Down Expand Up @@ -115,17 +110,35 @@ func (p *logEventProvider) register(ctx context.Context, lpFilter logpoller.Filt
if err != nil {
return fmt.Errorf("failed to get latest block while registering filter: %w", err)
}
lggr := p.lggr.With("upkeepID", ufilter.upkeepID.String())
logPollerHasFilter := p.poller.HasFilter(lpFilter.Name)
if logPollerHasFilter {
lggr.Debugw("Upserting upkeep filter")
// removing filter so we can recreate it with updated values
err := p.poller.UnregisterFilter(lpFilter.Name)
if err != nil {
return fmt.Errorf("failed to upsert (unregister) upkeep filter %s: %w", ufilter.upkeepID.String(), err)
}
}
if err := p.poller.RegisterFilter(lpFilter); err != nil {
return err
}
p.filterStore.AddActiveUpkeeps(ufilter)
if logPollerHasFilter {
// already registered, no need to backfill
return nil
}
backfillBlock := latest - int64(LogBackfillBuffer)
if backfillBlock < 1 {
// New chain, backfill from start
backfillBlock = 1
}
// TODO: Optimise to do backfill from ufilter.configUpdateBlock only for new filters
// if it is not too old
if int64(ufilter.configUpdateBlock) > backfillBlock {
// backfill from config update block in case it is not too old
backfillBlock = int64(ufilter.configUpdateBlock)
}
// NOTE: replys are planned to be done as part of RegisterFilter within logpoller
lggr.Debugw("Backfilling logs for new upkeep filter", "backfillBlock", backfillBlock)
p.poller.ReplayAsync(backfillBlock)

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func TestLogEventProvider_LifeCycle(t *testing.T) {
errored bool
upkeepID *big.Int
upkeepCfg LogTriggerConfig
hasFilter bool
replyed bool
cfgUpdateBlock uint64
mockPoller bool
unregister bool
Expand All @@ -35,6 +37,8 @@ func TestLogEventProvider_LifeCycle(t *testing.T) {
ContractAddress: common.BytesToAddress(common.LeftPadBytes([]byte{1, 2, 3, 4}, 20)),
Topic0: common.BytesToHash(common.LeftPadBytes([]byte{1, 2, 3, 4}, 32)),
},
false,
true,
uint64(1),
true,
false,
Expand All @@ -44,6 +48,8 @@ func TestLogEventProvider_LifeCycle(t *testing.T) {
true,
big.NewInt(111),
LogTriggerConfig{},
false,
false,
uint64(0),
false,
false,
Expand All @@ -56,18 +62,22 @@ func TestLogEventProvider_LifeCycle(t *testing.T) {
ContractAddress: common.BytesToAddress(common.LeftPadBytes([]byte{}, 20)),
Topic0: common.BytesToHash(common.LeftPadBytes([]byte{}, 32)),
},
false,
false,
uint64(2),
false,
false,
},
{
"existing config",
"existing config with old block",
true,
big.NewInt(111),
LogTriggerConfig{
ContractAddress: common.BytesToAddress(common.LeftPadBytes([]byte{1, 2, 3, 4}, 20)),
Topic0: common.BytesToHash(common.LeftPadBytes([]byte{1, 2, 3, 4}, 32)),
},
true,
false,
uint64(0),
true,
false,
Expand All @@ -80,23 +90,37 @@ func TestLogEventProvider_LifeCycle(t *testing.T) {
ContractAddress: common.BytesToAddress(common.LeftPadBytes([]byte{1, 2, 3, 4}, 20)),
Topic0: common.BytesToHash(common.LeftPadBytes([]byte{1, 2, 3, 4}, 32)),
},
true,
false,
uint64(2),
true,
true,
},
}

mp := new(mocks.LogPoller)
mp.On("RegisterFilter", mock.Anything).Return(nil)
mp.On("UnregisterFilter", mock.Anything).Return(nil)
mp.On("LatestBlock", mock.Anything).Return(int64(0), nil)
mp.On("ReplayAsync", mock.Anything).Return(nil)
p := NewLogProvider(logger.TestLogger(t), mp, &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200))
p := NewLogProvider(logger.TestLogger(t), nil, &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200))

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if tc.mockPoller {
lp := new(mocks.LogPoller)
lp.On("RegisterFilter", mock.Anything).Return(nil)
lp.On("UnregisterFilter", mock.Anything).Return(nil)
lp.On("LatestBlock", mock.Anything).Return(int64(0), nil)
lp.On("HasFilter", p.filterName(tc.upkeepID)).Return(tc.hasFilter).Times(1)
if tc.replyed {
lp.On("ReplayAsync", mock.Anything).Return(nil).Times(1)
} else {
lp.On("ReplayAsync", mock.Anything).Return(nil).Times(0)
}
p.lock.Lock()
p.poller = lp
p.lock.Unlock()
}

err := p.RegisterFilter(ctx, FilterOptions{
UpkeepID: tc.upkeepID,
TriggerConfig: tc.upkeepCfg,
Expand All @@ -120,6 +144,7 @@ func TestEventLogProvider_RefreshActiveUpkeeps(t *testing.T) {
mp := new(mocks.LogPoller)
mp.On("RegisterFilter", mock.Anything).Return(nil)
mp.On("UnregisterFilter", mock.Anything).Return(nil)
mp.On("HasFilter", mock.Anything).Return(false)
mp.On("LatestBlock", mock.Anything).Return(int64(0), nil)
mp.On("ReplayAsync", mock.Anything).Return(nil)

Expand All @@ -146,6 +171,7 @@ func TestEventLogProvider_RefreshActiveUpkeeps(t *testing.T) {
newIds, err := p.RefreshActiveUpkeeps()
require.NoError(t, err)
require.Len(t, newIds, 0)
mp.On("HasFilter", p.filterName(core.GenUpkeepID(ocr2keepers.LogTrigger, "2222").BigInt())).Return(true)
newIds, err = p.RefreshActiveUpkeeps(
core.GenUpkeepID(ocr2keepers.LogTrigger, "2222").BigInt(),
core.GenUpkeepID(ocr2keepers.LogTrigger, "1234").BigInt(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func TestLogEventProvider_ReadLogs(t *testing.T) {

mp.On("RegisterFilter", mock.Anything).Return(nil)
mp.On("ReplayAsync", mock.Anything).Return()
mp.On("HasFilter", mock.Anything).Return(false)
mp.On("UnregisterFilter", mock.Anything, mock.Anything).Return(nil)
mp.On("LatestBlock", mock.Anything).Return(int64(1), nil)
mp.On("LogsWithSigs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{
Expand Down

0 comments on commit 1176a8c

Please sign in to comment.