Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bukata-sa committed Jul 8, 2024
1 parent 04be8a5 commit 3c81271
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 61 deletions.
1 change: 1 addition & 0 deletions core/config/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type AppConfig interface {
Pyroscope() Pyroscope
Sentry() Sentry
TelemetryIngress() TelemetryIngress
HeadReport() HeadReport
Threshold() Threshold
WebServer() WebServer
Tracing() Tracing
Expand Down
5 changes: 5 additions & 0 deletions core/config/head_report_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package config

type HeadReport interface {
TelemetryEnabled() bool
}
12 changes: 12 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Core struct {
Tracing Tracing `toml:",omitempty"`
Mercury Mercury `toml:",omitempty"`
Capabilities Capabilities `toml:",omitempty"`
HeadReport HeadReport `toml:",omitempty"`
}

// SetFrom updates c with any non-nil values from f. (currently TOML field only!)
Expand All @@ -76,6 +77,7 @@ func (c *Core) SetFrom(f *Core) {
c.TelemetryIngress.setFrom(&f.TelemetryIngress)
c.AuditLogger.SetFrom(&f.AuditLogger)
c.Log.setFrom(&f.Log)
c.HeadReport.setFrom(&f.HeadReport)

c.WebServer.setFrom(&f.WebServer)
c.JobPipeline.setFrom(&f.JobPipeline)
Expand Down Expand Up @@ -485,6 +487,16 @@ func (t *TelemetryIngress) setFrom(f *TelemetryIngress) {
}
}

type HeadReport struct {
TelemetryEnabled *bool
}

func (t *HeadReport) setFrom(f *HeadReport) {
if v := f.TelemetryEnabled; v != nil {
t.TelemetryEnabled = v
}
}

type AuditLogger struct {
Enabled *bool
ForwardToUrl *commonconfig.URL
Expand Down
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

srvcs = append(srvcs, mailMon)
srvcs = append(srvcs, relayerChainInterops.Services()...)
headReporter := headreporter.NewHeadReporterService(opts.DS, legacyEVMChains, globalLogger, telemetryManager)
headReporter := headreporter.NewHeadReporterService(cfg.HeadReport(), opts.DS, legacyEVMChains, globalLogger, telemetryManager)
srvcs = append(srvcs, headReporter)

// Initialize Local Users ORM and Authentication Provider specified in config
Expand Down
6 changes: 6 additions & 0 deletions core/services/chainlink/config_general.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,12 @@ func (g *generalConfig) RootDir() string {
return h
}

func (g *generalConfig) HeadReport() coreconfig.HeadReport {
return &headReport{
h: g.c.HeadReport,
}
}

func (g *generalConfig) TelemetryIngress() coreconfig.TelemetryIngress {
return &telemetryIngressConfig{
c: g.c.TelemetryIngress,
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/config_general_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestTOMLGeneralConfig_Defaults(t *testing.T) {
assert.False(t, config.StarkNetEnabled())
assert.Equal(t, false, config.JobPipeline().ExternalInitiatorsEnabled())
assert.Equal(t, 15*time.Minute, config.WebServer().SessionTimeout().Duration())
assert.Equal(t, false, config.HeadReport().TelemetryEnabled())
}

func TestTOMLGeneralConfig_InsecureConfig(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions core/services/chainlink/config_head_report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package chainlink

import (
"github.com/smartcontractkit/chainlink/v2/core/config/toml"
)

type headReport struct {
h toml.HeadReport
}

func (h headReport) TelemetryEnabled() bool {
return *h.h.TelemetryEnabled
}
17 changes: 17 additions & 0 deletions core/services/chainlink/config_head_report_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package chainlink

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestHeadReportConfig(t *testing.T) {
opts := GeneralConfigOpts{
ConfigStrings: []string{fullTOML},
}
cfg, err := opts.New()
require.NoError(t, err)

hr := cfg.HeadReport()
require.True(t, hr.TelemetryEnabled())
}
3 changes: 3 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ TLSCertPath = '/path/to/cert.pem'
env = 'dev'
test = 'load'

[HeadReport]
TelemetryEnabled = true

[Mercury]
[Mercury.Cache]
LatestReportTTL = '1m40s'
Expand Down
13 changes: 8 additions & 5 deletions core/services/headreporter/head_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package headreporter
import (
"context"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
"sync"
"time"
Expand Down Expand Up @@ -35,11 +36,13 @@ type (
}
)

func NewHeadReporterService(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, monitoringEndpointGen telemetry.MonitoringEndpointGenerator, opts ...interface{}) *HeadReporterService {
return NewHeadReporterServiceWithReporters(ds, chainContainer, lggr, []HeadReporter{
NewPrometheusReporter(ds, chainContainer, lggr, opts),
NewTelemetryReporter(chainContainer, lggr, monitoringEndpointGen),
}, opts)
func NewHeadReporterService(config config.HeadReport, ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, monitoringEndpointGen telemetry.MonitoringEndpointGenerator, opts ...interface{}) *HeadReporterService {
reporters := make([]HeadReporter, 2)
reporters = append(reporters, NewPrometheusReporter(ds, chainContainer, lggr, opts))
if config.TelemetryEnabled() {
reporters = append(reporters, NewTelemetryReporter(chainContainer, lggr, monitoringEndpointGen))
}
return NewHeadReporterServiceWithReporters(ds, chainContainer, lggr, reporters, opts)
}

func NewHeadReporterServiceWithReporters(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, reporters []HeadReporter, opts ...interface{}) *HeadReporterService {
Expand Down
32 changes: 21 additions & 11 deletions core/services/headreporter/telemetry_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,19 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
"github.com/smartcontractkit/libocr/commontypes"
"google.golang.org/protobuf/proto"
"math/big"
)

type (
telemetryReporter struct {
logger logger.Logger
endpoints map[*big.Int]commontypes.MonitoringEndpoint
endpoints map[uint64]commontypes.MonitoringEndpoint
}
)

func NewTelemetryReporter(chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, monitoringEndpointGen telemetry.MonitoringEndpointGenerator) *telemetryReporter {
endpoints := make(map[*big.Int]commontypes.MonitoringEndpoint)
func NewTelemetryReporter(chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, monitoringEndpointGen telemetry.MonitoringEndpointGenerator) HeadReporter {
endpoints := make(map[uint64]commontypes.MonitoringEndpoint)
for _, chain := range chainContainer.Slice() {
endpoints[chain.ID()] = monitoringEndpointGen.GenMonitoringEndpoint("EVM", chain.ID().String(), "", synchronization.HeadReport)
endpoints[chain.ID().Uint64()] = monitoringEndpointGen.GenMonitoringEndpoint("EVM", chain.ID().String(), "", synchronization.HeadReport)
}
return &telemetryReporter{
logger: lggr.Named("TelemetryReporter"),
Expand All @@ -32,13 +31,24 @@ func NewTelemetryReporter(chainContainer legacyevm.LegacyChainContainer, lggr lo
}

func (t *telemetryReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) {
monitoringEndpoint := t.endpoints[head.EVMChainID.ToInt()]
monitoringEndpoint := t.endpoints[head.EVMChainID.ToInt().Uint64()]
var lastFinalized *telem.Block
lastFinalizedHead := head.LatestFinalizedHead()
if lastFinalizedHead != nil {
lastFinalized = &telem.Block{
Timestamp: uint64(lastFinalizedHead.GetTimestamp().UTC().Unix()),
BlockNumber: uint64(lastFinalizedHead.BlockNumber()),
BlockHash: lastFinalizedHead.BlockHash().Hex(),
}
}
request := &telem.HeadReportRequest{
ChainId: head.EVMChainID.String(),
Timestamp: uint64(head.Timestamp.UTC().Unix()),
BlockNumber: uint64(head.Number),
BlockHash: head.Hash.Hex(),
Finalized: head.IsFinalized,
ChainId: head.EVMChainID.String(),
Current: &telem.Block{
Timestamp: uint64(head.Timestamp.UTC().Unix()),
BlockNumber: uint64(head.Number),
BlockHash: head.Hash.Hex(),
},
LastFinalized: lastFinalized,
}
bytes, err := proto.Marshal(request)
if err != nil {
Expand Down
Loading

0 comments on commit 3c81271

Please sign in to comment.