Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Topic1, Topic2, Topic3, LogsPerBlock fields to logpoller.Filter and log_poller_filters schema #11949

Merged
merged 6 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,15 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
}

type Filter struct {
Name string // see FilterName(id, args) below
EventSigs evmtypes.HashArray
Addresses evmtypes.AddressArray
Retention time.Duration
Name string // see FilterName(id, args) below
Addresses evmtypes.AddressArray
EventSigs evmtypes.HashArray // list of possible values for eventsig (aka topic1)
Topic2 evmtypes.HashArray // list of possible values for topic2
Topic3 evmtypes.HashArray // list of possible values for topic3
Topic4 evmtypes.HashArray // list of possible values for topic4
Retention time.Duration // maximum amount of time to retain logs
MaxLogsKept uint64 // maximum number of logs to retain ( 0 = unlimited )
LogsPerBlock uint64 // rate limit ( maximum # of logs per block, 0 = unlimited )
}

// FilterName is a suggested convenience function for clients to construct unique filter names
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,31 +71,31 @@ func TestLogPoller_RegisterFilter(t *testing.T) {
require.Equal(t, 1, len(f.Addresses))
assert.Equal(t, common.HexToAddress("0x0000000000000000000000000000000000000000"), f.Addresses[0])

err := lp.RegisterFilter(Filter{"Emitter Log 1", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{a1}, 0})
err := lp.RegisterFilter(Filter{Name: "Emitter Log 1", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{a1}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Should de-dupe EventSigs
err = lp.RegisterFilter(Filter{"Emitter Log 1 + 2", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{a2}, 0})
err = lp.RegisterFilter(Filter{Name: "Emitter Log 1 + 2", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Should de-dupe Addresses
err = lp.RegisterFilter(Filter{"Emitter Log 1 + 2 dupe", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{a2}, 0})
err = lp.RegisterFilter(Filter{Name: "Emitter Log 1 + 2 dupe", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Address required.
err = lp.RegisterFilter(Filter{"no address", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{}, 0})
err = lp.RegisterFilter(Filter{Name: "no address", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}})
require.Error(t, err)
// Event required
err = lp.RegisterFilter(Filter{"No event", []common.Hash{}, []common.Address{a1}, 0})
err = lp.RegisterFilter(Filter{Name: "No event", Addresses: []common.Address{a1}})
require.Error(t, err)
validateFiltersTable(t, lp, orm)

Expand Down
74 changes: 48 additions & 26 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestLogPoller_Integration(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2, 1000)
th.Client.Commit() // Block 2. Ensure we have finality number of blocks

require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1}, 0}))
require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{Name: "Integration test", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{th.EmitterAddress1}}))
require.Len(t, th.LogPoller.Filter(nil, nil, nil).Addresses, 1)
require.Len(t, th.LogPoller.Filter(nil, nil, nil).Topics, 1)

Expand Down Expand Up @@ -188,8 +188,9 @@ func TestLogPoller_Integration(t *testing.T) {

// Now let's update the Filter and replay to get Log2 logs.
err = th.LogPoller.RegisterFilter(logpoller.Filter{
"Emitter - log2", []common.Hash{EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1}, 0,
Name: "Emitter - log2",
EventSigs: []common.Hash{EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1},
})
require.NoError(t, err)
// Replay an invalid block should error
Expand Down Expand Up @@ -254,11 +255,13 @@ func Test_BackupLogPoller(t *testing.T) {

ctx := testutils.Context(t)

filter1 := logpoller.Filter{"filter1", []common.Hash{
EmitterABI.Events["Log1"].ID,
EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1},
0}
filter1 := logpoller.Filter{
Name: "filter1",
EventSigs: []common.Hash{
EmitterABI.Events["Log1"].ID,
EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1},
}
err := th.LogPoller.RegisterFilter(filter1)
require.NoError(t, err)

Expand All @@ -268,9 +271,11 @@ func Test_BackupLogPoller(t *testing.T) {
require.Equal(t, filter1, filters["filter1"])

err = th.LogPoller.RegisterFilter(
logpoller.Filter{"filter2",
[]common.Hash{EmitterABI.Events["Log1"].ID},
[]common.Address{th.EmitterAddress2}, 0})
logpoller.Filter{
Name: "filter2",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress2},
})
require.NoError(t, err)

defer func() {
Expand Down Expand Up @@ -569,9 +574,9 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2, 1000)

addresses := []common.Address{th.EmitterAddress1, th.EmitterAddress2}
topics := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}
events := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}

err := th.LogPoller.RegisterFilter(logpoller.Filter{"convertLogs", topics, addresses, 0})
err := th.LogPoller.RegisterFilter(logpoller.Filter{Name: "convertLogs", EventSigs: events, Addresses: addresses})
require.NoError(t, err)

blk, err := th.Client.BlockByNumber(ctx, nil)
Expand Down Expand Up @@ -619,7 +624,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
query := ethereum.FilterQuery{
FromBlock: big.NewInt(2),
ToBlock: big.NewInt(5),
Topics: [][]common.Hash{topics},
Topics: [][]common.Hash{events},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}}

