Skip to content

Commit

Permalink
integrate beholder client (#14110)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored Sep 11, 2024
1 parent 28989b3 commit 8454f46
Show file tree
Hide file tree
Showing 47 changed files with 568 additions and 100 deletions.
5 changes: 5 additions & 0 deletions .changeset/shy-bulldogs-wink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added Full Open Telemetry support, configurable via `Telemetry`
1 change: 0 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,6 @@ packages:
config:
filename: evm_mock.go
dir: "{{ .InterfaceDir }}/rpclibmocks"

github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices:
config:
dir: "{{ .InterfaceDir }}/"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func setupNodeOCR3(
}
relayerFactory := chainlink.RelayerFactory{
Logger: lggr,
LoopRegistry: plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), cfg.Tracing()),
LoopRegistry: plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), cfg.Tracing(), cfg.Telemetry()),
GRPCOpts: loop.GRPCOpts{},
CapabilitiesRegistry: coretypes.NewCapabilitiesRegistry(t),
}
Expand All @@ -155,7 +155,7 @@ func setupNodeOCR3(
RestrictedHTTPClient: &http.Client{},
AuditLogger: audit.NoopLogger,
MailMon: mailMon,
LoopRegistry: plugins.NewLoopRegistry(lggr, cfg.Tracing()),
LoopRegistry: plugins.NewLoopRegistry(lggr, cfg.Tracing(), cfg.Telemetry()),
})
require.NoError(t, err)
require.NoError(t, app.GetKeyStore().Unlock(ctx, "password"))
Expand Down
71 changes: 56 additions & 15 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import (
"github.com/Masterminds/semver/v3"
"github.com/getsentry/sentry-go"
"github.com/gin-gonic/gin"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/urfave/cli"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"

"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
Expand Down Expand Up @@ -63,20 +65,59 @@ var (
grpcOpts loop.GRPCOpts
)

func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, logger logger.Logger) error {
func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTelemetry config.Telemetry, lggr logger.Logger) error {
// Avoid double initializations, but does not prevent relay methods from being called multiple times.
var err error
initGlobalsOnce.Do(func() {
prometheus = ginprom.New(ginprom.Namespace("service"), ginprom.Token(cfgProm.AuthToken()))
grpcOpts = loop.NewGRPCOpts(nil) // default prometheus.Registerer
err = loop.SetupTracing(loop.TracingConfig{
Enabled: cfgTracing.Enabled(),
CollectorTarget: cfgTracing.CollectorTarget(),
NodeAttributes: cfgTracing.Attributes(),
SamplingRatio: cfgTracing.SamplingRatio(),
TLSCertPath: cfgTracing.TLSCertPath(),
OnDialError: func(error) { logger.Errorw("Failed to dial", "err", err) },
})
err = func() error {
prometheus = ginprom.New(ginprom.Namespace("service"), ginprom.Token(cfgProm.AuthToken()))
grpcOpts = loop.NewGRPCOpts(nil) // default prometheus.Registerer

otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
lggr.Errorw("Telemetry error", "err", err)
}))

tracingCfg := loop.TracingConfig{
Enabled: cfgTracing.Enabled(),
CollectorTarget: cfgTracing.CollectorTarget(),
NodeAttributes: cfgTracing.Attributes(),
SamplingRatio: cfgTracing.SamplingRatio(),
TLSCertPath: cfgTracing.TLSCertPath(),
OnDialError: func(error) { lggr.Errorw("Failed to dial", "err", err) },
}
if !cfgTelemetry.Enabled() {
return loop.SetupTracing(tracingCfg)
}

var attributes []attribute.KeyValue
if tracingCfg.Enabled {
attributes = tracingCfg.Attributes()
}
for k, v := range cfgTelemetry.ResourceAttributes() {
attributes = append(attributes, attribute.String(k, v))
}
clientCfg := beholder.Config{
InsecureConnection: cfgTelemetry.InsecureConnection(),
CACertFile: cfgTelemetry.CACertFile(),
OtelExporterGRPCEndpoint: cfgTelemetry.OtelExporterGRPCEndpoint(),
ResourceAttributes: attributes,
TraceSampleRatio: cfgTelemetry.TraceSampleRatio(),
}
if tracingCfg.Enabled {
clientCfg.TraceSpanExporter, err = tracingCfg.NewSpanExporter()
if err != nil {
return err
}
}
var beholderClient *beholder.Client
beholderClient, err = beholder.NewClient(clientCfg)
if err != nil {
return err
}
beholder.SetClient(beholderClient)
beholder.SetGlobalOtelProviders()
return nil
}()
})
return err
}
Expand Down Expand Up @@ -139,7 +180,7 @@ type ChainlinkAppFactory struct{}

