diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index d8b9777cb5a..ffe3cd29930 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -7,6 +7,7 @@ import ( "math/big" "net/http" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -20,10 +21,12 @@ import ( "go.uber.org/multierr" "go.uber.org/zap/zapcore" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/loop" commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" @@ -80,6 +83,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/plugins" ) +const ApplicationHeartbeatSeconds = 1 + // Application implements the common functions used in the core node. type Application interface { Start(ctx context.Context) error @@ -192,6 +197,68 @@ type ApplicationOpts struct { NewOracleFactoryFn standardcapabilities.NewOracleFactoryFn } +type ApplicationHeartbeat struct { + commonservices.Service + eng *commonservices.Engine + + beat time.Duration + lggr logger.Logger +} + +func NewApplicationHeartbeat(lggr logger.Logger) ApplicationHeartbeat { + h := ApplicationHeartbeat{ + beat: ApplicationHeartbeatSeconds * time.Second, + lggr: lggr, + } + h.Service, h.eng = commonservices.Config{ + Name: "NodeHeartbeat", + Start: h.start, + Close: h.close, + }.NewServiceEngine(lggr) + return h +} + +func (h *ApplicationHeartbeat) start(_ context.Context) error { + // Setup beholder resources + gauge, err := beholder.GetMeter().Int64Gauge("heartbeat") + if err != nil { + return err + } + count, err := beholder.GetMeter().Int64Gauge("heartbeat_count") + if err != nil { + return err + } + + cme := custmsg.NewLabeler() + + // Define tick functions + tickFn := func(engCtx context.Context) { + // TODO allow override of tracer provider into engine for beholder + _, innerSpan := beholder.GetTracer().Start(engCtx, "heartbeat.beat") + defer innerSpan.End() + + gauge.Record(engCtx, 1) + count.Record(engCtx, 1) + + err = cme.Emit(engCtx, "heartbeat") + if err != nil { + h.lggr.Errorw("heartbeat emit failed", "err", err) + } + } + + // consistent tick period + constantTickFn := func() time.Duration { + return h.beat + } + + h.eng.GoTick(timeutil.NewTicker(constantTickFn), tickFn) + return nil +} + +func (h *ApplicationHeartbeat) close() error { + return nil +} + // NewApplication initializes a new store if one is not already // present at the configured root directory (default: ~/.chainlink), // the logger at the same directory and returns the Application to @@ -199,6 +266,10 @@ type ApplicationOpts struct { // TODO: Inject more dependencies here to save booting up useless stuff in tests func NewApplication(opts ApplicationOpts) (Application, error) { var srvcs []services.ServiceCtx + + heartbeat := NewApplicationHeartbeat(opts.Logger) + srvcs = append(srvcs, &heartbeat) + auditLogger := opts.AuditLogger cfg := opts.Config relayerChainInterops := opts.RelayerChainInteroperators diff --git a/core/services/chainlink/config_telemetry.go b/core/services/chainlink/config_telemetry.go index 125eeed64e5..f2359997933 100644 --- a/core/services/chainlink/config_telemetry.go +++ b/core/services/chainlink/config_telemetry.go @@ -1,6 +1,7 @@ package chainlink import ( + "fmt" "time" "github.com/smartcontractkit/chainlink/v2/core/config/toml" @@ -42,9 +43,13 @@ func (b *telemetryConfig) OtelExporterGRPCEndpoint() string { // // These can be overridden by the TOML if the user so chooses func (b *telemetryConfig) ResourceAttributes() map[string]string { + sha, ver := static.Short() + defaults := map[string]string{ - "service.name": "chainlink", - "service.version": static.Version, + "service.name": "chainlink", + "service.version": static.Version, + "service.sha": static.Sha, + "service.shortversion": fmt.Sprintf("%s@%s", ver, sha), } for k, v := range b.s.ResourceAttributes {