diff --git a/README.md b/README.md index bafa2f8..6da6a52 100644 --- a/README.md +++ b/README.md @@ -161,7 +161,9 @@ Your config.yml file must define the database in one of the following ways: ### Plugin Setup and Usage Guides -You can control the location plugins are stored on the host by setting the `plugins_path` in the `config.yml` file. If not set, it will default to `~/.config/anklet/plugins/`. +You can control the location plugins are stored on the host by setting the `plugins_path` in the `config.yml` file. If not set, it will default to `~/.config/anklet/plugins/`. + +**NOTE: Plugin names MUST be unique across all hosts.** #### Github Actions diff --git a/internal/anka/cli.go b/internal/anka/cli.go index d015338..3dfee58 100644 --- a/internal/anka/cli.go +++ b/internal/anka/cli.go @@ -326,15 +326,40 @@ func (cli *Cli) AnkaStart(pluginCtx context.Context) error { return nil } -func (cli *Cli) AnkaCopy(pluginCtx context.Context, filesToCopyIn ...string) error { - if pluginCtx.Err() != nil { - return fmt.Errorf("context canceled before AnkaCopy") +func (cli *Cli) AnkaCopyOutOfVM(ctx context.Context, objectToCopyOut string, hostLevelDestination string) error { + if ctx.Err() != nil { + return fmt.Errorf("context canceled before AnkaCopyOutOfVM") } - logger, err := logging.GetLoggerFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(ctx) if err != nil { return err } - vm, err := GetAnkaVmFromContext(pluginCtx) + vm, err := GetAnkaVmFromContext(ctx) + if err != nil { + return err + } + copyOutput, err := cli.ExecuteParseJson(ctx, "anka", "-j", "cp", "-a", fmt.Sprintf("%s:%s", vm.Name, objectToCopyOut), hostLevelDestination) + if err != nil { + return err + } + if copyOutput.Status != "OK" { + return fmt.Errorf("error copying into vm: %s", copyOutput.Message) + } + logger.DebugContext(ctx, "copy output", "std", copyOutput) + logger.InfoContext(ctx, "successfully copied %s out of vm", "object", objectToCopyOut, "stdout", copyOutput.Message) + + return nil +} + +func (cli *Cli) AnkaCopyIntoVM(ctx context.Context, filesToCopyIn ...string) error { + if ctx.Err() != nil { + return fmt.Errorf("context canceled before AnkaCopyIntoVM") + } + logger, err := logging.GetLoggerFromContext(ctx) + if err != nil { + return err + } + vm, err := GetAnkaVmFromContext(ctx) if err != nil { return err } @@ -345,15 +370,15 @@ func (cli *Cli) AnkaCopy(pluginCtx context.Context, filesToCopyIn ...string) err return fmt.Errorf("error evaluating symlink for %s: %w", hostLevelFile, err) } hostLevelFile = realPath - copyOutput, err := cli.ExecuteParseJson(pluginCtx, "anka", "-j", "cp", "-a", hostLevelFile, fmt.Sprintf("%s:", vm.Name)) + copyOutput, err := cli.ExecuteParseJson(ctx, "anka", "-j", "cp", "-a", hostLevelFile, fmt.Sprintf("%s:", vm.Name)) if err != nil { return err } if copyOutput.Status != "OK" { return fmt.Errorf("error copying into vm: %s", copyOutput.Message) } - logger.DebugContext(pluginCtx, "copy output", "std", copyOutput) - logger.InfoContext(pluginCtx, "successfully copied file into vm", "file", hostLevelFile, "stdout", copyOutput.Message) + logger.DebugContext(ctx, "copy output", "std", copyOutput) + logger.InfoContext(ctx, "successfully copied file into vm", "file", hostLevelFile, "stdout", copyOutput.Message) } return nil diff --git a/internal/config/config.go b/internal/config/config.go index 75869ca..e56c315 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,7 +32,8 @@ type Config struct { } type Log struct { - FileDir string `yaml:"file_dir"` + FileDir string `yaml:"file_dir"` + SplitByPlugin bool `yaml:"split_by_plugin"` } type Metrics struct { diff --git a/internal/logging/logging.go b/internal/logging/logging.go index 1cad53f..f743baf 100644 --- a/internal/logging/logging.go +++ b/internal/logging/logging.go @@ -51,7 +51,7 @@ func UpdateLoggerToFile(logger *slog.Logger, filePath string, suffix string) (*s return nil, "", err } - options := &slog.HandlerOptions{Level: slog.LevelInfo} + options := &slog.HandlerOptions{Level: slog.LevelDebug} handler := &ContextHandler{Handler: slog.NewJSONHandler(file, options)} // Copy existing logger attributes to the new logger @@ -109,13 +109,13 @@ func Panic(workerCtx context.Context, pluginCtx context.Context, errorMessage st panic(errorMessage) } -func DevContext(pluginCtx context.Context, errorMessage string) { +func DevContext(ctx context.Context, message string) { if strings.ToUpper(os.Getenv("LOG_LEVEL")) == "DEV" { - logger, err := GetLoggerFromContext(pluginCtx) + logger, err := GetLoggerFromContext(ctx) if err != nil { panic(err) } - logger.DebugContext(pluginCtx, errorMessage) + logger.DebugContext(ctx, message) } } diff --git a/main.go b/main.go index 4b73ea5..2511d83 100644 --- a/main.go +++ b/main.go @@ -53,9 +53,8 @@ var ( func main() { - logger := logging.New() + parentLogger := logging.New() parentCtx := context.Background() - parentCtx = context.WithValue(parentCtx, config.ContextKey("logger"), logger) if version == "" { version = "dev" // Default version if not set by go build @@ -71,7 +70,7 @@ func main() { homeDir, err := os.UserHomeDir() if err != nil { - logger.ErrorContext(parentCtx, "unable to get user home directory", "error", err) + parentLogger.ErrorContext(parentCtx, "unable to get user home directory", "error", err) os.Exit(1) } @@ -93,15 +92,15 @@ func main() { // obtain config loadedConfig, err := config.LoadConfig(configPath) if err != nil { - logger.ErrorContext(parentCtx, "unable to load config.yml (is it in the work_dir, or are you using an absolute path?)", "error", err) + parentLogger.ErrorContext(parentCtx, "unable to load config.yml (is it in the work_dir, or are you using an absolute path?)", "error", err) os.Exit(1) } loadedConfig, err = config.LoadInEnvs(loadedConfig) if err != nil { - logger.ErrorContext(parentCtx, "unable to load config.yml from environment variables", "error", err) + parentLogger.ErrorContext(parentCtx, "unable to load config.yml from environment variables", "error", err) os.Exit(1) } - logger.InfoContext(parentCtx, "loaded config", slog.Any("config", loadedConfig)) + parentLogger.InfoContext(parentCtx, "loaded config", slog.Any("config", loadedConfig)) parentCtx = logging.AppendCtx(parentCtx, slog.String("ankletVersion", version)) @@ -115,15 +114,17 @@ func main() { if !strings.HasSuffix(loadedConfig.Log.FileDir, "/") { loadedConfig.Log.FileDir += "/" } - logger, fileLocation, err := logging.UpdateLoggerToFile(logger, loadedConfig.Log.FileDir, suffix) - if err != nil { - fmt.Printf("{\"time\":\"%s\",\"level\":\"ERROR\",\"msg\":\"%s\"}\n", time.Now().Format(time.RFC3339), err) - os.Exit(1) - } - logger.InfoContext(parentCtx, "writing logs to file", slog.String("fileLocation", fileLocation)) - logger.InfoContext(parentCtx, "loaded config", slog.Any("config", loadedConfig)) + // logger, fileLocation, err = logging.UpdateLoggerToFile(logger, logFileDir, suffix) + // if err != nil { + // fmt.Printf("{\"time\":\"%s\",\"level\":\"ERROR\",\"msg\":\"%s\"}\n", time.Now().Format(time.RFC3339), err) + // os.Exit(1) + // } + // logger.InfoContext(parentCtx, "writing logs to file", slog.String("fileLocation", fileLocation)) } + // must come after the log config is handled + parentCtx = context.WithValue(parentCtx, config.ContextKey("logger"), parentLogger) + // if loadedConfig.PidFileDir == "" { // loadedConfig.PidFileDir = "./" // } @@ -143,7 +144,7 @@ func main() { } } - logger.DebugContext(parentCtx, "loaded config", slog.Any("config", loadedConfig)) + parentLogger.DebugContext(parentCtx, "loaded config", slog.Any("config", loadedConfig)) parentCtx = context.WithValue(parentCtx, config.ContextKey("config"), &loadedConfig) // daemonContext := &daemon.Context{ @@ -175,7 +176,7 @@ func main() { pluginsPath = filepath.Join(homeDir, ".config", "anklet", "plugins") } - logger.InfoContext(parentCtx, "plugins path", slog.String("pluginsPath", pluginsPath)) + parentLogger.InfoContext(parentCtx, "plugins path", slog.String("pluginsPath", pluginsPath)) parentCtx = context.WithValue(parentCtx, config.ContextKey("globals"), config.Globals{ RunOnce: runOnce, @@ -196,7 +197,7 @@ func main() { if githubPluginExists { rateLimiter, err := github_ratelimit.NewRateLimitWaiterClient(httpTransport) if err != nil { - logger.ErrorContext(parentCtx, "error creating github_ratelimit.NewRateLimitWaiterClient", "err", err) + parentLogger.ErrorContext(parentCtx, "error creating github_ratelimit.NewRateLimitWaiterClient", "err", err) os.Exit(1) } parentCtx = context.WithValue(parentCtx, config.ContextKey("rateLimiter"), rateLimiter) @@ -218,7 +219,7 @@ func main() { signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) //go - worker(parentCtx, logger, loadedConfig, sigChan) + worker(parentCtx, parentLogger, loadedConfig, sigChan) // err = daemon.ServeSignals() // if err != nil { @@ -226,16 +227,21 @@ func main() { // } } -func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.Config, sigChan chan os.Signal) { +func worker( + parentCtx context.Context, + parentLogger *slog.Logger, + loadedConfig config.Config, + sigChan chan os.Signal, +) { globals, err := config.GetGlobalsFromContext(parentCtx) if err != nil { - logger.ErrorContext(parentCtx, "unable to get globals from context", "error", err) + parentLogger.ErrorContext(parentCtx, "unable to get globals from context", "error", err) os.Exit(1) } toRunOnce := globals.RunOnce workerCtx, workerCancel := context.WithCancel(parentCtx) suffix := parentCtx.Value(config.ContextKey("suffix")).(string) - logger.InfoContext(workerCtx, "starting anklet"+suffix) + parentLogger.InfoContext(workerCtx, "starting anklet"+suffix) returnToMainQueue := make(chan bool, 1) jobFailureChannel := make(chan bool, 1) workerCtx = context.WithValue(workerCtx, config.ContextKey("returnToMainQueue"), returnToMainQueue) @@ -250,10 +256,10 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. // logger.WarnContext(workerCtx, "best effort graceful shutdown, interrupting the job as soon as possible...") // workerCancel() case syscall.SIGQUIT: // doesn't work for receivers since they don't loop - logger.WarnContext(workerCtx, "graceful shutdown, waiting for jobs to finish...") + parentLogger.WarnContext(workerCtx, "graceful shutdown, waiting for jobs to finish...") toRunOnce = "true" default: - logger.WarnContext(workerCtx, "best effort graceful shutdown, interrupting the job as soon as possible...") + parentLogger.WarnContext(workerCtx, "best effort graceful shutdown, interrupting the job as soon as possible...") workerCancel() returnToMainQueue <- true } @@ -292,7 +298,7 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. } ln, err := net.Listen("tcp", ":"+metricsPort) if err != nil { - logger.ErrorContext(workerCtx, "metrics port already in use", "port", metricsPort, "error", err) + parentLogger.ErrorContext(workerCtx, "metrics port already in use", "port", metricsPort, "error", err) os.Exit(1) } ln.Close() @@ -314,12 +320,12 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. Database: databaseDatabase, }) if err != nil { - logger.ErrorContext(workerCtx, "unable to access database", "error", err) + parentLogger.ErrorContext(workerCtx, "unable to access database", "error", err) os.Exit(1) } workerCtx = context.WithValue(workerCtx, config.ContextKey("database"), databaseContainer) - go metricsService.StartAggregatorServer(workerCtx, logger, false) - logger.InfoContext(workerCtx, "metrics aggregator started on port "+metricsPort) + go metricsService.StartAggregatorServer(workerCtx, parentLogger, false) + parentLogger.InfoContext(workerCtx, "metrics aggregator started on port "+metricsPort) for _, metricsURL := range loadedConfig.Metrics.MetricsURLs { wg.Add(1) go func(metricsURL string) { @@ -329,7 +335,7 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. // check if valid URL _, err = url.Parse(metricsURL) if err != nil { - logger.ErrorContext(pluginCtx, "invalid URL", "error", err) + parentLogger.ErrorContext(pluginCtx, "invalid URL", "error", err) pluginCancel() return } @@ -337,11 +343,11 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. select { case <-workerCtx.Done(): pluginCancel() - logger.WarnContext(pluginCtx, shutDownMessage) + parentLogger.WarnContext(pluginCtx, shutDownMessage) return default: // get metrics from endpoint and update the main list - metrics.UpdatemetricsURLDBEntry(pluginCtx, logger, metricsURL) + metrics.UpdatemetricsURLDBEntry(pluginCtx, parentLogger, metricsURL) if workerCtx.Err() != nil || toRunOnce == "true" { pluginCancel() break @@ -362,8 +368,8 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. firstPluginStarted := make(chan bool, 1) metricsData := &metrics.MetricsDataLock{} workerCtx = context.WithValue(workerCtx, config.ContextKey("metrics"), metricsData) - logger.InfoContext(workerCtx, "metrics server started on port "+metricsPort) - metrics.UpdateSystemMetrics(workerCtx, logger, metricsData) + parentLogger.InfoContext(workerCtx, "metrics server started on port "+metricsPort) + metrics.UpdateSystemMetrics(workerCtx, parentLogger, metricsData) ///////////// // Plugins soloReceiver := false @@ -387,16 +393,39 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. pluginCtx, pluginCancel := context.WithCancel(workerCtx) // Inherit from parent context if plugin.Name == "" { - logger.ErrorContext(pluginCtx, "name is required for plugins") + parentLogger.ErrorContext(pluginCtx, "name is required for plugins") + pluginCancel() + workerCancel() + return + } + + if strings.Contains(plugin.Name, " ") { + parentLogger.ErrorContext(pluginCtx, "plugin name cannot contain spaces") pluginCancel() workerCancel() return } + // support plugin specific log files + var pluginLogger = parentLogger + if loadedConfig.Log.SplitByPlugin { + parentLogger.InfoContext(parentCtx, "writing "+plugin.Name+" logs to "+loadedConfig.Log.FileDir+"anklet"+"-"+plugin.Name+".log") + pluginLogger, _, err = logging.UpdateLoggerToFile(parentLogger, loadedConfig.Log.FileDir, "-"+plugin.Name) + if err != nil { + parentLogger.ErrorContext(pluginCtx, "unable to update logger to file", "error", err) + pluginCancel() + workerCancel() + return + } + } + + // must come after the log config is handled + pluginCtx = context.WithValue(pluginCtx, config.ContextKey("logger"), pluginLogger) + pluginCtx = logging.AppendCtx(pluginCtx, slog.String("pluginName", plugin.Name)) if plugin.Repo == "" { - logger.InfoContext(pluginCtx, "no repo set for plugin; assuming it's an organization level plugin") + pluginLogger.InfoContext(pluginCtx, "no repo set for plugin; assuming it's an organization level plugin") pluginCtx = context.WithValue(pluginCtx, config.ContextKey("isRepoSet"), false) logging.DevContext(pluginCtx, "set isRepoSet to false") } else { @@ -416,7 +445,7 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. logging.DevContext(pluginCtx, "plugin is not a receiver; loading the anka CLI") ankaCLI, err := anka.NewCLI(pluginCtx) if err != nil { - logger.ErrorContext(pluginCtx, "unable to create anka cli", "error", err) + pluginLogger.ErrorContext(pluginCtx, "unable to create anka cli", "error", err) pluginCancel() workerCancel() return @@ -442,7 +471,7 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. Database: databaseDatabase, }) if err != nil { - logger.ErrorContext(pluginCtx, "unable to access database", "error", err) + pluginLogger.ErrorContext(pluginCtx, "unable to access database", "error", err) pluginCancel() workerCancel() return @@ -451,14 +480,14 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. logging.DevContext(pluginCtx, "connected to database") } - logger.InfoContext(pluginCtx, "starting plugin") + pluginLogger.InfoContext(pluginCtx, "starting plugin") for { select { case <-pluginCtx.Done(): logging.DevContext(pluginCtx, "plugin for loop::pluginCtx.Done()") - metricsData.SetStatus(pluginCtx, logger, "stopped") - logger.WarnContext(pluginCtx, shutDownMessage) + metricsData.SetStatus(pluginCtx, pluginLogger, "stopped") + pluginLogger.WarnContext(pluginCtx, shutDownMessage) pluginCancel() return default: @@ -467,31 +496,31 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. workerCtx, pluginCtx, pluginCancel, - logger, + pluginLogger, firstPluginStarted, metricsData, ) if err != nil { - logger.ErrorContext(updatedPluginCtx, "error running plugin", "error", err) + pluginLogger.ErrorContext(updatedPluginCtx, "error running plugin", "error", err) pluginCancel() // Send SIGQUIT to the main pid p, err := os.FindProcess(os.Getpid()) if err != nil { - logger.ErrorContext(updatedPluginCtx, "error finding process", "error", err) + pluginLogger.ErrorContext(updatedPluginCtx, "error finding process", "error", err) } else { err = p.Signal(syscall.SIGQUIT) if err != nil { - logger.ErrorContext(updatedPluginCtx, "error sending SIGQUIT signal", "error", err) + pluginLogger.ErrorContext(updatedPluginCtx, "error sending SIGQUIT signal", "error", err) } } return } if workerCtx.Err() != nil || toRunOnce == "true" { pluginCancel() - logger.WarnContext(updatedPluginCtx, shutDownMessage) + pluginLogger.WarnContext(updatedPluginCtx, shutDownMessage) return } - metricsData.SetStatus(updatedPluginCtx, logger, "idle") + metricsData.SetStatus(updatedPluginCtx, pluginLogger, "idle") select { case <-time.After(time.Duration(plugin.SleepInterval) * time.Second): case <-pluginCtx.Done(): @@ -508,10 +537,10 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. soloReceiver = false } } - go metricsService.Start(workerCtx, logger, soloReceiver) + go metricsService.Start(workerCtx, parentLogger, soloReceiver) } wg.Wait() time.Sleep(time.Second) // prevents exiting before the logger has a chance to write the final log entry (from panics) - logger.WarnContext(workerCtx, "anklet (and all plugins) shut down") + parentLogger.WarnContext(parentCtx, "anklet (and all plugins) shut down") os.Exit(0) } diff --git a/plugins/handlers/github/README.md b/plugins/handlers/github/README.md index e26aa28..12de870 100644 --- a/plugins/handlers/github/README.md +++ b/plugins/handlers/github/README.md @@ -10,6 +10,8 @@ For help setting up the database, see [Database Setup](https://github.com/veertu In the `config.yml`, you can define the `github` plugin as follows: +**NOTE: Plugin names MUST be unique across all hosts.** + ``` plugins: - name: RUNNER1 diff --git a/plugins/handlers/github/github.go b/plugins/handlers/github/github.go index b7d88b5..6c59bd0 100644 --- a/plugins/handlers/github/github.go +++ b/plugins/handlers/github/github.go @@ -111,6 +111,68 @@ type WorkflowRunJobDetail struct { // return true // } +func DeleteFromQueue(pluginCtx context.Context, logger *slog.Logger, jobID int64, queue string) error { + databaseContainer, err := database.GetDatabaseFromContext(pluginCtx) + if err != nil { + logging.Panic(pluginCtx, pluginCtx, "error getting database client from context: "+err.Error()) + } + queued, err := databaseContainer.Client.LRange(pluginCtx, queue, 0, -1).Result() + if err != nil { + logger.ErrorContext(pluginCtx, "error getting list of queued jobs", "err", err) + return err + } + for _, queueItem := range queued { + workflowJobEvent, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](queueItem) + if err != nil { + logger.ErrorContext(pluginCtx, "error unmarshalling job", "err", err) + return err + } + if typeErr != nil { // not the type we want + continue + } + if *workflowJobEvent.WorkflowJob.ID == jobID { + // logger.WarnContext(pluginCtx, "WorkflowJob.ID already in queue", "WorkflowJob.ID", jobID) + _, err = databaseContainer.Client.LRem(pluginCtx, queue, 1, queueItem).Result() + if err != nil { + logger.ErrorContext(pluginCtx, "error removing job from queue", "err", err) + return err + } + return nil + } + } + return nil +} + +// passing the the queue and ID to check for +func InQueue(pluginCtx context.Context, logger *slog.Logger, jobID int64, queue string) (bool, error) { + toReturn := false + databaseContainer, err := database.GetDatabaseFromContext(pluginCtx) + if err != nil { + return toReturn, err + } + queued, err := databaseContainer.Client.LRange(pluginCtx, queue, 0, -1).Result() + if err != nil { + logger.ErrorContext(pluginCtx, "error getting list of queued jobs", "err", err) + return toReturn, err + } + for _, queueItem := range queued { + workflowJobEvent, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](queueItem) + if err != nil { + logger.ErrorContext(pluginCtx, "error unmarshalling job", "err", err) + return toReturn, err + } + if typeErr != nil { // not the type we want + continue + } + if *workflowJobEvent.WorkflowJob.ID == jobID { + // logger.WarnContext(pluginCtx, "WorkflowJob.ID already in queue", "WorkflowJob.ID", jobID) + toReturn = true + break + } + } + return toReturn, nil +} + func extractLabelValue(labels []string, prefix string) string { for _, label := range labels { if strings.HasPrefix(label, prefix) { @@ -406,6 +468,14 @@ func cleanup( logger.ErrorContext(pluginCtx, "error getting ankaCLI from context", "err", err) return } + if strings.ToUpper(os.Getenv("LOG_LEVEL")) == "DEBUG" || strings.ToUpper(os.Getenv("LOG_LEVEL")) == "DEV" { + err = ankaCLI.AnkaCopyOutOfVM(pluginCtx, "/Users/anka/actions-runner/_diag", "/tmp/"+vm.Name) + if err != nil { + logger.ErrorContext(pluginCtx, "error copying out of vm", "err", err) + } else { + logger.DebugContext(pluginCtx, "successfully copied out of vm", "vm", vm.Name) + } + } ankaCLI.AnkaDelete(workerCtx, pluginCtx, &vm) databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning") continue // required to keep processing tasks in the db list @@ -416,6 +486,11 @@ func cleanup( logger.ErrorContext(pluginCtx, "error unmarshalling payload to webhook.WorkflowJobPayload", "err", err) return } + // delete the in_progress queue's index that matches the wrkflowJobID + err = DeleteFromQueue(pluginCtx, logger, *workflowJobEvent.WorkflowJob.ID, "anklet/jobs/github/in_progress/"+ctxPlugin.Owner) + if err != nil { + logger.ErrorContext(pluginCtx, "error deleting from in_progress queue", "err", err) + } // return it to the queue if the job isn't completed yet // if we don't, we could suffer from a situation where a completed job comes in and is orphaned select { @@ -801,7 +876,7 @@ func Run( // Copy runner scripts to VM logger.DebugContext(pluginCtx, "copying install-runner.bash, register-runner.bash, and start-runner.bash to vm") - err = ankaCLI.AnkaCopy(pluginCtx, + err = ankaCLI.AnkaCopyIntoVM(pluginCtx, installRunnerPath, registerRunnerPath, startRunnerPath, @@ -938,6 +1013,19 @@ func Run( return pluginCtx, nil default: time.Sleep(10 * time.Second) + if logCounter == 6 { + // after one minute, check to see if the job has started in the in_progress queue + // if not, then the runner registration failed + inQueue, err := InQueue(pluginCtx, logger, workflowJob.JobID, "anklet/jobs/github/in_progress/"+ctxPlugin.Owner) + if err != nil { + logger.ErrorContext(pluginCtx, "error searching in queue", "error", err) + } else { + if !inQueue { + failureChannel <- true + return pluginCtx, fmt.Errorf("runner registration failed") + } + } + } if logCounter%2 == 0 { logger.InfoContext(pluginCtx, "job still in progress", "job_id", workflowJob.JobID) } diff --git a/plugins/receivers/github/README.md b/plugins/receivers/github/README.md index f19d650..9bfa5d6 100644 --- a/plugins/receivers/github/README.md +++ b/plugins/receivers/github/README.md @@ -10,6 +10,8 @@ The Github Receiver Plugin is used to receive webhook events from github and sto In the `config.yml`, you can define the `github_receiver` plugin as follows: +**NOTE: Plugin names MUST be unique across all hosts.** + ``` global_receiver_secret: 12345 # this can be set using the ANKLET_GLOBAL_RECEIVER_SECRET env var too plugins: diff --git a/plugins/receivers/github/github.go b/plugins/receivers/github/github.go index dab0b79..b27e3a5 100644 --- a/plugins/receivers/github/github.go +++ b/plugins/receivers/github/github.go @@ -144,6 +144,9 @@ func Run( return pluginCtx, fmt.Errorf("error authenticating github client: " + err.Error()) } + // clean up in_progress queue if it exists + databaseContainer.Client.Del(pluginCtx, "anklet/jobs/github/in_progress/"+ctxPlugin.Owner) + server := &http.Server{Addr: ":" + ctxPlugin.Port} http.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) @@ -210,6 +213,43 @@ func Run( ) } } + } else if *workflowJob.Action == "in_progress" { + // store in_progress so we can know if the registration failed + if exists_in_array_partial(workflowJob.WorkflowJob.Labels, []string{"anka-template"}) { + // make sure it doesn't already exist + inQueue, err := InQueue(pluginCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/in_progress/"+ctxPlugin.Owner) + if err != nil { + logger.ErrorContext(pluginCtx, "error searching in queue", "error", err) + return + } + if !inQueue { // if it doesn't exist already + // push it to the queue + wrappedJobPayload := map[string]interface{}{ + "type": "WorkflowJobPayload", + "payload": workflowJob, + } + wrappedPayloadJSON, err := json.Marshal(wrappedJobPayload) + if err != nil { + logger.ErrorContext(pluginCtx, "error converting job payload to JSON", "error", err) + return + } + push := databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/in_progress/"+ctxPlugin.Owner, wrappedPayloadJSON) + if push.Err() != nil { + logger.ErrorContext(pluginCtx, "error pushing job to queue", "error", push.Err()) + return + } + logger.InfoContext(pluginCtx, "job pushed to in_progress queue", + "workflowJob.ID", *workflowJob.WorkflowJob.ID, + "workflowJob.Name", *workflowJob.WorkflowJob.Name, + "workflowJob.RunID", *workflowJob.WorkflowJob.RunID, + "html_url", *workflowJob.WorkflowJob.HTMLURL, + "status", *workflowJob.WorkflowJob.Status, + "conclusion", workflowJob.WorkflowJob.Conclusion, + "started_at", workflowJob.WorkflowJob.StartedAt, + "completed_at", workflowJob.WorkflowJob.CompletedAt, + ) + } + } } else if *workflowJob.Action == "completed" { if exists_in_array_partial(workflowJob.WorkflowJob.Labels, []string{"anka-template"}) {