Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

feat: add metrics to orchestrator and relayer #674

Merged
merged 13 commits into from
Jan 3, 2024
73 changes: 73 additions & 0 deletions cmd/blobstream/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ const (

FlagBackupRelayer = "relayer.backup"
FlagBackupRelayerWaitTime = "relayer.wait-time"

FlagMetrics = "metrics"
FlagMetricsEndpoint = "metrics.endpoint"
FlagMetricsTLS = "metrics.tls"
FlagMetricsP2PEndpoint = "metrics.p2p"
)

func AddLogLevelFlag(cmd *cobra.Command) {
Expand Down Expand Up @@ -392,3 +397,71 @@ func AddEVMRetryTimeoutFlag(cmd *cobra.Command) {
"The time, in minutes, to wait for transactions to be mined on the target EVM chain before recreating them with a different gas price",
)
}

func AddMetricsFlag(cmd *cobra.Command) {
cmd.Flags().Bool(
FlagMetrics,
false,
"Enables OTLP metrics with HTTP exporter",
)
}

func GetMetricsFlag(cmd *cobra.Command) (bool, bool, error) {
changed := cmd.Flags().Changed(FlagMetrics)
val, err := cmd.Flags().GetBool(FlagMetrics)
if err != nil {
return false, changed, err
}
return val, changed, nil
}

func AddMetricsEndpointFlag(cmd *cobra.Command) {
cmd.Flags().String(
FlagMetricsEndpoint,
"localhost:4318",
"Sets HTTP endpoint for OTLP metrics to be exported to. Depends on '--metrics'",
)
}

func GetMetricsEndpointFlag(cmd *cobra.Command) (string, bool, error) {
changed := cmd.Flags().Changed(FlagMetricsEndpoint)
val, err := cmd.Flags().GetString(FlagMetricsEndpoint)
if err != nil {
return "", changed, err
}
return val, changed, nil
}

func AddMetricsTLSFlag(cmd *cobra.Command) {
cmd.Flags().Bool(
FlagMetricsTLS,
false,
"Enable TLS connection to OTLP metric backend",
)
}

func GetMetricsTLSFlag(cmd *cobra.Command) (bool, bool, error) {
changed := cmd.Flags().Changed(FlagMetricsTLS)
val, err := cmd.Flags().GetBool(FlagMetricsTLS)
if err != nil {
return false, changed, err
}
return val, changed, nil
}

func AddP2PMetricsEndpoint(cmd *cobra.Command) {
cmd.Flags().String(
FlagMetricsP2PEndpoint,
"localhost:30001",
"Sets HTTP endpoint for LibP2P metrics to listen on. Depends on '--metrics'. The metrics will be handled at the '/metrics' URI",
)
}

func GetP2PMetricsEndpointFlag(cmd *cobra.Command) (string, bool, error) {
changed := cmd.Flags().Changed(FlagMetricsP2PEndpoint)
val, err := cmd.Flags().GetString(FlagMetricsP2PEndpoint)
if err != nil {
return "", changed, err
}
return val, changed, nil
}
2 changes: 1 addition & 1 deletion cmd/blobstream/bootstrapper/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func Start() *cobra.Command {
}

// creating the host
h, err := p2p.CreateHost(config.p2pListenAddr, privKey)
h, err := p2p.CreateHost(config.p2pListenAddr, privKey, nil)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion cmd/blobstream/common/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/celestiaorg/orchestrator-relayer/store"

