Add Topic1, Topic2, Topic3, LogsPerBlock fields to logpoller.Filter and log_poller_filters schema (#11949)

* Add Topic1, Topic2, Topic3, LogsPerBlock fields to logpoller.Filter

And update evm.log_poller_filters schema to match

* Improve SQL QUERY logging

[]BYTEA was panicking on a nil or empty array, and the panic was then trapped
and ignored... making it very difficult to figure out which query the error
was coming from.

While fixing that bug, updated the formatting of []BYTEA and TEXT to be
closer to an actual SQL query you can run by cutting and pasting from the log.

* Add MaxLogsKept

* Address PR comments

- Add new index before removing old index to be safer
- Change MaxLogsKept & LogsPerBlock from big.Int to uint64

* Update migration # after rebase
reductionista authored Feb 22, 2024
1 parent 85cc590 commit 63c286d
Showing 10 changed files with 271 additions and 54 deletions.
13 changes: 9 additions & 4 deletions core/chains/evm/logpoller/log_poller.go
@@ -153,10 +153,15 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
}

type Filter struct {
- Name string // see FilterName(id, args) below
- EventSigs evmtypes.HashArray
- Addresses evmtypes.AddressArray
- Retention time.Duration
+ Name string // see FilterName(id, args) below
+ Addresses evmtypes.AddressArray
+ EventSigs evmtypes.HashArray // list of possible values for eventsig (aka topic1)
+ Topic2 evmtypes.HashArray // list of possible values for topic2
+ Topic3 evmtypes.HashArray // list of possible values for topic3
+ Topic4 evmtypes.HashArray // list of possible values for topic4
+ Retention time.Duration // maximum amount of time to retain logs
+ MaxLogsKept uint64 // maximum number of logs to retain ( 0 = unlimited )
+ LogsPerBlock uint64 // rate limit ( maximum # of logs per block, 0 = unlimited )
}

// FilterName is a suggested convenience function for clients to construct unique filter names
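For illustration, a registration that exercises the new fields might look like the following sketch (hypothetical values; lp, emitterAddress, and EmitterABI are assumed to be in scope, as in the tests below):

err := lp.RegisterFilter(logpoller.Filter{
	Name:         "example with topics and limits",
	EventSigs:    []common.Hash{EmitterABI.Events["Log1"].ID}, // event sig, aka topic1
	Addresses:    []common.Address{emitterAddress},
	Topic2:       []common.Hash{common.HexToHash("0x1111")}, // match only this topic2 value
	Retention:    24 * time.Hour, // prune logs older than 24 hours
	MaxLogsKept:  1000,           // retain at most 1000 logs (0 = unlimited)
	LogsPerBlock: 10,             // accept at most 10 logs per block (0 = unlimited)
})
if err != nil {
	// RegisterFilter rejects a filter with no Addresses or no EventSigs
}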
10 changes: 5 additions & 5 deletions core/chains/evm/logpoller/log_poller_internal_test.go
@@ -71,31 +71,31 @@ func TestLogPoller_RegisterFilter(t *testing.T) {
require.Equal(t, 1, len(f.Addresses))
assert.Equal(t, common.HexToAddress("0x0000000000000000000000000000000000000000"), f.Addresses[0])

- err := lp.RegisterFilter(Filter{"Emitter Log 1", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{a1}, 0})
+ err := lp.RegisterFilter(Filter{Name: "Emitter Log 1", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{a1}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Should de-dupe EventSigs
- err = lp.RegisterFilter(Filter{"Emitter Log 1 + 2", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{a2}, 0})
+ err = lp.RegisterFilter(Filter{Name: "Emitter Log 1 + 2", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Should de-dupe Addresses
- err = lp.RegisterFilter(Filter{"Emitter Log 1 + 2 dupe", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{a2}, 0})
+ err = lp.RegisterFilter(Filter{Name: "Emitter Log 1 + 2 dupe", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Address required.
- err = lp.RegisterFilter(Filter{"no address", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{}, 0})
+ err = lp.RegisterFilter(Filter{Name: "no address", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}})
require.Error(t, err)
// Event required
- err = lp.RegisterFilter(Filter{"No event", []common.Hash{}, []common.Address{a1}, 0})
+ err = lp.RegisterFilter(Filter{Name: "No event", Addresses: []common.Address{a1}})
require.Error(t, err)
validateFiltersTable(t, lp, orm)

74 changes: 48 additions & 26 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -150,7 +150,7 @@ func TestLogPoller_Integration(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2, 1000)
th.Client.Commit() // Block 2. Ensure we have finality number of blocks

- require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1}, 0}))
+ require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{Name: "Integration test", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{th.EmitterAddress1}}))
require.Len(t, th.LogPoller.Filter(nil, nil, nil).Addresses, 1)
require.Len(t, th.LogPoller.Filter(nil, nil, nil).Topics, 1)

@@ -188,8 +188,9 @@ func TestLogPoller_Integration(t *testing.T) {

// Now let's update the Filter and replay to get Log2 logs.
err = th.LogPoller.RegisterFilter(logpoller.Filter{
"Emitter - log2", []common.Hash{EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1}, 0,
Name: "Emitter - log2",
EventSigs: []common.Hash{EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1},
})
require.NoError(t, err)
// Replay an invalid block should error
@@ -254,11 +255,13 @@ func Test_BackupLogPoller(t *testing.T) {

ctx := testutils.Context(t)

filter1 := logpoller.Filter{"filter1", []common.Hash{
EmitterABI.Events["Log1"].ID,
EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1},
0}
filter1 := logpoller.Filter{
Name: "filter1",
EventSigs: []common.Hash{
EmitterABI.Events["Log1"].ID,
EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1},
}
err := th.LogPoller.RegisterFilter(filter1)
require.NoError(t, err)

@@ -268,9 +271,11 @@
require.Equal(t, filter1, filters["filter1"])

err = th.LogPoller.RegisterFilter(
logpoller.Filter{"filter2",
[]common.Hash{EmitterABI.Events["Log1"].ID},
[]common.Address{th.EmitterAddress2}, 0})
logpoller.Filter{
Name: "filter2",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress2},
})
require.NoError(t, err)

defer func() {
@@ -569,9 +574,9 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2, 1000)

addresses := []common.Address{th.EmitterAddress1, th.EmitterAddress2}
- topics := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}
+ events := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}

err := th.LogPoller.RegisterFilter(logpoller.Filter{"convertLogs", topics, addresses, 0})
err := th.LogPoller.RegisterFilter(logpoller.Filter{Name: "convertLogs", EventSigs: events, Addresses: addresses})
require.NoError(t, err)

blk, err := th.Client.BlockByNumber(ctx, nil)
@@ -619,7 +624,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
query := ethereum.FilterQuery{
FromBlock: big.NewInt(2),
ToBlock: big.NewInt(5),
- Topics: [][]common.Hash{topics},
+ Topics: [][]common.Hash{events},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}}

gethLogs, err := th.Client.FilterLogs(ctx, query)
@@ -762,8 +767,9 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) {

// Set up a log poller listening for log emitter logs.
err := th.LogPoller.RegisterFilter(logpoller.Filter{
"Test Emitter 1 & 2", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0,
Name: "Test Emitter 1 & 2",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
})
require.NoError(t, err)

@@ -1068,12 +1074,22 @@ func TestLogPoller_LoadFilters(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2, 1000)

filter1 := logpoller.Filter{"first Filter", []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0}
filter2 := logpoller.Filter{"second Filter", []common.Hash{
EmitterABI.Events["Log2"].ID, EmitterABI.Events["Log3"].ID}, []common.Address{th.EmitterAddress2}, 0}
filter3 := logpoller.Filter{"third Filter", []common.Hash{
EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0}
filter1 := logpoller.Filter{
Name: "first Filter",
EventSigs: []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
}
filter2 := logpoller.Filter{
Name: "second Filter",
EventSigs: []common.Hash{EmitterABI.Events["Log2"].ID, EmitterABI.Events["Log3"].ID},
Addresses: []common.Address{th.EmitterAddress2},
}
filter3 := logpoller.Filter{
Name: "third Filter",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
}

assert.True(t, filter1.Contains(nil))
assert.False(t, filter1.Contains(&filter2))
@@ -1119,9 +1135,11 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2, 1000)

err := th.LogPoller.RegisterFilter(logpoller.Filter{"GetBlocks Test", []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0},
)
err := th.LogPoller.RegisterFilter(logpoller.Filter{
Name: "GetBlocks Test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
})
require.NoError(t, err)

// LP retrieves 0 blocks
@@ -1365,7 +1383,11 @@ func TestTooManyLogResults(t *testing.T) {
})

addr := testutils.NewAddress()
err := lp.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{addr}, 0})
err := lp.RegisterFilter(logpoller.Filter{
Name: "Integration test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{addr},
})
require.NoError(t, err)
lp.PollAndSaveLogs(ctx, 5)
block, err2 := o.SelectLatestBlock()
37 changes: 30 additions & 7 deletions core/chains/evm/logpoller/orm.go
@@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"math/big"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
@@ -13,6 +14,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)
@@ -95,26 +97,42 @@ func (o *DbORM) InsertBlock(blockHash common.Hash, blockNumber int64, blockTimes
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
// If a second job tries to overwrite the same pair, this should fail.
func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) {
+ topicArrays := []types.HashArray{filter.Topic2, filter.Topic3, filter.Topic4}
args, err := newQueryArgs(o.chainID).
withCustomArg("name", filter.Name).
withCustomArg("retention", filter.Retention).
withRetention(filter.Retention).
withMaxLogsKept(filter.MaxLogsKept).
withLogsPerBlock(filter.LogsPerBlock).
withAddressArray(filter.Addresses).
withEventSigArray(filter.EventSigs).
+ withTopicArrays(filter.Topic2, filter.Topic3, filter.Topic4).
toArgs()
if err != nil {
return err
}
// '::' has to be escaped in the query string
// https://github.com/jmoiron/sqlx/issues/91, https://github.com/jmoiron/sqlx/issues/428
- return o.q.WithOpts(qopts...).ExecQNamed(`
+ var topicsColumns, topicsSql strings.Builder
+ for n, topicValues := range topicArrays {
+ if len(topicValues) != 0 {
+ topicCol := fmt.Sprintf("topic%d", n+2)
+ fmt.Fprintf(&topicsColumns, ", %s", topicCol)
+ fmt.Fprintf(&topicsSql, ",\n(SELECT unnest(:%s ::::BYTEA[]) %s) t%d", topicCol, topicCol, n+2)
+ }
+ }
+ query := fmt.Sprintf(`
INSERT INTO evm.log_poller_filters
- (name, evm_chain_id, retention, created_at, address, event)
+ (name, evm_chain_id, retention, max_logs_kept, logs_per_block, created_at, address, event %s)
SELECT * FROM
- (SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, NOW()) x,
+ (SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :max_logs_kept ::::NUMERIC, :logs_per_block ::::NUMERIC, NOW()) x,
(SELECT unnest(:address_array ::::BYTEA[]) addr) a,
(SELECT unnest(:event_sig_array ::::BYTEA[]) ev) e
- ON CONFLICT (name, evm_chain_id, address, event)
- DO UPDATE SET retention=:retention ::::BIGINT`, args)
+ %s
+ ON CONFLICT (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0))
+ DO UPDATE SET retention=:retention ::::BIGINT, max_logs_kept=:max_logs_kept ::::NUMERIC, logs_per_block=:logs_per_block ::::NUMERIC`,
+ topicsColumns.String(),
+ topicsSql.String())
+ return o.q.WithOpts(qopts...).ExecQNamed(query, args)
}
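For a filter that sets only Topic2, the query assembled above renders roughly as follows (an illustrative sketch: the quadrupled colons in the Go source unescape to ::, the :name-style tokens are sqlx named arguments, and the hash_record_extended conflict target relies on the unique hash index added by this commit's migration, which is not shown here):

INSERT INTO evm.log_poller_filters
	(name, evm_chain_id, retention, max_logs_kept, logs_per_block, created_at, address, event, topic2)
SELECT * FROM
	(SELECT :name, :evm_chain_id ::NUMERIC, :retention ::BIGINT, :max_logs_kept ::NUMERIC, :logs_per_block ::NUMERIC, NOW()) x,
	(SELECT unnest(:address_array ::BYTEA[]) addr) a,
	(SELECT unnest(:event_sig_array ::BYTEA[]) ev) e,
	(SELECT unnest(:topic2 ::BYTEA[]) topic2) t2
ON CONFLICT (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0))
DO UPDATE SET retention=:retention ::BIGINT, max_logs_kept=:max_logs_kept ::NUMERIC, logs_per_block=:logs_per_block ::NUMERIC

The cross join of the unnested arrays yields one inserted row per (address, event, topic2) combination.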

// DeleteFilter removes all events,address pairs associated with the Filter
Expand All @@ -130,7 +148,12 @@ func (o *DbORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) {
err := q.Select(&rows, `SELECT name,
ARRAY_AGG(DISTINCT address)::BYTEA[] AS addresses,
ARRAY_AGG(DISTINCT event)::BYTEA[] AS event_sigs,
- MAX(retention) AS retention
+ ARRAY_AGG(DISTINCT topic2 ORDER BY topic2) FILTER(WHERE topic2 IS NOT NULL) AS topic2,
+ ARRAY_AGG(DISTINCT topic3 ORDER BY topic3) FILTER(WHERE topic3 IS NOT NULL) AS topic3,
+ ARRAY_AGG(DISTINCT topic4 ORDER BY topic4) FILTER(WHERE topic4 IS NOT NULL) AS topic4,
+ MAX(logs_per_block) AS logs_per_block,
+ MAX(retention) AS retention,
+ MAX(max_logs_kept) AS max_logs_kept
FROM evm.log_poller_filters WHERE evm_chain_id = $1
GROUP BY name`, ubig.New(o.chainID))
filters := make(map[string]Filter)
96 changes: 96 additions & 0 deletions core/chains/evm/logpoller/orm_test.go
@@ -2,6 +2,7 @@ package logpoller_test

import (
"bytes"
"context"
"database/sql"
"fmt"
"math"
@@ -10,6 +11,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -448,6 +450,100 @@ func TestORM(t *testing.T) {
require.Zero(t, len(logs))
}

type PgxLogger struct {
lggr logger.Logger
}

func NewPgxLogger(lggr logger.Logger) PgxLogger {
return PgxLogger{lggr}
}

func (l PgxLogger) Log(ctx context.Context, log pgx.LogLevel, msg string, data map[string]interface{}) {

}

func TestLogPollerFilters(t *testing.T) {
lggr := logger.Test(t)
chainID := testutils.NewRandomEVMChainID()

dbx := pgtest.NewSqlxDB(t)
orm := logpoller.NewORM(chainID, dbx, lggr, pgtest.NewQConfig(true))

event1 := EmitterABI.Events["Log1"].ID
event2 := EmitterABI.Events["Log2"].ID
address := common.HexToAddress("0x1234")
topicA := common.HexToHash("0x1111")
topicB := common.HexToHash("0x2222")
topicC := common.HexToHash("0x3333")
topicD := common.HexToHash("0x4444")

filters := []logpoller.Filter{{
Name: "filter by topic2",
EventSigs: types.HashArray{event1, event2},
Addresses: types.AddressArray{address},
Topic2: types.HashArray{topicA, topicB},
}, {
Name: "filter by topic3",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
Topic3: types.HashArray{topicB, topicC, topicD},
}, {
Name: "filter by topic4",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
Topic4: types.HashArray{topicC},
}, {
Name: "filter by topics 2 and 4",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event2},
Topic2: types.HashArray{topicA},
Topic4: types.HashArray{topicC, topicD},
}, {
Name: "10 lpb rate limit, 1M max logs",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
MaxLogsKept: 1000000,
LogsPerBlock: 10,
}, { // ensure that the UNIQUE CONSTRAINT isn't too strict (should only error if all fields are identical)
Name: "duplicate of filter by topic4",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
Topic3: types.HashArray{topicC},
}}

for _, filter := range filters {
t.Run("Save filter: "+filter.Name, func(t *testing.T) {
var count int
err := orm.InsertFilter(filter)
require.NoError(t, err)
err = dbx.Get(&count, `SELECT COUNT(*) FROM evm.log_poller_filters WHERE evm_chain_id = $1 AND name = $2`, ubig.New(chainID), filter.Name)
require.NoError(t, err)
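// One row exists per (address, event, topic2, topic3, topic4) combination, so the expected count is the cross product over all non-empty lists: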
expectedCount := len(filter.Addresses) * len(filter.EventSigs)
if len(filter.Topic2) > 0 {
expectedCount *= len(filter.Topic2)
}
if len(filter.Topic3) > 0 {
expectedCount *= len(filter.Topic3)
}
if len(filter.Topic4) > 0 {
expectedCount *= len(filter.Topic4)
}
assert.Equal(t, count, expectedCount)
})
}

// Make sure they all come back the same when we reload them
t.Run("Load filters", func(t *testing.T) {
loadedFilters, err := orm.LoadFilters()
require.NoError(t, err)
for _, filter := range filters {
loadedFilter, ok := loadedFilters[filter.Name]
require.True(t, ok, `Failed to reload filter "%s"`, filter.Name)
assert.Equal(t, filter, loadedFilter)
}
})
}

func insertLogsTopicValueRange(t *testing.T, chainID *big.Int, o *logpoller.DbORM, addr common.Address, blockNumber int, eventSig common.Hash, start, stop int) {
var lgs []logpoller.Log
for i := start; i <= stop; i++ {
(Diffs for the remaining 5 changed files are not shown.)
