Skip to content

Commit

Permalink
CCX-4501 - cmon-sd refactor (#4)
Browse files Browse the repository at this point in the history
- Fix bugs that leads to panic when the service could not send request to cmon.
- Logs and return API errors messages in case of failure
  • Loading branch information
tmwalaszek-s9s authored Jun 26, 2024
1 parent 8875c1f commit 42bfed6
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 96 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.19-alpine as builder
FROM golang:1.22-alpine as builder
ENV CGO_ENABLED=0
ENV GOOS=linux

Expand Down
199 changes: 105 additions & 94 deletions cmon_sd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,133 +15,162 @@ package main

import (
"encoding/json"
"errors"
"log"
"log/slog"
"net/http"
"os"
"slices"
"sort"
"strconv"

"flag"
"fmt"

"github.com/severalnines/cmon-proxy/cmon"
"github.com/severalnines/cmon-proxy/cmon/api"
"github.com/severalnines/cmon-proxy/config"
)

const namespace = "cmon"

var cmonEndpoint string
var cmonUsername string
var cmonPassword string

type ClusterTarget struct {
Target []string `json:"targets,omitempty"`
Label map[string]string `json:"labels,omitempty"`
}

func IndexHandler(w http.ResponseWriter, r *http.Request) {
type Cmon interface {
Authenticate() error
ControllerID() string
GetAllClusterInfo(req *api.GetAllClusterInfoRequest) (*api.GetAllClusterInfoResponse, error)
}

type ErrorMessage struct {
Error string `json:"error"`
}

type Service struct {
cmonClient Cmon
log *slog.Logger
}

func NewService() (*Service, error) {
cmonEndpoint := os.Getenv("CMON_ENDPOINT")
cmonUsername := os.Getenv("CMON_USERNAME")
cmonPassword := os.Getenv("CMON_PASSWORD")

if cmonEndpoint == "" {
cmonEndpoint = "https://127.0.0.1:9501"
}

if cmonUsername == "" {
return nil, errors.New("CMON_USERNAME is required")
}

if cmonPassword == "" {
return nil, errors.New("CMON_PASSWORD is required")
}

client := cmon.NewClient(&config.CmonInstance{
cmonClient := cmon.NewClient(&config.CmonInstance{
Url: cmonEndpoint,
Username: cmonUsername,
Password: cmonPassword,
},
30)

err := client.Authenticate()
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

err := cmonClient.Authenticate()
if err != nil {
return nil, err
}

return &Service{
cmonClient: cmonClient,
log: logger,
}, nil
}

func (s *Service) errorResponse(w http.ResponseWriter, statusCode int, message string) {
m := ErrorMessage{Error: message}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
json.NewEncoder(w).Encode(m)
}

func (s *Service) IndexHandler(w http.ResponseWriter, r *http.Request) {
err := s.cmonClient.Authenticate()
if err != nil {
res, err := client.Ping()
log.Println("Test: ", err, res)
s.log.Error("Error authenticating", err)
s.errorResponse(w, http.StatusUnauthorized, fmt.Sprintf("Error authenticating: %s", err.Error()))
return
}

res, err := client.GetAllClusterInfo(&api.GetAllClusterInfoRequest{
res, err := s.cmonClient.GetAllClusterInfo(&api.GetAllClusterInfoRequest{
WithHosts: true,
})

if err != nil {
log.Println("Test: ", err, res)
s.log.Error("Error getting cluster info", "error", err)
s.errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("Error getting cluster info: %s", err.Error()))
return
}

clusterTarget := []ClusterTarget{}

// iterate through all clusters
for i, cluster := range res.Clusters {

temp := ClusterTarget{
Target: []string{},
Label: map[string]string{
"ClusterID": strconv.FormatInt(int64(cluster.ClusterID), 10),
"ClusterName": cluster.ClusterName,
"cid": strconv.FormatInt(int64(cluster.ClusterID), 10),
"ClusterType": cluster.ClusterType,
"ControllerId": client.ControllerID(),
"ControllerId": s.cmonClient.ControllerID(),
},
}

// iterate through all hosts for given cluster
for _, host := range cluster.Hosts {

if host.Nodetype == "controller" {
continue
}

if host.Nodetype == "prometheus" {
continue
}

if host.Nodetype == "keepalived" {
switch host.Nodetype {
case "controller", "prometheus", "keepalived":
continue
}

//check host type and assign exporter port
// check host type and assign exporter port
// node_exporter and process_exporter applies to any node type
temp.Target = append(temp.Target, host.IP+":9100") // node exporter
temp.Target = append(temp.Target, host.IP+":9011") // process exporter

if host.Nodetype == "mysql" || host.Nodetype == "galera" {

temp.Target = append(temp.Target, host.IP+":9104") // mysql exporter
}

if host.Nodetype == "haproxy" {
temp.Target = append(temp.Target, host.IP+":9600") // haproxy exporter
}

if host.Nodetype == "mongo" {
if host.Role == "shardsvr" {
switch host.Nodetype {
case "mysql", "galera":
temp.Target = append(temp.Target, host.IP+":9104")
case "haproxy":
temp.Target = append(temp.Target, host.IP+":9600")
case "mongo":
switch host.Role {
case "shardsvr":
temp.Target = append(temp.Target, host.IP+":9216") // mongo exporter
}
if host.Role == "mongos" {
case "mongos":
temp.Target = append(temp.Target, host.IP+":9215") // mongos exporter
}

if host.Role == "mongocfg" {
case "mongocfg":
temp.Target = append(temp.Target, host.IP+":9214") // mongocfg exporter
}
case "mssql":
temp.Target = append(temp.Target, host.IP+":9399")
case "postgres":
temp.Target = append(temp.Target, host.IP+":9187")
case "redis":
temp.Target = append(temp.Target, host.IP+":9121")
case "proxysql":
temp.Target = append(temp.Target, host.IP+":42004")
case "pgbouncer":
temp.Target = append(temp.Target, host.IP+":9127")
}

if host.Nodetype == "mssql" {
temp.Target = append(temp.Target, host.IP+":9399") // mssql exporter
}

if host.Nodetype == "postgres" {
temp.Target = append(temp.Target, host.IP+":9187") // postgres exporter
}

if host.Nodetype == "redis" {
temp.Target = append(temp.Target, host.IP+":9121") // redis exporter
}

if host.Nodetype == "proxysql" {
temp.Target = append(temp.Target, host.IP+":42004") // proxysql exporter
}

if host.Nodetype == "pgbouncer" {
temp.Target = append(temp.Target, host.IP+":9127") // pgbouncer exporter
}

}
temp.Target = removeDuplicateStr(temp.Target)

sort.Strings(temp.Target)
temp.Target = slices.Compact(temp.Target)
clusterTarget = append(clusterTarget, temp)
i++
}
Expand All @@ -151,43 +180,25 @@ func IndexHandler(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(clusterTarget)
}

func removeDuplicateStr(strSlice []string) []string {
allKeys := make(map[string]bool)
list := []string{}
for _, item := range strSlice {
if _, value := allKeys[item]; !value {
allKeys[item] = true
list = append(list, item)
}
}
return list
}
func (s *Service) Handler() http.Handler {
r := http.NewServeMux()
r.HandleFunc("/", s.IndexHandler)

var port int

func init() {
flag.IntVar(&port, "p", 8080, "Listen port.")
flag.Parse()
return r
}

func main() {
cmonEndpoint = os.Getenv("CMON_ENDPOINT")
cmonUsername = os.Getenv("CMON_USERNAME")
cmonPassword = os.Getenv("CMON_PASSWORD")

if cmonEndpoint == "" {
cmonEndpoint = "https://127.0.0.1:9501"
}
var port int

if cmonUsername == "" {
log.Fatalf("Env variable CMON_USERNAME is not set.")
}
flag.IntVar(&port, "p", 8080, "Listen port.")
flag.Parse()

if cmonPassword == "" {
log.Fatalf("Env variable CMON_PASSWORD is not set.")
service, err := NewService()
if err != nil {
log.Fatalf("Error creating the service: %v", err)
}

http.HandleFunc("/", IndexHandler)
mux := service.Handler()
listenAddress := fmt.Sprintf(":%d", port)
log.Fatal(http.ListenAndServe(listenAddress, nil))
log.Fatal(http.ListenAndServe(listenAddress, mux))
}
Loading

0 comments on commit 42bfed6

Please sign in to comment.