Skip to content

Commit

Permalink
telemetry reporter test
Browse files Browse the repository at this point in the history
  • Loading branch information
bukata-sa committed Jun 26, 2024
1 parent a2208d4 commit b5e7e07
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 145 deletions.
34 changes: 30 additions & 4 deletions core/internal/mocks/head_reporter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions core/services/headreporter/head_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
//go:generate mockery --quiet --name HeadReporter --output ../../internal/mocks/ --case=underscore
type (
HeadReporter interface {
ReportNewHead(ctx context.Context, head *evmtypes.Head)
ReportPeriodic(ctx context.Context)
ReportNewHead(ctx context.Context, head *evmtypes.Head) error
ReportPeriodic(ctx context.Context) error
}

HeadReporterService struct {
Expand All @@ -39,7 +39,7 @@ type (

func NewHeadReporterService(config config.HeadReport, ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, monitoringEndpointGen telemetry.MonitoringEndpointGenerator, opts ...interface{}) *HeadReporterService {
reporters := make([]HeadReporter, 2)
reporters = append(reporters, NewPrometheusReporter(ds, chainContainer, lggr, opts))
reporters = append(reporters, NewPrometheusReporter(ds, chainContainer, opts))
if config.TelemetryEnabled() {
reporters = append(reporters, NewTelemetryReporter(chainContainer, lggr, monitoringEndpointGen))
}
Expand Down Expand Up @@ -100,6 +100,7 @@ func (hrd *HeadReporterService) eventLoop() {
defer hrd.wgDone.Done()
ctx, cancel := hrd.chStop.NewCtx()
defer cancel()
after := time.After(hrd.reportPeriod)
for {
select {
case <-hrd.newHeads.Notify():
Expand All @@ -108,12 +109,19 @@ func (hrd *HeadReporterService) eventLoop() {
continue
}
for _, reporter := range hrd.reporters {
reporter.ReportNewHead(ctx, head)
err := reporter.ReportNewHead(ctx, head)
if err != nil && ctx.Err() == nil {
hrd.lggr.Errorw("Error reporting new head", "err", err)
}
}
case <-time.After(hrd.reportPeriod):
case <-after:
for _, reporter := range hrd.reporters {
reporter.ReportPeriodic(ctx)
err := reporter.ReportPeriodic(ctx)
if err != nil && ctx.Err() == nil {
hrd.lggr.Errorw("Error in periodic report", "err", err)
}
}
after = time.After(hrd.reportPeriod)
case <-hrd.chStop:
return
}
Expand Down
49 changes: 49 additions & 0 deletions core/services/headreporter/head_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ import (
"testing"
"time"

"github.com/jmoiron/sqlx"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

Expand All @@ -15,6 +26,44 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/headreporter"
)

func newHead() evmtypes.Head {
return evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(0)}
}

func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainContainer {
config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t)
keyStore := cltest.NewKeyStore(t, db).Eth()
ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator())
lggr := logger.TestLogger(t)
lpOpts := logpoller.Opts{
PollPeriod: 100 * time.Millisecond,
FinalityDepth: 2,
BackfillBatchSize: 3,
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
}
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), ethClient, lggr, lpOpts)

txm, err := txmgr.NewTxm(
db,
evmConfig,
evmConfig.GasEstimator(),
evmConfig.Transactions(),
nil,
dbConfig,
dbConfig.Listener(),
ethClient,
lggr,
lp,
keyStore,
estimator)
require.NoError(t, err)

cfg := configtest.NewGeneralConfig(t, nil)
return cltest.NewLegacyChainsWithMockChainAndTxManager(t, ethClient, cfg, txm)
}

func Test_HeadReporterService(t *testing.T) {
t.Run("report everything", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
Expand Down
19 changes: 5 additions & 14 deletions core/services/headreporter/prometheus_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

type (
prometheusReporter struct {
ds sqlutil.DataSource
chains legacyevm.LegacyChainContainer
lggr logger.Logger
backend PrometheusBackend
}

Expand Down Expand Up @@ -61,7 +59,7 @@ var (
})
)

func NewPrometheusReporter(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, opts ...interface{}) *prometheusReporter {
func NewPrometheusReporter(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, opts ...interface{}) HeadReporter {
var backend PrometheusBackend = defaultBackend{}
for _, opt := range opts {
switch v := opt.(type) {
Expand All @@ -72,7 +70,6 @@ func NewPrometheusReporter(ds sqlutil.DataSource, chainContainer legacyevm.Legac
return &prometheusReporter{
ds: ds,
chains: chainContainer,
lggr: lggr.Named("PrometheusReporter"),
backend: backend,
}
}
Expand All @@ -85,17 +82,13 @@ func (pr *prometheusReporter) getTxm(evmChainID *big.Int) (txmgr.TxManager, erro
return chain.TxManager(), nil
}

func (pr *prometheusReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) {
func (pr *prometheusReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) error {
evmChainID := head.EVMChainID.ToInt()
err := multierr.Combine(
return multierr.Combine(
errors.Wrap(pr.reportPendingEthTxes(ctx, evmChainID), "reportPendingEthTxes failed"),
errors.Wrap(pr.reportMaxUnconfirmedAge(ctx, evmChainID), "reportMaxUnconfirmedAge failed"),
errors.Wrap(pr.reportMaxUnconfirmedBlocks(ctx, head), "reportMaxUnconfirmedBlocks failed"),
)

if err != nil && ctx.Err() == nil {
pr.lggr.Errorw("Error reporting prometheus metrics", "err", err)
}
}

func (pr *prometheusReporter) reportPendingEthTxes(ctx context.Context, evmChainID *big.Int) (err error) {
Expand Down Expand Up @@ -150,10 +143,8 @@ func (pr *prometheusReporter) reportMaxUnconfirmedBlocks(ctx context.Context, he
return nil
}

func (pr *prometheusReporter) ReportPeriodic(ctx context.Context) {
if err := errors.Wrap(pr.reportPipelineRunStats(ctx), "reportPipelineRunStats failed"); err != nil {
pr.lggr.Errorw("Error reporting prometheus metrics", "err", err)
}
func (pr *prometheusReporter) ReportPeriodic(ctx context.Context) error {
return errors.Wrap(pr.reportPipelineRunStats(ctx), "reportPipelineRunStats failed")
}

func (pr *prometheusReporter) reportPipelineRunStats(ctx context.Context) (err error) {
Expand Down
55 changes: 3 additions & 52 deletions core/services/headreporter/prometheus_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,23 @@ package headreporter_test
import (
"math/big"
"testing"
"time"

"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/headreporter"
)

func newHead() evmtypes.Head {
return evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(0)}
}

func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainContainer {
config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t)
keyStore := cltest.NewKeyStore(t, db).Eth()
ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator())
lggr := logger.TestLogger(t)
lpOpts := logpoller.Opts{
PollPeriod: 100 * time.Millisecond,
FinalityDepth: 2,
BackfillBatchSize: 3,
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
}
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), ethClient, lggr, lpOpts)

txm, err := txmgr.NewTxm(
db,
evmConfig,
evmConfig.GasEstimator(),
evmConfig.Transactions(),
nil,
dbConfig,
dbConfig.Listener(),
ethClient,
lggr,
lp,
keyStore,
estimator)
require.NoError(t, err)

cfg := configtest.NewGeneralConfig(t, nil)
return cltest.NewLegacyChainsWithMockChainAndTxManager(t, ethClient, cfg, txm)
}

func Test_PrometheusReporter(t *testing.T) {
t.Run("with nothing in the database", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)

backend := mocks.NewPrometheusBackend(t)
reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend)
reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), backend)

backend.On("SetUnconfirmedTransactions", big.NewInt(0), int64(0)).Return()
backend.On("SetMaxUnconfirmedAge", big.NewInt(0), float64(0)).Return()
Expand Down Expand Up @@ -100,7 +51,7 @@ func Test_PrometheusReporter(t *testing.T) {
})).Return()
backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(35)).Return()

reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend)
reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), backend)

