Merge pull request #20 from veertuinc/v0.6.0
v0.6.0
NorseGaud authored May 27, 2024
2 parents 604d085 + 802e994 commit 3b865d4
Showing 6 changed files with 59 additions and 25 deletions.
11 changes: 8 additions & 3 deletions README.md
@@ -136,11 +136,12 @@ Metrics for monitoring are available at `http://127.0.0.1:8080/metrics?format=js
| Service::PluginName | service_plugin_name | Name of the plugin |
| Service::OwnerName | service_owner_name | Name of the owner |
| Service::RepoName | service_repo_name | Name of the repo |
-| Service::Status | service_status | Status of the service |
+| Service::Status | service_status | Status of the service (idle, running, limit_paused, stopped) |
| Service::LastSuccessfulRunJobUrl | service_last_successful_run_job_url | Last successful run job url of the service |
| Service::LastFailedRunJobUrl | service_last_failed_run_job_url | Last failed run job url of the service |
| Service::LastSuccessfulRun | service_last_successful_run | Timestamp of last successful run of the service (RFC3339) |
| Service::LastFailedRun | service_last_failed_run | Timestamp of last failed run of the service (RFC3339) |
+| Service::StatusRunningSince | service_status_running_since | Timestamp of when the service was last started (RFC3339) |
| HostCPUCount | host_cpu_count | Total CPU count of the host |
| HostCPUUsedCount | host_cpu_used_count | Total in use CPU count of the host |
| HostCPUUsagePercentage | host_cpu_usage_percentage | CPU usage percentage of the host |
@@ -181,7 +182,8 @@ Metrics for monitoring are available at `http://127.0.0.1:8080/metrics?format=js
"LastSuccessfulRunJobUrl": "https://github.com/veertuinc/anklet/actions/runs/9180172013/job/25243983121",
"LastFailedRunJobUrl": "https://github.com/veertuinc/anklet/actions/runs/9180170811/job/25243979917",
"LastSuccessfulRun": "2024-05-21T14:16:06.300971-05:00",
"LastFailedRun": "2024-05-21T14:15:10.994464-05:00"
"LastFailedRun": "2024-05-21T14:15:10.994464-05:00",
"StatusRunningSince": "2024-05-21T14:16:06.300971-05:00"
},
{
"Name": "RUNNER1",
@@ -192,7 +194,8 @@ Metrics for monitoring are available at `http://127.0.0.1:8080/metrics?format=js
"LastSuccessfulRunJobUrl": "https://github.com/veertuinc/anklet/actions/runs/9180172546/job/25243984537",
"LastFailedRunJobUrl": "https://github.com/veertuinc/anklet/actions/runs/9180171228/job/25243980930",
"LastSuccessfulRun": "2024-05-21T14:16:35.532016-05:00",
"LastFailedRun": "2024-05-21T14:15:45.930051-05:00"
"LastFailedRun": "2024-05-21T14:15:45.930051-05:00",
"StatusRunningSince": "2024-05-21T14:16:35.532016-05:00"
}
]
}
@@ -207,9 +210,11 @@ total_failed_runs_since_start 2
service_status{service_name=RUNNER2,plugin=github,owner=veertuinc,repo=anklet} idle
service_last_successful_run{service_name=RUNNER2,plugin=github,owner=veertuinc,repo=anklet,job_url=https://github.com/veertuinc/anklet/actions/runs/9180172013/job/25243983121} 2024-05-21T14:16:06-05:00
service_last_failed_run{service_name=RUNNER2,plugin=github,owner=veertuinc,repo=anklet,job_url=https://github.com/veertuinc/anklet/actions/runs/9180170811/job/25243979917} 2024-05-21T14:15:10-05:00
+service_status_running_since{service_name=RUNNER2,plugin=github,owner=veertuinc,repo=anklet} 2024-05-21T14:16:06-05:00
service_status{service_name=RUNNER1,plugin=github,owner=veertuinc,repo=anklet} idle
service_last_successful_run{service_name=RUNNER1,plugin=github,owner=veertuinc,repo=anklet,job_url=https://github.com/veertuinc/anklet/actions/runs/9180172546/job/25243984537} 2024-05-21T14:16:35-05:00
service_last_failed_run{service_name=RUNNER1,plugin=github,owner=veertuinc,repo=anklet,job_url=https://github.com/veertuinc/anklet/actions/runs/9180171228/job/25243980930} 2024-05-21T14:15:45-05:00
+service_status_running_since{service_name=RUNNER1,plugin=github,owner=veertuinc,repo=anklet} 2024-05-21T14:16:35-05:00
host_cpu_count 12
host_cpu_used_count 1
host_cpu_usage_percentage 10.674157
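For reference, a minimal consumer sketch (not part of this commit) showing how the JSON output above, including the new `StatusRunningSince` field, could be read. The top-level `Services` key and the struct/field tags are assumptions based on the example payload; adjust to your Anklet version.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// Service mirrors only the fields used here; names follow the example payload.
type Service struct {
	Name               string    `json:"Name"`
	Status             string    `json:"Status"`
	StatusRunningSince time.Time `json:"StatusRunningSince"`
}

// metricsPayload assumes the services array is exposed under a top-level "Services" key.
type metricsPayload struct {
	Services []Service `json:"Services"`
}

func main() {
	resp, err := http.Get("http://127.0.0.1:8080/metrics?format=json")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var payload metricsPayload
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		panic(err)
	}
	// Print each service's status and how long it has been in that status.
	for _, svc := range payload.Services {
		fmt.Printf("%s: %s (in this status for %s)\n",
			svc.Name, svc.Status, time.Since(svc.StatusRunningSince).Round(time.Second))
	}
}
```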
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-0.5.0
+0.6.0
2 changes: 1 addition & 1 deletion internal/database/database.go
@@ -80,7 +80,7 @@ func CheckIfKeyExists(ctx context.Context, key string) (bool, error) {
// introduce a random millisecond sleep to prevent concurrent executions from colliding
src := rand.NewSource(time.Now().UnixNano())
r := rand.New(src)
-randomSleep := time.Duration(r.Intn(200)) * time.Millisecond
+randomSleep := time.Duration(r.Intn(100)) * time.Millisecond
time.Sleep(randomSleep)
exists, err := database.Client.Exists(ctx, key).Result()
if err != nil {
16 changes: 12 additions & 4 deletions internal/metrics/metrics.go
@@ -30,6 +30,7 @@ type Service struct {
LastFailedRunJobUrl string
LastSuccessfulRun time.Time
LastFailedRun time.Time
+StatusRunningSince time.Time
}

type MetricsData struct {
@@ -98,6 +99,9 @@ func CompareAndUpdateMetrics(currentService Service, updatedService Service) Ser
currentService.PluginName = updatedService.PluginName
}
if updatedService.Status != "" {
+if currentService.Status != updatedService.Status { // make sure we don't update the time if nothing has changed
+currentService.StatusRunningSince = time.Now()
+}
currentService.Status = updatedService.Status
}
if !updatedService.LastSuccessfulRun.IsZero() {
@@ -166,12 +170,12 @@ func UpdateService(workerCtx context.Context, serviceCtx context.Context, logger
LastFailedRun: metricsData.Services[i].LastFailedRun,
LastSuccessfulRunJobUrl: metricsData.Services[i].LastSuccessfulRunJobUrl,
LastFailedRunJobUrl: metricsData.Services[i].LastFailedRunJobUrl,
+StatusRunningSince: metricsData.Services[i].StatusRunningSince,
}
newService = CompareAndUpdateMetrics(newService, updatedService)
metricsData.Services[i] = newService
}
}
-UpdateSystemMetrics(serviceCtx, logger, metricsData)
}

func (m *MetricsData) UpdateService(serviceCtx context.Context, logger *slog.Logger, updatedService Service) {
@@ -183,7 +187,6 @@ func (m *MetricsData) UpdateService(serviceCtx context.Context, logger *slog.Log
for i, svc := range m.Services {
if svc.Name == updatedService.Name {
m.Services[i] = CompareAndUpdateMetrics(svc, updatedService)
-UpdateSystemMetrics(serviceCtx, logger, m)
}
}
}
@@ -196,14 +199,18 @@ func NewServer(port string) *Server {
}

// Start runs the HTTP server
-func (s *Server) Start(parentCtx context.Context) {
+func (s *Server) Start(parentCtx context.Context, logger *slog.Logger) {
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
+// update system metrics each call
+metricsData := GetMetricsDataFromContext(parentCtx)
+UpdateSystemMetrics(parentCtx, logger, metricsData)
+//
if r.URL.Query().Get("format") == "json" {
s.handleJsonMetrics(parentCtx)(w, r)
} else if r.URL.Query().Get("format") == "prometheus" {
s.handlePrometheusMetrics(parentCtx)(w, r)
} else {
http.Error(w, "Unsupported format", http.StatusBadRequest)
http.Error(w, "unsupported format, please use '?format=json' or '?format=prometheus'", http.StatusBadRequest)
}
})
http.ListenAndServe(":"+s.Port, nil)
@@ -230,6 +237,7 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context) http.HandlerFunc {
w.Write([]byte(fmt.Sprintf("service_status{service_name=%s,plugin=%s,owner=%s,repo=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.Status)))
w.Write([]byte(fmt.Sprintf("service_last_successful_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.LastSuccessfulRunJobUrl, service.LastSuccessfulRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_last_failed_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.LastFailedRunJobUrl, service.LastFailedRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_status_running_since{service_name=%s,plugin=%s,owner=%s,repo=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.StatusRunningSince.Format(time.RFC3339))))
}
w.Write([]byte(fmt.Sprintf("host_cpu_count %d\n", metricsData.HostCPUCount)))
w.Write([]byte(fmt.Sprintf("host_cpu_used_count %d\n", metricsData.HostCPUUsedCount)))
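For clarity, a stripped-down sketch of the rule this diff adds to CompareAndUpdateMetrics: StatusRunningSince is only reset when the status value actually changes, so repeated identical updates keep the original start time. The helper name applyStatus and the reduced Service struct are illustrative only, not code from this commit.

```go
package metrics

import "time"

// Service is reduced here to the two fields the rule touches.
type Service struct {
	Status             string
	StatusRunningSince time.Time
}

// applyStatus is a hypothetical helper illustrating the logic added to
// CompareAndUpdateMetrics: an update carrying the same status leaves the
// running-since timestamp alone, while a real transition stamps time.Now().
func applyStatus(current Service, updated Service) Service {
	if updated.Status != "" {
		if current.Status != updated.Status { // only reset on an actual transition
			current.StatusRunningSince = time.Now()
		}
		current.Status = updated.Status
	}
	return current
}
```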
18 changes: 9 additions & 9 deletions main.go
@@ -179,8 +179,10 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.
metricsPort = loadedConfig.Metrics.Port
}
metricsServer := metrics.NewServer(metricsPort)
-go metricsServer.Start(workerCtx)
+go metricsServer.Start(workerCtx, logger)
logger.InfoContext(workerCtx, "metrics server started on port "+metricsPort)
+metrics.UpdateSystemMetrics(workerCtx, logger, metricsData)

/////////////
// MAIN LOGIC
for _, service := range loadedConfig.Services {
@@ -230,11 +232,12 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.

logger.InfoContext(serviceCtx, "started service")
metricsData.AddService(metrics.Service{
-Name:       service.Name,
-PluginName: service.Plugin,
-RepoName:   service.Repo,
-OwnerName:  service.Owner,
-Status:     "idle",
+Name:               service.Name,
+PluginName:         service.Plugin,
+RepoName:           service.Repo,
+OwnerName:          service.Owner,
+Status:             "idle",
+StatusRunningSince: time.Now(),
})

for {
@@ ... @@
})
return
default:
-metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{
-Status: "checking",
-})
run.Plugin(workerCtx, serviceCtx, logger)
if workerCtx.Err() != nil || toRunOnce == "true" {
serviceCancel()
35 changes: 28 additions & 7 deletions plugins/github/github.go
@@ -64,8 +64,18 @@ func ExecuteGitHubClientFunction[T any](serviceCtx context.Context, logger *slog
if response.Rate.Remaining <= 10 { // handle primary rate limiting
sleepDuration := time.Until(response.Rate.Reset.Time) + time.Second // Adding a second to ensure we're past the reset time
logger.WarnContext(serviceCtx, "GitHub API rate limit exceeded, sleeping until reset")
+metricsData := metrics.GetMetricsDataFromContext(serviceCtx)
+service := config.GetServiceFromContext(serviceCtx)
+metricsData.UpdateService(serviceCtx, logger, metrics.Service{
+Name: service.Name,
+Status: "limit_paused",
+})
select {
case <-time.After(sleepDuration):
+metricsData.UpdateService(serviceCtx, logger, metrics.Service{
+Name: service.Name,
+Status: "running",
+})
return ExecuteGitHubClientFunction(serviceCtx, logger, executeFunc) // Retry the function after waiting
case <-serviceCtx.Done():
return serviceCtx, nil, nil, serviceCtx.Err()
@@ -106,21 +116,23 @@ func getWorkflowRunJobs(serviceCtx context.Context, logger *slog.Logger) ([]Work
workflows, resp, err := githubClient.Actions.ListWorkflows(context.Background(), service.Owner, service.Repo, &github.ListOptions{})
return &workflows, resp, err
})

if serviceCtx.Err() != nil {
logger.WarnContext(serviceCtx, "context canceled during workflows listing")
return []WorkflowRunJobDetail{}, errors.New("context canceled during workflows listing")
return []WorkflowRunJobDetail{}, nil
}
if err != nil {
logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.ListWorkflows", "err", err)
return []WorkflowRunJobDetail{}, errors.New("error executing githubClient.Actions.ListWorkflows")
}

for _, workflow := range (*workflows).Workflows {
if *workflow.State == "active" {
// WORKFLOW RUNS
serviceCtx, workflow_runs, _, err := ExecuteGitHubClientFunction[*github.WorkflowRuns](serviceCtx, logger, func() (**github.WorkflowRuns, *github.Response, error) {
workflow_runs, resp, err := githubClient.Actions.ListWorkflowRunsByID(context.Background(), service.Owner, service.Repo, *workflow.ID, &github.ListWorkflowRunsOptions{
-// ListOptions: github.ListOptions{PerPage: 30},
-Status: "queued",
+ListOptions: github.ListOptions{PerPage: 30},
+Status: "queued",
})
return &workflow_runs, resp, err // Adjusted to return the direct result
})
@@ -140,10 +152,11 @@ func getWorkflowRunJobs(serviceCtx context.Context, logger *slog.Logger) ([]Work
if err != nil {
if strings.Contains(err.Error(), "context canceled") {
logger.WarnContext(serviceCtx, "context canceled during githubClient.Actions.ListWorkflowJobs", "err", err)
+return []WorkflowRunJobDetail{}, nil
} else {
logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.ListWorkflowJobs", "err", err)
return []WorkflowRunJobDetail{}, errors.New("error executing githubClient.Actions.ListWorkflowJobs")
}
return []WorkflowRunJobDetail{}, errors.New("error executing githubClient.Actions.ListWorkflowJobs")
}
for _, job := range workflowRunJobs.Jobs {
if *job.Status == "queued" { // I don't know why, but we'll get completed jobs back in the list
@@ -187,10 +200,11 @@ func getWorkflowRunJobs(serviceCtx context.Context, logger *slog.Logger) ([]Work
if err != nil {
if strings.Contains(err.Error(), "context canceled") {
logger.WarnContext(serviceCtx, "context was canceled while checking if key exists in database", "err", err)
+return []WorkflowRunJobDetail{}, nil
} else {
logger.ErrorContext(serviceCtx, "error checking if key exists in database", "err", err)
return []WorkflowRunJobDetail{}, errors.New("error checking if key exists in database")
}
return []WorkflowRunJobDetail{}, errors.New("error checking if key exists in database")
}

if !exists {
@@ -239,6 +253,7 @@ func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Log

hostHasVmCapacity := anka.HostHasVmCapacity(serviceCtx)
if !hostHasVmCapacity {
logger.DebugContext(serviceCtx, "host does not have vm capacity")
return
}

@@ -268,6 +283,11 @@ func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Log
// obtain all queued workflow runs and jobs
allWorkflowRunJobDetails, err := getWorkflowRunJobs(serviceCtx, logger)
if err != nil {
logger.ErrorContext(serviceCtx, "error getting workflow run jobs", "err", err)
return
}
if serviceCtx.Err() != nil {
logger.WarnContext(serviceCtx, "context canceled after getWorkflowRunJobs")
return
}

@@ -300,10 +320,11 @@ func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Log
logger.ErrorContext(serviceCtx, "error checking if already in db", "err", err)
return
} else if already {
// logger.DebugContext(serviceCtx, "job already running, skipping")
logger.DebugContext(serviceCtx, "job already running, skipping")
// this would cause a double run problem if a job finished on hostA and hostB had an array of workflowRunJobs with queued still for the same job
// we get the latest workflow run jobs each run to prevent this
-return
+// also, we don't return and use continue below so that we can just use the next job in the list and not have to re-parse the entire thing or make more api calls
+continue
} else if !already {
added, err := dbFunctions.AddUniqueRunKey(serviceCtx)
if added && err != nil {
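The rate-limit handling added to ExecuteGitHubClientFunction above boils down to a pause/resume pattern around the reset time. A condensed, hypothetical sketch with the status update abstracted into a callback (a simplification, not the plugin's actual signature):

```go
package github

import (
	"context"
	"time"
)

// waitForRateLimit is a hypothetical distillation of the behavior added in
// this commit: mark the service "limit_paused", wait until just past the
// rate-limit reset, mark it "running" again, and let the caller retry --
// unless the service context is canceled while waiting.
func waitForRateLimit(ctx context.Context, reset time.Time, setStatus func(status string)) error {
	setStatus("limit_paused")
	select {
	case <-time.After(time.Until(reset) + time.Second): // +1s to be safely past the reset
		setStatus("running")
		return nil
	case <-ctx.Done():
		return ctx.Err() // caller should bail out instead of retrying
	}
}
```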
