diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index efcaf53e7dd..9a0565edc32 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -159,8 +159,9 @@ type Filter struct { Topic2 evmtypes.HashArray // list of possible values for topic2 Topic3 evmtypes.HashArray // list of possible values for topic3 Topic4 evmtypes.HashArray // list of possible values for topic4 - Retention time.Duration - LogsPerBlock *ubig.Big // rate limit ( maximum # of logs per block to save to db ) + Retention time.Duration // maximum amount of time to retain logs + MaxLogsKept *ubig.Big // maximum number of logs to retain + LogsPerBlock *ubig.Big // rate limit ( maximum # of logs per block to save to db ) } // FilterName is a suggested convenience function for clients to construct unique filter names diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 3cab2b90546..184f406a2f2 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -101,6 +101,7 @@ func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) { args, err := newQueryArgs(o.chainID). withCustomArg("name", filter.Name). withRetention(filter.Retention). + withMaxLogsKept(filter.MaxLogsKept). withLogsPerBlock(filter.LogsPerBlock). withAddressArray(filter.Addresses). withEventSigArray(filter.EventSigs). @@ -121,14 +122,14 @@ func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) { } query := fmt.Sprintf(` INSERT INTO evm.log_poller_filters - (name, evm_chain_id, retention, logs_per_block, created_at, address, event %s) + (name, evm_chain_id, retention, max_logs_kept, logs_per_block, created_at, address, event %s) SELECT * FROM - (SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :logs_per_block ::::NUMERIC, NOW()) x, + (SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :max_logs_kept ::::NUMERIC, :logs_per_block ::::NUMERIC, NOW()) x, (SELECT unnest(:address_array ::::BYTEA[]) addr) a, (SELECT unnest(:event_sig_array ::::BYTEA[]) ev) e %s ON CONFLICT (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0)) - DO UPDATE SET retention=:retention ::::BIGINT, logs_per_block=:logs_per_block ::::NUMERIC`, + DO UPDATE SET retention=:retention ::::BIGINT, max_logs_kept=:max_logs_kept ::::NUMERIC, logs_per_block=:logs_per_block ::::NUMERIC`, topicsColumns.String(), topicsSql.String()) return o.q.WithOpts(qopts...).ExecQNamed(query, args) @@ -151,7 +152,8 @@ func (o *DbORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) { ARRAY_AGG(DISTINCT topic3 ORDER BY topic3) FILTER(WHERE topic3 IS NOT NULL) AS topic3, ARRAY_AGG(DISTINCT topic4 ORDER BY topic4) FILTER(WHERE topic4 IS NOT NULL) AS topic4, MAX(logs_per_block) AS logs_per_block, - MAX(retention) AS retention + MAX(retention) AS retention, + MAX(max_logs_kept) AS max_logs_kept FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY name`, ubig.New(o.chainID)) filters := make(map[string]Filter) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 6b366dfb66f..db55d601785 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -499,9 +499,10 @@ func TestLogPollerFilters(t *testing.T) { Topic2: types.HashArray{topicA}, Topic4: types.HashArray{topicC, topicD}, }, { - Name: "10 logs per block rate limit", + Name: "10 lpb rate limit, 1M max logs", Addresses: types.AddressArray{address}, EventSigs: types.HashArray{event1}, + MaxLogsKept: ubig.NewI(1000000), LogsPerBlock: ubig.NewI(10), }, { // ensure that the UNIQUE CONSTRAINT isn't too strict (should only error if all fields are identical) Name: "duplicate of filter by topic4", diff --git a/core/chains/evm/logpoller/query.go b/core/chains/evm/logpoller/query.go index e0df974c115..097c8c049fa 100644 --- a/core/chains/evm/logpoller/query.go +++ b/core/chains/evm/logpoller/query.go @@ -146,6 +146,10 @@ func (q *queryArgs) withLogsPerBlock(logsPerBlock *ubig.Big) *queryArgs { return q.withCustomArg("logs_per_block", logsPerBlock) } +func (q *queryArgs) withMaxLogsKept(maxLogsKept *ubig.Big) *queryArgs { + return q.withCustomArg("max_logs_kept", maxLogsKept) +} + func (q *queryArgs) withCustomHashArg(name string, arg common.Hash) *queryArgs { return q.withCustomArg(name, arg.Bytes()) } diff --git a/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql b/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql index 60034b7d4c3..f16313b7d49 100644 --- a/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql +++ b/core/store/migrate/migrations/0223_log_poller_filters_add_topics_logs_per_block.sql @@ -4,6 +4,7 @@ ALTER TABLE evm.log_poller_filters ADD COLUMN topic2 BYTEA CHECK (octet_length(topic2) = 32), ADD COLUMN topic3 BYTEA CHECK (octet_length(topic3) = 32), ADD COLUMN topic4 BYTEA CHECK (octet_length(topic4) = 32), + ADD COLUMN max_logs_kept NUMERIC(78,0), ADD COLUMN logs_per_block NUMERIC(78,0), DROP CONSTRAINT evm_log_poller_filters_name_evm_chain_id_address_event_key; -- Ordinary UNIQUE CONSTRAINT can't work for topics because they can be NULL. Any row with any column being NULL automatically satisfies the unique constraint (NULL != NULL) @@ -19,4 +20,5 @@ ALTER TABLE evm.log_poller_filters DROP COLUMN topic2, DROP COLUMN topic3, DROP COLUMN topic4, + DROP COLUMN max_logs_kept, DROP COLUMN logs_per_block;