Skip to content

Commit 99071cd

Browse files
CCIP-1336 LogPoller - Bunch of minor improvements (#11348)
* Tracking number of logs inserted by LP * Adding read query to LP * Index for fourth word in data * Regenerating mocks * Filter by address in SelectIndexedLogsByTxHash * Fix * Post review fixes * Post merge fixes
1 parent de6c45e commit 99071cd

File tree

9 files changed

+373
-52
lines changed

9 files changed

+373
-52
lines changed

core/chains/evm/logpoller/disabled.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (disabled) IndexedLogsByBlockRange(start, end int64, eventSig common.Hash,
7171
return nil, ErrDisabled
7272
}
7373

74-
func (d disabled) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
74+
func (d disabled) IndexedLogsByTxHash(eventSig common.Hash, address common.Address, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
7575
return nil, ErrDisabled
7676
}
7777

@@ -106,3 +106,7 @@ func (d disabled) IndexedLogsCreatedAfter(eventSig common.Hash, address common.A
106106
func (d disabled) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs Confirmations, qopts ...pg.QOpt) (int64, error) {
107107
return 0, ErrDisabled
108108
}
109+
110+
func (d disabled) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
111+
return nil, ErrDisabled
112+
}

core/chains/evm/logpoller/log_poller.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,13 @@ type LogPoller interface {
5353
IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
5454
IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error)
5555
IndexedLogsCreatedAfter(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
56-
IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error)
56+
IndexedLogsByTxHash(eventSig common.Hash, address common.Address, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error)
5757
IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
5858
IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
5959
IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
6060
LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
6161
LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
62+
LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
6263
}
6364

6465
type Confirmations int
@@ -969,8 +970,8 @@ func (lp *logPoller) IndexedLogsCreatedAfter(eventSig common.Hash, address commo
969970
return lp.orm.SelectIndexedLogsCreatedAfter(address, eventSig, topicIndex, topicValues, after, confs, qopts...)
970971
}
971972

972-
func (lp *logPoller) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
973-
return lp.orm.SelectIndexedLogsByTxHash(eventSig, txHash, qopts...)
973+
func (lp *logPoller) IndexedLogsByTxHash(eventSig common.Hash, address common.Address, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
974+
return lp.orm.SelectIndexedLogsByTxHash(address, eventSig, txHash, qopts...)
974975
}
975976

976977
// LogsDataWordGreaterThan note index is 0 based.
@@ -1021,6 +1022,19 @@ func (lp *logPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, event
10211022
return lp.orm.SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...)
10221023
}
10231024

1025+
// LogsDataWordBetween retrieves a slice of Log records that match specific criteria.
1026+
// Besides generic filters like eventSig, address and confs, it also verifies data content against wordValue
1027+
// data[wordIndexMin] <= wordValue <= data[wordIndexMax].
1028+
//
1029+
// Passing the same value for wordIndexMin and wordIndexMax will check the equality of the wordValue at that index.
1030+
// Leading to returning logs matching: data[wordIndexMin] == wordValue.
1031+
//
1032+
// This function is particularly useful for filtering logs by data word values and their positions within the event data.
1033+
// It returns an empty slice if no logs match the provided criteria.
1034+
func (lp *logPoller) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
1035+
return lp.orm.SelectLogsDataWordBetween(address, eventSig, wordIndexMin, wordIndexMax, wordValue, confs, qopts...)
1036+
}
1037+
10241038
// GetBlocksRange tries to get the specified block numbers from the log pollers
10251039
// blocks table. It falls back to the RPC for any unfulfilled requested blocks.
10261040
func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) {

core/chains/evm/logpoller/mocks/log_poller.go

+42-9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/chains/evm/logpoller/observability.go

+68-23
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ import (
1414
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
1515
)
1616

17+
type queryType string
18+
19+
const (
20+
create queryType = "create"
21+
read queryType = "read"
22+
del queryType = "delete"
23+
)
24+
1725
var (
1826
sqlLatencyBuckets = []float64{
1927
float64(1 * time.Millisecond),
@@ -41,47 +49,63 @@ var (
4149
Name: "log_poller_query_duration",
4250
Help: "Measures duration of Log Poller's queries fetching logs",
4351
Buckets: sqlLatencyBuckets,
44-
}, []string{"evmChainID", "query"})
52+
}, []string{"evmChainID", "query", "type"})
4553
lpQueryDataSets = promauto.NewGaugeVec(prometheus.GaugeOpts{
4654
Name: "log_poller_query_dataset_size",
4755
Help: "Measures size of the datasets returned by Log Poller's queries",
48-
}, []string{"evmChainID", "query"})
56+
}, []string{"evmChainID", "query", "type"})
57+
lpLogsInserted = promauto.NewCounterVec(prometheus.CounterOpts{
58+
Name: "log_poller_logs_inserted",
59+
Help: "Counter to track number of logs inserted by Log Poller",
60+
}, []string{"evmChainID"})
61+
lpBlockInserted = promauto.NewCounterVec(prometheus.CounterOpts{
62+
Name: "log_poller_blocks_inserted",
63+
Help: "Counter to track number of blocks inserted by Log Poller",
64+
}, []string{"evmChainID"})
4965
)
5066

