Skip to content

Commit

Permalink
Merge branch 'develop' into tt_1140_log_scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
Tofel authored May 13, 2024
2 parents 1c434e5 + eed5668 commit 0af0514
Show file tree
Hide file tree
Showing 20 changed files with 63 additions and 187 deletions.
5 changes: 0 additions & 5 deletions .changeset/heavy-mails-rule.md

This file was deleted.

5 changes: 5 additions & 0 deletions .changeset/many-kings-notice.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

bump chainlink-solana dependency #internal
5 changes: 5 additions & 0 deletions .changeset/moody-candles-compare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Revert block number tracking #changed
5 changes: 5 additions & 0 deletions .changeset/ninety-months-roll.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#nops fix metric description on mercury_transmit_queue_load
5 changes: 5 additions & 0 deletions .changeset/tender-pianos-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Decouple utils tests from core #internal
13 changes: 4 additions & 9 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1031,11 +1031,6 @@ jobs:
solana-test-image-exists,
get_solana_sha,
]
container:
image: projectserum/build:${{ needs.get_projectserum_version.outputs.projectserum_version }}
env:
RUSTUP_HOME: "/root/.rustup"
FORCE_COLOR: 1
steps:
- name: Collect Metrics
if: needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch'
Expand All @@ -1049,17 +1044,17 @@ jobs:
this-job-name: Solana Build Artifacts
continue-on-error: true
- name: Checkout the solana repo
# Use v3.6.0 because the custom runner (container configured above)
# doesn't have node20 installed which is required for versions >=4
uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4
with:
repository: smartcontractkit/chainlink-solana
ref: ${{ needs.get_solana_sha.outputs.sha }}
- name: Build contracts
if: (needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch') && needs.solana-test-image-exists.outputs.exists == 'false'
uses: smartcontractkit/chainlink-solana/.github/actions/build_contract_artifacts@21675b3a7dcdff8e790391708d4763020cace21e # stable action on December 18 2023
uses: smartcontractkit/chainlink-solana/.github/actions/build_contract_artifacts@46b1311a5a83f33d08ffa8e1e0ab04f9ad51665d # node20 update on may 10, 2024
with:
ref: ${{ needs.get_solana_sha.outputs.sha }}
image: projectserum/build
image-version: ${{ needs.get_projectserum_version.outputs.projectserum_version }}

solana-build-test-image:
environment: integration
Expand Down
17 changes: 9 additions & 8 deletions core/chains/evm/types/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/utils/hex"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

Expand Down Expand Up @@ -55,11 +56,11 @@ func TestHead_GreaterThan(t *testing.T) {
greater bool
}{
{"nil nil", nil, nil, false},
{"present nil", cltest.Head(1), nil, true},
{"nil present", nil, cltest.Head(1), false},
{"less", cltest.Head(1), cltest.Head(2), false},
{"equal", cltest.Head(2), cltest.Head(2), false},
{"greater", cltest.Head(2), cltest.Head(1), true},
{"present nil", testutils.Head(1), nil, true},
{"nil present", nil, testutils.Head(1), false},
{"less", testutils.Head(1), testutils.Head(2), false},
{"equal", testutils.Head(2), testutils.Head(2), false},
{"greater", testutils.Head(2), testutils.Head(1), true},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand All @@ -76,7 +77,7 @@ func TestHead_NextInt(t *testing.T) {
want *big.Int
}{
{"nil", nil, nil},
{"one", cltest.Head(1), big.NewInt(2)},
{"one", testutils.Head(1), big.NewInt(2)},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand All @@ -98,7 +99,7 @@ func TestEthTxAttempt_GetSignedTx(t *testing.T) {

chainID := big.NewInt(3)

signedTx, err := ethKeyStore.SignTx(testutils.Context(t), fromAddress, tx, chainID)
signedTx, err := ethKeyStore.SignTx(tests.Context(t), fromAddress, tx, chainID)
require.NoError(t, err)
rlp := new(bytes.Buffer)
require.NoError(t, signedTx.EncodeRLP(rlp))
Expand Down
8 changes: 5 additions & 3 deletions core/chains/evm/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
)

func TestKeccak256(t *testing.T) {
Expand Down Expand Up @@ -208,7 +210,7 @@ func TestRetryWithBackoff(t *testing.T) {
t.Parallel()

var counter atomic.Int32
ctx, cancel := context.WithCancel(testutils.Context(t))
ctx, cancel := context.WithCancel(tests.Context(t))

utils.RetryWithBackoff(ctx, func() bool {
return false
Expand All @@ -222,7 +224,7 @@ func TestRetryWithBackoff(t *testing.T) {

assert.Eventually(t, func() bool {
return counter.Load() == 3
}, testutils.WaitTimeout(t), testutils.TestInterval)
}, tests.WaitTimeout(t), tests.TestInterval)

cancel()

Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ require (
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 // indirect
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240422130241-13c17a91b2ab // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240508123714-3a7c0d9d22e3 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240510181707-46b1311a5a83 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240508155030-1024f2b55c69 // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20230906073235-9e478e5e19f1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1193,8 +1193,8 @@ github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540/go.mod h1:sjAmX8K2kbQhvDarZE1ZZgDgmHJ50s0BBc/66vKY2ek=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240422130241-13c17a91b2ab h1:Ct1oUlyn03HDUVdFHJqtRGRUujMqdoMzvf/Cjhe30Ag=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240422130241-13c17a91b2ab/go.mod h1:RPUY7r8GxgzXxS1ijtU1P/fpJomOXztXgUbEziNmbCA=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240508123714-3a7c0d9d22e3 h1:nU9yibgmCMr6Yz4Ge2IJqzQGMYYpOlISor0HYwUewwU=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240508123714-3a7c0d9d22e3/go.mod h1:RdAtOeBUWq2zByw2kEbwPlXaPIb7YlaDOmnn+nVUBJI=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240510181707-46b1311a5a83 h1:f3W82k9V/XA6ZP/VQVJcGMVR6CrL3pQrPJSwyQWVFys=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240510181707-46b1311a5a83/go.mod h1:RdAtOeBUWq2zByw2kEbwPlXaPIb7YlaDOmnn+nVUBJI=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240508155030-1024f2b55c69 h1:ssh/w3oXWu+C6bE88GuFRC1+0Bx/4ihsbc80XMLrl2k=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240508155030-1024f2b55c69/go.mod h1:VsfjhvWgjxqWja4q+FlXEtX5lu8BSxn10xRo6gi948g=
github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772 h1:LQmRsrzzaYYN3wEU1l5tWiccznhvbyGnu2N+wHSXZAo=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,80 +76,38 @@ type logBuffer struct {
lastBlockSeen *atomic.Int64
// map of upkeep id to its queue
queues map[string]*upkeepLogQueue
// map for then number of times we have enqueued logs for a block number
enqueuedBlocks map[int64]map[string]int
lock sync.RWMutex
lock sync.RWMutex
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queues: make(map[string]*upkeepLogQueue),
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
queues: make(map[string]*upkeepLogQueue),
}
}

// Enqueue adds logs to the buffer and might also drop logs if the limit for the
// given upkeep was exceeded. It will create a new buffer if it does not exist.
// Logs are expected to be enqueued in increasing order of block number.
// All logs for an upkeep on a particular block will be enqueued in a single Enqueue call.
// Returns the number of logs that were added and number of logs that were dropped.
func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
buf, ok := b.getUpkeepQueue(uid)
if !ok || buf == nil {
buf = newUpkeepLogQueue(b.lggr, uid, b.opts)
b.setUpkeepQueue(uid, buf)
}

latestLogBlock, uniqueBlocks := blockStatistics(logs...)
if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock {
b.lastBlockSeen.Store(latestLogBlock)
} else if latestLogBlock < lastBlockSeen {
b.lggr.Debugw("enqueuing logs from a block older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen)
latestBlock := latestBlockNumber(logs...)
if b.lastBlockSeen.Load() < latestBlock {
b.lastBlockSeen.Store(latestBlock)
}

b.trackBlockNumbersForUpkeep(uid, uniqueBlocks)

blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load())
if blockThreshold <= 0 {
blockThreshold = 1
}

// clean up enqueued block counts
for block := range b.enqueuedBlocks {
if block < blockThreshold {
delete(b.enqueuedBlocks, block)
}
}

return buf.enqueue(blockThreshold, logs...)
}

// trackBlockNumbersForUpkeep keeps track of the number of times we enqueue logs for an upkeep,
// for a specific block number. The expectation is that we will only enqueue logs for an upkeep for a
// specific block number once, i.e. all logs for an upkeep for a block, will be enqueued in a single
// enqueue call. In the event that we see upkeep logs enqueued for a particular block more than once,
// we log a message.
func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[int64]bool) {
for blockNumber := range uniqueBlocks {
if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok {
if upkeepBlockInstances, ok := blockNumbers[uid.String()]; ok {
blockNumbers[uid.String()] = upkeepBlockInstances + 1
b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber], "upkeepID", uid.String())
} else {
blockNumbers[uid.String()] = 1
}
b.enqueuedBlocks[blockNumber] = blockNumbers
} else {
b.enqueuedBlocks[blockNumber] = map[string]int{
uid.String(): 1,
}
}
}
}

// Dequeue greedly pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -51,96 +50,6 @@ func TestLogEventBufferV1_SyncFilters(t *testing.T) {
require.Equal(t, 1, buf.NumOfUpkeeps())
}

type readableLogger struct {
logger.Logger
DebugwFn func(msg string, keysAndValues ...interface{})
NamedFn func(name string) logger.Logger
WithFn func(args ...interface{}) logger.Logger
}

func (l *readableLogger) Debugw(msg string, keysAndValues ...interface{}) {
l.DebugwFn(msg, keysAndValues...)
}

func (l *readableLogger) Named(name string) logger.Logger {
return l
}

func (l *readableLogger) With(args ...interface{}) logger.Logger {
return l
}

func TestLogEventBufferV1_EnqueueViolations(t *testing.T) {
t.Run("enqueuing logs for a block older than latest seen logs a message", func(t *testing.T) {
logReceived := false
readableLogger := &readableLogger{
DebugwFn: func(msg string, keysAndValues ...interface{}) {
if msg == "enqueuing logs from a block older than latest seen block" {
logReceived = true
assert.Equal(t, "logBlock", keysAndValues[0])
assert.Equal(t, int64(1), keysAndValues[1])
assert.Equal(t, "lastBlockSeen", keysAndValues[2])
assert.Equal(t, int64(2), keysAndValues[3])
}
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)

buf := logBufferV1.(*logBuffer)

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1},
)
buf.Enqueue(big.NewInt(2),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"])
assert.True(t, true, logReceived)
})

t.Run("enqueuing logs for the same block over multiple calls logs a message", func(t *testing.T) {
logReceived := false
readableLogger := &readableLogger{
DebugwFn: func(msg string, keysAndValues ...interface{}) {
if msg == "enqueuing logs again for a previously seen block" {
logReceived = true
assert.Equal(t, "blockNumber", keysAndValues[0])
assert.Equal(t, int64(3), keysAndValues[1])
assert.Equal(t, "numberOfEnqueues", keysAndValues[2])
assert.Equal(t, 2, keysAndValues[3])
}
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)

buf := logBufferV1.(*logBuffer)

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1},
)
buf.Enqueue(big.NewInt(2),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)
buf.Enqueue(big.NewInt(3),
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3a"), LogIndex: 0},
)
buf.Enqueue(big.NewInt(3),
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"])
assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"])
assert.True(t, true, logReceived)
})
}

func TestLogEventBufferV1_Dequeue(t *testing.T) {
tests := []struct {
name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// LogSorter sorts the logs primarily by block number, then by log index, and finally by tx hash.
// LogSorter sorts the logs based on block number, tx hash and log index.
// returns true if b should come before a.
func LogSorter(a, b logpoller.Log) bool {
return LogComparator(a, b) > 0
Expand Down Expand Up @@ -57,17 +57,13 @@ func logID(l logpoller.Log) string {
return hex.EncodeToString(ext.LogIdentifier())
}

// blockStatistics returns the latest block number from the given logs, and a map of unique block numbers
func blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) {
// latestBlockNumber returns the latest block number from the given logs
func latestBlockNumber(logs ...logpoller.Log) int64 {
var latest int64
uniqueBlocks := map[int64]bool{}

for _, l := range logs {
if l.BlockNumber > latest {
latest = l.BlockNumber
}
uniqueBlocks[l.BlockNumber] = true
}

return latest, uniqueBlocks
return latest
}
2 changes: 1 addition & 1 deletion core/services/relay/evm/mercury/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var _ services.Service = (*TransmitQueue)(nil)

var transmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "mercury_transmit_queue_load",
Help: "Percent of transmit queue capacity used",
Help: "Current count of items in the transmit queue",
},
[]string{"feedID", "serverURL", "capacity"},
)
Expand Down
Loading

0 comments on commit 0af0514

Please sign in to comment.