Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose metrics through db #41

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.10.0
0.11.0
2 changes: 1 addition & 1 deletion internal/anka/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (cli *Cli) ObtainAnkaVM(workerCtx context.Context, pluginCtx context.Contex
if err != nil {
return pluginCtx, vm, err
}
metricsData.IncrementTotalRunningVMs()
metricsData.IncrementTotalRunningVMs(workerCtx, pluginCtx, logger)
return pluginCtx, vm, nil
}

Expand Down
13 changes: 9 additions & 4 deletions internal/github/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@ func AuthenticateAndReturnGitHubClient(
}

// https://github.com/gofri/go-github-ratelimit has yet to support primary rate limits, so we have to do it ourselves.
func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog.Logger, executeFunc func() (*T, *github.Response, error)) (context.Context, *T, *github.Response, error) {
func ExecuteGitHubClientFunction[T any](
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
executeFunc func() (*T, *github.Response, error),
) (context.Context, *T, *github.Response, error) {
innerPluginCtx, cancel := context.WithCancel(pluginCtx) // Inherit from parent context
defer cancel()
result, response, err := executeFunc()
Expand All @@ -128,17 +133,17 @@ func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog.
if err != nil {
return pluginCtx, nil, nil, err
}
metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{
metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.PluginBase{
Name: ctxPlugin.Name,
Status: "limit_paused",
})
select {
case <-time.After(sleepDuration):
metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{
metricsData.UpdatePlugin(workerCtx, pluginCtx, logger, metrics.PluginBase{
Name: ctxPlugin.Name,
Status: "running",
})
return ExecuteGitHubClientFunction(pluginCtx, logger, executeFunc) // Retry the function after waiting
return ExecuteGitHubClientFunction(workerCtx, pluginCtx, logger, executeFunc) // Retry the function after waiting
case <-pluginCtx.Done():
return pluginCtx, nil, nil, pluginCtx.Err()
}
Expand Down
357 changes: 225 additions & 132 deletions internal/metrics/aggregator.go

Large diffs are not rendered by default.

188 changes: 176 additions & 12 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ type PluginBase struct {

// Plugin embeds *PluginBase and adds per-plugin run history: the job URL and
// timestamp of the last successful, failed and canceled run, plus running
// totals.
type Plugin struct {
*PluginBase
LastSuccessfulRunJobUrl string
LastFailedRunJobUrl string
LastCanceledRunJobUrl string
LastSuccessfulRun time.Time
LastFailedRun time.Time
LastCanceledRun time.Time
LastSuccessfulRunJobUrl string
LastFailedRunJobUrl string
LastCanceledRunJobUrl string
LastSuccessfulRun time.Time
LastFailedRun time.Time
LastCanceledRun time.Time
// Per-plugin counters. Each is advanced by one through the matching
// IncrementPluginTotal* method on MetricsDataLock.
TotalRanVMs int
TotalSuccessfulRunsSinceStart int
TotalFailedRunsSinceStart int
TotalCanceledRunsSinceStart int
}

type MetricsData struct {
Expand Down Expand Up @@ -94,10 +98,15 @@ func (m *MetricsDataLock) AddPlugin(plugin interface{}) error {
return nil
}

func (m *MetricsDataLock) IncrementTotalRunningVMs() {
// IncrementTotalRunningVMs adds one to the aggregate TotalRunningVMs counter
// and then calls IncrementPluginTotalRanVMs with the same contexts and logger.
// Both steps run while m's lock is held.
func (m *MetricsDataLock) IncrementTotalRunningVMs(workerCtx, pluginCtx context.Context, logger *slog.Logger) {
	m.Lock()
	defer m.Unlock()

	m.TotalRunningVMs += 1
	m.IncrementPluginTotalRanVMs(workerCtx, pluginCtx, logger)
}

func (m *MetricsDataLock) DecrementTotalRunningVMs() {
Expand All @@ -108,22 +117,133 @@ func (m *MetricsDataLock) DecrementTotalRunningVMs() {
}
}

func (m *MetricsDataLock) IncrementTotalSuccessfulRunsSinceStart() {
// IncrementTotalSuccessfulRunsSinceStart adds one to the aggregate
// TotalSuccessfulRunsSinceStart counter and then calls
// IncrementPluginTotalSuccessfulRunsSinceStart with the same contexts and
// logger. Both steps run while m's lock is held.
func (m *MetricsDataLock) IncrementTotalSuccessfulRunsSinceStart(workerCtx, pluginCtx context.Context, logger *slog.Logger) {
	m.Lock()
	defer m.Unlock()

	m.TotalSuccessfulRunsSinceStart += 1
	m.IncrementPluginTotalSuccessfulRunsSinceStart(workerCtx, pluginCtx, logger)
}

func (m *MetricsDataLock) IncrementTotalFailedRunsSinceStart() {
// IncrementTotalFailedRunsSinceStart adds one to the aggregate
// TotalFailedRunsSinceStart counter and then calls
// IncrementPluginTotalFailedRunsSinceStart with the same contexts and logger.
// Both steps run while m's lock is held.
func (m *MetricsDataLock) IncrementTotalFailedRunsSinceStart(workerCtx, pluginCtx context.Context, logger *slog.Logger) {
	m.Lock()
	defer m.Unlock()

	m.TotalFailedRunsSinceStart += 1
	m.IncrementPluginTotalFailedRunsSinceStart(workerCtx, pluginCtx, logger)
}

func (m *MetricsDataLock) IncrementTotalCanceledRunsSinceStart() {
// IncrementTotalCanceledRunsSinceStart adds one to the aggregate
// TotalCanceledRunsSinceStart counter and then calls
// IncrementPluginTotalCanceledRunsSinceStart with the same contexts and
// logger. Both steps run while m's lock is held.
func (m *MetricsDataLock) IncrementTotalCanceledRunsSinceStart(workerCtx, pluginCtx context.Context, logger *slog.Logger) {
	m.Lock()
	defer m.Unlock()

	m.TotalCanceledRunsSinceStart += 1
	m.IncrementPluginTotalCanceledRunsSinceStart(workerCtx, pluginCtx, logger)
}

// IncrementPluginTotalRanVMs adds one to the TotalRanVMs counter of the plugin
// identified by pluginCtx and hands the new value to UpdatePlugin.
//
// The plugin is resolved with config.GetPluginFromContext; when that fails the
// call is a no-op. Entries in m.Plugins that are not of type Plugin, or whose
// name does not match, are ignored. A failure reported by UpdatePlugin is
// logged (when logger is non-nil) rather than silently dropped.
//
// This method does not take m's lock itself; IncrementTotalRunningVMs calls it
// with the lock already held.
func (m *MetricsDataLock) IncrementPluginTotalRanVMs(
	workerCtx context.Context,
	pluginCtx context.Context,
	logger *slog.Logger,
) {
	pluginConfig, err := config.GetPluginFromContext(pluginCtx)
	if err != nil {
		return
	}
	for _, plugin := range m.Plugins {
		typedPlugin, ok := plugin.(Plugin)
		// PluginBase is an embedded pointer; skip entries where it is unset so
		// the Name lookup below cannot dereference nil.
		if !ok || typedPlugin.PluginBase == nil {
			continue
		}
		if typedPlugin.PluginBase.Name != pluginConfig.Name {
			continue
		}
		updateErr := UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{
			PluginBase: &PluginBase{
				Name: pluginConfig.Name,
			},
			TotalRanVMs: typedPlugin.TotalRanVMs + 1,
		})
		if updateErr != nil && logger != nil {
			logger.ErrorContext(pluginCtx, "error updating plugin total ran vms", "name", pluginConfig.Name, "err", updateErr)
		}
	}
}

// IncrementPluginTotalSuccessfulRunsSinceStart adds one to the
// TotalSuccessfulRunsSinceStart counter of the plugin identified by pluginCtx
// and hands the new value to UpdatePlugin.
//
// The plugin is resolved with config.GetPluginFromContext; when that fails the
// call is a no-op. Entries in m.Plugins that are not of type Plugin, or whose
// name does not match, are ignored. A failure reported by UpdatePlugin is
// logged (when logger is non-nil) rather than silently dropped.
//
// This method does not take m's lock itself;
// IncrementTotalSuccessfulRunsSinceStart calls it with the lock already held.
func (m *MetricsDataLock) IncrementPluginTotalSuccessfulRunsSinceStart(
	workerCtx context.Context,
	pluginCtx context.Context,
	logger *slog.Logger,
) {
	pluginConfig, err := config.GetPluginFromContext(pluginCtx)
	if err != nil {
		return
	}
	for _, plugin := range m.Plugins {
		typedPlugin, ok := plugin.(Plugin)
		// PluginBase is an embedded pointer; skip entries where it is unset so
		// the Name lookup below cannot dereference nil.
		if !ok || typedPlugin.PluginBase == nil {
			continue
		}
		if typedPlugin.PluginBase.Name != pluginConfig.Name {
			continue
		}
		updateErr := UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{
			PluginBase: &PluginBase{
				Name: pluginConfig.Name,
			},
			TotalSuccessfulRunsSinceStart: typedPlugin.TotalSuccessfulRunsSinceStart + 1,
		})
		if updateErr != nil && logger != nil {
			logger.ErrorContext(pluginCtx, "error updating plugin total successful runs since start", "name", pluginConfig.Name, "err", updateErr)
		}
	}
}

// IncrementPluginTotalFailedRunsSinceStart adds one to the
// TotalFailedRunsSinceStart counter of the plugin identified by pluginCtx and
// hands the new value to UpdatePlugin.
//
// The plugin is resolved with config.GetPluginFromContext; when that fails the
// call is a no-op. Entries in m.Plugins that are not of type Plugin, or whose
// name does not match, are ignored. A failure reported by UpdatePlugin is
// logged (when logger is non-nil) rather than silently dropped.
//
// This method does not take m's lock itself;
// IncrementTotalFailedRunsSinceStart calls it with the lock already held.
func (m *MetricsDataLock) IncrementPluginTotalFailedRunsSinceStart(
	workerCtx context.Context,
	pluginCtx context.Context,
	logger *slog.Logger,
) {
	pluginConfig, err := config.GetPluginFromContext(pluginCtx)
	if err != nil {
		return
	}
	for _, plugin := range m.Plugins {
		typedPlugin, ok := plugin.(Plugin)
		// PluginBase is an embedded pointer; skip entries where it is unset so
		// the Name lookup below cannot dereference nil.
		if !ok || typedPlugin.PluginBase == nil {
			continue
		}
		if typedPlugin.PluginBase.Name != pluginConfig.Name {
			continue
		}
		updateErr := UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{
			PluginBase: &PluginBase{
				Name: pluginConfig.Name,
			},
			TotalFailedRunsSinceStart: typedPlugin.TotalFailedRunsSinceStart + 1,
		})
		if updateErr != nil && logger != nil {
			logger.ErrorContext(pluginCtx, "error updating plugin total failed runs since start", "name", pluginConfig.Name, "err", updateErr)
		}
	}
}

// IncrementPluginTotalCanceledRunsSinceStart adds one to the
// TotalCanceledRunsSinceStart counter of the plugin identified by pluginCtx
// and hands the new value to UpdatePlugin.
//
// The plugin is resolved with config.GetPluginFromContext; when that fails the
// call is a no-op. Entries in m.Plugins that are not of type Plugin, or whose
// name does not match, are ignored. A failure reported by UpdatePlugin is
// logged (when logger is non-nil) rather than silently dropped.
//
// This method does not take m's lock itself;
// IncrementTotalCanceledRunsSinceStart calls it with the lock already held.
func (m *MetricsDataLock) IncrementPluginTotalCanceledRunsSinceStart(
	workerCtx context.Context,
	pluginCtx context.Context,
	logger *slog.Logger,
) {
	pluginConfig, err := config.GetPluginFromContext(pluginCtx)
	if err != nil {
		return
	}
	for _, plugin := range m.Plugins {
		typedPlugin, ok := plugin.(Plugin)
		// PluginBase is an embedded pointer; skip entries where it is unset so
		// the Name lookup below cannot dereference nil.
		if !ok || typedPlugin.PluginBase == nil {
			continue
		}
		if typedPlugin.PluginBase.Name != pluginConfig.Name {
			continue
		}
		updateErr := UpdatePlugin(workerCtx, pluginCtx, logger, Plugin{
			PluginBase: &PluginBase{
				Name: pluginConfig.Name,
			},
			TotalCanceledRunsSinceStart: typedPlugin.TotalCanceledRunsSinceStart + 1,
		})
		if updateErr != nil && logger != nil {
			logger.ErrorContext(pluginCtx, "error updating plugin total canceled runs since start", "name", pluginConfig.Name, "err", updateErr)
		}
	}
}

func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface{}) (interface{}, error) {
Expand Down Expand Up @@ -157,6 +277,18 @@ func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface
if updated.LastCanceledRunJobUrl != "" {
currentServiceTyped.LastCanceledRunJobUrl = updated.LastCanceledRunJobUrl
}
if updated.TotalCanceledRunsSinceStart > currentServiceTyped.TotalCanceledRunsSinceStart {
currentServiceTyped.TotalCanceledRunsSinceStart = updated.TotalCanceledRunsSinceStart
}
if updated.TotalFailedRunsSinceStart > currentServiceTyped.TotalFailedRunsSinceStart {
currentServiceTyped.TotalFailedRunsSinceStart = updated.TotalFailedRunsSinceStart
}
if updated.TotalSuccessfulRunsSinceStart > currentServiceTyped.TotalSuccessfulRunsSinceStart {
currentServiceTyped.TotalSuccessfulRunsSinceStart = updated.TotalSuccessfulRunsSinceStart
}
if updated.TotalRanVMs > currentServiceTyped.TotalRanVMs {
currentServiceTyped.TotalRanVMs = updated.TotalRanVMs
}
return currentServiceTyped, nil
case PluginBase:
updated, ok := updatedPlugin.(PluginBase)
Expand Down Expand Up @@ -214,7 +346,12 @@ func UpdateSystemMetrics(pluginCtx context.Context, logger *slog.Logger, metrics
metricsData.HostDiskUsedBytes = uint64(diskStat.Used)
}

func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) error {
func UpdatePlugin(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
updatedPlugin interface{},
) error {
ctxPlugin, err := config.GetPluginFromContext(pluginCtx)
if err != nil {
return err
Expand Down Expand Up @@ -254,7 +391,12 @@ func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger *
return nil
}

func (m *MetricsDataLock) UpdatePlugin(pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) error {
func (m *MetricsDataLock) UpdatePlugin(
workerCtx context.Context,
pluginCtx context.Context,
logger *slog.Logger,
updatedPlugin interface{},
) error {
m.Lock()
defer m.Unlock()
var name string
Expand Down Expand Up @@ -404,6 +546,10 @@ func (s *Server) handleJsonMetrics(ctx context.Context, soloReceiver bool) http.
pluginMap["last_successful_run"] = s.LastSuccessfulRun
pluginMap["last_failed_run"] = s.LastFailedRun
pluginMap["last_canceled_run"] = s.LastCanceledRun
pluginMap["total_ran_vms"] = s.TotalRanVMs
pluginMap["total_successful_runs_since_start"] = s.TotalSuccessfulRunsSinceStart
pluginMap["total_failed_runs_since_start"] = s.TotalFailedRunsSinceStart
pluginMap["total_canceled_runs_since_start"] = s.TotalCanceledRunsSinceStart
case PluginBase:
pluginMap["name"] = s.Name
pluginMap["plugin_name"] = s.PluginName
Expand Down Expand Up @@ -473,6 +619,10 @@ func (s *Server) handleJsonMetrics(ctx context.Context, soloReceiver bool) http.
pluginMap["last_failed_run"] = s.LastFailedRun
pluginMap["last_canceled_run_job_url"] = s.LastCanceledRunJobUrl
pluginMap["last_canceled_run"] = s.LastCanceledRun
pluginMap["total_ran_vms"] = s.TotalRanVMs
pluginMap["total_successful_runs_since_start"] = s.TotalSuccessfulRunsSinceStart
pluginMap["total_failed_runs_since_start"] = s.TotalFailedRunsSinceStart
pluginMap["total_canceled_runs_since_start"] = s.TotalCanceledRunsSinceStart
case PluginBase:
pluginMap["name"] = s.Name
pluginMap["plugin_name"] = s.PluginName
Expand Down Expand Up @@ -515,6 +665,10 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool)
var lastSuccessfulRunJobUrl string
var lastFailedRunJobUrl string
var lastCanceledRunJobUrl string
var totalRanVMs int
var totalSuccessfulRunsSinceStart int
var totalFailedRunsSinceStart int
var totalCanceledRunsSinceStart int
switch plugin := service.(type) {
case Plugin:
name = plugin.Name
Expand All @@ -531,6 +685,10 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool)
lastSuccessfulRunJobUrl = plugin.LastSuccessfulRunJobUrl
lastFailedRunJobUrl = plugin.LastFailedRunJobUrl
lastCanceledRunJobUrl = plugin.LastCanceledRunJobUrl
totalRanVMs = plugin.TotalRanVMs
totalSuccessfulRunsSinceStart = plugin.TotalSuccessfulRunsSinceStart
totalFailedRunsSinceStart = plugin.TotalFailedRunsSinceStart
totalCanceledRunsSinceStart = plugin.TotalCanceledRunsSinceStart
case PluginBase:
name = plugin.Name
pluginName = plugin.PluginName
Expand Down Expand Up @@ -564,6 +722,12 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool)
} else {
w.Write([]byte(fmt.Sprintf("plugin_status_since{name=%s,plugin=%s,owner=%s,repo=%s} %s\n", name, pluginName, ownerName, repoName, StatusSince.Format(time.RFC3339))))
}
if !soloReceiver {
w.Write([]byte(fmt.Sprintf("plugin_total_ran_vms{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalRanVMs)))
w.Write([]byte(fmt.Sprintf("plugin_total_successful_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalSuccessfulRunsSinceStart)))
w.Write([]byte(fmt.Sprintf("plugin_total_failed_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalFailedRunsSinceStart)))
w.Write([]byte(fmt.Sprintf("plugin_total_canceled_runs_since_start{name=%s,plugin=%s,owner=%s} %d\n", name, pluginName, ownerName, totalCanceledRunsSinceStart)))
}
}
w.Write([]byte(fmt.Sprintf("host_cpu_count %d\n", metricsData.HostCPUCount)))
w.Write([]byte(fmt.Sprintf("host_cpu_used_count %d\n", metricsData.HostCPUUsedCount)))
Expand Down
Loading
Loading