Skip to content

Commit

Permalink
Add Topic1, Topic2, Topic3, LogsPerBlock fields to logpoller.Filter
Browse files Browse the repository at this point in the history
And update evm.log_poller_filters schema to match
  • Loading branch information
reductionista committed Feb 13, 2024
1 parent 2da47a9 commit f3b6b22
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 44 deletions.
12 changes: 8 additions & 4 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,14 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
}

type Filter struct {
Name string // see FilterName(id, args) below
EventSigs evmtypes.HashArray
Addresses evmtypes.AddressArray
Retention time.Duration
Name string // see FilterName(id, args) below
Addresses evmtypes.AddressArray
EventSigs evmtypes.HashArray // list of possible values for eventsig (aka topic1)
Topic2 evmtypes.HashArray // list of possible values for topic2
Topic3 evmtypes.HashArray // list of possible values for topic3
Topic4 evmtypes.HashArray // list of possible values for topic4
Retention time.Duration
LogsPerBlock *ubig.Big // rate limit ( maximum # of logs per block to save to db )
}

// FilterName is a suggested convenience function for clients to construct unique filter names
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,31 +71,31 @@ func TestLogPoller_RegisterFilter(t *testing.T) {
require.Equal(t, 1, len(f.Addresses))
assert.Equal(t, common.HexToAddress("0x0000000000000000000000000000000000000000"), f.Addresses[0])

err := lp.RegisterFilter(Filter{"Emitter Log 1", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{a1}, 0})
err := lp.RegisterFilter(Filter{Name: "Emitter Log 1", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{a1}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Should de-dupe EventSigs
err = lp.RegisterFilter(Filter{"Emitter Log 1 + 2", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{a2}, 0})
err = lp.RegisterFilter(Filter{Name: "Emitter Log 1 + 2", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Should de-dupe Addresses
err = lp.RegisterFilter(Filter{"Emitter Log 1 + 2 dupe", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{a2}, 0})
err = lp.RegisterFilter(Filter{Name: "Emitter Log 1 + 2 dupe", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}})
require.NoError(t, err)
assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses)
assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm)

// Address required.
err = lp.RegisterFilter(Filter{"no address", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{}, 0})
err = lp.RegisterFilter(Filter{Name: "no address", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}})
require.Error(t, err)
// Event required
err = lp.RegisterFilter(Filter{"No event", []common.Hash{}, []common.Address{a1}, 0})
err = lp.RegisterFilter(Filter{Name: "No event", Addresses: []common.Address{a1}})
require.Error(t, err)
validateFiltersTable(t, lp, orm)

Expand Down
74 changes: 48 additions & 26 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestLogPoller_Integration(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2, 1000)
th.Client.Commit() // Block 2. Ensure we have finality number of blocks

require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1}, 0}))
require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{Name: "Integration test", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{th.EmitterAddress1}}))
require.Len(t, th.LogPoller.Filter(nil, nil, nil).Addresses, 1)
require.Len(t, th.LogPoller.Filter(nil, nil, nil).Topics, 1)

Expand Down Expand Up @@ -188,8 +188,9 @@ func TestLogPoller_Integration(t *testing.T) {

// Now let's update the Filter and replay to get Log2 logs.
err = th.LogPoller.RegisterFilter(logpoller.Filter{
"Emitter - log2", []common.Hash{EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1}, 0,
Name: "Emitter - log2",
EventSigs: []common.Hash{EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1},
})
require.NoError(t, err)
// Replay an invalid block should error
Expand Down Expand Up @@ -254,11 +255,13 @@ func Test_BackupLogPoller(t *testing.T) {

ctx := testutils.Context(t)

filter1 := logpoller.Filter{"filter1", []common.Hash{
EmitterABI.Events["Log1"].ID,
EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1},
0}
filter1 := logpoller.Filter{
Name: "filter1",
EventSigs: []common.Hash{
EmitterABI.Events["Log1"].ID,
EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1},
}
err := th.LogPoller.RegisterFilter(filter1)
require.NoError(t, err)

Expand All @@ -268,9 +271,11 @@ func Test_BackupLogPoller(t *testing.T) {
require.Equal(t, filter1, filters["filter1"])

err = th.LogPoller.RegisterFilter(
logpoller.Filter{"filter2",
[]common.Hash{EmitterABI.Events["Log1"].ID},
[]common.Address{th.EmitterAddress2}, 0})
logpoller.Filter{
Name: "filter2",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress2},
})
require.NoError(t, err)

defer func() {
Expand Down Expand Up @@ -569,9 +574,9 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2, 1000)

addresses := []common.Address{th.EmitterAddress1, th.EmitterAddress2}
topics := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}
events := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}

err := th.LogPoller.RegisterFilter(logpoller.Filter{"convertLogs", topics, addresses, 0})
err := th.LogPoller.RegisterFilter(logpoller.Filter{Name: "convertLogs", EventSigs: events, Addresses: addresses})
require.NoError(t, err)

