Merge branch 'release/2.5.1-automation' into 2.5.1-automation-timestamp
infiloop2 authored Sep 20, 2023
2 parents 4a31b6a + e20dbac commit 5dfe1a5
Showing 17 changed files with 171 additions and 77 deletions.
3 changes: 3 additions & 0 deletions core/chains/evm/config/toml/defaults/Arbitrum_Goerli.toml
@@ -22,3 +22,6 @@ BlockHistorySize = 0
 
 [NodePool]
 SyncThreshold = 10
+
+[OCR2.Automation]
+GasLimit = 14500000
3 changes: 3 additions & 0 deletions core/chains/evm/config/toml/defaults/Arbitrum_Mainnet.toml
@@ -24,3 +24,6 @@ BlockHistorySize = 0
 
 [NodePool]
 SyncThreshold = 10
+
+[OCR2.Automation]
+GasLimit = 14500000
4 changes: 2 additions & 2 deletions core/scripts/go.mod
@@ -18,8 +18,8 @@ require (
 	github.com/pelletier/go-toml/v2 v2.0.9
 	github.com/shopspring/decimal v1.3.1
 	github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
-	github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5
-	github.com/smartcontractkit/ocr2keepers v0.7.23
+	github.com/smartcontractkit/libocr v0.0.0-20230918212407-dbd4e505b3e6
+	github.com/smartcontractkit/ocr2keepers v0.7.24
 	github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687
 	github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb
 	github.com/spf13/cobra v1.6.1
8 changes: 4 additions & 4 deletions core/scripts/go.sum
@@ -1387,10 +1387,10 @@ github.com/smartcontractkit/go-plugin v0.0.0-20230605132010-0f4d515d1472 h1:x3kN
 github.com/smartcontractkit/go-plugin v0.0.0-20230605132010-0f4d515d1472/go.mod h1:6/1TEzT0eQznvI/gV2CM29DLSkAK/e58mUWKVsPaph0=
 github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJif132UCdjo8u43i7iPN1/MFnu49hv7lFGFftCHKU=
 github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0=
-github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 h1:rzbqGoScs9VHGnyCKF7AoQEuUfwJnzcKmGIfaczeanA=
-github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0=
-github.com/smartcontractkit/ocr2keepers v0.7.23 h1:hvMCHm9zTOKGELc40n+JLGmbiW1tkFnHW17qAtoVews=
-github.com/smartcontractkit/ocr2keepers v0.7.23/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0=
+github.com/smartcontractkit/libocr v0.0.0-20230918212407-dbd4e505b3e6 h1:w+8TI2Vcm3vk8XQz40ddcwy9BNZgoakXIby35Y54iDU=
+github.com/smartcontractkit/libocr v0.0.0-20230918212407-dbd4e505b3e6/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0=
+github.com/smartcontractkit/ocr2keepers v0.7.24 h1:d1HcCpsBcBSC9MC9qdjzsm/NSAnnavcZAvAqPAAa75Q=
+github.com/smartcontractkit/ocr2keepers v0.7.24/go.mod h1:4e1ZDRz7fpLgcRUjJpq+5mkoD0ga11BxrSp2JTWKADQ=
 github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM=
 github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM=
 github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A=
17 changes: 11 additions & 6 deletions core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go
@@ -50,12 +50,13 @@ type BlockSubscriber struct {
 	latestBlock      atomic.Pointer[ocr2keepers.BlockKey]
 	blockHistorySize int64
 	blockSize        int64
+	finalityDepth    uint32
 	lggr             logger.Logger
 }
 
 var _ ocr2keepers.BlockSubscriber = &BlockSubscriber{}
 
-func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr logger.Logger) *BlockSubscriber {
+func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, finalityDepth uint32, lggr logger.Logger) *BlockSubscriber {
 	return &BlockSubscriber{
 		threadCtrl: utils.NewThreadControl(),
 		hb:         hb,
@@ -65,6 +66,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr
 		blocks:           map[int64]string{},
 		blockHistorySize: blockHistorySize,
 		blockSize:        lookbackDepth,
+		finalityDepth:    finalityDepth,
 		latestBlock:      atomic.Pointer[ocr2keepers.BlockKey]{},
 		lggr:             lggr.Named("BlockSubscriber"),
 	}
@@ -226,10 +228,13 @@ func (bs *BlockSubscriber) processHead(h *evmtypes.Head) {
 	// head parent is a linked list with EVM finality depth
 	// when re-org happens, new heads will have pointers to the new blocks
 	i := int64(0)
-	for cp := h; ; cp = cp.Parent {
-		if cp == nil || bs.blocks[cp.Number] == cp.Hash.Hex() {
-			break
-		}
+	for cp := h; cp != nil; cp = cp.Parent {
+		// We don't stop at the first matching (block number, hash) entry in the map, because the parent linked list
+		// may be cut short during a re-org if the head broadcaster's backfill is incomplete, leaving stale re-orged
+		// blocks in the map. For example, suppose blocks 98, 99, and 100 are re-orged. The next head, 101, carries a
+		// parent list of only 100, so just blocks 100 and 101 get updated. When head 102 arrives with its full parent
+		// history of finality depth, stopping at the first number/hash match would never look back and correct blocks
+		// 98 and 99. As a compromise, we walk and update the previous max(finality depth, blockSize) blocks in the map.
 		existingHash, ok := bs.blocks[cp.Number]
 		if !ok {
 			bs.lggr.Debugf("filling block %d with new hash %s", cp.Number, cp.Hash.Hex())
@@ -238,7 +243,7 @@ }
 		}
 		bs.blocks[cp.Number] = cp.Hash.Hex()
 		i++
-		if i > bs.blockSize {
+		if i > int64(bs.finalityDepth) || i > bs.blockSize {
 			break
 		}
 	}
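Taken together, the block_subscriber.go changes thread the chain's finality depth into the subscriber and rework processHead: instead of stopping at the first block whose hash already matches the map, the walk up the parent chain continues until the counter exceeds the finality depth or blockSize, so stale hashes left by a partially backfilled re-org get corrected. Below is a minimal, self-contained sketch of that bounded walk; Head and the plain map are illustrative stand-ins for evmtypes.Head and the subscriber's internal state, not the production types.

package main

import "fmt"

// Head is a simplified stand-in for evmtypes.Head: a linked list of block
// headers, newest first, as delivered by the head broadcaster.
type Head struct {
    Number int64
    Hash   string
    Parent *Head
}

// updateBlocks records the (number, hash) pair of every ancestor of h,
// breaking only once the counter exceeds finalityDepth or blockSize.
// Because the walk does not stop at the first already-known entry, a stale
// hash left over from an earlier, partially backfilled re-org is corrected
// as soon as a head with a fuller parent history arrives.
func updateBlocks(h *Head, blocks map[int64]string, finalityDepth uint32, blockSize int64) {
    i := int64(0)
    for cp := h; cp != nil; cp = cp.Parent {
        if old, ok := blocks[cp.Number]; ok && old != cp.Hash {
            fmt.Printf("re-org: overwriting block %d hash %s with %s\n", cp.Number, old, cp.Hash)
        }
        blocks[cp.Number] = cp.Hash
        i++
        if i > int64(finalityDepth) || i > blockSize {
            break
        }
    }
}

func main() {
    // Stale state: blocks 98-100 still hold pre-re-org hashes.
    blocks := map[int64]string{98: "0xa1", 99: "0xb1", 100: "0xc1"}
    // Head 102 arrives carrying a full post-re-org parent history.
    b98 := &Head{Number: 98, Hash: "0xa2"}
    b99 := &Head{Number: 99, Hash: "0xb2", Parent: b98}
    b100 := &Head{Number: 100, Hash: "0xc2", Parent: b99}
    b101 := &Head{Number: 101, Hash: "0xd2", Parent: b100}
    h := &Head{Number: 102, Hash: "0xe2", Parent: b101}
    updateBlocks(h, blocks, 4, 4)
    fmt.Println(blocks) // entries 98 through 102 all hold post-re-org hashes
}

Running the sketch prints one "overwriting" line per corrected block, which is the behavior the old first-match break could not provide.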
core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go
@@ -22,13 +22,14 @@ import (
 
 const historySize = 4
 const blockSize = int64(4)
+const finality = uint32(4)
 
 func TestBlockSubscriber_Subscribe(t *testing.T) {
 	lggr := logger.TestLogger(t)
 	var hb types.HeadBroadcaster
 	var lp logpoller.LogPoller
 
-	bs := NewBlockSubscriber(hb, lp, lggr)
+	bs := NewBlockSubscriber(hb, lp, finality, lggr)
 	bs.blockHistorySize = historySize
 	bs.blockSize = blockSize
 	subId, _, err := bs.Subscribe()
@@ -47,7 +48,7 @@ func TestBlockSubscriber_Unsubscribe(t *testing.T) {
 	var hb types.HeadBroadcaster
 	var lp logpoller.LogPoller
 
-	bs := NewBlockSubscriber(hb, lp, lggr)
+	bs := NewBlockSubscriber(hb, lp, finality, lggr)
 	bs.blockHistorySize = historySize
 	bs.blockSize = blockSize
 	subId, _, err := bs.Subscribe()
@@ -65,7 +66,7 @@ func TestBlockSubscriber_Unsubscribe_Failure(t *testing.T) {
 	var hb types.HeadBroadcaster
 	var lp logpoller.LogPoller
 
-	bs := NewBlockSubscriber(hb, lp, lggr)
+	bs := NewBlockSubscriber(hb, lp, finality, lggr)
 	bs.blockHistorySize = historySize
 	bs.blockSize = blockSize
 	err := bs.Unsubscribe(2)
@@ -97,7 +98,7 @@ func TestBlockSubscriber_GetBlockRange(t *testing.T) {
 		t.Run(tc.Name, func(t *testing.T) {
 			lp := new(mocks.LogPoller)
 			lp.On("LatestBlock", mock.Anything).Return(tc.LatestBlock, tc.LatestBlockErr)
-			bs := NewBlockSubscriber(hb, lp, lggr)
+			bs := NewBlockSubscriber(hb, lp, finality, lggr)
 			bs.blockHistorySize = historySize
 			bs.blockSize = blockSize
 			blocks, err := bs.getBlockRange(testutils.Context(t))
@@ -155,7 +156,7 @@ func TestBlockSubscriber_InitializeBlocks(t *testing.T) {
 		t.Run(tc.Name, func(t *testing.T) {
 			lp := new(mocks.LogPoller)
 			lp.On("GetBlocksRange", mock.Anything, tc.Blocks, mock.Anything).Return(tc.PollerBlocks, tc.Error)
-			bs := NewBlockSubscriber(hb, lp, lggr)
+			bs := NewBlockSubscriber(hb, lp, finality, lggr)
 			bs.blockHistorySize = historySize
 			bs.blockSize = blockSize
 			err := bs.initializeBlocks(testutils.Context(t), tc.Blocks)
@@ -213,7 +214,7 @@ func TestBlockSubscriber_BuildHistory(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.Name, func(t *testing.T) {
-			bs := NewBlockSubscriber(hb, lp, lggr)
+			bs := NewBlockSubscriber(hb, lp, finality, lggr)
 			bs.blockHistorySize = historySize
 			bs.blockSize = blockSize
 			bs.blocks = tc.Blocks
@@ -258,7 +259,7 @@ func TestBlockSubscriber_Cleanup(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.Name, func(t *testing.T) {
-			bs := NewBlockSubscriber(hb, lp, lggr)
+			bs := NewBlockSubscriber(hb, lp, finality, lggr)
 			bs.blockHistorySize = historySize
 			bs.blockSize = blockSize
 			bs.blocks = tc.Blocks
@@ -300,7 +301,7 @@ func TestBlockSubscriber_Start(t *testing.T) {
 
 	lp.On("GetBlocksRange", mock.Anything, blocks, mock.Anything).Return(pollerBlocks, nil)
 
-	bs := NewBlockSubscriber(hb, lp, lggr)
+	bs := NewBlockSubscriber(hb, lp, finality, lggr)
 	bs.blockHistorySize = historySize
 	bs.blockSize = blockSize
 	err := bs.Start(context.Background())
core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go
@@ -1,6 +1,7 @@
 package logprovider
 
 import (
+	"bytes"
 	"context"
 	"crypto/rand"
 	"errors"
@@ -43,6 +44,9 @@ var (
 	recoveryLogsBurst = int64(500)
 	// blockTimeUpdateCadence is the cadence at which the chain's blocktime is re-calculated
 	blockTimeUpdateCadence = 10 * time.Minute
+	// maxPendingPayloadsPerUpkeep is the number of logs we can have pending for a single upkeep
+	// at any given time
+	maxPendingPayloadsPerUpkeep = 500
 )
 
 type LogRecoverer interface {
@@ -405,10 +409,14 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB
 	}
 	filteredLogs := r.filterFinalizedStates(f, logs, states)
 
-	added, alreadyPending := r.populatePending(f, filteredLogs)
+	added, alreadyPending, ok := r.populatePending(f, filteredLogs)
 	if added > 0 {
 		r.lggr.Debugw("found missed logs", "added", added, "alreadyPending", alreadyPending, "upkeepID", f.upkeepID)
 	}
+	if !ok {
+		r.lggr.Debugw("failed to add all logs to pending", "upkeepID", f.upkeepID)
+		return nil
+	}
 	r.filterStore.UpdateFilters(func(uf1, uf2 upkeepFilter) upkeepFilter {
 		uf1.lastRePollBlock = end
 		r.lggr.Debugw("Updated lastRePollBlock", "lastRePollBlock", end, "upkeepID", uf1.upkeepID)
@@ -419,13 +427,15 @@ }
 }
 
 // populatePending adds the logs to the pending list if they are not already pending.
-// returns the number of logs added and the number of logs that were already pending.
-func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int) {
+// returns the number of logs added, the number of logs that were already pending,
+// and a flag that indicates whether any errors occurred while adding to the pending queue.
+func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int, bool) {
 	r.lock.Lock()
 	defer r.lock.Unlock()
 
 	pendingSizeBefore := len(r.pending)
 	alreadyPending := 0
+	errs := make([]error, 0)
 	for _, log := range filteredLogs {
 		trigger := logToTrigger(log)
 		// Set the checkBlock and Hash to zero so that the checkPipeline uses the latest block
@@ -453,13 +463,16 @@ func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.
 			continue
 		}
 		// r.lggr.Debugw("adding a payload to pending", "payload", payload)
-		r.visited[wid] = visitedRecord{
-			visitedAt: time.Now(),
-			payload:   payload,
+		if err := r.addPending(payload); err != nil {
+			errs = append(errs, err)
+		} else {
+			r.visited[wid] = visitedRecord{
+				visitedAt: time.Now(),
+				payload:   payload,
+			}
 		}
-		r.addPending(payload)
 	}
-	return len(r.pending) - pendingSizeBefore, alreadyPending
+	return len(r.pending) - pendingSizeBefore, alreadyPending, len(errs) == 0
 }
 
 // filterFinalizedStates filters out the log upkeeps that have already been completed (performed or ineligible).
@@ -619,9 +632,10 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
 				removed++
 				continue
 			}
-			r.addPending(rec.payload)
-			rec.visitedAt = time.Now()
-			r.visited[ids[i]] = rec
+			if err := r.addPending(rec.payload); err == nil {
+				rec.visitedAt = time.Now()
+				r.visited[ids[i]] = rec
+			}
 		default:
 			delete(r.visited, ids[i])
 			removed++
@@ -637,17 +651,25 @@
 
 // addPending adds a payload to the pending list if it's not already there.
 // NOTE: the lock must be held before calling this function.
-func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) {
+func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error {
 	var exist bool
 	pending := r.pending
+	upkeepPayloads := 0
 	for _, p := range pending {
+		if bytes.Equal(p.UpkeepID[:], payload.UpkeepID[:]) {
+			upkeepPayloads++
+		}
 		if p.WorkID == payload.WorkID {
 			exist = true
 		}
 	}
+	if upkeepPayloads >= maxPendingPayloadsPerUpkeep {
+		return fmt.Errorf("upkeep %v has too many payloads in pending queue", payload.UpkeepID)
+	}
 	if !exist {
 		r.pending = append(pending, payload)
 	}
+	return nil
 }
 
 // removePending removes a payload from the pending list.
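The recoverer changes amount to a back-pressure rule: addPending now counts how many pending payloads an upkeep already owns (comparing UpkeepID bytes, hence the new "bytes" import) and returns an error once maxPendingPayloadsPerUpkeep is reached. Callers react to that error by not refreshing the visited record (tryExpire) or by skipping the lastRePollBlock update (populatePending via recoverFilter), so rejected logs are retried on a later sweep. The sketch below illustrates the capped queue with a simplified stand-in payload type rather than the real ocr2keepers.UpkeepPayload, and a lower cap than the production default of 500.

package main

import (
    "bytes"
    "fmt"
)

// payload is a simplified stand-in for ocr2keepers.UpkeepPayload, keeping
// only the two fields the cap logic inspects.
type payload struct {
    UpkeepID [32]byte
    WorkID   string
}

const maxPendingPerUpkeep = 3 // illustrative; this commit's production default is 500

// addPending mirrors the new logRecoverer.addPending contract: de-duplicate
// by WorkID, and refuse the payload entirely once its upkeep already owns
// maxPendingPerUpkeep slots, so one noisy upkeep cannot grow the shared
// pending queue without bound.
func addPending(pending []payload, p payload) ([]payload, error) {
    exist := false
    perUpkeep := 0
    for _, q := range pending {
        if bytes.Equal(q.UpkeepID[:], p.UpkeepID[:]) {
            perUpkeep++
        }
        if q.WorkID == p.WorkID {
            exist = true
        }
    }
    if perUpkeep >= maxPendingPerUpkeep {
        return pending, fmt.Errorf("upkeep %v has too many payloads in pending queue", p.UpkeepID)
    }
    if !exist {
        pending = append(pending, p)
    }
    return pending, nil
}

func main() {
    id := [32]byte{1}
    var pending []payload
    var err error
    for _, w := range []string{"w1", "w2", "w3", "w4"} {
        pending, err = addPending(pending, payload{UpkeepID: id, WorkID: w})
        fmt.Printf("after %s: len=%d err=%v\n", w, len(pending), err)
    }
    // w4 is rejected: the upkeep already holds 3 pending payloads.
}

In the real implementation this scan runs under r.lock, and populatePending collects the errors so the caller can leave lastRePollBlock untouched and re-attempt the overflow logs on a subsequent recovery pass.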
core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go
@@ -1093,40 +1093,75 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {
 
 func TestLogRecoverer_pending(t *testing.T) {
 	tests := []struct {
-		name  string
-		exist []ocr2keepers.UpkeepPayload
-		new   []ocr2keepers.UpkeepPayload
-		want  []ocr2keepers.UpkeepPayload
+		name         string
+		maxPerUpkeep int
+		exist        []ocr2keepers.UpkeepPayload
+		new          []ocr2keepers.UpkeepPayload
+		errored      []bool
+		want         []ocr2keepers.UpkeepPayload
 	}{
 		{
-			"empty",
-			[]ocr2keepers.UpkeepPayload{},
-			[]ocr2keepers.UpkeepPayload{},
-			[]ocr2keepers.UpkeepPayload{},
+			name:         "empty",
+			maxPerUpkeep: 10,
+			exist:        []ocr2keepers.UpkeepPayload{},
+			new:          []ocr2keepers.UpkeepPayload{},
+			errored:      []bool{},
+			want:         []ocr2keepers.UpkeepPayload{},
 		},
 		{
-			"add new and existing",
-			[]ocr2keepers.UpkeepPayload{
+			name:         "add new and existing",
+			maxPerUpkeep: 10,
+			exist: []ocr2keepers.UpkeepPayload{
 				{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
 			},
-			[]ocr2keepers.UpkeepPayload{
+			new: []ocr2keepers.UpkeepPayload{
 				{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
 				{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")},
 			},
-			[]ocr2keepers.UpkeepPayload{
+			errored: []bool{false, false},
+			want: []ocr2keepers.UpkeepPayload{
 				{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
 				{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")},
 			},
 		},
+		{
+			name:         "exceed limits for upkeep",
+			maxPerUpkeep: 3,
+			exist: []ocr2keepers.UpkeepPayload{
+				{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
+				{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
+				{WorkID: "3", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
+			},
+			new: []ocr2keepers.UpkeepPayload{
+				{WorkID: "4", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
+			},
+			errored: []bool{true},
+			want: []ocr2keepers.UpkeepPayload{
+				{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
+				{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
+				{WorkID: "3", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
+			},
+		},
 	}
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
+			origMaxPendingPayloadsPerUpkeep := maxPendingPayloadsPerUpkeep
+			maxPendingPayloadsPerUpkeep = tc.maxPerUpkeep
+			defer func() {
+				maxPendingPayloadsPerUpkeep = origMaxPendingPayloadsPerUpkeep
+			}()
+
 			r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200))
 			r.lock.Lock()
 			r.pending = tc.exist
-			for _, p := range tc.new {
-				r.addPending(p)
+			for i, p := range tc.new {
+				err := r.addPending(p)
+				if tc.errored[i] {
+					require.Error(t, err)
+					continue
+				}
+				require.NoError(t, err)
 			}
 			pending := r.pending
 			require.GreaterOrEqual(t, len(pending), len(tc.new))
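Because maxPendingPayloadsPerUpkeep is a package-level var rather than a const, the updated test can tighten the cap per test case and restore it afterwards. Distilled, that save/override/restore pattern looks like the sketch below (the test name is hypothetical); note that it mutates shared package state, so it must not be combined with t.Parallel.

func TestWithLoweredCap(t *testing.T) {
    orig := maxPendingPayloadsPerUpkeep
    maxPendingPayloadsPerUpkeep = 3 // tighten the cap for this test only
    defer func() { maxPendingPayloadsPerUpkeep = orig }()

    // ... exercise addPending against the lowered cap ...
}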