diff --git a/agent/agents/process/process.go b/agent/agents/process/process.go index 2ebddccf13..640d0279a0 100644 --- a/agent/agents/process/process.go +++ b/agent/agents/process/process.go @@ -20,6 +20,7 @@ import ( "fmt" "os/exec" "strings" + "sync" "time" "github.com/sirupsen/logrus" @@ -38,6 +39,7 @@ const ( backoffMinDelay = 1 * time.Second backoffMaxDelay = 30 * time.Second + maxRetry = 5 keepLogLines = 100 ) @@ -53,13 +55,18 @@ const ( // implements its own logic, and then switches to then next state via "go toXXX()". "go" statement is used // only to avoid stack overflow; there are no extra goroutines for states. type Process struct { - params *Params - l *logrus.Entry - pl *processLogger - changes chan inventorypb.AgentStatus - backoff *backoff.Backoff - ctxDone chan struct{} - + params *Params + l *logrus.Entry + pl *processLogger + changes chan inventorypb.AgentStatus + backoff *backoff.Backoff + ctxDone chan struct{} + retryTime int8 + requireNewParams chan bool // send true if need new params + newParams <-chan *Params // for getting updated params + + rw sync.RWMutex + // recreated on each restart cmd *exec.Cmd cmdDone chan struct{} @@ -74,6 +81,7 @@ type Params struct { Type inventorypb.AgentType TemplateRenderer *templates.TemplateRenderer TemplateParams map[string]interface{} + Port uint16 } func (p *Params) String() string { @@ -86,14 +94,17 @@ func (p *Params) String() string { } // New creates new process. -func New(params *Params, redactWords []string, l *logrus.Entry) *Process { +func New(params *Params, redactWords []string, l *logrus.Entry, newParams <-chan *Params) *Process { return &Process{ - params: params, - l: l, - pl: newProcessLogger(l, keepLogLines, redactWords), - changes: make(chan inventorypb.AgentStatus, 10), - backoff: backoff.New(backoffMinDelay, backoffMaxDelay), - ctxDone: make(chan struct{}), + params: params, + l: l, + pl: newProcessLogger(l, keepLogLines, redactWords), + changes: make(chan inventorypb.AgentStatus, 10), + backoff: backoff.New(backoffMinDelay, backoffMaxDelay), + ctxDone: make(chan struct{}), + retryTime: 0, + requireNewParams: make(chan bool), + newParams : newParams, } } @@ -174,10 +185,24 @@ func (p *Process) toWaiting() { p.l.Infof("Process: waiting %s.", delay) p.changes <- inventorypb.AgentStatus_WAITING + p.checkMaxRetry() + t := time.NewTimer(delay) defer t.Stop() select { case <-t.C: + if p.newParams!=nil { + select { + case params, ok := <-p.newParams: + if ok { + p.rw.Lock() + defer p.rw.Unlock() + p.params = params + } + default: + } + } + // recreate config file in temp dir. if p.params.TemplateRenderer != nil { _, err := p.params.TemplateRenderer.RenderFiles(p.params.TemplateParams) @@ -221,7 +246,8 @@ func (p *Process) toStopping() { func (p *Process) toDone() { p.l.Trace("Process: done.") p.changes <- inventorypb.AgentStatus_DONE - + + close(p.requireNewParams) close(p.changes) } @@ -230,11 +256,29 @@ func (p *Process) Changes() <-chan inventorypb.AgentStatus { return p.changes } +func (p *Process) RequireNewParams() chan bool { + return p.requireNewParams +} + +func (p *Process) checkMaxRetry() { + p.retryTime++ + if p.retryTime >= maxRetry { + p.requireNewParams <- true + p.retryTime = 0 + } +} + // Logs returns latest process logs. func (p *Process) Logs() []string { return p.pl.Latest() } +func (p *Process) GetPort() uint16 { + p.rw.RLock() + defer p.rw.RUnlock() + return p.params.Port +} + // check interfaces. var ( _ fmt.Stringer = (*Params)(nil) diff --git a/agent/agents/process/process_test.go b/agent/agents/process/process_test.go index d40d555e24..ddfbbf5fef 100644 --- a/agent/agents/process/process_test.go +++ b/agent/agents/process/process_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sys/unix" + "github.com/percona/pmm/agent/utils/backoff" "github.com/percona/pmm/api/inventorypb" ) @@ -71,7 +72,7 @@ func setup(t *testing.T) (context.Context, context.CancelFunc, *logrus.Entry) { func TestProcess(t *testing.T) { t.Run("Normal", func(t *testing.T) { ctx, cancel, l := setup(t) - p := New(&Params{Path: "sleep", Args: []string{"100500"}}, nil, l) + p := New(&Params{Path: "sleep", Args: []string{"100500"}}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING) @@ -81,7 +82,7 @@ func TestProcess(t *testing.T) { t.Run("FailedToStart", func(t *testing.T) { ctx, cancel, l := setup(t) - p := New(&Params{Path: "no_such_command"}, nil, l) + p := New(&Params{Path: "no_such_command"}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING) @@ -92,7 +93,7 @@ func TestProcess(t *testing.T) { t.Run("ExitedEarly", func(t *testing.T) { sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64) ctx, cancel, l := setup(t) - p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l) + p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING) @@ -103,7 +104,7 @@ func TestProcess(t *testing.T) { t.Run("CancelStarting", func(t *testing.T) { sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64) ctx, cancel, l := setup(t) - p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l) + p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING) @@ -114,7 +115,7 @@ func TestProcess(t *testing.T) { t.Run("Exited", func(t *testing.T) { sleep := strconv.FormatFloat(runningT.Seconds()+0.5, 'f', -1, 64) ctx, cancel, l := setup(t) - p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l) + p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING, inventorypb.AgentStatus_WAITING) @@ -133,7 +134,7 @@ func TestProcess(t *testing.T) { build(t, "", "process_noterm.go", f.Name()) ctx, cancel, l := setup(t) - p := New(&Params{Path: f.Name()}, nil, l) + p := New(&Params{Path: f.Name()}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING) @@ -187,6 +188,39 @@ func TestProcess(t *testing.T) { err = proc.Signal(unix.Signal(0)) require.EqualError(t, err, "os: process already finished", "child process with pid %v is not killed", pid) }) + +} + +func TestNewParams(t *testing.T) { + t.Run("tryNewParams", func(t *testing.T) { + ctx, cancel, l := setup(t) + newParamsChan := make(chan *Params) + p := New(&Params{Path: "cat", Args: []string{"A_WRONG_PATH"}}, nil, l, newParamsChan) + p.backoff = backoff.New(0*time.Second, 1*time.Second) + + go p.Run(ctx) + + // the default maxRetry is 5 + for i := 0; i < 5; i++{ + assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING) + } + go func() { + for value := range p.requireNewParams { + if !value { + continue + } + newParamsChan <- &Params{Path: "sleep", Args: []string{"100500"}} + } + }() + timer := time.NewTimer(time.Second * 6) + defer timer.Stop() + select { + case <-timer.C: + assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING) + cancel() + assertStates(t, p, inventorypb.AgentStatus_STOPPING, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID) + } + }) } func TestExtractLogLevel(t *testing.T) { diff --git a/agent/agents/supervisor/ports_registry.go b/agent/agents/supervisor/ports_registry.go index 53f04a37ee..f45059e161 100644 --- a/agent/agents/supervisor/ports_registry.go +++ b/agent/agents/supervisor/ports_registry.go @@ -33,6 +33,7 @@ type portsRegistry struct { max uint16 last uint16 reserved map[uint16]struct{} + ignoreCheck bool // used for testing } func newPortsRegistry(min, max uint16, reserved []uint16) *portsRegistry { @@ -45,6 +46,7 @@ func newPortsRegistry(min, max uint16, reserved []uint16) *portsRegistry { max: max, last: min - 1, reserved: make(map[uint16]struct{}, len(reserved)), + ignoreCheck: false, } for _, p := range reserved { r.reserved[p] = struct{}{} @@ -67,12 +69,14 @@ func (r *portsRegistry) Reserve() (uint16, error) { continue } - l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) - if l != nil { - _ = l.Close() - } - if err != nil { - continue + if !r.ignoreCheck { + l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if l != nil { + _ = l.Close() + } + if err != nil { + continue + } } r.reserved[port] = struct{}{} @@ -83,8 +87,8 @@ func (r *portsRegistry) Reserve() (uint16, error) { return 0, errNoFreePort } -// Release releases port. -func (r *portsRegistry) Release(port uint16) error { +// Release releases port. +func (r *portsRegistry) Release(port uint16, ignoreBusy bool) error { r.m.Lock() defer r.m.Unlock() @@ -92,12 +96,14 @@ func (r *portsRegistry) Release(port uint16) error { return errPortNotReserved } - l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) - if l != nil { - _ = l.Close() - } - if err != nil { - return errPortBusy + if !ignoreBusy { + l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if l != nil { + _ = l.Close() + } + if err != nil { + return errPortBusy + } } delete(r.reserved, port) diff --git a/agent/agents/supervisor/ports_registry_test.go b/agent/agents/supervisor/ports_registry_test.go index 2fd56331bd..c7124269a5 100644 --- a/agent/agents/supervisor/ports_registry_test.go +++ b/agent/agents/supervisor/ports_registry_test.go @@ -39,11 +39,11 @@ func TestRegistry(t *testing.T) { require.NoError(t, err) defer l2.Close() //nolint:errcheck,gosec - err = r.Release(65000) + err = r.Release(65000, false) assert.NoError(t, err) - err = r.Release(65001) + err = r.Release(65001, false) assert.Equal(t, errPortNotReserved, err) - err = r.Release(65002) + err = r.Release(65002, false) assert.Equal(t, errPortBusy, err) l1.Close() //nolint:errcheck @@ -58,7 +58,7 @@ func TestRegistry(t *testing.T) { _, err = r.Reserve() assert.Equal(t, errNoFreePort, err) - err = r.Release(65002) + err = r.Release(65002, false) assert.NoError(t, err) p, err = r.Reserve() @@ -75,7 +75,7 @@ func TestPreferNewPort(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, 65000, p) - err = r.Release(p) + err = r.Release(p, false) assert.NoError(t, err) p, err = r.Reserve() @@ -101,10 +101,32 @@ func TestSinglePort(t *testing.T) { _, err = r.Reserve() assert.Equal(t, errNoFreePort, err) - err = r.Release(p) + err = r.Release(p, false) assert.NoError(t, err) p, err = r.Reserve() assert.NoError(t, err) assert.EqualValues(t, 65000, p) } + +func TestRegistryRaceCondition(t *testing.T) { + + r := newPortsRegistry(65000, 65002, nil) + + p, err := r.Reserve() + assert.NoError(t, err) + assert.EqualValues(t, 65000, p) + + l1, err := net.Listen("tcp", "127.0.0.1:65000") + require.NoError(t, err) + defer l1.Close() + + err = r.Release(65000, false) + assert.EqualValues(t, errPortBusy, err) + + err = r.Release(65000, true) + assert.NoError(t, err) + + assert.NotContains(t, r.reserved, 65000) + +} diff --git a/agent/agents/supervisor/supervisor.go b/agent/agents/supervisor/supervisor.go index d426980942..ff014d2e78 100644 --- a/agent/agents/supervisor/supervisor.go +++ b/agent/agents/supervisor/supervisor.go @@ -23,6 +23,7 @@ import ( "path/filepath" "runtime/pprof" "sort" + "strconv" "strings" "sync" @@ -73,12 +74,14 @@ type Supervisor struct { // agentProcessInfo describes Agent process. type agentProcessInfo struct { - cancel func() // to cancel Process.Run(ctx) - done <-chan struct{} // closes when Process.Changes() channel closes - requestedState *agentpb.SetStateRequest_AgentProcess - listenPort uint16 - processExecPath string - logStore *tailog.Store // store logs + cancel func() // to cancel Process.Run(ctx) + done <-chan struct{} // closes when Process.Changes() channel closes + requestedState *agentpb.SetStateRequest_AgentProcess + listenPort uint16 + processExecPath string + logStore *tailog.Store // store logs + requireNewParams <-chan bool + newParams chan<- *process.Params } // builtinAgentInfo describes built-in Agent. @@ -291,7 +294,7 @@ func (s *Supervisor) setAgentProcesses(agentProcesses map[string]*agentpb.SetSta agent.cancel() <-agent.done - if err := s.portsRegistry.Release(agent.listenPort); err != nil { + if err := s.portsRegistry.Release(agent.listenPort, false); err != nil { s.l.Errorf("Failed to release port: %s.", err) } @@ -314,6 +317,8 @@ func (s *Supervisor) setAgentProcesses(agentProcesses map[string]*agentpb.SetSta s.l.Errorf("Failed to start Agent: %s.", err) // TODO report that error to server } + + s.startNewParamsLister(agentID, agentProcesses[agentID]) } // start new agents @@ -329,6 +334,8 @@ func (s *Supervisor) setAgentProcesses(agentProcesses map[string]*agentpb.SetSta s.l.Errorf("Failed to start Agent: %s.", err) // TODO report that error to server } + + s.startNewParamsLister(agentID, agentProcesses[agentID]) } } @@ -427,6 +434,7 @@ func filter(existing, ap map[string]agentpb.AgentParams) ([]string, []string, [] //nolint:golint,stylecheck const ( + type_TEST_NC inventorypb.AgentType = 997 type_TEST_SLEEP inventorypb.AgentType = 998 // process type_TEST_NOOP inventorypb.AgentType = 999 // built-in ) @@ -449,7 +457,8 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState }) l.Debugf("Starting: %s.", processParams) - process := process.New(processParams, agentProcess.RedactWords, l) + newParamsChan := make(chan *process.Params) + process := process.New(processParams, agentProcess.RedactWords, l, newParamsChan) go pprof.Do(ctx, pprof.Labels("agentID", agentID, "type", agentType), process.Run) version, err := s.version(agentProcess.Type, processParams.Path) @@ -461,11 +470,11 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState go func() { for status := range process.Changes() { s.storeLastStatus(agentID, status) - l.Infof("Sending status: %s (port %d).", status, port) + l.Infof("Sending status: %s (port %d).", status, process.GetPort()) s.changes <- &agentpb.StateChangedRequest{ AgentId: agentID, Status: status, - ListenPort: uint32(port), + ListenPort: uint32(process.GetPort()), ProcessExecPath: processParams.Path, Version: version, } @@ -475,16 +484,55 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState //nolint:forcetypeassert s.agentProcesses[agentID] = &agentProcessInfo{ - cancel: cancel, - done: done, - requestedState: proto.Clone(agentProcess).(*agentpb.SetStateRequest_AgentProcess), - listenPort: port, - processExecPath: processParams.Path, - logStore: logStore, + cancel: cancel, + done: done, + requestedState: proto.Clone(agentProcess).(*agentpb.SetStateRequest_AgentProcess), + listenPort: port, + processExecPath: processParams.Path, + logStore: logStore, + requireNewParams: process.RequireNewParams(), + newParams: newParamsChan, } return nil } +// A lister for updating the params of the process. +// The listener will release the previous port, reserve a new one and send new params to the process +func (s *Supervisor) startNewParamsLister(agentID string, agentProcess *agentpb.SetStateRequest_AgentProcess) { + go func() { + s.rw.RLock() + process := s.agentProcesses[agentID] + s.rw.RUnlock() + for value := range process.requireNewParams { + if !value { + continue + } + + // need rw lock for to lock the agentProcess map + s.rw.Lock() + defer s.rw.Unlock() + + s.portsRegistry.Release(process.listenPort, true) + port, err := s.portsRegistry.Reserve() + if err != nil { + s.l.Errorf("Failed to reserve port during retry: %s.", err) + // TODO report that error to server + continue + } + params, err := s.processParams(agentID, agentProcess, port) + if err != nil { + s.l.Errorf("Failed to process params during retry: %s.", err) + // TODO report that error to server + continue + } + if process.newParams != nil { + process.listenPort = port + process.newParams <- params + } + } + }() +} + // startBuiltin starts built-in Agent. // Must be called with s.rw held for writing. func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentpb.SetStateRequest_BuiltinAgent) error { @@ -659,6 +707,13 @@ func (s *Supervisor) processParams(agentID string, agentProcess *agentpb.SetStat processParams.Path = cfg.Paths.AzureExporter case type_TEST_SLEEP: processParams.Path = "sleep" + case type_TEST_NC: + processParams.Path = "nc" + if agentProcess.Args[len(agentProcess.Args)-1] == "localhost" { + agentProcess.Args = append(agentProcess.Args, strconv.Itoa(int(port))) + } + agentProcess.Args[len(agentProcess.Args)-1] = strconv.Itoa(int(port)) + case inventorypb.AgentType_VM_AGENT: // add template params for vmagent. templateParams["server_insecure"] = cfg.Server.InsecureTLS @@ -711,6 +766,8 @@ func (s *Supervisor) processParams(agentID string, agentProcess *agentpb.SetStat processParams.Env[i] = string(b) } + processParams.Port = port + return &processParams, nil } diff --git a/agent/agents/supervisor/supervisor_test.go b/agent/agents/supervisor/supervisor_test.go index 5811e34504..d3d30b76c7 100644 --- a/agent/agents/supervisor/supervisor_test.go +++ b/agent/agents/supervisor/supervisor_test.go @@ -16,13 +16,16 @@ package supervisor import ( "context" + "net" "os" "path/filepath" "sort" + "syscall" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sys/unix" "github.com/percona/pmm/agent/agents/process" "github.com/percona/pmm/agent/config" @@ -259,6 +262,73 @@ func TestSupervisor(t *testing.T) { }) } +// Simulate the situation where port assigned to a process is occupied. +// The supervisor is supposed to give the process a new port. +func TestSupervisorRetryPorts(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tempDir := t.TempDir() + cfgStorage := config.NewStorage(&config.Config{ + Paths: config.Paths{TempDir: tempDir}, + Ports: config.Ports{Min: 65011, Max: 65099}, + Server: config.Server{Address: "localhost:443"}, + LogLinesCount: 1, + }) + s := NewSupervisor(ctx, nil, cfgStorage) + s.portsRegistry = &portsRegistry{ + min: 65011, + max: 65099, + last: 65011 - 1, + reserved: make(map[uint16]struct{}, 0), + ignoreCheck: true, + } + + go s.Run(ctx) + + t.Run("StartWithConflictPort", func(t *testing.T) { + expectedList := []*agentlocalpb.AgentInfo{} + var lc = net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) error { + var opErr error + if err := c.Control(func(fd uintptr) { + opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 0) + }); err != nil { + return err + } + return opErr + }, + } + l, err := lc.Listen(context.Background(),"tcp", "127.0.0.1:65011") + require.NoError(t, err) + defer l.Close() + + s.SetState(&agentpb.SetStateRequest{ + AgentProcesses: map[string]*agentpb.SetStateRequest_AgentProcess{ + "nc1": {Type: type_TEST_NC, Args: []string{"-lk4", "localhost"}}, + }, + }) + + for i := 0; i < 5; i++ { + assertChanges(t, s, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65011, ProcessExecPath: "nc"}, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_WAITING, ListenPort: 65011, ProcessExecPath: "nc"}) + } + + assertChanges(t, s, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65012, ProcessExecPath: "nc"}, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65012, ProcessExecPath: "nc"}) + + cancel() + + assertChanges(t, s, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_STOPPING, ListenPort: 65012, ProcessExecPath: "nc"}, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65012, ProcessExecPath: "nc"}) + expectedList = []*agentlocalpb.AgentInfo{} + require.Equal(t, expectedList, s.AgentsList()) + }) + +} + func TestFilter(t *testing.T) { t.Parallel()