PMM-11603 pmm-agent reload. #2410

Merged 41 commits into main from PMM-11603-agent-reload on Sep 18, 2023
Changes from 35 commits
Commits (41)
5ee3727
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 10, 2023
397dd78
PMM-11603 Temp, not done.
JiriCtvrtka Aug 14, 2023
d1ed579
PMM-11603 Another refactor.
JiriCtvrtka Aug 14, 2023
a798f6e
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 14, 2023
fe419c7
PMM-11603 Close reload channel.
JiriCtvrtka Aug 15, 2023
dc5fcfb
Merge remote-tracking branch 'origin/PMM-11603-agent-reload' into PMM…
JiriCtvrtka Aug 15, 2023
ef83b4b
PMM-11603 Fix logger, refactor.
JiriCtvrtka Aug 15, 2023
e236c23
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 15, 2023
7e6b2a8
PMM-11603 Print done only once even with reloads.
JiriCtvrtka Aug 16, 2023
00dbaad
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 16, 2023
0e75cc4
Update agent/agentlocal/agent_local.go
JiriCtvrtka Aug 16, 2023
f7425fa
PMM-11603 Recursion to for.
JiriCtvrtka Aug 16, 2023
f83b2b4
PMM-11603 Changes, refactor.
JiriCtvrtka Aug 17, 2023
33183b8
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 17, 2023
69afdca
PMM-11603 Small improvements.
JiriCtvrtka Aug 17, 2023
2392944
PMM-11603 Formatting.
JiriCtvrtka Aug 17, 2023
42e000a
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 18, 2023
69bcdf3
PMM-11603 2nd level loop for client.
JiriCtvrtka Aug 22, 2023
3ce3530
PMM-11603 Fix nil pointer problem.
JiriCtvrtka Aug 22, 2023
d72cd84
PMM-11603 Another places without check for nil coming from channel.
JiriCtvrtka Aug 22, 2023
5186562
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 23, 2023
7a28f9f
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 24, 2023
98a896b
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 24, 2023
a370bbf
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 25, 2023
62d417f
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Aug 26, 2023
83b9ed9
PMM-11603 Some required changes.
JiriCtvrtka Aug 26, 2023
7b030bd
PMM11603 Separate switch into func.
JiriCtvrtka Aug 26, 2023
2bade5b
PMM-11603 Remove chan value nil check.
JiriCtvrtka Aug 26, 2023
c309281
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Sep 4, 2023
f7e91bc
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Sep 11, 2023
b0b57c4
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Sep 11, 2023
ed7cd0d
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Sep 12, 2023
769547c
PMM-11603 Suggested changes.
JiriCtvrtka Sep 12, 2023
3db8435
Merge remote-tracking branch 'origin/PMM-11603-agent-reload' into PMM…
JiriCtvrtka Sep 12, 2023
9af8a27
PMM-11603 Logger refactor.
JiriCtvrtka Sep 12, 2023
316e62e
PMM-11603 Lint.
JiriCtvrtka Sep 12, 2023
4d40b0d
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Sep 13, 2023
72e1ab3
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Sep 13, 2023
014bfa8
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Sep 13, 2023
28ec4cf
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Sep 13, 2023
64aa322
Merge branch 'main' into PMM-11603-agent-reload
JiriCtvrtka Sep 18, 2023
6 changes: 4 additions & 2 deletions agent/agentlocal/agent_local.go
@@ -100,7 +100,7 @@ func NewServer(cfg configGetReloader, supervisor supervisor, client client, conf
// Run runs gRPC and JSON servers with API and debug endpoints until ctx is canceled.
//
// Run exits when ctx is canceled, or when a request to reload configuration is received.
func (s *Server) Run(ctx context.Context) {
func (s *Server) Run(ctx context.Context, reloadCh chan bool) {
defer s.l.Info("Done.")

serverCtx, serverCancel := context.WithCancel(ctx)
@@ -125,8 +125,10 @@ func (s *Server) Run(ctx context.Context) {
}()

select {
case <-ctx.Done():
case <-s.reload:
s.l.Debug("Agent reload triggered")
reloadCh <- true
case <-ctx.Done():
}

serverCancel()
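
The select above is the heart of the new reload path: when the agent's /reload handling signals s.reload, Run logs the event, reports it on the caller-supplied reloadCh, and then shuts its gRPC/JSON servers down via serverCancel(). A minimal standalone sketch of that pattern, assuming a plain signal channel for the reload trigger (only reload, reloadCh and the cancel-on-exit behavior mirror the diff; the rest is illustrative):

package main

import (
	"context"
	"fmt"
	"time"
)

// run mimics the select in (*Server).Run: it waits for either a reload
// request or context cancellation, forwards reloads to the caller, and
// always cancels the per-run server context on the way out.
func run(ctx context.Context, reload <-chan struct{}, reloadCh chan<- bool) {
	serverCtx, serverCancel := context.WithCancel(ctx)
	defer serverCancel() // would stop the gRPC/JSON servers started with serverCtx
	_ = serverCtx        // servers are omitted in this sketch

	select {
	case <-reload:
		fmt.Println("Agent reload triggered")
		reloadCh <- true // buffered by the caller, so this send does not block
	case <-ctx.Done():
	}
}

func main() {
	reload := make(chan struct{})
	reloadCh := make(chan bool, 1)

	go run(context.Background(), reload, reloadCh)
	reload <- struct{}{} // simulate a /reload request

	select {
	case <-reloadCh:
		fmt.Println("main loop: restarting with fresh configuration")
	case <-time.After(time.Second):
		fmt.Println("timed out")
	}
}

The one-slot buffer on reloadCh (created in run.go) matters here: it lets Run report the reload and exit even if the main loop has not yet reached its receive.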
12 changes: 12 additions & 0 deletions agent/client/client.go
@@ -262,6 +262,9 @@ func (c *Client) processActionResults(ctx context.Context) {
for {
select {
case result := <-c.runner.ActionsResults():
if result == nil {
continue
}
resp, err := c.channel.SendAndWaitResponse(result)
if err != nil {
c.l.Error(err)
@@ -281,6 +284,9 @@ func (c *Client) processJobsResults(ctx context.Context) {
for {
select {
case message := <-c.runner.JobsMessages():
if message == nil {
continue
}
c.channel.Send(&channel.AgentResponse{
ID: 0, // Jobs send messages that don't require any responses, so we can leave message ID blank.
Payload: message,
@@ -302,6 +308,9 @@ func (c *Client) processSupervisorRequests(ctx context.Context) {
for {
select {
case state := <-c.supervisor.Changes():
if state == nil {
continue
}
resp, err := c.channel.SendAndWaitResponse(state)
if err != nil {
c.l.Error(err)
@@ -324,6 +333,9 @@ func (c *Client) processSupervisorRequests(ctx context.Context) {
for {
select {
case collect := <-c.supervisor.QANRequests():
if collect == nil {
continue
}
resp, err := c.channel.SendAndWaitResponse(collect)
if err != nil {
c.l.Error(err)
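
All four nil checks added above guard the same situation: during a reload the supervisor and runner are torn down and their output channels are closed, and a receive from a closed channel yields the zero value, which for these pointer and interface payloads is nil. Without the check the client would hand nil to channel.Send / SendAndWaitResponse, the nil-pointer problem addressed in commits 3ce3530 and d72cd84. A small self-contained sketch of that channel behavior, with a hypothetical result type standing in for the real payloads:

package main

import (
	"fmt"
	"time"
)

type result struct{ id int }

// drain mimics the client loops above: once the producer channel is closed
// (as happens when the runner or supervisor is shut down during a reload),
// every receive yields nil, so the loop must skip those values instead of
// forwarding them.
func drain(results <-chan *result, done <-chan struct{}) {
	for {
		select {
		case r := <-results:
			if r == nil { // closed channel: zero value, nothing to send
				continue
			}
			fmt.Println("sending result", r.id)
		case <-done:
			return
		}
	}
}

func main() {
	results := make(chan *result, 1)
	done := make(chan struct{})

	results <- &result{id: 1}
	close(results) // simulates teardown during a reload

	go func() {
		time.Sleep(10 * time.Millisecond)
		close(done) // stands in for the loop's real stop condition (its ctx)
	}()
	drain(results, done)
}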
134 changes: 77 additions & 57 deletions agent/commands/run.go
@@ -40,14 +40,13 @@ import (

// Run implements `pmm-agent run` default command.
func Run() {
var cfg *config.Config
l := logrus.WithField("component", "main")
ctx, cancel := context.WithCancel(context.Background())
defer l.Info("Done.")

const initServerLogsMaxLength = 32 // store logs before load configuration
logStore := tailog.NewStore(initServerLogsMaxLength)
logrus.SetOutput(io.MultiWriter(os.Stderr, logStore))
l := logrus.WithField("component", "main")
rootCtx, rootCancel := context.WithCancel(context.Background())

defer l.Info("Done.")

// handle termination signals
signals := make(chan os.Signal, 1)
@@ -56,75 +55,89 @@ func Run() {
s := <-signals
signal.Stop(signals)
l.Warnf("Got %s, shutting down...", unix.SignalName(s.(unix.Signal))) //nolint:forcetypeassert
if cfg != nil {
cleanupTmp(cfg.Paths.TempDir, l)
}
cancel()
Comment on lines -59 to -62
Member: We still need to clean up Tmp directory
rootCancel()
}()

configStorage := config.NewStorage(nil)
configFilepath, err := configStorage.Reload(l)
if err != nil {
l.Fatalf("Failed to load configuration: %s.", err)
}

cfg = configStorage.Get()

cleanupTmp(cfg.Paths.TempDir, l)
connectionUptimeService := connectionuptime.NewService(cfg.WindowConnectedTime)
connectionUptimeService.RunCleanupGoroutine(ctx)
v := versioner.New(&versioner.RealExecFunctions{})
supervisor := supervisor.NewSupervisor(ctx, v, configStorage)
connectionChecker := connectionchecker.New(configStorage)
r := runner.New(cfg.RunnerCapacity)
client := client.New(configStorage, supervisor, r, connectionChecker, v, connectionUptimeService, logStore)
localServer := agentlocal.NewServer(configStorage, supervisor, client, configFilepath, logStore)

var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
supervisor.Run(ctx)
cancel()
}()
go func() {
defer wg.Done()
r.Run(ctx)
cancel()
}()
go func() {
defer wg.Done()
localServer.Run(ctx)
cancel()
}()
configStorage, configFilepath := prepareConfig(l)

for {
_, err = configStorage.Reload(l)
Member: We need to reload the configs.

Contributor Author: Reload is called when you use the pmm-admin config subcommand, so it should not be necessary to do it here. Or do we also want to cover the case where someone modifies the config manually while the agent is running? Is there another way the config could change?

Member: Yes, we want to cover that as well. We have an API endpoint for that reason.

Member: And actually the main reason to have the /reload endpoint is to be able to reload configs without restarting pmm-agent.

Contributor Author: Am I wrong, or should the reload for the /reload endpoint be done by https://github.com/percona/pmm/pull/2410/files#diff-9bb22a7f420e8dfd90097d05536858550d6f7c00dfedada00e0606305d667db1R190 anyway? Thank you.

if err != nil {
l.Fatalf("Failed to load configuration: %s.", err)
}

ctx, cancel := context.WithCancel(rootCtx)
cfg := configStorage.Get()

config.ConfigureLogger(cfg)
logStore.Resize(cfg.LogLinesCount)
l.Debugf("Loaded configuration: %+v", cfg)
prepareLogger(cfg, logStore, l)

supervisor := supervisor.NewSupervisor(ctx, v, configStorage)
connectionChecker := connectionchecker.New(configStorage)
r := runner.New(cfg.RunnerCapacity)
client := client.New(configStorage, supervisor, r, connectionChecker, v, prepareConnectionService(ctx, cfg), logStore)
localServer := agentlocal.NewServer(configStorage, supervisor, client, configFilepath, logStore)

logrus.Infof("Window check connection time is %.2f hour(s)", cfg.WindowConnectedTime.Hours())
connectionUptimeService.SetWindowPeriod(cfg.WindowConnectedTime)

var wg sync.WaitGroup
wg.Add(3)
reloadCh := make(chan bool, 1)
go func() {
defer wg.Done()
supervisor.Run(ctx)
cancel()
}()
go func() {
defer wg.Done()
r.Run(ctx)
cancel()
}()
go func() {
defer wg.Done()
localServer.Run(ctx, reloadCh)
cancel()
}()

processClientUntilCancel(ctx, client, reloadCh, l)

cleanupTmp(cfg.Paths.TempDir, l)
wg.Wait()
select {
case <-rootCtx.Done():
return
default:
}
}
}

func processClientUntilCancel(ctx context.Context, client *client.Client, reloadCh chan bool, l *logrus.Entry) {
for {
clientCtx, cancelClientCtx := context.WithCancel(ctx)
client.Run(clientCtx)

_ = client.Run(clientCtx)
cancelClientCtx()

<-client.Done()

if ctx.Err() != nil {
break
select {
case <-reloadCh:
return
case <-ctx.Done():
return
default:
}
}
wg.Wait()
}

func prepareConfig(l *logrus.Entry) (*config.Storage, string) {
configStorage := config.NewStorage(nil)
configFilepath, err := configStorage.Reload(l)
if err != nil {
l.Fatalf("Failed to load configuration: %s.", err)
}
Comment on lines +129 to +132
Member: Do we really need to reload right after the storage is created?

Contributor Author (JiriCtvrtka, Sep 13, 2023): It's inside the last func before the loop: https://github.com/percona/pmm/pull/2410/files#diff-ae8858561a752ed8fb97d610805944a2f0123026d91e30fdbcf8c34b3ae80f5bR62
In the loop the config is already used, so this seems as late as possible. If you have an idea, feel free to suggest it.


return configStorage, configFilepath
}

func prepareLogger(cfg *config.Config, logStore *tailog.Store, l *logrus.Entry) {
config.ConfigureLogger(cfg)
logStore.Resize(cfg.LogLinesCount)
l.Debugf("Loaded configuration: %+v", cfg)
}

func cleanupTmp(tmpRoot string, log *logrus.Entry) {
@@ -141,3 +154,10 @@ func cleanupTmp(tmpRoot string, log *logrus.Entry) {
}
}
}

func prepareConnectionService(ctx context.Context, cfg *config.Config) *connectionuptime.Service {
connectionUptimeService := connectionuptime.NewService(cfg.WindowConnectedTime)
connectionUptimeService.RunCleanupGoroutine(ctx)

return connectionUptimeService
}
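
Taken together, run.go now wraps the whole agent lifecycle in a loop: each iteration reloads the configuration, rebuilds the supervisor, runner, client and local server under a context derived from rootCtx, and runs until processClientUntilCancel returns because of either a reload request or shutdown; cleanupTmp then runs, the goroutines are joined, and the final select decides between exiting (rootCtx canceled) and starting the next iteration with the new settings. A condensed sketch of that control flow, with component construction stubbed out and only the channel/context plumbing mirroring the diff:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runAgent condenses the reload loop from commands/run.go: every iteration
// gets its own ctx derived from rootCtx, a fresh reloadCh and fresh
// components; a value on reloadCh restarts the loop, while cancelling
// rootCtx ends the process.
func runAgent(rootCtx context.Context, requestReload <-chan struct{}) {
	for iteration := 1; ; iteration++ {
		ctx, cancel := context.WithCancel(rootCtx)
		reloadCh := make(chan bool, 1)

		var wg sync.WaitGroup
		wg.Add(1)
		go func() { // stands in for the supervisor/runner/localServer goroutines
			defer wg.Done()
			select {
			case <-requestReload:
				reloadCh <- true // what localServer.Run does on a reload request
			case <-ctx.Done():
			}
			cancel()
		}()

		// stands in for processClientUntilCancel: wait for reload or shutdown
		select {
		case <-reloadCh:
			fmt.Printf("iteration %d: reload requested, rebuilding components\n", iteration)
		case <-ctx.Done():
		}

		cancel()  // tear down this iteration's components
		wg.Wait() // same role as wg.Wait() before the final select in run.go

		select {
		case <-rootCtx.Done():
			fmt.Println("shutting down")
			return
		default: // reload: fall through and start a new iteration
		}
	}
}

func main() {
	rootCtx, rootCancel := context.WithCancel(context.Background())
	requestReload := make(chan struct{}, 1)
	requestReload <- struct{}{} // iteration 1: trigger one reload

	go func() {
		time.Sleep(50 * time.Millisecond) // let iteration 2 start, then shut down
		rootCancel()
	}()
	runAgent(rootCtx, requestReload)
}

The split between rootCtx, which only termination signals cancel, and the per-iteration ctx is what lets a reload recycle every component without ending the process.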
2 changes: 0 additions & 2 deletions agent/config/logger.go
@@ -63,8 +63,6 @@ func ConfigureLogger(cfg *Config) {
logrus.SetLevel(level)

if level == logrus.TraceLevel {
// grpclog.SetLoggerV2 is not thread-safe

// logrus.SetReportCaller thread-safe: https://github.com/sirupsen/logrus/issues/954
logrus.SetReportCaller(true)
}