Skip to content

Commit

Permalink
feat: expose metrics through db (#41)
Browse files Browse the repository at this point in the history
* feat: expose metrics through db + metrics improvements

---------

Co-authored-by: Nathan <[email protected]>
Co-authored-by: Nathan Pierce <[email protected]>
  • Loading branch information
3 people authored Dec 14, 2024
1 parent 65c0f8d commit c3200fc
Show file tree
Hide file tree
Showing 9 changed files with 478 additions and 219 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.10.0
0.11.0
2 changes: 1 addition & 1 deletion internal/anka/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (cli *Cli) ObtainAnkaVM(workerCtx context.Context, pluginCtx context.Contex
if err != nil {
return pluginCtx, vm, err
}
metricsData.IncrementTotalRunningVMs()
metricsData.IncrementTotalRunningVMs(workerCtx, pluginCtx, logger)
return pluginCtx, vm, nil
}

Expand Down
13 changes: 9 additions & 4 deletions internal/github/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@ func AuthenticateAndReturnGitHubClient(
}

// https://github.com/gofri/go-github-ratelimit has yet to support primary rate limits, so we have to do it ourselves.
func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog.Logger, executeFunc func() (*T, *github.Response, error)) (context.Context, *T, *github.Response, error) {
func ExecuteGitHubClientFunction[T any](
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
executeFunc func() (*T, *github.Response, error),
) (context.Context, *T, *github.Response, error) {
innerPluginCtx, cancel := context.WithCancel(pluginCtx) // Inherit from parent context
defer cancel()
result, response, err := executeFunc()
Expand All @@ -128,17 +133,17 @@ func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog.
if err != nil {
return pluginCtx, nil, nil, err
}
metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{
metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.PluginBase{
Name: ctxPlugin.Name,
Status: "limit_paused",
})
select {
case <-time.After(sleepDuration):
metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{
metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.PluginBase{
Name: ctxPlugin.Name,
Status: "running",
})
return ExecuteGitHubClientFunction(pluginCtx, logger, executeFunc) // Retry the function after waiting
return ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, executeFunc) // Retry the function after waiting
case <-pluginCtx.Done():
return pluginCtx, nil, nil, pluginCtx.Err()
}
Expand Down
357 changes: 225 additions & 132 deletions internal/metrics/aggregator.go

Large diffs are not rendered by default.

188 changes: 176 additions & 12 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ type PluginBase struct {

type Plugin struct {
*PluginBase
LastSuccessfulRunJobUrl string
LastFailedRunJobUrl string
LastCanceledRunJobUrl string
LastSuccessfulRun time.Time
LastFailedRun time.Time
LastCanceledRun time.Time
LastSuccessfulRunJobUrl string
LastFailedRunJobUrl string
LastCanceledRunJobUrl string
LastSuccessfulRun time.Time
LastFailedRun time.Time
LastCanceledRun time.Time
TotalRanVMs int
TotalSuccessfulRunsSinceStart int
TotalFailedRunsSinceStart int
TotalCanceledRunsSinceStart int
}

type MetricsData struct {
Expand Down Expand Up @@ -94,10 +98,15 @@ func (m *MetricsDataLock) AddPlugin(plugin interface{}) error {
return nil
}

func (m *MetricsDataLock) IncrementTotalRunningVMs() {
func (m *MetricsDataLock) IncrementTotalRunningVMs(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
) {
m.Lock()
defer m.Unlock()
m.TotalRunningVMs++
m.IncrementPluginTotalRanVMs(workerCtx, pluginCtx, logger)
}

func (m *MetricsDataLock) DecrementTotalRunningVMs() {
Expand All @@ -108,22 +117,133 @@ func (m *MetricsDataLock) DecrementTotalRunningVMs() {
}
}

func (m *MetricsDataLock) IncrementTotalSuccessfulRunsSinceStart() {
func (m *MetricsDataLock) IncrementTotalSuccessfulRunsSinceStart(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
) {
m.Lock()
defer m.Unlock()
m.TotalSuccessfulRunsSinceStart++
m.IncrementPluginTotalSuccessfulRunsSinceStart(workerCtx, pluginCtx, logger)
}

func (m *MetricsDataLock) IncrementTotalFailedRunsSinceStart() {
func (m *MetricsDataLock) IncrementTotalFailedRunsSinceStart(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
) {
m.Lock()
defer m.Unlock()
m.TotalFailedRunsSinceStart++
m.IncrementPluginTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger)
}

func (m *MetricsDataLock) IncrementTotalCanceledRunsSinceStart() {
func (m *MetricsDataLock) IncrementTotalCanceledRunsSinceStart(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
) {
m.Lock()
defer m.Unlock()
m.TotalCanceledRunsSinceStart++
m.IncrementPluginTotalCanceledRunsSinceStart(workerCtx, pluginCtx, logger)
}

func (m *MetricsDataLock) IncrementPluginTotalRanVMs(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
) {
pluginConfig, err := config.GetPluginFromContext(pluginCtx)
if err != nil {
return
}
for _, plugin := range m.Plugins {
switch typedPlugin := plugin.(type) {
case Plugin:
if typedPlugin.PluginBase.Name == pluginConfig.Name {
UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{
PluginBase: &PluginBase{
Name: pluginConfig.Name,
},
TotalRanVMs: typedPlugin.TotalRanVMs + 1,
})
}
}
}
}

func (m *MetricsDataLock) IncrementPluginTotalSuccessfulRunsSinceStart(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
) {
pluginConfig, err := config.GetPluginFromContext(pluginCtx)
if err != nil {
return
}
for _, plugin := range m.Plugins {
switch typedPlugin := plugin.(type) {
case Plugin:
if typedPlugin.PluginBase.Name == pluginConfig.Name {
UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{
PluginBase: &PluginBase{
Name: pluginConfig.Name,
},
TotalSuccessfulRunsSinceStart: typedPlugin.TotalSuccessfulRunsSinceStart + 1,
})
}
}
}
}

func (m *MetricsDataLock) IncrementPluginTotalFailedRunsSinceStart(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
) {
pluginConfig, err := config.GetPluginFromContext(pluginCtx)
if err != nil {
return
}
for _, plugin := range m.Plugins {
switch typedPlugin := plugin.(type) {
case Plugin:
if typedPlugin.PluginBase.Name == pluginConfig.Name {
UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{
PluginBase: &PluginBase{
Name: pluginConfig.Name,
},
TotalFailedRunsSinceStart: typedPlugin.TotalFailedRunsSinceStart + 1,
})
}
}
}
}

func (m *MetricsDataLock) IncrementPluginTotalCanceledRunsSinceStart(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
) {
pluginConfig, err := config.GetPluginFromContext(pluginCtx)
if err != nil {
return
}
for _, plugin := range m.Plugins {
switch typedPlugin := plugin.(type) {
case Plugin:
if typedPlugin.PluginBase.Name == pluginConfig.Name {
UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{
PluginBase: &PluginBase{
Name: pluginConfig.Name,
},
TotalCanceledRunsSinceStart: typedPlugin.TotalCanceledRunsSinceStart + 1,
})
}
}
}
}

func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface{}) (interface{}, error) {
Expand Down Expand Up @@ -157,6 +277,18 @@ func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface
if updated.LastCanceledRunJobUrl != "" {
currentServiceTyped.LastCanceledRunJobUrl = updated.LastCanceledRunJobUrl
}
if updated.TotalCanceledRunsSinceStart > currentServiceTyped.TotalCanceledRunsSinceStart {
currentServiceTyped.TotalCanceledRunsSinceStart = updated.TotalCanceledRunsSinceStart
}
if updated.TotalFailedRunsSinceStart > currentServiceTyped.TotalFailedRunsSinceStart {
currentServiceTyped.TotalFailedRunsSinceStart = updated.TotalFailedRunsSinceStart
}
if updated.TotalSuccessfulRunsSinceStart > currentServiceTyped.TotalSuccessfulRunsSinceStart {
currentServiceTyped.TotalSuccessfulRunsSinceStart = updated.TotalSuccessfulRunsSinceStart
}
if updated.TotalRanVMs > currentServiceTyped.TotalRanVMs {
currentServiceTyped.TotalRanVMs = updated.TotalRanVMs
}
return currentServiceTyped, nil
case PluginBase:
updated, ok := updatedPlugin.(PluginBase)
Expand Down Expand Up @@ -214,7 +346,12 @@ func UpdateSystemMetrics(pluginCtx context.Context, logger *slog.Logger, metrics
metricsData.HostDiskUsedBytes = uint64(diskStat.Used)
}

func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) error {
func UpdatePlugin(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
updatedPlugin interface{},
) error {
ctxPlugin, err := config.GetPluginFromContext(pluginCtx)
if err != nil {
return err
Expand Down Expand Up @@ -254,7 +391,12 @@ func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger *
return nil
}

func (m *MetricsDataLock) UpdatePlugin(pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) error {
func (m *MetricsDataLock) UpdatePlugin(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
updatedPlugin interface{},
) error {
m.Lock()
defer m.Unlock()
var name string
Expand Down Expand Up @@ -404,6 +546,10 @@ func (s *Server) handleJsonMetrics(ctx context.Context, soloReceiver bool) http.
pluginMap["last_successful_run"] = s.LastSuccessfulRun
pluginMap["last_failed_run"] = s.LastFailedRun
pluginMap["last_canceled_run"] = s.LastCanceledRun
pluginMap["total_ran_vms"] = s.TotalRanVMs
pluginMap["total_successful_runs_since_start"] = s.TotalSuccessfulRunsSinceStart
pluginMap["total_failed_runs_since_start"] = s.TotalFailedRunsSinceStart
pluginMap["total_canceled_runs_since_start"] = s.TotalCanceledRunsSinceStart
case PluginBase:
pluginMap["name"] = s.Name
pluginMap["plugin_name"] = s.PluginName
Expand Down Expand Up @@ -473,6 +619,10 @@ func (s *Server) handleJsonMetrics(ctx context.Context, soloReceiver bool) http.
pluginMap["last_failed_run"] = s.LastFailedRun
pluginMap["last_canceled_run_job_url"] = s.LastCanceledRunJobUrl
pluginMap["last_canceled_run"] = s.LastCanceledRun
pluginMap["total_ran_vms"] = s.TotalRanVMs
pluginMap["total_successful_runs_since_start"] = s.TotalSuccessfulRunsSinceStart
pluginMap["total_failed_runs_since_start"] = s.TotalFailedRunsSinceStart
pluginMap["total_canceled_runs_since_start"] = s.TotalCanceledRunsSinceStart
case PluginBase:
pluginMap["name"] = s.Name
pluginMap["plugin_name"] = s.PluginName
Expand Down Expand Up @@ -515,6 +665,10 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool)
var lastSuccessfulRunJobUrl string
var lastFailedRunJobUrl string
var lastCanceledRunJobUrl string
var totalRanVMs int
var totalSuccessfulRunsSinceStart int
var totalFailedRunsSinceStart int
var totalCanceledRunsSinceStart int
switch plugin := service.(type) {
case Plugin:
name = plugin.Name
Expand All @@ -531,6 +685,10 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool)
lastSuccessfulRunJobUrl = plugin.LastSuccessfulRunJobUrl
lastFailedRunJobUrl = plugin.LastFailedRunJobUrl
lastCanceledRunJobUrl = plugin.LastCanceledRunJobUrl
totalRanVMs = plugin.TotalRanVMs
totalSuccessfulRunsSinceStart = plugin.TotalSuccessfulRunsSinceStart
totalFailedRunsSinceStart = plugin.TotalFailedRunsSinceStart
totalCanceledRunsSinceStart = plugin.TotalCanceledRunsSinceStart
case PluginBase:
name = plugin.Name
pluginName = plugin.PluginName
Expand Down Expand Up @@ -564,6 +722,12 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool)
} else {
w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,plugin=%s,owner=%s,repo=%s} %s\n", name, pluginName, ownerName, repoName, StatusSince.Format(time.RFC3339))))
}
if !soloReceiver {
w.Write([]byte(fmt.Sprintf("plugin_total_ran_vms{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalRanVMs)))
w.Write([]byte(fmt.Sprintf("plugin_total_successful_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalSuccessfulRunsSinceStart)))
w.Write([]byte(fmt.Sprintf("plugin_total_failed_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalFailedRunsSinceStart)))
w.Write([]byte(fmt.Sprintf("plugin_total_canceled_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalCanceledRunsSinceStart)))
}
}
w.Write([]byte(fmt.Sprintf("host_cpu_count %d\n", metricsData.HostCPUCount)))
w.Write([]byte(fmt.Sprintf("host_cpu_used_count %d\n", metricsData.HostCPUUsedCount)))
Expand Down
Loading

0 comments on commit c3200fc

Please sign in to comment.