diff --git a/agent/agentlocal/agent_local.go b/agent/agentlocal/agent_local.go index 60a6e3325f..e0809b2494 100644 --- a/agent/agentlocal/agent_local.go +++ b/agent/agentlocal/agent_local.go @@ -100,7 +100,7 @@ func NewServer(cfg configGetReloader, supervisor supervisor, client client, conf // Run runs gRPC and JSON servers with API and debug endpoints until ctx is canceled. // // Run exits when ctx is canceled, or when a request to reload configuration is received. -func (s *Server) Run(ctx context.Context) { +func (s *Server) Run(ctx context.Context, reloadCh chan bool) { defer s.l.Info("Done.") serverCtx, serverCancel := context.WithCancel(ctx) @@ -125,8 +125,10 @@ func (s *Server) Run(ctx context.Context) { }() select { - case <-ctx.Done(): case <-s.reload: + s.l.Debug("Agent reload triggered") + reloadCh <- true + case <-ctx.Done(): } serverCancel() diff --git a/agent/client/client.go b/agent/client/client.go index c2106072d0..a7906b6ef1 100644 --- a/agent/client/client.go +++ b/agent/client/client.go @@ -262,6 +262,9 @@ func (c *Client) processActionResults(ctx context.Context) { for { select { case result := <-c.runner.ActionsResults(): + if result == nil { + continue + } resp, err := c.channel.SendAndWaitResponse(result) if err != nil { c.l.Error(err) @@ -281,6 +284,9 @@ func (c *Client) processJobsResults(ctx context.Context) { for { select { case message := <-c.runner.JobsMessages(): + if message == nil { + continue + } c.channel.Send(&channel.AgentResponse{ ID: 0, // Jobs send messages that don't require any responses, so we can leave message ID blank. Payload: message, @@ -302,6 +308,9 @@ func (c *Client) processSupervisorRequests(ctx context.Context) { for { select { case state := <-c.supervisor.Changes(): + if state == nil { + continue + } resp, err := c.channel.SendAndWaitResponse(state) if err != nil { c.l.Error(err) @@ -324,6 +333,9 @@ func (c *Client) processSupervisorRequests(ctx context.Context) { for { select { case collect := <-c.supervisor.QANRequests(): + if collect == nil { + continue + } resp, err := c.channel.SendAndWaitResponse(collect) if err != nil { c.l.Error(err) diff --git a/agent/commands/run.go b/agent/commands/run.go index 22878c350b..cc384a75a9 100644 --- a/agent/commands/run.go +++ b/agent/commands/run.go @@ -40,14 +40,13 @@ import ( // Run implements `pmm-agent run` default command. func Run() { - var cfg *config.Config - l := logrus.WithField("component", "main") - ctx, cancel := context.WithCancel(context.Background()) - defer l.Info("Done.") - const initServerLogsMaxLength = 32 // store logs before load configuration logStore := tailog.NewStore(initServerLogsMaxLength) logrus.SetOutput(io.MultiWriter(os.Stderr, logStore)) + l := logrus.WithField("component", "main") + rootCtx, rootCancel := context.WithCancel(context.Background()) + + defer l.Info("Done.") // handle termination signals signals := make(chan os.Signal, 1) @@ -56,75 +55,89 @@ func Run() { s := <-signals signal.Stop(signals) l.Warnf("Got %s, shutting down...", unix.SignalName(s.(unix.Signal))) //nolint:forcetypeassert - if cfg != nil { - cleanupTmp(cfg.Paths.TempDir, l) - } - cancel() + rootCancel() }() - configStorage := config.NewStorage(nil) - configFilepath, err := configStorage.Reload(l) - if err != nil { - l.Fatalf("Failed to load configuration: %s.", err) - } - - cfg = configStorage.Get() - - cleanupTmp(cfg.Paths.TempDir, l) - connectionUptimeService := connectionuptime.NewService(cfg.WindowConnectedTime) - connectionUptimeService.RunCleanupGoroutine(ctx) v := versioner.New(&versioner.RealExecFunctions{}) - supervisor := supervisor.NewSupervisor(ctx, v, configStorage) - connectionChecker := connectionchecker.New(configStorage) - r := runner.New(cfg.RunnerCapacity) - client := client.New(configStorage, supervisor, r, connectionChecker, v, connectionUptimeService, logStore) - localServer := agentlocal.NewServer(configStorage, supervisor, client, configFilepath, logStore) - - var wg sync.WaitGroup - wg.Add(3) - go func() { - defer wg.Done() - supervisor.Run(ctx) - cancel() - }() - go func() { - defer wg.Done() - r.Run(ctx) - cancel() - }() - go func() { - defer wg.Done() - localServer.Run(ctx) - cancel() - }() + configStorage, configFilepath := prepareConfig(l) for { - _, err = configStorage.Reload(l) - if err != nil { - l.Fatalf("Failed to load configuration: %s.", err) - } - + ctx, cancel := context.WithCancel(rootCtx) cfg := configStorage.Get() - config.ConfigureLogger(cfg) - logStore.Resize(cfg.LogLinesCount) - l.Debugf("Loaded configuration: %+v", cfg) + prepareLogger(cfg, logStore, l) + + supervisor := supervisor.NewSupervisor(ctx, v, configStorage) + connectionChecker := connectionchecker.New(configStorage) + r := runner.New(cfg.RunnerCapacity) + client := client.New(configStorage, supervisor, r, connectionChecker, v, prepareConnectionService(ctx, cfg), logStore) + localServer := agentlocal.NewServer(configStorage, supervisor, client, configFilepath, logStore) logrus.Infof("Window check connection time is %.2f hour(s)", cfg.WindowConnectedTime.Hours()) - connectionUptimeService.SetWindowPeriod(cfg.WindowConnectedTime) + var wg sync.WaitGroup + wg.Add(3) + reloadCh := make(chan bool, 1) + go func() { + defer wg.Done() + supervisor.Run(ctx) + cancel() + }() + go func() { + defer wg.Done() + r.Run(ctx) + cancel() + }() + go func() { + defer wg.Done() + localServer.Run(ctx, reloadCh) + cancel() + }() + + processClientUntilCancel(ctx, client, reloadCh) + + cleanupTmp(cfg.Paths.TempDir, l) + wg.Wait() + select { + case <-rootCtx.Done(): + return + default: + } + } +} + +func processClientUntilCancel(ctx context.Context, client *client.Client, reloadCh chan bool) { + for { clientCtx, cancelClientCtx := context.WithCancel(ctx) + client.Run(clientCtx) - _ = client.Run(clientCtx) cancelClientCtx() - <-client.Done() - if ctx.Err() != nil { - break + select { + case <-reloadCh: + return + case <-ctx.Done(): + return + default: } } - wg.Wait() +} + +func prepareConfig(l *logrus.Entry) (*config.Storage, string) { + configStorage := config.NewStorage(nil) + configFilepath, err := configStorage.Reload(l) + if err != nil { + l.Fatalf("Failed to load configuration: %s.", err) + } + + return configStorage, configFilepath +} + +func prepareLogger(cfg *config.Config, logStore *tailog.Store, l *logrus.Entry) { + config.ConfigureLogger(cfg) + logStore.Resize(cfg.LogLinesCount) + l.Debugf("Loaded configuration: %+v", cfg) } func cleanupTmp(tmpRoot string, log *logrus.Entry) { @@ -141,3 +154,10 @@ func cleanupTmp(tmpRoot string, log *logrus.Entry) { } } } + +func prepareConnectionService(ctx context.Context, cfg *config.Config) *connectionuptime.Service { + connectionUptimeService := connectionuptime.NewService(cfg.WindowConnectedTime) + connectionUptimeService.RunCleanupGoroutine(ctx) + + return connectionUptimeService +} diff --git a/agent/config/logger.go b/agent/config/logger.go index 570ab0dbfa..781f206c4f 100644 --- a/agent/config/logger.go +++ b/agent/config/logger.go @@ -63,8 +63,6 @@ func ConfigureLogger(cfg *Config) { logrus.SetLevel(level) if level == logrus.TraceLevel { - // grpclog.SetLoggerV2 is not thread-safe - // logrus.SetReportCaller thread-safe: https://github.com/sirupsen/logrus/issues/954 logrus.SetReportCaller(true) }