gethLogs, err := th.Client.FilterLogs(ctx, query)
Expand Down Expand Up @@ -762,8 +767,9 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) {

// Set up a log poller listening for log emitter logs.
err := th.LogPoller.RegisterFilter(logpoller.Filter{
"Test Emitter 1 & 2", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0,
Name: "Test Emitter 1 & 2",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
})
require.NoError(t, err)

Expand Down Expand Up @@ -1068,12 +1074,22 @@ func TestLogPoller_LoadFilters(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2, 1000)

filter1 := logpoller.Filter{"first Filter", []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0}
filter2 := logpoller.Filter{"second Filter", []common.Hash{
EmitterABI.Events["Log2"].ID, EmitterABI.Events["Log3"].ID}, []common.Address{th.EmitterAddress2}, 0}
filter3 := logpoller.Filter{"third Filter", []common.Hash{
EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0}
filter1 := logpoller.Filter{
Name: "first Filter",
EventSigs: []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
}
filter2 := logpoller.Filter{
Name: "second Filter",
EventSigs: []common.Hash{EmitterABI.Events["Log2"].ID, EmitterABI.Events["Log3"].ID},
Addresses: []common.Address{th.EmitterAddress2},
}
filter3 := logpoller.Filter{
Name: "third Filter",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
}

assert.True(t, filter1.Contains(nil))
assert.False(t, filter1.Contains(&filter2))
Expand Down Expand Up @@ -1119,9 +1135,11 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2, 1000)

err := th.LogPoller.RegisterFilter(logpoller.Filter{"GetBlocks Test", []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0},
)
err := th.LogPoller.RegisterFilter(logpoller.Filter{
Name: "GetBlocks Test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
})
require.NoError(t, err)

// LP retrieves 0 blocks
Expand Down Expand Up @@ -1365,7 +1383,11 @@ func TestTooManyLogResults(t *testing.T) {
})

addr := testutils.NewAddress()
err := lp.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{addr}, 0})
err := lp.RegisterFilter(logpoller.Filter{
Name: "Integration test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{addr},
})
require.NoError(t, err)
lp.PollAndSaveLogs(ctx, 5)
block, err2 := o.SelectLatestBlock()
Expand Down
37 changes: 30 additions & 7 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"math/big"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -13,6 +14,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)
Expand Down Expand Up @@ -95,26 +97,42 @@ func (o *DbORM) InsertBlock(blockHash common.Hash, blockNumber int64, blockTimes
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
// If a second job tries to overwrite the same pair, this should fail.
func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) {
topicArrays := []types.HashArray{filter.Topic2, filter.Topic3, filter.Topic4}
args, err := newQueryArgs(o.chainID).
withCustomArg("name", filter.Name).
withCustomArg("retention", filter.Retention).
withRetention(filter.Retention).
withMaxLogsKept(filter.MaxLogsKept).
withLogsPerBlock(filter.LogsPerBlock).
withAddressArray(filter.Addresses).
withEventSigArray(filter.EventSigs).
withTopicArrays(filter.Topic2, filter.Topic3, filter.Topic4).
toArgs()
if err != nil {
return err
}
// '::' has to be escaped in the query string
// https://github.com/jmoiron/sqlx/issues/91, https://github.com/jmoiron/sqlx/issues/428
return o.q.WithOpts(qopts...).ExecQNamed(`
var topicsColumns, topicsSql strings.Builder
for n, topicValues := range topicArrays {
if len(topicValues) != 0 {
topicCol := fmt.Sprintf("topic%d", n+2)
fmt.Fprintf(&topicsColumns, ", %s", topicCol)
fmt.Fprintf(&topicsSql, ",\n(SELECT unnest(:%s ::::BYTEA[]) %s) t%d", topicCol, topicCol, n+2)
}
}
query := fmt.Sprintf(`
INSERT INTO evm.log_poller_filters
(name, evm_chain_id, retention, created_at, address, event)
(name, evm_chain_id, retention, max_logs_kept, logs_per_block, created_at, address, event %s)
SELECT * FROM
(SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, NOW()) x,
(SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :max_logs_kept ::::NUMERIC, :logs_per_block ::::NUMERIC, NOW()) x,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I totally understand the max_logs_kept argument. It might be useful for some filters for CCIP in which we are interested in the latest logs, so it's better to rely on the number of logs instead of duration retention. However, I wonder what's the use case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the changes in this PR are aimed at addressing security concerns with Automation's new LogTriggers feature. They allow external users to track any logs on any smart contract, so without this it would open the door to a DoS attack where someone tells us to save every log from an ulta high volume smart contract, filling up the disk and halting operation.

The LogsPerBlock limit is less important, but it will be there as a safeguard against overloading the network connection with a high burst of logs all in one block. (They verified by writing a test contract that there is no limit on the number of logs you can emit at once, they showed me an example where they emitted 16,000 logs in one tx.)

(SELECT unnest(:address_array ::::BYTEA[]) addr) a,
(SELECT unnest(:event_sig_array ::::BYTEA[]) ev) e
ON CONFLICT (name, evm_chain_id, address, event)
DO UPDATE SET retention=:retention ::::BIGINT`, args)
%s
ON CONFLICT (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0))
DO UPDATE SET retention=:retention ::::BIGINT, max_logs_kept=:max_logs_kept ::::NUMERIC, logs_per_block=:logs_per_block ::::NUMERIC`,
topicsColumns.String(),
topicsSql.String())
return o.q.WithOpts(qopts...).ExecQNamed(query, args)
}

