Skip to content

Commit

Permalink
saving metrics progress
Browse files Browse the repository at this point in the history
  • Loading branch information
NorseGaud committed Aug 19, 2024
1 parent 187e07b commit a51fd81
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 111 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ With the github plugin, there is a Controller Plugin and a Service Plugin.
> Note: You can only ever run two VMs per host per the Apple macOS SLA. While you can specify more than two services, only two will ever be running a VM at one time. `sleep_interval` can be used to control the frequency/priority of a service and increase the odds that a job will be picked up.
3. Run the daemon by executing `anklet` on the host that has the [Anka CLI installed](https://docs.veertu.com/anka/anka-virtualization-cli/getting-started/installing-the-anka-virtualization-package/).
- `tail -fF /Users/myUser/Library/Logs/anklet.log` to see the logs. You can run `anklet` with `LOG_LEVEL=DEBUG` to see more verbose output.
3. To stop, you have two options:
- `anklet -s stop` to stop the services semi-gracefully (interrupt the plugin at the next context cancellation definition, and still try to cleanup gracefully). This requires that the plugin has properly defined context cancellation checks.
- `anklet -s drain` to stop services, but wait for all jobs to finish gracefully.
3. To stop, send an interrupt or ctrl+c. It will attempt a graceful shut down of services, sending unfinished jobs back to the queue or waiting until the job is done to prevent orphans.

It is also possible to use ENVs for several of the items in the config. They override anything set in the yml. Here is a list of ENVs that you can use:

Expand All @@ -143,8 +141,10 @@ While you can run it anywhere you want, its likely going to be less latency to h

### Plugin Setup and Usage Guides

- #### [**`Github Actions`**](./plugins/github/README.md)
#### Github Actions

- [**`Service Plugin`**](./plugins/services/github/README.md)
- [**`Controller Plugin`**](./plugins/controllers/github/README.md)

---

Expand Down
8 changes: 6 additions & 2 deletions internal/anka/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,14 @@ func (cli *Cli) AnkaRegistryPull(workerCtx context.Context, serviceCtx context.C
}
logger.DebugContext(serviceCtx, "pulling template to host")
defer metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{
Status: "running",
ServiceBase: &metrics.ServiceBase{
Status: "running",
},
})
metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{
Status: "pulling",
ServiceBase: &metrics.ServiceBase{
Status: "pulling",
},
})
pulledTemplate, err := cli.ExecuteParseJson(serviceCtx, args...)
if err != nil {
Expand Down
19 changes: 15 additions & 4 deletions internal/metrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"log/slog"
"net/http"
"strings"
"time"

"github.com/veertuinc/anklet/internal/config"
Expand Down Expand Up @@ -78,10 +79,20 @@ func (s *Server) handleAggregatorPrometheusMetrics(workerCtx context.Context, lo
w.Write([]byte(fmt.Sprintf("total_successful_runs_since_start{metricsUrl=%s} %d\n", metricsURL, metricsData.TotalSuccessfulRunsSinceStart)))
w.Write([]byte(fmt.Sprintf("total_failed_runs_since_start{metricsUrl=%s} %d\n", metricsURL, metricsData.TotalFailedRunsSinceStart)))
for _, service := range metricsData.Services {
w.Write([]byte(fmt.Sprintf("service_status{service_name=%s,plugin=%s,owner=%s,repo=%s,metricsUrl=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, metricsURL, service.Status)))
w.Write([]byte(fmt.Sprintf("service_last_successful_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s,metricsUrl=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.LastSuccessfulRunJobUrl, metricsURL, service.LastSuccessfulRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_last_failed_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s,metricsUrl=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.LastFailedRunJobUrl, metricsURL, service.LastFailedRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_status_running_since{service_name=%s,plugin=%s,owner=%s,repo=%s,metricsUrl=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, metricsURL, service.StatusRunningSince.Format(time.RFC3339))))
fullServiceBase, ok := service.(ServiceBase)
if !ok {
panic("service is not of type ServiceBase")
}
w.Write([]byte(fmt.Sprintf("service_status{service_name=%s,plugin=%s,owner=%s,repo=%s,metricsUrl=%s} %s\n", fullServiceBase.Name, fullServiceBase.PluginName, fullServiceBase.OwnerName, fullServiceBase.RepoName, metricsURL, fullServiceBase.Status)))
if !strings.Contains(fullServiceBase.PluginName, "_controller") {
fullService, ok := service.(Service)
if !ok {
panic("service is not of type Service")
}
w.Write([]byte(fmt.Sprintf("service_last_successful_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s,metricsUrl=%s} %s\n", fullService.Name, fullService.PluginName, fullService.OwnerName, fullService.RepoName, fullService.LastSuccessfulRunJobUrl, metricsURL, fullService.LastSuccessfulRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_last_failed_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s,metricsUrl=%s} %s\n", fullService.Name, fullService.PluginName, fullService.OwnerName, fullService.RepoName, fullService.LastFailedRunJobUrl, metricsURL, fullService.LastFailedRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_status_running_since{service_name=%s,plugin=%s,owner=%s,repo=%s,metricsUrl=%s} %s\n", fullService.Name, fullService.PluginName, fullService.OwnerName, fullService.RepoName, metricsURL, fullService.StatusRunningSince.Format(time.RFC3339))))
}
}
w.Write([]byte(fmt.Sprintf("host_cpu_count{metricsUrl=%s} %d\n", metricsURL, metricsData.HostCPUCount)))
w.Write([]byte(fmt.Sprintf("host_cpu_used_count{metricsUrl=%s} %d\n", metricsURL, metricsData.HostCPUUsedCount)))
Expand Down
200 changes: 140 additions & 60 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@ type Server struct {
Port string
}

type ServiceBase struct {
Name string
PluginName string
RepoName string
OwnerName string
Status string
StatusRunningSince time.Time
}

type Service struct {
Name string
PluginName string
RepoName string
OwnerName string
Status string
*ServiceBase
LastSuccessfulRunJobUrl string
LastFailedRunJobUrl string
LastSuccessfulRun time.Time
LastFailedRun time.Time
StatusRunningSince time.Time
}

type MetricsData struct {
Expand All @@ -49,20 +53,26 @@ type MetricsData struct {
HostDiskUsedBytes uint64
HostDiskAvailableBytes uint64
HostDiskUsagePercentage float64
Services []Service
Services []interface{}
}

type MetricsDataLock struct {
sync.RWMutex
MetricsData
}

func (m *MetricsDataLock) AddService(service Service) {
func (m *MetricsDataLock) AddService(service ServiceBase) {
m.Lock()
defer m.Unlock()
found := false
for i, svc := range m.Services {
if svc.Name == service.Name {
var name string
svc, ok := svc.(ServiceBase)
if !ok {
panic("unable to get service name")
}
name = svc.Name
if name == service.Name {
m.Services[i] = service
found = true
break
Expand Down Expand Up @@ -99,29 +109,56 @@ func (m *MetricsDataLock) IncrementTotalFailedRunsSinceStart() {
m.TotalFailedRunsSinceStart++
}

func CompareAndUpdateMetrics(currentService Service, updatedService Service) Service {
if updatedService.PluginName != "" {
currentService.PluginName = updatedService.PluginName
}
if updatedService.Status != "" {
if currentService.Status != updatedService.Status { // make sure we don't update the time if nothing has changed
currentService.StatusRunningSince = time.Now()
func CompareAndUpdateMetrics(currentService interface{}, updatedService interface{}) interface{} {
switch currentServiceTyped := currentService.(type) {
case Service:
updated, ok := updatedService.(Service)
if !ok {
panic("unable to convert updatedService to Service")
}
currentService.Status = updatedService.Status
}
if !updatedService.LastSuccessfulRun.IsZero() {
currentService.LastSuccessfulRun = updatedService.LastSuccessfulRun
}
if !updatedService.LastFailedRun.IsZero() {
currentService.LastFailedRun = updatedService.LastFailedRun
}
if updatedService.LastSuccessfulRunJobUrl != "" {
currentService.LastSuccessfulRunJobUrl = updatedService.LastSuccessfulRunJobUrl
}
if updatedService.LastFailedRunJobUrl != "" {
currentService.LastFailedRunJobUrl = updatedService.LastFailedRunJobUrl
if updated.PluginName != "" {
currentServiceTyped.PluginName = updated.PluginName
}
fmt.Printf("updated.Status: %s\n", updated.Status)
fmt.Printf("currentServiceTyped.Status: %s\n", currentServiceTyped.Status)
if updated.Status != "" {
if currentServiceTyped.Status != updated.Status {
currentServiceTyped.StatusRunningSince = time.Now()
}
currentServiceTyped.Status = updated.Status
}
if !updated.LastSuccessfulRun.IsZero() {
currentServiceTyped.LastSuccessfulRun = updated.LastSuccessfulRun
}
if !updated.LastFailedRun.IsZero() {
currentServiceTyped.LastFailedRun = updated.LastFailedRun
}
if updated.LastSuccessfulRunJobUrl != "" {
currentServiceTyped.LastSuccessfulRunJobUrl = updated.LastSuccessfulRunJobUrl
}
if updated.LastFailedRunJobUrl != "" {
currentServiceTyped.LastFailedRunJobUrl = updated.LastFailedRunJobUrl
}
return currentServiceTyped
case ServiceBase:
fmt.Printf("ServiceBase\n")
updated, ok := updatedService.(ServiceBase)
if !ok {
panic("unable to convert updatedService to ServiceBase")
}
if updated.PluginName != "" {
currentServiceTyped.PluginName = updated.PluginName
}
if updated.Status != "" {
if currentServiceTyped.Status != updated.Status {
currentServiceTyped.StatusRunningSince = time.Now()
}
currentServiceTyped.Status = updated.Status
}
return currentServiceTyped
default:
panic("unable to convert currentService to Service or ServiceBase")
}
return currentService
}

func UpdateSystemMetrics(serviceCtx context.Context, logger *slog.Logger, metricsData *MetricsDataLock) {
Expand Down Expand Up @@ -160,40 +197,52 @@ func UpdateSystemMetrics(serviceCtx context.Context, logger *slog.Logger, metric
metricsData.HostDiskUsedBytes = uint64(diskStat.Used)
}

func UpdateService(workerCtx context.Context, serviceCtx context.Context, logger *slog.Logger, updatedService Service) {
func UpdateService(workerCtx context.Context, serviceCtx context.Context, logger *slog.Logger, updatedService interface{}) {
service := config.GetServiceFromContext(serviceCtx)
metricsData := GetMetricsDataFromContext(workerCtx)
for i, svc := range metricsData.Services {
if svc.Name == service.Name {
newService := Service{
Name: service.Name,
RepoName: service.Repo,
OwnerName: service.Owner,
PluginName: metricsData.Services[i].PluginName,
Status: metricsData.Services[i].Status,
}
if !strings.Contains(service.Plugin, "_controller") {
newService.LastSuccessfulRun = metricsData.Services[i].LastSuccessfulRun
newService.LastFailedRun = metricsData.Services[i].LastFailedRun
newService.LastSuccessfulRunJobUrl = metricsData.Services[i].LastSuccessfulRunJobUrl
newService.LastFailedRunJobUrl = metricsData.Services[i].LastFailedRunJobUrl
newService.StatusRunningSince = metricsData.Services[i].StatusRunningSince
for i, currentService := range metricsData.Services {
switch fullUpdatedService := updatedService.(type) {
case Service:
if fullUpdatedService.Name == service.Name {
newService := CompareAndUpdateMetrics(currentService, updatedService)
metricsData.Services[i] = newService
}
newService = CompareAndUpdateMetrics(newService, updatedService)
case ServiceBase:
newService := CompareAndUpdateMetrics(currentService, updatedService)
metricsData.Services[i] = newService
default:
panic("unable to convert svc to Service or ServiceBase")
}
}
}

func (m *MetricsDataLock) UpdateService(serviceCtx context.Context, logger *slog.Logger, updatedService Service) {
func (m *MetricsDataLock) UpdateService(serviceCtx context.Context, logger *slog.Logger, updatedService interface{}) {
m.Lock()
defer m.Unlock()
if updatedService.Name == "" {
panic("updateService.Name is required")
}
for i, svc := range m.Services {
if svc.Name == updatedService.Name {
m.Services[i] = CompareAndUpdateMetrics(svc, updatedService)
fmt.Printf("UpdateService 2 start\n")
var name string
switch fullUpdatedService := updatedService.(type) {
case Service:
if fullUpdatedService.Name == "" {
panic("updateService.Name is required")
}
for i, svc := range m.Services {
switch typedSvc := svc.(type) {
case Service:
if fullUpdatedService.Name == typedSvc.Name {
m.Services[i] = CompareAndUpdateMetrics(typedSvc, updatedService)
}
}
}
case ServiceBase:
name = fullUpdatedService.Name
for i, svc := range m.Services {
switch typedSvc := svc.(type) {
case ServiceBase:
if name == typedSvc.Name {
m.Services[i] = CompareAndUpdateMetrics(typedSvc, updatedService)
}
}
}
}
}
Expand Down Expand Up @@ -231,7 +280,6 @@ func (s *Server) Start(parentCtx context.Context, logger *slog.Logger) {
func (s *Server) handleJsonMetrics(ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
metricsData := ctx.Value(config.ContextKey("metrics")).(*MetricsDataLock)
// services := metricsData.GetServices()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(metricsData)
}
Expand All @@ -245,11 +293,43 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context) http.HandlerFunc {
w.Write([]byte(fmt.Sprintf("total_successful_runs_since_start %d\n", metricsData.TotalSuccessfulRunsSinceStart)))
w.Write([]byte(fmt.Sprintf("total_failed_runs_since_start %d\n", metricsData.TotalFailedRunsSinceStart)))
for _, service := range metricsData.Services {
w.Write([]byte(fmt.Sprintf("service_status{service_name=%s,plugin=%s,owner=%s,repo=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.Status)))
if !strings.Contains(service.PluginName, "_controller") {
w.Write([]byte(fmt.Sprintf("service_last_successful_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.LastSuccessfulRunJobUrl, service.LastSuccessfulRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_last_failed_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.LastFailedRunJobUrl, service.LastFailedRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_status_running_since{service_name=%s,plugin=%s,owner=%s,repo=%s} %s\n", service.Name, service.PluginName, service.OwnerName, service.RepoName, service.StatusRunningSince.Format(time.RFC3339))))
var name string
var pluginName string
var repoName string
var ownerName string
var status string
var statusRunningSince time.Time
var lastSuccessfulRun time.Time
var lastFailedRun time.Time
var lastSuccessfulRunJobUrl string
var lastFailedRunJobUrl string
switch svc := service.(type) {
case Service:
name = svc.Name
pluginName = svc.PluginName
repoName = svc.RepoName
ownerName = svc.OwnerName
status = svc.Status
statusRunningSince = svc.StatusRunningSince
lastSuccessfulRun = svc.LastSuccessfulRun
lastFailedRun = svc.LastFailedRun
lastSuccessfulRunJobUrl = svc.LastSuccessfulRunJobUrl
lastFailedRunJobUrl = svc.LastFailedRunJobUrl
case ServiceBase:
name = svc.Name
pluginName = svc.PluginName
repoName = svc.RepoName
ownerName = svc.OwnerName
status = svc.Status
statusRunningSince = svc.StatusRunningSince
default:
panic("unable to convert svc to Service or ServiceBase")
}
w.Write([]byte(fmt.Sprintf("service_status{service_name=%s,plugin=%s,owner=%s,repo=%s} %s\n", name, pluginName, ownerName, repoName, status)))
if !strings.Contains(pluginName, "_controller") {
w.Write([]byte(fmt.Sprintf("service_last_successful_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s} %s\n", name, pluginName, ownerName, repoName, lastSuccessfulRunJobUrl, lastSuccessfulRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_last_failed_run{service_name=%s,plugin=%s,owner=%s,repo=%s,job_url=%s} %s\n", name, pluginName, ownerName, repoName, lastFailedRunJobUrl, lastFailedRun.Format(time.RFC3339))))
w.Write([]byte(fmt.Sprintf("service_status_running_since{service_name=%s,plugin=%s,owner=%s,repo=%s} %s\n", name, pluginName, ownerName, repoName, statusRunningSince.Format(time.RFC3339))))
}
}
w.Write([]byte(fmt.Sprintf("host_cpu_count %d\n", metricsData.HostCPUCount)))
Expand Down
6 changes: 4 additions & 2 deletions internal/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ func Plugin(workerCtx context.Context, serviceCtx context.Context, serviceCancel
}
github.Run(workerCtx, serviceCtx, serviceCancel, logger)
metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{
Status: "idle",
ServiceBase: &metrics.ServiceBase{
Status: "idle",
},
})
}
}
} else if service.Plugin == "github_controller" {
metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{
metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.ServiceBase{
Status: "initializing",
})
github_controller.Run(workerCtx, serviceCtx, serviceCancel, logger, firstServiceStarted)
Expand Down
10 changes: 7 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.

logger.InfoContext(serviceCtx, "starting service")

metricsData.AddService(metrics.Service{
metricsData.AddService(metrics.ServiceBase{
Name: service.Name,
PluginName: service.Plugin,
RepoName: service.Repo,
Expand All @@ -335,7 +335,9 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.
select {
case <-serviceCtx.Done():
metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{
Status: "stopped",
ServiceBase: &metrics.ServiceBase{
Status: "stopped",
},
})
logger.WarnContext(serviceCtx, shutDownMessage)
serviceCancel()
Expand All @@ -348,7 +350,9 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.
return
}
metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{
Status: "idle",
ServiceBase: &metrics.ServiceBase{
Status: "idle",
},
})
select {
case <-time.After(time.Duration(service.SleepInterval) * time.Second):
Expand Down
Loading

0 comments on commit a51fd81

Please sign in to comment.