// NewApplication returns a new instance of the node with the given config.
func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, db *sqlx.DB) (app chainlink.Application, err error) {
err = initGlobals(cfg.Prometheus(), cfg.Tracing(), appLggr)
err = initGlobals(cfg.Prometheus(), cfg.Tracing(), cfg.Telemetry(), appLggr)
if err != nil {
appLggr.Errorf("Failed to initialize globals: %v", err)
}
Expand All @@ -159,7 +200,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
keyStore := keystore.New(ds, utils.GetScryptParams(cfg), appLggr)
mailMon := mailbox.NewMonitor(cfg.AppID().String(), appLggr.Named("Mailbox"))

loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing())
loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing(), cfg.Telemetry())

mercuryPool := wsrpc.NewPool(appLggr, cache.Config{
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
Expand Down
3 changes: 1 addition & 2 deletions core/cmd/shell_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
gethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/fatih/color"
"github.com/lib/pq"

"github.com/kylelemons/godebug/diff"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/urfave/cli"
"go.uber.org/multierr"
Expand Down
2 changes: 1 addition & 1 deletion core/cmd/shell_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
func genTestEVMRelayers(t *testing.T, opts legacyevm.ChainRelayExtenderConfig, ks evmrelayer.CSAETHKeystore) *chainlink.CoreRelayerChainInteroperators {
f := chainlink.RelayerFactory{
Logger: opts.Logger,
LoopRegistry: plugins.NewLoopRegistry(opts.Logger, opts.AppConfig.Tracing()),
LoopRegistry: plugins.NewLoopRegistry(opts.Logger, opts.AppConfig.Tracing(), opts.AppConfig.Telemetry()),
CapabilitiesRegistry: capabilities.NewRegistry(opts.Logger),
}

Expand Down
4 changes: 2 additions & 2 deletions core/cmd/shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func TestNewUserCache(t *testing.T) {

func TestSetupSolanaRelayer(t *testing.T) {
lggr := logger.TestLogger(t)
reg := plugins.NewLoopRegistry(lggr, nil)
reg := plugins.NewLoopRegistry(lggr, nil, nil)
ks := mocks.NewSolana(t)

// config 3 chains but only enable 2 => should only be 2 relayer
Expand Down Expand Up @@ -466,7 +466,7 @@ func TestSetupSolanaRelayer(t *testing.T) {

func TestSetupStarkNetRelayer(t *testing.T) {
lggr := logger.TestLogger(t)
reg := plugins.NewLoopRegistry(lggr, nil)
reg := plugins.NewLoopRegistry(lggr, nil, nil)
ks := mocks.NewStarkNet(t)
// config 3 chains but only enable 2 => should only be 2 relayer
nEnabledChains := 2
Expand Down
1 change: 1 addition & 0 deletions core/config/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type AppConfig interface {
Threshold() Threshold
WebServer() WebServer
Tracing() Tracing
Telemetry() Telemetry
}

type DatabaseBackupMode string
Expand Down
22 changes: 22 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,25 @@ TransmitQueueMaxSize = 10_000 # Default
# when sending a message to the mercury server, before aborting and considering
# the transmission to be failed.
TransmitTimeout = "5s" # Default

# Telemetry holds OTEL settings.
# This data includes open telemetry metrics, traces, & logs.
# It does not currently include prometheus metrics or standard out logs, but may in the future.
[Telemetry]
# Enabled turns telemetry collection on or off.
Enabled = false # Default
# Endpoint of the OTEL Collector.
Endpoint = 'example.com/collector' # Example
# CACertFile is the file path of the TLS certificate used for secure communication with the OTEL Collector.
# Required unless InsecureConnection is true.
CACertFile = 'cert-file' # Example
# InsecureConnection bypasses the TLS CACertFile requirement and uses an insecure connection instead.
# Only available in dev mode.
InsecureConnection = false # Default
# TraceSampleRatio is the rate at which to sample traces. Must be between 0 and 1.
TraceSampleRatio = 0.01 # Default

# ResourceAttributes are global metadata to include with all telemetry.
[Telemetry.ResourceAttributes]
# foo is an example resource attribute
foo = "bar" # Example
10 changes: 10 additions & 0 deletions core/config/telemetry_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package config

// Telemetry exposes the node's telemetry settings as read-only accessors.
type Telemetry interface {
	// Enabled reports whether telemetry is turned on.
	Enabled() bool

	// OtelExporterGRPCEndpoint returns the gRPC endpoint that telemetry is
	// exported to.
	OtelExporterGRPCEndpoint() string
	// CACertFile returns the path of the CA certificate file.
	CACertFile() string
	// InsecureConnection reports whether an insecure connection is configured.
	InsecureConnection() bool

	// TraceSampleRatio returns the configured trace sampling ratio.
	TraceSampleRatio() float64
	// ResourceAttributes returns the configured key/value resource attributes.
	ResourceAttributes() map[string]string
}
77 changes: 69 additions & 8 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Core struct {
Tracing Tracing `toml:",omitempty"`
Mercury Mercury `toml:",omitempty"`
Capabilities Capabilities `toml:",omitempty"`
Telemetry Telemetry `toml:",omitempty"`
}

// SetFrom updates c with any non-nil values from f. (currently TOML field only!)
Expand Down Expand Up @@ -95,18 +96,25 @@ func (c *Core) SetFrom(f *Core) {
c.Sentry.setFrom(&f.Sentry)
c.Insecure.setFrom(&f.Insecure)
c.Tracing.setFrom(&f.Tracing)
c.Telemetry.setFrom(&f.Telemetry)
}

func (c *Core) ValidateConfig() (err error) {
_, verr := parse.HomeDir(*c.RootDir)
if err != nil {
if verr != nil {
err = multierr.Append(err, configutils.ErrInvalid{Name: "RootDir", Value: true, Msg: fmt.Sprintf("Failed to expand RootDir. Please use an explicit path: %s", verr)})
}

if (*c.OCR.Enabled || *c.OCR2.Enabled) && !*c.P2P.V2.Enabled {
err = multierr.Append(err, configutils.ErrInvalid{Name: "P2P.V2.Enabled", Value: false, Msg: "P2P required for OCR or OCR2. Please enable P2P or disable OCR/OCR2."})
}

if *c.Tracing.Enabled && *c.Telemetry.Enabled {
if c.Tracing.CollectorTarget == c.Telemetry.Endpoint {
err = multierr.Append(err, configutils.ErrInvalid{Name: "Tracing.CollectorTarget", Value: *c.Tracing.CollectorTarget, Msg: "Same as Telemetry.Endpoint. Must be different or disabled."})
}
}

return err
}

Expand Down Expand Up @@ -1576,25 +1584,25 @@ type Tracing struct {

// setFrom overlays f onto t: every field that is non-nil in f replaces the
// corresponding field of t, while fields that are nil in f leave t untouched.
func (t *Tracing) setFrom(f *Tracing) {
	if f.Enabled != nil {
		t.Enabled = f.Enabled
	}
	if f.CollectorTarget != nil {
		t.CollectorTarget = f.CollectorTarget
	}
	if f.NodeID != nil {
		t.NodeID = f.NodeID
	}
	if f.Attributes != nil {
		t.Attributes = f.Attributes
	}
	if f.SamplingRatio != nil {
		t.SamplingRatio = f.SamplingRatio
	}
	if f.Mode != nil {
		t.Mode = f.Mode
	}
	if f.TLSCertPath != nil {
		t.TLSCertPath = f.TLSCertPath
	}
}

Expand Down Expand Up @@ -1648,6 +1656,59 @@ func (t *Tracing) ValidateConfig() (err error) {
return err
}

// Telemetry is the TOML representation of the [Telemetry] section. Scalar
// settings are pointers so that an unset value (nil) can be told apart from
// an explicit zero value.
type Telemetry struct {
	// Enabled turns telemetry on or off.
	Enabled *bool
	// CACertFile is the path of the CA certificate file.
	CACertFile *string
	// Endpoint is the collector endpoint.
	Endpoint *string
	// InsecureConnection requests an insecure connection instead of one
	// secured with CACertFile.
	InsecureConnection *bool
	// ResourceAttributes are key/value pairs attached to the telemetry.
	ResourceAttributes map[string]string `toml:",omitempty"`
	// TraceSampleRatio is the trace sampling ratio.
	TraceSampleRatio *float64
}

// setFrom overlays f onto b: every field that is non-nil in f replaces the
// corresponding field of b, while fields that are nil in f leave b untouched.
func (b *Telemetry) setFrom(f *Telemetry) {
	if f.Enabled != nil {
		b.Enabled = f.Enabled
	}
	if f.CACertFile != nil {
		b.CACertFile = f.CACertFile
	}
	if f.Endpoint != nil {
		b.Endpoint = f.Endpoint
	}
	if f.InsecureConnection != nil {
		b.InsecureConnection = f.InsecureConnection
	}
	if f.ResourceAttributes != nil {
		b.ResourceAttributes = f.ResourceAttributes
	}
	if f.TraceSampleRatio != nil {
		b.TraceSampleRatio = f.TraceSampleRatio
	}
}

// ValidateConfig verifies the Telemetry section and returns all violations
// combined into one error. Nothing is checked unless Enabled is set and true.
//
// When enabled:
//   - Endpoint must be set and non-empty;
//   - InsecureConnection is rejected when build.IsProd() reports true;
//   - CACertFile must be set and non-empty unless InsecureConnection is true;
//   - TraceSampleRatio, if set, must lie within [0, 1].
func (b *Telemetry) ValidateConfig() (err error) {
	enabled := b.Enabled != nil && *b.Enabled
	if !enabled {
		return nil
	}

	if b.Endpoint == nil || *b.Endpoint == "" {
		err = multierr.Append(err, configutils.ErrMissing{Name: "Endpoint", Msg: "must be set when Telemetry is enabled"})
	}

	insecure := b.InsecureConnection != nil && *b.InsecureConnection
	switch {
	case insecure && build.IsProd():
		err = multierr.Append(err, configutils.ErrInvalid{Name: "InsecureConnection", Msg: "cannot be used in production builds"})
	case !insecure && (b.CACertFile == nil || *b.CACertFile == ""):
		// A secure connection needs a CA certificate to verify against.
		err = multierr.Append(err, configutils.ErrMissing{Name: "CACertFile", Msg: "must be set, unless InsecureConnection is used"})
	}

	if b.TraceSampleRatio != nil {
		if ratio := *b.TraceSampleRatio; ratio < 0 || ratio > 1 {
			err = multierr.Append(err, configutils.ErrInvalid{Name: "TraceSampleRatio", Value: ratio, Msg: "must be between 0 and 1"})
		}
	}

	return err
}

var hostnameRegex = regexp.MustCompile(`^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)*$`)

// Validates uri is valid external or local URI
Expand Down
4 changes: 2 additions & 2 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
keyStore := keystore.NewInMemory(ds, utils.FastScryptParams, lggr)

mailMon := mailbox.NewMonitor(cfg.AppID().String(), lggr.Named("Mailbox"))
loopRegistry := plugins.NewLoopRegistry(lggr, nil)
loopRegistry := plugins.NewLoopRegistry(lggr, nil, nil)

mercuryPool := wsrpc.NewPool(lggr, cache.Config{
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
Expand Down Expand Up @@ -471,7 +471,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
RestrictedHTTPClient: c,
UnrestrictedHTTPClient: c,
SecretGenerator: MockSecretGenerator{},
LoopRegistry: plugins.NewLoopRegistry(lggr, nil),
LoopRegistry: plugins.NewLoopRegistry(lggr, nil, nil),
MercuryPool: mercuryPool,
CapabilitiesRegistry: capabilitiesRegistry,
CapabilitiesDispatcher: dispatcher,
Expand Down
Loading

0 comments on commit 8454f46

Please sign in to comment.