Skip to content

Commit

Permalink
supporting multiple receivers/orgs using the same redis DB
Browse files Browse the repository at this point in the history
  • Loading branch information
NorseGaud committed Oct 9, 2024
1 parent f16d63e commit 36aab6f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 35 deletions.
53 changes: 26 additions & 27 deletions plugins/handlers/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,17 @@ func CheckForCompletedJobs(
default:
}
// get the job ID
existingJobString, err := databaseContainer.Client.LIndex(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Name, 0).Result()
existingJobString, err := databaseContainer.Client.LIndex(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, 0).Result()
if runOnce && err == redis.Nil { // handle no job for service; needed so the github plugin resets and looks for new jobs again
logger.ErrorContext(pluginCtx, "CheckForCompletedJobs"+ctxPlugin.Name+" err == redis.Nil")
return
} else {
if err == nil {
// check if there is already a completed job queued for the service
// // this can happen if the service crashes or is stopped before it finalizes cleanup
count, err := databaseContainer.Client.LLen(pluginCtx, "anklet/jobs/github/completed/"+ctxPlugin.Name).Result()
count, err := databaseContainer.Client.LLen(pluginCtx, "anklet/jobs/github/completed/"+ctxPlugin.Owner+"/"+ctxPlugin.Name).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error getting count of objects in anklet/jobs/github/completed/"+ctxPlugin.Name, "err", err)
logger.ErrorContext(pluginCtx, "error getting count of objects in anklet/jobs/github/completed/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, "err", err)
return
}
existingJobEvent, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](existingJobString)
Expand All @@ -263,14 +263,14 @@ func CheckForCompletedJobs(
case completedJobChannel <- existingJobEvent:
default:
// remove the completed job we found
_, err = databaseContainer.Client.Del(pluginCtx, "anklet/jobs/github/completed/"+ctxPlugin.Name).Result()
_, err = databaseContainer.Client.Del(pluginCtx, "anklet/jobs/github/completed/"+ctxPlugin.Owner+"/"+ctxPlugin.Name).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error removing completedJob from anklet/jobs/github/completed/"+ctxPlugin.Name, "err", err)
logger.ErrorContext(pluginCtx, "error removing completedJob from anklet/jobs/github/completed/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, "err", err)
return
}
}
} else {
completedJobs, err := databaseContainer.Client.LRange(pluginCtx, "anklet/jobs/github/completed", 0, -1).Result()
completedJobs, err := databaseContainer.Client.LRange(pluginCtx, "anklet/jobs/github/completed/"+ctxPlugin.Owner, 0, -1).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error getting list of completed jobs", "err", err)
return
Expand All @@ -287,9 +287,9 @@ func CheckForCompletedJobs(
}
if *completedJobWebhookEvent.WorkflowJob.ID == *existingJobEvent.WorkflowJob.ID {
// remove the completed job we found
_, err = databaseContainer.Client.LRem(pluginCtx, "anklet/jobs/github/completed", 1, completedJob).Result()
_, err = databaseContainer.Client.LRem(pluginCtx, "anklet/jobs/github/completed/"+ctxPlugin.Owner, 1, completedJob).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error removing completedJob from anklet/jobs/github/completed", "err", err, "completedJob", completedJobWebhookEvent)
logger.ErrorContext(pluginCtx, "error removing completedJob from anklet/jobs/github/completed/"+ctxPlugin.Owner, "err", err, "completedJob", completedJobWebhookEvent)
return
}
// delete the existing service task
Expand All @@ -299,7 +299,7 @@ func CheckForCompletedJobs(
// return
// }
// add a task for the completed job so we know the clean up
_, err = databaseContainer.Client.LPush(pluginCtx, "anklet/jobs/github/completed/"+ctxPlugin.Name, completedJob).Result()
_, err = databaseContainer.Client.LPush(pluginCtx, "anklet/jobs/github/completed/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, completedJob).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error inserting completed job into list", "err", err)
return
Expand Down Expand Up @@ -366,20 +366,20 @@ func cleanup(
}
for {
var jobJSON string
exists, err := databaseContainer.Client.Exists(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Name+"/cleaning").Result()
exists, err := databaseContainer.Client.Exists(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning").Result()
if err != nil {
logger.ErrorContext(cleanupContext, "error checking if cleaning up already in progress", "err", err)
}
if exists == 1 {
logger.InfoContext(pluginCtx, "cleaning up already in progress; getting job")
jobJSON, err = databaseContainer.Client.LIndex(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Name+"/cleaning", 0).Result()
jobJSON, err = databaseContainer.Client.LIndex(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning", 0).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error getting job from the list", "err", err)
return
}
} else {
// pop the job from the list and push it to the cleaning list
jobJSON, err = databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Name, "anklet/jobs/github/queued/"+ctxPlugin.Name+"/cleaning").Result()
jobJSON, err = databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning").Result()
if err == redis.Nil {
return // nothing to clean up
} else if err != nil {
Expand Down Expand Up @@ -418,7 +418,7 @@ func cleanup(
}
ankaCLI := anka.GetAnkaCLIFromContext(pluginCtx)
ankaCLI.AnkaDelete(workerCtx, pluginCtx, &vm)
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Name+"/cleaning")
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning")
continue // required to keep processing tasks in the db list
case "WorkflowJobPayload": // MUST COME LAST
var workflowJobEvent github.WorkflowJobEvent
Expand All @@ -431,27 +431,27 @@ func cleanup(
// if we don't, we could suffer from a situation where a completed job comes in and is orphaned
select {
case <-completedJobChannel:
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/completed/"+ctxPlugin.Name)
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Name+"/cleaning")
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/completed/"+ctxPlugin.Owner+"/"+ctxPlugin.Name)
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning")
break // break loop and delete /queued/servicename
default:
select {
case <-returnToMainQueue:
logger.WarnContext(pluginCtx, "pushing job back to anklet/jobs/github/queued")
_, err := databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Name+"/cleaning", "anklet/jobs/github/queued").Result()
logger.WarnContext(pluginCtx, "pushing job back to anklet/jobs/github/queued/"+ctxPlugin.Owner)
_, err := databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning", "anklet/jobs/github/queued/"+ctxPlugin.Owner).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error pushing job back to queued", "err", err)
return
}
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Name+"/cleaning")
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning")
default:
logger.WarnContext(pluginCtx, "pushing job back to anklet/jobs/github/queued/"+ctxPlugin.Name)
_, err := databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Name+"/cleaning", "anklet/jobs/github/queued/"+ctxPlugin.Name).Result()
logger.WarnContext(pluginCtx, "pushing job back to anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name)
_, err := databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning", "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error pushing job back to queued", "err", err)
return
}
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Name+"/cleaning")
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning")
}
}
default:
Expand All @@ -460,7 +460,6 @@ func cleanup(
}
return // don't delete the queued/servicename
}
// databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+service.Name)
}

