Skip to content

Commit

Permalink
quick fix
Browse files Browse the repository at this point in the history
  • Loading branch information
NorseGaud committed Apr 5, 2024
1 parent f16ff0f commit f209dd8
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 71 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/gofri/go-github-ratelimit v1.1.0
github.com/google/go-github/v58 v58.0.0
github.com/google/uuid v1.6.0
github.com/norsegaud/go-daemon v0.1.8
github.com/redis/go-redis/v9 v9.5.1
gopkg.in/yaml.v2 v2.4.0
)
Expand All @@ -14,6 +15,8 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
golang.org/x/sys v0.19.0 // indirect
)

replace github.com/norsegaud/anklet/plugins/github => ./plugins/github
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/norsegaud/go-daemon v0.1.8 h1:mpcs/L40jCyTV457wYdxYoNxWViDmcN4VAjbGBTieN4=
github.com/norsegaud/go-daemon v0.1.8/go.mod h1:+Y6PlACn3Gp+n7EU8r7xtOPiN9YwNFG7ngy325wKugE=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
7 changes: 7 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ type ContextKey string
type Config struct {
Services []Service `yaml:"services"`
Database Database `yaml:"database"`
Log Log `yaml:"log"`
PidFile string `yaml:"pid_file" default:"/tmp/anklet.pid"`
WorkDir string `yaml:"work_dir" default:"/tmp/"`
}

type Log struct {
Location string `yaml:"location" default:"~/Library/Logs/anklet.log"`
}