// DeleteFilter removes all events,address pairs associated with the Filter
Expand All @@ -130,7 +148,12 @@ func (o *DbORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) {
err := q.Select(&rows, `SELECT name,
ARRAY_AGG(DISTINCT address)::BYTEA[] AS addresses,
ARRAY_AGG(DISTINCT event)::BYTEA[] AS event_sigs,
MAX(retention) AS retention
ARRAY_AGG(DISTINCT topic2 ORDER BY topic2) FILTER(WHERE topic2 IS NOT NULL) AS topic2,
ARRAY_AGG(DISTINCT topic3 ORDER BY topic3) FILTER(WHERE topic3 IS NOT NULL) AS topic3,
ARRAY_AGG(DISTINCT topic4 ORDER BY topic4) FILTER(WHERE topic4 IS NOT NULL) AS topic4,
MAX(logs_per_block) AS logs_per_block,
MAX(retention) AS retention,
MAX(max_logs_kept) AS max_logs_kept
FROM evm.log_poller_filters WHERE evm_chain_id = $1
GROUP BY name`, ubig.New(o.chainID))
filters := make(map[string]Filter)
Expand Down
96 changes: 96 additions & 0 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logpoller_test

import (
"bytes"
"context"
"database/sql"
"fmt"
"math"
Expand All @@ -10,6 +11,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -448,6 +450,100 @@ func TestORM(t *testing.T) {
require.Zero(t, len(logs))
}

type PgxLogger struct {
lggr logger.Logger
}

func NewPgxLogger(lggr logger.Logger) PgxLogger {
return PgxLogger{lggr}
}

func (l PgxLogger) Log(ctx context.Context, log pgx.LogLevel, msg string, data map[string]interface{}) {

}

func TestLogPollerFilters(t *testing.T) {
lggr := logger.Test(t)
chainID := testutils.NewRandomEVMChainID()

dbx := pgtest.NewSqlxDB(t)
orm := logpoller.NewORM(chainID, dbx, lggr, pgtest.NewQConfig(true))

event1 := EmitterABI.Events["Log1"].ID
event2 := EmitterABI.Events["Log2"].ID
address := common.HexToAddress("0x1234")
topicA := common.HexToHash("0x1111")
topicB := common.HexToHash("0x2222")
topicC := common.HexToHash("0x3333")
topicD := common.HexToHash("0x4444")

filters := []logpoller.Filter{{
Name: "filter by topic2",
EventSigs: types.HashArray{event1, event2},
Addresses: types.AddressArray{address},
Topic2: types.HashArray{topicA, topicB},
}, {
Name: "filter by topic3",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
Topic3: types.HashArray{topicB, topicC, topicD},
}, {
Name: "filter by topic4",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
Topic4: types.HashArray{topicC},
}, {
Name: "filter by topics 2 and 4",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event2},
Topic2: types.HashArray{topicA},
Topic4: types.HashArray{topicC, topicD},
}, {
Name: "10 lpb rate limit, 1M max logs",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
MaxLogsKept: 1000000,
LogsPerBlock: 10,
}, { // ensure that the UNIQUE CONSTRAINT isn't too strict (should only error if all fields are identical)
Name: "duplicate of filter by topic4",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
Topic3: types.HashArray{topicC},
}}

for _, filter := range filters {
t.Run("Save filter: "+filter.Name, func(t *testing.T) {
var count int
err := orm.InsertFilter(filter)
require.NoError(t, err)
err = dbx.Get(&count, `SELECT COUNT(*) FROM evm.log_poller_filters WHERE evm_chain_id = $1 AND name = $2`, ubig.New(chainID), filter.Name)
require.NoError(t, err)
expectedCount := len(filter.Addresses) * len(filter.EventSigs)
if len(filter.Topic2) > 0 {
expectedCount *= len(filter.Topic2)
}
if len(filter.Topic3) > 0 {
expectedCount *= len(filter.Topic3)
}
if len(filter.Topic4) > 0 {
expectedCount *= len(filter.Topic4)
}
assert.Equal(t, count, expectedCount)
})
}

// Make sure they all come back the same when we reload them
t.Run("Load filters", func(t *testing.T) {
loadedFilters, err := orm.LoadFilters()
require.NoError(t, err)
for _, filter := range filters {
loadedFilter, ok := loadedFilters[filter.Name]
require.True(t, ok, `Failed to reload filter "%s"`, filter.Name)
assert.Equal(t, filter, loadedFilter)
}
})
}

func insertLogsTopicValueRange(t *testing.T, chainID *big.Int, o *logpoller.DbORM, addr common.Address, blockNumber int, eventSig common.Hash, start, stop int) {
var lgs []logpoller.Log
for i := start; i <= stop; i++ {
Expand Down
Loading
Loading