5167
// ObservedORM is a decorator layer for ORM used by LogPoller, responsible for pushing Prometheus metrics reporting duration and size of result set for the queries.
5268
// It doesn't change internal logic, because all calls are delegated to the origin ORM
5369
type ObservedORM struct {
5470
ORM
55-
queryDuration *prometheus.HistogramVec
56-
datasetSize *prometheus.GaugeVec
57-
chainId string
71+
queryDuration *prometheus.HistogramVec
72+
datasetSize *prometheus.GaugeVec
73+
logsInserted *prometheus.CounterVec
74+
blocksInserted *prometheus.CounterVec
75+
chainId string
5876
}
5977

6078
// NewObservedORM creates an observed version of log poller's ORM created by NewORM
6179
// Please see ObservedLogPoller for more details on how latencies are measured
6280
func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *ObservedORM {
6381
return &ObservedORM{
64-
ORM: NewORM(chainID, db, lggr, cfg),
65-
queryDuration: lpQueryDuration,
66-
datasetSize: lpQueryDataSets,
67-
chainId: chainID.String(),
82+
ORM: NewORM(chainID, db, lggr, cfg),
83+
queryDuration: lpQueryDuration,
84+
datasetSize: lpQueryDataSets,
85+
logsInserted: lpLogsInserted,
86+
blocksInserted: lpBlockInserted,
87+
chainId: chainID.String(),
6888
}
6989
}
7090