type Database struct {
Expand Down
216 changes: 146 additions & 70 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package main

import (
"context"
"flag"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
Expand All @@ -19,30 +20,47 @@ import (
internalGithub "github.com/norsegaud/anklet/internal/github"
"github.com/norsegaud/anklet/internal/logging"
"github.com/norsegaud/anklet/internal/run"
"github.com/norsegaud/go-daemon"
)

var version string // Declare version variable to hold the version set by go build
var (
version = "dev"
runOnce = "false"
signal = flag.String("s", "", `Send signal to the daemon:
quit — graceful shutdown
stop — fast shutdown
reload — reloading the configuration file`)
stop = make(chan struct{})
done = make(chan struct{})
)

func termHandler(ctx context.Context, logger *slog.Logger) daemon.SignalHandlerFunc {
return func(sig os.Signal) error {
logger.InfoContext(ctx, "terminating anklet, please do not interrupt...")
stop <- struct{}{}
if sig == syscall.SIGQUIT {
<-done
}
return daemon.ErrStop
}
}

var runOnce string
func reloadHandler(ctx context.Context, logger *slog.Logger) daemon.SignalHandlerFunc {
return func(sig os.Signal) error {
logger.InfoContext(ctx, "reloading configuration")
return nil
}
}

func main() {

logger := logging.New()
ctx, cancel := context.WithCancel(context.Background())

if version == "" {
version = "dev" // Default version if not set by go build
}

ctx = logging.AppendCtx(ctx, slog.String("ankletVersion", version))

// trap SIGTERM and SIGINT
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
go func() {
<-c
logger.WarnContext(ctx, "received an interrupt, stopping services")
cancel() // Signal all services to stop
}()
flag.Parse()
daemon.AddCommand(daemon.StringFlag(signal, "quit"), syscall.SIGQUIT, termHandler(ctx, logger))
daemon.AddCommand(daemon.StringFlag(signal, "stop"), syscall.SIGTERM, termHandler(ctx, logger))
daemon.AddCommand(daemon.StringFlag(signal, "reload"), syscall.SIGHUP, reloadHandler(ctx, logger))

homeDir, err := os.UserHomeDir()
if err != nil {
Expand All @@ -56,6 +74,43 @@ func main() {
panic(err)
}

daemonContext := &daemon.Context{
PidFileName: loadedConfig.PidFile,
PidFilePerm: 0644,
LogFileName: loadedConfig.Log.Location,
LogFilePerm: 0640,
WorkDir: loadedConfig.WorkDir,
Umask: 027,
Args: []string{"anklet"},
}

if len(daemon.ActiveFlags()) > 0 {
d, err := daemonContext.Search()
if err != nil {
log.Fatalf("Unable send signal to the daemon: %s", err.Error())
}
err = daemon.SendCommands(d)
if err != nil {
log.Fatalln(err.Error())
}
return
}

d, err := daemonContext.Reborn()
if err != nil {
log.Fatalln(err)
}
if d != nil {
return
}
defer daemonContext.Release()

if version == "" {
version = "dev" // Default version if not set by go build
}

ctx = logging.AppendCtx(ctx, slog.String("ankletVersion", version))

logger.InfoContext(ctx, "starting anklet")
logger.DebugContext(ctx, "loaded config", slog.Any("config", loadedConfig))

Expand All @@ -71,61 +126,82 @@ func main() {
return
}

var wg sync.WaitGroup
for _, service := range loadedConfig.Services {
wg.Add(1)
go func(service config.Service) {
defer wg.Done()
serviceCtx, serviceCancel := context.WithCancel(ctx) // Inherit from parent context
defer serviceCancel() // Ensure we cancel the context when service exits

if service.Name == "" {
panic("name is required for services")
}

serviceCtx = context.WithValue(serviceCtx, config.ContextKey("service"), service)
serviceCtx = logging.AppendCtx(serviceCtx, slog.String("serviceName", service.Name))
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("logger"), logger)

ankaCLI, err := anka.NewCLI(serviceCtx)
if err != nil {
panic(fmt.Sprintf("unable to create anka cli: %v", err))
}

serviceCtx = context.WithValue(serviceCtx, config.ContextKey("ankacli"), ankaCLI)

githubClient := github.NewClient(rateLimiter).WithAuthToken(service.Token)
githubWrapperClient := internalGithub.NewGitHubClientWrapper(githubClient)
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient)

if loadedConfig.Database.Enabled {
databaseClient, err := database.NewClient(serviceCtx, loadedConfig.Database)
if err != nil {
panic(fmt.Sprintf("unable to access database: %v", err))
// main worker child for daemon
go func() {
LOOP:
for {
time.Sleep(time.Second) // this is work to be done by worker.
select {
case <-stop:
fmt.Println("stopping")
cancel()
break LOOP
default:
var wg sync.WaitGroup
for _, service := range loadedConfig.Services {
wg.Add(1)
go func(service config.Service) {
defer wg.Done()
serviceCtx, serviceCancel := context.WithCancel(ctx) // Inherit from parent context
defer serviceCancel() // Ensure we cancel the context when service exits

if service.Name == "" {
panic("name is required for services")
}

serviceCtx = context.WithValue(serviceCtx, config.ContextKey("service"), service)
serviceCtx = logging.AppendCtx(serviceCtx, slog.String("serviceName", service.Name))
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("logger"), logger)

ankaCLI, err := anka.NewCLI(serviceCtx)
if err != nil {
panic(fmt.Sprintf("unable to create anka cli: %v", err))
}

serviceCtx = context.WithValue(serviceCtx, config.ContextKey("ankacli"), ankaCLI)

githubClient := github.NewClient(rateLimiter).WithAuthToken(service.Token)
githubWrapperClient := internalGithub.NewGitHubClientWrapper(githubClient)
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient)

if loadedConfig.Database.Enabled {
databaseClient, err := database.NewClient(serviceCtx, loadedConfig.Database)
if err != nil {
panic(fmt.Sprintf("unable to access database: %v", err))
}
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("database"), database.Database{
Client: databaseClient,
})
}

for {
select {
case <-stop:
logger.WarnContext(serviceCtx, "service is stopping")
return // Exit the loop and hence the goroutine
default:
run.Plugin(serviceCtx, logger)
if ctx.Err() != nil {
return
}
if runOnce == "true" { // only run once; used for testing
return
}
time.Sleep(time.Duration(service.SleepInterval) * time.Second)
}
}
}(service)
}
serviceCtx = context.WithValue(serviceCtx, config.ContextKey("database"), database.Database{
Client: databaseClient,
})
wg.Wait() // Wait for all services to finish
}
}
done <- struct{}{}
}()

for {
select {
case <-serviceCtx.Done():
logger.WarnContext(serviceCtx, "service is stopping")
return // Exit the loop and hence the goroutine
default:
run.Plugin(serviceCtx, logger)
if ctx.Err() != nil {
return
}
if runOnce == "true" { // only run once; used for testing
return
}
time.Sleep(time.Duration(service.SleepInterval) * time.Second)
}
}
}(service)
err = daemon.ServeSignals()
if err != nil {
log.Printf("Error: %s", err.Error())
}
wg.Wait() // Wait for all services to finish
logger.InfoContext(ctx, "all services stopped")

logger.InfoContext(ctx, "anklet terminated")
}
6 changes: 5 additions & 1 deletion plugins/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,15 @@ func setLoggingContext(ctx context.Context, workflowRunJob WorkflowRunJobDetail)
}

func getWorkflowRunJobs(ctx context.Context, logger *slog.Logger) ([]WorkflowRunJobDetail, error) {
if ctx.Err() != nil {
return nil, fmt.Errorf("context cancelled before getWorkflowRunJobs")
}
githubClient := internalGithub.GetGitHubClientFromContext(ctx)
service := config.GetServiceFromContext(ctx)
var allWorkflowRunJobDetails []WorkflowRunJobDetail
// WORKFLOWS
fmt.Println("here")

workflows, _, err := ExecuteGitHubClientFunction[*github.Workflows](ctx, logger, func() (**github.Workflows, *github.Response, error) {
workflows, resp, err := githubClient.Actions.ListWorkflows(context.Background(), service.Owner, service.Repo, &github.ListOptions{})
return &workflows, resp, err // Adjusted to return the direct result
Expand Down Expand Up @@ -220,7 +225,6 @@ func Run(ctx context.Context, logger *slog.Logger) {
repositoryURL := fmt.Sprintf("https://github.com/%s/%s", service.Owner, service.Repo)

// obtain all queued workflow runs and jobs

allWorkflowRunJobDetails, err := getWorkflowRunJobs(ctx, logger)
if err != nil {
return
Expand Down

0 comments on commit f209dd8

Please sign in to comment.