func Run(
Expand Down Expand Up @@ -586,13 +585,13 @@ func Run(

var wrappedPayloadJSON string
// allow picking up where we left off
wrappedPayloadJSON, err = databaseContainer.Client.LIndex(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Name, -1).Result()
wrappedPayloadJSON, err = databaseContainer.Client.LIndex(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, -1).Result()
if err != nil && err != redis.Nil {
logger.ErrorContext(pluginCtx, "error getting last object from anklet/jobs/github/queued/"+ctxPlugin.Name, "err", err)
logger.ErrorContext(pluginCtx, "error getting last object from anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, "err", err)
return
}
if wrappedPayloadJSON == "" { // if we haven't done anything before, get something from the main queue
eldestQueuedJob, err := databaseContainer.Client.LPop(pluginCtx, "anklet/jobs/github/queued").Result()
eldestQueuedJob, err := databaseContainer.Client.LPop(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner).Result()
if err == redis.Nil {
logger.DebugContext(pluginCtx, "no queued jobs found")
completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine
Expand All @@ -603,7 +602,7 @@ func Run(
logger.ErrorContext(pluginCtx, "error getting queued jobs", "err", err)
return
}
databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Name, eldestQueuedJob)
databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, eldestQueuedJob)
wrappedPayloadJSON = eldestQueuedJob
}

Expand Down Expand Up @@ -751,7 +750,7 @@ func Run(
failureChannel <- true
return
}
dbErr := databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Name, wrappedVmJSON).Err()
dbErr := databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, wrappedVmJSON).Err()
if dbErr != nil {
logger.ErrorContext(pluginCtx, "error pushing vm data to database", "err", dbErr)
failureChannel <- true
Expand Down
16 changes: 8 additions & 8 deletions plugins/receivers/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func Run(
if *workflowJob.Action == "queued" {
if exists_in_array_exact(workflowJob.WorkflowJob.Labels, []string{"self-hosted", "anka"}) {
// make sure it doesn't already exist
inQueue, err := InQueue(pluginCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/queued")
inQueue, err := InQueue(pluginCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/queued/"+ctxPlugin.Owner)
if err != nil {
logger.ErrorContext(pluginCtx, "error searching in queue", "error", err)
return
Expand All @@ -213,7 +213,7 @@ func Run(
logger.ErrorContext(pluginCtx, "error converting job payload to JSON", "error", err)
return
}
push := databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued", wrappedPayloadJSON)
push := databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner, wrappedPayloadJSON)
if push.Err() != nil {
logger.ErrorContext(pluginCtx, "error pushing job to queue", "error", push.Err())
return
Expand All @@ -226,7 +226,7 @@ func Run(

queues := []string{}
// get all keys from database for the main queue and service queues as well as completed
queuedKeys, err := databaseContainer.Client.Keys(pluginCtx, "anklet/jobs/github/queued*").Result()
queuedKeys, err := databaseContainer.Client.Keys(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"*").Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error getting list of keys", "err", err)
return
Expand Down Expand Up @@ -258,7 +258,7 @@ func Run(
}
}
if inAQueue { // only add completed if it's in a queue
inCompletedQueue, err := InQueue(pluginCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/completed")
inCompletedQueue, err := InQueue(pluginCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/completed/"+ctxPlugin.Owner)
if err != nil {
logger.ErrorContext(pluginCtx, "error searching in queue", "error", err)
return
Expand All @@ -274,7 +274,7 @@ func Run(
logger.ErrorContext(pluginCtx, "error converting job payload to JSON", "error", err)
return
}
push := databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/completed", wrappedPayloadJSON)
push := databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/completed/"+ctxPlugin.Owner, wrappedPayloadJSON)
if push.Err() != nil {
logger.ErrorContext(pluginCtx, "error pushing job to queue", "error", push.Err())
return
Expand Down Expand Up @@ -422,7 +422,7 @@ func Run(
}

// get all keys from database for the main queue and service queues as well as completed
queuedKeys, err := databaseContainer.Client.Keys(pluginCtx, "anklet/jobs/github/queued*").Result()
queuedKeys, err := databaseContainer.Client.Keys(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"*").Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error getting list of keys", "err", err)
return
Expand All @@ -436,7 +436,7 @@ func Run(
}
allQueuedJobs[key] = queuedJobs
}
completedKeys, err := databaseContainer.Client.Keys(pluginCtx, "anklet/jobs/github/completed*").Result()
completedKeys, err := databaseContainer.Client.Keys(pluginCtx, "anklet/jobs/github/completed"+ctxPlugin.Owner+"*").Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error getting list of keys", "err", err)
return
Expand Down Expand Up @@ -535,7 +535,7 @@ func Run(
if inCompleted && !inQueued {
_, err = databaseContainer.Client.LRem(pluginCtx, inCompletedListKey, 1, allCompletedJobs[inCompletedListKey][inCompletedIndex]).Result()
if err != nil {
logger.ErrorContext(pluginCtx, "error removing completedJob from anklet/jobs/github/completed", "err", err, "completedJob", allCompletedJobs[inCompletedListKey][inCompletedIndex])
logger.ErrorContext(pluginCtx, "error removing completedJob from anklet/jobs/github/completed/"+ctxPlugin.Owner, "err", err, "completedJob", allCompletedJobs[inCompletedListKey][inCompletedIndex])
return
}
continue
Expand Down

0 comments on commit 36aab6f

Please sign in to comment.