From 431aac090321caeebf07bee7c8228ebaad92d3b9 Mon Sep 17 00:00:00 2001 From: Nathan Pierce Date: Fri, 26 Jul 2024 14:21:36 -0700 Subject: [PATCH] quick fixes --- plugins/controllers/github.go | 338 +++++++++++++++++++++++++--------- plugins/github/github.go | 91 ++++----- 2 files changed, 297 insertions(+), 132 deletions(-) diff --git a/plugins/controllers/github.go b/plugins/controllers/github.go index 31f54c2..9183f69 100644 --- a/plugins/controllers/github.go +++ b/plugins/controllers/github.go @@ -7,6 +7,7 @@ import ( "log/slog" "net/http" "strings" + "sync" "time" "github.com/bradleyfalzon/ghinstallation/v2" @@ -46,29 +47,29 @@ func exists_in_array_exact(array_to_search_in []string, desired []string) bool { return true } -// make sure there are no duplicates in a specific queue -func EnsureNoDuplicates(serviceCtx context.Context, logger *slog.Logger, jobID int64, queue string) error { +// passing the the queue and ID to check for +func InQueue(serviceCtx context.Context, logger *slog.Logger, jobID int64, queue string) (bool, error) { databaseContainer, err := database.GetDatabaseFromContext(serviceCtx) if err != nil { logger.ErrorContext(serviceCtx, "error getting database client from context", "error", err) - return err + return false, err } queued, err := databaseContainer.Client.LRange(serviceCtx, queue, 0, -1).Result() if err != nil { logger.ErrorContext(serviceCtx, "error getting list of queued jobs", "err", err) - return err + return false, err } for _, queueItem := range queued { workflowJobEvent, err := database.UnwrapPayload[github.WorkflowJobEvent](queueItem) if err != nil { logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) - return err + return false, err } if *workflowJobEvent.WorkflowJob.ID == jobID { - return fmt.Errorf("job already in queue") + return true, fmt.Errorf("job already in queue") } } - return nil + return false, nil } // https://github.com/gofri/go-github-ratelimit has yet to support primary rate limits, so we have to do it ourselves. @@ -156,10 +157,12 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co if *workflowJob.Action == "queued" { if exists_in_array_exact(workflowJob.WorkflowJob.Labels, []string{"self-hosted", "anka"}) { // make sure it doesn't already exist - err := EnsureNoDuplicates(serviceCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/queued") + inQueue, err := InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/queued") if err != nil { - logger.DebugContext(serviceCtx, err.Error()) - } else { + logger.ErrorContext(serviceCtx, "error searching in queue", "error", err) + return + } + if !inQueue { // if it doesn't exist already // push it to the queue wrappedJobPayload := map[string]interface{}{ "type": "WorkflowJobPayload", @@ -180,28 +183,101 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co } } else if *workflowJob.Action == "completed" { if exists_in_array_exact(workflowJob.WorkflowJob.Labels, []string{"self-hosted", "anka"}) { - // make sure it doesn't already exist - err := EnsureNoDuplicates(serviceCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/completed") - if err != nil { - logger.DebugContext(serviceCtx, err.Error()) - } else { - // push it to the queue - wrappedJobPayload := map[string]interface{}{ - "type": "WorkflowJobPayload", - "payload": workflowJob, + + queues := []string{"anklet/jobs/github/queued"} + results := make(chan bool, len(queues)) + var wg sync.WaitGroup + + for _, queue := range queues { + wg.Add(1) + go func(queue string) { + defer wg.Done() + inQueue, err := InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, queue) + if err != nil { + logger.ErrorContext(serviceCtx, "error searching in queue", "queue", queue, "error", err) + results <- false + return + } + results <- inQueue + }(queue) + } + + fmt.Println("here1") + go func() { + wg.Wait() + close(results) + }() + + fmt.Println("here2") + + inAQueue := false + for result := range results { + if result { + inAQueue = true + break } - wrappedPayloadJSON, err := json.Marshal(wrappedJobPayload) + } + + fmt.Println("here3") + + if inAQueue { // only add completed if it's in a queue + inCompletedQueue, err := InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/completed") if err != nil { - logger.ErrorContext(serviceCtx, "error converting job payload to JSON", "error", err) + logger.ErrorContext(serviceCtx, "error searching in queue", "error", err) return } - push := databaseContainer.Client.RPush(serviceCtx, "anklet/jobs/github/completed", wrappedPayloadJSON) - if push.Err() != nil { - logger.ErrorContext(serviceCtx, "error pushing job to queue", "error", push.Err()) - return + if !inCompletedQueue { + // push it to the queue + wrappedJobPayload := map[string]interface{}{ + "type": "WorkflowJobPayload", + "payload": workflowJob, + } + wrappedPayloadJSON, err := json.Marshal(wrappedJobPayload) + if err != nil { + logger.ErrorContext(serviceCtx, "error converting job payload to JSON", "error", err) + return + } + push := databaseContainer.Client.RPush(serviceCtx, "anklet/jobs/github/completed", wrappedPayloadJSON) + if push.Err() != nil { + logger.ErrorContext(serviceCtx, "error pushing job to queue", "error", push.Err()) + return + } + logger.InfoContext(serviceCtx, "job pushed to completed queue", "json", string(wrappedPayloadJSON)) } - logger.InfoContext(serviceCtx, "job pushed to completed queue", "json", string(wrappedPayloadJSON)) } + + // // make sure we don't orphan completed if there is nothing in queued or other lists for it + // inQueueQueue, err := InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/queue") + // if err != nil { + // logger.ErrorContext(serviceCtx, "error searching in queue", "error", err) + // return + // } + // if !inQueueQueue { + // // make sure it doesn't already exist + // inCompletedQueue, err := InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/completed") + // if err != nil { + // logger.ErrorContext(serviceCtx, "error searching in queue", "error", err) + // return + // } + // if !inCompletedQueue { + // // push it to the queue + // wrappedJobPayload := map[string]interface{}{ + // "type": "WorkflowJobPayload", + // "payload": workflowJob, + // } + // wrappedPayloadJSON, err := json.Marshal(wrappedJobPayload) + // if err != nil { + // logger.ErrorContext(serviceCtx, "error converting job payload to JSON", "error", err) + // return + // } + // push := databaseContainer.Client.RPush(serviceCtx, "anklet/jobs/github/completed", wrappedPayloadJSON) + // if push.Err() != nil { + // logger.ErrorContext(serviceCtx, "error pushing job to queue", "error", push.Err()) + // return + // } + // logger.InfoContext(serviceCtx, "job pushed to completed queue", "json", string(wrappedPayloadJSON)) + // } + // } } } } @@ -220,89 +296,169 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co // redeliver all hooks that failed while the service was down githubWrapperClient := internalGithub.NewGitHubClientWrapper(githubClient) serviceCtx = context.WithValue(serviceCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient) - // check if the items int he queued list are even still running, otherwise remove them - queuedJobs, err := databaseContainer.Client.LRange(serviceCtx, "anklet/jobs/github/queued", 0, -1).Result() - if err != nil { - logger.ErrorContext(serviceCtx, "error getting list of queued jobs", "err", err) - return - } - var firstQueuedJobStartedAtDate time.Time - if len(queuedJobs) > 0 { - logger.InfoContext(serviceCtx, "searching for redeliveries") - // get the started date of the eldest item in the list of queued jobs - firstQueuedJobEvent, err := database.UnwrapPayload[github.WorkflowJobEvent](queuedJobs[0]) - if err != nil { - logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) - return - } - firstQueuedJobStartedAtDate = firstQueuedJobEvent.WorkflowJob.StartedAt.Time - var allHooks []github.HookDelivery - opts := &github.ListCursorOptions{PerPage: 10} - for { - serviceCtx, hooks, response, err := ExecuteGitHubClientFunction(serviceCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { - hooks, response, err := githubClient.Repositories.ListHookDeliveries(serviceCtx, service.Owner, service.Repo, service.HookID, opts) - if err != nil { - return nil, nil, err - } - return &hooks, response, nil - }) + // Redeliver queued jobs + limitForHooks := time.Now().Add(-time.Hour * 24) // the time we want the stop search for redeliveries + var allHooks []map[string]interface{} + opts := &github.ListCursorOptions{PerPage: 10} + doneWithHooks := false + for { + serviceCtx, hookDeliveries, response, err := ExecuteGitHubClientFunction(serviceCtx, logger, func() (*[]*github.HookDelivery, *github.Response, error) { + hookDeliveries, response, err := githubClient.Repositories.ListHookDeliveries(serviceCtx, service.Owner, service.Repo, service.HookID, opts) if err != nil { - logger.ErrorContext(serviceCtx, "error listing hooks", "err", err) - return - } - for _, hook := range *hooks { - if hook.StatusCode != nil && *hook.StatusCode == 502 { - allHooks = append(allHooks, *hook) - } + return nil, nil, err } - if response.Cursor == "" || (len(allHooks) > 0 && firstQueuedJobStartedAtDate.After(allHooks[len(allHooks)-1].DeliveredAt.Time)) { - break - } - opts.Cursor = response.Cursor + return &hookDeliveries, response, nil + }) + if err != nil { + logger.ErrorContext(serviceCtx, "error listing hooks", "err", err) + return } - for _, queuedJob := range queuedJobs { - wrappedPayload, err := database.UnwrapPayload[github.WorkflowJobEvent](queuedJob) - if err != nil { - logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) - return - } - for _, hook := range allHooks { - serviceCtx, hookdelivery, _, err := ExecuteGitHubClientFunction(serviceCtx, logger, func() (*github.HookDelivery, *github.Response, error) { - hookdelivery, response, err := githubClient.Repositories.GetHookDelivery(serviceCtx, service.Owner, service.Repo, service.HookID, *hook.ID) + for _, hookDelivery := range *hookDeliveries { + if hookDelivery.StatusCode != nil && *hookDelivery.StatusCode == 502 && !*hookDelivery.Redelivery { + serviceCtx, gottenHookDelivery, _, err := ExecuteGitHubClientFunction(serviceCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + gottenHookDelivery, response, err := githubClient.Repositories.GetHookDelivery(serviceCtx, service.Owner, service.Repo, service.HookID, *hookDelivery.ID) if err != nil { return nil, nil, err } - return hookdelivery, response, nil + return gottenHookDelivery, response, nil }) if err != nil { logger.ErrorContext(serviceCtx, "error listing hooks", "err", err) return } - var hookResponse github.WorkflowJobEvent - err = json.Unmarshal(*hookdelivery.Request.RawPayload, &hookResponse) + var workflowJobEvent github.WorkflowJobEvent + err = json.Unmarshal(*gottenHookDelivery.Request.RawPayload, &workflowJobEvent) if err != nil { logger.ErrorContext(serviceCtx, "error unmarshalling hook request raw payload to HookResponse", "err", err) return } - if *hookResponse.WorkflowJob.ID == *wrappedPayload.WorkflowJob.ID && !*hookdelivery.Redelivery { - logger.DebugContext(serviceCtx, "redelivery being considered", "action", *hookResponse.Action, "workflowID", *hookResponse.WorkflowJob.ID, "redeliveryGUID", *hookdelivery.GUID) - err := EnsureNoDuplicates(serviceCtx, logger, *hookResponse.WorkflowJob.ID, "anklet/jobs/github/"+*hookResponse.Action) - if err != nil { - logger.DebugContext(serviceCtx, err.Error()) - } else { - serviceCtx, redelivery, _, _ := ExecuteGitHubClientFunction(serviceCtx, logger, func() (*github.HookDelivery, *github.Response, error) { - redelivery, response, err := githubClient.Repositories.RedeliverHookDelivery(serviceCtx, service.Owner, service.Repo, service.HookID, *hookdelivery.ID) - if err != nil { - return nil, nil, err - } - return redelivery, response, nil - }) - // err doesn't matter here and it will always throw "job scheduled on GitHub side; try again later" - logger.InfoContext(serviceCtx, "hook redelivered", "hook", redelivery) - } + if len(allHooks) > 0 && limitForHooks.After(gottenHookDelivery.DeliveredAt.Time) { + doneWithHooks = true break } + allHooks = append(allHooks, map[string]interface{}{ + "hookDelivery": gottenHookDelivery, + "workflowJobEvent": workflowJobEvent, + }) + } + } + if response.Cursor == "" || doneWithHooks { + break + } + opts.Cursor = response.Cursor + } + + // get all keys from database for the main queue and service queues as well as completed + queuedKeys, err := databaseContainer.Client.Keys(serviceCtx, "anklet/jobs/github/queued*").Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of keys", "err", err) + return + } + var allQueuedJobs map[string][]string + for _, key := range queuedKeys { + queuedJobs, err := databaseContainer.Client.LRange(serviceCtx, key, 0, -1).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of queued jobs for key: "+key, "err", err) + return + } + allQueuedJobs[key] = queuedJobs + } + completedKeys, err := databaseContainer.Client.Keys(serviceCtx, "anklet/jobs/github/completed*").Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of keys", "err", err) + return + } + var allCompletedJobs map[string][]string + for _, key := range completedKeys { + completedJobs, err := databaseContainer.Client.LRange(serviceCtx, key, 0, -1).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of queued jobs for key: "+key, "err", err) + return + } + allCompletedJobs[key] = completedJobs + } + + if len(allHooks) > 0 { + for _, hookWrapper := range allHooks { + hookDelivery := hookWrapper["hookDelivery"].(github.HookDelivery) + workflowJobEvent := hookWrapper["workflowJobEvent"].(github.WorkflowJobEvent) + + inQueued := false + // inQueuedListKey := "" + // inQueuedListIndex := 0 + inCompleted := false + inCompletedListKey := "" + inCompletedIndex := 0 + + // Queued deliveries + if *hookDelivery.Action == "queued" { + for _, queuedJobs := range allQueuedJobs { + for _, queuedJob := range queuedJobs { + wrappedPayload, err := database.UnwrapPayload[github.WorkflowJobEvent](queuedJob) + if err != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) + return + } + if *wrappedPayload.WorkflowJob.ID == *workflowJobEvent.WorkflowJob.ID { + inQueued = true + // inQueuedListKey = key + // inQueuedListIndex = index + break + } + } + } + + } + + // Completed deliveries + if *hookDelivery.Action == "completed" { + for key, completedJobs := range allCompletedJobs { + for index, completedJob := range completedJobs { + wrappedPayload, err := database.UnwrapPayload[github.WorkflowJobEvent](completedJob) + if err != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) + return + } + if *wrappedPayload.WorkflowJob.ID == *workflowJobEvent.WorkflowJob.ID { + inCompleted = true + inCompletedListKey = key + inCompletedIndex = index + break + } + } + } + } + + // if in queued, but also has completed; continue and do nothing + if inQueued && inCompleted { + continue } + + // if in completed, but has no queued; remove from completed db + if inCompleted && !inQueued { + _, err = databaseContainer.Client.LRem(serviceCtx, inCompletedListKey, 1, allCompletedJobs[inCompletedListKey][inCompletedIndex]).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error removing completedJob from anklet/jobs/github/completed", "err", err, "completedJob", allCompletedJobs[inCompletedListKey][inCompletedIndex]) + return + } + continue + } + + // all other cases (like when it's queued); continue + if inQueued || inCompleted { + continue + } + + // Redeliver the hook + serviceCtx, redelivery, _, _ := ExecuteGitHubClientFunction(serviceCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + redelivery, response, err := githubClient.Repositories.RedeliverHookDelivery(serviceCtx, service.Owner, service.Repo, service.HookID, *hookDelivery.ID) + if err != nil { + return nil, nil, err + } + return redelivery, response, nil + }) + // err doesn't matter here and it will always throw "job scheduled on GitHub side; try again later" + logger.InfoContext(serviceCtx, "hook redelivered", "hook", redelivery) } } // notify the main thread that the service has started diff --git a/plugins/github/github.go b/plugins/github/github.go index 1951f88..280b05e 100644 --- a/plugins/github/github.go +++ b/plugins/github/github.go @@ -179,7 +179,6 @@ func CheckForCompletedJobs( logger *slog.Logger, mu *sync.Mutex, completedJobChannel chan bool, - serviceDatabaseKeyName string, ranOnce chan struct{}, runOnce bool, ) { @@ -215,20 +214,23 @@ func CheckForCompletedJobs( return default: } + service := config.GetServiceFromContext(serviceCtx) // get the job ID - existingJobString, err := databaseContainer.Client.LIndex(serviceCtx, serviceDatabaseKeyName, -1).Result() - if err != redis.Nil { // handle no job for service + existingJobString, err := databaseContainer.Client.LIndex(serviceCtx, "anklet/jobs/github/queued/"+service.Name, -1).Result() + if err == redis.Nil { // handle no job for service; needed so the github plugin resets and looks for new jobs again + return + } else { if err == nil { existingJob, err := database.UnwrapPayload[github.WorkflowJobEvent](existingJobString) if err != nil { logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) return } - // check if there is already a completed job queued for the server + // check if there is already a completed job queued for the service // // this can happen if the service crashes or is stopped before it finalizes cleanup - count, err := databaseContainer.Client.LLen(serviceCtx, serviceDatabaseKeyName+"/completed").Result() + count, err := databaseContainer.Client.LLen(serviceCtx, "anklet/jobs/github/completed/"+service.Name).Result() if err != nil { - logger.ErrorContext(serviceCtx, "error getting count of objects in "+serviceDatabaseKeyName+"/completed", "err", err) + logger.ErrorContext(serviceCtx, "error getting count of objects in anklet/jobs/github/completed/"+service.Name, "err", err) return } if count > 0 { @@ -236,9 +238,9 @@ func CheckForCompletedJobs( case completedJobChannel <- true: default: // remove the completed job we found - _, err = databaseContainer.Client.Del(serviceCtx, serviceDatabaseKeyName+"/completed").Result() + _, err = databaseContainer.Client.Del(serviceCtx, "anklet/jobs/github/completed/"+service.Name).Result() if err != nil { - logger.ErrorContext(serviceCtx, "error removing completedJob from "+serviceDatabaseKeyName+"/completed", "err", err) + logger.ErrorContext(serviceCtx, "error removing completedJob from anklet/jobs/github/completed/"+service.Name, "err", err) return } } @@ -259,17 +261,17 @@ func CheckForCompletedJobs( // remove the completed job we found _, err = databaseContainer.Client.LRem(serviceCtx, "anklet/jobs/github/completed", 1, completedJob).Result() if err != nil { - logger.ErrorContext(serviceCtx, "error removing completedJob from anklet/jobs/github/completed", "err", err) + logger.ErrorContext(serviceCtx, "error removing completedJob from anklet/jobs/github/completed", "err", err, "completedJob", completedJobWebhook) return } // delete the existing service task - // _, err = databaseContainer.Client.Del(serviceCtx, serviceDatabaseKeyName).Result() + // _, err = databaseContainer.Client.Del(serviceCtx, serviceQueueDatabaseKeyName).Result() // if err != nil { - // logger.ErrorContext(serviceCtx, "error deleting all objects from "+serviceDatabaseKeyName, "err", err) + // logger.ErrorContext(serviceCtx, "error deleting all objects from "+serviceQueueDatabaseKeyName, "err", err) // return // } // add a task for the completed job so we know the clean up - _, err = databaseContainer.Client.LPush(serviceCtx, serviceDatabaseKeyName+"/completed", completedJob).Result() + _, err = databaseContainer.Client.LPush(serviceCtx, "anklet/jobs/github/completed/"+service.Name, completedJob).Result() if err != nil { logger.ErrorContext(serviceCtx, "error inserting completed job into list", "err", err) return @@ -299,10 +301,11 @@ func CheckForCompletedJobs( // cleanup will pop off the last item from the list and, based on its type, perform the appropriate cleanup action // this assumes the plugin code created a list item to represent the thing to clean up -func cleanup(workerCtx context.Context, serviceCtx context.Context, logger *slog.Logger, serviceDatabaseKeyName string) { +func cleanup(workerCtx context.Context, serviceCtx context.Context, logger *slog.Logger, completedJobChannel chan bool) { logger.InfoContext(serviceCtx, "cleaning up") // create an idependent copy of the serviceCtx so we can do cleanup even if serviceCtx got "context canceled" cleanupContext := context.Background() + service := config.GetServiceFromContext(serviceCtx) serviceDatabase, err := dbFunctions.GetDatabaseFromContext(serviceCtx) if err != nil { logger.ErrorContext(serviceCtx, "error getting database from context", "err", err) @@ -311,6 +314,7 @@ func cleanup(workerCtx context.Context, serviceCtx context.Context, logger *slog cleanupContext = context.WithValue(cleanupContext, config.ContextKey("database"), serviceDatabase) cleanupContext, cancel := context.WithCancel(cleanupContext) defer cancel() + cleanCompleted := false databaseContainer, err := dbFunctions.GetDatabaseFromContext(cleanupContext) if err != nil { logger.ErrorContext(serviceCtx, "error getting database from context", "err", err) @@ -318,20 +322,20 @@ func cleanup(workerCtx context.Context, serviceCtx context.Context, logger *slog } for { var jobJSON string - exists, err := databaseContainer.Client.Exists(cleanupContext, serviceDatabaseKeyName+"/cleaning").Result() + exists, err := databaseContainer.Client.Exists(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning").Result() if err != nil { logger.ErrorContext(cleanupContext, "error checking if cleaning up already in progress", "err", err) } if exists == 1 { logger.InfoContext(serviceCtx, "cleaning up already in progress; getting job") - jobJSON, err = databaseContainer.Client.LIndex(cleanupContext, serviceDatabaseKeyName+"/cleaning", 0).Result() + jobJSON, err = databaseContainer.Client.LIndex(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning", 0).Result() if err != nil { logger.ErrorContext(serviceCtx, "error getting job from the list", "err", err) return } } else { // pop the job from the list and push it to the cleaning list - jobJSON, err = databaseContainer.Client.RPopLPush(cleanupContext, serviceDatabaseKeyName, serviceDatabaseKeyName+"/cleaning").Result() + jobJSON, err = databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+service.Name, "anklet/jobs/github/queued/"+service.Name+"/cleaning").Result() if err == redis.Nil { break } else if err != nil { @@ -371,33 +375,40 @@ func cleanup(workerCtx context.Context, serviceCtx context.Context, logger *slog } ankaCLI := anka.GetAnkaCLIFromContext(serviceCtx) ankaCLI.AnkaDelete(workerCtx, serviceCtx, &vm) - case "WorkflowJobPayload": - var hook github.WorkflowJobEvent - err = json.Unmarshal(payloadBytes, &hook) + case "WorkflowJobPayload": // MUST COME LAST + var workflowJobEvent github.WorkflowJobEvent + err = json.Unmarshal(payloadBytes, &workflowJobEvent) if err != nil { logger.ErrorContext(serviceCtx, "error unmarshalling payload to webhook.WorkflowJobPayload", "err", err) return } - // return it to the queue + // return it to the queue if the job isn't completed yet // if we don't, we could suffer from a situation where a completed job comes in and is orphaned - _, err := databaseContainer.Client.RPopLPush(cleanupContext, serviceDatabaseKeyName+"/cleaning", "anklet/jobs/github/queued").Result() - if err != nil { - logger.ErrorContext(serviceCtx, "error pushing job to queued", "err", err) - return + select { + case <-completedJobChannel: + default: + _, err := databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning", "anklet/jobs/github/queued/").Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error pushing job back to queued", "err", err) + return + } } - logger.InfoContext(serviceCtx, "finalized cleaning of workflow job", "workflowJobID", hook.WorkflowJob.ID) - databaseContainer.Client.Del(cleanupContext, serviceDatabaseKeyName+"/completed") + logger.InfoContext(serviceCtx, "finalized cleaning of workflow job", "workflowJobID", workflowJobEvent.WorkflowJob.ID) + databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning") + cleanCompleted = true default: logger.ErrorContext(serviceCtx, "unknown job type", "job", typedJob) return } - databaseContainer.Client.Del(cleanupContext, serviceDatabaseKeyName+"/cleaning") + if cleanCompleted { + databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/completed/"+service.Name) + } } - databaseContainer.Client.Del(cleanupContext, serviceDatabaseKeyName) + databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+service.Name) } func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel context.CancelFunc, logger *slog.Logger) { - return + logger.InfoContext(serviceCtx, "github plugin checking for jobs") service := config.GetServiceFromContext(serviceCtx) @@ -439,7 +450,6 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co serviceCtx = context.WithValue(serviceCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient) serviceCtx = logging.AppendCtx(serviceCtx, slog.String("repo", service.Repo)) serviceCtx = logging.AppendCtx(serviceCtx, slog.String("owner", service.Owner)) - serviceDatabaseKeyName := "anklet/jobs/github/service/" + service.Name repositoryURL := fmt.Sprintf("https://github.com/%s/%s", service.Owner, service.Repo) mu := &sync.Mutex{} @@ -459,23 +469,22 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co fmt.Println("DEFER AT END") fmt.Println("wg.Wait()") wg.Wait() - fmt.Println("wg.Wait() done") + fmt.Println("wg.Wait() done") // cleanup after we exit the check for completed so we can clean up the environment if a completed job was received + cleanup(workerCtx, serviceCtx, logger, completedJobChannel) close(completedJobChannel) - // cleanup after we exit the check for completed so we can clean up the environment if a completed job was received - cleanup(workerCtx, serviceCtx, logger, serviceDatabaseKeyName) }() // check constantly for a cancelled webhook to be received for our job ranOnce := make(chan struct{}) go func() { fmt.Println("CheckForCompletedJobs goroutine") - CheckForCompletedJobs(workerCtx, serviceCtx, logger, mu, completedJobChannel, serviceDatabaseKeyName, ranOnce, false) + CheckForCompletedJobs(workerCtx, serviceCtx, logger, mu, completedJobChannel, ranOnce, false) wg.Done() fmt.Println("CheckForCompletedJobs goroutine done") }() <-ranOnce // wait for the goroutine to run at least once // finalize cleanup if the service crashed mid-cleanup - cleanup(workerCtx, serviceCtx, logger, serviceDatabaseKeyName) + cleanup(workerCtx, serviceCtx, logger, completedJobChannel) select { case <-completedJobChannel: @@ -489,12 +498,12 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co var wrappedPayloadJSON string // allow picking up where we left off - wrappedPayloadJSON, err = databaseContainer.Client.LIndex(serviceCtx, serviceDatabaseKeyName, -1).Result() + wrappedPayloadJSON, err = databaseContainer.Client.LIndex(serviceCtx, "anklet/jobs/github/queued/"+service.Name, -1).Result() if err != nil && err != redis.Nil { - logger.ErrorContext(serviceCtx, "error getting last object from "+serviceDatabaseKeyName, "err", err) + logger.ErrorContext(serviceCtx, "error getting last object from anklet/jobs/github/queued/"+service.Name, "err", err) return } - if wrappedPayloadJSON == "" { + if wrappedPayloadJSON == "" { // if we haven't done anything before, get something from the main queue eldestQueuedJob, err := databaseContainer.Client.LPop(serviceCtx, "anklet/jobs/github/queued").Result() if err == redis.Nil { logger.DebugContext(serviceCtx, "no queued jobs found") @@ -504,7 +513,7 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co logger.ErrorContext(serviceCtx, "error getting queued jobs", "err", err) return } - databaseContainer.Client.RPush(serviceCtx, serviceDatabaseKeyName, eldestQueuedJob) + databaseContainer.Client.RPush(serviceCtx, "anklet/jobs/github/queued/"+service.Name, eldestQueuedJob) wrappedPayloadJSON = eldestQueuedJob } @@ -519,7 +528,7 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co // check if the job is already completed, so we don't orphan if there is // a job in anklet/jobs/github/queued and also a anklet/jobs/github/completed - CheckForCompletedJobs(workerCtx, serviceCtx, logger, mu, completedJobChannel, serviceDatabaseKeyName, ranOnce, true) + CheckForCompletedJobs(workerCtx, serviceCtx, logger, mu, completedJobChannel, ranOnce, true) select { case <-completedJobChannel: logger.InfoContext(serviceCtx, "completed job found") @@ -618,7 +627,7 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co logger.ErrorContext(serviceCtx, "error marshalling vm to json", "err", err) return } - err = databaseContainer.Client.RPush(serviceCtx, serviceDatabaseKeyName, wrappedVmJSON).Err() + err = databaseContainer.Client.RPush(serviceCtx, "anklet/jobs/github/queued/"+service.Name, wrappedVmJSON).Err() if err != nil { logger.ErrorContext(serviceCtx, "error pushing vm data to database", "err", err) return