Merge branch 'develop' into AUTO-9081
shileiwill authored Feb 22, 2024
2 parents b719ec5 + 16f1b78 commit 9530e34
Showing 75 changed files with 4,234 additions and 236 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -91,4 +91,7 @@ tools/flakeytests/coverage.txt
**/testdata/fuzz/*

# Runtime test configuration that might contain secrets
overrides.toml
overrides.toml

# Python venv
.venv/
1 change: 1 addition & 0 deletions GNUmakefile
@@ -31,6 +31,7 @@ gomodtidy: ## Run go mod tidy on all modules.
go mod tidy
cd ./core/scripts && go mod tidy
cd ./integration-tests && go mod tidy
cd ./integration-tests/load && go mod tidy

.PHONY: godoc
godoc: ## Install and run godoc
4 changes: 2 additions & 2 deletions common/txmgr/broadcaster.go
@@ -707,8 +707,8 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) nextUnstartedTransactionWithSequence(fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()
etx := &txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
if err := eb.txStore.FindNextUnstartedTransactionFromAddress(ctx, etx, fromAddress, eb.chainID); err != nil {
etx, err := eb.txStore.FindNextUnstartedTransactionFromAddress(ctx, fromAddress, eb.chainID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
// Finish. No more transactions left to process. Hoorah!
return nil, nil
28 changes: 20 additions & 8 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
@@ -79,7 +79,7 @@ type TransactionStore[
FindTxWithIdempotencyKey(ctx context.Context, idempotencyKey string, chainID CHAIN_ID) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Search for Tx using the fromAddress and sequence
FindTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindNextUnstartedTransactionFromAddress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error
FindNextUnstartedTransactionFromAddress(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)
FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error)
FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error)
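The interface change above inverts the old out-parameter style: the store now allocates and returns the transaction instead of populating a caller-supplied struct, which is exactly what the broadcaster.go hunk adapts to. A minimal sketch of the call-site migration (the txStore variable and the abbreviated error handling are illustrative):

// Before: the caller allocates an empty Tx and the store fills it in.
etx := &txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
if err := txStore.FindNextUnstartedTransactionFromAddress(ctx, etx, fromAddress, chainID); err != nil {
	// sql.ErrNoRows signals that no unstarted transactions remain
}

// After: the store returns the Tx (or an error) directly.
etx, err := txStore.FindNextUnstartedTransactionFromAddress(ctx, fromAddress, chainID)
if err != nil {
	// sql.ErrNoRows still signals that no unstarted transactions remain
}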
13 changes: 9 additions & 4 deletions core/chains/evm/logpoller/log_poller.go
@@ -153,10 +153,15 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
}

type Filter struct {
Name string // see FilterName(id, args) below
EventSigs evmtypes.HashArray
Addresses evmtypes.AddressArray
Retention time.Duration
Name string // see FilterName(id, args) below
Addresses evmtypes.AddressArray
EventSigs evmtypes.HashArray // list of possible values for eventsig (aka topic1)
Topic2 evmtypes.HashArray // list of possible values for topic2
Topic3 evmtypes.HashArray // list of possible values for topic3
Topic4 evmtypes.HashArray // list of possible values for topic4
Retention time.Duration // maximum amount of time to retain logs
MaxLogsKept uint64 // maximum number of logs to retain ( 0 = unlimited )
LogsPerBlock uint64 // rate limit ( maximum # of logs per block, 0 = unlimited )
}

// FilterName is a suggested convenience function for clients to construct unique filter names
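Taken together, the new fields let a single registration constrain indexed topic values and bound how much log history is kept. A hedged sketch of registering such a filter (the contract address, event signature, and topic value below are placeholders, not taken from this diff):

err := lp.RegisterFilter(logpoller.Filter{
	Name:        "example - transfers to vault", // unique name; see FilterName(id, args)
	Addresses:   []common.Address{contractAddr}, // contract(s) to watch
	EventSigs:   []common.Hash{transferSig},     // topic1: event signature(s)
	Topic2:      []common.Hash{vaultTopicValue}, // optional: match only this topic2 value
	Retention:   24 * time.Hour,                 // prune logs older than this
	MaxLogsKept: 10_000,                         // cap on stored logs (0 = unlimited)
})
if err != nil {
	// registration fails if no address or no event signature is supplied
}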
10 changes: 5 additions & 5 deletions core/chains/evm/logpoller/log_poller_internal_test.go
@@ -71,31 +71,31 @@ func TestLogPoller_RegisterFilter(t *testing.T) {
require.Equal(t, 1, len(f.Addresses))
assert.Equal(t, common.HexToAddress("0x0000000000000000000000000000000000000000"), f.Addresses[0])

err := lp.RegisterFilter(Filter{"Emitter Log 1", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{a1}, 0})
err := lp.RegisterFilter(Filter{Name: "Emitter Log 1", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{a1}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Should de-dupe EventSigs
err = lp.RegisterFilter(Filter{"Emitter Log 1 + 2", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{a2}, 0})
err = lp.RegisterFilter(Filter{Name: "Emitter Log 1 + 2", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Should de-dupe Addresses
err = lp.RegisterFilter(Filter{"Emitter Log 1 + 2 dupe", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{a2}, 0})
err = lp.RegisterFilter(Filter{Name: "Emitter Log 1 + 2 dupe", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Address required.
err = lp.RegisterFilter(Filter{"no address", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{}, 0})
err = lp.RegisterFilter(Filter{Name: "no address", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}})
require.Error(t, err)
// Event required
err = lp.RegisterFilter(Filter{"No event", []common.Hash{}, []common.Address{a1}, 0})
err = lp.RegisterFilter(Filter{Name: "No event", Addresses: []common.Address{a1}})
require.Error(t, err)
validateFiltersTable(t, lp, orm)

74 changes: 48 additions & 26 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -150,7 +150,7 @@ func TestLogPoller_Integration(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2, 1000)
th.Client.Commit() // Block 2. Ensure we have finality number of blocks

require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1}, 0}))
require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{Name: "Integration test", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{th.EmitterAddress1}}))
require.Len(t, th.LogPoller.Filter(nil, nil, nil).Addresses, 1)
require.Len(t, th.LogPoller.Filter(nil, nil, nil).Topics, 1)

@@ -188,8 +188,9 @@

// Now let's update the Filter and replay to get Log2 logs.
err = th.LogPoller.RegisterFilter(logpoller.Filter{
"Emitter - log2", []common.Hash{EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1}, 0,
Name: "Emitter - log2",
EventSigs: []common.Hash{EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1},
})
require.NoError(t, err)
// Replay an invalid block should error
@@ -254,11 +255,13 @@ func Test_BackupLogPoller(t *testing.T) {

ctx := testutils.Context(t)

filter1 := logpoller.Filter{"filter1", []common.Hash{
EmitterABI.Events["Log1"].ID,
EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1},
0}
filter1 := logpoller.Filter{
Name: "filter1",
EventSigs: []common.Hash{
EmitterABI.Events["Log1"].ID,
EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1},
}
err := th.LogPoller.RegisterFilter(filter1)
require.NoError(t, err)

@@ -268,9 +271,11 @@
require.Equal(t, filter1, filters["filter1"])

err = th.LogPoller.RegisterFilter(
logpoller.Filter{"filter2",
[]common.Hash{EmitterABI.Events["Log1"].ID},
[]common.Address{th.EmitterAddress2}, 0})
logpoller.Filter{
Name: "filter2",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress2},
})
require.NoError(t, err)

defer func() {
@@ -569,9 +574,9 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2, 1000)

addresses := []common.Address{th.EmitterAddress1, th.EmitterAddress2}
topics := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}
events := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}

err := th.LogPoller.RegisterFilter(logpoller.Filter{"convertLogs", topics, addresses, 0})
err := th.LogPoller.RegisterFilter(logpoller.Filter{Name: "convertLogs", EventSigs: events, Addresses: addresses})
require.NoError(t, err)

blk, err := th.Client.BlockByNumber(ctx, nil)
@@ -619,7 +624,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
query := ethereum.FilterQuery{
FromBlock: big.NewInt(2),
ToBlock: big.NewInt(5),
Topics: [][]common.Hash{topics},
Topics: [][]common.Hash{events},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}}

gethLogs, err := th.Client.FilterLogs(ctx, query)
@@ -762,8 +767,9 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) {

// Set up a log poller listening for log emitter logs.
err := th.LogPoller.RegisterFilter(logpoller.Filter{
"Test Emitter 1 & 2", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0,
Name: "Test Emitter 1 & 2",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
})
require.NoError(t, err)

@@ -1068,12 +1074,22 @@ func TestLogPoller_LoadFilters(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2, 1000)

filter1 := logpoller.Filter{"first Filter", []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0}
filter2 := logpoller.Filter{"second Filter", []common.Hash{
EmitterABI.Events["Log2"].ID, EmitterABI.Events["Log3"].ID}, []common.Address{th.EmitterAddress2}, 0}
filter3 := logpoller.Filter{"third Filter", []common.Hash{
EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0}
filter1 := logpoller.Filter{
Name: "first Filter",
EventSigs: []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
}
filter2 := logpoller.Filter{
Name: "second Filter",
EventSigs: []common.Hash{EmitterABI.Events["Log2"].ID, EmitterABI.Events["Log3"].ID},
Addresses: []common.Address{th.EmitterAddress2},
}
filter3 := logpoller.Filter{
Name: "third Filter",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
}

assert.True(t, filter1.Contains(nil))
assert.False(t, filter1.Contains(&filter2))
@@ -1119,9 +1135,11 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2, 1000)

err := th.LogPoller.RegisterFilter(logpoller.Filter{"GetBlocks Test", []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0},
)
err := th.LogPoller.RegisterFilter(logpoller.Filter{
Name: "GetBlocks Test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
})
require.NoError(t, err)

// LP retrieves 0 blocks
@@ -1365,7 +1383,11 @@ func TestTooManyLogResults(t *testing.T) {
})

addr := testutils.NewAddress()
err := lp.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{addr}, 0})
err := lp.RegisterFilter(logpoller.Filter{
Name: "Integration test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{addr},
})
require.NoError(t, err)
lp.PollAndSaveLogs(ctx, 5)
block, err2 := o.SelectLatestBlock()
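This is also why the trailing 0 disappears from the test literals above: the positional form had to spell out every field, including a zero Retention, and with the struct now carrying nine fields it would no longer compile. The named form lets omitted fields take their zero values (before, after, eventSigs, and addresses are illustrative names):

// Old positional literal: {Name, EventSigs, Addresses, Retention}.
before := logpoller.Filter{"name", eventSigs, addresses, 0}

// New named literal: Topic2-4, Retention, MaxLogsKept and LogsPerBlock
// are omitted and default to their zero values.
after := logpoller.Filter{Name: "name", EventSigs: eventSigs, Addresses: addresses}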
37 changes: 30 additions & 7 deletions core/chains/evm/logpoller/orm.go
@@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"math/big"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
@@ -13,6 +14,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)
@@ -95,26 +97,42 @@ func (o *DbORM) InsertBlock(blockHash common.Hash, blockNumber int64, blockTimes
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
// If a second job tries to overwrite the same pair, this should fail.
func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) {
topicArrays := []types.HashArray{filter.Topic2, filter.Topic3, filter.Topic4}
args, err := newQueryArgs(o.chainID).
withCustomArg("name", filter.Name).
withCustomArg("retention", filter.Retention).
withRetention(filter.Retention).
withMaxLogsKept(filter.MaxLogsKept).
withLogsPerBlock(filter.LogsPerBlock).
withAddressArray(filter.Addresses).
withEventSigArray(filter.EventSigs).
withTopicArrays(filter.Topic2, filter.Topic3, filter.Topic4).
toArgs()
if err != nil {
return err
}
// '::' has to be escaped in the query string
// https://github.com/jmoiron/sqlx/issues/91, https://github.com/jmoiron/sqlx/issues/428
return o.q.WithOpts(qopts...).ExecQNamed(`
var topicsColumns, topicsSql strings.Builder
for n, topicValues := range topicArrays {
if len(topicValues) != 0 {
topicCol := fmt.Sprintf("topic%d", n+2)
fmt.Fprintf(&topicsColumns, ", %s", topicCol)
fmt.Fprintf(&topicsSql, ",\n(SELECT unnest(:%s ::::BYTEA[]) %s) t%d", topicCol, topicCol, n+2)
}
}
query := fmt.Sprintf(`
INSERT INTO evm.log_poller_filters
(name, evm_chain_id, retention, created_at, address, event)
(name, evm_chain_id, retention, max_logs_kept, logs_per_block, created_at, address, event %s)
SELECT * FROM
(SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, NOW()) x,
(SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :max_logs_kept ::::NUMERIC, :logs_per_block ::::NUMERIC, NOW()) x,
(SELECT unnest(:address_array ::::BYTEA[]) addr) a,
(SELECT unnest(:event_sig_array ::::BYTEA[]) ev) e
ON CONFLICT (name, evm_chain_id, address, event)
DO UPDATE SET retention=:retention ::::BIGINT`, args)
%s
ON CONFLICT (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0))
DO UPDATE SET retention=:retention ::::BIGINT, max_logs_kept=:max_logs_kept ::::NUMERIC, logs_per_block=:logs_per_block ::::NUMERIC`,
topicsColumns.String(),
topicsSql.String())
return o.q.WithOpts(qopts...).ExecQNamed(query, args)
}

// DeleteFilter removes all event/address pairs associated with the Filter
@@ -130,7 +148,12 @@ func (o *DbORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) {
err := q.Select(&rows, `SELECT name,
ARRAY_AGG(DISTINCT address)::BYTEA[] AS addresses,
ARRAY_AGG(DISTINCT event)::BYTEA[] AS event_sigs,
MAX(retention) AS retention
ARRAY_AGG(DISTINCT topic2 ORDER BY topic2) FILTER(WHERE topic2 IS NOT NULL) AS topic2,
ARRAY_AGG(DISTINCT topic3 ORDER BY topic3) FILTER(WHERE topic3 IS NOT NULL) AS topic3,
ARRAY_AGG(DISTINCT topic4 ORDER BY topic4) FILTER(WHERE topic4 IS NOT NULL) AS topic4,
MAX(logs_per_block) AS logs_per_block,
MAX(retention) AS retention,
MAX(max_logs_kept) AS max_logs_kept
FROM evm.log_poller_filters WHERE evm_chain_id = $1
GROUP BY name`, ubig.New(o.chainID))
filters := make(map[string]Filter)
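To make the dynamic SQL in InsertFilter concrete: each non-empty topic array appends one column and one unnest subquery, so the statement cross-joins addresses × event signatures × each constrained topic and inserts one row per combination. For a filter with only Topic2 set, the rendered query should look roughly like this (a reconstruction from the code above, after sqlx unescapes :::: to ::):

INSERT INTO evm.log_poller_filters
	(name, evm_chain_id, retention, max_logs_kept, logs_per_block, created_at, address, event, topic2)
SELECT * FROM
	(SELECT :name, :evm_chain_id ::NUMERIC, :retention ::BIGINT, :max_logs_kept ::NUMERIC, :logs_per_block ::NUMERIC, NOW()) x,
	(SELECT unnest(:address_array ::BYTEA[]) addr) a,
	(SELECT unnest(:event_sig_array ::BYTEA[]) ev) e,
	(SELECT unnest(:topic2 ::BYTEA[]) topic2) t2
ON CONFLICT (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0))
DO UPDATE SET retention=:retention ::BIGINT, max_logs_kept=:max_logs_kept ::NUMERIC, logs_per_block=:logs_per_block ::NUMERIC

The hash_record_extended conflict target suggests the unique index is an expression over the whole column tuple, so rows whose topic columns are NULL still participate in uniqueness.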
(Diff truncated; the remaining changed files are not rendered here.)
