Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track when we enqueue logs for the same upkeep on the same block more than once #13176

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/heavy-mails-rule.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add logs for when the assumptions of how the log buffer will be used are violated #internal
Original file line number Diff line number Diff line change
Expand Up @@ -77,37 +77,90 @@ type logBuffer struct {
// map of upkeep id to its queue
queues map[string]*upkeepLogQueue
lock sync.RWMutex

// map for then number of times we have enqueued logs for a block number
enqueuedBlocks map[int64]map[string]int
enqueuedBlockLock sync.RWMutex
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
queues: make(map[string]*upkeepLogQueue),
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queues: make(map[string]*upkeepLogQueue),
}
}

// Enqueue adds logs to the buffer and might also drop logs if the limit for the
// given upkeep was exceeded. It will create a new buffer if it does not exist.
// Logs are expected to be enqueued in increasing order of block number.
// All logs for an upkeep on a particular block will be enqueued in a single Enqueue call.
// Returns the number of logs that were added and number of logs that were dropped.
func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
buf, ok := b.getUpkeepQueue(uid)
if !ok || buf == nil {
buf = newUpkeepLogQueue(b.lggr, uid, b.opts)
b.setUpkeepQueue(uid, buf)
}
latestBlock := latestBlockNumber(logs...)
if b.lastBlockSeen.Load() < latestBlock {
b.lastBlockSeen.Store(latestBlock)

latestLogBlock, uniqueBlocks := blockStatistics(logs...)
if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock {
b.lastBlockSeen.Store(latestLogBlock)
} else if latestLogBlock < lastBlockSeen {
b.lggr.Debugw("enqueuing logs with a latest block older older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen)
}

b.trackBlockNumbersForUpkeep(uid, uniqueBlocks)

blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load())
if blockThreshold <= 0 {
blockThreshold = 1
}

b.cleanupEnqueuedBlocks(blockThreshold)

return buf.enqueue(blockThreshold, logs...)
}

func (b *logBuffer) cleanupEnqueuedBlocks(blockThreshold int64) {
b.enqueuedBlockLock.Lock()
defer b.enqueuedBlockLock.Unlock()
// clean up enqueued block counts
for block := range b.enqueuedBlocks {
if block < blockThreshold {
delete(b.enqueuedBlocks, block)
}
}
}

// trackBlockNumbersForUpkeep keeps track of the number of times we enqueue logs for an upkeep,
// for a specific block number. The expectation is that we will only enqueue logs for an upkeep for a
// specific block number once, i.e. all logs for an upkeep for a block, will be enqueued in a single
// enqueue call. In the event that we see upkeep logs enqueued for a particular block more than once,
// we log a message.
func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[int64]bool) {
b.enqueuedBlockLock.Lock()
defer b.enqueuedBlockLock.Unlock()

for blockNumber := range uniqueBlocks {
if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok {
if upkeepBlockInstances, ok := blockNumbers[uid.String()]; ok {
blockNumbers[uid.String()] = upkeepBlockInstances + 1
b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber], "upkeepID", uid.String())
} else {
blockNumbers[uid.String()] = 1
}
b.enqueuedBlocks[blockNumber] = blockNumbers
} else {
b.enqueuedBlocks[blockNumber] = map[string]int{
uid.String(): 1,
}
}
}
}

// Dequeue greedly pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -50,6 +51,96 @@ func TestLogEventBufferV1_SyncFilters(t *testing.T) {
require.Equal(t, 1, buf.NumOfUpkeeps())
}

type readableLogger struct {
logger.Logger
DebugwFn func(msg string, keysAndValues ...interface{})
NamedFn func(name string) logger.Logger
WithFn func(args ...interface{}) logger.Logger
}

func (l *readableLogger) Debugw(msg string, keysAndValues ...interface{}) {
l.DebugwFn(msg, keysAndValues...)
}

func (l *readableLogger) Named(name string) logger.Logger {
return l
}

func (l *readableLogger) With(args ...interface{}) logger.Logger {
return l
}

