From 9b04ba7ab0144be67ac992fbb0581a6df8df1ba0 Mon Sep 17 00:00:00 2001
From: Sam Lucidi
Date: Wed, 10 Jul 2024 10:51:59 -0400
Subject: [PATCH] wip lifecycle management

---
 api/recovery.go       | 31 +++++++++++++++++++++
 api/server.go         | 59 ++++++++++++++++++++++++++++++++++++++++
 cmd/main.go           | 49 +++++++++++++++++++++++++--------
 importer/manager.go   |  4 ++-
 lifecycle/manager.go  | 63 +++++++++++++++++++++++++++++++++++++++++++
 metrics/manager.go    | 19 ++++++++++---
 reaper/manager.go     | 13 ++++-----
 settings/all.go       |  5 ++++
 settings/hub.go       | 24 ++++++++++++++---
 settings/lifecycle.go | 35 ++++++++++++++++++++++++
 task/manager.go       | 29 +++++++++++++-------
 tracker/manager.go    | 20 +++++++++++---
 12 files changed, 313 insertions(+), 38 deletions(-)
 create mode 100644 api/recovery.go
 create mode 100644 api/server.go
 create mode 100644 lifecycle/manager.go
 create mode 100644 settings/lifecycle.go

diff --git a/api/recovery.go b/api/recovery.go
new file mode 100644
index 000000000..151153c48
--- /dev/null
+++ b/api/recovery.go
@@ -0,0 +1,31 @@
+package api
+
+import (
+	"github.com/gin-gonic/gin"
+	"github.com/konveyor/tackle2-hub/lifecycle"
+)
+
+const (
+	StopRoute  = "/service/stop"
+	StartRoute = "/service/start"
+)
+
+type RecoveryHandler struct {
+	Manager *lifecycle.Manager
+}
+
+func (h RecoveryHandler) AddRoutes(e *gin.Engine) {
+	routeGroup := e.Group("/")
+	routeGroup.POST(StopRoute, h.Stop)
+	routeGroup.POST(StartRoute, h.Start)
+}
+
+func (h RecoveryHandler) Stop(ctx *gin.Context) {
+	h.Manager.Stop()
+	ctx.Status(202)
+}
+
+func (h RecoveryHandler) Start(ctx *gin.Context) {
+	h.Manager.Run()
+	ctx.Status(202)
+}
diff --git a/api/server.go b/api/server.go
new file mode 100644
index 000000000..5ae09e6f2
--- /dev/null
+++ b/api/server.go
@@ -0,0 +1,59 @@
+package api
+
+import (
+	"context"
+	"errors"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/jortel/go-utils/logr"
+)
+
+const (
+	Unit = time.Second
+)
+
+// Server is an HTTP server whose lifetime is bound to a context.
+type Server struct {
+	Handler http.Handler
+	Address string
+	Name    string
+}
+
+// Run starts serving on Address in the background and shuts the server
+// down when ctx is cancelled. wg.Done() is called once shutdown has
+// completed or its timeout has expired.
+func (r *Server) Run(ctx context.Context, wg *sync.WaitGroup) {
+	// Use a logger local to this server. Assigning to the package-level
+	// Log would replace the logger for the rest of the package.
+	log := logr.WithName(r.Name)
+	srv := &http.Server{
+		Addr:    r.Address,
+		Handler: r.Handler,
+	}
+	go func() {
+		log.Info("api server starting", "address", r.Address)
+		err := srv.ListenAndServe()
+		if !errors.Is(err, http.ErrServerClosed) {
+			log.Error(err, "api server failed", "address", r.Address)
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		<-ctx.Done()
+		force, cancel := context.WithTimeout(context.Background(), r.timeout())
+		defer cancel()
+		err := srv.Shutdown(force)
+		if err != nil {
+			log.Error(err, "api server shutdown failed", "address", r.Address)
+		}
+	}()
+}
+
+// timeout returns the time allowed for a graceful shutdown.
+func (r *Server) timeout() (d time.Duration) {
+	d = Unit * time.Duration(Settings.Shutdown.Timeout)
+	return
+}
diff --git a/cmd/main.go b/cmd/main.go
index 8be4667bb..f08ad0dc2 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -15,6 +15,7 @@ import (
 	"github.com/konveyor/tackle2-hub/importer"
 	"github.com/konveyor/tackle2-hub/k8s"
 	crd "github.com/konveyor/tackle2-hub/k8s/api"
+	"github.com/konveyor/tackle2-hub/lifecycle"
 	"github.com/konveyor/tackle2-hub/metrics"
 	"github.com/konveyor/tackle2-hub/migration"
 	"github.com/konveyor/tackle2-hub/reaper"
@@ -155,20 +156,22 @@ func main() {
 			settings.Settings.Auth.Keycloak.Realm,
 		)
 	}
+
+	lf := lifecycle.NewManager(db)
 	//
 	// Task
 	taskManager := task.Manager{
 		Client: client,
 		DB:     db,
 	}
-	taskManager.Run(context.Background())
+	lf.Register(&taskManager)
 	//
 	// Reaper
 	reaperManager := reaper.Manager{
 		Client: client,
 		DB:     db,
 	}
-	reaperManager.Run(context.Background())
+	lf.Register(&reaperManager)
 	//
 	// Application import.
 	importManager := importer.Manager{
@@ -176,28 +179,36 @@ func main() {
 		TaskManager: &taskManager,
 		Client:      client,
 	}
-	importManager.Run(context.Background())
+	lf.Register(&importManager)
 	//
 	// Ticket trackers.
 	trackerManager := tracker.Manager{
 		DB: db,
 	}
-	trackerManager.Run(context.Background())
+	lf.Register(&trackerManager)
+
 	//
 	// Metrics
 	if Settings.Metrics.Enabled {
 		log.Info("Serving Prometheus metrics", "port", Settings.Metrics.Port)
-		http.Handle("/metrics", api.MetricsHandler())
-		go func() {
-			_ = http.ListenAndServe(Settings.Metrics.Address(), nil)
-		}()
+		mux := http.NewServeMux()
+		mux.Handle("/metrics", api.MetricsHandler())
 		metricsManager := metrics.Manager{
 			DB: db,
 		}
-		metricsManager.Run(context.Background())
+		metricsServer := api.Server{
+			Handler: mux,
+			Address: Settings.Metrics.Address(),
+			Name:    "metrics",
+		}
+		lf.Register(&metricsServer)
+		lf.Register(&metricsManager)
 	}
+
 	// Web
-	router := gin.Default()
+	router := gin.New()
+	router.Use(gin.Logger())
+	router.Use(gin.Recovery())
 	router.Use(
 		func(ctx *gin.Context) {
 			rtx := api.RichContext(ctx)
@@ -212,5 +223,21 @@ func main() {
 	for _, h := range api.All() {
 		h.AddRoutes(router)
 	}
-	err = router.Run()
+	server := api.Server{
+		Handler: router.Handler(),
+		Address: ":8080",
+		Name:    "api",
+	}
+	lf.Register(&server)
+	lf.Run()
+
+	// Recovery
+	recovery := gin.New()
+	recovery.Use(gin.Logger())
+	recovery.Use(gin.Recovery())
+	handler := api.RecoveryHandler{
+		Manager: lf,
+	}
+	handler.AddRoutes(recovery)
+	err = recovery.Run(Settings.Recovery.Address)
 }
diff --git a/importer/manager.go b/importer/manager.go
index b11f45e4b..5443209e6 100644
--- a/importer/manager.go
+++ b/importer/manager.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 
@@ -32,8 +33,9 @@ type Manager struct {
 }
 
 // Run the manager.
-func (m *Manager) Run(ctx context.Context) {
+func (m *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
 	go func() {
+		defer wg.Done()
 		for {
 			select {
 			case <-ctx.Done():
diff --git a/lifecycle/manager.go b/lifecycle/manager.go
new file mode 100644
index 000000000..b199603b5
--- /dev/null
+++ b/lifecycle/manager.go
@@ -0,0 +1,80 @@
+package lifecycle
+
+import (
+	"context"
+	"sync"
+
+	"github.com/jortel/go-utils/logr"
+	"github.com/konveyor/tackle2-hub/database"
+	"gorm.io/gorm"
+)
+
+var Log = logr.WithName("lifecycle")
+
+// Runnable is a component whose lifetime is controlled by the Manager.
+// Run is expected to return without blocking and to call wg.Done()
+// exactly once when the component has stopped after ctx is cancelled.
+type Runnable interface {
+	Run(ctx context.Context, wg *sync.WaitGroup)
+}
+
+// NewManager returns a Manager that closes db when stopped.
+func NewManager(db *gorm.DB) (m *Manager) {
+	m = &Manager{
+		db: db,
+	}
+	return
+}
+
+// Manager starts and stops the registered runnables as a group.
+type Manager struct {
+	// mutex serializes Run and Stop, which may be called concurrently.
+	mutex    sync.Mutex
+	db       *gorm.DB
+	running  bool
+	cancel   context.CancelFunc
+	wg       sync.WaitGroup
+	runnable []Runnable
+}
+
+// Register adds a runnable to be started by the next Run.
+func (r *Manager) Register(runnable Runnable) {
+	r.runnable = append(r.runnable, runnable)
+}
+
+// Run starts every registered runnable with a shared cancellable context.
+// It does nothing when the manager is already running.
+func (r *Manager) Run() {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+	if r.running {
+		return
+	}
+	r.wg = sync.WaitGroup{}
+	var ctx context.Context
+	ctx, r.cancel = context.WithCancel(context.Background())
+	for _, runnable := range r.runnable {
+		r.wg.Add(1)
+		runnable.Run(ctx, &r.wg)
+	}
+	r.running = true
+}
+
+// Stop cancels the shared context, waits for every runnable to finish
+// and then closes the database. It does nothing when not running and
+// panics when the database cannot be closed.
+func (r *Manager) Stop() {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+	if !r.running {
+		return
+	}
+	r.cancel()
+	r.wg.Wait()
+	err := database.Close(r.db)
+	if err != nil {
+		Log.Error(err, "could not close database after shutting down")
+		panic(err)
+	}
+	r.running = false
+}
diff --git a/metrics/manager.go b/metrics/manager.go
index 72d75f5cb..6700935c0 100644
--- a/metrics/manager.go
+++ b/metrics/manager.go
@@ -2,15 +2,22 @@ package metrics
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/jortel/go-utils/logr"
 	"github.com/konveyor/tackle2-hub/model"
+	"github.com/konveyor/tackle2-hub/settings"
 	"gorm.io/gorm"
 )
 
 var (
-	Log = logr.WithName("metrics")
+	Settings = &settings.Settings
+	Log      = logr.WithName("metrics")
+)
+
+const (
+	Unit = time.Second
 )
 
 // Manager provides metrics management.
@@ -20,15 +27,16 @@ type Manager struct {
 }
 
 // Run the manager.
-func (m *Manager) Run(ctx context.Context) {
+func (m *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
 	go func() {
 		Log.Info("Started.")
 		defer Log.Info("Died.")
+		defer wg.Done()
 		for {
 			select {
 			case <-ctx.Done():
 				return
-			default:
-				time.Sleep(time.Second * 30)
+			case <-time.After(m.pause()):
+				// The timer above is the whole delay; an extra sleep here would not be cancellable.
 				m.gaugeApplications()
 			}
@@ -36,6 +44,12 @@ func (m *Manager) Run(ctx context.Context) {
 	}()
 }
 
+// pause returns the interval between metrics updates.
+func (m *Manager) pause() (d time.Duration) {
+	d = Unit * time.Duration(Settings.Frequency.Metrics)
+	return
+}
+
 // gaugeApplications reports the number of applications in inventory
 func (m *Manager) gaugeApplications() {
 	count := int64(0)
diff --git a/reaper/manager.go b/reaper/manager.go
index c3efb7095..2927f8ea2 100644
--- a/reaper/manager.go
+++ b/reaper/manager.go
@@ -2,6 +2,7 @@ package reaper
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/jortel/go-utils/logr"
@@ -31,7 +32,7 @@ type Manager struct {
 }
 
 // Run the manager.
-func (m *Manager) Run(ctx context.Context) {
+func (m *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
 	registered := []Reaper{
 		&TaskReaper{
 			Client: m.Client,
@@ -50,24 +51,24 @@ func (m *Manager) Run(ctx context.Context) {
 	go func() {
 		Log.Info("Started.")
 		defer Log.Info("Died.")
+		defer wg.Done()
 		for {
 			select {
 			case <-ctx.Done():
 				return
-			default:
+			case <-time.After(m.pause()):
 				for _, r := range registered {
 					r.Run()
 				}
-				m.pause()
 			}
 		}
 	}()
 }
 
 // Pause.
-func (m *Manager) pause() {
-	d := Unit * time.Duration(Settings.Frequency.Reaper)
-	time.Sleep(d)
+func (m *Manager) pause() (d time.Duration) {
+	d = Unit * time.Duration(Settings.Frequency.Reaper)
+	return
 }
 
 // Reaper interface.
diff --git a/settings/all.go b/settings/all.go index e69e9adb7..13d378a86 100644 --- a/settings/all.go +++ b/settings/all.go @@ -12,6 +12,7 @@ type TackleSettings struct { Metrics Addon Auth + Lifecycle } func (r *TackleSettings) Load() (err error) { @@ -31,6 +32,10 @@ func (r *TackleSettings) Load() (err error) { if err != nil { return } + err = r.Lifecycle.Load() + if err != nil { + return + } return } diff --git a/settings/hub.go b/settings/hub.go index 192acfec3..07ce769ce 100644 --- a/settings/hub.go +++ b/settings/hub.go @@ -27,6 +27,8 @@ const ( EnvTaskPreemptRate = "TASK_PREEMPT_RATE" EnvFrequencyTask = "FREQUENCY_TASK" EnvFrequencyReaper = "FREQUENCY_REAPER" + EnvFrequencyTracker = "FREQUENCY_TRACKER" + EnvFrequencyMetrics = "FREQUENCY_METRICS" EnvDevelopment = "DEVELOPMENT" EnvBucketTTL = "BUCKET_TTL" EnvFileTTL = "FILE_TTL" @@ -87,9 +89,11 @@ type Hub struct { } // Frequency Frequency struct { - Task int - Reaper int - Volume int + Task int + Reaper int + Volume int + Tracker int + Metrics int } // Development environment Development bool @@ -194,6 +198,20 @@ func (r *Hub) Load() (err error) { } else { r.Frequency.Reaper = 1 // 1 minute. } + s, found = os.LookupEnv(EnvFrequencyTracker) + if found { + n, _ := strconv.Atoi(s) + r.Frequency.Tracker = n + } else { + r.Frequency.Tracker = 1 // 1 second. + } + s, found = os.LookupEnv(EnvFrequencyMetrics) + if found { + n, _ := strconv.Atoi(s) + r.Frequency.Metrics = n + } else { + r.Frequency.Metrics = 30 // 30 seconds. 
+ } s, found = os.LookupEnv(EnvTaskPreemptEnabled) if found { b, _ := strconv.ParseBool(s) diff --git a/settings/lifecycle.go b/settings/lifecycle.go new file mode 100644 index 000000000..3befebd4c --- /dev/null +++ b/settings/lifecycle.go @@ -0,0 +1,35 @@ +package settings + +import ( + "os" + "strconv" +) + +const ( + EnvShutdownTimeout = "SHUTDOWN_TIMEOUT" + EnvRecoveryAddress = "RECOVERY_ADDRESS" +) + +type Lifecycle struct { + Shutdown struct { + Timeout int + } + Recovery struct { + Address string + } +} + +func (r *Lifecycle) Load() (err error) { + s, found := os.LookupEnv(EnvShutdownTimeout) + if found { + n, _ := strconv.Atoi(s) + r.Shutdown.Timeout = n + } else { + r.Shutdown.Timeout = 30 // 30 seconds. + } + r.Recovery.Address, found = os.LookupEnv(EnvRecoveryAddress) + if !found { + r.Recovery.Address = ":8083" + } + return +} diff --git a/task/manager.go b/task/manager.go index 15a2e5158..1f414b06a 100644 --- a/task/manager.go +++ b/task/manager.go @@ -97,35 +97,44 @@ type Manager struct { cluster Cluster // queue of actions. queue chan func() + // validator registered + registered bool } -// Run the manager. -func (m *Manager) Run(ctx context.Context) { - m.queue = make(chan func(), 100) - m.cluster.Client = m.Client +func (m *Manager) registerValidator() { + if m.registered { + return + } auth.Validators = append( auth.Validators, &Validator{ Client: m.Client, }) + m.registered = true +} + +// Run the manager. 
+func (m *Manager) Run(ctx context.Context, wg *sync.WaitGroup) { + m.queue = make(chan func(), 100) + m.cluster.Client = m.Client + m.registerValidator() go func() { Log.Info("Started.") defer Log.Info("Done.") + defer wg.Done() for { select { case <-ctx.Done(): return - default: + case <-time.After(m.pause()): err := m.cluster.Refresh() if err == nil { m.deleteOrphanPods() m.runActions() m.updateRunning() m.startReady() - m.pause() } else { Log.Error(err, "") - m.pause() } } } @@ -279,9 +288,9 @@ func (m *Manager) Cancel(db *gorm.DB, id uint) (err error) { } // Pause. -func (m *Manager) pause() { - d := Unit * time.Duration(Settings.Frequency.Task) - time.Sleep(d) +func (m *Manager) pause() (d time.Duration) { + d = Unit * time.Duration(Settings.Frequency.Task) + return } // action enqueues an asynchronous action. diff --git a/tracker/manager.go b/tracker/manager.go index 7a451d8f6..06904db75 100644 --- a/tracker/manager.go +++ b/tracker/manager.go @@ -2,16 +2,23 @@ package tracker import ( "context" + "sync" "time" "github.com/jortel/go-utils/logr" "github.com/konveyor/tackle2-hub/model" + "github.com/konveyor/tackle2-hub/settings" "gorm.io/gorm" "gorm.io/gorm/clause" ) var ( - Log = logr.WithName("tickets") + Settings = &settings.Settings + Log = logr.WithName("tickets") +) + +const ( + Unit = time.Second ) // Intervals @@ -29,16 +36,16 @@ type Manager struct { } // Run the manager. -func (m *Manager) Run(ctx context.Context) { +func (m *Manager) Run(ctx context.Context, wg *sync.WaitGroup) { go func() { Log.Info("Started.") defer Log.Info("Died.") + defer wg.Done() for { select { case <-ctx.Done(): return - default: - time.Sleep(time.Second) + case <-time.After(m.pause()): m.testConnections() m.refreshTickets() m.createPending() @@ -47,6 +54,11 @@ func (m *Manager) Run(ctx context.Context) { }() } +func (m *Manager) pause() (d time.Duration) { + d = Unit * time.Duration(Settings.Frequency.Tracker) + return +} + // testConnections to external trackers. 
func (m *Manager) testConnections() { var list []model.Tracker