Seed order to manage logs overflow (#10485)
* update ocr2keepers

* go mod tidy

* buffer: drop logs by seed-order (see the sketch after this list)

* comment out noisy log

* ensure order for provider.GetLatestPayloads()

* ensure order for recoverer.GetRecoveryProposals()

* use a normalized value of latestBlock

* set overall limit for recovery proposals (MaxProposals)

value TBD, currently set to 50

* set max proposals to 20 (was 50)

* apply total limits when dequeuing for payloads

MaxPayloads was set to 100

* fix test

* fix max block logs

* protect against log spamming

* renaming

* lint

* set offset to 100

* added tests

* use maps when sorting

* temporarily added blockhash to log id

* lint

* remove todo from log id func
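
The core idea behind dropping logs by seed-order is that every node derives the same pseudorandom ordering of a block's logs from the block number alone, so when limits force logs to be dropped, all nodes drop the same ones. Below is a minimal, self-contained sketch of that idea; it is not the repository's code, and seedOrderKey, capBySeedOrder, and the sha256-based shuffle are hypothetical stand-ins for the random.GetRandomKeySource / random.ShuffleString helpers used in the actual diff further down.

package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// simpleLog is a simplified stand-in for a fetched upkeep log; only the
// fields needed to build an identifier are kept.
type simpleLog struct {
	upkeepID string
	txHash   string
	logIndex int
}

// logID builds a stable identifier for a log from its transaction hash and index.
func (l simpleLog) logID() string {
	return fmt.Sprintf("%s:%d", l.txHash, l.logIndex)
}

// seedOrderKey derives a deterministic ordering key from the block number and
// the log identifier; every node computes the same key for the same log.
func seedOrderKey(blockNumber uint64, id string) string {
	sum := sha256.Sum256([]byte(fmt.Sprintf("%d|%s", blockNumber, id)))
	return fmt.Sprintf("%x", sum)
}

// capBySeedOrder sorts a block's logs by their seed-derived keys and keeps at
// most maxLogs of them, dropping from the front of the shared order.
func capBySeedOrder(blockNumber uint64, logs []simpleLog, maxLogs int) (kept, dropped []simpleLog) {
	sort.SliceStable(logs, func(i, j int) bool {
		return seedOrderKey(blockNumber, logs[i].logID()) < seedOrderKey(blockNumber, logs[j].logID())
	})
	if len(logs) <= maxLogs {
		return logs, nil
	}
	return logs[len(logs)-maxLogs:], logs[:len(logs)-maxLogs]
}

func main() {
	logs := []simpleLog{
		{upkeepID: "1", txHash: "0xaaa", logIndex: 0},
		{upkeepID: "1", txHash: "0xbbb", logIndex: 3},
		{upkeepID: "2", txHash: "0xccc", logIndex: 1},
	}
	kept, dropped := capBySeedOrder(1234, logs, 2)
	fmt.Println("kept:", len(kept), "dropped:", len(dropped))
}

Because the ordering key depends only on the block number and the log identifier, two nodes that hold the same set of logs for a block will always agree on which logs to drop when a limit is exceeded.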
amirylm authored Sep 6, 2023
1 parent a15bcd8 commit ad22c6e
Showing 7 changed files with 677 additions and 42 deletions.
132 changes: 108 additions & 24 deletions core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go
@@ -1,26 +1,45 @@
package logprovider

import (
"encoding/hex"
"math/big"
"sort"
"sync"
"sync/atomic"

"github.com/smartcontractkit/ocr2keepers/pkg/v3/random"
ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var (
// allowedLogsPerBlock is the maximum number of logs allowed per upkeep in a block.
allowedLogsPerBlock = 128
// bufferMaxBlockSize is the maximum number of blocks in the buffer.
bufferMaxBlockSize = 1024
// maxLogsPerUpkeepInBlock is the maximum number of logs allowed per upkeep in a block.
maxLogsPerUpkeepInBlock = 32
// maxLogsPerBlock is the maximum number of logs allowed in a block.
maxLogsPerBlock = 1024
)

// fetchedLog holds the log and the ID of the upkeep
type fetchedLog struct {
upkeepID *big.Int
log logpoller.Log
// cachedLogID is the cached log identifier, used for sorting.
// It is calculated lazily, and cached for performance.
cachedLogID string
}

func (l *fetchedLog) getLogID() string {
if len(l.cachedLogID) == 0 {
ext := ocr2keepers.LogTriggerExtension{
Index: uint32(l.log.LogIndex),
}
copy(ext.TxHash[:], l.log.TxHash[:])
copy(ext.BlockHash[:], l.log.BlockHash[:])
l.cachedLogID = hex.EncodeToString(ext.LogIdentifier())
}
return l.cachedLogID
}

// fetchedBlock holds the logs fetched for a block
@@ -33,9 +52,46 @@ type fetchedBlock struct {
visited []fetchedLog
}

func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, maxUpkeepLogs int) (fetchedLog, bool) {
has, upkeepLogs := b.has(fl.upkeepID, fl.log)
if has {
// Skipping known logs
return fetchedLog{}, false
}
// lggr.Debugw("Adding log", "i", i, "blockBlock", currentBlock.blockNumber, "logBlock", log.BlockNumber, "id", id)
b.logs = append(b.logs, fl)

// drop logs if we reached limits.
if upkeepLogs+1 > maxUpkeepLogs {
// if the logs of a particular upkeep overflow the limit, we drop one of that upkeep's logs,
// based on the shared, random (per-block) order of the logs in the block.
b.Sort()
var dropped fetchedLog
currentLogs := make([]fetchedLog, 0, len(b.logs)-1)
for _, l := range b.logs {
if dropped.upkeepID == nil && l.upkeepID.Cmp(fl.upkeepID) == 0 {
dropped = l
continue
}
currentLogs = append(currentLogs, l)
}
b.logs = currentLogs
return dropped, true
} else if len(b.logs)+len(b.visited) > maxBlockLogs {
// if the logs in the block overflow the block-level limit, we drop a log based on the
// shared, random (per-block) order of the logs in the block.
b.Sort()
dropped := b.logs[0]
b.logs = b.logs[1:]
return dropped, true
}

return fetchedLog{}, true
}

// Has returns true if the block has the log,
// and the number of logs for that upkeep in the block.
func (b fetchedBlock) Has(id *big.Int, log logpoller.Log) (bool, int) {
func (b fetchedBlock) has(id *big.Int, log logpoller.Log) (bool, int) {
allLogs := append(b.logs, b.visited...)
upkeepLogs := 0
for _, l := range allLogs {
@@ -62,6 +118,22 @@ func (b fetchedBlock) Clone() fetchedBlock {
}
}

// Sort by log identifiers, shuffled using a pseudorandom source that is shared across all nodes
// for a given block.
func (b *fetchedBlock) Sort() {
randSeed := random.GetRandomKeySource(nil, uint64(b.blockNumber))

shuffledLogIDs := make(map[string]string, len(b.logs))
for _, log := range b.logs {
logID := log.getLogID()
shuffledLogIDs[logID] = random.ShuffleString(logID, randSeed)
}

sort.SliceStable(b.logs, func(i, j int) bool {
return shuffledLogIDs[b.logs[i].getLogID()] < shuffledLogIDs[b.logs[j].getLogID()]
})
}

// logEventBuffer is a circular/ring buffer of fetched logs.
// Each entry in the buffer represents a block,
// and holds the logs fetched for that block.
@@ -97,6 +169,7 @@ func (b *logEventBuffer) bufferSize() int {
}

// enqueue adds logs (if they do not already exist) to the buffer, returning the number of logs added
// minus the number of logs dropped.
func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
b.lock.Lock()
defer b.lock.Unlock()
@@ -107,7 +180,8 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
maxUpkeepLogs := int(b.maxUpkeepLogsPerBlock)

latestBlock := b.latestBlockSeen()
added := 0
added, dropped := 0, 0

for _, log := range logs {
if log.BlockNumber == 0 {
// invalid log
@@ -125,23 +199,20 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
lggr.Debugw("Skipping log from old block", "currentBlock", currentBlock.blockNumber, "newBlock", log.BlockNumber)
continue
}
if len(currentBlock.logs)+1 > maxBlockLogs {
lggr.Debugw("Reached max logs number per block, dropping log", "blockNumber", log.BlockNumber,
"blockHash", log.BlockHash, "txHash", log.TxHash, "logIndex", log.LogIndex)
droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxBlockLogs, maxUpkeepLogs)
if !ok {
// Skipping known logs
continue
}
if has, upkeepLogs := currentBlock.Has(id, log); has {
// Skipping existing log
continue
} else if upkeepLogs+1 > maxUpkeepLogs {
lggr.Debugw("Reached max logs number per upkeep, dropping log", "blockNumber", log.BlockNumber,
"blockHash", log.BlockHash, "txHash", log.TxHash, "logIndex", log.LogIndex)
continue
if droppedLog.upkeepID != nil {
dropped++
lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", droppedLog.log.BlockNumber,
"blockHash", droppedLog.log.BlockHash, "txHash", droppedLog.log.TxHash, "logIndex", droppedLog.log.LogIndex,
"upkeepID", droppedLog.upkeepID.String())
}
// lggr.Debugw("Adding log", "i", i, "blockBlock", currentBlock.blockNumber, "logBlock", log.BlockNumber, "id", id)
currentBlock.logs = append(currentBlock.logs, fetchedLog{upkeepID: id, log: log})
b.blocks[i] = currentBlock
added++
b.blocks[i] = currentBlock

if log.BlockNumber > latestBlock {
latestBlock = log.BlockNumber
}
@@ -151,10 +222,10 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
atomic.StoreInt64(&b.latestBlock, latestBlock)
}
if added > 0 {
lggr.Debugw("Added logs to buffer", "addedLogs", added, "latestBlock", latestBlock)
lggr.Debugw("Added logs to buffer", "addedLogs", added, "dropped", dropped, "latestBlock", latestBlock)
}

return added
return added - dropped
}

// peek returns the logs in range [latestBlock-blocks, latestBlock]
@@ -196,7 +267,7 @@ func (b *logEventBuffer) peekRange(start, end int64) []fetchedLog {
}

// dequeueRange returns the logs between start and end inclusive.
func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit int) []fetchedLog {
func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit, totalLimit int) []fetchedLog {
b.lock.Lock()
defer b.lock.Unlock()

@@ -214,20 +285,33 @@ func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit int) []fetch
})

logsCount := map[string]int{}
totalCount := 0
var results []fetchedLog
for _, block := range fetchedBlocks {
// double checking that we don't have any gaps in the range
if block.blockNumber < start || block.blockNumber > end {
// double checking that we don't have any gaps in the range
continue
}
if totalCount >= totalLimit {
// reached total limit, no need to process more blocks
break
}
// Sort the logs in random order that is shared across all nodes.
// This ensures that nodes across the network will process the same logs.
block.Sort()
var remainingLogs, blockResults []fetchedLog
for _, log := range block.logs {
if totalCount >= totalLimit {
remainingLogs = append(remainingLogs, log)
continue
}
if logsCount[log.upkeepID.String()] >= upkeepLimit {
remainingLogs = append(remainingLogs, log)
continue
}
logsCount[log.upkeepID.String()]++
blockResults = append(blockResults, log)
logsCount[log.upkeepID.String()]++
totalCount++
}
if len(blockResults) == 0 {
continue
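
The dequeueRange hunk above is truncated in this view, but its new shape enforces two caps at once: a per-upkeep limit and an overall total limit, with logs that exceed either cap left aside for a later dequeue. Below is a minimal sketch of that selection logic, reusing the simpleLog type from the earlier sketch; the names are hypothetical and this is not the repository's API.

// dequeueWithLimits walks logs in their shared order and selects at most
// upkeepLimit logs per upkeep and totalLimit logs overall; everything else is
// returned as remaining, to be picked up in a later dequeue round.
func dequeueWithLimits(logs []simpleLog, upkeepLimit, totalLimit int) (results, remaining []simpleLog) {
	perUpkeep := map[string]int{}
	total := 0
	for _, l := range logs {
		if total >= totalLimit || perUpkeep[l.upkeepID] >= upkeepLimit {
			// either cap is hit: keep the log for a future round instead of dropping it
			remaining = append(remaining, l)
			continue
		}
		results = append(results, l)
		perUpkeep[l.upkeepID]++
		total++
	}
	return results, remaining
}

With a MaxPayloads-style total applied on top of the per-upkeep limit, a handful of very active upkeeps cannot crowd out the rest of a dequeue round.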