func TestLogEventBufferV1_EnqueueViolations(t *testing.T) {
t.Run("enqueuing logs for a block older than latest seen logs a message", func(t *testing.T) {
logReceived := false
readableLogger := &readableLogger{
DebugwFn: func(msg string, keysAndValues ...interface{}) {
if msg == "enqueuing logs from a block older than latest seen block" {
logReceived = true
assert.Equal(t, "logBlock", keysAndValues[0])
assert.Equal(t, int64(1), keysAndValues[1])
assert.Equal(t, "lastBlockSeen", keysAndValues[2])
assert.Equal(t, int64(2), keysAndValues[3])
}
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)

buf := logBufferV1.(*logBuffer)

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1},
)
buf.Enqueue(big.NewInt(2),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"])
assert.True(t, true, logReceived)
})

t.Run("enqueuing logs for the same block over multiple calls logs a message", func(t *testing.T) {
logReceived := false
readableLogger := &readableLogger{
DebugwFn: func(msg string, keysAndValues ...interface{}) {
if msg == "enqueuing logs again for a previously seen block" {
logReceived = true
assert.Equal(t, "blockNumber", keysAndValues[0])
assert.Equal(t, int64(3), keysAndValues[1])
assert.Equal(t, "numberOfEnqueues", keysAndValues[2])
assert.Equal(t, 2, keysAndValues[3])
}
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)

buf := logBufferV1.(*logBuffer)

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1},
)
buf.Enqueue(big.NewInt(2),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)
buf.Enqueue(big.NewInt(3),
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3a"), LogIndex: 0},
)
buf.Enqueue(big.NewInt(3),
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"])
assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"])
assert.True(t, true, logReceived)
})
}

func TestLogEventBufferV1_Dequeue(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -470,3 +561,107 @@ func createDummyLogSequence(n, startIndex int, block int64, tx common.Hash) []lo
}
return logs
}

func Test_trackBlockNumbersForUpkeep(t *testing.T) {
buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1)

logBuffer := buf.(*logBuffer)

for _, tc := range []struct {
uid *big.Int
uniqueBlocks map[int64]bool
wantEnqueuedBlocks map[int64]map[string]int
}{
{
uid: big.NewInt(1),
uniqueBlocks: map[int64]bool{
1: true,
2: true,
3: true,
},
wantEnqueuedBlocks: map[int64]map[string]int{
1: {
"1": 1,
},
2: {
"1": 1,
},
3: {
"1": 1,
},
},
},
{
uid: big.NewInt(2),
uniqueBlocks: map[int64]bool{
1: true,
2: true,
3: true,
},
wantEnqueuedBlocks: map[int64]map[string]int{
1: {
"1": 1,
"2": 1,
},
2: {
"1": 1,
"2": 1,
},
3: {
"1": 1,
"2": 1,
},
},
},
{
uid: big.NewInt(2),
uniqueBlocks: map[int64]bool{
3: true,
4: true,
},
wantEnqueuedBlocks: map[int64]map[string]int{
1: {
"1": 1,
"2": 1,
},
2: {
"1": 1,
"2": 1,
},
3: {
"1": 1,
"2": 2,
},
4: {
"2": 1,
},
},
},
{
uniqueBlocks: map[int64]bool{
3: true,
4: true,
},
wantEnqueuedBlocks: map[int64]map[string]int{
1: {
"1": 1,
"2": 1,
},
2: {
"1": 1,
"2": 1,
},
3: {
"1": 1,
"2": 2,
},
4: {
"2": 1,
},
},
},
} {
logBuffer.trackBlockNumbersForUpkeep(tc.uid, tc.uniqueBlocks)
assert.Equal(t, tc.wantEnqueuedBlocks, logBuffer.enqueuedBlocks)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// LogSorter sorts the logs based on block number, tx hash and log index.
// LogSorter sorts the logs primarily by block number, then by log index, and finally by tx hash.
// returns true if b should come before a.
func LogSorter(a, b logpoller.Log) bool {
return LogComparator(a, b) > 0
Expand Down Expand Up @@ -57,13 +57,17 @@ func logID(l logpoller.Log) string {
return hex.EncodeToString(ext.LogIdentifier())
}

// latestBlockNumber returns the latest block number from the given logs
func latestBlockNumber(logs ...logpoller.Log) int64 {
// blockStatistics returns the latest block number from the given logs, and a map of unique block numbers
func blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) {
var latest int64
uniqueBlocks := map[int64]bool{}

for _, l := range logs {
if l.BlockNumber > latest {
latest = l.BlockNumber
}
uniqueBlocks[l.BlockNumber] = true
}
return latest

return latest, uniqueBlocks
}
Loading