diff --git a/VERSION b/VERSION index 2774f85..142464b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.10.0 \ No newline at end of file +0.11.0 \ No newline at end of file diff --git a/internal/anka/cli.go b/internal/anka/cli.go index 08e3993..07c97e3 100644 --- a/internal/anka/cli.go +++ b/internal/anka/cli.go @@ -276,7 +276,7 @@ func (cli *Cli) ObtainAnkaVM(workerCtx context.Context, pluginCtx context.Contex if err != nil { return pluginCtx, vm, err } - metricsData.IncrementTotalRunningVMs() + metricsData.IncrementTotalRunningVMs(workerCtx, pluginCtx, logger) return pluginCtx, vm, nil } diff --git a/internal/github/client.go b/internal/github/client.go index 4574c9a..e885735 100644 --- a/internal/github/client.go +++ b/internal/github/client.go @@ -106,7 +106,12 @@ func AuthenticateAndReturnGitHubClient( } // https://github.com/gofri/go-github-ratelimit has yet to support primary rate limits, so we have to do it ourselves. -func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog.Logger, executeFunc func() (*T, *github.Response, error)) (context.Context, *T, *github.Response, error) { +func ExecuteGitHubClientFunction[T any]( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, + executeFunc func() (*T, *github.Response, error), +) (context.Context, *T, *github.Response, error) { innerPluginCtx, cancel := context.WithCancel(pluginCtx) // Inherit from parent context defer cancel() result, response, err := executeFunc() @@ -128,17 +133,17 @@ func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog. if err != nil { return pluginCtx, nil, nil, err } - metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{ + metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.PluginBase{ Name: ctxPlugin.Name, Status: "limit_paused", }) select { case <-time.After(sleepDuration): - metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{ + metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.PluginBase{ Name: ctxPlugin.Name, Status: "running", }) - return ExecuteGitHubClientFunction(pluginCtx, logger, executeFunc) // Retry the function after waiting + return ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, executeFunc) // Retry the function after waiting case <-pluginCtx.Done(): return pluginCtx, nil, nil, pluginCtx.Err() } diff --git a/internal/metrics/aggregator.go b/internal/metrics/aggregator.go index 0fedff4..f9c44b0 100644 --- a/internal/metrics/aggregator.go +++ b/internal/metrics/aggregator.go @@ -15,7 +15,11 @@ import ( ) // Start runs the HTTP server -func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.Logger, soloReceiver bool) { +func (s *Server) StartAggregatorServer( + workerCtx context.Context, + logger *slog.Logger, + soloReceiver bool, +) { http.HandleFunc("/metrics/v1", func(w http.ResponseWriter, r *http.Request) { databaseContainer, err := database.GetDatabaseFromContext(workerCtx) if err != nil { @@ -30,7 +34,7 @@ func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.L if r.URL.Query().Get("format") == "json" { s.handleAggregatorJsonMetrics(workerCtx, logger, databaseContainer, loadedConfig)(w, r) } else if r.URL.Query().Get("format") == "prometheus" { - s.handleAggregatorPrometheusMetrics(workerCtx, logger, databaseContainer, loadedConfig)(w, r) + s.handleAggregatorPrometheusMetrics(workerCtx, logger, databaseContainer)(w, r) } else { http.Error(w, "unsupported format, please use '?format=json' or '?format=prometheus'", http.StatusBadRequest) } @@ -42,7 +46,12 @@ func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.L http.ListenAndServe(":"+s.Port, nil) } -func (s *Server) handleAggregatorJsonMetrics(workerCtx context.Context, logger *slog.Logger, databaseContainer *database.Database, loadedConfig *config.Config) func(w http.ResponseWriter, r *http.Request) { +func (s *Server) handleAggregatorJsonMetrics( + workerCtx context.Context, + logger *slog.Logger, + databaseContainer *database.Database, + loadedConfig *config.Config, +) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { combinedMetrics := make(map[string]MetricsData) // for _, metricsURL := range loadedConfig.Metrics.MetricsURLs { // TODO: replace with iteration over metrics db keys as below @@ -64,7 +73,11 @@ func (s *Server) handleAggregatorJsonMetrics(workerCtx context.Context, logger * } } -func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, logger *slog.Logger, databaseContainer *database.Database, loadedConfig *config.Config) func(w http.ResponseWriter, r *http.Request) { +func (s *Server) handleAggregatorPrometheusMetrics( + workerCtx context.Context, + logger *slog.Logger, + databaseContainer *database.Database, +) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") match := "anklet/metrics/*" @@ -101,9 +114,15 @@ func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, lo var status string var lastSuccessfulRunJobUrl string var lastFailedRunJobUrl string + var lastCanceledRunJobUrl string var lastSuccessfulRun time.Time var lastFailedRun time.Time + var lastCanceledRun time.Time var statusSince time.Time + var totalRanVMs int + var totalSuccessfulRunsSinceStart int + var totalFailedRunsSinceStart int + var totalCanceledRunsSinceStart int pluginMap, ok := plugin.(map[string]interface{}) if !ok { logger.ErrorContext(workerCtx, "error asserting plugin to map", "plugin", plugin) @@ -134,6 +153,16 @@ func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, lo logger.ErrorContext(workerCtx, "error parsing last failed run", "error", err) return } + lastCanceledRunJobUrl = pluginMap["LastCanceledRunJobUrl"].(string) + lastCanceledRun, err = time.Parse(time.RFC3339, pluginMap["LastCanceledRun"].(string)) + if err != nil { + logger.ErrorContext(workerCtx, "error parsing last canceled run", "error", err) + return + } + totalRanVMs = int(pluginMap["TotalRanVMs"].(float64)) + totalSuccessfulRunsSinceStart = int(pluginMap["TotalSuccessfulRunsSinceStart"].(float64)) + totalFailedRunsSinceStart = int(pluginMap["TotalFailedRunsSinceStart"].(float64)) + totalCanceledRunsSinceStart = int(pluginMap["TotalCanceledRunsSinceStart"].(float64)) } if repoName == "" { w.Write([]byte(fmt.Sprintf("plugin_status{name=%s,owner=%s} %s\n", Name, ownerName, status))) @@ -144,15 +173,17 @@ func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, lo if repoName == "" { w.Write([]byte(fmt.Sprintf("plugin_last_successful_run{name=%s,owner=%s,job_url=%s} %s\n", Name, ownerName, lastSuccessfulRunJobUrl, lastSuccessfulRun.Format(time.RFC3339)))) w.Write([]byte(fmt.Sprintf("plugin_last_failed_run{name=%s,owner=%s,job_url=%s} %s\n", Name, ownerName, lastFailedRunJobUrl, lastFailedRun.Format(time.RFC3339)))) + w.Write([]byte(fmt.Sprintf("plugin_last_canceled_run{name=%s,owner=%s,job_url=%s} %s\n", Name, ownerName, lastCanceledRunJobUrl, lastCanceledRun.Format(time.RFC3339)))) } else { w.Write([]byte(fmt.Sprintf("plugin_last_successful_run{name=%s,owner=%s,repo=%s,job_url=%s} %s\n", Name, ownerName, repoName, lastSuccessfulRunJobUrl, lastSuccessfulRun.Format(time.RFC3339)))) w.Write([]byte(fmt.Sprintf("plugin_last_failed_run{name=%s,owner=%s,repo=%s,job_url=%s} %s\n", Name, ownerName, repoName, lastFailedRunJobUrl, lastFailedRun.Format(time.RFC3339)))) + w.Write([]byte(fmt.Sprintf("plugin_last_canceled_run{name=%s,owner=%s,repo=%s,job_url=%s} %s\n", Name, ownerName, repoName, lastCanceledRunJobUrl, lastCanceledRun.Format(time.RFC3339)))) } } if repoName == "" { - w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s} %s\n", Name, ownerName, statusSince.Format(time.RFC3339)))) + w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s,status=%s} %s\n", Name, ownerName, status, statusSince.Format(time.RFC3339)))) } else { - w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s,repo=%s} %s\n", Name, ownerName, repoName, statusSince.Format(time.RFC3339)))) + w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,owner=%s,repo=%s,status=%s} %s\n", Name, ownerName, repoName, status, statusSince.Format(time.RFC3339)))) } if strings.Contains(pluginName, "_receiver") { @@ -162,40 +193,41 @@ func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, lo } if !soloReceiver { - w.Write([]byte(fmt.Sprintf("total_running_vms{name=%s} %d\n", Name, metricsData.TotalRunningVMs))) - w.Write([]byte(fmt.Sprintf("total_successful_runs_since_start{name=%s} %d\n", Name, metricsData.TotalSuccessfulRunsSinceStart))) - w.Write([]byte(fmt.Sprintf("total_failed_runs_since_start{name=%s} %d\n", Name, metricsData.TotalFailedRunsSinceStart))) + w.Write([]byte(fmt.Sprintf("plugin_total_ran_vms{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalRanVMs))) + w.Write([]byte(fmt.Sprintf("plugin_total_successful_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalSuccessfulRunsSinceStart))) + w.Write([]byte(fmt.Sprintf("plugin_total_failed_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalFailedRunsSinceStart))) + w.Write([]byte(fmt.Sprintf("plugin_total_canceled_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", Name, pluginName, ownerName, totalCanceledRunsSinceStart))) queued_jobs, err := databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/queued/all-orgs/"+Name).Result() if err != nil { logger.ErrorContext(workerCtx, "error querying queued queue length", "error", err) return } - w.Write([]byte(fmt.Sprintf("redis_jobs_queued{name=%s} %d\n", Name, queued_jobs))) + w.Write([]byte(fmt.Sprintf("redis_jobs_queued{name=%s,owner=%s} %d\n", Name, ownerName, queued_jobs))) queued_jobs, err = databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/queued/all-orgs/"+Name+"/cleaning").Result() if err != nil { logger.ErrorContext(workerCtx, "error querying queued cleaning queue length", "error", err) return } - w.Write([]byte(fmt.Sprintf("redis_jobs_queued_cleaning{name=%s} %d\n", Name, queued_jobs))) + w.Write([]byte(fmt.Sprintf("redis_jobs_queued_cleaning{name=%s,owner=%s} %d\n", Name, ownerName, queued_jobs))) completed_jobs, err := databaseContainer.Client.LLen(workerCtx, "anklet/jobs/github/completed/all-orgs/"+Name).Result() if err != nil { logger.ErrorContext(workerCtx, "error querying completed queue length", "error", err) return } - w.Write([]byte(fmt.Sprintf("redis_jobs_completed{name=%s} %d\n", Name, completed_jobs))) + w.Write([]byte(fmt.Sprintf("redis_jobs_completed{name=%s,owner=%s} %d\n", Name, ownerName, completed_jobs))) } - w.Write([]byte(fmt.Sprintf("host_cpu_count{name=%s} %d\n", Name, metricsData.HostCPUCount))) - w.Write([]byte(fmt.Sprintf("host_cpu_used_count{name=%s} %d\n", Name, metricsData.HostCPUUsedCount))) - w.Write([]byte(fmt.Sprintf("host_cpu_usage_percentage{name=%s} %f\n", Name, metricsData.HostCPUUsagePercentage))) - w.Write([]byte(fmt.Sprintf("host_memory_total_bytes{name=%s} %d\n", Name, metricsData.HostMemoryTotalBytes))) - w.Write([]byte(fmt.Sprintf("host_memory_used_bytes{name=%s} %d\n", Name, metricsData.HostMemoryUsedBytes))) - w.Write([]byte(fmt.Sprintf("host_memory_available_bytes{name=%s} %d\n", Name, metricsData.HostMemoryAvailableBytes))) - w.Write([]byte(fmt.Sprintf("host_memory_usage_percentage{name=%s} %f\n", Name, metricsData.HostMemoryUsagePercentage))) - w.Write([]byte(fmt.Sprintf("host_disk_total_bytes{name=%s} %d\n", Name, metricsData.HostDiskTotalBytes))) - w.Write([]byte(fmt.Sprintf("host_disk_used_bytes{name=%s} %d\n", Name, metricsData.HostDiskUsedBytes))) - w.Write([]byte(fmt.Sprintf("host_disk_available_bytes{name=%s} %d\n", Name, metricsData.HostDiskAvailableBytes))) - w.Write([]byte(fmt.Sprintf("host_disk_usage_percentage{name=%s} %f\n", Name, metricsData.HostDiskUsagePercentage))) + w.Write([]byte(fmt.Sprintf("host_cpu_count{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostCPUCount))) + w.Write([]byte(fmt.Sprintf("host_cpu_used_count{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostCPUUsedCount))) + w.Write([]byte(fmt.Sprintf("host_cpu_usage_percentage{name=%s,owner=%s} %f\n", Name, ownerName, metricsData.HostCPUUsagePercentage))) + w.Write([]byte(fmt.Sprintf("host_memory_total_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostMemoryTotalBytes))) + w.Write([]byte(fmt.Sprintf("host_memory_used_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostMemoryUsedBytes))) + w.Write([]byte(fmt.Sprintf("host_memory_available_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostMemoryAvailableBytes))) + w.Write([]byte(fmt.Sprintf("host_memory_usage_percentage{name=%s,owner=%s} %f\n", Name, ownerName, metricsData.HostMemoryUsagePercentage))) + w.Write([]byte(fmt.Sprintf("host_disk_total_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostDiskTotalBytes))) + w.Write([]byte(fmt.Sprintf("host_disk_used_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostDiskUsedBytes))) + w.Write([]byte(fmt.Sprintf("host_disk_available_bytes{name=%s,owner=%s} %d\n", Name, ownerName, metricsData.HostDiskAvailableBytes))) + w.Write([]byte(fmt.Sprintf("host_disk_usage_percentage{name=%s,owner=%s} %f\n", Name, ownerName, metricsData.HostDiskUsagePercentage))) } } if nextCursor == 0 { @@ -245,17 +277,22 @@ func ExportMetricsToDB(pluginCtx context.Context, logger *slog.Logger) { logger.ErrorContext(pluginCtx, "error parsing metrics as json", "error", err.Error()) } if pluginCtx.Err() == nil { - setting := databaseContainer.Client.Set(pluginCtx, "anklet/metrics/"+ctxPlugin.Name, metricsDataJson, time.Hour*24*7) // keep metrics for one week max + // This will create a single key using the first plugin's name. It will contain all plugin metrics though. + setting := databaseContainer.Client.Set(pluginCtx, "anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, metricsDataJson, time.Hour*24*7) // keep metrics for one week max if setting.Err() != nil { logger.ErrorContext(pluginCtx, "error storing metrics data in Redis", "error", setting.Err()) return } - exists, err := databaseContainer.Client.Exists(pluginCtx, "anklet/metrics/"+ctxPlugin.Name).Result() + exists, err := databaseContainer.Client.Exists(pluginCtx, "anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name).Result() if err != nil { - logger.ErrorContext(pluginCtx, "error checking if key exists in Redis", "key", "anklet/metrics/"+ctxPlugin.Name, "error", err) + logger.ErrorContext(pluginCtx, "error checking if key exists in Redis", "key", "anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, "error", err) return } - logging.DevContext(pluginCtx, "successfully stored metrics data in Redis, key: anklet/metrics/"+ctxPlugin.Name+" exists? "+string(exists)) + if exists == 1 { + logging.DevContext(pluginCtx, "successfully stored metrics data in Redis, key: anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+" exists: true") + } else { + logging.DevContext(pluginCtx, "successfully stored metrics data in Redis, key: anklet/metrics/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+" exists: false") + } } } } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index ee03acb..a3741f5 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -32,12 +32,16 @@ type PluginBase struct { type Plugin struct { *PluginBase - LastSuccessfulRunJobUrl string - LastFailedRunJobUrl string - LastCanceledRunJobUrl string - LastSuccessfulRun time.Time - LastFailedRun time.Time - LastCanceledRun time.Time + LastSuccessfulRunJobUrl string + LastFailedRunJobUrl string + LastCanceledRunJobUrl string + LastSuccessfulRun time.Time + LastFailedRun time.Time + LastCanceledRun time.Time + TotalRanVMs int + TotalSuccessfulRunsSinceStart int + TotalFailedRunsSinceStart int + TotalCanceledRunsSinceStart int } type MetricsData struct { @@ -94,10 +98,15 @@ func (m *MetricsDataLock) AddPlugin(plugin interface{}) error { return nil } -func (m *MetricsDataLock) IncrementTotalRunningVMs() { +func (m *MetricsDataLock) IncrementTotalRunningVMs( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { m.Lock() defer m.Unlock() m.TotalRunningVMs++ + m.IncrementPluginTotalRanVMs(workerCtx, pluginCtx, logger) } func (m *MetricsDataLock) DecrementTotalRunningVMs() { @@ -108,22 +117,133 @@ func (m *MetricsDataLock) DecrementTotalRunningVMs() { } } -func (m *MetricsDataLock) IncrementTotalSuccessfulRunsSinceStart() { +func (m *MetricsDataLock) IncrementTotalSuccessfulRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { m.Lock() defer m.Unlock() m.TotalSuccessfulRunsSinceStart++ + m.IncrementPluginTotalSuccessfulRunsSinceStart(workerCtx, pluginCtx, logger) } -func (m *MetricsDataLock) IncrementTotalFailedRunsSinceStart() { +func (m *MetricsDataLock) IncrementTotalFailedRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { m.Lock() defer m.Unlock() m.TotalFailedRunsSinceStart++ + m.IncrementPluginTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger) } -func (m *MetricsDataLock) IncrementTotalCanceledRunsSinceStart() { +func (m *MetricsDataLock) IncrementTotalCanceledRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { m.Lock() defer m.Unlock() m.TotalCanceledRunsSinceStart++ + m.IncrementPluginTotalCanceledRunsSinceStart(workerCtx, pluginCtx, logger) +} + +func (m *MetricsDataLock) IncrementPluginTotalRanVMs( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { + pluginConfig, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return + } + for _, plugin := range m.Plugins { + switch typedPlugin := plugin.(type) { + case Plugin: + if typedPlugin.PluginBase.Name == pluginConfig.Name { + UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{ + PluginBase: &PluginBase{ + Name: pluginConfig.Name, + }, + TotalRanVMs: typedPlugin.TotalRanVMs + 1, + }) + } + } + } +} + +func (m *MetricsDataLock) IncrementPluginTotalSuccessfulRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { + pluginConfig, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return + } + for _, plugin := range m.Plugins { + switch typedPlugin := plugin.(type) { + case Plugin: + if typedPlugin.PluginBase.Name == pluginConfig.Name { + UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{ + PluginBase: &PluginBase{ + Name: pluginConfig.Name, + }, + TotalSuccessfulRunsSinceStart: typedPlugin.TotalSuccessfulRunsSinceStart + 1, + }) + } + } + } +} + +func (m *MetricsDataLock) IncrementPluginTotalFailedRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { + pluginConfig, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return + } + for _, plugin := range m.Plugins { + switch typedPlugin := plugin.(type) { + case Plugin: + if typedPlugin.PluginBase.Name == pluginConfig.Name { + UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{ + PluginBase: &PluginBase{ + Name: pluginConfig.Name, + }, + TotalFailedRunsSinceStart: typedPlugin.TotalFailedRunsSinceStart + 1, + }) + } + } + } +} + +func (m *MetricsDataLock) IncrementPluginTotalCanceledRunsSinceStart( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, +) { + pluginConfig, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return + } + for _, plugin := range m.Plugins { + switch typedPlugin := plugin.(type) { + case Plugin: + if typedPlugin.PluginBase.Name == pluginConfig.Name { + UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{ + PluginBase: &PluginBase{ + Name: pluginConfig.Name, + }, + TotalCanceledRunsSinceStart: typedPlugin.TotalCanceledRunsSinceStart + 1, + }) + } + } + } } func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface{}) (interface{}, error) { @@ -157,6 +277,18 @@ func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface if updated.LastCanceledRunJobUrl != "" { currentServiceTyped.LastCanceledRunJobUrl = updated.LastCanceledRunJobUrl } + if updated.TotalCanceledRunsSinceStart > currentServiceTyped.TotalCanceledRunsSinceStart { + currentServiceTyped.TotalCanceledRunsSinceStart = updated.TotalCanceledRunsSinceStart + } + if updated.TotalFailedRunsSinceStart > currentServiceTyped.TotalFailedRunsSinceStart { + currentServiceTyped.TotalFailedRunsSinceStart = updated.TotalFailedRunsSinceStart + } + if updated.TotalSuccessfulRunsSinceStart > currentServiceTyped.TotalSuccessfulRunsSinceStart { + currentServiceTyped.TotalSuccessfulRunsSinceStart = updated.TotalSuccessfulRunsSinceStart + } + if updated.TotalRanVMs > currentServiceTyped.TotalRanVMs { + currentServiceTyped.TotalRanVMs = updated.TotalRanVMs + } return currentServiceTyped, nil case PluginBase: updated, ok := updatedPlugin.(PluginBase) @@ -214,7 +346,12 @@ func UpdateSystemMetrics(pluginCtx context.Context, logger *slog.Logger, metrics metricsData.HostDiskUsedBytes = uint64(diskStat.Used) } -func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) error { +func UpdatePlugin( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, + updatedPlugin interface{}, +) error { ctxPlugin, err := config.GetPluginFromContext(pluginCtx) if err != nil { return err @@ -254,7 +391,12 @@ func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger * return nil } -func (m *MetricsDataLock) UpdatePlugin(pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) error { +func (m *MetricsDataLock) UpdatePlugin( + workerCtx context.Context, + pluginCtx context.Context, + logger *slog.Logger, + updatedPlugin interface{}, +) error { m.Lock() defer m.Unlock() var name string @@ -404,6 +546,10 @@ func (s *Server) handleJsonMetrics(ctx context.Context, soloReceiver bool) http. pluginMap["last_successful_run"] = s.LastSuccessfulRun pluginMap["last_failed_run"] = s.LastFailedRun pluginMap["last_canceled_run"] = s.LastCanceledRun + pluginMap["total_ran_vms"] = s.TotalRanVMs + pluginMap["total_successful_runs_since_start"] = s.TotalSuccessfulRunsSinceStart + pluginMap["total_failed_runs_since_start"] = s.TotalFailedRunsSinceStart + pluginMap["total_canceled_runs_since_start"] = s.TotalCanceledRunsSinceStart case PluginBase: pluginMap["name"] = s.Name pluginMap["plugin_name"] = s.PluginName @@ -473,6 +619,10 @@ func (s *Server) handleJsonMetrics(ctx context.Context, soloReceiver bool) http. pluginMap["last_failed_run"] = s.LastFailedRun pluginMap["last_canceled_run_job_url"] = s.LastCanceledRunJobUrl pluginMap["last_canceled_run"] = s.LastCanceledRun + pluginMap["total_ran_vms"] = s.TotalRanVMs + pluginMap["total_successful_runs_since_start"] = s.TotalSuccessfulRunsSinceStart + pluginMap["total_failed_runs_since_start"] = s.TotalFailedRunsSinceStart + pluginMap["total_canceled_runs_since_start"] = s.TotalCanceledRunsSinceStart case PluginBase: pluginMap["name"] = s.Name pluginMap["plugin_name"] = s.PluginName @@ -515,6 +665,10 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool) var lastSuccessfulRunJobUrl string var lastFailedRunJobUrl string var lastCanceledRunJobUrl string + var totalRanVMs int + var totalSuccessfulRunsSinceStart int + var totalFailedRunsSinceStart int + var totalCanceledRunsSinceStart int switch plugin := service.(type) { case Plugin: name = plugin.Name @@ -531,6 +685,10 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool) lastSuccessfulRunJobUrl = plugin.LastSuccessfulRunJobUrl lastFailedRunJobUrl = plugin.LastFailedRunJobUrl lastCanceledRunJobUrl = plugin.LastCanceledRunJobUrl + totalRanVMs = plugin.TotalRanVMs + totalSuccessfulRunsSinceStart = plugin.TotalSuccessfulRunsSinceStart + totalFailedRunsSinceStart = plugin.TotalFailedRunsSinceStart + totalCanceledRunsSinceStart = plugin.TotalCanceledRunsSinceStart case PluginBase: name = plugin.Name pluginName = plugin.PluginName @@ -564,6 +722,12 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool) } else { w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,plugin=%s,owner=%s,repo=%s} %s\n", name, pluginName, ownerName, repoName, StatusSince.Format(time.RFC3339)))) } + if !soloReceiver { + w.Write([]byte(fmt.Sprintf("plugin_total_ran_vms{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalRanVMs))) + w.Write([]byte(fmt.Sprintf("plugin_total_successful_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalSuccessfulRunsSinceStart))) + w.Write([]byte(fmt.Sprintf("plugin_total_failed_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalFailedRunsSinceStart))) + w.Write([]byte(fmt.Sprintf("plugin_total_canceled_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalCanceledRunsSinceStart))) + } } w.Write([]byte(fmt.Sprintf("host_cpu_count %d\n", metricsData.HostCPUCount))) w.Write([]byte(fmt.Sprintf("host_cpu_used_count %d\n", metricsData.HostCPUUsedCount))) diff --git a/main.go b/main.go index 24082aa..d2c0e2e 100644 --- a/main.go +++ b/main.go @@ -283,6 +283,7 @@ func worker( databaseDatabase = loadedConfig.GlobalDatabaseDatabase } + // TODO: move this into a function/different file // Setup Metrics Server and context metricsPort := "8080" if loadedConfig.Metrics.Port != "" { diff --git a/plugins/handlers/github/github.go b/plugins/handlers/github/github.go index 6a7f76c..8000168 100644 --- a/plugins/handlers/github/github.go +++ b/plugins/handlers/github/github.go @@ -183,6 +183,7 @@ func extractLabelValue(labels []string, prefix string) string { } func sendCancelWorkflowRun( + workerCtx context.Context, pluginCtx context.Context, logger *slog.Logger, workflow WorkflowRunJobDetail, @@ -198,7 +199,7 @@ func sendCancelWorkflowRun( } cancelSent := false for { - newPluginCtx, workflowRun, _, err := internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.WorkflowRun, *github.Response, error) { + newPluginCtx, workflowRun, _, err := internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.WorkflowRun, *github.Response, error) { workflowRun, resp, err := githubClient.Actions.GetWorkflowRunByID(context.Background(), pluginConfig.Owner, workflow.Repo, workflow.RunID) return workflowRun, resp, err }) @@ -210,8 +211,8 @@ func sendCancelWorkflowRun( if *workflowRun.Status == "completed" || (workflowRun.Conclusion != nil && *workflowRun.Conclusion == "cancelled") || cancelSent { - metricsData.IncrementTotalCanceledRunsSinceStart() - metricsData.UpdatePlugin(pluginCtx, logger, metrics.Plugin{ + metricsData.IncrementTotalCanceledRunsSinceStart(workerCtx, pluginCtx, logger) + metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.Plugin{ PluginBase: &metrics.PluginBase{ Name: pluginConfig.Name, }, @@ -222,7 +223,7 @@ func sendCancelWorkflowRun( } else { logger.WarnContext(pluginCtx, "workflow run is still active... waiting for cancellation so we can clean up...", "workflow_run_id", workflow.RunID) if !cancelSent { // this has to happen here so that it doesn't error with "409 Cannot cancel a workflow run that is completed. " if the job is already cancelled - newPluginCtx, cancelResponse, _, cancelErr := internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Response, *github.Response, error) { + newPluginCtx, cancelResponse, _, cancelErr := internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.Response, *github.Response, error) { resp, err := githubClient.Actions.CancelWorkflowRunByID(context.Background(), pluginConfig.Owner, workflow.Repo, workflow.RunID) return resp, nil, err }) @@ -691,7 +692,7 @@ func Run( } if err != nil { logger.ErrorContext(pluginCtx, "error getting queued jobs", "err", err) - metricsData.IncrementTotalFailedRunsSinceStart() + metricsData.IncrementTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger) return pluginCtx, fmt.Errorf("error getting queued jobs: %s", err.Error()) } databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+pluginConfig.Owner+"/"+pluginConfig.Name, eldestQueuedJob) @@ -782,7 +783,7 @@ func Run( } if noTemplateTagExistsError != nil { logger.ErrorContext(pluginCtx, "error ensuring vm template exists on host", "err", noTemplateTagExistsError) - err := sendCancelWorkflowRun(pluginCtx, logger, workflowJob, metricsData) + err := sendCancelWorkflowRun(workerCtx, pluginCtx, logger, workflowJob, metricsData) if err != nil { logger.ErrorContext(pluginCtx, "error sending cancel workflow run", "err", err) } @@ -802,19 +803,19 @@ func Run( var response *github.Response var err error if isRepoSet { - pluginCtx, runnerRegistration, response, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.RegistrationToken, *github.Response, error) { + pluginCtx, runnerRegistration, response, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.RegistrationToken, *github.Response, error) { runnerRegistration, resp, err := githubClient.Actions.CreateRegistrationToken(context.Background(), pluginConfig.Owner, pluginConfig.Repo) return runnerRegistration, resp, err }) } else { - pluginCtx, runnerRegistration, response, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.RegistrationToken, *github.Response, error) { + pluginCtx, runnerRegistration, response, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.RegistrationToken, *github.Response, error) { runnerRegistration, resp, err := githubClient.Actions.CreateOrganizationRegistrationToken(context.Background(), pluginConfig.Owner) return runnerRegistration, resp, err }) } if err != nil { logger.DebugContext(pluginCtx, "error creating registration token", "err", err, "response", response) - metricsData.IncrementTotalFailedRunsSinceStart() + metricsData.IncrementTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger) retryChannel <- true return pluginCtx, fmt.Errorf("error creating registration token: %s", err.Error()) } @@ -883,7 +884,7 @@ func Run( _, startRunnerErr := os.Stat(startRunnerPath) if installRunnerErr != nil || registerRunnerErr != nil || startRunnerErr != nil { // logger.ErrorContext(pluginCtx, "must include install-runner.bash, register-runner.bash, and start-runner.bash in "+globals.PluginsPath+"/handlers/github/", "err", err) - err := sendCancelWorkflowRun(pluginCtx, logger, workflowJob, metricsData) + err := sendCancelWorkflowRun(workerCtx, pluginCtx, logger, workflowJob, metricsData) if err != nil { logger.ErrorContext(pluginCtx, "error sending cancel workflow run", "err", err) } @@ -899,7 +900,7 @@ func Run( ) if err != nil { // logger.ErrorContext(pluginCtx, "error executing anka copy", "err", err) - metricsData.IncrementTotalFailedRunsSinceStart() + metricsData.IncrementTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger) retryChannel <- true return pluginCtx, fmt.Errorf("error executing anka copy: %s", err.Error()) } @@ -943,7 +944,7 @@ func Run( retryChannel <- true return pluginCtx, fmt.Errorf("error executing register-runner.bash: %s", registerRunnerErr.Error()) } - defer removeSelfHostedRunner(pluginCtx, *vm, &workflowJob, metricsData) + defer removeSelfHostedRunner(workerCtx, pluginCtx, *vm, &workflowJob, metricsData) // Start runner select { case <-completedJobChannel: @@ -995,8 +996,8 @@ func Run( if err != nil { return pluginCtx, err } - metricsData.IncrementTotalSuccessfulRunsSinceStart() - metricsData.UpdatePlugin(pluginCtx, logger, metrics.Plugin{ + metricsData.IncrementTotalSuccessfulRunsSinceStart(workerCtx, pluginCtx, logger) + metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.Plugin{ PluginBase: &metrics.PluginBase{ Name: pluginConfig.Name, }, @@ -1008,8 +1009,8 @@ func Run( if err != nil { return pluginCtx, err } - metricsData.IncrementTotalFailedRunsSinceStart() - metricsData.UpdatePlugin(pluginCtx, logger, metrics.Plugin{ + metricsData.IncrementTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger) + metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.Plugin{ PluginBase: &metrics.PluginBase{ Name: pluginConfig.Name, }, @@ -1100,6 +1101,7 @@ func Run( // removeSelfHostedRunner handles removing a registered runner if the registered runner was orphaned somehow // it's extra safety should the runner not be registered with --ephemeral func removeSelfHostedRunner( + workerCtx context.Context, pluginCtx context.Context, vm anka.VM, workflow *WorkflowRunJobDetail, @@ -1126,12 +1128,12 @@ func removeSelfHostedRunner( } if workflow.Conclusion == "failure" { if isRepoSet { - pluginCtx, runnersList, response, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Runners, *github.Response, error) { + pluginCtx, runnersList, response, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.Runners, *github.Response, error) { runnersList, resp, err := githubClient.Actions.ListRunners(context.Background(), pluginConfig.Owner, pluginConfig.Repo, &github.ListRunnersOptions{}) return runnersList, resp, err }) } else { - pluginCtx, runnersList, response, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Runners, *github.Response, error) { + pluginCtx, runnersList, response, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.Runners, *github.Response, error) { runnersList, resp, err := githubClient.Actions.ListOrganizationRunners(context.Background(), pluginConfig.Owner, &github.ListRunnersOptions{}) return runnersList, resp, err }) @@ -1154,18 +1156,18 @@ func removeSelfHostedRunner( "ankaTemplateTag": "(using latest)", "err": "DELETE https://api.github.com/repos/veertuinc/anklet/actions/runners/142: 422 Bad request - Runner \"anklet-vm-\u003cuuid\u003e\" is still running a job\" []", */ - err := sendCancelWorkflowRun(pluginCtx, logger, *workflow, metricsData) + err := sendCancelWorkflowRun(workerCtx, pluginCtx, logger, *workflow, metricsData) if err != nil { logger.ErrorContext(pluginCtx, "error sending cancel workflow run", "err", err) return } if isRepoSet { - pluginCtx, _, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Response, *github.Response, error) { + pluginCtx, _, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.Response, *github.Response, error) { response, err := githubClient.Actions.RemoveRunner(context.Background(), pluginConfig.Owner, pluginConfig.Repo, *runner.ID) return response, nil, err }) } else { - pluginCtx, _, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Response, *github.Response, error) { + pluginCtx, _, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.Response, *github.Response, error) { response, err := githubClient.Actions.RemoveOrganizationRunner(context.Background(), pluginConfig.Owner, *runner.ID) return response, nil, err }) diff --git a/plugins/receivers/github/github.go b/plugins/receivers/github/github.go index de89fcf..0462798 100644 --- a/plugins/receivers/github/github.go +++ b/plugins/receivers/github/github.go @@ -503,7 +503,7 @@ func Run( // var response *github.Response var err error if isRepoSet { - pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { + pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { hookDeliveries, response, err := githubClient.Repositories.ListHookDeliveries(pluginCtx, pluginConfig.Owner, pluginConfig.Repo, pluginConfig.HookID, opts) if err != nil { return nil, nil, err @@ -511,7 +511,7 @@ func Run( return &hookDeliveries, response, nil }) } else { - pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { + pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { hookDeliveries, response, err := githubClient.Organizations.ListHookDeliveries(pluginCtx, pluginConfig.Owner, pluginConfig.HookID, opts) if err != nil { return nil, nil, err @@ -594,7 +594,7 @@ MainLoop: var gottenHookDelivery *github.HookDelivery var err error if isRepoSet { - pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { gottenHookDelivery, response, err := githubClient.Repositories.GetHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.Repo, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -602,7 +602,7 @@ MainLoop: return gottenHookDelivery, response, nil }) } else { - pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { gottenHookDelivery, response, err := githubClient.Organizations.GetHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -714,7 +714,7 @@ MainLoop: var otherGottenHookDelivery *github.HookDelivery var err error if isRepoSet { - pluginCtx, otherGottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, otherGottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { otherGottenHookDelivery, response, err := githubClient.Repositories.GetHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.Repo, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -722,7 +722,7 @@ MainLoop: return otherGottenHookDelivery, response, nil }) } else { - pluginCtx, otherGottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, otherGottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { otherGottenHookDelivery, response, err := githubClient.Organizations.GetHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -759,7 +759,7 @@ MainLoop: // Redeliver the hook var redelivery *github.HookDelivery if isRepoSet { - pluginCtx, redelivery, _, _ = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, redelivery, _, _ = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { redelivery, response, err := githubClient.Repositories.RedeliverHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.Repo, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -767,7 +767,7 @@ MainLoop: return redelivery, response, nil }) } else { - pluginCtx, redelivery, _, _ = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, redelivery, _, _ = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { redelivery, response, err := githubClient.Organizations.RedeliverHookDelivery(pluginCtx, pluginConfig.Owner, pluginConfig.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err diff --git a/webhook-delivery-tool/main.go b/webhook-delivery-tool/main.go index 83dbce1..c688436 100644 --- a/webhook-delivery-tool/main.go +++ b/webhook-delivery-tool/main.go @@ -60,7 +60,7 @@ func main() { } pluginCtx := context.Background() - + workerCtx := context.Background() var githubClient *github.Client githubClient, err = internalGithub.AuthenticateAndReturnGitHubClient( @@ -81,7 +81,7 @@ func main() { var hookDeliveries *[]*github.HookDelivery opts := &github.ListCursorOptions{PerPage: 10} if isRepoSet { - pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { + pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { hookDeliveries, response, err := githubClient.Repositories.ListHookDeliveries(pluginCtx, owner, repo, hookID, opts) if err != nil { return nil, nil, err @@ -89,7 +89,7 @@ func main() { return &hookDeliveries, response, nil }) } else { - pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { + pluginCtx, hookDeliveries, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { hookDeliveries, response, err := githubClient.Organizations.ListHookDeliveries(pluginCtx, owner, hookID, opts) if err != nil { return nil, nil, err @@ -141,7 +141,7 @@ func main() { var gottenHookDelivery *github.HookDelivery var err error if isRepoSet { - pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { gottenHookDelivery, response, err := githubClient.Repositories.GetHookDelivery(pluginCtx, owner, repo, hookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -149,7 +149,7 @@ func main() { return gottenHookDelivery, response, nil }) } else { - pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + pluginCtx, gottenHookDelivery, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.HookDelivery, *github.Response, error) { gottenHookDelivery, response, err := githubClient.Organizations.GetHookDelivery(pluginCtx, owner, hookID, *hookDelivery.ID) if err != nil { return nil, nil, err @@ -170,7 +170,7 @@ func main() { workflowJobRepo := *workflowJobEvent.Repo.Name var currentWorkflowJob *github.WorkflowJob - pluginCtx, currentWorkflowJob, _, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.WorkflowJob, *github.Response, error) { + pluginCtx, currentWorkflowJob, _, err = internalGithub.ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, func() (*github.WorkflowJob, *github.Response, error) { currentWorkflowJob, response, err := githubClient.Actions.GetWorkflowJobByID(pluginCtx, owner, workflowJobRepo, *workflowJobEvent.WorkflowJob.ID) if err != nil { return nil, nil, err