Skip to content

Commit

Permalink
quick fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
NorseGaud committed Jul 25, 2024
1 parent 965c357 commit f822670
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 93 deletions.
18 changes: 18 additions & 0 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package database

import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
Expand Down Expand Up @@ -112,3 +113,20 @@ func AddUniqueRunKey(ctx context.Context) (bool, error) {
}
return true, errors.New("unique run key already exists")
}

func UnwrapPayload[T any](payload string) (T, error) {
var wrappedPayload map[string]interface{}
var t T
err := json.Unmarshal([]byte(payload), &wrappedPayload)
if err != nil {
return t, err
}
payloadBytes, err := json.Marshal(wrappedPayload["payload"])
if err != nil {
return t, err
}
if err := json.Unmarshal(payloadBytes, &t); err != nil {
return t, err
}
return t, nil
}
53 changes: 6 additions & 47 deletions plugins/controllers/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,9 @@ func EnsureNoDuplicates(serviceCtx context.Context, logger *slog.Logger, jobID i
return err
}
for _, queueItem := range queued {
var workflowJobEvent github.WorkflowJobEvent
var payload map[string]interface{}
if err := json.Unmarshal([]byte(queueItem), &payload); err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return err
}
payloadBytes, err := json.Marshal(payload["payload"])
if err != nil {
logger.ErrorContext(serviceCtx, "error marshalling payload", "err", err)
return err
}
err = json.Unmarshal(payloadBytes, &workflowJobEvent)
workflowJobEvent, err := database.UnwrapPayload[github.WorkflowJobEvent](queueItem)
if err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling payload to github.WorkflowJobEvent", "err", err)
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return err
}
if *workflowJobEvent.WorkflowJob.ID == jobID {
Expand Down Expand Up @@ -241,27 +230,12 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co
if len(queuedJobs) > 0 {
logger.InfoContext(serviceCtx, "searching for redeliveries")
// get the started date of the eldest item in the list of queued jobs
var firstQueuedJobEvent github.WorkflowJobEvent
var payload map[string]interface{}
if err := json.Unmarshal([]byte(queuedJobs[0]), &payload); err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
payloadBytes, err := json.Marshal(payload["payload"])
firstQueuedJobEvent, err := database.UnwrapPayload[github.WorkflowJobEvent](queuedJobs[0])
if err != nil {
logger.ErrorContext(serviceCtx, "error marshalling payload", "err", err)
return
}
err = json.Unmarshal(payloadBytes, &firstQueuedJobEvent)
if err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling payload to github.WorkflowJobEvent", "err", err)
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
if err := json.Unmarshal([]byte(queuedJobs[0]), &firstQueuedJobEvent); err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling first queued job", "err", err)
} else {
firstQueuedJobStartedAtDate = firstQueuedJobEvent.WorkflowJob.StartedAt.Time
}
firstQueuedJobStartedAtDate = firstQueuedJobEvent.WorkflowJob.StartedAt.Time
var allHooks []github.HookDelivery
opts := &github.ListCursorOptions{PerPage: 10}
for {
Expand All @@ -287,23 +261,8 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co
opts.Cursor = response.Cursor
}
for _, queuedJob := range queuedJobs {
var wrappedPayload github.WorkflowJobEvent
var payload map[string]interface{}
if err := json.Unmarshal([]byte(queuedJob), &payload); err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
payloadBytes, err := json.Marshal(payload["payload"])
wrappedPayload, err := database.UnwrapPayload[github.WorkflowJobEvent](queuedJob)
if err != nil {
logger.ErrorContext(serviceCtx, "error marshalling payload", "err", err)
return
}
err = json.Unmarshal(payloadBytes, &wrappedPayload)
if err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling payload to github.WorkflowJobEvent", "err", err)
return
}
if err := json.Unmarshal([]byte(queuedJob), &wrappedPayload); err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
Expand Down
53 changes: 7 additions & 46 deletions plugins/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/redis/go-redis/v9"
"github.com/veertuinc/anklet/internal/anka"
"github.com/veertuinc/anklet/internal/config"
"github.com/veertuinc/anklet/internal/database"
dbFunctions "github.com/veertuinc/anklet/internal/database"
internalGithub "github.com/veertuinc/anklet/internal/github"
"github.com/veertuinc/anklet/internal/logging"
Expand Down Expand Up @@ -215,23 +216,12 @@ func CheckForCompletedJobs(
default:
}
// get the job ID
var existingJob github.WorkflowJobEvent
existingJobString, err := databaseContainer.Client.LIndex(serviceCtx, serviceDatabaseKeyName, -1).Result()
if err != redis.Nil { // handle no job for service
if err == nil {
var payload map[string]interface{}
if err := json.Unmarshal([]byte(existingJobString), &payload); err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
payloadBytes, err := json.Marshal(payload["payload"])
if err != nil {
logger.ErrorContext(serviceCtx, "error marshalling payload", "err", err)
return
}
err = json.Unmarshal(payloadBytes, &existingJob)
existingJob, err := database.UnwrapPayload[github.WorkflowJobEvent](existingJobString)
if err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling payload to github.WorkflowJobEvent", "err", err)
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
// check if there is already a completed job queued for the server
Expand Down Expand Up @@ -260,31 +250,12 @@ func CheckForCompletedJobs(
return
}
for _, completedJob := range completedJobs {
var completedJobWebhook github.WorkflowJobEvent
var completedPayload map[string]interface{}
if err := json.Unmarshal([]byte(completedJob), &completedPayload); err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
completedPayloadBytes, err := json.Marshal(completedPayload["payload"])
completedJobWebhook, err := database.UnwrapPayload[github.WorkflowJobEvent](completedJob)
if err != nil {
logger.ErrorContext(serviceCtx, "error marshalling payload", "err", err)
return
}
err = json.Unmarshal(completedPayloadBytes, &completedJobWebhook)
if err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling payload to github.WorkflowJobEvent", "err", err)
return
}
err = json.Unmarshal([]byte(completedJob), &completedJobWebhook)
if err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling completed job", "err", err)
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
fmt.Println("completedJobWebhook.WorkflowJob.ID", *completedJobWebhook.WorkflowJob.ID)
fmt.Println("existingJob.WorkflowJob.ID", *existingJob.WorkflowJob.ID)
if *completedJobWebhook.WorkflowJob.ID == *existingJob.WorkflowJob.ID {
fmt.Println("INSIDE!!!")
// remove the completed job we found
_, err = databaseContainer.Client.LRem(serviceCtx, "anklet/jobs/github/completed", 1, completedJob).Result()
if err != nil {
Expand Down Expand Up @@ -516,8 +487,6 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co
default:
}

var queuedJob github.WorkflowJobEvent
var wrappedPayload map[string]interface{}
var wrappedPayloadJSON string
// allow picking up where we left off
wrappedPayloadJSON, err = databaseContainer.Client.LIndex(serviceCtx, serviceDatabaseKeyName, -1).Result()
Expand All @@ -539,17 +508,9 @@ func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel co
wrappedPayloadJSON = eldestQueuedJob
}

if err := json.Unmarshal([]byte(wrappedPayloadJSON), &wrappedPayload); err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
payloadBytes, err := json.Marshal(wrappedPayload["payload"])
queuedJob, err := database.UnwrapPayload[github.WorkflowJobEvent](wrappedPayloadJSON)
if err != nil {
logger.ErrorContext(serviceCtx, "error marshalling payload", "err", err)
return
}
if err := json.Unmarshal(payloadBytes, &queuedJob); err != nil {
logger.ErrorContext(serviceCtx, "error unmarshalling payload to webhook.WorkflowJobPayload", "err", err)
logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err)
return
}
serviceCtx = logging.AppendCtx(serviceCtx, slog.Int64("workflowJobID", *queuedJob.WorkflowJob.ID))
Expand Down

0 comments on commit f822670

Please sign in to comment.