Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrate beholder client #14110

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/shy-bulldogs-wink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added Full OpenTelemetry support, configurable via `Telemetry`
1 change: 0 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,6 @@ packages:
config:
filename: evm_mock.go
dir: "{{ .InterfaceDir }}/rpclibmocks"

github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices:
config:
dir: "{{ .InterfaceDir }}/"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func setupNodeOCR3(
}
relayerFactory := chainlink.RelayerFactory{
Logger: lggr,
LoopRegistry: plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), cfg.Tracing()),
LoopRegistry: plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), cfg.Tracing(), cfg.Telemetry()),
GRPCOpts: loop.GRPCOpts{},
CapabilitiesRegistry: coretypes.NewCapabilitiesRegistry(t),
}
Expand All @@ -155,7 +155,7 @@ func setupNodeOCR3(
RestrictedHTTPClient: &http.Client{},
AuditLogger: audit.NoopLogger,
MailMon: mailMon,
LoopRegistry: plugins.NewLoopRegistry(lggr, cfg.Tracing()),
LoopRegistry: plugins.NewLoopRegistry(lggr, cfg.Tracing(), cfg.Telemetry()),
})
require.NoError(t, err)
require.NoError(t, app.GetKeyStore().Unlock(ctx, "password"))
Expand Down
71 changes: 56 additions & 15 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import (
"github.com/Masterminds/semver/v3"
"github.com/getsentry/sentry-go"
"github.com/gin-gonic/gin"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/urfave/cli"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"

"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
Expand Down Expand Up @@ -63,20 +65,59 @@ var (
grpcOpts loop.GRPCOpts
)

func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, logger logger.Logger) error {
func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTelemetry config.Telemetry, lggr logger.Logger) error {
// Avoid double initializations, but does not prevent relay methods from being called multiple times.
var err error
initGlobalsOnce.Do(func() {
prometheus = ginprom.New(ginprom.Namespace("service"), ginprom.Token(cfgProm.AuthToken()))
grpcOpts = loop.NewGRPCOpts(nil) // default prometheus.Registerer
err = loop.SetupTracing(loop.TracingConfig{
Enabled: cfgTracing.Enabled(),
CollectorTarget: cfgTracing.CollectorTarget(),
NodeAttributes: cfgTracing.Attributes(),
SamplingRatio: cfgTracing.SamplingRatio(),
TLSCertPath: cfgTracing.TLSCertPath(),
OnDialError: func(error) { logger.Errorw("Failed to dial", "err", err) },
})
err = func() error {
prometheus = ginprom.New(ginprom.Namespace("service"), ginprom.Token(cfgProm.AuthToken()))
grpcOpts = loop.NewGRPCOpts(nil) // default prometheus.Registerer

otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
lggr.Errorw("Telemetry error", "err", err)
}))

tracingCfg := loop.TracingConfig{
Enabled: cfgTracing.Enabled(),
CollectorTarget: cfgTracing.CollectorTarget(),
NodeAttributes: cfgTracing.Attributes(),
SamplingRatio: cfgTracing.SamplingRatio(),
TLSCertPath: cfgTracing.TLSCertPath(),
OnDialError: func(error) { lggr.Errorw("Failed to dial", "err", err) },
}
if !cfgTelemetry.Enabled() {
return loop.SetupTracing(tracingCfg)
}

var attributes []attribute.KeyValue
if tracingCfg.Enabled {
attributes = tracingCfg.Attributes()
}
for k, v := range cfgTelemetry.ResourceAttributes() {
attributes = append(attributes, attribute.String(k, v))
}
clientCfg := beholder.Config{
InsecureConnection: cfgTelemetry.InsecureConnection(),
jmank88 marked this conversation as resolved.
Show resolved Hide resolved
CACertFile: cfgTelemetry.CACertFile(),
OtelExporterGRPCEndpoint: cfgTelemetry.OtelExporterGRPCEndpoint(),
ResourceAttributes: attributes,
TraceSampleRatio: cfgTelemetry.TraceSampleRatio(),
}
if tracingCfg.Enabled {
clientCfg.TraceSpanExporter, err = tracingCfg.NewSpanExporter()
if err != nil {
return err
}
}
var beholderClient *beholder.Client
beholderClient, err = beholder.NewClient(clientCfg)
if err != nil {
return err
}
beholder.SetClient(beholderClient)
beholder.SetGlobalOtelProviders()
return nil
}()
})
return err
}
Expand Down Expand Up @@ -139,7 +180,7 @@ type ChainlinkAppFactory struct{}

