Skip to content

Commit

Permalink
quick fix
Browse files Browse the repository at this point in the history
  • Loading branch information
NorseGaud committed Apr 8, 2024
1 parent 7e26588 commit a0d0d58
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 56 deletions.
12 changes: 6 additions & 6 deletions internal/anka/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (cli *Cli) ExecuteParseJson(ctx context.Context, args ...string) (*AnkaJson

func (cli *Cli) ExecuteAndParseJsonOnError(ctx context.Context, args ...string) ([]byte, error) {
if ctx.Err() != nil {
return nil, fmt.Errorf("context cancelled before ExecuteAndParseJsonOnError")
return nil, fmt.Errorf("context canceled before ExecuteAndParseJsonOnError")
}
ankaJson := &AnkaJson{}
out, _ := cli.Execute(ctx, args...)
Expand All @@ -142,7 +142,7 @@ func (cli *Cli) ExecuteAndParseJsonOnError(ctx context.Context, args ...string)

func (cli *Cli) AnkaRegistryPull(ctx context.Context, template string, tag string) error {
if ctx.Err() != nil {
return fmt.Errorf("context cancelled before AnkaRegistryPull")
return fmt.Errorf("context canceled before AnkaRegistryPull")
}
logger := logging.GetLoggerFromContext(ctx)
logger.DebugContext(ctx, "pulling template to host")
Expand Down Expand Up @@ -176,7 +176,7 @@ func (cli *Cli) AnkaDelete(ctx context.Context, vm *VM) error {

func (cli *Cli) ObtainAnkaVM(ctx context.Context, ankaTemplate string) (context.Context, *VM, error) {
if ctx.Err() != nil {
return ctx, nil, fmt.Errorf("context cancelled before ObtainAnkaVMAndName")
return ctx, nil, fmt.Errorf("context canceled before ObtainAnkaVMAndName")
}
logger := logging.GetLoggerFromContext(ctx)
vmID, err := uuid.NewRandom()
Expand Down Expand Up @@ -204,7 +204,7 @@ func (cli *Cli) ObtainAnkaVM(ctx context.Context, ankaTemplate string) (context.

func (cli *Cli) AnkaClone(ctx context.Context, template string) error {
if ctx.Err() != nil {
return fmt.Errorf("context cancelled before AnkaClone")
return fmt.Errorf("context canceled before AnkaClone")
}
logger := logging.GetLoggerFromContext(ctx)
vm := GetAnkaVmFromContext(ctx)
Expand All @@ -221,7 +221,7 @@ func (cli *Cli) AnkaClone(ctx context.Context, template string) error {

func (cli *Cli) AnkaStart(ctx context.Context) error {
if ctx.Err() != nil {
return fmt.Errorf("context cancelled before AnkaStart")
return fmt.Errorf("context canceled before AnkaStart")
}
logger := logging.GetLoggerFromContext(ctx)
vm := GetAnkaVmFromContext(ctx)
Expand All @@ -238,7 +238,7 @@ func (cli *Cli) AnkaStart(ctx context.Context) error {

func (cli *Cli) AnkaCopy(ctx context.Context, filesToCopyIn ...string) error {
if ctx.Err() != nil {
return fmt.Errorf("context cancelled before AnkaCopy")
return fmt.Errorf("context canceled before AnkaCopy")
}
logger := logging.GetLoggerFromContext(ctx)
vm := GetAnkaVmFromContext(ctx)
Expand Down
4 changes: 2 additions & 2 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func RemoveUniqueKeyFromDB(ctx context.Context) {

func CheckIfKeyExists(ctx context.Context, key string) (bool, error) {
if ctx.Err() != nil {
return false, errors.New("context cancelled during CheckIfKeyExists")
return false, errors.New("context canceled during CheckIfKeyExists")
}
database := GetDatabaseFromContext(ctx)
// introduce a random millisecond sleep to prevent concurrent executions from colliding
Expand All @@ -88,7 +88,7 @@ func CheckIfKeyExists(ctx context.Context, key string) (bool, error) {

func AddUniqueRunKey(ctx context.Context) (bool, error) {
if ctx.Err() != nil {
return false, errors.New("context cancelled during AddUniqueRunKey")
return false, errors.New("context canceled during AddUniqueRunKey")
}
database := GetDatabaseFromContext(ctx)
exists, err := CheckIfKeyExists(ctx, database.UniqueRunKey)
Expand Down
43 changes: 25 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -133,31 +134,32 @@ func main() {
}

func worker(ctx context.Context, logger *slog.Logger, service config.Service, rateLimiter *http.Client, loadedConfig config.Config, cancel context.CancelFunc) {
serviceCtx, serviceCancel := context.WithCancel(ctx) // Inherit from parent context
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
defer signal.Stop(sigChan)
defer close(sigChan)
for sig := range sigChan {
switch sig {
case syscall.SIGTERM:
logger.WarnContext(ctx, "best effort graceful shutdown, interrupting the job as soon as possible...")
serviceCancel()
return
case syscall.SIGQUIT:
logger.WarnContext(ctx, "graceful shutdown, waiting for jobs to finish...")
return
}
}
}()
LOOP:
for {
time.Sleep(time.Second) // this is work to be done by worker.
select {
case <-stop:
break LOOP
default:
serviceCtx, serviceCancel := context.WithCancel(ctx) // Inherit from parent context
// defer serviceCancel() // Ensure we cancel the context when service exits

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
for {
sig := <-sigChan
switch sig {
case syscall.SIGTERM:
logger.WarnContext(ctx, "best effort graceful shutdown, interrupting the job as soon as possible...")
serviceCancel()
case syscall.SIGQUIT:
logger.InfoContext(ctx, "graceful shutdown, waiting for jobs to finish...")
}
}
}()

if service.Name == "" {
panic("name is required for services")
}
Expand All @@ -180,7 +182,12 @@ LOOP:
if loadedConfig.Database.Enabled {
databaseClient, err := database.NewClient(serviceCtx, loadedConfig.Database)
if err != nil {
panic(fmt.Sprintf("unable to access database: %v", err))
if errors.Is(err, context.Canceled) {
logger.WarnContext(ctx, "database access attempt with canceled context", "err", err)
return
} else {
logger.ErrorContext(ctx, "unable to access database", "err", err)
}
}
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("database"), database.Database{
Client: databaseClient,
Expand Down
65 changes: 35 additions & 30 deletions plugins/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package github

import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
Expand Down Expand Up @@ -87,20 +88,19 @@ func setLoggingContext(ctx context.Context, workflowRunJob WorkflowRunJobDetail)

func getWorkflowRunJobs(ctx context.Context, logger *slog.Logger) ([]WorkflowRunJobDetail, error) {
if ctx.Err() != nil {
return nil, fmt.Errorf("context cancelled before getWorkflowRunJobs")
return nil, fmt.Errorf("context canceled before getWorkflowRunJobs")
}
githubClient := internalGithub.GetGitHubClientFromContext(ctx)
service := config.GetServiceFromContext(ctx)
var allWorkflowRunJobDetails []WorkflowRunJobDetail
// WORKFLOWS

workflows, _, err := ExecuteGitHubClientFunction[*github.Workflows](ctx, logger, func() (**github.Workflows, *github.Response, error) {
workflows, resp, err := githubClient.Actions.ListWorkflows(context.Background(), service.Owner, service.Repo, &github.ListOptions{})
return &workflows, resp, err // Adjusted to return the direct result
})
if ctx.Err() != nil {
logger.WarnContext(ctx, "context cancelled during workflows listing")
return []WorkflowRunJobDetail{}, errors.New("context cancelled during workflows listing")
logger.WarnContext(ctx, "context canceled during workflows listing")
return []WorkflowRunJobDetail{}, errors.New("context canceled during workflows listing")
}
if err != nil {
logger.ErrorContext(ctx, "error executing githubClient.Actions.ListWorkflows", "err", err)
Expand Down Expand Up @@ -169,8 +169,13 @@ func getWorkflowRunJobs(ctx context.Context, logger *slog.Logger) ([]WorkflowRun
// if a node is pulling, the job doesn't change from queued, so let's do a check to see if a node picked it up or not
exists, err := dbFunctions.CheckIfKeyExists(ctx, fmt.Sprintf("%s:%s", runID, uniqueID))
if err != nil {
logger.ErrorContext(ctx, "error checking if key exists in database", "err", err)
return []WorkflowRunJobDetail{}, errors.New("error checking if key exists in database")
if strings.Contains(err.Error(), "context canceled") {
logger.WarnContext(ctx, "context was canceled while checking if key exists in database", "err", err)
return []WorkflowRunJobDetail{}, errors.New("error checking if key exists in database")
} else {
logger.ErrorContext(ctx, "error checking if key exists in database", "err", err)
return []WorkflowRunJobDetail{}, errors.New("error checking if key exists in database")
}
}

if !exists {
Expand Down Expand Up @@ -229,23 +234,23 @@ func Run(ctx context.Context, logger *slog.Logger) {
return
}

// simplifiedWorkflowRuns := make([]map[string]interface{}, 0)
// for _, workflowRunJob := range allWorkflowRunJobDetails {
// simplifiedRun := map[string]interface{}{
// "name": workflowRunJob.Job.Name,
// "created_at": workflowRunJob.Job.CreatedAt,
// "workflow_name": workflowRunJob.Job.WorkflowName,
// "workflow_run_name": workflowRunJob.WorkflowRunName,
// "run_id": workflowRunJob.Job.RunID,
// "unique_id": workflowRunJob.UniqueID,
// "html_url": workflowRunJob.Job.HTMLURL,
// "labels": workflowRunJob.Job.Labels,
// "status": workflowRunJob.Job.Status,
// }
// simplifiedWorkflowRuns = append(simplifiedWorkflowRuns, simplifiedRun)
// }
// allWorkflowRunJobsJSON, _ := json.MarshalIndent(simplifiedWorkflowRuns, "", " ")
// fmt.Printf("%s\n", allWorkflowRunJobsJSON)
simplifiedWorkflowRuns := make([]map[string]interface{}, 0)
for _, workflowRunJob := range allWorkflowRunJobDetails {
simplifiedRun := map[string]interface{}{
"name": workflowRunJob.Job.Name,
"created_at": workflowRunJob.Job.CreatedAt,
"workflow_name": workflowRunJob.Job.WorkflowName,
"workflow_run_name": workflowRunJob.WorkflowRunName,
"run_id": workflowRunJob.Job.RunID,
"unique_id": workflowRunJob.UniqueID,
"html_url": workflowRunJob.Job.HTMLURL,
"labels": workflowRunJob.Job.Labels,
"status": workflowRunJob.Job.Status,
}
simplifiedWorkflowRuns = append(simplifiedWorkflowRuns, simplifiedRun)
}
allWorkflowRunJobsJSON, _ := json.MarshalIndent(simplifiedWorkflowRuns, "", " ")
fmt.Printf("%s\n", allWorkflowRunJobsJSON)

// Loop over all items, so we don't have to re-request the whole list of queued jobs if one is already running on another host
for _, workflowRunJob := range allWorkflowRunJobDetails {
Expand Down Expand Up @@ -305,7 +310,7 @@ func Run(ctx context.Context, logger *slog.Logger) {
}

if ctx.Err() != nil {
logger.InfoContext(ctx, "context cancelled before ObtainAnkaVM")
logger.WarnContext(ctx, "context canceled before ObtainAnkaVM")
return
}

Expand All @@ -331,7 +336,7 @@ func Run(ctx context.Context, logger *slog.Logger) {
}

if ctx.Err() != nil {
logger.ErrorContext(ctx, "context cancelled before install runner")
logger.WarnContext(ctx, "context canceled before install runner")
return
}
ankRunScriptOutput, err := ankaCLI.ExecuteAndParseJsonOnError(ctx,
Expand All @@ -345,7 +350,7 @@ func Run(ctx context.Context, logger *slog.Logger) {
logger.DebugContext(ctx, "install runner", "stdout", string(ankRunScriptOutput))
// Register runner
if ctx.Err() != nil {
logger.ErrorContext(ctx, "context cancelled before register runner")
logger.WarnContext(ctx, "context canceled before register runner")
return
}
ankRunScriptOutput, err = ankaCLI.ExecuteAndParseJsonOnError(ctx,
Expand All @@ -360,7 +365,7 @@ func Run(ctx context.Context, logger *slog.Logger) {
logger.DebugContext(ctx, "register runner", "stdout", string(ankRunScriptOutput))
// Install and Start runner
if ctx.Err() != nil {
logger.ErrorContext(ctx, "context cancelled before start runner")
logger.WarnContext(ctx, "context canceled before start runner")
return
}
ankRunScriptOutput, err = ankaCLI.ExecuteAndParseJsonOnError(ctx,
Expand All @@ -372,7 +377,7 @@ func Run(ctx context.Context, logger *slog.Logger) {
}
logger.DebugContext(ctx, "runner logs", "data", string(ankRunScriptOutput))
if ctx.Err() != nil {
logger.ErrorContext(ctx, "context cancelled before jobCompleted checks")
logger.WarnContext(ctx, "context canceled before jobCompleted checks")
return
}

Expand All @@ -381,7 +386,7 @@ func Run(ctx context.Context, logger *slog.Logger) {
logCounter := 0
for !jobCompleted {
if ctx.Err() != nil {
logger.ErrorContext(ctx, "context cancelled while watching for job completion")
logger.WarnContext(ctx, "context canceled while watching for job completion")
break
}
currentJob, response, err := ExecuteGitHubClientFunction[github.WorkflowJob](ctx, logger, func() (*github.WorkflowJob, *github.Response, error) {
Expand All @@ -397,7 +402,7 @@ func Run(ctx context.Context, logger *slog.Logger) {
logger.InfoContext(ctx, "job completed", "job_id", *workflowRunJob.Job.ID)
} else if logCounter%2 == 0 {
if ctx.Err() != nil {
logger.ErrorContext(ctx, "context cancelled during job status check")
logger.WarnContext(ctx, "context canceled during job status check")
return
}
logger.InfoContext(ctx, "job still in progress", "job_id", *workflowRunJob.Job.ID)
Expand Down

0 comments on commit a0d0d58

Please sign in to comment.