7191
func (o *ObservedORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error {
72-
return withObservedExec(o, "InsertLogs", func() error {
92+
err := withObservedExec(o, "InsertLogs", create, func() error {
7393
return o.ORM.InsertLogs(logs, qopts...)
7494
})
95+
trackInsertedLogsAndBlock(o, logs, nil, err)
96+
return err
7597
}
7698

7799
func (o *ObservedORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error {
78-
return withObservedExec(o, "InsertLogsWithBlock", func() error {
100+
err := withObservedExec(o, "InsertLogsWithBlock", create, func() error {
79101
return o.ORM.InsertLogsWithBlock(logs, block, qopts...)
80102
})
103+
trackInsertedLogsAndBlock(o, logs, &block, err)
104+
return err
81105
}
82106

83107
func (o *ObservedORM) InsertFilter(filter Filter, qopts ...pg.QOpt) error {
84-
return withObservedExec(o, "InsertFilter", func() error {
108+
return withObservedExec(o, "InsertFilter", create, func() error {
85109
return o.ORM.InsertFilter(filter, qopts...)
86110
})
87111
}
@@ -93,25 +117,25 @@ func (o *ObservedORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) {
93117
}
94118

95119
func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error {
96-
return withObservedExec(o, "DeleteFilter", func() error {
120+
return withObservedExec(o, "DeleteFilter", del, func() error {
97121
return o.ORM.DeleteFilter(name, qopts...)
98122
})
99123
}
100124

101125
func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error {
102-
return withObservedExec(o, "DeleteBlocksBefore", func() error {
126+
return withObservedExec(o, "DeleteBlocksBefore", del, func() error {
103127
return o.ORM.DeleteBlocksBefore(end, qopts...)
104128
})
105129
}
106130

107131
func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error {
108-
return withObservedExec(o, "DeleteLogsAndBlocksAfter", func() error {
132+
return withObservedExec(o, "DeleteLogsAndBlocksAfter", del, func() error {
109133
return o.ORM.DeleteLogsAndBlocksAfter(start, qopts...)
110134
})
111135
}
112136

113137
func (o *ObservedORM) DeleteExpiredLogs(qopts ...pg.QOpt) error {
114-
return withObservedExec(o, "DeleteExpiredLogs", func() error {
138+
return withObservedExec(o, "DeleteExpiredLogs", del, func() error {
115139
return o.ORM.DeleteExpiredLogs(qopts...)
116140
})
117141
}
@@ -176,9 +200,9 @@ func (o *ObservedORM) SelectLogs(start, end int64, address common.Address, event
176200
})
177201
}
178202

179-
func (o *ObservedORM) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
180-
return withObservedQueryAndResults(o, "IndexedLogsByTxHash", func() ([]Log, error) {
181-
return o.ORM.SelectIndexedLogsByTxHash(eventSig, txHash, qopts...)
203+
func (o *ObservedORM) SelectIndexedLogsByTxHash(address common.Address, eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
204+
return withObservedQueryAndResults(o, "SelectIndexedLogsByTxHash", func() ([]Log, error) {
205+
return o.ORM.SelectIndexedLogsByTxHash(address, eventSig, txHash, qopts...)
182206
})
183207
}
184208

@@ -212,6 +236,12 @@ func (o *ObservedORM) SelectLogsDataWordGreaterThan(address common.Address, even
212236
})
213237
}
214238

239+
func (o *ObservedORM) SelectLogsDataWordBetween(address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
240+
return withObservedQueryAndResults(o, "SelectLogsDataWordBetween", func() ([]Log, error) {
241+
return o.ORM.SelectLogsDataWordBetween(address, eventSig, wordIndexMin, wordIndexMax, wordValue, confs, qopts...)
242+
})
243+
}
244+
215245
func (o *ObservedORM) SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
216246
return withObservedQueryAndResults(o, "SelectIndexedLogsTopicGreaterThan", func() ([]Log, error) {
217247
return o.ORM.SelectIndexedLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...)
@@ -228,7 +258,7 @@ func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query
228258
results, err := withObservedQuery(o, queryName, query)
229259
if err == nil {
230260
o.datasetSize.
231-
WithLabelValues(o.chainId, queryName).
261+
WithLabelValues(o.chainId, queryName, string(read)).
232262
Set(float64(len(results)))
233263
}
234264
return results, err
@@ -238,18 +268,33 @@ func withObservedQuery[T any](o *ObservedORM, queryName string, query func() (T,
238268
queryStarted := time.Now()
239269
defer func() {
240270
o.queryDuration.
241-
WithLabelValues(o.chainId, queryName).
271+
WithLabelValues(o.chainId, queryName, string(read)).
242272
Observe(float64(time.Since(queryStarted)))
243273
}()
244274
return query()
245275
}
246276

247-
func withObservedExec(o *ObservedORM, query string, exec func() error) error {
277+
func withObservedExec(o *ObservedORM, query string, queryType queryType, exec func() error) error {
248278
queryStarted := time.Now()
249279
defer func() {
250280
o.queryDuration.
251-
WithLabelValues(o.chainId, query).
281+
WithLabelValues(o.chainId, query, string(queryType)).
252282
Observe(float64(time.Since(queryStarted)))
253283
}()
254284
return exec()
255285
}
286+
287+
func trackInsertedLogsAndBlock(o *ObservedORM, logs []Log, block *LogPollerBlock, err error) {
288+
if err != nil {
289+
return
290+
}
291+
o.logsInserted.
292+
WithLabelValues(o.chainId).
293+
Add(float64(len(logs)))
294+
295+
if block != nil {
296+
o.blocksInserted.
297+
WithLabelValues(o.chainId).
298+
Inc()
299+
}
300+
}

0 commit comments

Comments
 (0)