Skip to content

Commit

Permalink
further improvements to the new in_progress logic
Browse files Browse the repository at this point in the history
  • Loading branch information
NorseGaud committed Dec 12, 2024
1 parent e365436 commit 680f075
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 56 deletions.
4 changes: 2 additions & 2 deletions internal/anka/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,10 @@ func (cli *Cli) AnkaCopyOutOfVM(ctx context.Context, objectToCopyOut string, hos
return err
}
if copyOutput.Status != "OK" {
return fmt.Errorf("error copying into vm: %s", copyOutput.Message)
return fmt.Errorf("error copying out of vm: %s", copyOutput.Message)
}
logger.DebugContext(ctx, "copy output", "std", copyOutput)
logger.InfoContext(ctx, "successfully copied %s out of vm", "object", objectToCopyOut, "stdout", copyOutput.Message)
logger.InfoContext(ctx, fmt.Sprintf("successfully copied %s out of vm to %s", objectToCopyOut, hostLevelDestination), "stdout", copyOutput.Message)

return nil
}
Expand Down
115 changes: 61 additions & 54 deletions plugins/handlers/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,32 +145,30 @@ func DeleteFromQueue(pluginCtx context.Context, logger *slog.Logger, jobID int64

// passing the the queue and ID to check for
func InQueue(pluginCtx context.Context, logger *slog.Logger, jobID int64, queue string) (bool, error) {
toReturn := false
databaseContainer, err := database.GetDatabaseFromContext(pluginCtx)
if err != nil {
return toReturn, err
return false, err
}
queued, err := databaseContainer.Client.LRange(pluginCtx, queue, 0, -1).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error getting list of queued jobs", "err", err)
return toReturn, err
return false, err
}
for _, queueItem := range queued {
workflowJobEvent, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](queueItem)
if err != nil {
logger.ErrorContext(pluginCtx, "error unmarshalling job", "err", err)
return toReturn, err
return false, err
}
if typeErr != nil { // not the type we want
continue
}
if *workflowJobEvent.WorkflowJob.ID == jobID {
// logger.WarnContext(pluginCtx, "WorkflowJob.ID already in queue", "WorkflowJob.ID", jobID)
toReturn = true
break
return true, nil
}
}
return toReturn, nil
return false, nil
}

