Seed order to manage logs overflow #10485

Merged: 24 commits, Sep 6, 2023
Changes from 15 commits
2 changes: 1 addition & 1 deletion core/scripts/go.mod
@@ -19,7 +19,7 @@ require (
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5
github.com/smartcontractkit/ocr2keepers v0.7.19
github.com/smartcontractkit/ocr2keepers v0.7.20
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb
github.com/spf13/cobra v1.6.1
4 changes: 2 additions & 2 deletions core/scripts/go.sum
@@ -1384,8 +1384,8 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJ
github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0=
github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 h1:rzbqGoScs9VHGnyCKF7AoQEuUfwJnzcKmGIfaczeanA=
github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0=
github.com/smartcontractkit/ocr2keepers v0.7.19 h1:w9CMs1V8pmxdRX6ME98goIRPMuN9DOkfMmZHeDPDQXY=
github.com/smartcontractkit/ocr2keepers v0.7.19/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0=
github.com/smartcontractkit/ocr2keepers v0.7.20 h1:1FOeJ0p4mWHqJiX01v/J2S1C1LhU8iqJM1hDcD38aZU=
github.com/smartcontractkit/ocr2keepers v0.7.20/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0=
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM=
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM=
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A=
122 changes: 98 additions & 24 deletions core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go
@@ -1,20 +1,24 @@
package logprovider

import (
"encoding/hex"
"math/big"
"sort"
"sync"
"sync/atomic"

"github.com/smartcontractkit/ocr2keepers/pkg/v3/random"
ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var (
// allowedLogsPerBlock is the maximum number of logs allowed per upkeep in a block.
allowedLogsPerBlock = 128
// bufferMaxBlockSize is the maximum number of blocks in the buffer.
bufferMaxBlockSize = 1024
// maxLogsPerUpkeepInBlock is the maximum number of logs allowed per upkeep in a block.
maxLogsPerUpkeepInBlock = 32
// maxLogsPerBlock is the maximum number of logs allowed per block in the buffer.
maxLogsPerBlock = 1024
)

// fetchedLog holds the log and the ID of the upkeep
@@ -33,9 +37,46 @@ type fetchedBlock struct {
visited []fetchedLog
}

func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, maxUpkeepLogs int) (fetchedLog, bool) {
has, upkeepLogs := b.has(fl.upkeepID, fl.log)
if has {
// Skipping known logs
return fetchedLog{}, false
}
// lggr.Debugw("Adding log", "i", i, "blockBlock", currentBlock.blockNumber, "logBlock", log.BlockNumber, "id", id)
b.logs = append(b.logs, fl)

// drop logs if we reached limits.
if upkeepLogs+1 > maxUpkeepLogs {
// in case we have logs overflow for a particular upkeep, we drop a log of that upkeep,
// based on shared, random (per block) order of the logs in the block.
b.Sort()
var dropped fetchedLog
currentLogs := make([]fetchedLog, 0, len(b.logs)-1)
for _, l := range b.logs {
if dropped.upkeepID == nil && l.upkeepID.Cmp(fl.upkeepID) == 0 {
dropped = l
continue
}
currentLogs = append(currentLogs, l)
}
b.logs = currentLogs
return dropped, true
} else if len(b.logs)+len(b.visited) > maxBlockLogs {
// in case we have logs overflow in the buffer level, we drop a log based on
// shared, random (per block) order of the logs in the block.
b.Sort()
dropped := b.logs[0]
b.logs = b.logs[1:]
return dropped, true
}

return fetchedLog{}, true
}
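
For illustration, a hypothetical package-internal test (not part of this PR; the test name and placement are assumptions) of the overflow path: once an upkeep exceeds maxUpkeepLogs in a block, the new log is still accepted, but one of that upkeep's logs, chosen by the block's shared random order, is returned as dropped.

func TestFetchedBlock_AppendDropsOnUpkeepOverflow(t *testing.T) {
	lggr := logger.TestLogger(t)
	b := fetchedBlock{blockNumber: 10}
	id := big.NewInt(1)

	// First log for the upkeep fits within maxUpkeepLogs = 1.
	dropped, ok := b.Append(lggr, fetchedLog{upkeepID: id, log: logpoller.Log{BlockNumber: 10, LogIndex: 0}}, 4, 1)
	require.True(t, ok)
	require.Nil(t, dropped.upkeepID)

	// Second log for the same upkeep is accepted, but one of the upkeep's logs is
	// dropped to respect the per-upkeep limit, leaving one log for this upkeep.
	dropped, ok = b.Append(lggr, fetchedLog{upkeepID: id, log: logpoller.Log{BlockNumber: 10, LogIndex: 1}}, 4, 1)
	require.True(t, ok)
	require.NotNil(t, dropped.upkeepID)
	require.Equal(t, 1, len(b.logs))
}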

// Has returns true if the block has the log,
// and the number of logs for that upkeep in the block.
func (b fetchedBlock) Has(id *big.Int, log logpoller.Log) (bool, int) {
func (b fetchedBlock) has(id *big.Int, log logpoller.Log) (bool, int) {
allLogs := append(b.logs, b.visited...)
upkeepLogs := 0
for _, l := range allLogs {
@@ -62,6 +103,27 @@ func (b fetchedBlock) Clone() fetchedBlock {
}
}

// Sort by log identifiers, shuffled using a pseudorandom source that is shared across all nodes
// for a given block.
func (b *fetchedBlock) Sort() {
randSeed := random.GetRandomKeySource(nil, uint64(b.blockNumber))

shuffledLogIDs := make([]string, len(b.logs))
for i, log := range b.logs {
ext := ocr2keepers.LogTriggerExtension{
TxHash: log.log.TxHash,
Index: uint32(log.log.LogIndex),
BlockHash: log.log.BlockHash,
}
logID := hex.EncodeToString(ext.LogIdentifier())
shuffledLogIDs[i] = random.ShuffleString(logID, randSeed)
}

sort.Slice(b.logs, func(i, j int) bool {
return shuffledLogIDs[i] < shuffledLogIDs[j]
})
@infiloop2 (Contributor) commented on Sep 5, 2023:
Just want to verify this will work properly if the indices passed to the comparator don't refer to the slice being sorted?

Author (Collaborator) replied:
Yes, the comparator accepts indices and returns a bool, making it completely decoupled from the underlying slice.

@infiloop2 (Contributor) replied:
Actually, digging more, it's giving me some unexpected behaviour in the playground. Will follow up offline.

}
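
On the comparator question above: sort.Slice permutes only the slice it is given, so when the less function reads a separate key slice by index, the keys stop lining up with the logs as elements are swapped. A sketch of one way to keep them aligned (a hypothetical alternative, not the code in this PR) is to sort each key together with its log; because the seed depends only on the block number, every node still derives the same order.

// sortPaired is a hypothetical variant of Sort that permutes each log together with
// its shuffled identifier, so the comparator always reads the key of the element
// it is comparing.
func (b *fetchedBlock) sortPaired() {
	randSeed := random.GetRandomKeySource(nil, uint64(b.blockNumber))

	type keyedLog struct {
		key string
		fl  fetchedLog
	}
	keyed := make([]keyedLog, len(b.logs))
	for i, fl := range b.logs {
		ext := ocr2keepers.LogTriggerExtension{
			TxHash:    fl.log.TxHash,
			Index:     uint32(fl.log.LogIndex),
			BlockHash: fl.log.BlockHash,
		}
		logID := hex.EncodeToString(ext.LogIdentifier())
		keyed[i] = keyedLog{key: random.ShuffleString(logID, randSeed), fl: fl}
	}

	// Key and log are swapped as one element, so the resulting order is well defined.
	sort.Slice(keyed, func(i, j int) bool { return keyed[i].key < keyed[j].key })

	for i, kl := range keyed {
		b.logs[i] = kl.fl
	}
}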

// logEventBuffer is a circular/ring buffer of fetched logs.
// Each entry in the buffer represents a block,
// and holds the logs fetched for that block.
@@ -97,6 +159,7 @@ func (b *logEventBuffer) bufferSize() int {
}

// enqueue adds logs (if they do not already exist) to the buffer, returning the number of logs added
// minus the number of logs dropped.
func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
b.lock.Lock()
defer b.lock.Unlock()
@@ -107,7 +170,8 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
maxUpkeepLogs := int(b.maxUpkeepLogsPerBlock)

latestBlock := b.latestBlockSeen()
added := 0
added, dropped := 0, 0

for _, log := range logs {
if log.BlockNumber == 0 {
// invalid log
@@ -125,23 +189,20 @@
lggr.Debugw("Skipping log from old block", "currentBlock", currentBlock.blockNumber, "newBlock", log.BlockNumber)
continue
}
if len(currentBlock.logs)+1 > maxBlockLogs {
lggr.Debugw("Reached max logs number per block, dropping log", "blockNumber", log.BlockNumber,
"blockHash", log.BlockHash, "txHash", log.TxHash, "logIndex", log.LogIndex)
droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxBlockLogs, maxUpkeepLogs)
if !ok {
// Skipping known logs
continue
}
if has, upkeepLogs := currentBlock.Has(id, log); has {
// Skipping existing log
continue
} else if upkeepLogs+1 > maxUpkeepLogs {
lggr.Debugw("Reached max logs number per upkeep, dropping log", "blockNumber", log.BlockNumber,
"blockHash", log.BlockHash, "txHash", log.TxHash, "logIndex", log.LogIndex)
continue
if droppedLog.upkeepID != nil {
dropped++
lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", droppedLog.log.BlockNumber,
"blockHash", droppedLog.log.BlockHash, "txHash", droppedLog.log.TxHash, "logIndex", droppedLog.log.LogIndex,
"upkeepID", droppedLog.upkeepID.String())
}
// lggr.Debugw("Adding log", "i", i, "blockBlock", currentBlock.blockNumber, "logBlock", log.BlockNumber, "id", id)
currentBlock.logs = append(currentBlock.logs, fetchedLog{upkeepID: id, log: log})
b.blocks[i] = currentBlock
added++
b.blocks[i] = currentBlock

if log.BlockNumber > latestBlock {
latestBlock = log.BlockNumber
}
@@ -151,10 +212,10 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
atomic.StoreInt64(&b.latestBlock, latestBlock)
}
if added > 0 {
lggr.Debugw("Added logs to buffer", "addedLogs", added, "latestBlock", latestBlock)
lggr.Debugw("Added logs to buffer", "addedLogs", added, "dropped", dropped, "latestBlock", latestBlock)
}

return added
return added - dropped
}
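
As a hypothetical illustration of the return value (this snippet is not from the PR's tests and assumes it runs inside a test in the logprovider package): with a per-upkeep limit of 2, enqueueing three logs for one upkeep in the same block accepts all three but drops one, so the call reports a net of 2.

buf := newLogEventBuffer(logger.TestLogger(t), 10, maxLogsPerBlock, 2)
n := buf.enqueue(big.NewInt(111),
	logpoller.Log{BlockNumber: 7, TxHash: common.HexToHash("0x7"), LogIndex: 0},
	logpoller.Log{BlockNumber: 7, TxHash: common.HexToHash("0x7"), LogIndex: 1},
	logpoller.Log{BlockNumber: 7, TxHash: common.HexToHash("0x7"), LogIndex: 2},
)
// n == 2: three logs were added and one was dropped to respect the per-upkeep limit.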

// peek returns the logs in range [latestBlock-blocks, latestBlock]
@@ -196,7 +257,7 @@ func (b *logEventBuffer) peekRange(start, end int64) []fetchedLog {
}

// dequeueRange returns the logs between start and end inclusive.
func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit int) []fetchedLog {
func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit, totalLimit int) []fetchedLog {
b.lock.Lock()
defer b.lock.Unlock()

@@ -214,20 +275,33 @@ func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit int) []fetch
})

logsCount := map[string]int{}
totalCount := 0
var results []fetchedLog
for _, block := range fetchedBlocks {
// double checking that we don't have any gaps in the range
if block.blockNumber < start || block.blockNumber > end {
// double checking that we don't have any gaps in the range
continue
}
if totalCount >= totalLimit {
// reached total limit, no need to process more blocks
break
}
// Sort the logs in random order that is shared across all nodes.
// This ensures that nodes across the network will process the same logs.
block.Sort()
var remainingLogs, blockResults []fetchedLog
for _, log := range block.logs {
if totalCount >= totalLimit {
remainingLogs = append(remainingLogs, log)
continue
}
if logsCount[log.upkeepID.String()] >= upkeepLimit {
remainingLogs = append(remainingLogs, log)
continue
}
logsCount[log.upkeepID.String()]++
blockResults = append(blockResults, log)
logsCount[log.upkeepID.String()]++
totalCount++
}
if len(blockResults) == 0 {
continue
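
To illustrate the two limits together (a hypothetical snippet, not from the PR's tests, assumed to run inside a test in the logprovider package): upkeepLimit caps how many logs a single upkeep may contribute per call, while totalLimit caps the whole batch across all upkeeps.

buf := newLogEventBuffer(logger.TestLogger(t), 10, maxLogsPerBlock, maxLogsPerUpkeepInBlock)
buf.enqueue(big.NewInt(1),
	logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
	logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0},
	logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3"), LogIndex: 0},
)
buf.enqueue(big.NewInt(2),
	logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3"), LogIndex: 1},
)

logs := buf.dequeueRange(1, 3, 2, 3)
// Upkeep 1 contributes at most 2 logs (upkeepLimit) and the batch holds at most 3 (totalLimit),
// so one of upkeep 1's logs stays in the buffer for a later dequeue.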
core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go
@@ -236,7 +236,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
results := buf.peekRange(int64(1), int64(2))
require.Equal(t, 2, len(results))
verifyBlockNumbers(t, results, 1, 2)
removed := buf.dequeueRange(int64(1), int64(2), 2)
removed := buf.dequeueRange(int64(1), int64(2), 2, 10)
require.Equal(t, 2, len(removed))
results = buf.peekRange(int64(1), int64(2))
require.Equal(t, 0, len(results))
@@ -256,7 +256,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
results := buf.peek(8)
require.Equal(t, 4, len(results))
verifyBlockNumbers(t, results, 1, 2, 3, 3)
removed := buf.dequeueRange(1, 3, 5)
removed := buf.dequeueRange(1, 3, 5, 5)
require.Equal(t, 4, len(removed))
buf.lock.Lock()
require.Equal(t, 0, len(buf.blocks[0].logs))
@@ -313,10 +313,18 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
logpoller.Log{BlockNumber: 5, TxHash: common.HexToHash("0x5"), LogIndex: 0},
), 5)

logs := buf.dequeueRange(1, 5, 2)
logs := buf.dequeueRange(1, 5, 2, 10)
require.Equal(t, 2, len(logs))
require.Equal(t, int64(5), logs[0].log.BlockNumber)
require.Equal(t, int64(4), logs[1].log.BlockNumber)

require.Equal(t, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x4"), LogIndex: 1},
logpoller.Log{BlockNumber: 5, TxHash: common.HexToHash("0x5"), LogIndex: 1},
), 2)

logs = buf.dequeueRange(1, 5, 3, 2)
require.Equal(t, 2, len(logs))
})

t.Run("dequeue doesn't return same logs again", func(t *testing.T) {
@@ -327,11 +335,11 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3"), LogIndex: 0},
), 3)

logs := buf.dequeueRange(3, 3, 2)
logs := buf.dequeueRange(3, 3, 2, 10)
fmt.Println(logs)
require.Equal(t, 1, len(logs))

logs = buf.dequeueRange(3, 3, 2)
logs = buf.dequeueRange(3, 3, 2, 10)
fmt.Println(logs)
require.Equal(t, 0, len(logs))
})
core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go
@@ -478,10 +478,10 @@ func TestIntegration_LogRecoverer_Backfill(t *testing.T) {
}
lp, ethClient, utilsABI := setupDependencies(t, db, backend)
filterStore := logprovider.NewUpkeepFilterStore()
origDefaultRecoveryInterval := logprovider.DefaultRecoveryInterval
logprovider.DefaultRecoveryInterval = time.Millisecond * 200
origDefaultRecoveryInterval := logprovider.RecoveryInterval
logprovider.RecoveryInterval = time.Millisecond * 200
defer func() {
logprovider.DefaultRecoveryInterval = origDefaultRecoveryInterval
logprovider.RecoveryInterval = origDefaultRecoveryInterval
}()
provider, recoverer := setup(logger.TestLogger(t), lp, nil, utilsABI, &mockUpkeepStateStore{}, filterStore, opts)
logProvider := provider.(logprovider.LogEventProviderTest)
core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go
@@ -30,6 +30,8 @@ var (

// AllowedLogsPerUpkeep is the maximum number of logs allowed per upkeep every single call.
AllowedLogsPerUpkeep = 5
// MaxPayloads is the maximum number of payloads to return per call.
MaxPayloads = 100

readJobQueueSize = 64
readLogsTimeout = 10 * time.Second
@@ -99,7 +101,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa
return &logEventProvider{
packer: packer,
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), bufferMaxBlockSize, allowedLogsPerBlock),
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock),
poller: poller,
opts: opts,
filterStore: filterStore,
@@ -177,7 +179,7 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers
if start <= 0 {
start = 1
}
logs := p.buffer.dequeueRange(start, latest, AllowedLogsPerUpkeep)
logs := p.buffer.dequeueRange(start, latest, AllowedLogsPerUpkeep, MaxPayloads)

// p.lggr.Debugw("got latest logs from buffer", "latest", latest, "diff", diff, "logs", len(logs))

@@ -318,7 +320,10 @@ func (p *logEventProvider) updateFiltersLastPoll(entries []upkeepFilter) {
p.filterStore.UpdateFilters(func(orig, f upkeepFilter) upkeepFilter {
if f.lastPollBlock > orig.lastPollBlock {
orig.lastPollBlock = f.lastPollBlock
p.lggr.Debugw("Updated lastPollBlock", "lastPollBlock", f.lastPollBlock, "upkeepID", f.upkeepID)
if f.lastPollBlock%10 == 0 {
// print log occasionally to avoid spamming logs
p.lggr.Debugw("Updated lastPollBlock", "lastPollBlock", f.lastPollBlock, "upkeepID", f.upkeepID)
}
}
return orig
}, entries...)