
Commit

various fixes to metrics
NorseGaud committed Dec 14, 2024
1 parent f921772 commit 409e39c
Showing 9 changed files with 289 additions and 80 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.10.0
0.11.0
2 changes: 1 addition & 1 deletion internal/anka/cli.go
@@ -276,7 +276,7 @@ func (cli *Cli) ObtainAnkaVM(workerCtx context.Context, pluginCtx context.Contex
if err != nil {
return pluginCtx, vm, err
}
metricsData.IncrementTotalRunningVMs()
metricsData.IncrementTotalRunningVMs(workerCtx, pluginCtx, logger)
return pluginCtx, vm, nil
}

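The call above now passes workerCtx, pluginCtx, and the logger into the metrics counter. The counter itself lives in internal/metrics and is outside this diff; a minimal, hypothetical sketch of a signature that satisfies the new call site (the mutex field and the debug log line are assumptions, not code from this commit):

// Hypothetical sketch only — the real IncrementTotalRunningVMs in internal/metrics is not shown in this diff.
func (m *MetricsData) IncrementTotalRunningVMs(
	workerCtx context.Context,
	pluginCtx context.Context,
	logger *slog.Logger,
) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.TotalRunningVMs++
	logger.DebugContext(pluginCtx, "incremented total running VMs", "total", m.TotalRunningVMs)
}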
13 changes: 9 additions & 4 deletions internal/github/client.go
@@ -106,7 +106,12 @@ func AuthenticateAndReturnGitHubClient(
}

// https://github.com/gofri/go-github-ratelimit has yet to support primary rate limits, so we have to do it ourselves.
func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog.Logger, executeFunc func() (*T, *github.Response, error)) (context.Context, *T, *github.Response, error) {
func ExecuteGitHubClientFunction[T any](
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
executeFunc func() (*T, *github.Response, error),
) (context.Context, *T, *github.Response, error) {
innerPluginCtx, cancel := context.WithCancel(pluginCtx) // Inherit from parent context
defer cancel()
result, response, err := executeFunc()
@@ -128,17 +133,17 @@ func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog.
if err != nil {
return pluginCtx, nil, nil, err
}
metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{
metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.PluginBase{
Name: ctxPlugin.Name,
Status: "limit_paused",
})
select {
case <-time.After(sleepDuration):
metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{
metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.PluginBase{
Name: ctxPlugin.Name,
Status: "running",
})
return ExecuteGitHubClientFunction(pluginCtx, logger, executeFunc) // Retry the function after waiting
return ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, executeFunc) // Retry the function after waiting
case <-pluginCtx.Done():
return pluginCtx, nil, nil, pluginCtx.Err()
}
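For callers of ExecuteGitHubClientFunction, the only difference is the extra workerCtx argument in front, which lets the rate-limit pause record "limit_paused"/"running" status against the worker-level metrics. A hypothetical helper, written as if it lived in the same internal/github package (the ListWorkflows call, the githubClient parameter, and the OWNER/REPO placeholders are assumptions, not code from this commit):

// Hypothetical example — demonstrates the new (workerCtx, pluginCtx, logger, executeFunc) ordering only.
func listWorkflowsExample(
	workerCtx context.Context,
	pluginCtx context.Context,
	logger *slog.Logger,
	githubClient *github.Client,
) error {
	pluginCtx, workflows, _, err := ExecuteGitHubClientFunction(
		workerCtx, pluginCtx, logger,
		func() (*github.Workflows, *github.Response, error) {
			return githubClient.Actions.ListWorkflows(pluginCtx, "OWNER", "REPO", nil)
		},
	)
	if err != nil {
		return err
	}
	logger.InfoContext(pluginCtx, "workflows found", "total", workflows.GetTotalCount())
	return nil
}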
91 changes: 64 additions & 27 deletions internal/metrics/aggregator.go
@@ -15,7 +15,11 @@ import (
)

// Start runs the HTTP server
func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.Logger, soloReceiver bool) {
func (s *Server) StartAggregatorServer(
workerCtx context.Context,
logger *slog.Logger,
soloReceiver bool,
) {
http.HandleFunc("/metrics/v1", func(w http.ResponseWriter, r *http.Request) {
databaseContainer, err := database.GetDatabaseFromContext(workerCtx)
if err != nil {
@@ -30,7 +34,7 @@ func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.L
if r.URL.Query().Get("format") == "json" {
s.handleAggregatorJsonMetrics(workerCtx, logger, databaseContainer, loadedConfig)(w, r)
} else if r.URL.Query().Get("format") == "prometheus" {
s.handleAggregatorPrometheusMetrics(workerCtx, logger, databaseContainer, loadedConfig)(w, r)
s.handleAggregatorPrometheusMetrics(workerCtx, logger, databaseContainer)(w, r)
} else {
http.Error(w, "unsupported format, please use '?format=json' or '?format=prometheus'", http.StatusBadRequest)
}
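Once an aggregator is running, the endpoint can be exercised directly; a small, hypothetical smoke test (the localhost:8080 address is an assumption — the real port comes from s.Port — only the /metrics/v1 path and the ?format values above are taken from this commit):

// Hypothetical check of the aggregator endpoint.
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:8080/metrics/v1?format=prometheus")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // lines such as plugin_status{name=...,owner=...} ...
}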
@@ -42,7 +46,12 @@ func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.L
http.ListenAndServe(":"+s.Port, nil)
}

func (s *Server) handleAggregatorJsonMetrics(workerCtx context.Context, logger *slog.Logger, databaseContainer *database.Database, loadedConfig *config.Config) func(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleAggregatorJsonMetrics(
workerCtx context.Context,
logger *slog.Logger,
databaseContainer *database.Database,
loadedConfig *config.Config,
) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
combinedMetrics := make(map[string]MetricsData)
// for _, metricsURL := range loadedConfig.Metrics.MetricsURLs { // TODO: replace with iteration over metrics db keys as below
@@ -64,7 +73,11 @@ func (s *Server) handleAggregatorJsonMetrics(workerCtx context.Context, logger *
}
}

func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, logger *slog.Logger, databaseContainer *database.Database, loadedConfig *config.Config) func(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleAggregatorPrometheusMetrics(
workerCtx context.Context,
logger *slog.Logger,
databaseContainer *database.Database,
) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
match := "anklet/metrics/*"
@@ -101,9 +114,15 @@ func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, lo
var status string
var lastSuccessfulRunJobUrl string
var lastFailedRunJobUrl string
var lastCanceledRunJobUrl string
var lastSuccessfulRun time.Time
var lastFailedRun time.Time
var lastCanceledRun time.Time
var statusSince time.Time
var totalRanVMs int
var totalSuccessfulRunsSinceStart int
var totalFailedRunsSinceStart int
var totalCanceledRunsSinceStart int
pluginMap, ok := plugin.(map[string]interface{})
if !ok {
logger.ErrorContext(workerCtx, "error asserting plugin to map", "plugin", plugin)
@@ -134,6 +153,16 @@ func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, lo
logger.ErrorContext(workerCtx, "error parsing last failed run", "error", err)
return
}
lastCanceledRunJobUrl = pluginMap["LastCanceledRunJobUrl"].(string)
lastCanceledRun, err = time.Parse(time.RFC3339, pluginMap["LastCanceledRun"].(string))
if err != nil {
logger.ErrorContext(workerCtx, "error parsing last canceled run", "error", err)
return
}
totalRanVMs = int(pluginMap["TotalRanVMs"].(float64))
totalSuccessfulRunsSinceStart = int(pluginMap["TotalSuccessfulRunsSinceStart"].(float64))
totalFailedRunsSinceStart = int(pluginMap["TotalFailedRunsSinceStart"].(float64))
totalCanceledRunsSinceStart = int(pluginMap["TotalCanceledRunsSinceStart"].(float64))
}
if repoName == "" {
w.Write([]byte(fmt.Sprintf("plugin_status{name=%s,owner=%s} %s\n", Name, ownerName, status)))
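The float64 assertions above exist because the plugin entries are read back out of the metrics database as generic JSON, and encoding/json decodes every JSON number in a map[string]interface{} as float64, so integer counters have to be converted back. A self-contained illustration:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var m map[string]interface{}
	_ = json.Unmarshal([]byte(`{"TotalRanVMs": 3}`), &m)
	fmt.Printf("%T\n", m["TotalRanVMs"])         // float64
	fmt.Println(int(m["TotalRanVMs"].(float64))) // 3
}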
@@ -144,15 +173,17 @@ func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, lo
if repoName == "" {
w.Write([]byte(fmt.Sprintf("plugin_last_successful_run{name=%s,owner=%s,job_url=%s} %s\n", Name, ownerName, lastSuccessfulRunJobUrl, lastSuccessfulRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("plugin_last_failed_run{name=%s,owner=%s,job_url=%s} %s\n", Name, ownerName, lastFailedRunJobUrl, lastFailedRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("plugin_last_canceled_run{name=%s,owner=%s,job_url=%s} %s\n", Name, ownerName, lastCanceledRunJobUrl, lastCanceledRun.Format(time.RFC3339))))
} else {
w.Write([]byte(fmt.Sprintf("plugin_last_successful_run{name=%s,owner=%s,repo=%s,job_url=%s} %s\n", Name, ownerName, repoName, lastSuccessfulRunJobUrl, lastSuccessfulRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("plugin_last_failed_run{name=%s,owner=%s,repo=%s,job_url=%s} %s\n", Name, ownerName, repoName, lastFailedRunJobUrl, lastFailedRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("plugin_last_canceled_run{name=%s,owner=%s,repo=%s,job_url=%s} %s\n", Name, ownerName, repoName, lastCanceledRunJobUrl, lastCanceledRun.Format(time.RFC3339))))
}
}
if repoName == "" {
w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s} %s\n", Name, ownerName, statusSince.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s,status=%s} %s\n", Name, ownerName, status, statusSince.Format(time.RFC3339))))
} else {
w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s,repo=%s} %s\n", Name, ownerName, repoName, statusSince.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s,repo=%s,status=%s} %s\n", Name, ownerName, repoName, status, statusSince.Format(time.RFC3339))))
}

if strings.Contains(pluginName, "_receiver") {
@@ -162,40 +193,41 @@ func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, lo
}

if !soloReceiver {
w.Write([]byte(fmt.Sprintf("total_running_vms{name=%s} %d\n", Name, metricsData.TotalRunningVMs)))
w.Write([]byte(fmt.Sprintf("total_successful_runs_since_start{name=%s} %d\n", Name, metricsData.TotalSuccessfulRunsSinceStart)))
w.Write([]byte(fmt.Sprintf("total_failed_runs_since_start{name=%s} %d\n", Name, metricsData.TotalFailedRunsSinceStart)))
w.Write([]byte(fmt.Sprintf("plugin_total_ran_vms{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalRanVMs)))
w.Write([]byte(fmt.Sprintf("plugin_total_successful_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalSuccessfulRunsSinceStart)))
w.Write([]byte(fmt.Sprintf("plugin_total_failed_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalFailedRunsSinceStart)))
w.Write([]byte(fmt.Sprintf("plugin_total_canceled_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalCanceledRunsSinceStart)))
queued_jobs, err := databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/queued/all-orgs/"+Name).Result()
if err != nil {
logger.ErrorContext(workerCtx, "error querying queued queue length", "error", err)
return
}
w.Write([]byte(fmt.Sprintf("redis_jobs_queued{name=%s} %d\n", Name, queued_jobs)))
w.Write([]byte(fmt.Sprintf("redis_jobs_queued{name=%s,owner=%s} %d\n", Name, ownerName, queued_jobs)))
queued_jobs, err = databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/queued/all-orgs/"+Name+"/cleaning").Result()
if err != nil {
logger.ErrorContext(workerCtx, "error querying queued cleaning queue length", "error", err)
return
}
w.Write([]byte(fmt.Sprintf("redis_jobs_queued_cleaning{name=%s} %d\n", Name, queued_jobs)))
w.Write([]byte(fmt.Sprintf("redis_jobs_queued_cleaning{name=%s,owner=%s} %d\n", Name, ownerName, queued_jobs)))
completed_jobs, err := databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/completed/all-orgs/"+Name).Result()
if err != nil {
logger.ErrorContext(workerCtx, "error querying completed queue length", "error", err)
return
}
w.Write([]byte(fmt.Sprintf("redis_jobs_completed{name=%s} %d\n", Name, completed_jobs)))
w.Write([]byte(fmt.Sprintf("redis_jobs_completed{name=%s,owner=%s} %d\n", Name, ownerName, completed_jobs)))
}

w.Write([]byte(fmt.Sprintf("host_cpu_count{name=%s} %d\n", Name, metricsData.HostCPUCount)))
w.Write([]byte(fmt.Sprintf("host_cpu_used_count{name=%s} %d\n", Name, metricsData.HostCPUUsedCount)))
w.Write([]byte(fmt.Sprintf("host_cpu_usage_percentage{name=%s} %f\n", Name, metricsData.HostCPUUsagePercentage)))
w.Write([]byte(fmt.Sprintf("host_memory_total_bytes{name=%s} %d\n", Name, metricsData.HostMemoryTotalBytes)))
w.Write([]byte(fmt.Sprintf("host_memory_used_bytes{name=%s} %d\n", Name, metricsData.HostMemoryUsedBytes)))
w.Write([]byte(fmt.Sprintf("host_memory_available_bytes{name=%s} %d\n", Name, metricsData.HostMemoryAvailableBytes)))
w.Write([]byte(fmt.Sprintf("host_memory_usage_percentage{name=%s} %f\n", Name, metricsData.HostMemoryUsagePercentage)))
w.Write([]byte(fmt.Sprintf("host_disk_total_bytes{name=%s} %d\n", Name, metricsData.HostDiskTotalBytes)))
w.Write([]byte(fmt.Sprintf("host_disk_used_bytes{name=%s} %d\n", Name, metricsData.HostDiskUsedBytes)))
w.Write([]byte(fmt.Sprintf("host_disk_available_bytes{name=%s} %d\n", Name, metricsData.HostDiskAvailableBytes)))
w.Write([]byte(fmt.Sprintf("host_disk_usage_percentage{name=%s} %f\n", Name, metricsData.HostDiskUsagePercentage)))
w.Write([]byte(fmt.Sprintf("host_cpu_count{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostCPUCount)))
w.Write([]byte(fmt.Sprintf("host_cpu_used_count{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostCPUUsedCount)))
w.Write([]byte(fmt.Sprintf("host_cpu_usage_percentage{name=%s,owner=%s} %f\n", Name, ownerName, metricsData.HostCPUUsagePercentage)))
w.Write([]byte(fmt.Sprintf("host_memory_total_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostMemoryTotalBytes)))
w.Write([]byte(fmt.Sprintf("host_memory_used_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostMemoryUsedBytes)))
w.Write([]byte(fmt.Sprintf("host_memory_available_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostMemoryAvailableBytes)))
w.Write([]byte(fmt.Sprintf("host_memory_usage_percentage{name=%s,owner=%s} %f\n", Name, ownerName, metricsData.HostMemoryUsagePercentage)))
w.Write([]byte(fmt.Sprintf("host_disk_total_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostDiskTotalBytes)))
w.Write([]byte(fmt.Sprintf("host_disk_used_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostDiskUsedBytes)))
w.Write([]byte(fmt.Sprintf("host_disk_available_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostDiskAvailableBytes)))
w.Write([]byte(fmt.Sprintf("host_disk_usage_percentage{name=%s,owner=%s} %f\n", Name, ownerName, metricsData.HostDiskUsagePercentage)))
}
}
if nextCursor == 0 {
@@ -245,17 +277,22 @@ func ExportMetricsToDB(pluginCtx context.Context, logger *slog.Logger) {
logger.ErrorContext(pluginCtx, "error parsing metrics as json", "error", err.Error())
}
if pluginCtx.Err() == nil {
setting := databaseContainer.Client.Set(pluginCtx, "anklet/metrics/"+ctxPlugin.Name, metricsDataJson, time.Hour*24*7) // keep metrics for one week max
// This will create a single key using the first plugin's name. It will contain all plugin metrics though.
setting := databaseContainer.Client.Set(pluginCtx, "anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, metricsDataJson, time.Hour*24*7) // keep metrics for one week max
if setting.Err() != nil {
logger.ErrorContext(pluginCtx, "error storing metrics data in Redis", "error", setting.Err())
return
}
exists, err := databaseContainer.Client.Exists(pluginCtx, "anklet/metrics/"+ctxPlugin.Name).Result()
exists, err := databaseContainer.Client.Exists(pluginCtx, "anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error checking if key exists in Redis", "key", "anklet/metrics/"+ctxPlugin.Name, "error", err)
logger.ErrorContext(pluginCtx, "error checking if key exists in Redis", "key", "anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, "error", err)
return
}
logging.DevContext(pluginCtx, "successfully stored metrics data in Redis, key: anklet/metrics/"+ctxPlugin.Name+" exists? "+string(exists))
if exists == 1 {
logging.DevContext(pluginCtx, "successfully stored metrics data in Redis, key: anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+" exists: true")
} else {
logging.DevContext(pluginCtx, "successfully stored metrics data in Redis, key: anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+" exists: false")
}
}
}
}
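With this change the metrics keys are namespaced by owner — anklet/metrics/&lt;owner&gt;/&lt;plugin name&gt; — which matches the anklet/metrics/* pattern the aggregator scans. A minimal, hypothetical way to inspect the new layout with go-redis (the v9 client, Redis address, and example key are assumptions, not part of this commit):

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	// ExportMetricsToDB now writes keys shaped like anklet/metrics/<owner>/<plugin name>.
	iter := rdb.Scan(ctx, 0, "anklet/metrics/*", 100).Iterator()
	for iter.Next(ctx) {
		fmt.Println(iter.Val()) // e.g. anklet/metrics/my-org/RUNNER1
	}
	if err := iter.Err(); err != nil {
		fmt.Println("scan error:", err)
	}
}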