func extractLabelValue(labels []string, prefix string) string {
Expand Down Expand Up @@ -236,7 +234,7 @@ func CheckForCompletedJobs(
completedJobChannel chan github.WorkflowJobEvent,
ranOnce chan struct{},
runOnce bool,
failureChannel chan bool,
retryChannel chan bool,
) {
ctxPlugin, err := config.GetPluginFromContext(pluginCtx)
if err != nil {
Expand Down Expand Up @@ -266,8 +264,7 @@ func CheckForCompletedJobs(
// do not use 'continue' in the loop or else the ranOnce won't happen
// logging.DevContext(pluginCtx, "CheckForCompletedJobs "+ctxPlugin.Name+" | runOnce "+fmt.Sprint(runOnce))
select {
case <-failureChannel:
// logger.ErrorContext(pluginCtx, "CheckForCompletedJobs"+ctxPlugin.Name+" failureChannel")
case <-retryChannel:
returnToMainQueue, ok := workerCtx.Value(config.ContextKey("returnToMainQueue")).(chan bool)
if !ok {
logger.ErrorContext(pluginCtx, "error getting returnToMainQueue from context")
Expand Down Expand Up @@ -471,9 +468,7 @@ func cleanup(
if strings.ToUpper(os.Getenv("LOG_LEVEL")) == "DEBUG" || strings.ToUpper(os.Getenv("LOG_LEVEL")) == "DEV" {
err = ankaCLI.AnkaCopyOutOfVM(pluginCtx, "/Users/anka/actions-runner/_diag", "/tmp/"+vm.Name)
if err != nil {
logger.ErrorContext(pluginCtx, "error copying out of vm", "err", err)
} else {
logger.DebugContext(pluginCtx, "successfully copied out of vm", "vm", vm.Name)
logger.ErrorContext(pluginCtx, "error copying actions runner out of vm", "err", err)
}
}
ankaCLI.AnkaDelete(workerCtx, pluginCtx, &vm)
Expand Down Expand Up @@ -611,7 +606,7 @@ func Run(
checkForCompletedJobsMu := &sync.Mutex{}
cleanupMu := &sync.Mutex{}

failureChannel := make(chan bool, 1)
retryChannel := make(chan bool, 1)

completedJobChannel := make(chan github.WorkflowJobEvent, 1)
// wait group so we can wait for the goroutine to finish before exiting the service
Expand Down Expand Up @@ -641,7 +636,7 @@ func Run(
completedJobChannel,
ranOnce,
false,
failureChannel,
retryChannel,
)
wg.Done()
}()
Expand Down Expand Up @@ -701,7 +696,7 @@ func Run(

// check if the job is already completed, so we don't orphan if there is
// a job in anklet/jobs/github/queued and also a anklet/jobs/github/completed
CheckForCompletedJobs(workerCtx, pluginCtx, logger, checkForCompletedJobsMu, completedJobChannel, ranOnce, true, failureChannel)
CheckForCompletedJobs(workerCtx, pluginCtx, logger, checkForCompletedJobsMu, completedJobChannel, ranOnce, true, retryChannel)
select {
case <-completedJobChannel:
logger.InfoContext(pluginCtx, "completed job found by CheckForCompletedJobs")
Expand Down Expand Up @@ -759,11 +754,11 @@ func Run(

// See if VM Template existing already
//TODO: be able to interrupt this
noTemplateTagExistsError, returnToQueueError := ankaCLI.EnsureVMTemplateExists(workerCtx, pluginCtx, workflowJob.AnkaTemplate, workflowJob.AnkaTemplateTag)
if returnToQueueError != nil {
noTemplateTagExistsError, templateExistsError := ankaCLI.EnsureVMTemplateExists(workerCtx, pluginCtx, workflowJob.AnkaTemplate, workflowJob.AnkaTemplateTag)
if templateExistsError != nil {
// DO NOT RETURN AN ERROR TO MAIN. It will cause the other job on this node to be cancelled.
logger.WarnContext(pluginCtx, "problem ensuring vm template exists on host", "err", returnToQueueError)
failureChannel <- true // return to queue so another node can pick it up
logger.WarnContext(pluginCtx, "problem ensuring vm template exists on host", "err", templateExistsError)
retryChannel <- true // return to queue so another node can pick it up
return pluginCtx, nil
}
if noTemplateTagExistsError != nil {
Expand All @@ -778,7 +773,7 @@ func Run(

if pluginCtx.Err() != nil {
// logger.WarnContext(pluginCtx, "context canceled during vm template check")
failureChannel <- true
retryChannel <- true
return pluginCtx, fmt.Errorf("context canceled during vm template check")
}

Expand All @@ -802,54 +797,58 @@ func Run(
if err != nil {
logger.DebugContext(pluginCtx, "error creating registration token", "err", err, "response", response)
metricsData.IncrementTotalFailedRunsSinceStart()
failureChannel <- true
retryChannel <- true
return pluginCtx, fmt.Errorf("error creating registration token: %s", err.Error())
}
if *runnerRegistration.Token == "" {
logger.DebugContext(pluginCtx, "registration token is empty; something wrong with github or your service token", "response", response)
failureChannel <- true
retryChannel <- true
return pluginCtx, fmt.Errorf("registration token is empty; something wrong with github or your service token")
}

if pluginCtx.Err() != nil {
// logger.WarnContext(pluginCtx, "context canceled before ObtainAnkaVM")
failureChannel <- true
retryChannel <- true
return pluginCtx, fmt.Errorf("context canceled before ObtainAnkaVM")
}

// Obtain Anka VM (and name)
newPluginCtx, vm, err := ankaCLI.ObtainAnkaVM(workerCtx, pluginCtx, workflowJob.AnkaTemplate)
wrappedVM := map[string]interface{}{
"type": "anka.VM",
"payload": vm,
}
wrappedVmJSON, wrappedVmErr := json.Marshal(wrappedVM)
if wrappedVmErr != nil {
// logger.ErrorContext(pluginCtx, "error marshalling vm to json", "err", wrappedVmErr)
ankaCLI.AnkaDelete(workerCtx, pluginCtx, vm)
failureChannel <- true
return newPluginCtx, fmt.Errorf("error marshalling vm to json: %s", wrappedVmErr.Error())
var wrappedVmJSON []byte
var wrappedVmErr error
if vm != nil {
wrappedVM := map[string]interface{}{
"type": "anka.VM",
"payload": vm,
}
wrappedVmJSON, wrappedVmErr = json.Marshal(wrappedVM)
if wrappedVmErr != nil {
// logger.ErrorContext(pluginCtx, "error marshalling vm to json", "err", wrappedVmErr)
ankaCLI.AnkaDelete(workerCtx, pluginCtx, vm)
retryChannel <- true
return newPluginCtx, fmt.Errorf("error marshalling vm to json: %s", wrappedVmErr.Error())
}
newPluginCtx = logging.AppendCtx(newPluginCtx, slog.String("vmName", vm.Name))
}
newPluginCtx = logging.AppendCtx(newPluginCtx, slog.String("vmName", vm.Name)) // TODO: THIS ISN"T WORKING
pluginCtx = newPluginCtx

dbErr := databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, wrappedVmJSON).Err()
if dbErr != nil {
// logger.ErrorContext(pluginCtx, "error pushing vm data to database", "err", dbErr)
failureChannel <- true
retryChannel <- true
return newPluginCtx, fmt.Errorf("error pushing vm data to database: %s", dbErr.Error())
}
if err != nil {
// this is thrown, for example, when there is no capacity on the host
// we must be sure to create the DB entry so cleanup happens properly
logger.ErrorContext(pluginCtx, "error obtaining anka vm", "err", err)
failureChannel <- true
retryChannel <- true
return pluginCtx, nil
}

if pluginCtx.Err() != nil {
// logger.WarnContext(pluginCtx, "context canceled after ObtainAnkaVM")
failureChannel <- true
retryChannel <- true
return pluginCtx, fmt.Errorf("context canceled after ObtainAnkaVM")
}

Expand Down Expand Up @@ -884,7 +883,7 @@ func Run(
if err != nil {
// logger.ErrorContext(pluginCtx, "error executing anka copy", "err", err)
metricsData.IncrementTotalFailedRunsSinceStart()
failureChannel <- true
retryChannel <- true
return pluginCtx, fmt.Errorf("error executing anka copy: %s", err.Error())
}
select {
Expand All @@ -903,7 +902,7 @@ func Run(
installRunnerErr = ankaCLI.AnkaRun(pluginCtx, "./install-runner.bash")
if installRunnerErr != nil {
// logger.ErrorContext(pluginCtx, "error executing install-runner.bash", "err", installRunnerErr)
failureChannel <- true
retryChannel <- true
return pluginCtx, fmt.Errorf("error executing install-runner.bash: %s", installRunnerErr.Error())
}
// Register runner
Expand All @@ -924,7 +923,7 @@ func Run(
)
if registerRunnerErr != nil {
// logger.ErrorContext(pluginCtx, "error executing register-runner.bash", "err", registerRunnerErr)
failureChannel <- true
retryChannel <- true
return pluginCtx, fmt.Errorf("error executing register-runner.bash: %s", registerRunnerErr.Error())
}
defer removeSelfHostedRunner(pluginCtx, *vm, &workflowJob)
Expand All @@ -943,7 +942,7 @@ func Run(
startRunnerErr = ankaCLI.AnkaRun(pluginCtx, "./start-runner.bash")
if startRunnerErr != nil {
// logger.ErrorContext(pluginCtx, "error executing start-runner.bash", "err", startRunnerErr)
failureChannel <- true
retryChannel <- true
return pluginCtx, fmt.Errorf("error executing start-runner.bash: %s", startRunnerErr.Error())
}

Expand All @@ -959,10 +958,11 @@ func Run(
}
} // skipPrep

logger.InfoContext(pluginCtx, "watching for job completion")
logger.InfoContext(pluginCtx, "finished preparing anka VM with actions runner")

// Watch for job completion
logCounter := 0
alreadyLogged := false
for {
select {
case completedJobEvent := <-completedJobChannel:
Expand Down Expand Up @@ -1013,22 +1013,29 @@ func Run(
return pluginCtx, nil
default:
time.Sleep(10 * time.Second)
inProgressQueue, err := InQueue(pluginCtx, logger, workflowJob.JobID, "anklet/jobs/github/in_progress/"+ctxPlugin.Owner)
if err != nil {
logger.ErrorContext(pluginCtx, "error searching in queue", "error", err)
}
if inProgressQueue {
if logCounter%2 == 0 {
logger.InfoContext(pluginCtx, "job found registered runner and is now in progress", "job_id", workflowJob.JobID)
}
} else {
if !alreadyLogged {
logger.InfoContext(pluginCtx, "job is waiting for registered runner", "job_id", workflowJob.JobID)
alreadyLogged = true
}
}
if logCounter == 6 {
// after one minute, check to see if the job has started in the in_progress queue
// if not, then the runner registration failed
inQueue, err := InQueue(pluginCtx, logger, workflowJob.JobID, "anklet/jobs/github/in_progress/"+ctxPlugin.Owner)
if err != nil {
logger.ErrorContext(pluginCtx, "error searching in queue", "error", err)
} else {
if !inQueue {
failureChannel <- true
return pluginCtx, fmt.Errorf("runner registration failed")
}
if !inProgressQueue {
logger.ErrorContext(pluginCtx, "waiting for runner registration timed out, will retry")
retryChannel <- true
return pluginCtx, nil
}
}
if logCounter%2 == 0 {
logger.InfoContext(pluginCtx, "job still in progress", "job_id", workflowJob.JobID)
}
logCounter++
}
// pluginCtx, currentJob, response, err := ExecuteGitHubClientFunction[github.WorkflowJob](pluginCtx, logger, func() (*github.WorkflowJob, *github.Response, error) {
Expand Down

0 comments on commit 680f075

Please sign in to comment.