// NewApplication returns a new instance of the node with the given config.
func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, db *sqlx.DB) (app chainlink.Application, err error) {
err = initGlobals(cfg.Prometheus(), cfg.Tracing(), appLggr)
err = initGlobals(cfg.Prometheus(), cfg.Tracing(), cfg.Telemetry(), appLggr)
if err != nil {
appLggr.Errorf("Failed to initialize globals: %v", err)
}
Expand All @@ -159,7 +200,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
keyStore := keystore.New(ds, utils.GetScryptParams(cfg), appLggr)
mailMon := mailbox.NewMonitor(cfg.AppID().String(), appLggr.Named("Mailbox"))

loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing())
loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing(), cfg.Telemetry())

mercuryPool := wsrpc.NewPool(appLggr, cache.Config{
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
Expand Down
3 changes: 1 addition & 2 deletions core/cmd/shell_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
gethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/fatih/color"
"github.com/lib/pq"

"github.com/kylelemons/godebug/diff"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/urfave/cli"
"go.uber.org/multierr"
Expand Down
2 changes: 1 addition & 1 deletion core/cmd/shell_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
func genTestEVMRelayers(t *testing.T, opts legacyevm.ChainRelayExtenderConfig, ks evmrelayer.CSAETHKeystore) *chainlink.CoreRelayerChainInteroperators {
f := chainlink.RelayerFactory{
Logger: opts.Logger,
LoopRegistry: plugins.NewLoopRegistry(opts.Logger, opts.AppConfig.Tracing()),
LoopRegistry: plugins.NewLoopRegistry(opts.Logger, opts.AppConfig.Tracing(), opts.AppConfig.Telemetry()),
CapabilitiesRegistry: capabilities.NewRegistry(opts.Logger),
}

Expand Down
4 changes: 2 additions & 2 deletions core/cmd/shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func TestNewUserCache(t *testing.T) {

func TestSetupSolanaRelayer(t *testing.T) {
lggr := logger.TestLogger(t)
reg := plugins.NewLoopRegistry(lggr, nil)
reg := plugins.NewLoopRegistry(lggr, nil, nil)
ks := mocks.NewSolana(t)

// config 3 chains but only enable 2 => should only be 2 relayer
Expand Down Expand Up @@ -466,7 +466,7 @@ func TestSetupSolanaRelayer(t *testing.T) {

func TestSetupStarkNetRelayer(t *testing.T) {
lggr := logger.TestLogger(t)
reg := plugins.NewLoopRegistry(lggr, nil)
reg := plugins.NewLoopRegistry(lggr, nil, nil)
ks := mocks.NewStarkNet(t)
// config 3 chains but only enable 2 => should only be 2 relayer
nEnabledChains := 2
Expand Down
1 change: 1 addition & 0 deletions core/config/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type AppConfig interface {
Threshold() Threshold
WebServer() WebServer
Tracing() Tracing
Telemetry() Telemetry
}

type DatabaseBackupMode string
Expand Down
22 changes: 22 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,25 @@ TransmitQueueMaxSize = 10_000 # Default
# when sending a message to the mercury server, before aborting and considering
# the transmission to be failed.
TransmitTimeout = "5s" # Default

# Telemetry holds OTEL settings.
# This data includes OpenTelemetry metrics, traces, & logs.
# It does not currently include prometheus metrics or standard out logs, but may in the future.
[Telemetry]
# Enabled turns telemetry collection on or off.
Enabled = false # Default
# Endpoint of the OTEL Collector.
Endpoint = 'example.com/collector' # Example
# CACertFile is the file path of the TLS certificate used for secure communication with the OTEL Collector.
# Required unless InsecureConnection is true.
CACertFile = 'cert-file' # Example
# InsecureConnection bypasses the TLS CACertFile requirement and uses an insecure connection instead.
# Only available in dev mode.
InsecureConnection = false # Default
# TraceSampleRatio is the rate at which to sample traces. Must be between 0 and 1.
TraceSampleRatio = 0.01 # Default
pkcll marked this conversation as resolved.
Show resolved Hide resolved

# ResourceAttributes are global metadata to include with all telemetry.
[Telemetry.ResourceAttributes]
# foo is an example resource attribute
foo = "bar" # Example
10 changes: 10 additions & 0 deletions core/config/telemetry_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package config

// Telemetry provides read-only access to the node's telemetry settings.
type Telemetry interface {
	// Enabled reports whether telemetry is turned on.
	Enabled() bool

	// OtelExporterGRPCEndpoint returns the gRPC endpoint that telemetry is
	// exported to.
	OtelExporterGRPCEndpoint() string
	// CACertFile returns the path of the CA certificate file used for the
	// connection to that endpoint.
	CACertFile() string
	// InsecureConnection reports whether the connection should be made
	// without TLS.
	InsecureConnection() bool

	// ResourceAttributes returns the key/value metadata to attach to emitted
	// telemetry.
	ResourceAttributes() map[string]string
	// TraceSampleRatio returns the ratio at which traces are sampled.
	TraceSampleRatio() float64
}
77 changes: 69 additions & 8 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Core struct {
Tracing Tracing `toml:",omitempty"`
Mercury Mercury `toml:",omitempty"`
Capabilities Capabilities `toml:",omitempty"`
Telemetry Telemetry `toml:",omitempty"`
}

// SetFrom updates c with any non-nil values from f. (currently TOML field only!)
Expand Down Expand Up @@ -95,18 +96,25 @@ func (c *Core) SetFrom(f *Core) {
c.Sentry.setFrom(&f.Sentry)
c.Insecure.setFrom(&f.Insecure)
c.Tracing.setFrom(&f.Tracing)
c.Telemetry.setFrom(&f.Telemetry)
}

func (c *Core) ValidateConfig() (err error) {
_, verr := parse.HomeDir(*c.RootDir)
if err != nil {
if verr != nil {
err = multierr.Append(err, configutils.ErrInvalid{Name: "RootDir", Value: true, Msg: fmt.Sprintf("Failed to expand RootDir. Please use an explicit path: %s", verr)})
}

if (*c.OCR.Enabled || *c.OCR2.Enabled) && !*c.P2P.V2.Enabled {
err = multierr.Append(err, configutils.ErrInvalid{Name: "P2P.V2.Enabled", Value: false, Msg: "P2P required for OCR or OCR2. Please enable P2P or disable OCR/OCR2."})
}

if *c.Tracing.Enabled && *c.Telemetry.Enabled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If both are enabled will there be a clash over how the globally registered tracing provider is set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, TraceSpanExporter is used to register a second exporter.

I did notice an issue with the attributes merge though. Fix incoming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if c.Tracing.CollectorTarget == c.Telemetry.Endpoint {
err = multierr.Append(err, configutils.ErrInvalid{Name: "Tracing.CollectorTarget", Value: *c.Tracing.CollectorTarget, Msg: "Same as Telemetry.Endpoint. Must be different or disabled."})
}
}

return err
}

Expand Down Expand Up @@ -1576,25 +1584,25 @@ type Tracing struct {

// setFrom overlays f onto t: every field that is set (non-nil) in f replaces
// the corresponding field of t, and fields that are nil in f leave t
// unchanged. Attributes is replaced as a whole rather than merged per key.
func (t *Tracing) setFrom(f *Tracing) {
	if v := f.Enabled; v != nil {
		t.Enabled = v
	}
	if v := f.CollectorTarget; v != nil {
		t.CollectorTarget = v
	}
	if v := f.NodeID; v != nil {
		t.NodeID = v
	}
	if v := f.Attributes; v != nil {
		t.Attributes = v
	}
	if v := f.SamplingRatio; v != nil {
		t.SamplingRatio = v
	}
	if v := f.Mode; v != nil {
		t.Mode = v
	}
	if v := f.TLSCertPath; v != nil {
		t.TLSCertPath = v
	}
}

Expand Down Expand Up @@ -1648,6 +1656,59 @@ func (t *Tracing) ValidateConfig() (err error) {
return err
}

// Telemetry holds the settings of the Telemetry configuration section.
// setFrom treats a nil field as "not set"; ValidateConfig only inspects the
// remaining fields when Enabled is set and true.
type Telemetry struct {
// Enabled switches telemetry on or off.
Enabled *bool
// CACertFile must be set when telemetry is enabled, unless InsecureConnection is used.
CACertFile *string
// Endpoint must be set and non-empty when telemetry is enabled.
Endpoint *string
// InsecureConnection lifts the CACertFile requirement; it is rejected in production builds.
InsecureConnection *bool
// ResourceAttributes is a set of key/value pairs; it is omitted from TOML output when empty.
ResourceAttributes map[string]string `toml:",omitempty"`
// TraceSampleRatio must be between 0 and 1 when set.
TraceSampleRatio *float64
}

// setFrom overlays f onto b: each field that is set (non-nil) in f replaces
// the matching field of b, while fields left nil in f keep b's current value.
// ResourceAttributes is swapped as a whole map, not merged key by key.
func (b *Telemetry) setFrom(f *Telemetry) {
	if enabled := f.Enabled; enabled != nil {
		b.Enabled = enabled
	}
	if certFile := f.CACertFile; certFile != nil {
		b.CACertFile = certFile
	}
	if endpoint := f.Endpoint; endpoint != nil {
		b.Endpoint = endpoint
	}
	if insecure := f.InsecureConnection; insecure != nil {
		b.InsecureConnection = insecure
	}
	if attrs := f.ResourceAttributes; attrs != nil {
		b.ResourceAttributes = attrs
	}
	if ratio := f.TraceSampleRatio; ratio != nil {
		b.TraceSampleRatio = ratio
	}
}

// ValidateConfig checks the Telemetry section and returns all problems found,
// combined into one error (nil if there are none).
//
// Nothing is checked unless Enabled is set and true. Otherwise:
//   - Endpoint must be set and non-empty;
//   - InsecureConnection is rejected in production builds, and when it is not
//     used CACertFile must be set and non-empty;
//   - TraceSampleRatio, if set, must lie within [0, 1].
func (b *Telemetry) ValidateConfig() (err error) {
	enabled := b.Enabled != nil && *b.Enabled
	if !enabled {
		return nil
	}

	if endpoint := b.Endpoint; endpoint == nil || *endpoint == "" {
		err = multierr.Append(err, configutils.ErrMissing{Name: "Endpoint", Msg: "must be set when Telemetry is enabled"})
	}

	insecure := b.InsecureConnection != nil && *b.InsecureConnection
	switch {
	case insecure && build.IsProd():
		err = multierr.Append(err, configutils.ErrInvalid{Name: "InsecureConnection", Msg: "cannot be used in production builds"})
	case !insecure && (b.CACertFile == nil || *b.CACertFile == ""):
		err = multierr.Append(err, configutils.ErrMissing{Name: "CACertFile", Msg: "must be set, unless InsecureConnection is used"})
	}

	if b.TraceSampleRatio != nil {
		if r := *b.TraceSampleRatio; r < 0 || r > 1 {
			err = multierr.Append(err, configutils.ErrInvalid{Name: "TraceSampleRatio", Value: r, Msg: "must be between 0 and 1"})
		}
	}

	return err
}

var hostnameRegex = regexp.MustCompile(`^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)*$`)

// Validates uri is valid external or local URI
Expand Down
4 changes: 2 additions & 2 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
keyStore := keystore.NewInMemory(ds, utils.FastScryptParams, lggr)

mailMon := mailbox.NewMonitor(cfg.AppID().String(), lggr.Named("Mailbox"))
loopRegistry := plugins.NewLoopRegistry(lggr, nil)
loopRegistry := plugins.NewLoopRegistry(lggr, nil, nil)

mercuryPool := wsrpc.NewPool(lggr, cache.Config{
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
Expand Down Expand Up @@ -471,7 +471,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
RestrictedHTTPClient: c,
UnrestrictedHTTPClient: c,
SecretGenerator: MockSecretGenerator{},
LoopRegistry: plugins.NewLoopRegistry(lggr, nil),
LoopRegistry: plugins.NewLoopRegistry(lggr, nil, nil),
MercuryPool: mercuryPool,
CapabilitiesRegistry: capabilitiesRegistry,
CapabilitiesDispatcher: dispatcher,
Expand Down
Loading
Loading