Skip to content

Commit

Permalink
final for daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
NorseGaud committed Apr 8, 2024
1 parent f209dd8 commit d4d43c2
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 92 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
module github.com/norsegaud/anklet

go 1.21.6
go 1.22.2

require (
github.com/gofri/go-github-ratelimit v1.1.0
github.com/google/go-github/v58 v58.0.0
github.com/google/uuid v1.6.0
github.com/norsegaud/go-daemon v0.1.8
github.com/norsegaud/go-daemon v0.1.10
github.com/redis/go-redis/v9 v9.5.1
gopkg.in/yaml.v2 v2.4.0
)
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/norsegaud/go-daemon v0.1.8 h1:mpcs/L40jCyTV457wYdxYoNxWViDmcN4VAjbGBTieN4=
github.com/norsegaud/go-daemon v0.1.8/go.mod h1:+Y6PlACn3Gp+n7EU8r7xtOPiN9YwNFG7ngy325wKugE=
github.com/norsegaud/go-daemon v0.1.10 h1:JnYLPJ1uHU1SsZ29iDkXQbzvy0Fxecy2o2buDgnI528=
github.com/norsegaud/go-daemon v0.1.10/go.mod h1:Xfzc5NRusltO7M151oKbmC4bBXBi44uFviKbP2BUxQ8=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
166 changes: 80 additions & 86 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
Expand All @@ -24,11 +26,11 @@ import (
)

