diff --git a/go.mod b/go.mod index f539f94..0c7cbe4 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,12 @@ module github.com/norsegaud/anklet -go 1.21.6 +go 1.22.2 require ( github.com/gofri/go-github-ratelimit v1.1.0 github.com/google/go-github/v58 v58.0.0 github.com/google/uuid v1.6.0 - github.com/norsegaud/go-daemon v0.1.8 + github.com/norsegaud/go-daemon v0.1.10 github.com/redis/go-redis/v9 v9.5.1 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 2c5656b..f2b0534 100644 --- a/go.sum +++ b/go.sum @@ -19,11 +19,10 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= -github.com/norsegaud/go-daemon v0.1.8 h1:mpcs/L40jCyTV457wYdxYoNxWViDmcN4VAjbGBTieN4= -github.com/norsegaud/go-daemon v0.1.8/go.mod h1:+Y6PlACn3Gp+n7EU8r7xtOPiN9YwNFG7ngy325wKugE= +github.com/norsegaud/go-daemon v0.1.10 h1:JnYLPJ1uHU1SsZ29iDkXQbzvy0Fxecy2o2buDgnI528= +github.com/norsegaud/go-daemon v0.1.10/go.mod h1:Xfzc5NRusltO7M151oKbmC4bBXBi44uFviKbP2BUxQ8= github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= -golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/main.go b/main.go index 31acc83..26e54ea 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,9 @@ import ( "fmt" "log" "log/slog" + "net/http" "os" + 
"os/signal" "path/filepath" "sync" "syscall" @@ -24,11 +26,11 @@ import ( ) var ( - version = "dev" - runOnce = "false" - signal = flag.String("s", "", `Send signal to the daemon: - quit — graceful shutdown - stop — fast shutdown + version = "dev" + runOnce = "false" + signalFlag = flag.String("s", "", `Send signal to the daemon: + drain — graceful shutdown, will wait until all jobs finish before exiting + stop — best effort graceful shutdown, interrupting the job as soon as possible reload — reloading the configuration file`) stop = make(chan struct{}) done = make(chan struct{}) @@ -45,22 +47,14 @@ func termHandler(ctx context.Context, logger *slog.Logger) daemon.SignalHandlerF } } -func reloadHandler(ctx context.Context, logger *slog.Logger) daemon.SignalHandlerFunc { - return func(sig os.Signal) error { - logger.InfoContext(ctx, "reloading configuration") - return nil - } -} - func main() { logger := logging.New() ctx, cancel := context.WithCancel(context.Background()) flag.Parse() - daemon.AddCommand(daemon.StringFlag(signal, "quit"), syscall.SIGQUIT, termHandler(ctx, logger)) - daemon.AddCommand(daemon.StringFlag(signal, "stop"), syscall.SIGTERM, termHandler(ctx, logger)) - daemon.AddCommand(daemon.StringFlag(signal, "reload"), syscall.SIGHUP, reloadHandler(ctx, logger)) + daemon.AddCommand(daemon.StringFlag(signalFlag, "drain"), syscall.SIGQUIT, termHandler(ctx, logger)) + daemon.AddCommand(daemon.StringFlag(signalFlag, "stop"), syscall.SIGTERM, termHandler(ctx, logger)) homeDir, err := os.UserHomeDir() if err != nil { @@ -126,77 +120,9 @@ func main() { return } - // main worker child for daemon - go func() { - LOOP: - for { - time.Sleep(time.Second) // this is work to be done by worker. 
- select { - case <-stop: - fmt.Println("stopping") - cancel() - break LOOP - default: - var wg sync.WaitGroup - for _, service := range loadedConfig.Services { - wg.Add(1) - go func(service config.Service) { - defer wg.Done() - serviceCtx, serviceCancel := context.WithCancel(ctx) // Inherit from parent context - defer serviceCancel() // Ensure we cancel the context when service exits - - if service.Name == "" { - panic("name is required for services") - } - - serviceCtx = context.WithValue(serviceCtx, config.ContextKey("service"), service) - serviceCtx = logging.AppendCtx(serviceCtx, slog.String("serviceName", service.Name)) - serviceCtx = context.WithValue(serviceCtx, config.ContextKey("logger"), logger) - - ankaCLI, err := anka.NewCLI(serviceCtx) - if err != nil { - panic(fmt.Sprintf("unable to create anka cli: %v", err)) - } - - serviceCtx = context.WithValue(serviceCtx, config.ContextKey("ankacli"), ankaCLI) - - githubClient := github.NewClient(rateLimiter).WithAuthToken(service.Token) - githubWrapperClient := internalGithub.NewGitHubClientWrapper(githubClient) - serviceCtx = context.WithValue(serviceCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient) - - if loadedConfig.Database.Enabled { - databaseClient, err := database.NewClient(serviceCtx, loadedConfig.Database) - if err != nil { - panic(fmt.Sprintf("unable to access database: %v", err)) - } - serviceCtx = context.WithValue(serviceCtx, config.ContextKey("database"), database.Database{ - Client: databaseClient, - }) - } - - for { - select { - case <-stop: - logger.WarnContext(serviceCtx, "service is stopping") - return // Exit the loop and hence the goroutine - default: - run.Plugin(serviceCtx, logger) - if ctx.Err() != nil { - return - } - if runOnce == "true" { // only run once; used for testing - return - } - time.Sleep(time.Duration(service.SleepInterval) * time.Second) - } - } - }(service) - } - wg.Wait() // Wait for all services to finish - } - } - done <- struct{}{} - }() + for _, 
service := range loadedConfig.Services { + go worker(ctx, logger, service, rateLimiter, *loadedConfig, cancel) + } err = daemon.ServeSignals() if err != nil { @@ -205,3 +131,101 @@ func main() { logger.InfoContext(ctx, "anklet terminated") }
+
+// worker repeatedly runs the plugin for one configured service.
+//
+// Each pass starts with a one second pause followed by a non-blocking check of
+// the package-level stop channel; receiving from it ends the worker and reports
+// on done. The worker also ends, without reporting on done, once ctx is
+// cancelled or when runOnce is "true". Between passes it sleeps for
+// service.SleepInterval seconds.
+//
+// cancel is part of the signature for callers; the worker itself never calls it.
+func worker(ctx context.Context, logger *slog.Logger, service config.Service, rateLimiter *http.Client, loadedConfig config.Config, cancel context.CancelFunc) {
+	for {
+		time.Sleep(time.Second) // short pause ahead of every pass
+		select {
+		case <-stop:
+			// Only the stop path reports on done; the early returns below do not.
+			done <- struct{}{}
+			return
+		default:
+		}
+
+		runServiceOnce(ctx, logger, service, rateLimiter, loadedConfig)
+		if ctx.Err() != nil {
+			return
+		}
+		if runOnce == "true" { // only run once; used for testing
+			return
+		}
+		time.Sleep(time.Duration(service.SleepInterval) * time.Second)
+	}
+}
+
+// runServiceOnce performs a single plugin pass for service.
+//
+// It derives a cancellable context from ctx, attaches the service, logger, Anka
+// CLI, GitHub client wrapper and (when enabled) database client to it, and calls
+// run.Plugin. While the pass is running, SIGTERM cancels that context and
+// SIGQUIT is only logged. The context is cancelled and the signal subscription
+// removed on return, so repeated passes do not accumulate goroutines, signal
+// channels or contexts. It panics when the service has no name or when the Anka
+// CLI or database client cannot be created.
+func runServiceOnce(ctx context.Context, logger *slog.Logger, service config.Service, rateLimiter *http.Client, loadedConfig config.Config) {
+	passCtx, serviceCancel := context.WithCancel(ctx) // Inherit from parent context
+	// Cancelling on return releases the context and ends the watcher goroutine.
+	defer serviceCancel()
+
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGQUIT)
+	defer signal.Stop(sigChan)
+	// Watch for signals only for the duration of this pass.
+	go func() {
+		for {
+			select {
+			case <-passCtx.Done():
+				return
+			case sig := <-sigChan:
+				switch sig {
+				case syscall.SIGTERM:
+					logger.InfoContext(ctx, "Received SIGTERM, shutting down forcefully...")
+					serviceCancel()
+				case syscall.SIGQUIT:
+					logger.InfoContext(ctx, "Received SIGQUIT, shutting down gracefully...")
+				}
+			}
+		}
+	}()
+
+	if service.Name == "" {
+		panic("name is required for services")
+	}
+
+	serviceCtx := context.WithValue(passCtx, config.ContextKey("service"), service)
+	serviceCtx = logging.AppendCtx(serviceCtx, slog.String("serviceName", service.Name))
+	serviceCtx = context.WithValue(serviceCtx, config.ContextKey("logger"), logger)
+
+	ankaCLI, err := anka.NewCLI(serviceCtx)
+	if err != nil {
+		panic(fmt.Sprintf("unable to create anka cli: %v", err))
+	}
+
+	serviceCtx = context.WithValue(serviceCtx, config.ContextKey("ankacli"), ankaCLI)
+
+	githubClient := github.NewClient(rateLimiter).WithAuthToken(service.Token)
+	githubWrapperClient := internalGithub.NewGitHubClientWrapper(githubClient)
+	serviceCtx = context.WithValue(serviceCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient)
+
+	if loadedConfig.Database.Enabled {
+		databaseClient, err := database.NewClient(serviceCtx, loadedConfig.Database)
+		if err != nil {
+			panic(fmt.Sprintf("unable to access database: %v", err))
+		}
+		serviceCtx = context.WithValue(serviceCtx, config.ContextKey("database"), database.Database{
+			Client: databaseClient,
+		})
+	}
+
+	run.Plugin(serviceCtx, logger)
+}
diff --git a/plugins/github/github.go b/plugins/github/github.go index 35fd67b..98e12d6 100644 --- a/plugins/github/github.go +++ b/plugins/github/github.go @@ -93,7 +93,6 @@ func getWorkflowRunJobs(ctx context.Context, logger *slog.Logger) ([]WorkflowRun service := config.GetServiceFromContext(ctx) var allWorkflowRunJobDetails []WorkflowRunJobDetail // WORKFLOWS - fmt.Println("here") workflows, _, err := ExecuteGitHubClientFunction[*github.Workflows](ctx, logger, func() (**github.Workflows, *github.Response, error) { workflows, resp, err := githubClient.Actions.ListWorkflows(context.Background(), service.Owner, service.Repo, &github.ListOptions{}) @@ -252,6 +251,33 @@ func Run(ctx context.Context, logger *slog.Logger) { for _, workflowRunJob := range allWorkflowRunJobDetails { ctx = setLoggingContext(ctx, workflowRunJob) + logger.InfoContext(ctx, "handling anka workflow run job 1") + time.Sleep(2 * time.Second) + if ctx.Err() != nil { + logger.InfoContext(ctx, "ctx.Err 1") + return + } + logger.InfoContext(ctx, "handling anka workflow run job 2") + time.Sleep(2 * time.Second) + if ctx.Err() != nil { + logger.InfoContext(ctx, "ctx.Err 2") + return + } + logger.InfoContext(ctx, "handling anka workflow run job 3") + time.Sleep(2 * time.Second) + if ctx.Err() != nil { + logger.InfoContext(ctx, "ctx.Err 3") + return + } + logger.InfoContext(ctx, "handling anka workflow run job 4") + time.Sleep(2 * time.Second) + if ctx.Err() != nil { + logger.InfoContext(ctx, "ctx.Err 4") + return + } + + return + // 
Check if the job is already running, and ensure in DB to prevent other runners from getting it uniqueKey := fmt.Sprintf("%s:%s", workflowRunJob.RunID, workflowRunJob.UniqueID) ctx = dbFunctions.UpdateUniqueRunKey(ctx, uniqueKey) @@ -331,6 +357,10 @@ func Run(ctx context.Context, logger *slog.Logger) { return } + if ctx.Err() != nil { + logger.ErrorContext(ctx, "context cancelled before install runner") + return + } ankRunScriptOutput, err := ankaCLI.ExecuteAndParseJsonOnError(ctx, "anka", "-j", "run", vm.Name, "./install-runner.bash", ) @@ -341,6 +371,10 @@ func Run(ctx context.Context, logger *slog.Logger) { } logger.DebugContext(ctx, "install runner", "stdout", string(ankRunScriptOutput)) // Register runner + if ctx.Err() != nil { + logger.ErrorContext(ctx, "context cancelled before register runner") + return + } ankRunScriptOutput, err = ankaCLI.ExecuteAndParseJsonOnError(ctx, "anka", "-j", "run", vm.Name, "./register-runner.bash", vm.Name, *repoRunnerRegistration.Token, repositoryURL, strings.Join(workflowRunJob.Job.Labels, ","), @@ -352,6 +386,10 @@ func Run(ctx context.Context, logger *slog.Logger) { defer removeSelfHostedRunner(ctx, *vm, *workflowRunJob.Job.RunID) logger.DebugContext(ctx, "register runner", "stdout", string(ankRunScriptOutput)) // Install and Start runner + if ctx.Err() != nil { + logger.ErrorContext(ctx, "context cancelled before start runner") + return + } ankRunScriptOutput, err = ankaCLI.ExecuteAndParseJsonOnError(ctx, "anka", "-j", "run", vm.Name, "./start-runner.bash", ) @@ -385,6 +423,10 @@ func Run(ctx context.Context, logger *slog.Logger) { jobCompleted = true logger.InfoContext(ctx, "job completed", "job_id", *workflowRunJob.Job.ID) } else if logCounter%2 == 0 { + if ctx.Err() != nil { + logger.ErrorContext(ctx, "context cancelled during job status check") + return + } logger.InfoContext(ctx, "job still in progress", "job_id", *workflowRunJob.Job.ID) time.Sleep(5 * time.Second) // Wait before checking the job status again }