head := newHead()
reporter.ReportNewHead(testutils.Context(t), &head)
Expand All @@ -124,7 +75,7 @@ func Test_PrometheusReporter(t *testing.T) {
backend.On("SetMaxUnconfirmedAge", big.NewInt(0), float64(0)).Return()
backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(0)).Return()

reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend)
reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db), backend)

head := newHead()
reporter.ReportNewHead(testutils.Context(t), &head)
Expand Down
50 changes: 27 additions & 23 deletions core/services/headreporter/telemetry_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package headreporter
import (
"context"

"github.com/pkg/errors"

"github.com/smartcontractkit/libocr/commontypes"
"google.golang.org/protobuf/proto"

Expand All @@ -16,7 +18,6 @@ import (

type (
telemetryReporter struct {
logger logger.Logger
endpoints map[uint64]commontypes.MonitoringEndpoint
}
)
Expand All @@ -26,40 +27,43 @@ func NewTelemetryReporter(chainContainer legacyevm.LegacyChainContainer, lggr lo
for _, chain := range chainContainer.Slice() {
endpoints[chain.ID().Uint64()] = monitoringEndpointGen.GenMonitoringEndpoint("EVM", chain.ID().String(), "", synchronization.HeadReport)
}
return &telemetryReporter{
logger: lggr.Named("TelemetryReporter"),
endpoints: endpoints,
}
return &telemetryReporter{endpoints: endpoints}
}

func (t *telemetryReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) {
func (t *telemetryReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) error {
monitoringEndpoint := t.endpoints[head.EVMChainID.ToInt().Uint64()]
var lastFinalized *telem.Block
lastFinalizedHead := head.LatestFinalizedHead()
if lastFinalizedHead != nil {
lastFinalized = &telem.Block{
Timestamp: uint64(lastFinalizedHead.GetTimestamp().UTC().Unix()),
BlockNumber: uint64(lastFinalizedHead.BlockNumber()),
BlockHash: lastFinalizedHead.BlockHash().Hex(),
if monitoringEndpoint == nil {
return errors.Errorf("No monitoring endpoint provided chain_id=%d", head.EVMChainID.Int64())
}
var finalized *telem.Block
latestFinalizedHead := head.LatestFinalizedHead()
if latestFinalizedHead != nil {
finalized = &telem.Block{
Timestamp: uint64(latestFinalizedHead.GetTimestamp().UTC().Unix()),
Number: uint64(latestFinalizedHead.BlockNumber()),
Hash: latestFinalizedHead.BlockHash().Hex(),
}
}
request := &telem.HeadReportRequest{
ChainId: head.EVMChainID.String(),
Current: &telem.Block{
Timestamp: uint64(head.Timestamp.UTC().Unix()),
BlockNumber: uint64(head.Number),
BlockHash: head.Hash.Hex(),
ChainId: head.EVMChainID.ToInt().Uint64(),
Latest: &telem.Block{
Timestamp: uint64(head.Timestamp.UTC().Unix()),
Number: uint64(head.Number),
Hash: head.Hash.Hex(),
},
LastFinalized: lastFinalized,
Finalized: finalized,
}
bytes, err := proto.Marshal(request)
if err != nil {
t.logger.Warnw("telem.HeadReportRequest marshal error", "err", err)
return
return errors.WithMessage(err, "telem.HeadReportRequest marshal error")
}
monitoringEndpoint.SendLog(bytes)
if finalized == nil {
return errors.Errorf("No finalized block was found for chain_id=%d", head.EVMChainID.Int64())
}
return nil
}

func (t *telemetryReporter) ReportPeriodic(ctx context.Context) {
//do nothing
func (t *telemetryReporter) ReportPeriodic(ctx context.Context) error {
return nil
}
Loading

0 comments on commit b5e7e07

Please sign in to comment.