Skip to content

Commit

Permalink
Add monitoring endpoint (#34)
Browse files Browse the repository at this point in the history
* Add monitoring endpoint
  • Loading branch information
robertjndw authored Jan 11, 2024
1 parent 3ebf3cd commit a1b7e56
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 0 deletions.
8 changes: 8 additions & 0 deletions HadesAPI/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

var BuildQueue *queue.Queue
var MonitorClient *MonitoringClient

type HadesAPIConfig struct {
APIPort uint `env:"API_PORT,notEmpty" envDefault:"8080"`
Expand All @@ -35,12 +36,19 @@ func main() {
return
}

MonitorClient, err = NewMonitoringClient(cfg.RabbitMQConfig.Url, cfg.RabbitMQConfig.User, cfg.RabbitMQConfig.Password)
if err != nil {
log.WithError(err).Fatal("Failed to connect to RabbitMQ Management API")
return
}

log.Infof("Starting HadesAPI on port %d", cfg.APIPort)
gin.SetMode(gin.ReleaseMode)

r := gin.Default()
r.GET("/ping", ping)
r.POST("/build", AddBuildToQueue)
r.GET("/monitoring", MonitoringQueue)

log.Panic(r.Run(fmt.Sprintf(":%d", cfg.APIPort)))
}
129 changes: 129 additions & 0 deletions HadesAPI/queue_monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"

"github.com/Mtze/HadesCI/shared/payload"
log "github.com/sirupsen/logrus"
)

const monitoring_url = "http://%s:15672/api/queues/%%2F/%s" // %%2F is url encoded `/` for default vhost

type MonitoringValues struct {
MessageSize int `json:"message_size"`
ConsumerSize int `json:"consumer_size"`
Messages []payload.QueuePayload `json:"messages"`
}

type MonitoringClient struct {
endpoint string
user string
pass string
}

func NewMonitoringClient(q_url, user, pass string) (*MonitoringClient, error) {
u, err := url.Parse(q_url)
if err != nil {
log.WithError(err).Error("error parsing RabbitMQ URL")
return nil, err
}
endpoint := fmt.Sprintf(monitoring_url, u.Host, "builds")
return &MonitoringClient{endpoint, user, pass}, nil
}

func (m *MonitoringClient) getSizes() (message_size, consumer_size int) {
log.Debug("Getting queue size from ", m.endpoint)

req, _ := http.NewRequest("GET", m.endpoint, nil)
req.SetBasicAuth(m.user, m.pass)

client := http.Client{}
resp, err := client.Do(req)
if err != nil {
log.WithError(err).Error("error getting queue size")
return -1, -1
}
defer resp.Body.Close()
// Decode the JSON response into a struct
var queueInfo struct {
Messages int `json:"messages"`
Consumers int `json:"consumers"`
}
if err := json.NewDecoder(resp.Body).Decode(&queueInfo); err != nil {
log.WithError(err).Error("error decoding queue information")
return -1, -1
}
return queueInfo.Messages, queueInfo.Consumers
}

func (m *MonitoringClient) GetQueueState() MonitoringValues {
msg_size, cons_size := m.getSizes()
// No messages in queue, save extra request
if msg_size < 1 {
return MonitoringValues{
MessageSize: msg_size,
ConsumerSize: cons_size,
}
}

req_payload := struct {
Count int `json:"count"`
AckMode string `json:"ackmode"`
Encoding string `json:"encoding"`
}{
Count: msg_size,
AckMode: "ack_requeue_true",
Encoding: "auto",
}

// Marshal the struct into JSON
jsonValue, err := json.Marshal(req_payload)
if err != nil {
log.Fatal(err)
}

req, _ := http.NewRequest("POST", m.endpoint+"/get", bytes.NewBuffer(jsonValue))
req.SetBasicAuth(m.user, m.pass)
req.Header.Set("Content-Type", "application/json")

client := http.Client{}
resp, err := client.Do(req)
if err != nil {
log.WithError(err).Error("error getting queue size")
return MonitoringValues{}
}
defer resp.Body.Close()
// Decode the JSON response into a struct
var messages []struct {
PayloadString string `json:"payload"`
}
if err := json.NewDecoder(resp.Body).Decode(&messages); err != nil {
log.WithError(err).Error("error decoding queue information")
return MonitoringValues{}
}
// Extracting just the payloads
var payloads []payload.QueuePayload
for _, message := range messages {
var payload payload.QueuePayload
err := json.Unmarshal([]byte(message.PayloadString), &payload)
if err != nil {
log.WithError(err).Error("error decoding queue information")
continue
}
// Filter confidential information
payload.Metadata = map[string]string{}
for i := range payload.Steps {
payload.Steps[i].Metadata = map[string]string{}
}
payloads = append(payloads, payload)
}
return MonitoringValues{
MessageSize: msg_size,
ConsumerSize: cons_size,
Messages: payloads,
}
}
5 changes: 5 additions & 0 deletions HadesAPI/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ func AddBuildToQueue(c *gin.Context) {
log.Debug("Received build request ", payload)
BuildQueue.Enqueue(c.Request.Context(), payload.QueuePayload, uint8(payload.Priority))
}

func MonitoringQueue(c *gin.Context) {
state := MonitorClient.GetQueueState()
c.JSON(http.StatusOK, state)
}

0 comments on commit a1b7e56

Please sign in to comment.