blk, err := th.Client.BlockByNumber(ctx, nil)
Expand Down Expand Up @@ -619,7 +624,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
query := ethereum.FilterQuery{
FromBlock: big.NewInt(2),
ToBlock: big.NewInt(5),
Topics: [][]common.Hash{topics},
Topics: [][]common.Hash{events},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}}

gethLogs, err := th.Client.FilterLogs(ctx, query)
Expand Down Expand Up @@ -762,8 +767,9 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) {

// Set up a log poller listening for log emitter logs.
err := th.LogPoller.RegisterFilter(logpoller.Filter{
"Test Emitter 1 & 2", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
[]common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0,
Name: "Test Emitter 1 & 2",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
})
require.NoError(t, err)

Expand Down Expand Up @@ -1068,12 +1074,22 @@ func TestLogPoller_LoadFilters(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2, 1000)

filter1 := logpoller.Filter{"first Filter", []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0}
filter2 := logpoller.Filter{"second Filter", []common.Hash{
EmitterABI.Events["Log2"].ID, EmitterABI.Events["Log3"].ID}, []common.Address{th.EmitterAddress2}, 0}
filter3 := logpoller.Filter{"third Filter", []common.Hash{
EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0}
filter1 := logpoller.Filter{
Name: "first Filter",
EventSigs: []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
}
filter2 := logpoller.Filter{
Name: "second Filter",
EventSigs: []common.Hash{EmitterABI.Events["Log2"].ID, EmitterABI.Events["Log3"].ID},
Addresses: []common.Address{th.EmitterAddress2},
}
filter3 := logpoller.Filter{
Name: "third Filter",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
}

assert.True(t, filter1.Contains(nil))
assert.False(t, filter1.Contains(&filter2))
Expand Down Expand Up @@ -1119,9 +1135,11 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2, 1000)

err := th.LogPoller.RegisterFilter(logpoller.Filter{"GetBlocks Test", []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0},
)
err := th.LogPoller.RegisterFilter(logpoller.Filter{
Name: "GetBlocks Test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID},
Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2},
})
require.NoError(t, err)

// LP retrieves 0 blocks
Expand Down Expand Up @@ -1365,7 +1383,11 @@ func TestTooManyLogResults(t *testing.T) {
})

addr := testutils.NewAddress()
err := lp.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{addr}, 0})
err := lp.RegisterFilter(logpoller.Filter{
Name: "Integration test",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{addr},
})
require.NoError(t, err)
lp.PollAndSaveLogs(ctx, 5)
block, err2 := o.SelectLatestBlock()
Expand Down
34 changes: 28 additions & 6 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"database/sql"
"fmt"
"math/big"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)
Expand Down Expand Up @@ -94,26 +97,41 @@ func (o *DbORM) InsertBlock(blockHash common.Hash, blockNumber int64, blockTimes
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
// If a second job tries to overwrite the same pair, this should fail.
func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) {
topicArrays := []types.HashArray{filter.Topic2, filter.Topic3, filter.Topic4}
args, err := newQueryArgs(o.chainID).
withCustomArg("name", filter.Name).
withCustomArg("retention", filter.Retention).
withRetention(filter.Retention).
withLogsPerBlock(filter.LogsPerBlock).
withAddressArray(filter.Addresses).
withEventSigArray(filter.EventSigs).
withTopicArrays(filter.Topic2, filter.Topic3, filter.Topic4).
toArgs()
if err != nil {
return err
}
// '::' has to be escaped in the query string
// https://github.com/jmoiron/sqlx/issues/91, https://github.com/jmoiron/sqlx/issues/428
return o.q.WithOpts(qopts...).ExecQNamed(`
var topicsColumns, topicsSql strings.Builder
for n, topicValues := range topicArrays {
if len(topicValues) != 0 {
topicCol := fmt.Sprintf("topic%d", n+2)
fmt.Fprintf(&topicsColumns, ", %s", topicCol)
fmt.Fprintf(&topicsSql, ",\n(SELECT unnest(:%s ::::BYTEA[]) %s) t%d", topicCol, topicCol, n+2)
}
}
query := fmt.Sprintf(`
INSERT INTO evm.log_poller_filters
(name, evm_chain_id, retention, created_at, address, event)
(name, evm_chain_id, retention, logs_per_block, created_at, address, event %s)
SELECT * FROM
(SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, NOW()) x,
(SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :logs_per_block ::::NUMERIC, NOW()) x,
(SELECT unnest(:address_array ::::BYTEA[]) addr) a,
(SELECT unnest(:event_sig_array ::::BYTEA[]) ev) e
ON CONFLICT (name, evm_chain_id, address, event)
DO UPDATE SET retention=:retention ::::BIGINT`, args)
%s
ON CONFLICT (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0))
DO UPDATE SET retention=:retention ::::BIGINT, logs_per_block=:logs_per_block ::::NUMERIC`,
topicsColumns.String(),
topicsSql.String())
return o.q.WithOpts(qopts...).ExecQNamed(query, args)
}