var (
version = "dev"
runOnce = "false"
signal = flag.String("s", "", `Send signal to the daemon:
quit — graceful shutdown
stop — fast shutdown
version = "dev"
runOnce = "false"
signalFlag = flag.String("s", "", `Send signal to the daemon:
drain — graceful shutdown, will wait until all jobs finish before exiting
stop — best effort graceful shutdown, interrupting the job as soon as possible
reload — reloading the configuration file`)
stop = make(chan struct{})
done = make(chan struct{})
Expand All @@ -45,22 +47,14 @@ func termHandler(ctx context.Context, logger *slog.Logger) daemon.SignalHandlerF
}
}

func reloadHandler(ctx context.Context, logger *slog.Logger) daemon.SignalHandlerFunc {
return func(sig os.Signal) error {
logger.InfoContext(ctx, "reloading configuration")
return nil
}
}

func main() {

logger := logging.New()
ctx, cancel := context.WithCancel(context.Background())

flag.Parse()
daemon.AddCommand(daemon.StringFlag(signal, "quit"), syscall.SIGQUIT, termHandler(ctx, logger))
daemon.AddCommand(daemon.StringFlag(signal, "stop"), syscall.SIGTERM, termHandler(ctx, logger))
daemon.AddCommand(daemon.StringFlag(signal, "reload"), syscall.SIGHUP, reloadHandler(ctx, logger))
daemon.AddCommand(daemon.StringFlag(signalFlag, "drain"), syscall.SIGQUIT, termHandler(ctx, logger))
daemon.AddCommand(daemon.StringFlag(signalFlag, "stop"), syscall.SIGTERM, termHandler(ctx, logger))

homeDir, err := os.UserHomeDir()
if err != nil {
Expand Down Expand Up @@ -126,77 +120,9 @@ func main() {
return
}

// main worker child for daemon
go func() {
LOOP:
for {
time.Sleep(time.Second) // this is work to be done by worker.
select {
case <-stop:
fmt.Println("stopping")
cancel()
break LOOP
default:
var wg sync.WaitGroup
for _, service := range loadedConfig.Services {
wg.Add(1)
go func(service config.Service) {
defer wg.Done()
serviceCtx, serviceCancel := context.WithCancel(ctx) // Inherit from parent context
defer serviceCancel() // Ensure we cancel the context when service exits

if service.Name == "" {
panic("name is required for services")
}

serviceCtx = context.WithValue(serviceCtx, config.ContextKey("service"), service)
serviceCtx = logging.AppendCtx(serviceCtx, slog.String("serviceName", service.Name))
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("logger"), logger)

ankaCLI, err := anka.NewCLI(serviceCtx)
if err != nil {
panic(fmt.Sprintf("unable to create anka cli: %v", err))
}

serviceCtx = context.WithValue(serviceCtx, config.ContextKey("ankacli"), ankaCLI)

githubClient := github.NewClient(rateLimiter).WithAuthToken(service.Token)
githubWrapperClient := internalGithub.NewGitHubClientWrapper(githubClient)
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient)

if loadedConfig.Database.Enabled {
databaseClient, err := database.NewClient(serviceCtx, loadedConfig.Database)
if err != nil {
panic(fmt.Sprintf("unable to access database: %v", err))
}
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("database"), database.Database{
Client: databaseClient,
})
}

for {
select {
case <-stop:
logger.WarnContext(serviceCtx, "service is stopping")
return // Exit the loop and hence the goroutine
default:
run.Plugin(serviceCtx, logger)
if ctx.Err() != nil {
return
}
if runOnce == "true" { // only run once; used for testing
return
}
time.Sleep(time.Duration(service.SleepInterval) * time.Second)
}
}
}(service)
}
wg.Wait() // Wait for all services to finish
}
}
done <- struct{}{}
}()
for _, service := range loadedConfig.Services {
go worker(ctx, logger, service, rateLimiter, *loadedConfig, cancel)
}

err = daemon.ServeSignals()
if err != nil {
Expand All @@ -205,3 +131,71 @@ func main() {

logger.InfoContext(ctx, "anklet terminated")
}

func worker(ctx context.Context, logger *slog.Logger, service config.Service, rateLimiter *http.Client, loadedConfig config.Config, cancel context.CancelFunc) {
LOOP:
for {
time.Sleep(time.Second) // this is work to be done by worker.
select {
case <-stop:
break LOOP
default:
serviceCtx, serviceCancel := context.WithCancel(ctx) // Inherit from parent context
// defer serviceCancel() // Ensure we cancel the context when service exits

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
for {
sig := <-sigChan
switch sig {
case syscall.SIGTERM:
logger.InfoContext(ctx, "Received SIGTERM, shutting down forcefully...")
serviceCancel()
case syscall.SIGQUIT:
logger.InfoContext(ctx, "Received SIGQUIT, shutting down gracefully...")
}
}
}()

if service.Name == "" {
panic("name is required for services")
}

serviceCtx = context.WithValue(serviceCtx, config.ContextKey("service"), service)
serviceCtx = logging.AppendCtx(serviceCtx, slog.String("serviceName", service.Name))
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("logger"), logger)

ankaCLI, err := anka.NewCLI(serviceCtx)
if err != nil {
panic(fmt.Sprintf("unable to create anka cli: %v", err))
}

serviceCtx = context.WithValue(serviceCtx, config.ContextKey("ankacli"), ankaCLI)

githubClient := github.NewClient(rateLimiter).WithAuthToken(service.Token)
githubWrapperClient := internalGithub.NewGitHubClientWrapper(githubClient)
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient)

if loadedConfig.Database.Enabled {
databaseClient, err := database.NewClient(serviceCtx, loadedConfig.Database)
if err != nil {
panic(fmt.Sprintf("unable to access database: %v", err))
}
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("database"), database.Database{
Client: databaseClient,
})
}

run.Plugin(serviceCtx, logger)
if ctx.Err() != nil {
return
}
if runOnce == "true" { // only run once; used for testing
return
}
time.Sleep(time.Duration(service.SleepInterval) * time.Second)
}
}
done <- struct{}{}
}
44 changes: 43 additions & 1 deletion plugins/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func getWorkflowRunJobs(ctx context.Context, logger *slog.Logger) ([]WorkflowRun
service := config.GetServiceFromContext(ctx)
var allWorkflowRunJobDetails []WorkflowRunJobDetail
// WORKFLOWS
fmt.Println("here")

workflows, _, err := ExecuteGitHubClientFunction[*github.Workflows](ctx, logger, func() (**github.Workflows, *github.Response, error) {
workflows, resp, err := githubClient.Actions.ListWorkflows(context.Background(), service.Owner, service.Repo, &github.ListOptions{})
Expand Down Expand Up @@ -252,6 +251,33 @@ func Run(ctx context.Context, logger *slog.Logger) {
for _, workflowRunJob := range allWorkflowRunJobDetails {
ctx = setLoggingContext(ctx, workflowRunJob)

logger.InfoContext(ctx, "handling anka workflow run job 1")
time.Sleep(2 * time.Second)
if ctx.Err() != nil {
logger.InfoContext(ctx, "ctx.Err 1")
return
}
logger.InfoContext(ctx, "handling anka workflow run job 2")
time.Sleep(2 * time.Second)
if ctx.Err() != nil {
logger.InfoContext(ctx, "ctx.Err 2")
return
}
logger.InfoContext(ctx, "handling anka workflow run job 3")
time.Sleep(2 * time.Second)
if ctx.Err() != nil {
logger.InfoContext(ctx, "ctx.Err 3")
return
}
logger.InfoContext(ctx, "handling anka workflow run job 4")
time.Sleep(2 * time.Second)
if ctx.Err() != nil {
logger.InfoContext(ctx, "ctx.Err 4")
return
}

return

// Check if the job is already running, and ensure in DB to prevent other runners from getting it
uniqueKey := fmt.Sprintf("%s:%s", workflowRunJob.RunID, workflowRunJob.UniqueID)
ctx = dbFunctions.UpdateUniqueRunKey(ctx, uniqueKey)
Expand Down Expand Up @@ -331,6 +357,10 @@ func Run(ctx context.Context, logger *slog.Logger) {
return
}

if ctx.Err() != nil {
logger.ErrorContext(ctx, "context cancelled before install runner")
return
}
ankRunScriptOutput, err := ankaCLI.ExecuteAndParseJsonOnError(ctx,
"anka", "-j", "run", vm.Name, "./install-runner.bash",
)
Expand All @@ -341,6 +371,10 @@ func Run(ctx context.Context, logger *slog.Logger) {
}
logger.DebugContext(ctx, "install runner", "stdout", string(ankRunScriptOutput))
// Register runner
if ctx.Err() != nil {
logger.ErrorContext(ctx, "context cancelled before register runner")
return
}
ankRunScriptOutput, err = ankaCLI.ExecuteAndParseJsonOnError(ctx,
"anka", "-j", "run", vm.Name, "./register-runner.bash",
vm.Name, *repoRunnerRegistration.Token, repositoryURL, strings.Join(workflowRunJob.Job.Labels, ","),
Expand All @@ -352,6 +386,10 @@ func Run(ctx context.Context, logger *slog.Logger) {
defer removeSelfHostedRunner(ctx, *vm, *workflowRunJob.Job.RunID)
logger.DebugContext(ctx, "register runner", "stdout", string(ankRunScriptOutput))
// Install and Start runner
if ctx.Err() != nil {
logger.ErrorContext(ctx, "context cancelled before start runner")
return
}
ankRunScriptOutput, err = ankaCLI.ExecuteAndParseJsonOnError(ctx,
"anka", "-j", "run", vm.Name, "./start-runner.bash",
)
Expand Down Expand Up @@ -385,6 +423,10 @@ func Run(ctx context.Context, logger *slog.Logger) {
jobCompleted = true
logger.InfoContext(ctx, "job completed", "job_id", *workflowRunJob.Job.ID)
} else if logCounter%2 == 0 {
if ctx.Err() != nil {
logger.ErrorContext(ctx, "context cancelled during job status check")
return
}
logger.InfoContext(ctx, "job still in progress", "job_id", *workflowRunJob.Job.ID)
time.Sleep(5 * time.Second) // Wait before checking the job status again
}
Expand Down

0 comments on commit d4d43c2

Please sign in to comment.