diff --git a/.changeset/proud-jokes-exercise.md b/.changeset/proud-jokes-exercise.md
new file mode 100644
index 00000000000..4e36d139de5
--- /dev/null
+++ b/.changeset/proud-jokes-exercise.md
@@ -0,0 +1,5 @@
+---
+"chainlink": minor
+---
+
+#added Report new heads as a telemetry to OTI
diff --git a/.mockery.yaml b/.mockery.yaml
index abb3105b136..37cbff58b1a 100644
--- a/.mockery.yaml
+++ b/.mockery.yaml
@@ -262,11 +262,20 @@ packages:
ORM:
Runner:
PipelineParamUnmarshaler:
- github.com/smartcontractkit/chainlink/v2/core/services/promreporter:
+ github.com/smartcontractkit/chainlink/v2/core/services/headreporter:
config:
- dir: core/internal/mocks
+ dir: "{{ .InterfaceDir }}"
+ filename: "{{ .InterfaceName | snakecase }}_mock.go"
+ inpackage: true
+ mockname: "Mock{{ .InterfaceName | camelcase }}"
interfaces:
+ HeadReporter:
PrometheusBackend:
+ github.com/smartcontractkit/libocr/commontypes:
+ config:
+ dir: "common/types/mocks"
+ interfaces:
+ MonitoringEndpoint:
github.com/smartcontractkit/chainlink/v2/core/services/relay/evm:
interfaces:
BatchCaller:
@@ -301,6 +310,15 @@ packages:
interfaces:
Config:
FeeConfig:
+ github.com/smartcontractkit/chainlink/v2/core/services/telemetry:
+ config:
+ dir: "{{ .InterfaceDir }}"
+ filename: "{{ .InterfaceName | snakecase }}_mock.go"
+ inpackage: true
+ mockname: "Mock{{ .InterfaceName | camelcase }}"
+ interfaces:
+ MonitoringEndpointGenerator:
+ IngressAgent:
github.com/smartcontractkit/chainlink/v2/core/services/webhook:
interfaces:
ExternalInitiatorManager:
diff --git a/common/types/mocks/monitoring_endpoint.go b/common/types/mocks/monitoring_endpoint.go
new file mode 100644
index 00000000000..5afc04c9090
--- /dev/null
+++ b/common/types/mocks/monitoring_endpoint.go
@@ -0,0 +1,65 @@
+// Code generated by mockery v2.43.2. DO NOT EDIT.
+
+package mocks
+
+import mock "github.com/stretchr/testify/mock"
+
+// MonitoringEndpoint is an autogenerated mock type for the MonitoringEndpoint type
+type MonitoringEndpoint struct {
+ mock.Mock
+}
+
+type MonitoringEndpoint_Expecter struct {
+ mock *mock.Mock
+}
+
+func (_m *MonitoringEndpoint) EXPECT() *MonitoringEndpoint_Expecter {
+ return &MonitoringEndpoint_Expecter{mock: &_m.Mock}
+}
+
+// SendLog provides a mock function with given fields: log
+func (_m *MonitoringEndpoint) SendLog(log []byte) {
+ _m.Called(log)
+}
+
+// MonitoringEndpoint_SendLog_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendLog'
+type MonitoringEndpoint_SendLog_Call struct {
+ *mock.Call
+}
+
+// SendLog is a helper method to define mock.On call
+// - log []byte
+func (_e *MonitoringEndpoint_Expecter) SendLog(log interface{}) *MonitoringEndpoint_SendLog_Call {
+ return &MonitoringEndpoint_SendLog_Call{Call: _e.mock.On("SendLog", log)}
+}
+
+func (_c *MonitoringEndpoint_SendLog_Call) Run(run func(log []byte)) *MonitoringEndpoint_SendLog_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].([]byte))
+ })
+ return _c
+}
+
+func (_c *MonitoringEndpoint_SendLog_Call) Return() *MonitoringEndpoint_SendLog_Call {
+ _c.Call.Return()
+ return _c
+}
+
+func (_c *MonitoringEndpoint_SendLog_Call) RunAndReturn(run func([]byte)) *MonitoringEndpoint_SendLog_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// NewMonitoringEndpoint creates a new instance of MonitoringEndpoint. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewMonitoringEndpoint(t interface {
+ mock.TestingT
+ Cleanup(func())
+}) *MonitoringEndpoint {
+ mock := &MonitoringEndpoint{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/core/internal/mocks/prometheus_backend.go b/core/internal/mocks/prometheus_backend.go
deleted file mode 100644
index d02f7062cbf..00000000000
--- a/core/internal/mocks/prometheus_backend.go
+++ /dev/null
@@ -1,204 +0,0 @@
-// Code generated by mockery v2.43.2. DO NOT EDIT.
-
-package mocks
-
-import (
- big "math/big"
-
- mock "github.com/stretchr/testify/mock"
-)
-
-// PrometheusBackend is an autogenerated mock type for the PrometheusBackend type
-type PrometheusBackend struct {
- mock.Mock
-}
-
-type PrometheusBackend_Expecter struct {
- mock *mock.Mock
-}
-
-func (_m *PrometheusBackend) EXPECT() *PrometheusBackend_Expecter {
- return &PrometheusBackend_Expecter{mock: &_m.Mock}
-}
-
-// SetMaxUnconfirmedAge provides a mock function with given fields: _a0, _a1
-func (_m *PrometheusBackend) SetMaxUnconfirmedAge(_a0 *big.Int, _a1 float64) {
- _m.Called(_a0, _a1)
-}
-
-// PrometheusBackend_SetMaxUnconfirmedAge_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetMaxUnconfirmedAge'
-type PrometheusBackend_SetMaxUnconfirmedAge_Call struct {
- *mock.Call
-}
-
-// SetMaxUnconfirmedAge is a helper method to define mock.On call
-// - _a0 *big.Int
-// - _a1 float64
-func (_e *PrometheusBackend_Expecter) SetMaxUnconfirmedAge(_a0 interface{}, _a1 interface{}) *PrometheusBackend_SetMaxUnconfirmedAge_Call {
- return &PrometheusBackend_SetMaxUnconfirmedAge_Call{Call: _e.mock.On("SetMaxUnconfirmedAge", _a0, _a1)}
-}
-
-func (_c *PrometheusBackend_SetMaxUnconfirmedAge_Call) Run(run func(_a0 *big.Int, _a1 float64)) *PrometheusBackend_SetMaxUnconfirmedAge_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run(args[0].(*big.Int), args[1].(float64))
- })
- return _c
-}
-
-func (_c *PrometheusBackend_SetMaxUnconfirmedAge_Call) Return() *PrometheusBackend_SetMaxUnconfirmedAge_Call {
- _c.Call.Return()
- return _c
-}
-
-func (_c *PrometheusBackend_SetMaxUnconfirmedAge_Call) RunAndReturn(run func(*big.Int, float64)) *PrometheusBackend_SetMaxUnconfirmedAge_Call {
- _c.Call.Return(run)
- return _c
-}
-
-// SetMaxUnconfirmedBlocks provides a mock function with given fields: _a0, _a1
-func (_m *PrometheusBackend) SetMaxUnconfirmedBlocks(_a0 *big.Int, _a1 int64) {
- _m.Called(_a0, _a1)
-}
-
-// PrometheusBackend_SetMaxUnconfirmedBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetMaxUnconfirmedBlocks'
-type PrometheusBackend_SetMaxUnconfirmedBlocks_Call struct {
- *mock.Call
-}
-
-// SetMaxUnconfirmedBlocks is a helper method to define mock.On call
-// - _a0 *big.Int
-// - _a1 int64
-func (_e *PrometheusBackend_Expecter) SetMaxUnconfirmedBlocks(_a0 interface{}, _a1 interface{}) *PrometheusBackend_SetMaxUnconfirmedBlocks_Call {
- return &PrometheusBackend_SetMaxUnconfirmedBlocks_Call{Call: _e.mock.On("SetMaxUnconfirmedBlocks", _a0, _a1)}
-}
-
-func (_c *PrometheusBackend_SetMaxUnconfirmedBlocks_Call) Run(run func(_a0 *big.Int, _a1 int64)) *PrometheusBackend_SetMaxUnconfirmedBlocks_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run(args[0].(*big.Int), args[1].(int64))
- })
- return _c
-}
-
-func (_c *PrometheusBackend_SetMaxUnconfirmedBlocks_Call) Return() *PrometheusBackend_SetMaxUnconfirmedBlocks_Call {
- _c.Call.Return()
- return _c
-}
-
-func (_c *PrometheusBackend_SetMaxUnconfirmedBlocks_Call) RunAndReturn(run func(*big.Int, int64)) *PrometheusBackend_SetMaxUnconfirmedBlocks_Call {
- _c.Call.Return(run)
- return _c
-}
-
-// SetPipelineRunsQueued provides a mock function with given fields: n
-func (_m *PrometheusBackend) SetPipelineRunsQueued(n int) {
- _m.Called(n)
-}
-
-// PrometheusBackend_SetPipelineRunsQueued_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetPipelineRunsQueued'
-type PrometheusBackend_SetPipelineRunsQueued_Call struct {
- *mock.Call
-}
-
-// SetPipelineRunsQueued is a helper method to define mock.On call
-// - n int
-func (_e *PrometheusBackend_Expecter) SetPipelineRunsQueued(n interface{}) *PrometheusBackend_SetPipelineRunsQueued_Call {
- return &PrometheusBackend_SetPipelineRunsQueued_Call{Call: _e.mock.On("SetPipelineRunsQueued", n)}
-}
-
-func (_c *PrometheusBackend_SetPipelineRunsQueued_Call) Run(run func(n int)) *PrometheusBackend_SetPipelineRunsQueued_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run(args[0].(int))
- })
- return _c
-}
-
-func (_c *PrometheusBackend_SetPipelineRunsQueued_Call) Return() *PrometheusBackend_SetPipelineRunsQueued_Call {
- _c.Call.Return()
- return _c
-}
-
-func (_c *PrometheusBackend_SetPipelineRunsQueued_Call) RunAndReturn(run func(int)) *PrometheusBackend_SetPipelineRunsQueued_Call {
- _c.Call.Return(run)
- return _c
-}
-
-// SetPipelineTaskRunsQueued provides a mock function with given fields: n
-func (_m *PrometheusBackend) SetPipelineTaskRunsQueued(n int) {
- _m.Called(n)
-}
-
-// PrometheusBackend_SetPipelineTaskRunsQueued_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetPipelineTaskRunsQueued'
-type PrometheusBackend_SetPipelineTaskRunsQueued_Call struct {
- *mock.Call
-}
-
-// SetPipelineTaskRunsQueued is a helper method to define mock.On call
-// - n int
-func (_e *PrometheusBackend_Expecter) SetPipelineTaskRunsQueued(n interface{}) *PrometheusBackend_SetPipelineTaskRunsQueued_Call {
- return &PrometheusBackend_SetPipelineTaskRunsQueued_Call{Call: _e.mock.On("SetPipelineTaskRunsQueued", n)}
-}
-
-func (_c *PrometheusBackend_SetPipelineTaskRunsQueued_Call) Run(run func(n int)) *PrometheusBackend_SetPipelineTaskRunsQueued_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run(args[0].(int))
- })
- return _c
-}
-
-func (_c *PrometheusBackend_SetPipelineTaskRunsQueued_Call) Return() *PrometheusBackend_SetPipelineTaskRunsQueued_Call {
- _c.Call.Return()
- return _c
-}
-
-func (_c *PrometheusBackend_SetPipelineTaskRunsQueued_Call) RunAndReturn(run func(int)) *PrometheusBackend_SetPipelineTaskRunsQueued_Call {
- _c.Call.Return(run)
- return _c
-}
-
-// SetUnconfirmedTransactions provides a mock function with given fields: _a0, _a1
-func (_m *PrometheusBackend) SetUnconfirmedTransactions(_a0 *big.Int, _a1 int64) {
- _m.Called(_a0, _a1)
-}
-
-// PrometheusBackend_SetUnconfirmedTransactions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetUnconfirmedTransactions'
-type PrometheusBackend_SetUnconfirmedTransactions_Call struct {
- *mock.Call
-}
-
-// SetUnconfirmedTransactions is a helper method to define mock.On call
-// - _a0 *big.Int
-// - _a1 int64
-func (_e *PrometheusBackend_Expecter) SetUnconfirmedTransactions(_a0 interface{}, _a1 interface{}) *PrometheusBackend_SetUnconfirmedTransactions_Call {
- return &PrometheusBackend_SetUnconfirmedTransactions_Call{Call: _e.mock.On("SetUnconfirmedTransactions", _a0, _a1)}
-}
-
-func (_c *PrometheusBackend_SetUnconfirmedTransactions_Call) Run(run func(_a0 *big.Int, _a1 int64)) *PrometheusBackend_SetUnconfirmedTransactions_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run(args[0].(*big.Int), args[1].(int64))
- })
- return _c
-}
-
-func (_c *PrometheusBackend_SetUnconfirmedTransactions_Call) Return() *PrometheusBackend_SetUnconfirmedTransactions_Call {
- _c.Call.Return()
- return _c
-}
-
-func (_c *PrometheusBackend_SetUnconfirmedTransactions_Call) RunAndReturn(run func(*big.Int, int64)) *PrometheusBackend_SetUnconfirmedTransactions_Call {
- _c.Call.Return(run)
- return _c
-}
-
-// NewPrometheusBackend creates a new instance of PrometheusBackend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-// The first argument is typically a *testing.T value.
-func NewPrometheusBackend(t interface {
- mock.TestingT
- Cleanup(func())
-}) *PrometheusBackend {
- mock := &PrometheusBackend{}
- mock.Mock.Test(t)
-
- t.Cleanup(func() { mock.AssertExpectations(t) })
-
- return mock
-}
diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go
index 17c217b1c90..98067821f99 100644
--- a/core/services/chainlink/application.go
+++ b/core/services/chainlink/application.go
@@ -45,6 +45,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/feeds"
"github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway"
+ "github.com/smartcontractkit/chainlink/v2/core/services/headreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keeper"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
@@ -56,7 +57,6 @@ import (
externalp2p "github.com/smartcontractkit/chainlink/v2/core/services/p2p/wrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
- "github.com/smartcontractkit/chainlink/v2/core/services/promreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
@@ -323,8 +323,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
srvcs = append(srvcs, mailMon)
srvcs = append(srvcs, relayerChainInterops.Services()...)
- promReporter := promreporter.NewPromReporter(opts.DS, legacyEVMChains, globalLogger)
- srvcs = append(srvcs, promReporter)
// Initialize Local Users ORM and Authentication Provider specified in config
// BasicAdminUsersORM is initialized and required regardless of separate Authentication Provider
@@ -364,8 +362,16 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
workflowORM = workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock())
)
+ promReporter := headreporter.NewPrometheusReporter(opts.DS, legacyEVMChains)
+ chainIDs := make([]*big.Int, legacyEVMChains.Len())
+ for i, chain := range legacyEVMChains.Slice() {
+ chainIDs[i] = chain.ID()
+ }
+ telemReporter := headreporter.NewTelemetryReporter(telemetryManager, chainIDs...)
+ headReporter := headreporter.NewHeadReporterService(opts.DS, globalLogger, promReporter, telemReporter)
+ srvcs = append(srvcs, headReporter)
for _, chain := range legacyEVMChains.Slice() {
- chain.HeadBroadcaster().Subscribe(promReporter)
+ chain.HeadBroadcaster().Subscribe(headReporter)
chain.TxManager().RegisterResumeCallback(pipelineRunner.ResumeRun)
}
diff --git a/core/services/headreporter/head_reporter.go b/core/services/headreporter/head_reporter.go
new file mode 100644
index 00000000000..f81a6acf913
--- /dev/null
+++ b/core/services/headreporter/head_reporter.go
@@ -0,0 +1,110 @@
+package headreporter
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/smartcontractkit/chainlink-common/pkg/services"
+ "github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
+ "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
+
+ "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
+ evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
+ "github.com/smartcontractkit/chainlink/v2/core/logger"
+)
+
+type (
+ HeadReporter interface {
+ ReportNewHead(ctx context.Context, head *evmtypes.Head) error
+ ReportPeriodic(ctx context.Context) error
+ }
+
+ HeadReporterService struct {
+ services.StateMachine
+ ds sqlutil.DataSource
+ lggr logger.Logger
+ newHeads *mailbox.Mailbox[*evmtypes.Head]
+ chStop services.StopChan
+ wgDone sync.WaitGroup
+ reportPeriod time.Duration
+ reporters []HeadReporter
+ unsubscribeFns []func()
+ }
+)
+
+func NewHeadReporterService(ds sqlutil.DataSource, lggr logger.Logger, reporters ...HeadReporter) *HeadReporterService {
+ return &HeadReporterService{
+ ds: ds,
+ lggr: lggr.Named("HeadReporter"),
+ newHeads: mailbox.NewSingle[*evmtypes.Head](),
+ chStop: make(chan struct{}),
+ reporters: reporters,
+ }
+}
+
+func (hrd *HeadReporterService) Subscribe(subFn func(types.HeadTrackable) (evmtypes.Head, func())) {
+ _, unsubscribe := subFn(hrd)
+ hrd.unsubscribeFns = append(hrd.unsubscribeFns, unsubscribe)
+}
+
+func (hrd *HeadReporterService) Start(context.Context) error {
+ return hrd.StartOnce(hrd.Name(), func() error {
+ hrd.wgDone.Add(1)
+ go hrd.eventLoop()
+ return nil
+ })
+}
+
+func (hrd *HeadReporterService) Close() error {
+ return hrd.StopOnce(hrd.Name(), func() error {
+ close(hrd.chStop)
+ hrd.wgDone.Wait()
+ return nil
+ })
+}
+
+func (hrd *HeadReporterService) Name() string {
+ return hrd.lggr.Name()
+}
+
+func (hrd *HeadReporterService) HealthReport() map[string]error {
+ return map[string]error{hrd.Name(): hrd.Healthy()}
+}
+
+func (hrd *HeadReporterService) OnNewLongestChain(ctx context.Context, head *evmtypes.Head) {
+ hrd.newHeads.Deliver(head)
+}
+
+func (hrd *HeadReporterService) eventLoop() {
+ hrd.lggr.Debug("Starting event loop")
+ defer hrd.wgDone.Done()
+ ctx, cancel := hrd.chStop.NewCtx()
+ defer cancel()
+ after := time.After(hrd.reportPeriod)
+ for {
+ select {
+ case <-hrd.newHeads.Notify():
+ head, exists := hrd.newHeads.Retrieve()
+ if !exists {
+ continue
+ }
+ for _, reporter := range hrd.reporters {
+ err := reporter.ReportNewHead(ctx, head)
+ if err != nil && ctx.Err() == nil {
+ hrd.lggr.Errorw("Error reporting new head", "err", err)
+ }
+ }
+ case <-after:
+ for _, reporter := range hrd.reporters {
+ err := reporter.ReportPeriodic(ctx)
+ if err != nil && ctx.Err() == nil {
+ hrd.lggr.Errorw("Error in periodic report", "err", err)
+ }
+ }
+ after = time.After(hrd.reportPeriod)
+ case <-hrd.chStop:
+ return
+ }
+ }
+}
diff --git a/core/services/headreporter/head_reporter_mock.go b/core/services/headreporter/head_reporter_mock.go
new file mode 100644
index 00000000000..21978abb86a
--- /dev/null
+++ b/core/services/headreporter/head_reporter_mock.go
@@ -0,0 +1,130 @@
+// Code generated by mockery v2.43.2. DO NOT EDIT.
+
+package headreporter
+
+import (
+ context "context"
+
+ types "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
+ mock "github.com/stretchr/testify/mock"
+)
+
+// MockHeadReporter is an autogenerated mock type for the HeadReporter type
+type MockHeadReporter struct {
+ mock.Mock
+}
+
+type MockHeadReporter_Expecter struct {
+ mock *mock.Mock
+}
+
+func (_m *MockHeadReporter) EXPECT() *MockHeadReporter_Expecter {
+ return &MockHeadReporter_Expecter{mock: &_m.Mock}
+}
+
+// ReportNewHead provides a mock function with given fields: ctx, head
+func (_m *MockHeadReporter) ReportNewHead(ctx context.Context, head *types.Head) error {
+ ret := _m.Called(ctx, head)
+
+ if len(ret) == 0 {
+ panic("no return value specified for ReportNewHead")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context, *types.Head) error); ok {
+ r0 = rf(ctx, head)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// MockHeadReporter_ReportNewHead_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReportNewHead'
+type MockHeadReporter_ReportNewHead_Call struct {
+ *mock.Call
+}
+
+// ReportNewHead is a helper method to define mock.On call
+// - ctx context.Context
+// - head *types.Head
+func (_e *MockHeadReporter_Expecter) ReportNewHead(ctx interface{}, head interface{}) *MockHeadReporter_ReportNewHead_Call {
+ return &MockHeadReporter_ReportNewHead_Call{Call: _e.mock.On("ReportNewHead", ctx, head)}
+}
+
+func (_c *MockHeadReporter_ReportNewHead_Call) Run(run func(ctx context.Context, head *types.Head)) *MockHeadReporter_ReportNewHead_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(context.Context), args[1].(*types.Head))
+ })
+ return _c
+}
+
+func (_c *MockHeadReporter_ReportNewHead_Call) Return(_a0 error) *MockHeadReporter_ReportNewHead_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockHeadReporter_ReportNewHead_Call) RunAndReturn(run func(context.Context, *types.Head) error) *MockHeadReporter_ReportNewHead_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// ReportPeriodic provides a mock function with given fields: ctx
+func (_m *MockHeadReporter) ReportPeriodic(ctx context.Context) error {
+ ret := _m.Called(ctx)
+
+ if len(ret) == 0 {
+ panic("no return value specified for ReportPeriodic")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context) error); ok {
+ r0 = rf(ctx)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// MockHeadReporter_ReportPeriodic_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReportPeriodic'
+type MockHeadReporter_ReportPeriodic_Call struct {
+ *mock.Call
+}
+
+// ReportPeriodic is a helper method to define mock.On call
+// - ctx context.Context
+func (_e *MockHeadReporter_Expecter) ReportPeriodic(ctx interface{}) *MockHeadReporter_ReportPeriodic_Call {
+ return &MockHeadReporter_ReportPeriodic_Call{Call: _e.mock.On("ReportPeriodic", ctx)}
+}
+
+func (_c *MockHeadReporter_ReportPeriodic_Call) Run(run func(ctx context.Context)) *MockHeadReporter_ReportPeriodic_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(context.Context))
+ })
+ return _c
+}
+
+func (_c *MockHeadReporter_ReportPeriodic_Call) Return(_a0 error) *MockHeadReporter_ReportPeriodic_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockHeadReporter_ReportPeriodic_Call) RunAndReturn(run func(context.Context) error) *MockHeadReporter_ReportPeriodic_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// NewMockHeadReporter creates a new instance of MockHeadReporter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewMockHeadReporter(t interface {
+ mock.TestingT
+ Cleanup(func())
+}) *MockHeadReporter {
+ mock := &MockHeadReporter{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/core/services/headreporter/head_reporter_test.go b/core/services/headreporter/head_reporter_test.go
new file mode 100644
index 00000000000..ded7e1fb61b
--- /dev/null
+++ b/core/services/headreporter/head_reporter_test.go
@@ -0,0 +1,45 @@
+package headreporter
+
+import (
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+
+ evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
+ ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
+
+ "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
+ "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
+ "github.com/smartcontractkit/chainlink/v2/core/logger"
+)
+
+func NewHead() evmtypes.Head {
+ return evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(0)}
+}
+
+func Test_HeadReporterService(t *testing.T) {
+ t.Run("report everything", func(t *testing.T) {
+ db := pgtest.NewSqlxDB(t)
+
+ headReporter := NewMockHeadReporter(t)
+ service := NewHeadReporterService(db, logger.TestLogger(t), headReporter)
+ service.reportPeriod = time.Second
+ err := service.Start(testutils.Context(t))
+ require.NoError(t, err)
+
+ var reportCalls atomic.Int32
+ head := NewHead()
+ headReporter.On("ReportNewHead", mock.Anything, &head).Run(func(args mock.Arguments) {
+ reportCalls.Add(1)
+ }).Return(nil)
+ headReporter.On("ReportPeriodic", mock.Anything).Run(func(args mock.Arguments) {
+ reportCalls.Add(1)
+ }).Return(nil)
+ service.OnNewLongestChain(testutils.Context(t), &head)
+
+ require.Eventually(t, func() bool { return reportCalls.Load() == 2 }, 5*time.Second, 100*time.Millisecond)
+ })
+}
diff --git a/core/services/headreporter/helper_test.go b/core/services/headreporter/helper_test.go
new file mode 100644
index 00000000000..fa05182a851
--- /dev/null
+++ b/core/services/headreporter/helper_test.go
@@ -0,0 +1,5 @@
+package headreporter
+
+func (p *prometheusReporter) SetBackend(b PrometheusBackend) {
+ p.backend = b
+}
diff --git a/core/services/headreporter/prometheus_backend_mock.go b/core/services/headreporter/prometheus_backend_mock.go
new file mode 100644
index 00000000000..ca83f6c4fbb
--- /dev/null
+++ b/core/services/headreporter/prometheus_backend_mock.go
@@ -0,0 +1,204 @@
+// Code generated by mockery v2.43.2. DO NOT EDIT.
+
+package headreporter
+
+import (
+ big "math/big"
+
+ mock "github.com/stretchr/testify/mock"
+)
+
+// MockPrometheusBackend is an autogenerated mock type for the PrometheusBackend type
+type MockPrometheusBackend struct {
+ mock.Mock
+}
+
+type MockPrometheusBackend_Expecter struct {
+ mock *mock.Mock
+}
+
+func (_m *MockPrometheusBackend) EXPECT() *MockPrometheusBackend_Expecter {
+ return &MockPrometheusBackend_Expecter{mock: &_m.Mock}
+}
+
+// SetMaxUnconfirmedAge provides a mock function with given fields: _a0, _a1
+func (_m *MockPrometheusBackend) SetMaxUnconfirmedAge(_a0 *big.Int, _a1 float64) {
+ _m.Called(_a0, _a1)
+}
+
+// MockPrometheusBackend_SetMaxUnconfirmedAge_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetMaxUnconfirmedAge'
+type MockPrometheusBackend_SetMaxUnconfirmedAge_Call struct {
+ *mock.Call
+}
+
+// SetMaxUnconfirmedAge is a helper method to define mock.On call
+// - _a0 *big.Int
+// - _a1 float64
+func (_e *MockPrometheusBackend_Expecter) SetMaxUnconfirmedAge(_a0 interface{}, _a1 interface{}) *MockPrometheusBackend_SetMaxUnconfirmedAge_Call {
+ return &MockPrometheusBackend_SetMaxUnconfirmedAge_Call{Call: _e.mock.On("SetMaxUnconfirmedAge", _a0, _a1)}
+}
+
+func (_c *MockPrometheusBackend_SetMaxUnconfirmedAge_Call) Run(run func(_a0 *big.Int, _a1 float64)) *MockPrometheusBackend_SetMaxUnconfirmedAge_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(*big.Int), args[1].(float64))
+ })
+ return _c
+}
+
+func (_c *MockPrometheusBackend_SetMaxUnconfirmedAge_Call) Return() *MockPrometheusBackend_SetMaxUnconfirmedAge_Call {
+ _c.Call.Return()
+ return _c
+}
+
+func (_c *MockPrometheusBackend_SetMaxUnconfirmedAge_Call) RunAndReturn(run func(*big.Int, float64)) *MockPrometheusBackend_SetMaxUnconfirmedAge_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// SetMaxUnconfirmedBlocks provides a mock function with given fields: _a0, _a1
+func (_m *MockPrometheusBackend) SetMaxUnconfirmedBlocks(_a0 *big.Int, _a1 int64) {
+ _m.Called(_a0, _a1)
+}
+
+// MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetMaxUnconfirmedBlocks'
+type MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call struct {
+ *mock.Call
+}
+
+// SetMaxUnconfirmedBlocks is a helper method to define mock.On call
+// - _a0 *big.Int
+// - _a1 int64
+func (_e *MockPrometheusBackend_Expecter) SetMaxUnconfirmedBlocks(_a0 interface{}, _a1 interface{}) *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call {
+ return &MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call{Call: _e.mock.On("SetMaxUnconfirmedBlocks", _a0, _a1)}
+}
+
+func (_c *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call) Run(run func(_a0 *big.Int, _a1 int64)) *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(*big.Int), args[1].(int64))
+ })
+ return _c
+}
+
+func (_c *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call) Return() *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call {
+ _c.Call.Return()
+ return _c
+}
+
+func (_c *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call) RunAndReturn(run func(*big.Int, int64)) *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// SetPipelineRunsQueued provides a mock function with given fields: n
+func (_m *MockPrometheusBackend) SetPipelineRunsQueued(n int) {
+ _m.Called(n)
+}
+
+// MockPrometheusBackend_SetPipelineRunsQueued_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetPipelineRunsQueued'
+type MockPrometheusBackend_SetPipelineRunsQueued_Call struct {
+ *mock.Call
+}
+
+// SetPipelineRunsQueued is a helper method to define mock.On call
+// - n int
+func (_e *MockPrometheusBackend_Expecter) SetPipelineRunsQueued(n interface{}) *MockPrometheusBackend_SetPipelineRunsQueued_Call {
+ return &MockPrometheusBackend_SetPipelineRunsQueued_Call{Call: _e.mock.On("SetPipelineRunsQueued", n)}
+}
+
+func (_c *MockPrometheusBackend_SetPipelineRunsQueued_Call) Run(run func(n int)) *MockPrometheusBackend_SetPipelineRunsQueued_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(int))
+ })
+ return _c
+}
+
+func (_c *MockPrometheusBackend_SetPipelineRunsQueued_Call) Return() *MockPrometheusBackend_SetPipelineRunsQueued_Call {
+ _c.Call.Return()
+ return _c
+}
+
+func (_c *MockPrometheusBackend_SetPipelineRunsQueued_Call) RunAndReturn(run func(int)) *MockPrometheusBackend_SetPipelineRunsQueued_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// SetPipelineTaskRunsQueued provides a mock function with given fields: n
+func (_m *MockPrometheusBackend) SetPipelineTaskRunsQueued(n int) {
+ _m.Called(n)
+}
+
+// MockPrometheusBackend_SetPipelineTaskRunsQueued_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetPipelineTaskRunsQueued'
+type MockPrometheusBackend_SetPipelineTaskRunsQueued_Call struct {
+ *mock.Call
+}
+
+// SetPipelineTaskRunsQueued is a helper method to define mock.On call
+// - n int
+func (_e *MockPrometheusBackend_Expecter) SetPipelineTaskRunsQueued(n interface{}) *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call {
+ return &MockPrometheusBackend_SetPipelineTaskRunsQueued_Call{Call: _e.mock.On("SetPipelineTaskRunsQueued", n)}
+}
+
+func (_c *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call) Run(run func(n int)) *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(int))
+ })
+ return _c
+}
+
+func (_c *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call) Return() *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call {
+ _c.Call.Return()
+ return _c
+}
+
+func (_c *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call) RunAndReturn(run func(int)) *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// SetUnconfirmedTransactions provides a mock function with given fields: _a0, _a1
+func (_m *MockPrometheusBackend) SetUnconfirmedTransactions(_a0 *big.Int, _a1 int64) {
+ _m.Called(_a0, _a1)
+}
+
+// MockPrometheusBackend_SetUnconfirmedTransactions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetUnconfirmedTransactions'
+type MockPrometheusBackend_SetUnconfirmedTransactions_Call struct {
+ *mock.Call
+}
+
+// SetUnconfirmedTransactions is a helper method to define mock.On call
+// - _a0 *big.Int
+// - _a1 int64
+func (_e *MockPrometheusBackend_Expecter) SetUnconfirmedTransactions(_a0 interface{}, _a1 interface{}) *MockPrometheusBackend_SetUnconfirmedTransactions_Call {
+ return &MockPrometheusBackend_SetUnconfirmedTransactions_Call{Call: _e.mock.On("SetUnconfirmedTransactions", _a0, _a1)}
+}
+
+func (_c *MockPrometheusBackend_SetUnconfirmedTransactions_Call) Run(run func(_a0 *big.Int, _a1 int64)) *MockPrometheusBackend_SetUnconfirmedTransactions_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(*big.Int), args[1].(int64))
+ })
+ return _c
+}
+
+func (_c *MockPrometheusBackend_SetUnconfirmedTransactions_Call) Return() *MockPrometheusBackend_SetUnconfirmedTransactions_Call {
+ _c.Call.Return()
+ return _c
+}
+
+func (_c *MockPrometheusBackend_SetUnconfirmedTransactions_Call) RunAndReturn(run func(*big.Int, int64)) *MockPrometheusBackend_SetUnconfirmedTransactions_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// NewMockPrometheusBackend creates a new instance of MockPrometheusBackend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewMockPrometheusBackend(t interface {
+ mock.TestingT
+ Cleanup(func())
+}) *MockPrometheusBackend {
+ mock := &MockPrometheusBackend{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/core/services/promreporter/prom_reporter.go b/core/services/headreporter/prometheus_reporter.go
similarity index 63%
rename from core/services/promreporter/prom_reporter.go
rename to core/services/headreporter/prometheus_reporter.go
index 31d5f1129ef..3e39c7aca45 100644
--- a/core/services/promreporter/prom_reporter.go
+++ b/core/services/headreporter/prometheus_reporter.go
@@ -1,40 +1,28 @@
-package promreporter
+package headreporter
import (
"context"
"fmt"
"math/big"
- "sync"
"time"
- "github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
- txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
- "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
-
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/multierr"
- "github.com/smartcontractkit/chainlink-common/pkg/services"
- "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
-
+ "github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
+ txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
+ "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
- "github.com/smartcontractkit/chainlink/v2/core/logger"
)
type (
- promReporter struct {
- services.StateMachine
- ds sqlutil.DataSource
- chains legacyevm.LegacyChainContainer
- lggr logger.Logger
- backend PrometheusBackend
- newHeads *mailbox.Mailbox[*evmtypes.Head]
- chStop services.StopChan
- wgDone sync.WaitGroup
- reportPeriod time.Duration
+ prometheusReporter struct {
+ ds sqlutil.DataSource
+ chains legacyevm.LegacyChainContainer
+ backend PrometheusBackend
}
PrometheusBackend interface {
@@ -71,103 +59,15 @@ var (
})
)
-func (defaultBackend) SetUnconfirmedTransactions(evmChainID *big.Int, n int64) {
- promUnconfirmedTransactions.WithLabelValues(evmChainID.String()).Set(float64(n))
-}
-
-func (defaultBackend) SetMaxUnconfirmedAge(evmChainID *big.Int, s float64) {
- promMaxUnconfirmedAge.WithLabelValues(evmChainID.String()).Set(s)
-}
-
-func (defaultBackend) SetMaxUnconfirmedBlocks(evmChainID *big.Int, n int64) {
- promMaxUnconfirmedBlocks.WithLabelValues(evmChainID.String()).Set(float64(n))
-}
-
-func (defaultBackend) SetPipelineRunsQueued(n int) {
- promPipelineTaskRunsQueued.Set(float64(n))
-}
-
-func (defaultBackend) SetPipelineTaskRunsQueued(n int) {
- promPipelineRunsQueued.Set(float64(n))
-}
-
-func NewPromReporter(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, opts ...interface{}) *promReporter {
- var backend PrometheusBackend = defaultBackend{}
- period := 15 * time.Second
- for _, opt := range opts {
- switch v := opt.(type) {
- case time.Duration:
- period = v
- case PrometheusBackend:
- backend = v
- }
- }
-
- chStop := make(chan struct{})
- return &promReporter{
- ds: ds,
- chains: chainContainer,
- lggr: lggr.Named("PromReporter"),
- backend: backend,
- newHeads: mailbox.NewSingle[*evmtypes.Head](),
- chStop: chStop,
- reportPeriod: period,
+func NewPrometheusReporter(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer) *prometheusReporter {
+ return &prometheusReporter{
+ ds: ds,
+ chains: chainContainer,
+ backend: defaultBackend{},
}
}
-// Start starts PromReporter.
-func (pr *promReporter) Start(context.Context) error {
- return pr.StartOnce("PromReporter", func() error {
- pr.wgDone.Add(1)
- go pr.eventLoop()
- return nil
- })
-}
-
-func (pr *promReporter) Close() error {
- return pr.StopOnce("PromReporter", func() error {
- close(pr.chStop)
- pr.wgDone.Wait()
- return nil
- })
-}
-func (pr *promReporter) Name() string {
- return pr.lggr.Name()
-}
-
-func (pr *promReporter) HealthReport() map[string]error {
- return map[string]error{pr.Name(): pr.Healthy()}
-}
-
-func (pr *promReporter) OnNewLongestChain(ctx context.Context, head *evmtypes.Head) {
- pr.newHeads.Deliver(head)
-}
-
-func (pr *promReporter) eventLoop() {
- pr.lggr.Debug("Starting event loop")
- defer pr.wgDone.Done()
- ctx, cancel := pr.chStop.NewCtx()
- defer cancel()
- for {
- select {
- case <-pr.newHeads.Notify():
- head, exists := pr.newHeads.Retrieve()
- if !exists {
- continue
- }
- pr.reportHeadMetrics(ctx, head)
- case <-time.After(pr.reportPeriod):
- if err := errors.Wrap(pr.reportPipelineRunStats(ctx), "reportPipelineRunStats failed"); err != nil {
- pr.lggr.Errorw("Error reporting prometheus metrics", "err", err)
- }
-
- case <-pr.chStop:
- return
- }
- }
-}
-
-func (pr *promReporter) getTxm(evmChainID *big.Int) (txmgr.TxManager, error) {
+func (pr *prometheusReporter) getTxm(evmChainID *big.Int) (txmgr.TxManager, error) {
chain, err := pr.chains.Get(evmChainID.String())
if err != nil {
return nil, fmt.Errorf("failed to get chain: %w", err)
@@ -175,20 +75,16 @@ func (pr *promReporter) getTxm(evmChainID *big.Int) (txmgr.TxManager, error) {
return chain.TxManager(), nil
}
-func (pr *promReporter) reportHeadMetrics(ctx context.Context, head *evmtypes.Head) {
+func (pr *prometheusReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) error {
evmChainID := head.EVMChainID.ToInt()
- err := multierr.Combine(
+ return multierr.Combine(
errors.Wrap(pr.reportPendingEthTxes(ctx, evmChainID), "reportPendingEthTxes failed"),
errors.Wrap(pr.reportMaxUnconfirmedAge(ctx, evmChainID), "reportMaxUnconfirmedAge failed"),
errors.Wrap(pr.reportMaxUnconfirmedBlocks(ctx, head), "reportMaxUnconfirmedBlocks failed"),
)
-
- if err != nil && ctx.Err() == nil {
- pr.lggr.Errorw("Error reporting prometheus metrics", "err", err)
- }
}
-func (pr *promReporter) reportPendingEthTxes(ctx context.Context, evmChainID *big.Int) (err error) {
+func (pr *prometheusReporter) reportPendingEthTxes(ctx context.Context, evmChainID *big.Int) (err error) {
txm, err := pr.getTxm(evmChainID)
if err != nil {
return fmt.Errorf("failed to get txm: %w", err)
@@ -202,7 +98,7 @@ func (pr *promReporter) reportPendingEthTxes(ctx context.Context, evmChainID *bi
return nil
}
-func (pr *promReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID *big.Int) (err error) {
+func (pr *prometheusReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID *big.Int) (err error) {
txm, err := pr.getTxm(evmChainID)
if err != nil {
return fmt.Errorf("failed to get txm: %w", err)
@@ -221,7 +117,7 @@ func (pr *promReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID
return nil
}
-func (pr *promReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *evmtypes.Head) (err error) {
+func (pr *prometheusReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *evmtypes.Head) (err error) {
txm, err := pr.getTxm(head.EVMChainID.ToInt())
if err != nil {
return fmt.Errorf("failed to get txm: %w", err)
@@ -240,7 +136,11 @@ func (pr *promReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *ev
return nil
}
-func (pr *promReporter) reportPipelineRunStats(ctx context.Context) (err error) {
+func (pr *prometheusReporter) ReportPeriodic(ctx context.Context) error {
+ return errors.Wrap(pr.reportPipelineRunStats(ctx), "reportPipelineRunStats failed")
+}
+
+func (pr *prometheusReporter) reportPipelineRunStats(ctx context.Context) (err error) {
rows, err := pr.ds.QueryContext(ctx, `
SELECT pipeline_run_id FROM pipeline_task_runs WHERE finished_at IS NULL
`)
@@ -271,3 +171,23 @@ SELECT pipeline_run_id FROM pipeline_task_runs WHERE finished_at IS NULL
return nil
}
+
+func (defaultBackend) SetUnconfirmedTransactions(evmChainID *big.Int, n int64) {
+ promUnconfirmedTransactions.WithLabelValues(evmChainID.String()).Set(float64(n))
+}
+
+func (defaultBackend) SetMaxUnconfirmedAge(evmChainID *big.Int, s float64) {
+ promMaxUnconfirmedAge.WithLabelValues(evmChainID.String()).Set(s)
+}
+
+func (defaultBackend) SetMaxUnconfirmedBlocks(evmChainID *big.Int, n int64) {
+ promMaxUnconfirmedBlocks.WithLabelValues(evmChainID.String()).Set(float64(n))
+}
+
+func (defaultBackend) SetPipelineRunsQueued(n int) {
+ promPipelineTaskRunsQueued.Set(float64(n))
+}
+
+func (defaultBackend) SetPipelineTaskRunsQueued(n int) {
+ promPipelineRunsQueued.Set(float64(n))
+}
diff --git a/core/services/promreporter/prom_reporter_test.go b/core/services/headreporter/prometheus_reporter_test.go
similarity index 64%
rename from core/services/promreporter/prom_reporter_test.go
rename to core/services/headreporter/prometheus_reporter_test.go
index a0a4a247c21..d96e617fd79 100644
--- a/core/services/promreporter/prom_reporter_test.go
+++ b/core/services/headreporter/prometheus_reporter_test.go
@@ -1,8 +1,7 @@
-package promreporter_test
+package headreporter_test
import (
"math/big"
- "sync/atomic"
"testing"
"time"
@@ -10,91 +9,40 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
- "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
-
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
- evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
- ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
- "github.com/smartcontractkit/chainlink/v2/core/internal/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
- "github.com/smartcontractkit/chainlink/v2/core/services/promreporter"
+ "github.com/smartcontractkit/chainlink/v2/core/services/headreporter"
)
-func newHead() evmtypes.Head {
- return evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(0)}
-}
-
-func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainContainer {
- config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t)
- keyStore := cltest.NewKeyStore(t, db).Eth()
- ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
- estimator, err := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator())
- require.NoError(t, err)
- lggr := logger.TestLogger(t)
- lpOpts := logpoller.Opts{
- PollPeriod: 100 * time.Millisecond,
- FinalityDepth: 2,
- BackfillBatchSize: 3,
- RpcBatchSize: 2,
- KeepFinalizedBlocksDepth: 1000,
- }
- ht := headtracker.NewSimulatedHeadTracker(ethClient, lpOpts.UseFinalityTag, lpOpts.FinalityDepth)
- lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), ethClient, lggr, ht, lpOpts)
-
- txm, err := txmgr.NewTxm(
- db,
- evmConfig,
- evmConfig.GasEstimator(),
- evmConfig.Transactions(),
- nil,
- dbConfig,
- dbConfig.Listener(),
- ethClient,
- lggr,
- lp,
- keyStore,
- estimator,
- ht)
- require.NoError(t, err)
-
- cfg := configtest.NewGeneralConfig(t, nil)
- return cltest.NewLegacyChainsWithMockChainAndTxManager(t, ethClient, cfg, txm)
-}
-
-func Test_PromReporter_OnNewLongestChain(t *testing.T) {
+func Test_PrometheusReporter(t *testing.T) {
t.Run("with nothing in the database", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
- backend := mocks.NewPrometheusBackend(t)
- reporter := promreporter.NewPromReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond)
-
- var subscribeCalls atomic.Int32
-
+ backend := headreporter.NewMockPrometheusBackend(t)
backend.On("SetUnconfirmedTransactions", big.NewInt(0), int64(0)).Return()
backend.On("SetMaxUnconfirmedAge", big.NewInt(0), float64(0)).Return()
backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(0)).Return()
- backend.On("SetPipelineTaskRunsQueued", 0).Return()
- backend.On("SetPipelineRunsQueued", 0).
- Run(func(args mock.Arguments) {
- subscribeCalls.Add(1)
- }).
- Return()
- servicetest.Run(t, reporter)
+ reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db))
+ reporter.SetBackend(backend)
- head := newHead()
- reporter.OnNewLongestChain(testutils.Context(t), &head)
+ head := headreporter.NewHead()
+ err := reporter.ReportNewHead(testutils.Context(t), &head)
+ require.NoError(t, err)
- require.Eventually(t, func() bool { return subscribeCalls.Load() >= 1 }, 12*time.Second, 100*time.Millisecond)
+ backend.On("SetPipelineTaskRunsQueued", 0).Return()
+ backend.On("SetPipelineRunsQueued", 0).Return()
+ err = reporter.ReportPeriodic(testutils.Context(t))
+ require.NoError(t, err)
})
t.Run("with unconfirmed evm.txes", func(t *testing.T) {
@@ -103,61 +51,93 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) {
ethKeyStore := cltest.NewKeyStore(t, db).Eth()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
- var subscribeCalls atomic.Int32
+ etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress)
+ cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 1, fromAddress)
+ cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, fromAddress)
+ require.NoError(t, txStore.UpdateTxAttemptBroadcastBeforeBlockNum(testutils.Context(t), etx.ID, 7))
- backend := mocks.NewPrometheusBackend(t)
+ backend := headreporter.NewMockPrometheusBackend(t)
backend.On("SetUnconfirmedTransactions", big.NewInt(0), int64(3)).Return()
backend.On("SetMaxUnconfirmedAge", big.NewInt(0), mock.MatchedBy(func(s float64) bool {
return s > 0
})).Return()
backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(35)).Return()
- backend.On("SetPipelineTaskRunsQueued", 0).Return()
- backend.On("SetPipelineRunsQueued", 0).
- Run(func(args mock.Arguments) {
- subscribeCalls.Add(1)
- }).
- Return()
- reporter := promreporter.NewPromReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond)
- servicetest.Run(t, reporter)
- etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress)
- cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 1, fromAddress)
- cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, fromAddress)
- require.NoError(t, txStore.UpdateTxAttemptBroadcastBeforeBlockNum(testutils.Context(t), etx.ID, 7))
+ reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db))
+ reporter.SetBackend(backend)
- head := newHead()
- reporter.OnNewLongestChain(testutils.Context(t), &head)
+ head := headreporter.NewHead()
+ err := reporter.ReportNewHead(testutils.Context(t), &head)
+ require.NoError(t, err)
- require.Eventually(t, func() bool { return subscribeCalls.Load() >= 1 }, 12*time.Second, 100*time.Millisecond)
+ backend.On("SetPipelineTaskRunsQueued", 0).Return()
+ backend.On("SetPipelineRunsQueued", 0).Return()
+
+ err = reporter.ReportPeriodic(testutils.Context(t))
+ require.NoError(t, err)
})
t.Run("with unfinished pipeline task runs", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_task_runs_pipeline_run_id_fkey DEFERRED`)
- backend := mocks.NewPrometheusBackend(t)
- reporter := promreporter.NewPromReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond)
-
cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 1)
cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 1)
cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 2)
- var subscribeCalls atomic.Int32
-
+ backend := headreporter.NewMockPrometheusBackend(t)
backend.On("SetUnconfirmedTransactions", big.NewInt(0), int64(0)).Return()
backend.On("SetMaxUnconfirmedAge", big.NewInt(0), float64(0)).Return()
backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(0)).Return()
- backend.On("SetPipelineTaskRunsQueued", 3).Return()
- backend.On("SetPipelineRunsQueued", 2).
- Run(func(args mock.Arguments) {
- subscribeCalls.Add(1)
- }).
- Return()
- servicetest.Run(t, reporter)
- head := newHead()
- reporter.OnNewLongestChain(testutils.Context(t), &head)
+ reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db))
+ reporter.SetBackend(backend)
+
+ head := headreporter.NewHead()
+ err := reporter.ReportNewHead(testutils.Context(t), &head)
+ require.NoError(t, err)
- require.Eventually(t, func() bool { return subscribeCalls.Load() >= 1 }, 12*time.Second, 100*time.Millisecond)
+ backend.On("SetPipelineTaskRunsQueued", 3).Return()
+ backend.On("SetPipelineRunsQueued", 2).Return()
+
+ err = reporter.ReportPeriodic(testutils.Context(t))
+ require.NoError(t, err)
})
}
+
+func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainContainer {
+ config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t)
+ keyStore := cltest.NewKeyStore(t, db).Eth()
+ ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
+ estimator, err := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator())
+ require.NoError(t, err)
+ lggr := logger.TestLogger(t)
+ lpOpts := logpoller.Opts{
+ PollPeriod: 100 * time.Millisecond,
+ FinalityDepth: 2,
+ BackfillBatchSize: 3,
+ RpcBatchSize: 2,
+ KeepFinalizedBlocksDepth: 1000,
+ }
+ ht := headtracker.NewSimulatedHeadTracker(ethClient, lpOpts.UseFinalityTag, lpOpts.FinalityDepth)
+ lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), ethClient, lggr, ht, lpOpts)
+
+ txm, err := txmgr.NewTxm(
+ db,
+ evmConfig,
+ evmConfig.GasEstimator(),
+ evmConfig.Transactions(),
+ nil,
+ dbConfig,
+ dbConfig.Listener(),
+ ethClient,
+ lggr,
+ lp,
+ keyStore,
+ estimator,
+ ht)
+ require.NoError(t, err)
+
+ cfg := configtest.NewGeneralConfig(t, nil)
+ return cltest.NewLegacyChainsWithMockChainAndTxManager(t, ethClient, cfg, txm)
+}
diff --git a/core/services/headreporter/telemetry_reporter.go b/core/services/headreporter/telemetry_reporter.go
new file mode 100644
index 00000000000..d76ce8a6044
--- /dev/null
+++ b/core/services/headreporter/telemetry_reporter.go
@@ -0,0 +1,65 @@
+package headreporter
+
+import (
+ "context"
+ "math/big"
+
+ "github.com/pkg/errors"
+
+ "github.com/smartcontractkit/libocr/commontypes"
+ "google.golang.org/protobuf/proto"
+
+ evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
+ "github.com/smartcontractkit/chainlink/v2/core/services/synchronization"
+ "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"
+ "github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
+)
+
+type telemetryReporter struct {
+ endpoints map[uint64]commontypes.MonitoringEndpoint
+}
+
+func NewTelemetryReporter(monitoringEndpointGen telemetry.MonitoringEndpointGenerator, chainIDs ...*big.Int) HeadReporter {
+ endpoints := make(map[uint64]commontypes.MonitoringEndpoint)
+ for _, chainID := range chainIDs {
+ endpoints[chainID.Uint64()] = monitoringEndpointGen.GenMonitoringEndpoint("EVM", chainID.String(), "", synchronization.HeadReport)
+ }
+ return &telemetryReporter{endpoints: endpoints}
+}
+
+func (t *telemetryReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) error {
+ monitoringEndpoint := t.endpoints[head.EVMChainID.ToInt().Uint64()]
+ if monitoringEndpoint == nil {
+ return errors.Errorf("No monitoring endpoint provided chain_id=%d", head.EVMChainID.Int64())
+ }
+ var finalized *telem.Block
+ latestFinalizedHead := head.LatestFinalizedHead()
+ if latestFinalizedHead != nil {
+ finalized = &telem.Block{
+ Timestamp: uint64(latestFinalizedHead.GetTimestamp().UTC().Unix()),
+ Number: uint64(latestFinalizedHead.BlockNumber()),
+ Hash: latestFinalizedHead.BlockHash().Hex(),
+ }
+ }
+ request := &telem.HeadReportRequest{
+ Latest: &telem.Block{
+ Timestamp: uint64(head.Timestamp.UTC().Unix()),
+ Number: uint64(head.Number),
+ Hash: head.Hash.Hex(),
+ },
+ Finalized: finalized,
+ }
+ bytes, err := proto.Marshal(request)
+ if err != nil {
+ return errors.WithMessage(err, "telem.HeadReportRequest marshal error")
+ }
+ monitoringEndpoint.SendLog(bytes)
+ if finalized == nil {
+ return errors.Errorf("No finalized block was found for chain_id=%d", head.EVMChainID.Int64())
+ }
+ return nil
+}
+
+func (t *telemetryReporter) ReportPeriodic(ctx context.Context) error {
+ return nil
+}
diff --git a/core/services/headreporter/telemetry_reporter_test.go b/core/services/headreporter/telemetry_reporter_test.go
new file mode 100644
index 00000000000..c33edab0bcf
--- /dev/null
+++ b/core/services/headreporter/telemetry_reporter_test.go
@@ -0,0 +1,105 @@
+package headreporter_test
+
+import (
+ "math/big"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/proto"
+
+ mocks2 "github.com/smartcontractkit/chainlink/v2/common/types/mocks"
+ evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
+ ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
+ "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
+ "github.com/smartcontractkit/chainlink/v2/core/services/headreporter"
+ "github.com/smartcontractkit/chainlink/v2/core/services/synchronization"
+ "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"
+ "github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
+)
+
+func Test_TelemetryReporter_NewHead(t *testing.T) {
+ head := evmtypes.Head{
+ Number: 42,
+ EVMChainID: ubig.NewI(100),
+ Hash: common.HexToHash("0x1010"),
+ Timestamp: time.UnixMilli(1000),
+ IsFinalized: false,
+ Parent: &evmtypes.Head{
+ Number: 41,
+ Hash: common.HexToHash("0x1009"),
+ Timestamp: time.UnixMilli(999),
+ IsFinalized: true,
+ },
+ }
+ requestBytes, err := proto.Marshal(&telem.HeadReportRequest{
+ Latest: &telem.Block{
+ Timestamp: uint64(head.Timestamp.UTC().Unix()),
+ Number: 42,
+ Hash: head.Hash.Hex(),
+ },
+ Finalized: &telem.Block{
+ Timestamp: uint64(head.Parent.Timestamp.UTC().Unix()),
+ Number: 41,
+ Hash: head.Parent.Hash.Hex(),
+ },
+ })
+ assert.NoError(t, err)
+
+ monitoringEndpoint := mocks2.NewMonitoringEndpoint(t)
+ monitoringEndpoint.On("SendLog", requestBytes).Return()
+
+ monitoringEndpointGen := telemetry.NewMockMonitoringEndpointGenerator(t)
+ monitoringEndpointGen.
+ On("GenMonitoringEndpoint", "EVM", "100", "", synchronization.HeadReport).
+ Return(monitoringEndpoint)
+ reporter := headreporter.NewTelemetryReporter(monitoringEndpointGen, big.NewInt(100))
+
+ err = reporter.ReportNewHead(testutils.Context(t), &head)
+ assert.NoError(t, err)
+}
+
+func Test_TelemetryReporter_NewHeadMissingFinalized(t *testing.T) {
+ head := evmtypes.Head{
+ Number: 42,
+ EVMChainID: ubig.NewI(100),
+ Hash: common.HexToHash("0x1010"),
+ Timestamp: time.UnixMilli(1000),
+ IsFinalized: false,
+ }
+ requestBytes, err := proto.Marshal(&telem.HeadReportRequest{
+ Latest: &telem.Block{
+ Timestamp: uint64(head.Timestamp.UTC().Unix()),
+ Number: 42,
+ Hash: head.Hash.Hex(),
+ },
+ })
+ assert.NoError(t, err)
+
+ monitoringEndpoint := mocks2.NewMonitoringEndpoint(t)
+ monitoringEndpoint.On("SendLog", requestBytes).Return()
+
+ monitoringEndpointGen := telemetry.NewMockMonitoringEndpointGenerator(t)
+ monitoringEndpointGen.
+ On("GenMonitoringEndpoint", "EVM", "100", "", synchronization.HeadReport).
+ Return(monitoringEndpoint)
+ reporter := headreporter.NewTelemetryReporter(monitoringEndpointGen, big.NewInt(100))
+
+ err = reporter.ReportNewHead(testutils.Context(t), &head)
+ assert.Errorf(t, err, "No finalized block was found for chain_id=100")
+}
+
+func Test_TelemetryReporter_NewHead_MissingEndpoint(t *testing.T) {
+ monitoringEndpointGen := telemetry.NewMockMonitoringEndpointGenerator(t)
+ monitoringEndpointGen.
+ On("GenMonitoringEndpoint", "EVM", "100", "", synchronization.HeadReport).
+ Return(nil)
+
+ reporter := headreporter.NewTelemetryReporter(monitoringEndpointGen, big.NewInt(100))
+
+ head := evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(100)}
+
+ err := reporter.ReportNewHead(testutils.Context(t), &head)
+ assert.Errorf(t, err, "No monitoring endpoint provided chain_id=100")
+}
diff --git a/core/services/synchronization/common.go b/core/services/synchronization/common.go
index 394830a76af..a6c0191e3a7 100644
--- a/core/services/synchronization/common.go
+++ b/core/services/synchronization/common.go
@@ -28,6 +28,7 @@ const (
OCR3CCIPCommit TelemetryType = "ocr3-ccip-commit"
OCR3CCIPExec TelemetryType = "ocr3-ccip-exec"
OCR3CCIPBootstrap TelemetryType = "ocr3-bootstrap"
+ HeadReport TelemetryType = "head-report"
)
type TelemPayload struct {
diff --git a/core/services/synchronization/telem/telem_head_report.pb.go b/core/services/synchronization/telem/telem_head_report.pb.go
new file mode 100644
index 00000000000..18e4532472b
--- /dev/null
+++ b/core/services/synchronization/telem/telem_head_report.pb.go
@@ -0,0 +1,255 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.33.0
+// protoc v4.25.1
+// source: core/services/synchronization/telem/telem_head_report.proto
+
+package telem
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type HeadReportRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Chain string `protobuf:"bytes,1,opt,name=chain,proto3" json:"chain,omitempty"`
+ Latest *Block `protobuf:"bytes,2,opt,name=latest,proto3" json:"latest,omitempty"`
+ Finalized *Block `protobuf:"bytes,3,opt,name=finalized,proto3,oneof" json:"finalized,omitempty"`
+}
+
+func (x *HeadReportRequest) Reset() {
+ *x = HeadReportRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *HeadReportRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HeadReportRequest) ProtoMessage() {}
+
+func (x *HeadReportRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use HeadReportRequest.ProtoReflect.Descriptor instead.
+func (*HeadReportRequest) Descriptor() ([]byte, []int) {
+ return file_core_services_synchronization_telem_telem_head_report_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *HeadReportRequest) GetChain() string {
+ if x != nil {
+ return x.Chain
+ }
+ return ""
+}
+
+func (x *HeadReportRequest) GetLatest() *Block {
+ if x != nil {
+ return x.Latest
+ }
+ return nil
+}
+
+func (x *HeadReportRequest) GetFinalized() *Block {
+ if x != nil {
+ return x.Finalized
+ }
+ return nil
+}
+
+type Block struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Timestamp uint64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
+ Number uint64 `protobuf:"varint,2,opt,name=number,proto3" json:"number,omitempty"`
+ Hash string `protobuf:"bytes,3,opt,name=hash,proto3" json:"hash,omitempty"`
+}
+
+func (x *Block) Reset() {
+ *x = Block{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Block) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Block) ProtoMessage() {}
+
+func (x *Block) ProtoReflect() protoreflect.Message {
+ mi := &file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Block.ProtoReflect.Descriptor instead.
+func (*Block) Descriptor() ([]byte, []int) {
+ return file_core_services_synchronization_telem_telem_head_report_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *Block) GetTimestamp() uint64 {
+ if x != nil {
+ return x.Timestamp
+ }
+ return 0
+}
+
+func (x *Block) GetNumber() uint64 {
+ if x != nil {
+ return x.Number
+ }
+ return 0
+}
+
+func (x *Block) GetHash() string {
+ if x != nil {
+ return x.Hash
+ }
+ return ""
+}
+
+var File_core_services_synchronization_telem_telem_head_report_proto protoreflect.FileDescriptor
+
+var file_core_services_synchronization_telem_telem_head_report_proto_rawDesc = []byte{
+ 0x0a, 0x3b, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f,
+ 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f,
+ 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x5f, 0x68, 0x65, 0x61, 0x64,
+ 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74,
+ 0x65, 0x6c, 0x65, 0x6d, 0x22, 0x8e, 0x01, 0x0a, 0x11, 0x48, 0x65, 0x61, 0x64, 0x52, 0x65, 0x70,
+ 0x6f, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x68,
+ 0x61, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x63, 0x68, 0x61, 0x69, 0x6e,
+ 0x12, 0x24, 0x0a, 0x06, 0x6c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x0c, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x06,
+ 0x6c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x09, 0x66, 0x69, 0x6e, 0x61, 0x6c, 0x69,
+ 0x7a, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x74, 0x65, 0x6c, 0x65,
+ 0x6d, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x00, 0x52, 0x09, 0x66, 0x69, 0x6e, 0x61, 0x6c,
+ 0x69, 0x7a, 0x65, 0x64, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x66, 0x69, 0x6e, 0x61,
+ 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x22, 0x51, 0x0a, 0x05, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x1c,
+ 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x04, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x16, 0x0a, 0x06,
+ 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x6e, 0x75,
+ 0x6d, 0x62, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01,
+ 0x28, 0x09, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x42, 0x4e, 0x5a, 0x4c, 0x67, 0x69, 0x74, 0x68,
+ 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74,
+ 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e,
+ 0x6b, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63,
+ 0x65, 0x73, 0x2f, 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69,
+ 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_core_services_synchronization_telem_telem_head_report_proto_rawDescOnce sync.Once
+ file_core_services_synchronization_telem_telem_head_report_proto_rawDescData = file_core_services_synchronization_telem_telem_head_report_proto_rawDesc
+)
+
+func file_core_services_synchronization_telem_telem_head_report_proto_rawDescGZIP() []byte {
+ file_core_services_synchronization_telem_telem_head_report_proto_rawDescOnce.Do(func() {
+ file_core_services_synchronization_telem_telem_head_report_proto_rawDescData = protoimpl.X.CompressGZIP(file_core_services_synchronization_telem_telem_head_report_proto_rawDescData)
+ })
+ return file_core_services_synchronization_telem_telem_head_report_proto_rawDescData
+}
+
+var file_core_services_synchronization_telem_telem_head_report_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_core_services_synchronization_telem_telem_head_report_proto_goTypes = []interface{}{
+ (*HeadReportRequest)(nil), // 0: telem.HeadReportRequest
+ (*Block)(nil), // 1: telem.Block
+}
+var file_core_services_synchronization_telem_telem_head_report_proto_depIdxs = []int32{
+ 1, // 0: telem.HeadReportRequest.latest:type_name -> telem.Block
+ 1, // 1: telem.HeadReportRequest.finalized:type_name -> telem.Block
+ 2, // [2:2] is the sub-list for method output_type
+ 2, // [2:2] is the sub-list for method input_type
+ 2, // [2:2] is the sub-list for extension type_name
+ 2, // [2:2] is the sub-list for extension extendee
+ 0, // [0:2] is the sub-list for field type_name
+}
+
+func init() { file_core_services_synchronization_telem_telem_head_report_proto_init() }
+func file_core_services_synchronization_telem_telem_head_report_proto_init() {
+ if File_core_services_synchronization_telem_telem_head_report_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*HeadReportRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Block); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[0].OneofWrappers = []interface{}{}
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_core_services_synchronization_telem_telem_head_report_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 2,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_core_services_synchronization_telem_telem_head_report_proto_goTypes,
+ DependencyIndexes: file_core_services_synchronization_telem_telem_head_report_proto_depIdxs,
+ MessageInfos: file_core_services_synchronization_telem_telem_head_report_proto_msgTypes,
+ }.Build()
+ File_core_services_synchronization_telem_telem_head_report_proto = out.File
+ file_core_services_synchronization_telem_telem_head_report_proto_rawDesc = nil
+ file_core_services_synchronization_telem_telem_head_report_proto_goTypes = nil
+ file_core_services_synchronization_telem_telem_head_report_proto_depIdxs = nil
+}
diff --git a/core/services/synchronization/telem/telem_head_report.proto b/core/services/synchronization/telem/telem_head_report.proto
new file mode 100644
index 00000000000..6f4cf2ddae6
--- /dev/null
+++ b/core/services/synchronization/telem/telem_head_report.proto
@@ -0,0 +1,17 @@
+syntax = "proto3";
+
+option go_package = "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem";
+
+package telem;
+
+message HeadReportRequest {
+ string chainID = 1;
+ Block latest = 2;
+ optional Block finalized = 3;
+}
+
+message Block {
+ uint64 timestamp = 1;
+ uint64 number = 2;
+ string hash = 3;
+}
diff --git a/core/services/telemetry/monitoring_endpoint_generator_mock.go b/core/services/telemetry/monitoring_endpoint_generator_mock.go
new file mode 100644
index 00000000000..a0fc503ecca
--- /dev/null
+++ b/core/services/telemetry/monitoring_endpoint_generator_mock.go
@@ -0,0 +1,88 @@
+// Code generated by mockery v2.43.2. DO NOT EDIT.
+
+package telemetry
+
+import (
+ commontypes "github.com/smartcontractkit/libocr/commontypes"
+ mock "github.com/stretchr/testify/mock"
+
+ synchronization "github.com/smartcontractkit/chainlink/v2/core/services/synchronization"
+)
+
+// MockMonitoringEndpointGenerator is an autogenerated mock type for the MonitoringEndpointGenerator type
+type MockMonitoringEndpointGenerator struct {
+ mock.Mock
+}
+
+type MockMonitoringEndpointGenerator_Expecter struct {
+ mock *mock.Mock
+}
+
+func (_m *MockMonitoringEndpointGenerator) EXPECT() *MockMonitoringEndpointGenerator_Expecter {
+ return &MockMonitoringEndpointGenerator_Expecter{mock: &_m.Mock}
+}
+
+// GenMonitoringEndpoint provides a mock function with given fields: network, chainID, contractID, telemType
+func (_m *MockMonitoringEndpointGenerator) GenMonitoringEndpoint(network string, chainID string, contractID string, telemType synchronization.TelemetryType) commontypes.MonitoringEndpoint {
+ ret := _m.Called(network, chainID, contractID, telemType)
+
+ if len(ret) == 0 {
+ panic("no return value specified for GenMonitoringEndpoint")
+ }
+
+ var r0 commontypes.MonitoringEndpoint
+ if rf, ok := ret.Get(0).(func(string, string, string, synchronization.TelemetryType) commontypes.MonitoringEndpoint); ok {
+ r0 = rf(network, chainID, contractID, telemType)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(commontypes.MonitoringEndpoint)
+ }
+ }
+
+ return r0
+}
+
+// MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GenMonitoringEndpoint'
+type MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call struct {
+ *mock.Call
+}
+
+// GenMonitoringEndpoint is a helper method to define mock.On call
+// - network string
+// - chainID string
+// - contractID string
+// - telemType synchronization.TelemetryType
+func (_e *MockMonitoringEndpointGenerator_Expecter) GenMonitoringEndpoint(network interface{}, chainID interface{}, contractID interface{}, telemType interface{}) *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call {
+ return &MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call{Call: _e.mock.On("GenMonitoringEndpoint", network, chainID, contractID, telemType)}
+}
+
+func (_c *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call) Run(run func(network string, chainID string, contractID string, telemType synchronization.TelemetryType)) *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(string), args[1].(string), args[2].(string), args[3].(synchronization.TelemetryType))
+ })
+ return _c
+}
+
+func (_c *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call) Return(_a0 commontypes.MonitoringEndpoint) *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call) RunAndReturn(run func(string, string, string, synchronization.TelemetryType) commontypes.MonitoringEndpoint) *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// NewMockMonitoringEndpointGenerator creates a new instance of MockMonitoringEndpointGenerator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewMockMonitoringEndpointGenerator(t interface {
+ mock.TestingT
+ Cleanup(func())
+}) *MockMonitoringEndpointGenerator {
+ mock := &MockMonitoringEndpointGenerator{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/core/web/testdata/body/health.html b/core/web/testdata/body/health.html
index 90d301bc8b8..d2b6db906b4 100644
--- a/core/web/testdata/body/health.html
+++ b/core/web/testdata/body/health.html
@@ -72,6 +72,9 @@
+
+ HeadReporter
+
JobSpawner
@@ -99,9 +102,6 @@
BridgeCache
-
- PromReporter
-
TelemetryManager
diff --git a/core/web/testdata/body/health.json b/core/web/testdata/body/health.json
index 839428a5103..81ed7ff6d11 100644
--- a/core/web/testdata/body/health.json
+++ b/core/web/testdata/body/health.json
@@ -108,6 +108,15 @@
"output": ""
}
},
+ {
+ "type": "checks",
+ "id": "HeadReporter",
+ "attributes": {
+ "name": "HeadReporter",
+ "status": "passing",
+ "output": ""
+ }
+ },
{
"type": "checks",
"id": "JobSpawner",
@@ -171,15 +180,6 @@
"output": ""
}
},
- {
- "type": "checks",
- "id": "PromReporter",
- "attributes": {
- "name": "PromReporter",
- "status": "passing",
- "output": ""
- }
- },
{
"type": "checks",
"id": "TelemetryManager",
diff --git a/core/web/testdata/body/health.txt b/core/web/testdata/body/health.txt
index 3709b4e15f0..6b165d26d99 100644
--- a/core/web/testdata/body/health.txt
+++ b/core/web/testdata/body/health.txt
@@ -11,6 +11,7 @@ ok EVM.0.Txm.Broadcaster
ok EVM.0.Txm.Confirmer
ok EVM.0.Txm.Finalizer
ok EVM.0.Txm.WrappedEvmEstimator
+ok HeadReporter
ok JobSpawner
ok Mailbox.Monitor
ok Mercury.WSRPCPool
@@ -18,5 +19,4 @@ ok Mercury.WSRPCPool.CacheSet
ok PipelineORM
ok PipelineRunner
ok PipelineRunner.BridgeCache
-ok PromReporter
ok TelemetryManager
diff --git a/integration-tests/testconfig/vrfv2plus/vrfv2plus.toml b/integration-tests/testconfig/vrfv2plus/vrfv2plus.toml
index 88ca12975f6..860c0c158bf 100644
--- a/integration-tests/testconfig/vrfv2plus/vrfv2plus.toml
+++ b/integration-tests/testconfig/vrfv2plus/vrfv2plus.toml
@@ -19,6 +19,7 @@ MaxSize = '0b'
[WebServer]
AllowOrigins = '*'
HTTPPort = 6688
+HTTPWriteTimeout = '1m0s'
SecureCookies = false
[WebServer.RateLimit]
diff --git a/testdata/scripts/health/default.txtar b/testdata/scripts/health/default.txtar
index 1dbf6b8eb96..777d3e5e126 100644
--- a/testdata/scripts/health/default.txtar
+++ b/testdata/scripts/health/default.txtar
@@ -31,6 +31,7 @@ fj293fbBnlQ!f9vNs
HTTPPort = $PORT
-- out.txt --
+ok HeadReporter
ok JobSpawner
ok Mailbox.Monitor
ok Mercury.WSRPCPool
@@ -38,12 +39,20 @@ ok Mercury.WSRPCPool.CacheSet
ok PipelineORM
ok PipelineRunner
ok PipelineRunner.BridgeCache
-ok PromReporter
ok TelemetryManager
-- out.json --
{
"data": [
+ {
+ "type": "checks",
+ "id": "HeadReporter",
+ "attributes": {
+ "name": "HeadReporter",
+ "status": "passing",
+ "output": ""
+ }
+ },
{
"type": "checks",
"id": "JobSpawner",
@@ -107,15 +116,6 @@ ok TelemetryManager
"output": ""
}
},
- {
- "type": "checks",
- "id": "PromReporter",
- "attributes": {
- "name": "PromReporter",
- "status": "passing",
- "output": ""
- }
- },
{
"type": "checks",
"id": "TelemetryManager",
diff --git a/testdata/scripts/health/multi-chain.txtar b/testdata/scripts/health/multi-chain.txtar
index 76937329cb8..bba3b3e111f 100644
--- a/testdata/scripts/health/multi-chain.txtar
+++ b/testdata/scripts/health/multi-chain.txtar
@@ -84,6 +84,7 @@ ok EVM.1.Txm.Broadcaster
ok EVM.1.Txm.Confirmer
ok EVM.1.Txm.Finalizer
ok EVM.1.Txm.WrappedEvmEstimator
+ok HeadReporter
ok JobSpawner
ok Mailbox.Monitor
ok Mercury.WSRPCPool
@@ -91,7 +92,6 @@ ok Mercury.WSRPCPool.CacheSet
ok PipelineORM
ok PipelineRunner
ok PipelineRunner.BridgeCache
-ok PromReporter
ok Solana.Bar
ok StarkNet.Baz
ok TelemetryManager
@@ -238,6 +238,15 @@ ok TelemetryManager
"output": ""
}
},
+ {
+ "type": "checks",
+ "id": "HeadReporter",
+ "attributes": {
+ "name": "HeadReporter",
+ "status": "passing",
+ "output": ""
+ }
+ },
{
"type": "checks",
"id": "JobSpawner",
@@ -301,15 +310,6 @@ ok TelemetryManager
"output": ""
}
},
- {
- "type": "checks",
- "id": "PromReporter",
- "attributes": {
- "name": "PromReporter",
- "status": "passing",
- "output": ""
- }
- },
{
"type": "checks",
"id": "Solana.Bar",