Skip to content

Commit

Permalink
Allow burst in log recoverer when it lags (#10479)
Browse files Browse the repository at this point in the history
* Allow burst in log recoverer when it lags

* add test

* more tests

* update

* improve test

* update test messages
  • Loading branch information
infiloop2 authored Sep 6, 2023
1 parent 3bf936a commit 94d05b7
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ var (
GCInterval = RecoveryCacheTTL

recoveryBatchSize = 10
recoveryLogsBuffer = int64(50)
recoveryLogsBuffer = int64(200)
recoveryLogsBurst = int64(500)
)

type LogRecoverer interface {
Expand Down Expand Up @@ -318,6 +319,12 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB
start = startBlock
}
end := start + recoveryLogsBuffer
if offsetBlock-end > 100*recoveryLogsBuffer {
// If recoverer is lagging by a lot (more than 100x recoveryLogsBuffer), allow
// a range of recoveryLogsBurst
// Exploratory: Store lastRePollBlock in DB to prevent bursts during restarts
end = start + recoveryLogsBurst
}
if end > offsetBlock {
end = offsetBlock
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func TestLogRecoverer_Recover(t *testing.T) {
logsErr error
recoverErr error
proposalsWorkIDs []string
lastRePollBlocks []int64
}{
{
"no filters",
Expand All @@ -239,6 +240,7 @@ func TestLogRecoverer_Recover(t *testing.T) {
nil,
nil,
[]string{},
[]int64{},
},
{
"latest block error",
Expand All @@ -252,6 +254,7 @@ func TestLogRecoverer_Recover(t *testing.T) {
nil,
fmt.Errorf("test error"),
[]string{},
[]int64{},
},
{
"states error",
Expand Down Expand Up @@ -280,6 +283,7 @@ func TestLogRecoverer_Recover(t *testing.T) {
nil,
nil,
[]string{},
[]int64{0},
},
{
"get logs error",
Expand All @@ -301,11 +305,12 @@ func TestLogRecoverer_Recover(t *testing.T) {
fmt.Errorf("test error"),
nil,
[]string{},
[]int64{0},
},
{
"happy flow",
100,
200,
500,
nil,
[]upkeepFilter{
{
Expand All @@ -321,7 +326,15 @@ func TestLogRecoverer_Recover(t *testing.T) {
topics: []common.Hash{
common.HexToHash("0x2"),
},
configUpdateBlock: 150, // should be filtered out
configUpdateBlock: 450, // should be filtered out
},
{
upkeepID: big.NewInt(3),
addr: common.HexToAddress("0x2").Bytes(),
topics: []common.Hash{
common.HexToHash("0x2"),
},
lastRePollBlock: 450, // should be filtered out, as its higher than latest-lookback
},
},
[]ocr2keepers.UpkeepState{ocr2keepers.UnknownState},
Expand All @@ -337,6 +350,68 @@ func TestLogRecoverer_Recover(t *testing.T) {
nil,
nil,
[]string{"84c83c79c2be2c3eabd8d35986a2a798d9187564d7f4f8f96c5a0f40f50bed3f"},
[]int64{200, 0, 450},
},
{
"lastRePollBlock updated with burst when lagging behind",
100,
50000,
nil,
[]upkeepFilter{
{
upkeepID: big.NewInt(1),
addr: common.HexToAddress("0x1").Bytes(),
topics: []common.Hash{
common.HexToHash("0x1"),
},
lastRePollBlock: 100, // Should be updated with burst
},
},
[]ocr2keepers.UpkeepState{ocr2keepers.UnknownState},
nil,
[]logpoller.Log{
{
BlockNumber: 2,
TxHash: common.HexToHash("0x111"),
LogIndex: 1,
BlockHash: common.HexToHash("0x2"),
},
},
nil,
nil,
[]string{"84c83c79c2be2c3eabd8d35986a2a798d9187564d7f4f8f96c5a0f40f50bed3f"},
[]int64{600},
},
{
"recovery starts at configUpdateBlock if higher than lastRePollBlock",
100,
5000,
nil,
[]upkeepFilter{
{
upkeepID: big.NewInt(1),
addr: common.HexToAddress("0x1").Bytes(),
topics: []common.Hash{
common.HexToHash("0x1"),
},
lastRePollBlock: 100,
configUpdateBlock: 500,
},
},
[]ocr2keepers.UpkeepState{ocr2keepers.UnknownState},
nil,
[]logpoller.Log{
{
BlockNumber: 2,
TxHash: common.HexToHash("0x111"),
LogIndex: 1,
BlockHash: common.HexToHash("0x2"),
},
},
nil,
nil,
[]string{"84c83c79c2be2c3eabd8d35986a2a798d9187564d7f4f8f96c5a0f40f50bed3f"},
[]int64{700}, // should be configUpdateBlock + recoveryLogsBuffer
},
}

Expand All @@ -356,6 +431,13 @@ func TestLogRecoverer_Recover(t *testing.T) {
return
}
require.NoError(t, err)
for i, active := range tc.active {
filters := filterStore.GetFilters(func(f upkeepFilter) bool {
return f.upkeepID.String() == active.upkeepID.String()
})
require.Equal(t, 1, len(filters))
require.Equal(t, tc.lastRePollBlocks[i], filters[0].lastRePollBlock)
}

proposals, err := recoverer.GetRecoveryProposals(ctx)
require.NoError(t, err)
Expand Down

0 comments on commit 94d05b7

Please sign in to comment.