// DeleteFilter removes all events,address pairs associated with the Filter
Expand All @@ -129,6 +147,10 @@ func (o *DbORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) {
err := q.Select(&rows, `SELECT name,
ARRAY_AGG(DISTINCT address)::BYTEA[] AS addresses,
ARRAY_AGG(DISTINCT event)::BYTEA[] AS event_sigs,
ARRAY_AGG(DISTINCT topic2 ORDER BY topic2) FILTER(WHERE topic2 IS NOT NULL) AS topic2,
ARRAY_AGG(DISTINCT topic3 ORDER BY topic3) FILTER(WHERE topic3 IS NOT NULL) AS topic3,
ARRAY_AGG(DISTINCT topic4 ORDER BY topic4) FILTER(WHERE topic4 IS NOT NULL) AS topic4,
MAX(logs_per_block) AS logs_per_block,
MAX(retention) AS retention
FROM evm.log_poller_filters WHERE evm_chain_id = $1
GROUP BY name`, ubig.New(o.chainID))
Expand Down
95 changes: 95 additions & 0 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logpoller_test

import (
"bytes"
"context"
"database/sql"
"fmt"
"math"
Expand All @@ -10,6 +11,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -448,6 +450,99 @@ func TestORM(t *testing.T) {
require.Zero(t, len(logs))
}

type PgxLogger struct {
lggr logger.Logger
}

func NewPgxLogger(lggr logger.Logger) PgxLogger {
return PgxLogger{lggr}
}

func (l PgxLogger) Log(ctx context.Context, log pgx.LogLevel, msg string, data map[string]interface{}) {

}

func TestLogPollerFilters(t *testing.T) {
lggr := logger.Test(t)
chainID := testutils.NewRandomEVMChainID()

dbx := pgtest.NewSqlxDB(t)
orm := logpoller.NewORM(chainID, dbx, lggr, pgtest.NewQConfig(true))

event1 := EmitterABI.Events["Log1"].ID
event2 := EmitterABI.Events["Log2"].ID
address := common.HexToAddress("0x1234")
topicA := common.HexToHash("0x1111")
topicB := common.HexToHash("0x2222")
topicC := common.HexToHash("0x3333")
topicD := common.HexToHash("0x4444")

filters := []logpoller.Filter{{
Name: "filter by topic2",
EventSigs: types.HashArray{event1, event2},
Addresses: types.AddressArray{address},
Topic2: types.HashArray{topicA, topicB},
}, {
Name: "filter by topic3",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
Topic3: types.HashArray{topicB, topicC, topicD},
}, {
Name: "filter by topic4",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
Topic4: types.HashArray{topicC},
}, {
Name: "filter by topics 2 and 4",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event2},
Topic2: types.HashArray{topicA},
Topic4: types.HashArray{topicC, topicD},
}, {
Name: "10 logs per block rate limit",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
LogsPerBlock: ubig.NewI(10),
}, { // ensure that the UNIQUE CONSTRAINT isn't too strict (should only error if all fields are identical)
Name: "duplicate of filter by topic4",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
Topic3: types.HashArray{topicC},
}}

for _, filter := range filters {
t.Run("Save filter: "+filter.Name, func(t *testing.T) {
var count int
err := orm.InsertFilter(filter)
require.NoError(t, err)
err = dbx.Get(&count, `SELECT COUNT(*) FROM evm.log_poller_filters WHERE evm_chain_id = $1 AND name = $2`, ubig.New(chainID), filter.Name)
require.NoError(t, err)
expectedCount := len(filter.Addresses) * len(filter.EventSigs)
if len(filter.Topic2) > 0 {
expectedCount *= len(filter.Topic2)
}
if len(filter.Topic3) > 0 {
expectedCount *= len(filter.Topic3)
}
if len(filter.Topic4) > 0 {
expectedCount *= len(filter.Topic4)
}
assert.Equal(t, count, expectedCount)
})
}

// Make sure they all come back the same when we reload them
t.Run("Load filters", func(t *testing.T) {
loadedFilters, err := orm.LoadFilters()
require.NoError(t, err)
for _, filter := range filters {
loadedFilter, ok := loadedFilters[filter.Name]
require.True(t, ok, `Failed to reload filter "%s"`, filter.Name)
assert.Equal(t, filter, loadedFilter)
}
})
}

func insertLogsTopicValueRange(t *testing.T, chainID *big.Int, o *logpoller.DbORM, addr common.Address, blockNumber int, eventSig common.Hash, start, stop int) {
var lgs []logpoller.Log
for i := start; i <= stop; i++ {
Expand Down
Loading

0 comments on commit f3b6b22

Please sign in to comment.