Skip to content

Commit

Permalink
node/pkg/telemetry: prometheus remote write
Browse files Browse the repository at this point in the history
  • Loading branch information
panoel committed Nov 13, 2023
1 parent 38f02fa commit 0362f82
Show file tree
Hide file tree
Showing 11 changed files with 3,176 additions and 0 deletions.
30 changes: 30 additions & 0 deletions node/cmd/ccq/query_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/telemetry"
promremotew "github.com/certusone/wormhole/node/pkg/telemetry/prom_remote_write"
"github.com/certusone/wormhole/node/pkg/version"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
ipfslog "github.com/ipfs/go-log/v2"
Expand All @@ -39,6 +42,7 @@ var (
telemetryLokiURL *string
telemetryNodeName *string
statusAddr *string
promRemoteURL *string
)

const DEV_NETWORK_ID = "/wormhole/dev"
Expand All @@ -58,6 +62,7 @@ func init() {
telemetryLokiURL = QueryServerCmd.Flags().String("telemetryLokiURL", "", "Loki cloud logging URL")
telemetryNodeName = QueryServerCmd.Flags().String("telemetryNodeName", "", "Node name used in telemetry")
statusAddr = QueryServerCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
promRemoteURL = QueryServerCmd.Flags().String("promRemoteURL", "", "Prometheus remote write URL (Grafana)")
}

var QueryServerCmd = &cobra.Command{
Expand Down Expand Up @@ -170,6 +175,7 @@ func runQueryServer(cmd *cobra.Command, args []string) {
}
}()

usingPromRemoteWrite := *promRemoteURL != ""
// Start the status server
if *statusAddr != "" {
go func() {
Expand All @@ -180,6 +186,30 @@ func runQueryServer(cmd *cobra.Command, args []string) {
logger.Fatal("Status server closed unexpectedly", zap.Error(err))
}
}()
// Can only start prometheus scraping if status server is running
if usingPromRemoteWrite {
var info promremotew.PromTelemetryInfo
info.PromRemoteURL = *promRemoteURL
info.NodeName = *telemetryNodeName

// Grab the port from the statusAddr
statusStrings := strings.Split(*statusAddr, ":")
statusPort64, err := strconv.ParseUint(statusStrings[len(statusStrings)-1], 10, 16) // the last element is the port
if (err != nil) || (statusPort64 == 0) {
logger.Fatal("Please specify a valid --statusAddr")
} else {
// TODO: check when the http server gets started.
info.StatusPort = uint16(statusPort64)
err := RunPrometheusScraper(ctx, logger, info)
if err != nil {
logger.Fatal("Failed to start prometheus scraper", zap.Error(err))
}
}
}
} else {
if usingPromRemoteWrite {
logger.Fatal("Cannot use --promRemoteURL without --statusAddr")
}
}

// Handle SIGTERM
Expand Down
24 changes: 24 additions & 0 deletions node/cmd/ccq/status.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package ccq

import (
"context"
"fmt"
"net/http"
"time"

"github.com/certusone/wormhole/node/pkg/common"
promremotew "github.com/certusone/wormhole/node/pkg/telemetry/prom_remote_write"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
Expand Down Expand Up @@ -36,3 +38,25 @@ func (s *statusServer) handleHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "ok")
}

func RunPrometheusScraper(ctx context.Context, logger *zap.Logger, info promremotew.PromTelemetryInfo) error {
promLogger := logger.With(zap.String("component", "prometheus_scraper"))
errC := make(chan error)
common.StartRunnable(ctx, errC, false, "prometheus_scraper", func(ctx context.Context) error {
t := time.NewTicker(15 * time.Second)

for {
select {
case <-ctx.Done():
return nil
case <-t.C:
err := promremotew.ScrapeAndSendLocalMetrics(ctx, info, promLogger)
if err != nil {
promLogger.Error("ScrapeAndSendLocalMetrics error", zap.Error(err))
return err
}
}
}
})
return nil
}
42 changes: 42 additions & 0 deletions node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os/signal"
"path"
"runtime"
"strconv"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -39,6 +40,7 @@ import (
"github.com/certusone/wormhole/node/pkg/node"
"github.com/certusone/wormhole/node/pkg/p2p"
"github.com/certusone/wormhole/node/pkg/supervisor"
promremotew "github.com/certusone/wormhole/node/pkg/telemetry/prom_remote_write"
libp2p_crypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -200,6 +202,9 @@ var (
// Loki cloud logging parameters
telemetryLokiURL *string

// Prometheus remote write URL
promRemoteURL *string

chainGovernorEnabled *bool

ccqEnabled *bool
Expand Down Expand Up @@ -368,6 +373,8 @@ func init() {

telemetryLokiURL = NodeCmd.Flags().String("telemetryLokiURL", "", "Loki cloud logging URL")

promRemoteURL = NodeCmd.Flags().String("promRemoteURL", "", "Prometheus remote write URL (Grafana)")

chainGovernorEnabled = NodeCmd.Flags().Bool("chainGovernorEnabled", false, "Run the chain governor")

ccqEnabled = NodeCmd.Flags().Bool("ccqEnabled", false, "Enable cross chain query support")
Expand Down Expand Up @@ -1034,6 +1041,41 @@ func runNode(cmd *cobra.Command, args []string) {
if err != nil {
logger.Fatal("failed to connect to wormchain", zap.Error(err), zap.String("component", "gwrelayer"))
}

}
usingPromRemoteWrite := *promRemoteURL != ""
if usingPromRemoteWrite {
var info promremotew.PromTelemetryInfo
info.PromRemoteURL = *promRemoteURL
info.NodeName = *nodeName

// Grab the port from the statusAddr
statusStrings := strings.Split(*statusAddr, ":")
statusPort64, err := strconv.ParseUint(statusStrings[len(statusStrings)-1], 10, 16) // the last element is the port
if (err != nil) || (statusPort64 == 0) {
logger.Fatal("Please specify a valid --statusAddr")
} else {
// TODO: check when the http server gets started.
info.StatusPort = uint16(statusPort64)
promLogger := logger.With(zap.String("component", "prometheus_scraper"))
errC := make(chan error)
common.StartRunnable(rootCtx, errC, false, "prometheus_scraper", func(ctx context.Context) error {
t := time.NewTicker(15 * time.Second)

for {
select {
case <-ctx.Done():
return nil
case <-t.C:
err := promremotew.ScrapeAndSendLocalMetrics(ctx, info, promLogger)
if err != nil {
promLogger.Error("ScrapeAndSendLocalMetrics error", zap.Error(err))
continue
}
}
}
})
}
}

var watcherConfigs = []watchers.WatcherConfig{}
Expand Down
1 change: 1 addition & 0 deletions node/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2725,6 +2725,7 @@ github.com/securego/gosec/v2 v2.12.0/go.mod h1:iTpT+eKTw59bSgklBHlSnH5O2tNygHMDx
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/sercand/kuberesolver/v4 v4.0.0 h1:frL7laPDG/lFm5n98ODmWnn+cvPpzlkf3LhzuPhcHP4=
github.com/sercand/kuberesolver/v4 v4.0.0/go.mod h1:F4RGyuRmMAjeXHKL+w4P7AwUnPceEAPAhxUgXZjKgvM=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
Expand Down
Loading

0 comments on commit 0362f82

Please sign in to comment.