Skip to content

Commit

Permalink
Add MaxLogsKept
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Feb 13, 2024
1 parent e8fb883 commit 4cea6a4
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 7 deletions.
5 changes: 3 additions & 2 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ type Filter struct {
Topic2 evmtypes.HashArray // list of possible values for topic2
Topic3 evmtypes.HashArray // list of possible values for topic3
Topic4 evmtypes.HashArray // list of possible values for topic4
Retention time.Duration
LogsPerBlock *ubig.Big // rate limit ( maximum # of logs per block to save to db )
Retention time.Duration // maximum amount of time to retain logs
MaxLogsKept *ubig.Big // maximum number of logs to retain
LogsPerBlock *ubig.Big // rate limit ( maximum # of logs per block to save to db )
}

// FilterName is a suggested convenience function for clients to construct unique filter names
Expand Down
10 changes: 6 additions & 4 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) {
args, err := newQueryArgs(o.chainID).
withCustomArg("name", filter.Name).
withRetention(filter.Retention).
withMaxLogsKept(filter.MaxLogsKept).
withLogsPerBlock(filter.LogsPerBlock).
withAddressArray(filter.Addresses).
withEventSigArray(filter.EventSigs).
Expand All @@ -121,14 +122,14 @@ func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) {
}
query := fmt.Sprintf(`
INSERT INTO evm.log_poller_filters
(name, evm_chain_id, retention, logs_per_block, created_at, address, event %s)
(name, evm_chain_id, retention, max_logs_kept, logs_per_block, created_at, address, event %s)
SELECT * FROM
(SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :logs_per_block ::::NUMERIC, NOW()) x,
(SELECT :name, :evm_chain_id ::::NUMERIC, :retention ::::BIGINT, :max_logs_kept ::::NUMERIC, :logs_per_block ::::NUMERIC, NOW()) x,
(SELECT unnest(:address_array ::::BYTEA[]) addr) a,
(SELECT unnest(:event_sig_array ::::BYTEA[]) ev) e
%s
ON CONFLICT (hash_record_extended((name, evm_chain_id, address, event, topic2, topic3, topic4), 0))
DO UPDATE SET retention=:retention ::::BIGINT, logs_per_block=:logs_per_block ::::NUMERIC`,
DO UPDATE SET retention=:retention ::::BIGINT, max_logs_kept=:max_logs_kept ::::NUMERIC, logs_per_block=:logs_per_block ::::NUMERIC`,
topicsColumns.String(),
topicsSql.String())
return o.q.WithOpts(qopts...).ExecQNamed(query, args)
Expand All @@ -151,7 +152,8 @@ func (o *DbORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) {
ARRAY_AGG(DISTINCT topic3 ORDER BY topic3) FILTER(WHERE topic3 IS NOT NULL) AS topic3,
ARRAY_AGG(DISTINCT topic4 ORDER BY topic4) FILTER(WHERE topic4 IS NOT NULL) AS topic4,
MAX(logs_per_block) AS logs_per_block,
MAX(retention) AS retention
MAX(retention) AS retention,
MAX(max_logs_kept) AS max_logs_kept
FROM evm.log_poller_filters WHERE evm_chain_id = $1
GROUP BY name`, ubig.New(o.chainID))
filters := make(map[string]Filter)
Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,10 @@ func TestLogPollerFilters(t *testing.T) {
Topic2: types.HashArray{topicA},
Topic4: types.HashArray{topicC, topicD},
}, {
Name: "10 logs per block rate limit",
Name: "10 lpb rate limit, 1M max logs",
Addresses: types.AddressArray{address},
EventSigs: types.HashArray{event1},
MaxLogsKept: ubig.NewI(1000000),
LogsPerBlock: ubig.NewI(10),
}, { // ensure that the UNIQUE CONSTRAINT isn't too strict (should only error if all fields are identical)
Name: "duplicate of filter by topic4",
Expand Down
4 changes: 4 additions & 0 deletions core/chains/evm/logpoller/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (q *queryArgs) withLogsPerBlock(logsPerBlock *ubig.Big) *queryArgs {
return q.withCustomArg("logs_per_block", logsPerBlock)
}

// withMaxLogsKept binds maxLogsKept to the named query argument
// "max_logs_kept" (the maximum number of logs to retain for a filter,
// per the Filter.MaxLogsKept field added in this commit). A nil value
// is passed through unchanged — presumably stored as NULL; verify
// against withCustomArg's handling.
func (q *queryArgs) withMaxLogsKept(maxLogsKept *ubig.Big) *queryArgs {
return q.withCustomArg("max_logs_kept", maxLogsKept)
}

// withCustomHashArg binds a common.Hash under the given argument name,
// converting it to its raw byte representation via Bytes() so it can be
// matched against BYTEA columns.
func (q *queryArgs) withCustomHashArg(name string, arg common.Hash) *queryArgs {
return q.withCustomArg(name, arg.Bytes())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ ALTER TABLE evm.log_poller_filters
ADD COLUMN topic2 BYTEA CHECK (octet_length(topic2) = 32),
ADD COLUMN topic3 BYTEA CHECK (octet_length(topic3) = 32),
ADD COLUMN topic4 BYTEA CHECK (octet_length(topic4) = 32),
ADD COLUMN max_logs_kept NUMERIC(78,0),
ADD COLUMN logs_per_block NUMERIC(78,0),
DROP CONSTRAINT evm_log_poller_filters_name_evm_chain_id_address_event_key;
-- Ordinary UNIQUE CONSTRAINT can't work for topics because they can be NULL. Any row with any column being NULL automatically satisfies the unique constraint (NULL != NULL)
Expand All @@ -19,4 +20,5 @@ ALTER TABLE evm.log_poller_filters
DROP COLUMN topic2,
DROP COLUMN topic3,
DROP COLUMN topic4,
DROP COLUMN max_logs_kept,
DROP COLUMN logs_per_block;

0 comments on commit 4cea6a4

Please sign in to comment.