From 680f075b3d5caf7f9c7a93bf9626f47648ea5592 Mon Sep 17 00:00:00 2001 From: Nathan Pierce Date: Thu, 12 Dec 2024 13:10:13 -0600 Subject: [PATCH] further improvements to the new in_progress logic --- internal/anka/cli.go | 4 +- plugins/handlers/github/github.go | 115 ++++++++++++++++-------------- 2 files changed, 63 insertions(+), 56 deletions(-) diff --git a/internal/anka/cli.go b/internal/anka/cli.go index 3dfee58..08e3993 100644 --- a/internal/anka/cli.go +++ b/internal/anka/cli.go @@ -343,10 +343,10 @@ func (cli *Cli) AnkaCopyOutOfVM(ctx context.Context, objectToCopyOut string, hos return err } if copyOutput.Status != "OK" { - return fmt.Errorf("error copying into vm: %s", copyOutput.Message) + return fmt.Errorf("error copying out of vm: %s", copyOutput.Message) } logger.DebugContext(ctx, "copy output", "std", copyOutput) - logger.InfoContext(ctx, "successfully copied %s out of vm", "object", objectToCopyOut, "stdout", copyOutput.Message) + logger.InfoContext(ctx, fmt.Sprintf("successfully copied %s out of vm to %s", objectToCopyOut, hostLevelDestination), "stdout", copyOutput.Message) return nil } diff --git a/plugins/handlers/github/github.go b/plugins/handlers/github/github.go index 6c59bd0..8b949b2 100644 --- a/plugins/handlers/github/github.go +++ b/plugins/handlers/github/github.go @@ -145,32 +145,30 @@ func DeleteFromQueue(pluginCtx context.Context, logger *slog.Logger, jobID int64 // passing the the queue and ID to check for func InQueue(pluginCtx context.Context, logger *slog.Logger, jobID int64, queue string) (bool, error) { - toReturn := false databaseContainer, err := database.GetDatabaseFromContext(pluginCtx) if err != nil { - return toReturn, err + return false, err } queued, err := databaseContainer.Client.LRange(pluginCtx, queue, 0, -1).Result() if err != nil { logger.ErrorContext(pluginCtx, "error getting list of queued jobs", "err", err) - return toReturn, err + return false, err } for _, queueItem := range queued { workflowJobEvent, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](queueItem) if err != nil { logger.ErrorContext(pluginCtx, "error unmarshalling job", "err", err) - return toReturn, err + return false, err } if typeErr != nil { // not the type we want continue } if *workflowJobEvent.WorkflowJob.ID == jobID { // logger.WarnContext(pluginCtx, "WorkflowJob.ID already in queue", "WorkflowJob.ID", jobID) - toReturn = true - break + return true, nil } } - return toReturn, nil + return false, nil } func extractLabelValue(labels []string, prefix string) string { @@ -236,7 +234,7 @@ func CheckForCompletedJobs( completedJobChannel chan github.WorkflowJobEvent, ranOnce chan struct{}, runOnce bool, - failureChannel chan bool, + retryChannel chan bool, ) { ctxPlugin, err := config.GetPluginFromContext(pluginCtx) if err != nil { @@ -266,8 +264,7 @@ func CheckForCompletedJobs( // do not use 'continue' in the loop or else the ranOnce won't happen // logging.DevContext(pluginCtx, "CheckForCompletedJobs "+ctxPlugin.Name+" | runOnce "+fmt.Sprint(runOnce)) select { - case <-failureChannel: - // logger.ErrorContext(pluginCtx, "CheckForCompletedJobs"+ctxPlugin.Name+" failureChannel") + case <-retryChannel: returnToMainQueue, ok := workerCtx.Value(config.ContextKey("returnToMainQueue")).(chan bool) if !ok { logger.ErrorContext(pluginCtx, "error getting returnToMainQueue from context") @@ -471,9 +468,7 @@ func cleanup( if strings.ToUpper(os.Getenv("LOG_LEVEL")) == "DEBUG" || strings.ToUpper(os.Getenv("LOG_LEVEL")) == "DEV" { err = ankaCLI.AnkaCopyOutOfVM(pluginCtx, "/Users/anka/actions-runner/_diag", "/tmp/"+vm.Name) if err != nil { - logger.ErrorContext(pluginCtx, "error copying out of vm", "err", err) - } else { - logger.DebugContext(pluginCtx, "successfully copied out of vm", "vm", vm.Name) + logger.ErrorContext(pluginCtx, "error copying actions runner out of vm", "err", err) } } ankaCLI.AnkaDelete(workerCtx, pluginCtx, &vm) @@ -611,7 +606,7 @@ func Run( checkForCompletedJobsMu := &sync.Mutex{} cleanupMu := &sync.Mutex{} - failureChannel := make(chan bool, 1) + retryChannel := make(chan bool, 1) completedJobChannel := make(chan github.WorkflowJobEvent, 1) // wait group so we can wait for the goroutine to finish before exiting the service @@ -641,7 +636,7 @@ func Run( completedJobChannel, ranOnce, false, - failureChannel, + retryChannel, ) wg.Done() }() @@ -701,7 +696,7 @@ func Run( // check if the job is already completed, so we don't orphan if there is // a job in anklet/jobs/github/queued and also a anklet/jobs/github/completed - CheckForCompletedJobs(workerCtx, pluginCtx, logger, checkForCompletedJobsMu, completedJobChannel, ranOnce, true, failureChannel) + CheckForCompletedJobs(workerCtx, pluginCtx, logger, checkForCompletedJobsMu, completedJobChannel, ranOnce, true, retryChannel) select { case <-completedJobChannel: logger.InfoContext(pluginCtx, "completed job found by CheckForCompletedJobs") @@ -759,11 +754,11 @@ func Run( // See if VM Template existing already //TODO: be able to interrupt this - noTemplateTagExistsError, returnToQueueError := ankaCLI.EnsureVMTemplateExists(workerCtx, pluginCtx, workflowJob.AnkaTemplate, workflowJob.AnkaTemplateTag) - if returnToQueueError != nil { + noTemplateTagExistsError, templateExistsError := ankaCLI.EnsureVMTemplateExists(workerCtx, pluginCtx, workflowJob.AnkaTemplate, workflowJob.AnkaTemplateTag) + if templateExistsError != nil { // DO NOT RETURN AN ERROR TO MAIN. It will cause the other job on this node to be cancelled. - logger.WarnContext(pluginCtx, "problem ensuring vm template exists on host", "err", returnToQueueError) - failureChannel <- true // return to queue so another node can pick it up + logger.WarnContext(pluginCtx, "problem ensuring vm template exists on host", "err", templateExistsError) + retryChannel <- true // return to queue so another node can pick it up return pluginCtx, nil } if noTemplateTagExistsError != nil { @@ -778,7 +773,7 @@ func Run( if pluginCtx.Err() != nil { // logger.WarnContext(pluginCtx, "context canceled during vm template check") - failureChannel <- true + retryChannel <- true return pluginCtx, fmt.Errorf("context canceled during vm template check") } @@ -802,54 +797,58 @@ func Run( if err != nil { logger.DebugContext(pluginCtx, "error creating registration token", "err", err, "response", response) metricsData.IncrementTotalFailedRunsSinceStart() - failureChannel <- true + retryChannel <- true return pluginCtx, fmt.Errorf("error creating registration token: %s", err.Error()) } if *runnerRegistration.Token == "" { logger.DebugContext(pluginCtx, "registration token is empty; something wrong with github or your service token", "response", response) - failureChannel <- true + retryChannel <- true return pluginCtx, fmt.Errorf("registration token is empty; something wrong with github or your service token") } if pluginCtx.Err() != nil { // logger.WarnContext(pluginCtx, "context canceled before ObtainAnkaVM") - failureChannel <- true + retryChannel <- true return pluginCtx, fmt.Errorf("context canceled before ObtainAnkaVM") } // Obtain Anka VM (and name) newPluginCtx, vm, err := ankaCLI.ObtainAnkaVM(workerCtx, pluginCtx, workflowJob.AnkaTemplate) - wrappedVM := map[string]interface{}{ - "type": "anka.VM", - "payload": vm, - } - wrappedVmJSON, wrappedVmErr := json.Marshal(wrappedVM) - if wrappedVmErr != nil { - // logger.ErrorContext(pluginCtx, "error marshalling vm to json", "err", wrappedVmErr) - ankaCLI.AnkaDelete(workerCtx, pluginCtx, vm) - failureChannel <- true - return newPluginCtx, fmt.Errorf("error marshalling vm to json: %s", wrappedVmErr.Error()) + var wrappedVmJSON []byte + var wrappedVmErr error + if vm != nil { + wrappedVM := map[string]interface{}{ + "type": "anka.VM", + "payload": vm, + } + wrappedVmJSON, wrappedVmErr = json.Marshal(wrappedVM) + if wrappedVmErr != nil { + // logger.ErrorContext(pluginCtx, "error marshalling vm to json", "err", wrappedVmErr) + ankaCLI.AnkaDelete(workerCtx, pluginCtx, vm) + retryChannel <- true + return newPluginCtx, fmt.Errorf("error marshalling vm to json: %s", wrappedVmErr.Error()) + } + newPluginCtx = logging.AppendCtx(newPluginCtx, slog.String("vmName", vm.Name)) } - newPluginCtx = logging.AppendCtx(newPluginCtx, slog.String("vmName", vm.Name)) // TODO: THIS ISN"T WORKING pluginCtx = newPluginCtx dbErr := databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, wrappedVmJSON).Err() if dbErr != nil { // logger.ErrorContext(pluginCtx, "error pushing vm data to database", "err", dbErr) - failureChannel <- true + retryChannel <- true return newPluginCtx, fmt.Errorf("error pushing vm data to database: %s", dbErr.Error()) } if err != nil { // this is thrown, for example, when there is no capacity on the host // we must be sure to create the DB entry so cleanup happens properly logger.ErrorContext(pluginCtx, "error obtaining anka vm", "err", err) - failureChannel <- true + retryChannel <- true return pluginCtx, nil } if pluginCtx.Err() != nil { // logger.WarnContext(pluginCtx, "context canceled after ObtainAnkaVM") - failureChannel <- true + retryChannel <- true return pluginCtx, fmt.Errorf("context canceled after ObtainAnkaVM") } @@ -884,7 +883,7 @@ func Run( if err != nil { // logger.ErrorContext(pluginCtx, "error executing anka copy", "err", err) metricsData.IncrementTotalFailedRunsSinceStart() - failureChannel <- true + retryChannel <- true return pluginCtx, fmt.Errorf("error executing anka copy: %s", err.Error()) } select { @@ -903,7 +902,7 @@ func Run( installRunnerErr = ankaCLI.AnkaRun(pluginCtx, "./install-runner.bash") if installRunnerErr != nil { // logger.ErrorContext(pluginCtx, "error executing install-runner.bash", "err", installRunnerErr) - failureChannel <- true + retryChannel <- true return pluginCtx, fmt.Errorf("error executing install-runner.bash: %s", installRunnerErr.Error()) } // Register runner @@ -924,7 +923,7 @@ func Run( ) if registerRunnerErr != nil { // logger.ErrorContext(pluginCtx, "error executing register-runner.bash", "err", registerRunnerErr) - failureChannel <- true + retryChannel <- true return pluginCtx, fmt.Errorf("error executing register-runner.bash: %s", registerRunnerErr.Error()) } defer removeSelfHostedRunner(pluginCtx, *vm, &workflowJob) @@ -943,7 +942,7 @@ func Run( startRunnerErr = ankaCLI.AnkaRun(pluginCtx, "./start-runner.bash") if startRunnerErr != nil { // logger.ErrorContext(pluginCtx, "error executing start-runner.bash", "err", startRunnerErr) - failureChannel <- true + retryChannel <- true return pluginCtx, fmt.Errorf("error executing start-runner.bash: %s", startRunnerErr.Error()) } @@ -959,10 +958,11 @@ func Run( } } // skipPrep - logger.InfoContext(pluginCtx, "watching for job completion") + logger.InfoContext(pluginCtx, "finished preparing anka VM with actions runner") // Watch for job completion logCounter := 0 + alreadyLogged := false for { select { case completedJobEvent := <-completedJobChannel: @@ -1013,22 +1013,29 @@ func Run( return pluginCtx, nil default: time.Sleep(10 * time.Second) + inProgressQueue, err := InQueue(pluginCtx, logger, workflowJob.JobID, "anklet/jobs/github/in_progress/"+ctxPlugin.Owner) + if err != nil { + logger.ErrorContext(pluginCtx, "error searching in queue", "error", err) + } + if inProgressQueue { + if logCounter%2 == 0 { + logger.InfoContext(pluginCtx, "job found registered runner and is now in progress", "job_id", workflowJob.JobID) + } + } else { + if !alreadyLogged { + logger.InfoContext(pluginCtx, "job is waiting for registered runner", "job_id", workflowJob.JobID) + alreadyLogged = true + } + } if logCounter == 6 { // after one minute, check to see if the job has started in the in_progress queue // if not, then the runner registration failed - inQueue, err := InQueue(pluginCtx, logger, workflowJob.JobID, "anklet/jobs/github/in_progress/"+ctxPlugin.Owner) - if err != nil { - logger.ErrorContext(pluginCtx, "error searching in queue", "error", err) - } else { - if !inQueue { - failureChannel <- true - return pluginCtx, fmt.Errorf("runner registration failed") - } + if !inProgressQueue { + logger.ErrorContext(pluginCtx, "waiting for runner registration timed out, will retry") + retryChannel <- true + return pluginCtx, nil } } - if logCounter%2 == 0 { - logger.InfoContext(pluginCtx, "job still in progress", "job_id", workflowJob.JobID) - } logCounter++ } // pluginCtx, currentJob, response, err := ExecuteGitHubClientFunction[github.WorkflowJob](pluginCtx, logger, func() (*github.WorkflowJob, *github.Response, error) {