Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Add logs to identify when assumptions of log queuing behaviour are violated" #13173

Merged
merged 2 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions .changeset/heavy-mails-rule.md

This file was deleted.

5 changes: 5 additions & 0 deletions .changeset/moody-candles-compare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Revert block number tracking #changed
Original file line number Diff line number Diff line change
Expand Up @@ -76,80 +76,38 @@ type logBuffer struct {
lastBlockSeen *atomic.Int64
// map of upkeep id to its queue
queues map[string]*upkeepLogQueue
// map for then number of times we have enqueued logs for a block number
enqueuedBlocks map[int64]map[string]int
lock sync.RWMutex
lock sync.RWMutex
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queues: make(map[string]*upkeepLogQueue),
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
queues: make(map[string]*upkeepLogQueue),
}
}

// Enqueue adds logs to the buffer and might also drop logs if the limit for the
// given upkeep was exceeded. It will create a new buffer if it does not exist.
// Logs are expected to be enqueued in increasing order of block number.
// All logs for an upkeep on a particular block will be enqueued in a single Enqueue call.
// Returns the number of logs that were added and number of logs that were dropped.
func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
buf, ok := b.getUpkeepQueue(uid)
if !ok || buf == nil {
buf = newUpkeepLogQueue(b.lggr, uid, b.opts)
b.setUpkeepQueue(uid, buf)
}

latestLogBlock, uniqueBlocks := blockStatistics(logs...)
if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock {
b.lastBlockSeen.Store(latestLogBlock)
} else if latestLogBlock < lastBlockSeen {
b.lggr.Debugw("enqueuing logs from a block older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen)
latestBlock := latestBlockNumber(logs...)
if b.lastBlockSeen.Load() < latestBlock {
b.lastBlockSeen.Store(latestBlock)
}

b.trackBlockNumbersForUpkeep(uid, uniqueBlocks)

blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load())
if blockThreshold <= 0 {
blockThreshold = 1
}

// clean up enqueued block counts
for block := range b.enqueuedBlocks {
if block < blockThreshold {
delete(b.enqueuedBlocks, block)
}
}

return buf.enqueue(blockThreshold, logs...)
}

// trackBlockNumbersForUpkeep keeps track of the number of times we enqueue logs for an upkeep,
// for a specific block number. The expectation is that we will only enqueue logs for an upkeep for a
// specific block number once, i.e. all logs for an upkeep for a block, will be enqueued in a single
// enqueue call. In the event that we see upkeep logs enqueued for a particular block more than once,
// we log a message.
func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[int64]bool) {
for blockNumber := range uniqueBlocks {
if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok {
if upkeepBlockInstances, ok := blockNumbers[uid.String()]; ok {
blockNumbers[uid.String()] = upkeepBlockInstances + 1
b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber], "upkeepID", uid.String())
} else {
blockNumbers[uid.String()] = 1
}
b.enqueuedBlocks[blockNumber] = blockNumbers
} else {
b.enqueuedBlocks[blockNumber] = map[string]int{
uid.String(): 1,
}
}
}
}

// Dequeue greedly pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -51,96 +50,6 @@ func TestLogEventBufferV1_SyncFilters(t *testing.T) {
require.Equal(t, 1, buf.NumOfUpkeeps())
}

type readableLogger struct {
logger.Logger
DebugwFn func(msg string, keysAndValues ...interface{})
NamedFn func(name string) logger.Logger
WithFn func(args ...interface{}) logger.Logger
}

func (l *readableLogger) Debugw(msg string, keysAndValues ...interface{}) {
l.DebugwFn(msg, keysAndValues...)
}

func (l *readableLogger) Named(name string) logger.Logger {
return l
}

func (l *readableLogger) With(args ...interface{}) logger.Logger {
return l
}

func TestLogEventBufferV1_EnqueueViolations(t *testing.T) {
t.Run("enqueuing logs for a block older than latest seen logs a message", func(t *testing.T) {
logReceived := false
readableLogger := &readableLogger{
DebugwFn: func(msg string, keysAndValues ...interface{}) {
if msg == "enqueuing logs from a block older than latest seen block" {
logReceived = true
assert.Equal(t, "logBlock", keysAndValues[0])
assert.Equal(t, int64(1), keysAndValues[1])
assert.Equal(t, "lastBlockSeen", keysAndValues[2])
assert.Equal(t, int64(2), keysAndValues[3])
}
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)

buf := logBufferV1.(*logBuffer)

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1},
)
buf.Enqueue(big.NewInt(2),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"])
assert.True(t, true, logReceived)
})

t.Run("enqueuing logs for the same block over multiple calls logs a message", func(t *testing.T) {
logReceived := false
readableLogger := &readableLogger{
DebugwFn: func(msg string, keysAndValues ...interface{}) {
if msg == "enqueuing logs again for a previously seen block" {
logReceived = true
assert.Equal(t, "blockNumber", keysAndValues[0])
assert.Equal(t, int64(3), keysAndValues[1])
assert.Equal(t, "numberOfEnqueues", keysAndValues[2])
assert.Equal(t, 2, keysAndValues[3])
}
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)

buf := logBufferV1.(*logBuffer)

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1},
)
buf.Enqueue(big.NewInt(2),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)
buf.Enqueue(big.NewInt(3),
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3a"), LogIndex: 0},
)
buf.Enqueue(big.NewInt(3),
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"])
assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"])
assert.True(t, true, logReceived)
})
}

func TestLogEventBufferV1_Dequeue(t *testing.T) {
tests := []struct {
name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// LogSorter sorts the logs primarily by block number, then by log index, and finally by tx hash.
// LogSorter sorts the logs based on block number, tx hash and log index.
// returns true if b should come before a.
func LogSorter(a, b logpoller.Log) bool {
return LogComparator(a, b) > 0
Expand Down Expand Up @@ -57,17 +57,13 @@ func logID(l logpoller.Log) string {
return hex.EncodeToString(ext.LogIdentifier())
}

// blockStatistics returns the latest block number from the given logs, and a map of unique block numbers
func blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) {
// latestBlockNumber returns the latest block number from the given logs
func latestBlockNumber(logs ...logpoller.Log) int64 {
var latest int64
uniqueBlocks := map[int64]bool{}

for _, l := range logs {
if l.BlockNumber > latest {
latest = l.BlockNumber
}
uniqueBlocks[l.BlockNumber] = true
}

return latest, uniqueBlocks
return latest
}
Loading