From ae3dda4e73fc8648b9435d9925897bab815d2918 Mon Sep 17 00:00:00 2001 From: Aleksandr Bukata Date: Tue, 25 Jun 2024 14:23:12 +0100 Subject: [PATCH] telemetry reporter test --- core/internal/mocks/head_reporter.go | 34 +++- core/services/headreporter/head_reporter.go | 20 ++- .../headreporter/head_reporter_test.go | 49 ++++++ .../headreporter/prometheus_reporter.go | 19 +-- .../headreporter/prometheus_reporter_test.go | 55 +------ .../headreporter/telemetry_reporter.go | 50 +++--- .../headreporter/telemetry_reporter_test.go | 146 ++++++++++++++++++ .../telem/telem_head_report.pb.go | 83 +++++----- .../telem/telem_head_report.proto | 10 +- core/services/telemetry/common.go | 1 + .../mocks/monitoring_endpoint_generator.go | 49 ++++++ 11 files changed, 371 insertions(+), 145 deletions(-) create mode 100644 core/services/headreporter/telemetry_reporter_test.go create mode 100644 core/services/telemetry/mocks/monitoring_endpoint_generator.go diff --git a/core/internal/mocks/head_reporter.go b/core/internal/mocks/head_reporter.go index fbaf71a1574..3f007d71368 100644 --- a/core/internal/mocks/head_reporter.go +++ b/core/internal/mocks/head_reporter.go @@ -16,13 +16,39 @@ type HeadReporter struct { } // ReportNewHead provides a mock function with given fields: ctx, head -func (_m *HeadReporter) ReportNewHead(ctx context.Context, head *types.Head) { - _m.Called(ctx, head) +func (_m *HeadReporter) ReportNewHead(ctx context.Context, head *types.Head) error { + ret := _m.Called(ctx, head) + + if len(ret) == 0 { + panic("no return value specified for ReportNewHead") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *types.Head) error); ok { + r0 = rf(ctx, head) + } else { + r0 = ret.Error(0) + } + + return r0 } // ReportPeriodic provides a mock function with given fields: ctx -func (_m *HeadReporter) ReportPeriodic(ctx context.Context) { - _m.Called(ctx) +func (_m *HeadReporter) ReportPeriodic(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for ReportPeriodic") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 } // NewHeadReporter creates a new instance of HeadReporter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. diff --git a/core/services/headreporter/head_reporter.go b/core/services/headreporter/head_reporter.go index b7eef5203b6..b73adce7da0 100644 --- a/core/services/headreporter/head_reporter.go +++ b/core/services/headreporter/head_reporter.go @@ -20,8 +20,8 @@ import ( //go:generate mockery --quiet --name HeadReporter --output ../../internal/mocks/ --case=underscore type ( HeadReporter interface { - ReportNewHead(ctx context.Context, head *evmtypes.Head) - ReportPeriodic(ctx context.Context) + ReportNewHead(ctx context.Context, head *evmtypes.Head) error + ReportPeriodic(ctx context.Context) error } HeadReporterService struct { @@ -39,7 +39,7 @@ type ( func NewHeadReporterService(config config.HeadReport, ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, monitoringEndpointGen telemetry.MonitoringEndpointGenerator, opts ...interface{}) *HeadReporterService { reporters := make([]HeadReporter, 2) - reporters = append(reporters, NewPrometheusReporter(ds, chainContainer, lggr, opts)) + reporters = append(reporters, NewPrometheusReporter(ds, chainContainer, opts)) if config.TelemetryEnabled() { reporters = append(reporters, NewTelemetryReporter(chainContainer, lggr, monitoringEndpointGen)) } @@ -100,6 +100,7 @@ func (hrd *HeadReporterService) eventLoop() { defer hrd.wgDone.Done() ctx, cancel := hrd.chStop.NewCtx() defer cancel() + after := time.After(hrd.reportPeriod) for { select { case <-hrd.newHeads.Notify(): @@ -108,12 +109,19 @@ func (hrd *HeadReporterService) eventLoop() { continue } for _, reporter := range hrd.reporters { - reporter.ReportNewHead(ctx, head) + err := reporter.ReportNewHead(ctx, head) + if err != nil && ctx.Err() == nil { + hrd.lggr.Errorw("Error reporting new head", "err", err) + } } - case <-time.After(hrd.reportPeriod): + case <-after: for _, reporter := range hrd.reporters { - reporter.ReportPeriodic(ctx) + err := reporter.ReportPeriodic(ctx) + if err != nil && ctx.Err() == nil { + hrd.lggr.Errorw("Error in periodic report", "err", err) + } } + after = time.After(hrd.reportPeriod) case <-hrd.chStop: return } diff --git a/core/services/headreporter/head_reporter_test.go b/core/services/headreporter/head_reporter_test.go index e24c98afc54..27f81a47e50 100644 --- a/core/services/headreporter/head_reporter_test.go +++ b/core/services/headreporter/head_reporter_test.go @@ -5,6 +5,17 @@ import ( "testing" "time" + "github.com/jmoiron/sqlx" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -15,6 +26,44 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/headreporter" ) +func newHead() evmtypes.Head { + return evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(0)} +} + +func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainContainer { + config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t) + keyStore := cltest.NewKeyStore(t, db).Eth() + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) + lggr := logger.TestLogger(t) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), ethClient, lggr, lpOpts) + + txm, err := txmgr.NewTxm( + db, + evmConfig, + evmConfig.GasEstimator(), + evmConfig.Transactions(), + nil, + dbConfig, + dbConfig.Listener(), + ethClient, + lggr, + lp, + keyStore, + estimator) + require.NoError(t, err) + + cfg := configtest.NewGeneralConfig(t, nil) + return cltest.NewLegacyChainsWithMockChainAndTxManager(t, ethClient, cfg, txm) +} + func Test_HeadReporterService(t *testing.T) { t.Run("report everything", func(t *testing.T) { db := pgtest.NewSqlxDB(t) diff --git a/core/services/headreporter/prometheus_reporter.go b/core/services/headreporter/prometheus_reporter.go index 5655461bf64..db4c26f2225 100644 --- a/core/services/headreporter/prometheus_reporter.go +++ b/core/services/headreporter/prometheus_reporter.go @@ -16,14 +16,12 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" - "github.com/smartcontractkit/chainlink/v2/core/logger" ) type ( prometheusReporter struct { ds sqlutil.DataSource chains legacyevm.LegacyChainContainer - lggr logger.Logger backend PrometheusBackend } @@ -61,7 +59,7 @@ var ( }) ) -func NewPrometheusReporter(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, opts ...interface{}) *prometheusReporter { +func NewPrometheusReporter(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, opts ...interface{}) HeadReporter { var backend PrometheusBackend = defaultBackend{} for _, opt := range opts { switch v := opt.(type) { @@ -72,7 +70,6 @@ func NewPrometheusReporter(ds sqlutil.DataSource, chainContainer legacyevm.Legac return &prometheusReporter{ ds: ds, chains: chainContainer, - lggr: lggr.Named("PrometheusReporter"), backend: backend, } } @@ -85,17 +82,13 @@ func (pr *prometheusReporter) getTxm(evmChainID *big.Int) (txmgr.TxManager, erro return chain.TxManager(), nil } -func (pr *prometheusReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) { +func (pr *prometheusReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) error { evmChainID := head.EVMChainID.ToInt() - err := multierr.Combine( + return multierr.Combine( errors.Wrap(pr.reportPendingEthTxes(ctx, evmChainID), "reportPendingEthTxes failed"), errors.Wrap(pr.reportMaxUnconfirmedAge(ctx, evmChainID), "reportMaxUnconfirmedAge failed"), errors.Wrap(pr.reportMaxUnconfirmedBlocks(ctx, head), "reportMaxUnconfirmedBlocks failed"), ) - - if err != nil && ctx.Err() == nil { - pr.lggr.Errorw("Error reporting prometheus metrics", "err", err) - } } func (pr *prometheusReporter) reportPendingEthTxes(ctx context.Context, evmChainID *big.Int) (err error) { @@ -150,10 +143,8 @@ func (pr *prometheusReporter) reportMaxUnconfirmedBlocks(ctx context.Context, he return nil } -func (pr *prometheusReporter) ReportPeriodic(ctx context.Context) { - if err := errors.Wrap(pr.reportPipelineRunStats(ctx), "reportPipelineRunStats failed"); err != nil { - pr.lggr.Errorw("Error reporting prometheus metrics", "err", err) - } +func (pr *prometheusReporter) ReportPeriodic(ctx context.Context) error { + return errors.Wrap(pr.reportPipelineRunStats(ctx), "reportPipelineRunStats failed") } func (pr *prometheusReporter) reportPipelineRunStats(ctx context.Context) (err error) { diff --git a/core/services/headreporter/prometheus_reporter_test.go b/core/services/headreporter/prometheus_reporter_test.go index b276fdc1344..097bf1812f2 100644 --- a/core/services/headreporter/prometheus_reporter_test.go +++ b/core/services/headreporter/prometheus_reporter_test.go @@ -3,72 +3,23 @@ package headreporter_test import ( "math/big" "testing" - "time" - "github.com/jmoiron/sqlx" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" - "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/headreporter" ) -func newHead() evmtypes.Head { - return evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(0)} -} - -func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainContainer { - config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t) - keyStore := cltest.NewKeyStore(t, db).Eth() - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) - lggr := logger.TestLogger(t) - lpOpts := logpoller.Opts{ - PollPeriod: 100 * time.Millisecond, - FinalityDepth: 2, - BackfillBatchSize: 3, - RpcBatchSize: 2, - KeepFinalizedBlocksDepth: 1000, - } - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), ethClient, lggr, lpOpts) - - txm, err := txmgr.NewTxm( - db, - evmConfig, - evmConfig.GasEstimator(), - evmConfig.Transactions(), - nil, - dbConfig, - dbConfig.Listener(), - ethClient, - lggr, - lp, - keyStore, - estimator) - require.NoError(t, err) - - cfg := configtest.NewGeneralConfig(t, nil) - return cltest.NewLegacyChainsWithMockChainAndTxManager(t, ethClient, cfg, txm) -} - func Test_PrometheusReporter(t *testing.T) { t.Run("with nothing in the database", func(t *testing.T) { db := pgtest.NewSqlxDB(t) backend := mocks.NewPrometheusBackend(t) - reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend) + reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), backend) backend.On("SetUnconfirmedTransactions", big.NewInt(0), int64(0)).Return() backend.On("SetMaxUnconfirmedAge", big.NewInt(0), float64(0)).Return() @@ -100,7 +51,7 @@ func Test_PrometheusReporter(t *testing.T) { })).Return() backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(35)).Return() - reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend) + reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), backend) head := newHead() reporter.ReportNewHead(testutils.Context(t), &head) @@ -124,7 +75,7 @@ func Test_PrometheusReporter(t *testing.T) { backend.On("SetMaxUnconfirmedAge", big.NewInt(0), float64(0)).Return() backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(0)).Return() - reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend) + reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), backend) head := newHead() reporter.ReportNewHead(testutils.Context(t), &head) diff --git a/core/services/headreporter/telemetry_reporter.go b/core/services/headreporter/telemetry_reporter.go index a6cd417f44a..084b66b09eb 100644 --- a/core/services/headreporter/telemetry_reporter.go +++ b/core/services/headreporter/telemetry_reporter.go @@ -3,6 +3,8 @@ package headreporter import ( "context" + "github.com/pkg/errors" + "github.com/smartcontractkit/libocr/commontypes" "google.golang.org/protobuf/proto" @@ -16,7 +18,6 @@ import ( type ( telemetryReporter struct { - logger logger.Logger endpoints map[uint64]commontypes.MonitoringEndpoint } ) @@ -26,40 +27,43 @@ func NewTelemetryReporter(chainContainer legacyevm.LegacyChainContainer, lggr lo for _, chain := range chainContainer.Slice() { endpoints[chain.ID().Uint64()] = monitoringEndpointGen.GenMonitoringEndpoint("EVM", chain.ID().String(), "", synchronization.HeadReport) } - return &telemetryReporter{ - logger: lggr.Named("TelemetryReporter"), - endpoints: endpoints, - } + return &telemetryReporter{endpoints: endpoints} } -func (t *telemetryReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) { +func (t *telemetryReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) error { monitoringEndpoint := t.endpoints[head.EVMChainID.ToInt().Uint64()] - var lastFinalized *telem.Block - lastFinalizedHead := head.LatestFinalizedHead() - if lastFinalizedHead != nil { - lastFinalized = &telem.Block{ - Timestamp: uint64(lastFinalizedHead.GetTimestamp().UTC().Unix()), - BlockNumber: uint64(lastFinalizedHead.BlockNumber()), - BlockHash: lastFinalizedHead.BlockHash().Hex(), + if monitoringEndpoint == nil { + return errors.Errorf("No monitoring endpoint provided chain_id=%d", head.EVMChainID.Int64()) + } + var finalized *telem.Block + latestFinalizedHead := head.LatestFinalizedHead() + if latestFinalizedHead != nil { + finalized = &telem.Block{ + Timestamp: uint64(latestFinalizedHead.GetTimestamp().UTC().Unix()), + Number: uint64(latestFinalizedHead.BlockNumber()), + Hash: latestFinalizedHead.BlockHash().Hex(), } } request := &telem.HeadReportRequest{ - ChainId: head.EVMChainID.String(), - Current: &telem.Block{ - Timestamp: uint64(head.Timestamp.UTC().Unix()), - BlockNumber: uint64(head.Number), - BlockHash: head.Hash.Hex(), + ChainId: head.EVMChainID.ToInt().Uint64(), + Latest: &telem.Block{ + Timestamp: uint64(head.Timestamp.UTC().Unix()), + Number: uint64(head.Number), + Hash: head.Hash.Hex(), }, - LastFinalized: lastFinalized, + Finalized: finalized, } bytes, err := proto.Marshal(request) if err != nil { - t.logger.Warnw("telem.HeadReportRequest marshal error", "err", err) - return + return errors.WithMessage(err, "telem.HeadReportRequest marshal error") } monitoringEndpoint.SendLog(bytes) + if finalized == nil { + return errors.Errorf("No finalized block was found for chain_id=%d", head.EVMChainID.Int64()) + } + return nil } -func (t *telemetryReporter) ReportPeriodic(ctx context.Context) { - //do nothing +func (t *telemetryReporter) ReportPeriodic(ctx context.Context) error { + return nil } diff --git a/core/services/headreporter/telemetry_reporter_test.go b/core/services/headreporter/telemetry_reporter_test.go new file mode 100644 index 00000000000..2f5cd3e1f72 --- /dev/null +++ b/core/services/headreporter/telemetry_reporter_test.go @@ -0,0 +1,146 @@ +package headreporter_test + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm/mocks" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/headreporter" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" + mocks2 "github.com/smartcontractkit/chainlink/v2/core/services/telemetry/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/protobuf/proto" +) + +type IngressAgent struct { + mock.Mock +} + +func (t *IngressAgent) SendLog(telemetry []byte) { + _ = t.Called(telemetry) +} + +func NewIngressAgent(t interface { + mock.TestingT + Cleanup(func()) +}) *IngressAgent { + m := &IngressAgent{} + m.Mock.Test(t) + + t.Cleanup(func() { m.AssertExpectations(t) }) + + return m +} + +func Test_TelemetryReporter_NewHead(t *testing.T) { + chain := mocks.NewChain(t) + chain.On("ID").Return(big.NewInt(100)) + + chains := legacyevm.NewLegacyChains(map[string]legacyevm.Chain{"100": chain}, nil) + + ingressAgent := NewIngressAgent(t) + + monitoringEndpointGen := mocks2.NewMonitoringEndpointGenerator(t) + monitoringEndpointGen. + On("GenMonitoringEndpoint", "EVM", "100", "", synchronization.HeadReport). + Return(ingressAgent) + reporter := headreporter.NewTelemetryReporter(chains, logger.TestLogger(t), monitoringEndpointGen) + + head := evmtypes.Head{ + Number: 42, + EVMChainID: ubig.NewI(100), + Hash: common.HexToHash("0x1010"), + Timestamp: time.UnixMilli(1000), + IsFinalized: false, + Parent: &evmtypes.Head{ + Number: 41, + Hash: common.HexToHash("0x1009"), + Timestamp: time.UnixMilli(999), + IsFinalized: true, + }, + } + requestBytes, err := proto.Marshal(&telem.HeadReportRequest{ + ChainId: 100, + Latest: &telem.Block{ + Timestamp: uint64(head.Timestamp.UTC().Unix()), + Number: 42, + Hash: head.Hash.Hex(), + }, + Finalized: &telem.Block{ + Timestamp: uint64(head.Parent.Timestamp.UTC().Unix()), + Number: 41, + Hash: head.Parent.Hash.Hex(), + }, + }) + assert.NoError(t, err) + + ingressAgent.On("SendLog", requestBytes).Return() + + err = reporter.ReportNewHead(testutils.Context(t), &head) + assert.NoError(t, err) +} + +func Test_TelemetryReporter_NewHeadMissingFinalized(t *testing.T) { + chain := mocks.NewChain(t) + chain.On("ID").Return(big.NewInt(100)) + + chains := legacyevm.NewLegacyChains(map[string]legacyevm.Chain{"100": chain}, nil) + + ingressAgent := NewIngressAgent(t) + + monitoringEndpointGen := mocks2.NewMonitoringEndpointGenerator(t) + monitoringEndpointGen. + On("GenMonitoringEndpoint", "EVM", "100", "", synchronization.HeadReport). + Return(ingressAgent) + reporter := headreporter.NewTelemetryReporter(chains, logger.TestLogger(t), monitoringEndpointGen) + + head := evmtypes.Head{ + Number: 42, + EVMChainID: ubig.NewI(100), + Hash: common.HexToHash("0x1010"), + Timestamp: time.UnixMilli(1000), + IsFinalized: false, + } + requestBytes, err := proto.Marshal(&telem.HeadReportRequest{ + ChainId: 100, + Latest: &telem.Block{ + Timestamp: uint64(head.Timestamp.UTC().Unix()), + Number: 42, + Hash: head.Hash.Hex(), + }, + }) + assert.NoError(t, err) + + ingressAgent.On("SendLog", requestBytes).Return() + + err = reporter.ReportNewHead(testutils.Context(t), &head) + assert.Errorf(t, err, "No finalized block was found for chain_id=100") +} + +func Test_TelemetryReporter_NewHead_MissingEndpoint(t *testing.T) { + chain := mocks.NewChain(t) + chain.On("ID").Return(big.NewInt(100)) + + chains := legacyevm.NewLegacyChains(map[string]legacyevm.Chain{"100": chain}, nil) + + monitoringEndpointGen := mocks2.NewMonitoringEndpointGenerator(t) + monitoringEndpointGen. + On("GenMonitoringEndpoint", "EVM", "100", "", synchronization.HeadReport). + Return(nil) + + reporter := headreporter.NewTelemetryReporter(chains, logger.TestLogger(t), monitoringEndpointGen) + + head := evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(100)} + + err := reporter.ReportNewHead(testutils.Context(t), &head) + assert.Errorf(t, err, "No monitoring endpoint provided chain_id=100") +} diff --git a/core/services/synchronization/telem/telem_head_report.pb.go b/core/services/synchronization/telem/telem_head_report.pb.go index 849012ff3d9..aad06298ba1 100644 --- a/core/services/synchronization/telem/telem_head_report.pb.go +++ b/core/services/synchronization/telem/telem_head_report.pb.go @@ -7,10 +7,11 @@ package telem import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( @@ -25,9 +26,9 @@ type HeadReportRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ChainId string `protobuf:"bytes,1,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` - Current *Block `protobuf:"bytes,2,opt,name=current,proto3" json:"current,omitempty"` - LastFinalized *Block `protobuf:"bytes,3,opt,name=last_finalized,json=lastFinalized,proto3" json:"last_finalized,omitempty"` + ChainId uint64 `protobuf:"varint,1,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` + Latest *Block `protobuf:"bytes,2,opt,name=latest,proto3" json:"latest,omitempty"` + Finalized *Block `protobuf:"bytes,3,opt,name=finalized,proto3,oneof" json:"finalized,omitempty"` } func (x *HeadReportRequest) Reset() { @@ -62,23 +63,23 @@ func (*HeadReportRequest) Descriptor() ([]byte, []int) { return file_core_services_synchronization_telem_telem_head_report_proto_rawDescGZIP(), []int{0} } -func (x *HeadReportRequest) GetChainId() string { +func (x *HeadReportRequest) GetChainId() uint64 { if x != nil { return x.ChainId } - return "" + return 0 } -func (x *HeadReportRequest) GetCurrent() *Block { +func (x *HeadReportRequest) GetLatest() *Block { if x != nil { - return x.Current + return x.Latest } return nil } -func (x *HeadReportRequest) GetLastFinalized() *Block { +func (x *HeadReportRequest) GetFinalized() *Block { if x != nil { - return x.LastFinalized + return x.Finalized } return nil } @@ -88,9 +89,9 @@ type Block struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Timestamp uint64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - BlockNumber uint64 `protobuf:"varint,2,opt,name=block_number,json=blockNumber,proto3" json:"block_number,omitempty"` - BlockHash string `protobuf:"bytes,3,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"` + Timestamp uint64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Number uint64 `protobuf:"varint,2,opt,name=number,proto3" json:"number,omitempty"` + Hash string `protobuf:"bytes,3,opt,name=hash,proto3" json:"hash,omitempty"` } func (x *Block) Reset() { @@ -132,16 +133,16 @@ func (x *Block) GetTimestamp() uint64 { return 0 } -func (x *Block) GetBlockNumber() uint64 { +func (x *Block) GetNumber() uint64 { if x != nil { - return x.BlockNumber + return x.Number } return 0 } -func (x *Block) GetBlockHash() string { +func (x *Block) GetHash() string { if x != nil { - return x.BlockHash + return x.Hash } return "" } @@ -153,28 +154,27 @@ var file_core_services_synchronization_telem_telem_head_report_proto_rawDesc = [ 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, - 0x65, 0x6c, 0x65, 0x6d, 0x22, 0x8b, 0x01, 0x0a, 0x11, 0x48, 0x65, 0x61, 0x64, 0x52, 0x65, 0x70, + 0x65, 0x6c, 0x65, 0x6d, 0x22, 0x93, 0x01, 0x0a, 0x11, 0x48, 0x65, 0x61, 0x64, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, - 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, - 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x26, 0x0a, 0x07, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2e, 0x42, - 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x07, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x12, 0x33, 0x0a, - 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2e, 0x42, 0x6c, - 0x6f, 0x63, 0x6b, 0x52, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x46, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, - 0x65, 0x64, 0x22, 0x67, 0x0a, 0x05, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x1c, 0x0a, 0x09, 0x74, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, - 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x21, 0x0a, 0x0c, 0x62, 0x6c, 0x6f, - 0x63, 0x6b, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x0b, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x0a, - 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x09, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, 0x42, 0x4e, 0x5a, 0x4c, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, - 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, - 0x6c, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x63, 0x68, + 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x06, 0x6c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2e, 0x42, 0x6c, + 0x6f, 0x63, 0x6b, 0x52, 0x06, 0x6c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x09, 0x66, + 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, + 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x00, 0x52, 0x09, + 0x66, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, + 0x5f, 0x66, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x22, 0x51, 0x0a, 0x05, 0x42, 0x6c, + 0x6f, 0x63, 0x6b, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x06, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, + 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x42, 0x4e, 0x5a, + 0x4c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, + 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, + 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, + 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -195,8 +195,8 @@ var file_core_services_synchronization_telem_telem_head_report_proto_goTypes = [ (*Block)(nil), // 1: telem.Block } var file_core_services_synchronization_telem_telem_head_report_proto_depIdxs = []int32{ - 1, // 0: telem.HeadReportRequest.current:type_name -> telem.Block - 1, // 1: telem.HeadReportRequest.last_finalized:type_name -> telem.Block + 1, // 0: telem.HeadReportRequest.latest:type_name -> telem.Block + 1, // 1: telem.HeadReportRequest.finalized:type_name -> telem.Block 2, // [2:2] is the sub-list for method output_type 2, // [2:2] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name @@ -235,6 +235,7 @@ func file_core_services_synchronization_telem_telem_head_report_proto_init() { } } } + file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[0].OneofWrappers = []interface{}{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/core/services/synchronization/telem/telem_head_report.proto b/core/services/synchronization/telem/telem_head_report.proto index bacf6b57bec..4249ec7a91a 100644 --- a/core/services/synchronization/telem/telem_head_report.proto +++ b/core/services/synchronization/telem/telem_head_report.proto @@ -5,13 +5,13 @@ option go_package = "github.com/smartcontractkit/chainlink/v2/core/services/sync package telem; message HeadReportRequest { - string chain_id = 1; - Block current = 2; - Block last_finalized = 3; + uint64 chain_id = 1; + Block latest = 2; + optional Block finalized = 3; } message Block { uint64 timestamp = 1; - uint64 block_number = 2; - string block_hash = 3; + uint64 number = 2; + string hash = 3; } diff --git a/core/services/telemetry/common.go b/core/services/telemetry/common.go index 37a92f16c6d..1ccd9674589 100644 --- a/core/services/telemetry/common.go +++ b/core/services/telemetry/common.go @@ -6,6 +6,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" ) +//go:generate mockery --quiet --name MonitoringEndpointGenerator --output ./mocks --case=underscore type MonitoringEndpointGenerator interface { GenMonitoringEndpoint(network string, chainID string, contractID string, telemType synchronization.TelemetryType) ocrtypes.MonitoringEndpoint } diff --git a/core/services/telemetry/mocks/monitoring_endpoint_generator.go b/core/services/telemetry/mocks/monitoring_endpoint_generator.go new file mode 100644 index 00000000000..c7903425dfa --- /dev/null +++ b/core/services/telemetry/mocks/monitoring_endpoint_generator.go @@ -0,0 +1,49 @@ +// Code generated by mockery v2.42.2. DO NOT EDIT. + +package mocks + +import ( + commontypes "github.com/smartcontractkit/libocr/commontypes" + mock "github.com/stretchr/testify/mock" + + synchronization "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" +) + +// MonitoringEndpointGenerator is an autogenerated mock type for the MonitoringEndpointGenerator type +type MonitoringEndpointGenerator struct { + mock.Mock +} + +// GenMonitoringEndpoint provides a mock function with given fields: network, chainID, contractID, telemType +func (_m *MonitoringEndpointGenerator) GenMonitoringEndpoint(network string, chainID string, contractID string, telemType synchronization.TelemetryType) commontypes.MonitoringEndpoint { + ret := _m.Called(network, chainID, contractID, telemType) + + if len(ret) == 0 { + panic("no return value specified for GenMonitoringEndpoint") + } + + var r0 commontypes.MonitoringEndpoint + if rf, ok := ret.Get(0).(func(string, string, string, synchronization.TelemetryType) commontypes.MonitoringEndpoint); ok { + r0 = rf(network, chainID, contractID, telemType) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(commontypes.MonitoringEndpoint) + } + } + + return r0 +} + +// NewMonitoringEndpointGenerator creates a new instance of MonitoringEndpointGenerator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMonitoringEndpointGenerator(t interface { + mock.TestingT + Cleanup(func()) +}) *MonitoringEndpointGenerator { + mock := &MonitoringEndpointGenerator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}