Skip to content

Commit

Permalink
quick fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
NorseGaud committed Jul 31, 2024
1 parent cf168cb commit 073aedd
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 15 deletions.
3 changes: 2 additions & 1 deletion internal/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ func Plugin(workerCtx context.Context, serviceCtx context.Context, serviceCancel
for {
select {
case <-firstServiceStarted:
github.Run(workerCtx, serviceCtx, serviceCancel, logger)
github.Run(workerCtx, serviceCtx, serviceCancel, logger, firstServiceStarted)
return
case <-serviceCtx.Done():
logger.InfoContext(serviceCtx, "context cancelled before service started")
serviceCancel()
return
default:
time.Sleep(1 * time.Second)
firstServiceStarted <- true // if there is no controller in the config, we need to send this to the first service started channel to start the service
}
}
} else if service.Plugin == "github_controller" {
Expand Down
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ func main() {
// obtain config
loadedConfig, err := config.LoadConfig(configPath)
if err != nil {
logger.InfoContext(parentCtx, "unable to load config.yml", "error", err)
// panic(err)
logger.ErrorContext(parentCtx, "unable to load config.yml (is it in the work_dir, or are you using an absolute path?)", "error", err)
panic(err)
}
logger.InfoContext(parentCtx, "loaded config", slog.Any("config", loadedConfig))
loadedConfig, err = config.LoadInEnvs(loadedConfig)
if err != nil {
panic(err)
}
logger.InfoContext(parentCtx, "loaded config", slog.Any("config", loadedConfig))

parentCtx = logging.AppendCtx(parentCtx, slog.String("ankletVersion", version))

Expand Down
24 changes: 21 additions & 3 deletions plugins/controllers/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co

server := &http.Server{Addr: ":" + service.Port}
http.HandleFunc("/jobs/v1/receiver", func(w http.ResponseWriter, r *http.Request) {
fmt.Println("RECEIVED ====================")
databaseContainer, err := database.GetDatabaseFromContext(serviceCtx)
if err != nil {
logger.ErrorContext(serviceCtx, "error getting database client from context", "error", err)
Expand All @@ -154,7 +153,11 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co
}
switch workflowJob := event.(type) {
case *github.WorkflowJobEvent:
logger.DebugContext(serviceCtx, "received workflow job to consider", slog.Any("workflowJob", workflowJob))
logger.DebugContext(serviceCtx, "received workflow job to consider",
"workflowJob.Action", *workflowJob.Action,
"workflowJob.WorkflowJob.Labels", workflowJob.WorkflowJob.Labels,
"workflowJob.WorkflowJob.ID", *workflowJob.WorkflowJob.ID,
)
if *workflowJob.Action == "queued" {
if exists_in_array_exact(workflowJob.WorkflowJob.Labels, []string{"self-hosted", "anka"}) {
// make sure it doesn't already exist
Expand Down Expand Up @@ -204,7 +207,7 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co
fmt.Println("checking if in queue", queue)
inQueue, err := InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, queue)
if err != nil {
logger.WarnContext(serviceCtx, "error searching in queue", "queue", queue, "error", err)
logger.WarnContext(serviceCtx, err.Error(), "queue", queue)
}
results <- inQueue
}(queue)
Expand Down Expand Up @@ -388,6 +391,7 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co
}

if len(allHooks) > 0 {
MainLoop:
for i := len(allHooks) - 1; i >= 0; i-- { // make sure we process/redeliver queued before completed
hookWrapper := allHooks[i]
hookDelivery := hookWrapper["hookDelivery"].(*github.HookDelivery)
Expand Down Expand Up @@ -452,13 +456,27 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co
continue
}

// handle queued that have already been successfully delivered before.
if *hookDelivery.Action == "queued" {
// check if a completed hook exists, so we don't re-queue something already finished
for _, job := range allHooks {
otherHookDelivery := job["hookDelivery"].(*github.HookDelivery)
otherWorkflowJobEvent := job["workflowJobEvent"].(github.WorkflowJobEvent)
if *otherHookDelivery.Action == "completed" && *workflowJobEvent.WorkflowJob.ID == *otherWorkflowJobEvent.WorkflowJob.ID {
continue MainLoop
}
}
}

// if in queued, and also has a successful completed, something is wrong and we need to re-deliver it.
if *hookDelivery.Action == "completed" && inQueued && *hookDelivery.StatusCode == 200 && !inCompleted {
logger.InfoContext(serviceCtx, "hook delivery has completed but is still in queued; redelivering")
} else if *hookDelivery.StatusCode == 200 || inQueued || inCompleted { // all other cases (like when it's queued); continue
continue
}

// Note; We cannot (and probably should not) stop completed from being redelivered.

// Redeliver the hook
serviceCtx, redelivery, _, _ := ExecuteGitHubClientFunction(serviceCtx, logger, func() (*github.HookDelivery, *github.Response, error) {
redelivery, response, err := githubClient.Repositories.RedeliverHookDelivery(serviceCtx, service.Owner, service.Repo, service.HookID, *hookDelivery.ID)
Expand Down
25 changes: 17 additions & 8 deletions plugins/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,11 @@ func CheckForCompletedJobs(
}
service := config.GetServiceFromContext(serviceCtx)
// get the job ID
existingJobString, err := databaseContainer.Client.LIndex(serviceCtx, "anklet/jobs/github/queued/"+service.Name, -1).Result()
existingJobString, err := databaseContainer.Client.LIndex(serviceCtx, "anklet/jobs/github/queued/"+service.Name, 0).Result()
if err == redis.Nil { // handle no job for service; needed so the github plugin resets and looks for new jobs again
return
} else {
if err == nil {
existingJob, err := database.UnwrapPayload[github.WorkflowJobEvent](existingJobString)
if err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
// check if there is already a completed job queued for the service
// // this can happen if the service crashes or is stopped before it finalizes cleanup
count, err := databaseContainer.Client.LLen(serviceCtx, "anklet/jobs/github/completed/"+service.Name).Result()
Expand All @@ -234,6 +229,7 @@ func CheckForCompletedJobs(
return
}
if count > 0 {
fmt.Println("count > 0 =============")
select {
case completedJobChannel <- true:
default:
Expand All @@ -246,11 +242,21 @@ func CheckForCompletedJobs(
}
return
} else {
fmt.Println("count == 0 =============")
completedJobs, err := databaseContainer.Client.LRange(serviceCtx, "anklet/jobs/github/completed", 0, -1).Result()
if err != nil {
logger.ErrorContext(serviceCtx, "error getting list of completed jobs", "err", err)
return
}
existingJob, err := database.UnwrapPayload[github.WorkflowJobEvent](existingJobString)
if err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
if existingJob.WorkflowJob == nil {
logger.ErrorContext(serviceCtx, "existingJob.WorkflowJob is nil")
return
}
for _, completedJob := range completedJobs {
completedJobWebhook, err := database.UnwrapPayload[github.WorkflowJobEvent](completedJob)
if err != nil {
Expand Down Expand Up @@ -410,7 +416,8 @@ func cleanup(workerCtx context.Context, serviceCtx context.Context, logger *slog
databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+service.Name)
}

func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel context.CancelFunc, logger *slog.Logger) {
func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel context.CancelFunc, logger *slog.Logger, firstServiceStarted chan bool) {
fmt.Println(" ====================================================== ")
logger.InfoContext(serviceCtx, "github plugin checking for jobs")

service := config.GetServiceFromContext(serviceCtx)
Expand Down Expand Up @@ -469,7 +476,6 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co
}

defer func() {
fmt.Println("DEFER AT END")
wg.Wait()
cleanup(workerCtx, serviceCtx, logger, completedJobChannel)
close(completedJobChannel)
Expand Down Expand Up @@ -694,13 +700,16 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co
jobCompleted := false
// logCounter := 0
for !jobCompleted {
fmt.Println("WATCHING FOR JOB COMPLETION")
select {
case <-completedJobChannel:
logger.InfoContext(serviceCtx, "job completed")
jobCompleted = true
completedJobChannel <- true // so cleanup can also see it as completed
case <-serviceCtx.Done():
logger.WarnContext(serviceCtx, "context canceled while watching for job completion")
return
default:
}
time.Sleep(10 * time.Second)
// serviceCtx, currentJob, response, err := ExecuteGitHubClientFunction[github.WorkflowJob](serviceCtx, logger, func() (*github.WorkflowJob, *github.Response, error) {
Expand Down

0 comments on commit 073aedd

Please sign in to comment.