diff --git a/VERSION b/VERSION index 2774f85..142464b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.10.0 \ No newline at end of file +0.11.0 \ No newline at end of file diff --git a/internal/anka/cli.go b/internal/anka/cli.go index 08e3993..07c97e3 100644 --- a/internal/anka/cli.go +++ b/internal/anka/cli.go @@ -276,7 +276,7 @@ func (cli *Cli) ObtainAnkaVM(workerCtx context.Context, pluginCtx context.Contex if err != nil { return pluginCtx, vm, err } - metricsData.IncrementTotalRunningVMs() + metricsData.IncrementTotalRunningVMs(workerCtx, pluginCtx, logger) return pluginCtx, vm, nil } diff --git a/internal/github/client.go b/internal/github/client.go index 4574c9a..e885735 100644 --- a/internal/github/client.go +++ b/internal/github/client.go @@ -106,7 +106,12 @@ func AuthenticateAndReturnGitHubClient( } // https://github.com/gofri/go-github-ratelimit has yet to support primary rate limits, so we have to do it ourselves. -func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog.Logger, executeFunc func() (*T, *github.Response, error)) (context.Context, *T, *github.Response, error) { +func ExecuteGitHubClientFunction[T any]( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, + executeFunc func() (*T, *github.Response, error), +) (context.Context, *T, *github.Response, error) { innerPluginCtx, cancel := context.WithCancel(pluginCtx) // Inherit from parent context defer cancel() result, response, err := executeFunc() @@ -128,17 +133,17 @@ func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog. if err != nil { return pluginCtx, nil, nil, err } - metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{ + metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.PluginBase{ Name: ctxPlugin.Name, Status: "limit_paused", }) select { case <-time.After(sleepDuration): - metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{ + metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.PluginBase{ Name: ctxPlugin.Name, Status: "running", }) - return ExecuteGitHubClientFunction(pluginCtx, logger, executeFunc) // Retry the function after waiting + return ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, executeFunc) // Retry the function after waiting case <-pluginCtx.Done(): return pluginCtx, nil, nil, pluginCtx.Err() } diff --git a/internal/metrics/aggregator.go b/internal/metrics/aggregator.go index 6ff5f5a..f9c44b0 100644 --- a/internal/metrics/aggregator.go +++ b/internal/metrics/aggregator.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "log/slog" "net/http" "strings" @@ -12,10 +11,15 @@ import ( "github.com/veertuinc/anklet/internal/config" "github.com/veertuinc/anklet/internal/database" + "github.com/veertuinc/anklet/internal/logging" ) // Start runs the HTTP server -func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.Logger, soloReceiver bool) { +func (s *Server) StartAggregatorServer( + workerCtx context.Context, + logger *slog.Logger, + soloReceiver bool, +) { http.HandleFunc("/metrics/v1", func(w http.ResponseWriter, r *http.Request) { databaseContainer, err := database.GetDatabaseFromContext(workerCtx) if err != nil { @@ -30,7 +34,7 @@ func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.L if r.URL.Query().Get("format") == "json" { s.handleAggregatorJsonMetrics(workerCtx, logger, databaseContainer, loadedConfig)(w, r) } else if r.URL.Query().Get("format") == "prometheus" { - s.handleAggregatorPrometheusMetrics(workerCtx, logger, databaseContainer, loadedConfig)(w, r) + s.handleAggregatorPrometheusMetrics(workerCtx, logger, databaseContainer)(w, r) } else { http.Error(w, "unsupported format, please use '?format=json' or '?format=prometheus'", http.StatusBadRequest) } @@ -42,166 +46,255 @@ func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.L http.ListenAndServe(":"+s.Port, nil) } -func (s *Server) handleAggregatorJsonMetrics(workerCtx context.Context, logger *slog.Logger, databaseContainer *database.Database, loadedConfig *config.Config) func(w http.ResponseWriter, r *http.Request) { +func (s *Server) handleAggregatorJsonMetrics( + workerCtx context.Context, + logger *slog.Logger, + databaseContainer *database.Database, + loadedConfig *config.Config, +) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { combinedMetrics := make(map[string]MetricsData) - for _, metricsURL := range loadedConfig.Metrics.MetricsURLs { - value, err := databaseContainer.Client.Get(workerCtx, metricsURL).Result() - if err != nil { - logger.ErrorContext(workerCtx, "error getting value from Redis", "key", metricsURL, "error", err) - return - } - var metricsData MetricsData - err = json.Unmarshal([]byte(value), &metricsData) - if err != nil { - logger.ErrorContext(workerCtx, "error unmarshalling metrics data", "error", err) - return - } - combinedMetrics[metricsURL] = metricsData - } + // for _, metricsURL := range loadedConfig.Metrics.MetricsURLs { // TODO: replace with iteration over metrics db keys as below + // value, err := databaseContainer.Client.Get(workerCtx, metricsURL).Result() + // if err != nil { + // logger.ErrorContext(workerCtx, "error getting value from Redis", "key", "error", err) + // return + // } + // var metricsData MetricsData + // err = json.Unmarshal([]byte(value), &metricsData) + // if err != nil { + // logger.ErrorContext(workerCtx, "error unmarshalling metrics data", "error", err) + // return + // } + // combinedMetrics[metricsURL] = metricsData + // } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(combinedMetrics) } } -func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, logger *slog.Logger, databaseContainer *database.Database, loadedConfig *config.Config) func(w http.ResponseWriter, r *http.Request) { +func (s *Server) handleAggregatorPrometheusMetrics( + workerCtx context.Context, + logger *slog.Logger, + databaseContainer *database.Database, +) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") - for _, metricsURL := range loadedConfig.Metrics.MetricsURLs { - value, err := databaseContainer.Client.Get(workerCtx, metricsURL).Result() + match := "anklet/metrics/*" + var cursor uint64 + for { + databaseContainer, err := database.GetDatabaseFromContext(workerCtx) if err != nil { - logger.ErrorContext(workerCtx, "error getting value from Redis", "key", metricsURL, "error", err) - return + logger.ErrorContext(workerCtx, "error getting database client from context", "error", err) + break } - var metricsData MetricsData - err = json.Unmarshal([]byte(value), &metricsData) + keys, nextCursor, err := databaseContainer.Client.Scan(workerCtx, cursor, match, 10).Result() if err != nil { - logger.ErrorContext(workerCtx, "error unmarshalling metrics data", "error", err) - return + logger.ErrorContext(workerCtx, "error scanning database for metrics", "error", err) + break } - soloReceiver := false - for _, plugin := range metricsData.Plugins { - var pluginName string - var Name string - var ownerName string - var repoName string - var status string - var lastSuccessfulRunJobUrl string - var lastFailedRunJobUrl string - var lastSuccessfulRun time.Time - var lastFailedRun time.Time - var statusSince time.Time - pluginMap, ok := plugin.(map[string]interface{}) - if !ok { - logger.ErrorContext(workerCtx, "error asserting plugin to map", "plugin", plugin) + for _, key := range keys { + value, err := databaseContainer.Client.Get(workerCtx, key).Result() + if err != nil { + logger.ErrorContext(workerCtx, "error getting value from Redis", "key", key, "error", err) return } - pluginName = pluginMap["plugin_name"].(string) - Name = pluginMap["name"].(string) - ownerName = pluginMap["owner_name"].(string) - if pluginMap["repo_name"] != nil { - repoName = pluginMap["repo_name"].(string) - } - status = pluginMap["status"].(string) - statusSince, err = time.Parse(time.RFC3339, pluginMap["status_since"].(string)) + var metricsData MetricsData + err = json.Unmarshal([]byte(value), &metricsData) if err != nil { - logger.ErrorContext(workerCtx, "error parsing status since", "error", err) + logger.ErrorContext(workerCtx, "error unmarshalling metrics data", "error", err) return } - if !strings.Contains(pluginName, "_receiver") { - lastSuccessfulRunJobUrl = pluginMap["last_successful_run_job_url"].(string) - lastFailedRunJobUrl = pluginMap["last_failed_run_job_url"].(string) - lastSuccessfulRun, err = time.Parse(time.RFC3339, pluginMap["last_successful_run"].(string)) - if err != nil { - logger.ErrorContext(workerCtx, "error parsing last successful run", "error", err) + for _, plugin := range metricsData.Plugins { + soloReceiver := false + var pluginName string + var Name string + var ownerName string + var repoName string + var status string + var lastSuccessfulRunJobUrl string + var lastFailedRunJobUrl string + var lastCanceledRunJobUrl string + var lastSuccessfulRun time.Time + var lastFailedRun time.Time + var lastCanceledRun time.Time + var statusSince time.Time + var totalRanVMs int + var totalSuccessfulRunsSinceStart int + var totalFailedRunsSinceStart int + var totalCanceledRunsSinceStart int + pluginMap, ok := plugin.(map[string]interface{}) + if !ok { + logger.ErrorContext(workerCtx, "error asserting plugin to map", "plugin", plugin) return } - lastFailedRun, err = time.Parse(time.RFC3339, pluginMap["last_failed_run"].(string)) + pluginName = pluginMap["PluginName"].(string) + Name = pluginMap["Name"].(string) + ownerName = pluginMap["OwnerName"].(string) + if pluginMap["RepoName"] != nil { + repoName = pluginMap["RepoName"].(string) + } + status = pluginMap["Status"].(string) + statusSince, err = time.Parse(time.RFC3339, pluginMap["StatusSince"].(string)) if err != nil { - logger.ErrorContext(workerCtx, "error parsing last failed run", "error", err) + logger.ErrorContext(workerCtx, "error parsing status since", "error", err) return } - } - if repoName == "" { - w.Write([]byte(fmt.Sprintf("plugin_status{name=%s,owner=%s,metricsUrl=%s} %s\n", Name, ownerName, metricsURL, status))) - } else { - w.Write([]byte(fmt.Sprintf("plugin_status{name=%s,owner=%s,repo=%s,metricsUrl=%s} %s\n", Name, ownerName, repoName, metricsURL, status))) - } - if !strings.Contains(pluginName, "_receiver") { + if !strings.Contains(pluginName, "_receiver") { + lastSuccessfulRunJobUrl = pluginMap["LastSuccessfulRunJobUrl"].(string) + lastFailedRunJobUrl = pluginMap["LastFailedRunJobUrl"].(string) + lastSuccessfulRun, err = time.Parse(time.RFC3339, pluginMap["LastSuccessfulRun"].(string)) + if err != nil { + logger.ErrorContext(workerCtx, "error parsing last successful run", "error", err) + return + } + lastFailedRun, err = time.Parse(time.RFC3339, pluginMap["LastFailedRun"].(string)) + if err != nil { + logger.ErrorContext(workerCtx, "error parsing last failed run", "error", err) + return + } + lastCanceledRunJobUrl = pluginMap["LastCanceledRunJobUrl"].(string) + lastCanceledRun, err = time.Parse(time.RFC3339, pluginMap["LastCanceledRun"].(string)) + if err != nil { + logger.ErrorContext(workerCtx, "error parsing last canceled run", "error", err) + return + } + totalRanVMs = int(pluginMap["TotalRanVMs"].(float64)) + totalSuccessfulRunsSinceStart = int(pluginMap["TotalSuccessfulRunsSinceStart"].(float64)) + totalFailedRunsSinceStart = int(pluginMap["TotalFailedRunsSinceStart"].(float64)) + totalCanceledRunsSinceStart = int(pluginMap["TotalCanceledRunsSinceStart"].(float64)) + } if repoName == "" { - w.Write([]byte(fmt.Sprintf("plugin_last_successful_run{name=%s,owner=%s,job_url=%s,metricsUrl=%s} %s\n", Name, ownerName, lastSuccessfulRunJobUrl, metricsURL, lastSuccessfulRun.Format(time.RFC3339)))) - w.Write([]byte(fmt.Sprintf("plugin_last_failed_run{name=%s,owner=%s,job_url=%s,metricsUrl=%s} %s\n", Name, ownerName, lastFailedRunJobUrl, metricsURL, lastFailedRun.Format(time.RFC3339)))) + w.Write([]byte(fmt.Sprintf("plugin_status{name=%s,owner=%s} %s\n", Name, ownerName, status))) } else { - w.Write([]byte(fmt.Sprintf("plugin_last_successful_run{name=%s,owner=%s,repo=%s,job_url=%s,metricsUrl=%s} %s\n", Name, ownerName, repoName, lastSuccessfulRunJobUrl, metricsURL, lastSuccessfulRun.Format(time.RFC3339)))) - w.Write([]byte(fmt.Sprintf("plugin_last_failed_run{name=%s,owner=%s,repo=%s,job_url=%s,metricsUrl=%s} %s\n", Name, ownerName, repoName, lastFailedRunJobUrl, metricsURL, lastFailedRun.Format(time.RFC3339)))) + w.Write([]byte(fmt.Sprintf("plugin_status{name=%s,owner=%s,repo=%s} %s\n", Name, ownerName, repoName, status))) } - } - if repoName == "" { - w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s,metricsUrl=%s} %s\n", Name, ownerName, metricsURL, statusSince.Format(time.RFC3339)))) - } else { - w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s,repo=%s,metricsUrl=%s} %s\n", Name, ownerName, repoName, metricsURL, statusSince.Format(time.RFC3339)))) - } - if strings.Contains(pluginName, "_receiver") { - soloReceiver = true - } else { - soloReceiver = false + if !strings.Contains(pluginName, "_receiver") { + if repoName == "" { + w.Write([]byte(fmt.Sprintf("plugin_last_successful_run{name=%s,owner=%s,job_url=%s} %s\n", Name, ownerName, lastSuccessfulRunJobUrl, lastSuccessfulRun.Format(time.RFC3339)))) + w.Write([]byte(fmt.Sprintf("plugin_last_failed_run{name=%s,owner=%s,job_url=%s} %s\n", Name, ownerName, lastFailedRunJobUrl, lastFailedRun.Format(time.RFC3339)))) + w.Write([]byte(fmt.Sprintf("plugin_last_canceled_run{name=%s,owner=%s,job_url=%s} %s\n", Name, ownerName, lastCanceledRunJobUrl, lastCanceledRun.Format(time.RFC3339)))) + } else { + w.Write([]byte(fmt.Sprintf("plugin_last_successful_run{name=%s,owner=%s,repo=%s,job_url=%s} %s\n", Name, ownerName, repoName, lastSuccessfulRunJobUrl, lastSuccessfulRun.Format(time.RFC3339)))) + w.Write([]byte(fmt.Sprintf("plugin_last_failed_run{name=%s,owner=%s,repo=%s,job_url=%s} %s\n", Name, ownerName, repoName, lastFailedRunJobUrl, lastFailedRun.Format(time.RFC3339)))) + w.Write([]byte(fmt.Sprintf("plugin_last_canceled_run{name=%s,owner=%s,repo=%s,job_url=%s} %s\n", Name, ownerName, repoName, lastCanceledRunJobUrl, lastCanceledRun.Format(time.RFC3339)))) + } + } + if repoName == "" { + w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s,status=%s} %s\n", Name, ownerName, status, statusSince.Format(time.RFC3339)))) + } else { + w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s,repo=%s,status=%s} %s\n", Name, ownerName, repoName, status, statusSince.Format(time.RFC3339)))) + } + + if strings.Contains(pluginName, "_receiver") { + soloReceiver = true + } else { + soloReceiver = false + } + + if !soloReceiver { + w.Write([]byte(fmt.Sprintf("plugin_total_ran_vms{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalRanVMs))) + w.Write([]byte(fmt.Sprintf("plugin_total_successful_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalSuccessfulRunsSinceStart))) + w.Write([]byte(fmt.Sprintf("plugin_total_failed_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalFailedRunsSinceStart))) + w.Write([]byte(fmt.Sprintf("plugin_total_canceled_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalCanceledRunsSinceStart))) + queued_jobs, err := databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/queued/all-orgs/"+Name).Result() + if err != nil { + logger.ErrorContext(workerCtx, "error querying queued queue length", "error", err) + return + } + w.Write([]byte(fmt.Sprintf("redis_jobs_queued{name=%s,owner=%s} %d\n", Name, ownerName, queued_jobs))) + queued_jobs, err = databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/queued/all-orgs/"+Name+"/cleaning").Result() + if err != nil { + logger.ErrorContext(workerCtx, "error querying queued cleaning queue length", "error", err) + return + } + w.Write([]byte(fmt.Sprintf("redis_jobs_queued_cleaning{name=%s,owner=%s} %d\n", Name, ownerName, queued_jobs))) + completed_jobs, err := databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/completed/all-orgs/"+Name).Result() + if err != nil { + logger.ErrorContext(workerCtx, "error querying completed queue length", "error", err) + return + } + w.Write([]byte(fmt.Sprintf("redis_jobs_completed{name=%s,owner=%s} %d\n", Name, ownerName, completed_jobs))) + } + + w.Write([]byte(fmt.Sprintf("host_cpu_count{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostCPUCount))) + w.Write([]byte(fmt.Sprintf("host_cpu_used_count{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostCPUUsedCount))) + w.Write([]byte(fmt.Sprintf("host_cpu_usage_percentage{name=%s,owner=%s} %f\n", Name, ownerName, metricsData.HostCPUUsagePercentage))) + w.Write([]byte(fmt.Sprintf("host_memory_total_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostMemoryTotalBytes))) + w.Write([]byte(fmt.Sprintf("host_memory_used_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostMemoryUsedBytes))) + w.Write([]byte(fmt.Sprintf("host_memory_available_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostMemoryAvailableBytes))) + w.Write([]byte(fmt.Sprintf("host_memory_usage_percentage{name=%s,owner=%s} %f\n", Name, ownerName, metricsData.HostMemoryUsagePercentage))) + w.Write([]byte(fmt.Sprintf("host_disk_total_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostDiskTotalBytes))) + w.Write([]byte(fmt.Sprintf("host_disk_used_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostDiskUsedBytes))) + w.Write([]byte(fmt.Sprintf("host_disk_available_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostDiskAvailableBytes))) + w.Write([]byte(fmt.Sprintf("host_disk_usage_percentage{name=%s,owner=%s} %f\n", Name, ownerName, metricsData.HostDiskUsagePercentage))) } } - if !soloReceiver { - w.Write([]byte(fmt.Sprintf("total_running_vms{metricsUrl=%s} %d\n", metricsURL, metricsData.TotalRunningVMs))) - w.Write([]byte(fmt.Sprintf("total_successful_runs_since_start{metricsUrl=%s} %d\n", metricsURL, metricsData.TotalSuccessfulRunsSinceStart))) - w.Write([]byte(fmt.Sprintf("total_failed_runs_since_start{metricsUrl=%s} %d\n", metricsURL, metricsData.TotalFailedRunsSinceStart))) + if nextCursor == 0 { + break } - - w.Write([]byte(fmt.Sprintf("host_cpu_count{metricsUrl=%s} %d\n", metricsURL, metricsData.HostCPUCount))) - w.Write([]byte(fmt.Sprintf("host_cpu_used_count{metricsUrl=%s} %d\n", metricsURL, metricsData.HostCPUUsedCount))) - w.Write([]byte(fmt.Sprintf("host_cpu_usage_percentage{metricsUrl=%s} %f\n", metricsURL, metricsData.HostCPUUsagePercentage))) - w.Write([]byte(fmt.Sprintf("host_memory_total_bytes{metricsUrl=%s} %d\n", metricsURL, metricsData.HostMemoryTotalBytes))) - w.Write([]byte(fmt.Sprintf("host_memory_used_bytes{metricsUrl=%s} %d\n", metricsURL, metricsData.HostMemoryUsedBytes))) - w.Write([]byte(fmt.Sprintf("host_memory_available_bytes{metricsUrl=%s} %d\n", metricsURL, metricsData.HostMemoryAvailableBytes))) - w.Write([]byte(fmt.Sprintf("host_memory_usage_percentage{metricsUrl=%s} %f\n", metricsURL, metricsData.HostMemoryUsagePercentage))) - w.Write([]byte(fmt.Sprintf("host_disk_total_bytes{metricsUrl=%s} %d\n", metricsURL, metricsData.HostDiskTotalBytes))) - w.Write([]byte(fmt.Sprintf("host_disk_used_bytes{metricsUrl=%s} %d\n", metricsURL, metricsData.HostDiskUsedBytes))) - w.Write([]byte(fmt.Sprintf("host_disk_available_bytes{metricsUrl=%s} %d\n", metricsURL, metricsData.HostDiskAvailableBytes))) - w.Write([]byte(fmt.Sprintf("host_disk_usage_percentage{metricsUrl=%s} %f\n", metricsURL, metricsData.HostDiskUsagePercentage))) + cursor = nextCursor + } + // global queues + queued_jobs, err := databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/queued/all-orgs").Result() + if err != nil { + logger.ErrorContext(workerCtx, "error querying queued queue length", "error", err) + return + } + w.Write([]byte(fmt.Sprintf("redis_jobs_queued_total %d\n", queued_jobs))) + completed_jobs, err := databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/completed/all-orgs").Result() + if err != nil { + logger.ErrorContext(workerCtx, "error querying completed queue length", "error", err) + return } + w.Write([]byte(fmt.Sprintf("redis_jobs_completed_total %d\n", completed_jobs))) } } -func UpdatemetricsURLDBEntry(pluginCtx context.Context, logger *slog.Logger, metricsURL string) { - resp, err := http.Get(metricsURL + "?format=json") - if err != nil { - logger.ErrorContext(pluginCtx, "error fetching metrics from url", "metrics_url", metricsURL, "error", err) - return - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - logger.ErrorContext(pluginCtx, "error reading response body", "metrics_url", metricsURL, "error", err) - return - } - var metricsData MetricsData - err = json.Unmarshal(body, &metricsData) - if err != nil { - logger.ErrorContext(pluginCtx, "error unmarshalling metrics data", "metrics_url", metricsURL, "error", err) - return - } - logger.DebugContext(pluginCtx, "obtained metrics from url", "metrics", metricsData) - databaseContainer, err := database.GetDatabaseFromContext(pluginCtx) - if err != nil { - logger.ErrorContext(pluginCtx, "error getting database client from context", "error", err) - return - } - // Store the JSON data in Redis - setting := databaseContainer.Client.Set(pluginCtx, metricsURL, body, 0) - if setting.Err() != nil { - logger.ErrorContext(pluginCtx, "error storing metrics data in Redis", "error", setting.Err()) - return - } - exists, err := databaseContainer.Client.Exists(pluginCtx, metricsURL).Result() - if err != nil { - logger.ErrorContext(pluginCtx, "error checking if key exists in Redis", "key", metricsURL, "error", err) - return - } - logger.DebugContext(pluginCtx, "successfully stored metrics data in Redis", "key", metricsURL, "exists", exists) +func ExportMetricsToDB(pluginCtx context.Context, logger *slog.Logger) { + ticker := time.NewTicker(10 * time.Second) + go func() { + for { + select { + case <-pluginCtx.Done(): + return + case <-ticker.C: + logging.DevContext(pluginCtx, "Exporting metrics to database") + ctxPlugin, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + logger.ErrorContext(pluginCtx, "error getting plugin from context", "error", err.Error()) + } + databaseContainer, err := database.GetDatabaseFromContext(pluginCtx) + if err != nil { + logger.ErrorContext(pluginCtx, "error getting database client from context", "error", err.Error()) + } + metricsData, err := GetMetricsDataFromContext(pluginCtx) + if err != nil { + logger.ErrorContext(pluginCtx, "error getting metrics data from context", "error", err.Error()) + } + metricsDataJson, err := json.Marshal(metricsData.MetricsData) + if err != nil { + logger.ErrorContext(pluginCtx, "error parsing metrics as json", "error", err.Error()) + } + if pluginCtx.Err() == nil { + // This will create a single key using the first plugin's name. It will contain all plugin metrics though. + setting := databaseContainer.Client.Set(pluginCtx, "anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, metricsDataJson, time.Hour*24*7) // keep metrics for one week max + if setting.Err() != nil { + logger.ErrorContext(pluginCtx, "error storing metrics data in Redis", "error", setting.Err()) + return + } + exists, err := databaseContainer.Client.Exists(pluginCtx, "anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name).Result() + if err != nil { + logger.ErrorContext(pluginCtx, "error checking if key exists in Redis", "key", "anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, "error", err) + return + } + if exists == 1 { + logging.DevContext(pluginCtx, "successfully stored metrics data in Redis, key: anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+" exists: true") + } else { + logging.DevContext(pluginCtx, "successfully stored metrics data in Redis, key: anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+" exists: false") + } + } + } + } + }() } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index ee03acb..a3741f5 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -32,12 +32,16 @@ type PluginBase struct { type Plugin struct { *PluginBase - LastSuccessfulRunJobUrl string - LastFailedRunJobUrl string - LastCanceledRunJobUrl string - LastSuccessfulRun time.Time - LastFailedRun time.Time - LastCanceledRun time.Time + LastSuccessfulRunJobUrl string + LastFailedRunJobUrl string + LastCanceledRunJobUrl string + LastSuccessfulRun time.Time + LastFailedRun time.Time + LastCanceledRun time.Time + TotalRanVMs int + TotalSuccessfulRunsSinceStart int + TotalFailedRunsSinceStart int + TotalCanceledRunsSinceStart int } type MetricsData struct { @@ -94,10 +98,15 @@ func (m *MetricsDataLock) AddPlugin(plugin interface{}) error { return nil } -func (m *MetricsDataLock) IncrementTotalRunningVMs() { +func (m *MetricsDataLock) IncrementTotalRunningVMs( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { m.Lock() defer m.Unlock() m.TotalRunningVMs++ + m.IncrementPluginTotalRanVMs(workerCtx, pluginCtx, logger) } func (m *MetricsDataLock) DecrementTotalRunningVMs() { @@ -108,22 +117,133 @@ func (m *MetricsDataLock) DecrementTotalRunningVMs() { } } -func (m *MetricsDataLock) IncrementTotalSuccessfulRunsSinceStart() { +func (m *MetricsDataLock) IncrementTotalSuccessfulRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { m.Lock() defer m.Unlock() m.TotalSuccessfulRunsSinceStart++ + m.IncrementPluginTotalSuccessfulRunsSinceStart(workerCtx, pluginCtx, logger) } -func (m *MetricsDataLock) IncrementTotalFailedRunsSinceStart() { +func (m *MetricsDataLock) IncrementTotalFailedRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { m.Lock() defer m.Unlock() m.TotalFailedRunsSinceStart++ + m.IncrementPluginTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger) } -func (m *MetricsDataLock) IncrementTotalCanceledRunsSinceStart() { +func (m *MetricsDataLock) IncrementTotalCanceledRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { m.Lock() defer m.Unlock() m.TotalCanceledRunsSinceStart++ + m.IncrementPluginTotalCanceledRunsSinceStart(workerCtx, pluginCtx, logger) +} + +func (m *MetricsDataLock) IncrementPluginTotalRanVMs( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { + pluginConfig, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return + } + for _, plugin := range m.Plugins { + switch typedPlugin := plugin.(type) { + case Plugin: + if typedPlugin.PluginBase.Name == pluginConfig.Name { + UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{ + PluginBase: &PluginBase{ + Name: pluginConfig.Name, + }, + TotalRanVMs: typedPlugin.TotalRanVMs + 1, + }) + } + } + } +} + +func (m *MetricsDataLock) IncrementPluginTotalSuccessfulRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { + pluginConfig, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return + } + for _, plugin := range m.Plugins { + switch typedPlugin := plugin.(type) { + case Plugin: + if typedPlugin.PluginBase.Name == pluginConfig.Name { + UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{ + PluginBase: &PluginBase{ + Name: pluginConfig.Name, + }, + TotalSuccessfulRunsSinceStart: typedPlugin.TotalSuccessfulRunsSinceStart + 1, + }) + } + } + } +} + +func (m *MetricsDataLock) IncrementPluginTotalFailedRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { + pluginConfig, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return + } + for _, plugin := range m.Plugins { + switch typedPlugin := plugin.(type) { + case Plugin: + if typedPlugin.PluginBase.Name == pluginConfig.Name { + UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{ + PluginBase: &PluginBase{ + Name: pluginConfig.Name, + }, + TotalFailedRunsSinceStart: typedPlugin.TotalFailedRunsSinceStart + 1, + }) + } + } + } +} + +func (m *MetricsDataLock) IncrementPluginTotalCanceledRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { + pluginConfig, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return + } + for _, plugin := range m.Plugins { + switch typedPlugin := plugin.(type) { + case Plugin: + if typedPlugin.PluginBase.Name == pluginConfig.Name { + UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{ + PluginBase: &PluginBase{ + Name: pluginConfig.Name, + }, + TotalCanceledRunsSinceStart: typedPlugin.TotalCanceledRunsSinceStart + 1, + }) + } + } + } } func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface{}) (interface{}, error) { @@ -157,6 +277,18 @@ func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface if updated.LastCanceledRunJobUrl != "" { currentServiceTyped.LastCanceledRunJobUrl = updated.LastCanceledRunJobUrl } + if updated.TotalCanceledRunsSinceStart > currentServiceTyped.TotalCanceledRunsSinceStart { + currentServiceTyped.TotalCanceledRunsSinceStart = updated.TotalCanceledRunsSinceStart + } + if updated.TotalFailedRunsSinceStart > currentServiceTyped.TotalFailedRunsSinceStart { + currentServiceTyped.TotalFailedRunsSinceStart = updated.TotalFailedRunsSinceStart + } + if updated.TotalSuccessfulRunsSinceStart > currentServiceTyped.TotalSuccessfulRunsSinceStart { + currentServiceTyped.TotalSuccessfulRunsSinceStart = updated.TotalSuccessfulRunsSinceStart + } + if updated.TotalRanVMs > currentServiceTyped.TotalRanVMs { + currentServiceTyped.TotalRanVMs = updated.TotalRanVMs + } return currentServiceTyped, nil case PluginBase: updated, ok := updatedPlugin.(PluginBase) @@ -214,7 +346,12 @@ func UpdateSystemMetrics(pluginCtx context.Context, logger *slog.Logger, metrics metricsData.HostDiskUsedBytes = uint64(diskStat.Used) } -func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) error { +func UpdatePlugin( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, + updatedPlugin interface{}, +) error { ctxPlugin, err := config.GetPluginFromContext(pluginCtx) if err != nil { return err @@ -254,7 +391,12 @@ func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger * return nil } -func (m *MetricsDataLock) UpdatePlugin(pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) error { +func (m *MetricsDataLock) UpdatePlugin( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, + updatedPlugin interface{}, +) error { m.Lock() defer m.Unlock() var name string @@ -404,6 +546,10 @@ func (s *Server) handleJsonMetrics(ctx context.Context, soloReceiver bool) http. pluginMap["last_successful_run"] = s.LastSuccessfulRun pluginMap["last_failed_run"] = s.LastFailedRun pluginMap["last_canceled_run"] = s.LastCanceledRun + pluginMap["total_ran_vms"] = s.TotalRanVMs + pluginMap["total_successful_runs_since_start"] = s.TotalSuccessfulRunsSinceStart + pluginMap["total_failed_runs_since_start"] = s.TotalFailedRunsSinceStart + pluginMap["total_canceled_runs_since_start"] = s.TotalCanceledRunsSinceStart case PluginBase: pluginMap["name"] = s.Name pluginMap["plugin_name"] = s.PluginName @@ -473,6 +619,10 @@ func (s *Server) handleJsonMetrics(ctx context.Context, soloReceiver bool) http. pluginMap["last_failed_run"] = s.LastFailedRun pluginMap["last_canceled_run_job_url"] = s.LastCanceledRunJobUrl pluginMap["last_canceled_run"] = s.LastCanceledRun + pluginMap["total_ran_vms"] = s.TotalRanVMs + pluginMap["total_successful_runs_since_start"] = s.TotalSuccessfulRunsSinceStart + pluginMap["total_failed_runs_since_start"] = s.TotalFailedRunsSinceStart + pluginMap["total_canceled_runs_since_start"] = s.TotalCanceledRunsSinceStart case PluginBase: pluginMap["name"] = s.Name pluginMap["plugin_name"] = s.PluginName @@ -515,6 +665,10 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool) var lastSuccessfulRunJobUrl string var lastFailedRunJobUrl string var lastCanceledRunJobUrl string + var totalRanVMs int + var totalSuccessfulRunsSinceStart int + var totalFailedRunsSinceStart int + var totalCanceledRunsSinceStart int switch plugin := service.(type) { case Plugin: name = plugin.Name @@ -531,6 +685,10 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool) lastSuccessfulRunJobUrl = plugin.LastSuccessfulRunJobUrl lastFailedRunJobUrl = plugin.LastFailedRunJobUrl lastCanceledRunJobUrl = plugin.LastCanceledRunJobUrl + totalRanVMs = plugin.TotalRanVMs + totalSuccessfulRunsSinceStart = plugin.TotalSuccessfulRunsSinceStart + totalFailedRunsSinceStart = plugin.TotalFailedRunsSinceStart + totalCanceledRunsSinceStart = plugin.TotalCanceledRunsSinceStart case PluginBase: name = plugin.Name pluginName = plugin.PluginName @@ -564,6 +722,12 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool) } else { w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,plugin=%s,owner=%s,repo=%s} %s\n", name, pluginName, ownerName, repoName, StatusSince.Format(time.RFC3339)))) } + if !soloReceiver { + w.Write([]byte(fmt.Sprintf("plugin_total_ran_vms{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalRanVMs))) + w.Write([]byte(fmt.Sprintf("plugin_total_successful_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalSuccessfulRunsSinceStart))) + w.Write([]byte(fmt.Sprintf("plugin_total_failed_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalFailedRunsSinceStart))) + w.Write([]byte(fmt.Sprintf("plugin_total_canceled_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalCanceledRunsSinceStart))) + } } w.Write([]byte(fmt.Sprintf("host_cpu_count %d\n", metricsData.HostCPUCount))) w.Write([]byte(fmt.Sprintf("host_cpu_used_count %d\n", metricsData.HostCPUUsedCount))) diff --git a/main.go b/main.go index b599fc4..d2c0e2e 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ import ( "log/slog" "net" "net/http" - "net/url" "os" "os/signal" "path/filepath" @@ -284,6 +283,7 @@ func worker( databaseDatabase = loadedConfig.GlobalDatabaseDatabase } + // TODO: move this into a function/different file // Setup Metrics Server and context metricsPort := "8080" if loadedConfig.Metrics.Port != "" { @@ -308,7 +308,6 @@ func worker( ln.Close() metricsService := metrics.NewServer(metricsPort) if loadedConfig.Metrics.Aggregator { - workerCtx = logging.AppendCtx(workerCtx, slog.Any("metrics_urls", loadedConfig.Metrics.MetricsURLs)) if databaseURL == "" { // if no global database URL is set, use the metrics database URL databaseURL = loadedConfig.Metrics.Database.URL databasePort = loadedConfig.Metrics.Database.Port @@ -330,41 +329,26 @@ func worker( workerCtx = context.WithValue(workerCtx, config.ContextKey("database"), databaseContainer) go metricsService.StartAggregatorServer(workerCtx, parentLogger, false) parentLogger.InfoContext(workerCtx, "metrics aggregator started on port "+metricsPort) - for _, metricsURL := range loadedConfig.Metrics.MetricsURLs { - wg.Add(1) - go func(metricsURL string) { - defer wg.Done() - pluginCtx, pluginCancel := context.WithCancel(workerCtx) // Inherit from parent context - pluginCtx = logging.AppendCtx(pluginCtx, slog.String("metrics_url", metricsURL)) - // check if valid URL - _, err = url.Parse(metricsURL) - if err != nil { - parentLogger.ErrorContext(pluginCtx, "invalid URL", "error", err) + wg.Add(1) + defer wg.Done() + pluginCtx, pluginCancel := context.WithCancel(workerCtx) // Inherit from parent context + for { + select { + case <-workerCtx.Done(): + pluginCancel() + parentLogger.WarnContext(pluginCtx, shutDownMessage) + return + default: + if workerCtx.Err() != nil || toRunOnce == "true" { pluginCancel() - return + break } - for { - select { - case <-workerCtx.Done(): - pluginCancel() - parentLogger.WarnContext(pluginCtx, shutDownMessage) - return - default: - // get metrics from endpoint and update the main list - metrics.UpdatemetricsURLDBEntry(pluginCtx, parentLogger, metricsURL) - if workerCtx.Err() != nil || toRunOnce == "true" { - parentLogger.DebugContext(pluginCtx, "workerCtx.Err() != nil || toRunOnce == true") - pluginCancel() - break - } - select { - case <-time.After(time.Duration(loadedConfig.Metrics.SleepInterval) * time.Second): - case <-pluginCtx.Done(): - break - } - } + select { + case <-time.After(time.Duration(loadedConfig.Metrics.SleepInterval) * time.Second): + case <-pluginCtx.Done(): + break } - }(metricsURL) + } } } else { // firstPluginStarted: always make sure the first plugin in the config starts first before any others. diff --git a/plugins/handlers/github/github.go b/plugins/handlers/github/github.go index 8001039..8000168 100644 --- a/plugins/handlers/github/github.go +++ b/plugins/handlers/github/github.go @@ -35,6 +35,8 @@ type WorkflowRunJobDetail struct { Conclusion string } +var once sync.Once + // func exists_in_array_exact(array_to_search_in []string, desired []string) bool { // for _, desired_string := range desired { // found := false @@ -181,6 +183,7 @@ func extractLabelValue(labels []string, prefix string) string { } func sendCancelWorkflowRun( + workerCtx context.Context, pluginCtx context.Context, logger *slog.Logger, workflow WorkflowRunJobDetail, @@ -196,7 +199,7 @@ func sendCancelWorkflowRun( } cancelSent := false for { - newPluginCtx, workflowRun, _, err := internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.WorkflowRun, *github.Response, error) { + newPluginCtx, workflowRun, _, err := internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.WorkflowRun, *github.Response, error) { workflowRun, resp, err := githubClient.Actions.GetWorkflowRunByID(context.Background(), pluginConfig.Owner, workflow.Repo, workflow.RunID) return workflowRun, resp, err }) @@ -208,8 +211,8 @@ func sendCancelWorkflowRun( if *workflowRun.Status == "completed" || (workflowRun.Conclusion != nil && *workflowRun.Conclusion == "cancelled") || cancelSent { - metricsData.IncrementTotalCanceledRunsSinceStart() - metricsData.UpdatePlugin(pluginCtx, logger, metrics.Plugin{ + metricsData.IncrementTotalCanceledRunsSinceStart(workerCtx, pluginCtx, logger) + metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.Plugin{ PluginBase: &metrics.PluginBase{ Name: pluginConfig.Name, }, @@ -220,7 +223,7 @@ func sendCancelWorkflowRun( } else { logger.WarnContext(pluginCtx, "workflow run is still active... waiting for cancellation so we can clean up...", "workflow_run_id", workflow.RunID) if !cancelSent { // this has to happen here so that it doesn't error with "409 Cannot cancel a workflow run that is completed. " if the job is already cancelled - newPluginCtx, cancelResponse, _, cancelErr := internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Response, *github.Response, error) { + newPluginCtx, cancelResponse, _, cancelErr := internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.Response, *github.Response, error) { resp, err := githubClient.Actions.CancelWorkflowRunByID(context.Background(), pluginConfig.Owner, workflow.Repo, workflow.RunID) return resp, nil, err }) @@ -561,6 +564,10 @@ func Run( }, }) + once.Do(func() { + metrics.ExportMetricsToDB(pluginCtx, logger) + }) + configFileName, err := config.GetConfigFileNameFromContext(pluginCtx) if err != nil { return pluginCtx, err @@ -685,7 +692,7 @@ func Run( } if err != nil { logger.ErrorContext(pluginCtx, "error getting queued jobs", "err", err) - metricsData.IncrementTotalFailedRunsSinceStart() + metricsData.IncrementTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger) return pluginCtx, fmt.Errorf("error getting queued jobs: %s", err.Error()) } databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+pluginConfig.Owner+"/"+pluginConfig.Name, eldestQueuedJob) @@ -776,7 +783,7 @@ func Run( } if noTemplateTagExistsError != nil { logger.ErrorContext(pluginCtx, "error ensuring vm template exists on host", "err", noTemplateTagExistsError) - err := sendCancelWorkflowRun(pluginCtx, logger, workflowJob, metricsData) + err := sendCancelWorkflowRun(workerCtx, pluginCtx, logger, workflowJob, metricsData) if err != nil { logger.ErrorContext(pluginCtx, "error sending cancel workflow run", "err", err) } @@ -796,19 +803,19 @@ func Run( var response *github.Response var err error if isRepoSet { - pluginCtx, runnerRegistration, response, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.RegistrationToken, *github.Response, error) { + pluginCtx, runnerRegistration, response, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.RegistrationToken, *github.Response, error) { runnerRegistration, resp, err := githubClient.Actions.CreateRegistrationToken(context.Background(), pluginConfig.Owner, pluginConfig.Repo) return runnerRegistration, resp, err }) } else { - pluginCtx, runnerRegistration, response, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.RegistrationToken, *github.Response, error) { + pluginCtx, runnerRegistration, response, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.RegistrationToken, *github.Response, error) { runnerRegistration, resp, err := githubClient.Actions.CreateOrganizationRegistrationToken(context.Background(), pluginConfig.Owner) return runnerRegistration, resp, err }) } if err != nil { logger.DebugContext(pluginCtx, "error creating registration token", "err", err, "response", response) - metricsData.IncrementTotalFailedRunsSinceStart() + metricsData.IncrementTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger) retryChannel <- true return pluginCtx, fmt.Errorf("error creating registration token: %s", err.Error()) } @@ -877,7 +884,7 @@ func Run( _, startRunnerErr := os.Stat(startRunnerPath) if installRunnerErr != nil || registerRunnerErr != nil || startRunnerErr != nil { // logger.ErrorContext(pluginCtx, "must include install-runner.bash, register-runner.bash, and start-runner.bash in "+globals.PluginsPath+"/handlers/github/", "err", err) - err := sendCancelWorkflowRun(pluginCtx, logger, workflowJob, metricsData) + err := sendCancelWorkflowRun(workerCtx, pluginCtx, logger, workflowJob, metricsData) if err != nil { logger.ErrorContext(pluginCtx, "error sending cancel workflow run", "err", err) } @@ -893,7 +900,7 @@ func Run( ) if err != nil { // logger.ErrorContext(pluginCtx, "error executing anka copy", "err", err) - metricsData.IncrementTotalFailedRunsSinceStart() + metricsData.IncrementTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger) retryChannel <- true return pluginCtx, fmt.Errorf("error executing anka copy: %s", err.Error()) } @@ -937,7 +944,7 @@ func Run( retryChannel <- true return pluginCtx, fmt.Errorf("error executing register-runner.bash: %s", registerRunnerErr.Error()) } - defer removeSelfHostedRunner(pluginCtx, *vm, &workflowJob, metricsData) + defer removeSelfHostedRunner(workerCtx, pluginCtx, *vm, &workflowJob, metricsData) // Start runner select { case <-completedJobChannel: @@ -989,8 +996,8 @@ func Run( if err != nil { return pluginCtx, err } - metricsData.IncrementTotalSuccessfulRunsSinceStart() - metricsData.UpdatePlugin(pluginCtx, logger, metrics.Plugin{ + metricsData.IncrementTotalSuccessfulRunsSinceStart(workerCtx, pluginCtx, logger) + metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.Plugin{ PluginBase: &metrics.PluginBase{ Name: pluginConfig.Name, }, @@ -1002,8 +1009,8 @@ func Run( if err != nil { return pluginCtx, err } - metricsData.IncrementTotalFailedRunsSinceStart() - metricsData.UpdatePlugin(pluginCtx, logger, metrics.Plugin{ + metricsData.IncrementTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger) + metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.Plugin{ PluginBase: &metrics.PluginBase{ Name: pluginConfig.Name, }, @@ -1094,6 +1101,7 @@ func Run( // removeSelfHostedRunner handles removing a registered runner if the registered runner was orphaned somehow // it's extra safety should the runner not be registered with --ephemeral func removeSelfHostedRunner( + workerCtx context.Context, pluginCtx context.Context, vm anka.VM, workflow *WorkflowRunJobDetail, @@ -1120,12 +1128,12 @@ func removeSelfHostedRunner( } if workflow.Conclusion == "failure" { if isRepoSet { - pluginCtx, runnersList, response, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Runners, *github.Response, error) { + pluginCtx, runnersList, response, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.Runners, *github.Response, error) { runnersList, resp, err := githubClient.Actions.ListRunners(context.Background(), pluginConfig.Owner, pluginConfig.Repo, &github.ListRunnersOptions{}) return runnersList, resp, err }) } else { - pluginCtx, runnersList, response, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Runners, *github.Response, error) { + pluginCtx, runnersList, response, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.Runners, *github.Response, error) { runnersList, resp, err := githubClient.Actions.ListOrganizationRunners(context.Background(), pluginConfig.Owner, &github.ListRunnersOptions{}) return runnersList, resp, err }) @@ -1148,18 +1156,18 @@ func removeSelfHostedRunner( "ankaTemplateTag": "(using latest)", "err": "DELETE https://api.github.com/repos/veertuinc/anklet/actions/runners/142: 422 Bad request - Runner \"anklet-vm-\u003cuuid\u003e\" is still running a job\" []", */ - err := sendCancelWorkflowRun(pluginCtx, logger, *workflow, metricsData) + err := sendCancelWorkflowRun(workerCtx, pluginCtx, logger, *workflow, metricsData) if err != nil { logger.ErrorContext(pluginCtx, "error sending cancel workflow run", "err", err) return } if isRepoSet { - pluginCtx, _, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Response, *github.Response, error) { + pluginCtx, _, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.Response, *github.Response, error) { response, err := githubClient.Actions.RemoveRunner(context.Background(), pluginConfig.Owner, pluginConfig.Repo, *runner.ID) return response, nil, err }) } else { - pluginCtx, _, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Response, *github.Response, error) { + pluginCtx, _, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.Response, *github.Response, error) { response, err := githubClient.Actions.RemoveOrganizationRunner(context.Background(), pluginConfig.Owner, *runner.ID) return response, nil, err }) diff --git a/plugins/receivers/github/github.go b/plugins/receivers/github/github.go index 1ab95a1..0462798 100644 --- a/plugins/receivers/github/github.go +++ b/plugins/receivers/github/github.go @@ -25,6 +25,8 @@ type Server struct { Port string } +var once sync.Once + // NewServer creates a new instance of Server func NewServer(port string) *Server { return &Server{ @@ -105,6 +107,9 @@ func Run( }, ) + once.Do(func() { + metrics.ExportMetricsToDB(pluginCtx, logger) + }) configFileName, err := config.GetConfigFileNameFromContext(pluginCtx) if err != nil { return pluginCtx, err @@ -498,7 +503,7 @@ func Run( // var response *github.Response var err error if isRepoSet { - pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { + pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { hookDeliveries, response, err := githubClient.Repositories.ListHookDeliveries(pluginCtx, pluginConfig.Owner, pluginConfig.Repo, pluginConfig.HookID, opts) if err != nil { return nil, nil, err @@ -506,7 +511,7 @@ func Run( return &hookDeliveries, response, nil }) } else { - pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { + pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { hookDeliveries, response, err := githubClient.Organizations.ListHookDeliveries(pluginCtx, pluginConfig.Owner, pluginConfig.HookID, opts) if err != nil { return nil, nil, err @@ -589,7 +594,7 @@ MainLoop: var gottenHookDelivery *github.HookDelivery var err error if isRepoSet { - pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { gottenHookDelivery, response, err := githubClient.Repositories.GetHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.Repo, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -597,7 +602,7 @@ MainLoop: return gottenHookDelivery, response, nil }) } else { - pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { gottenHookDelivery, response, err := githubClient.Organizations.GetHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -709,7 +714,7 @@ MainLoop: var otherGottenHookDelivery *github.HookDelivery var err error if isRepoSet { - pluginCtx, otherGottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, otherGottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { otherGottenHookDelivery, response, err := githubClient.Repositories.GetHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.Repo, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -717,7 +722,7 @@ MainLoop: return otherGottenHookDelivery, response, nil }) } else { - pluginCtx, otherGottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, otherGottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { otherGottenHookDelivery, response, err := githubClient.Organizations.GetHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -754,7 +759,7 @@ MainLoop: // Redeliver the hook var redelivery *github.HookDelivery if isRepoSet { - pluginCtx, redelivery, _, _ = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, redelivery, _, _ = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { redelivery, response, err := githubClient.Repositories.RedeliverHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.Repo, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -762,7 +767,7 @@ MainLoop: return redelivery, response, nil }) } else { - pluginCtx, redelivery, _, _ = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, redelivery, _, _ = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { redelivery, response, err := githubClient.Organizations.RedeliverHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err diff --git a/webhook-delivery-tool/main.go b/webhook-delivery-tool/main.go index 83dbce1..c688436 100644 --- a/webhook-delivery-tool/main.go +++ b/webhook-delivery-tool/main.go @@ -60,7 +60,7 @@ func main() { } pluginCtx := context.Background() - + workerCtx := context.Background() var githubClient *github.Client githubClient, err = internalGithub.AuthenticateAndReturnGitHubClient( @@ -81,7 +81,7 @@ func main() { var hookDeliveries *[]*github.HookDelivery opts := &github.ListCursorOptions{PerPage: 10} if isRepoSet { - pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { + pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { hookDeliveries, response, err := githubClient.Repositories.ListHookDeliveries(pluginCtx, owner, repo, hookID, opts) if err != nil { return nil, nil, err @@ -89,7 +89,7 @@ func main() { return &hookDeliveries, response, nil }) } else { - pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { + pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { hookDeliveries, response, err := githubClient.Organizations.ListHookDeliveries(pluginCtx, owner, hookID, opts) if err != nil { return nil, nil, err @@ -141,7 +141,7 @@ func main() { var gottenHookDelivery *github.HookDelivery var err error if isRepoSet { - pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { gottenHookDelivery, response, err := githubClient.Repositories.GetHookDelivery(pluginCtx, owner, repo, hookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -149,7 +149,7 @@ func main() { return gottenHookDelivery, response, nil }) } else { - pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { gottenHookDelivery, response, err := githubClient.Organizations.GetHookDelivery(pluginCtx, owner, hookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -170,7 +170,7 @@ func main() { workflowJobRepo := *workflowJobEvent.Repo.Name var currentWorkflowJob *github.WorkflowJob - pluginCtx, currentWorkflowJob, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.WorkflowJob, *github.Response, error) { + pluginCtx, currentWorkflowJob, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.WorkflowJob, *github.Response, error) { currentWorkflowJob, response, err := githubClient.Actions.GetWorkflowJobByID(pluginCtx, owner, workflowJobRepo, *workflowJobEvent.WorkflowJob.ID) if err != nil { return nil, nil, err