From f0871f0d2b2fa85f1359ecb535495895be0158b0 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 24 Jan 2024 13:58:58 -0800 Subject: [PATCH 1/5] Add Topic2, Topic3, Topic4, LogsPerBlock fields to logpoller.Filter And update evm.log_poller_filters schema to match --- core/chains/evm/logpoller/log_poller.go | 12 ++- .../evm/logpoller/log_poller_internal_test.go | 10 +- core/chains/evm/logpoller/log_poller_test.go | 74 ++++++++++----- core/chains/evm/logpoller/orm.go | 33 +++++-- core/chains/evm/logpoller/orm_test.go | 95 +++++++++++++++++++ core/chains/evm/logpoller/query.go | 19 ++++ core/chains/evm/types/types.go | 14 ++- ...ller_filters_add_topics_logs_per_block.sql | 22 +++++ 8 files changed, 235 insertions(+), 44 deletions(-) create mode 100644 core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 7006c1762ef..efcaf53e7dd 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -153,10 +153,14 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati } type Filter struct { - Name string // see FilterName(id, args) below - EventSigs evmtypes.HashArray - Addresses evmtypes.AddressArray - Retention time.Duration + Name string // see FilterName(id, args) below + Addresses evmtypes.AddressArray + EventSigs evmtypes.HashArray // list of possible values for eventsig (aka topic1) + Topic2 evmtypes.HashArray // list of possible values for topic2 + Topic3 evmtypes.HashArray // list of possible values for topic3 + Topic4 evmtypes.HashArray // list of possible values for topic4 + Retention time.Duration + LogsPerBlock *ubig.Big // rate limit ( maximum # of logs per block to save to db ) } // FilterName is a suggested convenience function for clients to construct unique filter names diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index 863ab0fddea..899efebe42c 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -71,31 +71,31 @@ func TestLogPoller_RegisterFilter(t *testing.T) { require.Equal(t, 1, len(f.Addresses)) assert.Equal(t, common.HexToAddress("0x0000000000000000000000000000000000000000"), f.Addresses[0]) - err := lp.RegisterFilter(Filter{"Emitter Log 1", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{a1}, 0}) + err := lp.RegisterFilter(Filter{Name: "Emitter Log 1", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{a1}}) require.NoError(t, err) assert.Equal(t, []common.Address{a1}, lp.Filter(nil, nil, nil).Addresses) assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID}}, lp.Filter(nil, nil, nil).Topics) validateFiltersTable(t, lp, orm) // Should de-dupe EventSigs - err = lp.RegisterFilter(Filter{"Emitter Log 1 + 2", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{a2}, 0}) + err = lp.RegisterFilter(Filter{Name: "Emitter Log 1 + 2", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}}) require.NoError(t, err) assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses) assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics)
validateFiltersTable(t, lp, orm) // Should de-dupe Addresses - err = lp.RegisterFilter(Filter{"Emitter Log 1 + 2 dupe", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{a2}, 0}) + err = lp.RegisterFilter(Filter{Name: "Emitter Log 1 + 2 dupe", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}}) require.NoError(t, err) assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses) assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics) validateFiltersTable(t, lp, orm) // Address required. - err = lp.RegisterFilter(Filter{"no address", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{}, 0}) + err = lp.RegisterFilter(Filter{Name: "no address", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}}) require.Error(t, err) // Event required - err = lp.RegisterFilter(Filter{"No event", []common.Hash{}, []common.Address{a1}, 0}) + err = lp.RegisterFilter(Filter{Name: "No event", Addresses: []common.Address{a1}}) require.Error(t, err) validateFiltersTable(t, lp, orm) diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 2508e676e6c..ab02e783a28 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -150,7 +150,7 @@ func TestLogPoller_Integration(t *testing.T) { th := SetupTH(t, false, 2, 3, 2, 1000) th.Client.Commit() // Block 2. Ensure we have finality number of blocks - require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1}, 0})) + require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{Name: "Integration test", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{th.EmitterAddress1}})) require.Len(t, th.LogPoller.Filter(nil, nil, nil).Addresses, 1) require.Len(t, th.LogPoller.Filter(nil, nil, nil).Topics, 1) @@ -188,8 +188,9 @@ func TestLogPoller_Integration(t *testing.T) { // Now let's update the Filter and replay to get Log2 logs. 
err = th.LogPoller.RegisterFilter(logpoller.Filter{ - "Emitter - log2", []common.Hash{EmitterABI.Events["Log2"].ID}, - []common.Address{th.EmitterAddress1}, 0, + Name: "Emitter - log2", + EventSigs: []common.Hash{EmitterABI.Events["Log2"].ID}, + Addresses: []common.Address{th.EmitterAddress1}, }) require.NoError(t, err) // Replay an invalid block should error @@ -254,11 +255,13 @@ func Test_BackupLogPoller(t *testing.T) { ctx := testutils.Context(t) - filter1 := logpoller.Filter{"filter1", []common.Hash{ - EmitterABI.Events["Log1"].ID, - EmitterABI.Events["Log2"].ID}, - []common.Address{th.EmitterAddress1}, - 0} + filter1 := logpoller.Filter{ + Name: "filter1", + EventSigs: []common.Hash{ + EmitterABI.Events["Log1"].ID, + EmitterABI.Events["Log2"].ID}, + Addresses: []common.Address{th.EmitterAddress1}, + } err := th.LogPoller.RegisterFilter(filter1) require.NoError(t, err) @@ -268,9 +271,11 @@ func Test_BackupLogPoller(t *testing.T) { require.Equal(t, filter1, filters["filter1"]) err = th.LogPoller.RegisterFilter( - logpoller.Filter{"filter2", - []common.Hash{EmitterABI.Events["Log1"].ID}, - []common.Address{th.EmitterAddress2}, 0}) + logpoller.Filter{ + Name: "filter2", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, + Addresses: []common.Address{th.EmitterAddress2}, + }) require.NoError(t, err) defer func() { @@ -569,9 +574,9 @@ func TestLogPoller_BlockTimestamps(t *testing.T) { th := SetupTH(t, false, 2, 3, 2, 1000) addresses := []common.Address{th.EmitterAddress1, th.EmitterAddress2} - topics := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID} + events := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID} - err := th.LogPoller.RegisterFilter(logpoller.Filter{"convertLogs", topics, addresses, 0}) + err := th.LogPoller.RegisterFilter(logpoller.Filter{Name: "convertLogs", EventSigs: events, Addresses: addresses}) require.NoError(t, err) blk, err := th.Client.BlockByNumber(ctx, nil) @@ -619,7 +624,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) { query := ethereum.FilterQuery{ FromBlock: big.NewInt(2), ToBlock: big.NewInt(5), - Topics: [][]common.Hash{topics}, + Topics: [][]common.Hash{events}, Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}} gethLogs, err := th.Client.FilterLogs(ctx, query) @@ -762,8 +767,9 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { // Set up a log poller listening for log emitter logs. 
err := th.LogPoller.RegisterFilter(logpoller.Filter{ - "Test Emitter 1 & 2", []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, - []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0, + Name: "Test Emitter 1 & 2", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, + Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}, }) require.NoError(t, err) @@ -1068,12 +1074,22 @@ func TestLogPoller_LoadFilters(t *testing.T) { t.Parallel() th := SetupTH(t, false, 2, 3, 2, 1000) - filter1 := logpoller.Filter{"first Filter", []common.Hash{ - EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0} - filter2 := logpoller.Filter{"second Filter", []common.Hash{ - EmitterABI.Events["Log2"].ID, EmitterABI.Events["Log3"].ID}, []common.Address{th.EmitterAddress2}, 0} - filter3 := logpoller.Filter{"third Filter", []common.Hash{ - EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0} + filter1 := logpoller.Filter{ + Name: "first Filter", + EventSigs: []common.Hash{ + EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, + Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}, + } + filter2 := logpoller.Filter{ + Name: "second Filter", + EventSigs: []common.Hash{EmitterABI.Events["Log2"].ID, EmitterABI.Events["Log3"].ID}, + Addresses: []common.Address{th.EmitterAddress2}, + } + filter3 := logpoller.Filter{ + Name: "third Filter", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, + Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}, + } assert.True(t, filter1.Contains(nil)) assert.False(t, filter1.Contains(&filter2)) @@ -1119,9 +1135,11 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { t.Parallel() th := SetupTH(t, false, 2, 3, 2, 1000) - err := th.LogPoller.RegisterFilter(logpoller.Filter{"GetBlocks Test", []common.Hash{ - EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0}, - ) + err := th.LogPoller.RegisterFilter(logpoller.Filter{ + Name: "GetBlocks Test", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, + Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}, + }) require.NoError(t, err) // LP retrieves 0 blocks @@ -1365,7 +1383,11 @@ func TestTooManyLogResults(t *testing.T) { }) addr := testutils.NewAddress() - err := lp.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{addr}, 0}) + err := lp.RegisterFilter(logpoller.Filter{ + Name: "Integration test", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, + Addresses: []common.Address{addr}, + }) require.NoError(t, err) lp.PollAndSaveLogs(ctx, 5) block, err2 := o.SelectLatestBlock() diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 1db8271ccb6..4c40a73f67c 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "math/big" + "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -13,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) @@ -95,26 +97,41 @@ func (o *DbORM) InsertBlock(blockHash 
common.Hash, blockNumber int64, blockTimes // Each address/event pair must have a unique job id, so it may be removed when the job is deleted. // If a second job tries to overwrite the same pair, this should fail. func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) { + topicArrays := []types.HashArray{filter.Topic2, filter.Topic3, filter.Topic4} args, err := newQueryArgs(o.chainID). withCustomArg("name", filter.Name). - withCustomArg("retention", filter.Retention). + withRetention(filter.Retention). + withLogsPerBlock(filter.LogsPerBlock). withAddressArray(filter.Addresses). withEventSigArray(filter.EventSigs). + withTopicArrays(filter.Topic2, filter.Topic3, filter.Topic4). toArgs() if err != nil { return err } // '::' has to be escaped in the query string // https://github.com/jmoiron/sqlx/issues/91, https://github.com/jmoiron/sqlx/issues/428 - return o.q.WithOpts(qopts...).ExecQNamed(` + var topicsColumns, topicsSql strings.Builder + for n, topicValues := range topicArrays { + if len(topicValues) != 0 { + topicCol := fmt.Sprintf("topic%d", n+2) + fmt.Fprintf(&topicsColumns, ", %s", topicCol) + fmt.Fprintf(&topicsSql, ",\n(SELECT unnest(:%s ::::BYTEA[]) %s) t%d", topicCol, topicCol, n+2) + } + } + query := fmt.Sprintf(` INSERT INTO evm.log_poller_filters - (name, evm_chain_id, retention, created_at, address, event) + (name, evm_chain_id, retention, logs_per_block, created_at, address, event %s) SELECT * FROM - (SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, NOW()) x, + (SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :logs_per_block ::::NUMERIC, NOW()) x, (SELECT unnest(:address_array ::::BYTEA[]) addr) a, (SELECT unnest(:event_sig_array ::::BYTEA[]) ev) e - ON CONFLICT (name, evm_chain_id, address, event) - DO UPDATE SET retention=:retention ::::BIGINT`, args) + %s + ON CONFLICT (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0)) + DO UPDATE SET retention=:retention ::::BIGINT, logs_per_block=:logs_per_block ::::NUMERIC`, + topicsColumns.String(), + topicsSql.String()) + return o.q.WithOpts(qopts...).ExecQNamed(query, args) } // DeleteFilter removes all events,address pairs associated with the Filter @@ -130,6 +147,10 @@ func (o *DbORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) { err := q.Select(&rows, `SELECT name, ARRAY_AGG(DISTINCT address)::BYTEA[] AS addresses, ARRAY_AGG(DISTINCT event)::BYTEA[] AS event_sigs, + ARRAY_AGG(DISTINCT topic2 ORDER BY topic2) FILTER(WHERE topic2 IS NOT NULL) AS topic2, + ARRAY_AGG(DISTINCT topic3 ORDER BY topic3) FILTER(WHERE topic3 IS NOT NULL) AS topic3, + ARRAY_AGG(DISTINCT topic4 ORDER BY topic4) FILTER(WHERE topic4 IS NOT NULL) AS topic4, + MAX(logs_per_block) AS logs_per_block, MAX(retention) AS retention FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY name`, ubig.New(o.chainID)) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index bcaa6f72fa0..22578daa677 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -2,6 +2,7 @@ package logpoller_test import ( "bytes" + "context" "database/sql" "fmt" "math" @@ -10,6 +11,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/jackc/pgx/v4" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -448,6 +450,99 @@ func TestORM(t *testing.T) { require.Zero(t, len(logs)) } +type PgxLogger struct { + lggr logger.Logger +} + +func 
NewPgxLogger(lggr logger.Logger) PgxLogger { + return PgxLogger{lggr} +} + +func (l PgxLogger) Log(ctx context.Context, log pgx.LogLevel, msg string, data map[string]interface{}) { + +} + +func TestLogPollerFilters(t *testing.T) { + lggr := logger.Test(t) + chainID := testutils.NewRandomEVMChainID() + + dbx := pgtest.NewSqlxDB(t) + orm := logpoller.NewORM(chainID, dbx, lggr, pgtest.NewQConfig(true)) + + event1 := EmitterABI.Events["Log1"].ID + event2 := EmitterABI.Events["Log2"].ID + address := common.HexToAddress("0x1234") + topicA := common.HexToHash("0x1111") + topicB := common.HexToHash("0x2222") + topicC := common.HexToHash("0x3333") + topicD := common.HexToHash("0x4444") + + filters := []logpoller.Filter{{ + Name: "filter by topic2", + EventSigs: types.HashArray{event1, event2}, + Addresses: types.AddressArray{address}, + Topic2: types.HashArray{topicA, topicB}, + }, { + Name: "filter by topic3", + Addresses: types.AddressArray{address}, + EventSigs: types.HashArray{event1}, + Topic3: types.HashArray{topicB, topicC, topicD}, + }, { + Name: "filter by topic4", + Addresses: types.AddressArray{address}, + EventSigs: types.HashArray{event1}, + Topic4: types.HashArray{topicC}, + }, { + Name: "filter by topics 2 and 4", + Addresses: types.AddressArray{address}, + EventSigs: types.HashArray{event2}, + Topic2: types.HashArray{topicA}, + Topic4: types.HashArray{topicC, topicD}, + }, { + Name: "10 logs per block rate limit", + Addresses: types.AddressArray{address}, + EventSigs: types.HashArray{event1}, + LogsPerBlock: ubig.NewI(10), + }, { // ensure that the UNIQUE CONSTRAINT isn't too strict (should only error if all fields are identical) + Name: "duplicate of filter by topic4", + Addresses: types.AddressArray{address}, + EventSigs: types.HashArray{event1}, + Topic3: types.HashArray{topicC}, + }} + + for _, filter := range filters { + t.Run("Save filter: "+filter.Name, func(t *testing.T) { + var count int + err := orm.InsertFilter(filter) + require.NoError(t, err) + err = dbx.Get(&count, `SELECT COUNT(*) FROM evm.log_poller_filters WHERE evm_chain_id = $1 AND name = $2`, ubig.New(chainID), filter.Name) + require.NoError(t, err) + expectedCount := len(filter.Addresses) * len(filter.EventSigs) + if len(filter.Topic2) > 0 { + expectedCount *= len(filter.Topic2) + } + if len(filter.Topic3) > 0 { + expectedCount *= len(filter.Topic3) + } + if len(filter.Topic4) > 0 { + expectedCount *= len(filter.Topic4) + } + assert.Equal(t, count, expectedCount) + }) + } + + // Make sure they all come back the same when we reload them + t.Run("Load filters", func(t *testing.T) { + loadedFilters, err := orm.LoadFilters() + require.NoError(t, err) + for _, filter := range filters { + loadedFilter, ok := loadedFilters[filter.Name] + require.True(t, ok, `Failed to reload filter "%s"`, filter.Name) + assert.Equal(t, filter, loadedFilter) + } + }) +} + func insertLogsTopicValueRange(t *testing.T, chainID *big.Int, o *logpoller.DbORM, addr common.Address, blockNumber int, eventSig common.Hash, start, stop int) { var lgs []logpoller.Log for i := start; i <= stop; i++ { diff --git a/core/chains/evm/logpoller/query.go b/core/chains/evm/logpoller/query.go index a37b15b2b2d..e0df974c115 100644 --- a/core/chains/evm/logpoller/query.go +++ b/core/chains/evm/logpoller/query.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/lib/pq" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" ) @@ -54,6 +55,16 @@ func 
(q *queryArgs) withEventSigArray(eventSigs []common.Hash) *queryArgs { return q.withCustomArg("event_sig_array", concatBytes(eventSigs)) } +func (q *queryArgs) withTopicArray(topicValues types.HashArray, topicNum uint64) *queryArgs { + return q.withCustomArg(fmt.Sprintf("topic%d", topicNum), concatBytes(topicValues)) +} + +func (q *queryArgs) withTopicArrays(topic2Vals types.HashArray, topic3Vals types.HashArray, topic4Vals types.HashArray) *queryArgs { + return q.withTopicArray(topic2Vals, 2). + withTopicArray(topic3Vals, 3). + withTopicArray(topic4Vals, 4) +} + func (q *queryArgs) withAddress(address common.Address) *queryArgs { return q.withCustomArg("address", address) } @@ -127,6 +138,14 @@ func (q *queryArgs) withTxHash(hash common.Hash) *queryArgs { return q.withCustomHashArg("tx_hash", hash) } +func (q *queryArgs) withRetention(retention time.Duration) *queryArgs { + return q.withCustomArg("retention", retention) +} + +func (q *queryArgs) withLogsPerBlock(logsPerBlock *ubig.Big) *queryArgs { + return q.withCustomArg("logs_per_block", logsPerBlock) +} + func (q *queryArgs) withCustomHashArg(name string, arg common.Hash) *queryArgs { return q.withCustomArg(name, arg.Bytes()) } diff --git a/core/chains/evm/types/types.go b/core/chains/evm/types/types.go index 987fd987d3f..c3ad584ebbd 100644 --- a/core/chains/evm/types/types.go +++ b/core/chains/evm/types/types.go @@ -332,7 +332,11 @@ func (a *AddressArray) Scan(src interface{}) error { if err != nil { return errors.Wrap(err, "Expected BYTEA[] column for AddressArray") } - if baArray.Status != pgtype.Present || len(baArray.Dimensions) > 1 { + if baArray.Status != pgtype.Present { + *a = nil + return nil + } + if len(baArray.Dimensions) > 1 { return errors.Errorf("Expected AddressArray to be 1-dimensional. Dimensions = %v", baArray.Dimensions) } @@ -359,14 +363,18 @@ func (h *HashArray) Scan(src interface{}) error { if err != nil { return errors.Wrap(err, "Expected BYTEA[] column for HashArray") } - if baArray.Status != pgtype.Present || len(baArray.Dimensions) > 1 { + if baArray.Status != pgtype.Present { + *h = nil + return nil + } + if len(baArray.Dimensions) > 1 { return errors.Errorf("Expected HashArray to be 1-dimensional. Dimensions = %v", baArray.Dimensions) } for i, ba := range baArray.Elements { hash := common.Hash{} if ba.Status != pgtype.Present { - return errors.Errorf("Expected all addresses in HashArray to be non-NULL. Got HashArray[%d] = NULL", i) + return errors.Errorf("Expected all hashes in HashArray to be non-NULL. Got HashArray[%d] = NULL", i) } err = hash.Scan(ba.Bytes) if err != nil { diff --git a/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql b/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql new file mode 100644 index 00000000000..60034b7d4c3 --- /dev/null +++ b/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql @@ -0,0 +1,22 @@ +-- +goose Up + +ALTER TABLE evm.log_poller_filters + ADD COLUMN topic2 BYTEA CHECK (octet_length(topic2) = 32), + ADD COLUMN topic3 BYTEA CHECK (octet_length(topic3) = 32), + ADD COLUMN topic4 BYTEA CHECK (octet_length(topic4) = 32), + ADD COLUMN logs_per_block NUMERIC(78,0), + DROP CONSTRAINT evm_log_poller_filters_name_evm_chain_id_address_event_key; +-- Ordinary UNIQUE CONSTRAINT can't work for topics because they can be NULL. 
Any row with any column being NULL automatically satisfies the unique constraint (NULL != NULL) -- Using a hash of all the columns treats NULL's as the same as any other field. If we ever get to a point where we can require postgresql >= 15 then this can -- be fixed by using UNIQUE CONSTRAINT NULLS NOT DISTINCT which treats NULL's as if they were ordinary values (NULL == NULL) CREATE UNIQUE INDEX evm_log_poller_filters_name_chain_address_event_topics_key ON evm.log_poller_filters (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0)); -- +goose Down DROP INDEX IF EXISTS evm_log_poller_filters_name_chain_address_event_topics_key; ALTER TABLE evm.log_poller_filters ADD CONSTRAINT evm_log_poller_filters_name_evm_chain_id_address_event_key UNIQUE (name, evm_chain_id, address, event), DROP COLUMN topic2, DROP COLUMN topic3, DROP COLUMN topic4, DROP COLUMN logs_per_block; From a57d448b9cf94e4402af4cb4cc0b444e1e899803 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 6 Feb 2024 17:15:23 -0800 Subject: [PATCH 2/5] Improve SQL QUERY logging []BYTEA was panicking for nil or empty array, which then gets trapped and ignored... making it very difficult to figure out which query the error is coming from. While fixing that bug, updated the formatting of []BYTEA and TEXT to be closer to an actual SQL query you can run by cutting and pasting from the log. --- core/services/pg/q.go | 17 ++++++++++++++--- core/services/pg/q_test.go | 12 ++++++------ 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/core/services/pg/q.go b/core/services/pg/q.go index ba2627fa745..49a18817de9 100644 --- a/core/services/pg/q.go +++ b/core/services/pg/q.go @@ -296,14 +296,25 @@ func sprintQ(query string, args []interface{}) string { case common.Hash: pairs = append(pairs, fmt.Sprintf("$%d", i+1), fmt.Sprintf("'\\x%x'", v.Bytes())) case pq.ByteaArray: + pairs = append(pairs, fmt.Sprintf("$%d", i+1)) + if v == nil { + pairs = append(pairs, "NULL") + continue + } + if len(v) == 0 { + pairs = append(pairs, "ARRAY[]") + continue + } var s strings.Builder - fmt.Fprintf(&s, "('\\x%x'", v[0]) + fmt.Fprintf(&s, "ARRAY['\\x%x'", v[0]) for j := 1; j < len(v); j++ { fmt.Fprintf(&s, ",'\\x%x'", v[j]) } - pairs = append(pairs, fmt.Sprintf("$%d", i+1), fmt.Sprintf("%s)", s.String())) + pairs = append(pairs, fmt.Sprintf("%s]", s.String())) + case string: + pairs = append(pairs, fmt.Sprintf("$%d", i+1), fmt.Sprintf("'%s'", v)) default: - pairs = append(pairs, fmt.Sprintf("$%d", i+1), fmt.Sprintf("%v", arg)) + pairs = append(pairs, fmt.Sprintf("$%d", i+1), fmt.Sprintf("%v", v)) }  } replacer := strings.NewReplacer(pairs...)
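
For reference, here is a minimal standalone sketch (not part of the patch itself) of what the new pq.ByteaArray branch of sprintQ produces. renderByteaArray is a hypothetical helper that mirrors the logic in the hunk above, since sprintQ is unexported from the pg package:

package main

import (
	"fmt"
	"strings"

	"github.com/lib/pq"
)

// renderByteaArray mirrors the pq.ByteaArray branch of the patched sprintQ:
// nil renders as NULL, an empty array as ARRAY[], and anything else as a
// SQL array literal of hex-escaped byte strings that can be pasted into psql.
func renderByteaArray(v pq.ByteaArray) string {
	if v == nil {
		return "NULL" // the old code indexed v[0] unconditionally and panicked here
	}
	if len(v) == 0 {
		return "ARRAY[]" // likewise for an empty (but non-nil) array
	}
	var s strings.Builder
	fmt.Fprintf(&s, "ARRAY['\\x%x'", v[0])
	for j := 1; j < len(v); j++ {
		fmt.Fprintf(&s, ",'\\x%x'", v[j])
	}
	return s.String() + "]"
}

func main() {
	fmt.Println(renderByteaArray(pq.ByteaArray{{0x0a}, {0x0b}})) // ARRAY['\x0a','\x0b']
	fmt.Println(renderByteaArray(nil))                           // NULL
}

The expectations in q_test.go below are updated to match this output.
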
diff --git a/core/services/pg/q_test.go b/core/services/pg/q_test.go index 7692fb792bd..6e59e499887 100644 --- a/core/services/pg/q_test.go +++ b/core/services/pg/q_test.go @@ -21,27 +21,27 @@ func Test_sprintQ(t *testing.T) { {"one", "SELECT $1 FROM table;", []interface{}{"foo"}, - "SELECT foo FROM table;"}, + "SELECT 'foo' FROM table;"}, {"two", "SELECT $1 FROM table WHERE bar = $2;", []interface{}{"foo", 1}, - "SELECT foo FROM table WHERE bar = 1;"}, + "SELECT 'foo' FROM table WHERE bar = 1;"}, {"limit", "SELECT $1 FROM table LIMIT $2;", []interface{}{"foo", Limit(10)}, - "SELECT foo FROM table LIMIT 10;"}, + "SELECT 'foo' FROM table LIMIT 10;"}, {"limit-all", "SELECT $1 FROM table LIMIT $2;", []interface{}{"foo", Limit(-1)}, - "SELECT foo FROM table LIMIT NULL;"}, + "SELECT 'foo' FROM table LIMIT NULL;"}, {"bytea", "SELECT $1 FROM table WHERE b = $2;", []interface{}{"foo", []byte{0x0a}}, - "SELECT foo FROM table WHERE b = '\\x0a';"}, + "SELECT 'foo' FROM table WHERE b = '\\x0a';"}, {"bytea[]", "SELECT $1 FROM table WHERE b = $2;", []interface{}{"foo", pq.ByteaArray([][]byte{{0xa}, {0xb}})}, - "SELECT foo FROM table WHERE b = ('\\x0a','\\x0b');"}, + "SELECT 'foo' FROM table WHERE b = ARRAY['\\x0a','\\x0b'];"}, } { t.Run(tt.name, func(t *testing.T) { got := sprintQ(tt.query, tt.args) From 9ee23b2ce760a1c9fbe803b7ae749e3a9c54f213 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 12 Feb 2024 19:17:10 -0800 Subject: [PATCH 3/5] Add MaxLogsKept --- core/chains/evm/logpoller/log_poller.go | 5 +++-- core/chains/evm/logpoller/orm.go | 10 ++++++---- core/chains/evm/logpoller/orm_test.go | 3 ++- core/chains/evm/logpoller/query.go | 4 ++++ ...23_log_poller_filters_add_topics_logs_per_block.sql | 2 ++ 5 files changed, 17 insertions(+), 7 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index efcaf53e7dd..9a0565edc32 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -159,8 +159,9 @@ type Filter struct { Topic2 evmtypes.HashArray // list of possible values for topic2 Topic3 evmtypes.HashArray // list of possible values for topic3 Topic4 evmtypes.HashArray // list of possible values for topic4 - Retention time.Duration - LogsPerBlock *ubig.Big // rate limit ( maximum # of logs per block to save to db ) + Retention time.Duration // maximum amount of time to retain logs + MaxLogsKept *ubig.Big // maximum number of logs to retain + LogsPerBlock *ubig.Big // rate limit ( maximum # of logs per block to save to db ) } // FilterName is a suggested convenience function for clients to construct unique filter names diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 4c40a73f67c..fe814e8a43c 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -101,6 +101,7 @@ func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) { args, err := newQueryArgs(o.chainID). withCustomArg("name", filter.Name). withRetention(filter.Retention). + withMaxLogsKept(filter.MaxLogsKept). withLogsPerBlock(filter.LogsPerBlock). withAddressArray(filter.Addresses). withEventSigArray(filter.EventSigs). 
@@ -121,14 +122,14 @@ func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) { } query := fmt.Sprintf(` INSERT INTO evm.log_poller_filters - (name, evm_chain_id, retention, logs_per_block, created_at, address, event %s) + (name, evm_chain_id, retention, max_logs_kept, logs_per_block, created_at, address, event %s) SELECT * FROM - (SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :logs_per_block ::::NUMERIC, NOW()) x, + (SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :max_logs_kept ::::NUMERIC, :logs_per_block ::::NUMERIC, NOW()) x, (SELECT unnest(:address_array ::::BYTEA[]) addr) a, (SELECT unnest(:event_sig_array ::::BYTEA[]) ev) e %s ON CONFLICT (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0)) - DO UPDATE SET retention=:retention ::::BIGINT, logs_per_block=:logs_per_block ::::NUMERIC`, + DO UPDATE SET retention=:retention ::::BIGINT, max_logs_kept=:max_logs_kept ::::NUMERIC, logs_per_block=:logs_per_block ::::NUMERIC`, topicsColumns.String(), topicsSql.String()) return o.q.WithOpts(qopts...).ExecQNamed(query, args) @@ -151,7 +152,8 @@ func (o *DbORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) { ARRAY_AGG(DISTINCT topic3 ORDER BY topic3) FILTER(WHERE topic3 IS NOT NULL) AS topic3, ARRAY_AGG(DISTINCT topic4 ORDER BY topic4) FILTER(WHERE topic4 IS NOT NULL) AS topic4, MAX(logs_per_block) AS logs_per_block, - MAX(retention) AS retention + MAX(retention) AS retention, + MAX(max_logs_kept) AS max_logs_kept FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY name`, ubig.New(o.chainID)) filters := make(map[string]Filter) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 22578daa677..ff4a15046b7 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -499,9 +499,10 @@ func TestLogPollerFilters(t *testing.T) { Topic2: types.HashArray{topicA}, Topic4: types.HashArray{topicC, topicD}, }, { - Name: "10 logs per block rate limit", + Name: "10 lpb rate limit, 1M max logs", Addresses: types.AddressArray{address}, EventSigs: types.HashArray{event1}, + MaxLogsKept: ubig.NewI(1000000), LogsPerBlock: ubig.NewI(10), }, { // ensure that the UNIQUE CONSTRAINT isn't too strict (should only error if all fields are identical) Name: "duplicate of filter by topic4", diff --git a/core/chains/evm/logpoller/query.go b/core/chains/evm/logpoller/query.go index e0df974c115..097c8c049fa 100644 --- a/core/chains/evm/logpoller/query.go +++ b/core/chains/evm/logpoller/query.go @@ -146,6 +146,10 @@ func (q *queryArgs) withLogsPerBlock(logsPerBlock *ubig.Big) *queryArgs { return q.withCustomArg("logs_per_block", logsPerBlock) } +func (q *queryArgs) withMaxLogsKept(maxLogsKept *ubig.Big) *queryArgs { + return q.withCustomArg("max_logs_kept", maxLogsKept) +} + func (q *queryArgs) withCustomHashArg(name string, arg common.Hash) *queryArgs { return q.withCustomArg(name, arg.Bytes()) } diff --git a/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql b/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql index 60034b7d4c3..f16313b7d49 100644 --- a/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql +++ b/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql @@ -4,6 +4,7 @@ ALTER TABLE evm.log_poller_filters ADD COLUMN topic2 BYTEA CHECK (octet_length(topic2) = 32), ADD COLUMN topic3 BYTEA CHECK 
(octet_length(topic3) = 32), ADD COLUMN topic4 BYTEA CHECK (octet_length(topic4) = 32), + ADD COLUMN max_logs_kept NUMERIC(78,0), ADD COLUMN logs_per_block NUMERIC(78,0), DROP CONSTRAINT evm_log_poller_filters_name_evm_chain_id_address_event_key; -- Ordinary UNIQUE CONSTRAINT can't work for topics because they can be NULL. Any row with any column being NULL automatically satisfies the unique constraint (NULL != NULL) @@ -19,4 +20,5 @@ ALTER TABLE evm.log_poller_filters DROP COLUMN topic2, DROP COLUMN topic3, DROP COLUMN topic4, + DROP COLUMN max_logs_kept, DROP COLUMN logs_per_block; From e9f1e351659f4d7905dd3ff0799baa107b2e87d1 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 21 Feb 2024 13:54:53 -0800 Subject: [PATCH 4/5] Address PR comments - Add new index before removing old index to be safer - Change MaxLogsKept & LogsPerBlock from big.Int to uint64 --- core/chains/evm/logpoller/log_poller.go | 4 ++-- core/chains/evm/logpoller/orm_test.go | 4 ++-- core/chains/evm/logpoller/query.go | 4 ++-- ...log_poller_filters_add_topics_logs_per_block.sql | 13 +++++++++---- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 9a0565edc32..a2c35bec59f 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -160,8 +160,8 @@ type Filter struct { Topic3 evmtypes.HashArray // list of possible values for topic3 Topic4 evmtypes.HashArray // list of possible values for topic4 Retention time.Duration // maximum amount of time to retain logs - MaxLogsKept *ubig.Big // maximum number of logs to retain - LogsPerBlock *ubig.Big // rate limit ( maximum # of logs per block to save to db ) + MaxLogsKept uint64 // maximum number of logs to retain ( 0 = unlimited ) + LogsPerBlock uint64 // rate limit ( maximum # of logs per block, 0 = unlimited ) } // FilterName is a suggested convenience function for clients to construct unique filter names diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index ff4a15046b7..9824d0e9426 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -502,8 +502,8 @@ func TestLogPollerFilters(t *testing.T) { Name: "10 lpb rate limit, 1M max logs", Addresses: types.AddressArray{address}, EventSigs: types.HashArray{event1}, - MaxLogsKept: ubig.NewI(1000000), - LogsPerBlock: ubig.NewI(10), + MaxLogsKept: 1000000, + LogsPerBlock: 10, }, { // ensure that the UNIQUE CONSTRAINT isn't too strict (should only error if all fields are identical) Name: "duplicate of filter by topic4", Addresses: types.AddressArray{address}, diff --git a/core/chains/evm/logpoller/query.go b/core/chains/evm/logpoller/query.go index 097c8c049fa..d8112459743 100644 --- a/core/chains/evm/logpoller/query.go +++ b/core/chains/evm/logpoller/query.go @@ -142,11 +142,11 @@ func (q *queryArgs) withRetention(retention time.Duration) *queryArgs { return q.withCustomArg("retention", retention) } -func (q *queryArgs) withLogsPerBlock(logsPerBlock *ubig.Big) *queryArgs { +func (q *queryArgs) withLogsPerBlock(logsPerBlock uint64) *queryArgs { return q.withCustomArg("logs_per_block", logsPerBlock) } -func (q *queryArgs) withMaxLogsKept(maxLogsKept *ubig.Big) *queryArgs { +func (q *queryArgs) withMaxLogsKept(maxLogsKept uint64) *queryArgs { return q.withCustomArg("max_logs_kept", maxLogsKept) } diff --git 
a/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql b/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql index f16313b7d49..77e3d5fbd51 100644 --- a/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql +++ b/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql @@ -4,19 +4,24 @@ ALTER TABLE evm.log_poller_filters ADD COLUMN topic2 BYTEA CHECK (octet_length(topic2) = 32), ADD COLUMN topic3 BYTEA CHECK (octet_length(topic3) = 32), ADD COLUMN topic4 BYTEA CHECK (octet_length(topic4) = 32), - ADD COLUMN max_logs_kept NUMERIC(78,0), - ADD COLUMN logs_per_block NUMERIC(78,0), - DROP CONSTRAINT evm_log_poller_filters_name_evm_chain_id_address_event_key; + ADD COLUMN max_logs_kept BIGINT, + ADD COLUMN logs_per_block BIGINT; + -- Ordinary UNIQUE CONSTRAINT can't work for topics because they can be NULL. Any row with any column being NULL automatically satisfies the unique constraint (NULL != NULL) -- Using a hash of all the columns treats NULL's as the same as any other field. If we ever get to a point where we can require postgresql >= 15 then this can -- be fixed by using UNIQUE CONSTRAINT NULLS NOT DISTINCT which treats NULL's as if they were ordinary values (NULL == NULL) CREATE UNIQUE INDEX evm_log_poller_filters_name_chain_address_event_topics_key ON evm.log_poller_filters (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0)); +ALTER TABLE evm.log_poller_filters + DROP CONSTRAINT evm_log_poller_filters_name_evm_chain_id_address_event_key; + -- +goose Down +ALTER TABLE evm.log_poller_filters + ADD CONSTRAINT evm_log_poller_filters_name_evm_chain_id_address_event_key UNIQUE (name, evm_chain_id, address, event); DROP INDEX IF EXISTS evm_log_poller_filters_name_chain_address_event_topics_key; + ALTER TABLE evm.log_poller_filters - ADD CONSTRAINT evm_log_poller_filters_name_evm_chain_id_address_event_key UNIQUE (name, evm_chain_id, address, event), DROP COLUMN topic2, DROP COLUMN topic3, DROP COLUMN topic4, From 1626611e4f62dafe9b4daf8d7c9fe96eaa87c591 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Thu, 22 Feb 2024 10:08:28 -0800 Subject: [PATCH 5/5] Update migration # after rebase --- ....sql => 0225_log_poller_filters_add_topics_logs_per_block.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename core/store/migrate/migrations/{0223_log_poller_filters_add_topics_logs_per_block.sql => 0225_log_poller_filters_add_topics_logs_per_block.sql} (100%) diff --git a/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql b/core/store/migrate/migrations/0225_log_poller_filters_add_topics_logs_per_block.sql similarity index 100% rename from core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql rename to core/store/migrate/migrations/0225_log_poller_filters_add_topics_logs_per_block.sql
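
To round out the series, a hypothetical caller-side sketch of how a service could register a filter that exercises the new fields. The service name, job ID, address, and topic values are placeholders, and it assumes the RegisterFilter and FilterName signatures as of this branch:

package example

import (
	"time"

	"github.com/ethereum/go-ethereum/common"

	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// registerExampleFilter registers a filter that keeps only logs emitted by
// addr with the given event signature (topic1) and a matching topic2 value,
// pruned after 24h, capped at 1000 logs total and 5 logs per block.
func registerExampleFilter(lp logpoller.LogPoller, jobID int32, addr common.Address, eventSig, topic2Val common.Hash) error {
	return lp.RegisterFilter(logpoller.Filter{
		Name:         logpoller.FilterName("ExampleService", jobID),
		Addresses:    []common.Address{addr},
		EventSigs:    []common.Hash{eventSig},
		Topic2:       []common.Hash{topic2Val},
		Retention:    24 * time.Hour,
		MaxLogsKept:  1000, // 0 = unlimited
		LogsPerBlock: 5,    // 0 = unlimited
	})
}

Leaving Topic3 and Topic4 empty means those topic positions are unconstrained, matching the pre-existing behavior for filters that only specify addresses and event signatures.
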