"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -69,6 +71,7 @@ func CreateDHTAndWaitForPeers(
p2pListenAddr string,
bootstrappers string,
dataStore ds.Batching,
registerer prometheus.Registerer,
) (*p2p.BlobstreamDHT, error) {
// get the p2p private key or generate a new one
privKey, err := common2.GetP2PKeyOrGenerateNewOne(p2pKeyStore, p2pNickname)
Expand All @@ -77,7 +80,7 @@ func CreateDHTAndWaitForPeers(
}

// creating the host
h, err := p2p.CreateHost(p2pListenAddr, privKey)
h, err := p2p.CreateHost(p2pListenAddr, privKey, registerer)
if err != nil {
return nil, err
}
Expand Down
47 changes: 46 additions & 1 deletion cmd/blobstream/orchestrator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"path/filepath"
"time"

"github.com/celestiaorg/orchestrator-relayer/telemetry"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"

"github.com/celestiaorg/orchestrator-relayer/cmd/blobstream/version"

"github.com/celestiaorg/orchestrator-relayer/cmd/blobstream/base"
Expand Down Expand Up @@ -110,10 +114,50 @@ func Start() *cobra.Command {
return err
}

orchestratorMeters, err := telemetry.InitOrchestratorMeters()
if err != nil {
return err
}

var registerer prometheus.Registerer
if config.MetricsConfig.Metrics {
opts := []otlpmetrichttp.Option{
otlpmetrichttp.WithEndpoint(config.MetricsConfig.Endpoint),
otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
}
if !config.MetricsConfig.TLS {
opts = append(opts, otlpmetrichttp.WithInsecure())
}
var shutdown func() error
registerer, shutdown, err = telemetry.Start(ctx, logger, ServiceNameOrchestrator, acc.Address.Hex(), opts)
if shutdown != nil {
stopFuncs = append(stopFuncs, shutdown)
}
if err != nil {
return err
}
shutdown, err := telemetry.PrometheusMetrics(ctx, logger, registerer, config.MetricsConfig.P2PEndpoint)
if shutdown != nil {
stopFuncs = append(stopFuncs, shutdown)
}
if err != nil {
return err
}
}

// creating the data store
dataStore := dssync.MutexWrap(s.DataStore)

dht, err := common.CreateDHTAndWaitForPeers(ctx, logger, s.P2PKeyStore, config.P2pNickname, config.P2PListenAddr, config.Bootstrappers, dataStore)
dht, err := common.CreateDHTAndWaitForPeers(
ctx,
logger,
s.P2PKeyStore,
config.P2pNickname,
config.P2PListenAddr,
config.Bootstrappers,
dataStore,
registerer,
)
if err != nil {
return err
}
Expand All @@ -137,6 +181,7 @@ func Start() *cobra.Command {
retrier,
s.EVMKeyStore,
&acc,
orchestratorMeters,
)
if err != nil {
return err
Expand Down
62 changes: 62 additions & 0 deletions cmd/blobstream/orchestrator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
"text/template"

"github.com/celestiaorg/orchestrator-relayer/telemetry"

"github.com/spf13/viper"

"github.com/celestiaorg/orchestrator-relayer/cmd/blobstream/base"
Expand Down Expand Up @@ -45,6 +47,22 @@ bootstrappers = "{{ .Bootstrappers }}"

# MultiAddr for the p2p peer to listen on.
listen-addr = "{{ .P2PListenAddr }}"

###############################################################################
### Telemetry Configuration ###
###############################################################################
[telemetry]
# Enables OTLP metrics with HTTP exporter.
metrics = "{{ .MetricsConfig.Metrics }}"

# Sets HTTP endpoint for OTLP metrics to be exported to.
endpoint = "{{ .MetricsConfig.Endpoint }}"

# Enable TLS connection to OTLP metric backend.
tls = "{{ .MetricsConfig.TLS }}"

# Sets the HTTP endpoint for LibP2P metrics to listen on.
p2p-endpoint = "{{ .MetricsConfig.P2PEndpoint }}"
`

func addOrchestratorFlags(cmd *cobra.Command) *cobra.Command {
Expand All @@ -63,6 +81,11 @@ func addOrchestratorFlags(cmd *cobra.Command) *cobra.Command {
base.AddGRPCInsecureFlag(cmd)
base.AddLogLevelFlag(cmd)
base.AddLogFormatFlag(cmd)
base.AddMetricsFlag(cmd)
base.AddMetricsEndpointFlag(cmd)
base.AddMetricsTLSFlag(cmd)
base.AddP2PMetricsEndpoint(cmd)

return cmd
}

Expand All @@ -77,6 +100,7 @@ type StartConfig struct {
GRPCInsecure bool `mapstructure:"grpc-insecure" json:"grpc-insecure"`
LogLevel string
LogFormat string
MetricsConfig telemetry.Config `mapstructure:"telemetry" json:"telemetry"`
}

func DefaultStartConfig() *StartConfig {
Expand All @@ -86,6 +110,12 @@ func DefaultStartConfig() *StartConfig {
Bootstrappers: "",
P2PListenAddr: "/ip4/0.0.0.0/tcp/30000",
GRPCInsecure: true,
MetricsConfig: telemetry.Config{
Metrics: false,
Endpoint: "localhost:4318",
TLS: false,
P2PEndpoint: "localhost:30001",
},
}
}

Expand Down Expand Up @@ -166,6 +196,38 @@ func parseOrchestratorFlags(cmd *cobra.Command, startConf *StartConfig) (StartCo
startConf.GRPCInsecure = grpcInsecure
}

metrics, changed, err := base.GetMetricsFlag(cmd)
if err != nil {
return StartConfig{}, err
}
if changed {
startConf.MetricsConfig.Metrics = metrics
}

endpoint, changed, err := base.GetMetricsEndpointFlag(cmd)
if err != nil {
return StartConfig{}, err
}
if changed {
startConf.MetricsConfig.Endpoint = endpoint
}

tls, changed, err := base.GetMetricsTLSFlag(cmd)
if err != nil {
return StartConfig{}, err
}
if changed {
startConf.MetricsConfig.TLS = tls
}

p2p, changed, err := base.GetP2PMetricsEndpointFlag(cmd)
if err != nil {
return StartConfig{}, err
}
if changed {
startConf.MetricsConfig.P2PEndpoint = p2p
}

logLevel, _, err := base.GetLogLevelFlag(cmd)
if err != nil {
return StartConfig{}, err
Expand Down
45 changes: 44 additions & 1 deletion cmd/blobstream/relayer/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ package relayer

import (
"context"
"fmt"
"path/filepath"
"time"

"github.com/celestiaorg/orchestrator-relayer/telemetry"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"

"github.com/celestiaorg/orchestrator-relayer/cmd/blobstream/version"

"github.com/celestiaorg/orchestrator-relayer/cmd/blobstream/base"
Expand Down Expand Up @@ -165,7 +170,44 @@ func Start() *cobra.Command {
// creating the data store
dataStore := dssync.MutexWrap(s.DataStore)

dht, err := common.CreateDHTAndWaitForPeers(ctx, logger, s.P2PKeyStore, config.p2pNickname, config.P2PListenAddr, config.Bootstrappers, dataStore)
relayerMeters, err := telemetry.InitRelayerMeters()
if err != nil {
return err
}

var registerer prometheus.Registerer
if config.MetricsConfig.Metrics {
opts := []otlpmetrichttp.Option{
otlpmetrichttp.WithEndpoint(config.MetricsConfig.Endpoint),
otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
}
if !config.MetricsConfig.TLS {
opts = append(opts, otlpmetrichttp.WithInsecure())
}
var shutdown func() error
registerer, shutdown, err = telemetry.Start(
ctx,
logger,
fmt.Sprintf("%s:%s", ServiceNameRelayer, config.ContractAddr),
acc.Address.Hex(),
opts,
)
if shutdown != nil {
stopFuncs = append(stopFuncs, shutdown)
}
if err != nil {
return err
}
shutdown, err := telemetry.PrometheusMetrics(ctx, logger, registerer, config.MetricsConfig.P2PEndpoint)
if shutdown != nil {
stopFuncs = append(stopFuncs, shutdown)
}
if err != nil {
return err
}
}

dht, err := common.CreateDHTAndWaitForPeers(ctx, logger, s.P2PKeyStore, config.p2pNickname, config.P2PListenAddr, config.Bootstrappers, dataStore, registerer)
if err != nil {
return err
}
Expand Down Expand Up @@ -207,6 +249,7 @@ func Start() *cobra.Command {
time.Duration(config.EVMRetryTimeout)*time.Minute,
config.isBackupRelayer,
time.Duration(config.backupRelayerWaitTime)*time.Minute,
relayerMeters,
)

// Listen for and trap any OS signal to graceful shutdown and exit
Expand Down
Loading
Loading