From 2cab6b4ed82ba13d008a61f850dc9900fa6f1a22 Mon Sep 17 00:00:00 2001 From: GuanqunYang193 Date: Mon, 16 Oct 2023 00:38:28 -0400 Subject: [PATCH 1/5] support refresh the params --- agent/agents/process/process.go | 57 +++++++++++++++++------ agent/agents/supervisor/ports_registry.go | 16 ++++--- agent/agents/supervisor/supervisor.go | 43 ++++++++++++++++- 3 files changed, 93 insertions(+), 23 deletions(-) diff --git a/agent/agents/process/process.go b/agent/agents/process/process.go index 2ebddccf13..102759d823 100644 --- a/agent/agents/process/process.go +++ b/agent/agents/process/process.go @@ -38,6 +38,7 @@ const ( backoffMinDelay = 1 * time.Second backoffMaxDelay = 30 * time.Second + maxRetry = 5 keepLogLines = 100 ) @@ -53,12 +54,15 @@ const ( // implements its own logic, and then switches to then next state via "go toXXX()". "go" statement is used // only to avoid stack overflow; there are no extra goroutines for states. type Process struct { - params *Params - l *logrus.Entry - pl *processLogger - changes chan inventorypb.AgentStatus - backoff *backoff.Backoff - ctxDone chan struct{} + params *Params + l *logrus.Entry + pl *processLogger + changes chan inventorypb.AgentStatus + backoff *backoff.Backoff + ctxDone chan struct{} + retryTime int8 + requireNewParam chan bool + newParams <-chan *Params // recreated on each restart cmd *exec.Cmd @@ -86,14 +90,17 @@ func (p *Params) String() string { } // New creates new process. -func New(params *Params, redactWords []string, l *logrus.Entry) *Process { +func New(params *Params, redactWords []string, l *logrus.Entry, newParams <-chan *Params) *Process { return &Process{ - params: params, - l: l, - pl: newProcessLogger(l, keepLogLines, redactWords), - changes: make(chan inventorypb.AgentStatus, 10), - backoff: backoff.New(backoffMinDelay, backoffMaxDelay), - ctxDone: make(chan struct{}), + params: params, + l: l, + pl: newProcessLogger(l, keepLogLines, redactWords), + changes: make(chan inventorypb.AgentStatus, 10), + backoff: backoff.New(backoffMinDelay, backoffMaxDelay), + ctxDone: make(chan struct{}), + retryTime: 0, + requireNewParam: make(chan bool), + newParams : newParams, } } @@ -174,10 +181,23 @@ func (p *Process) toWaiting() { p.l.Infof("Process: waiting %s.", delay) p.changes <- inventorypb.AgentStatus_WAITING + if p.retryTime >= maxRetry { + p.requireNewParam <- true + p.retryTime = 0 + } + t := time.NewTimer(delay) defer t.Stop() select { case <-t.C: + select { + case params, ok := <-p.newParams: + if ok { + p.params = params + } + default: + } + // recreate config file in temp dir. if p.params.TemplateRenderer != nil { _, err := p.params.TemplateRenderer.RenderFiles(p.params.TemplateParams) @@ -221,7 +241,8 @@ func (p *Process) toStopping() { func (p *Process) toDone() { p.l.Trace("Process: done.") p.changes <- inventorypb.AgentStatus_DONE - + + close(p.requireNewParam) close(p.changes) } @@ -230,6 +251,14 @@ func (p *Process) Changes() <-chan inventorypb.AgentStatus { return p.changes } +func (p *Process) RequireNewParam() chan bool { + return p.requireNewParam +} + +func (p *Process) NewParams() chan *Params { + return p.newParams +} + // Logs returns latest process logs. func (p *Process) Logs() []string { return p.pl.Latest() diff --git a/agent/agents/supervisor/ports_registry.go b/agent/agents/supervisor/ports_registry.go index 53f04a37ee..984e3c0c13 100644 --- a/agent/agents/supervisor/ports_registry.go +++ b/agent/agents/supervisor/ports_registry.go @@ -84,7 +84,7 @@ func (r *portsRegistry) Reserve() (uint16, error) { } // Release releases port. -func (r *portsRegistry) Release(port uint16) error { +func (r *portsRegistry) Release(port uint16, ignoreBusy bool) error { r.m.Lock() defer r.m.Unlock() @@ -92,12 +92,14 @@ func (r *portsRegistry) Release(port uint16) error { return errPortNotReserved } - l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) - if l != nil { - _ = l.Close() - } - if err != nil { - return errPortBusy + if !ignoreBusy { + l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if l != nil { + _ = l.Close() + } + if err != nil { + return errPortBusy + } } delete(r.reserved, port) diff --git a/agent/agents/supervisor/supervisor.go b/agent/agents/supervisor/supervisor.go index d426980942..e86afccab1 100644 --- a/agent/agents/supervisor/supervisor.go +++ b/agent/agents/supervisor/supervisor.go @@ -79,6 +79,8 @@ type agentProcessInfo struct { listenPort uint16 processExecPath string logStore *tailog.Store // store logs + requireNewParam <-chan bool + newParams chan<- *process.Params } // builtinAgentInfo describes built-in Agent. @@ -291,7 +293,7 @@ func (s *Supervisor) setAgentProcesses(agentProcesses map[string]*agentpb.SetSta agent.cancel() <-agent.done - if err := s.portsRegistry.Release(agent.listenPort); err != nil { + if err := s.portsRegistry.Release(agent.listenPort, false); err != nil { s.l.Errorf("Failed to release port: %s.", err) } @@ -314,6 +316,8 @@ func (s *Supervisor) setAgentProcesses(agentProcesses map[string]*agentpb.SetSta s.l.Errorf("Failed to start Agent: %s.", err) // TODO report that error to server } + + s.startNewParamsLister(agentID, agentProcesses[agentID]) } // start new agents @@ -329,6 +333,8 @@ func (s *Supervisor) setAgentProcesses(agentProcesses map[string]*agentpb.SetSta s.l.Errorf("Failed to start Agent: %s.", err) // TODO report that error to server } + + s.startNewParamsLister(agentID, agentProcesses[agentID]) } } @@ -449,7 +455,8 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState }) l.Debugf("Starting: %s.", processParams) - process := process.New(processParams, agentProcess.RedactWords, l) + newParamsChan := make(chan *process.Params) + process := process.New(processParams, agentProcess.RedactWords, l, newParamsChan) go pprof.Do(ctx, pprof.Labels("agentID", agentID, "type", agentType), process.Run) version, err := s.version(agentProcess.Type, processParams.Path) @@ -481,10 +488,42 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState listenPort: port, processExecPath: processParams.Path, logStore: logStore, + requireNewParam: process.RequireNewParam(), + newParams: newParamsChan, } return nil } +func (s *Supervisor) startNewParamsLister(agentID string, agentProcess *agentpb.SetStateRequest_AgentProcess) { + go func() { + for value := range s.agentProcesses[agentID].requireNewParam { + if !value { + continue + } + + // need rw lock for to lock the agentProcess map + s.rw.Lock() + defer s.rw.Unlock() + + s.portsRegistry.Release(s.agentProcesses[agentID].listenPort, true) + port, err := s.portsRegistry.Reserve() + if err != nil { + s.l.Errorf("Failed to reserve port during retry: %s.", err) + // TODO report that error to server + continue + } + params, err := s.processParams(agentID, agentProcess, port) + if err != nil { + s.l.Errorf("Failed to process params during retry: %s.", err) + // TODO report that error to server + continue + } + + s.agentProcesses[agentID].newParams <- params + } + }() +} + // startBuiltin starts built-in Agent. // Must be called with s.rw held for writing. func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentpb.SetStateRequest_BuiltinAgent) error { From 1fd300e32c709ff9b53791946caaec1799a818c3 Mon Sep 17 00:00:00 2001 From: GuanqunYang193 Date: Tue, 17 Oct 2023 00:25:41 -0400 Subject: [PATCH 2/5] add test --- agent/agents/process/process.go | 25 +++--- agent/agents/process/process_test.go | 45 ++++++++-- agent/agents/supervisor/ports_registry.go | 16 ++-- .../agents/supervisor/ports_registry_test.go | 34 +++++-- agent/agents/supervisor/supervisor.go | 10 ++- agent/agents/supervisor/supervisor_test.go | 88 +++++++++++++++++++ 6 files changed, 187 insertions(+), 31 deletions(-) diff --git a/agent/agents/process/process.go b/agent/agents/process/process.go index 102759d823..f8fee13696 100644 --- a/agent/agents/process/process.go +++ b/agent/agents/process/process.go @@ -181,21 +181,20 @@ func (p *Process) toWaiting() { p.l.Infof("Process: waiting %s.", delay) p.changes <- inventorypb.AgentStatus_WAITING - if p.retryTime >= maxRetry { - p.requireNewParam <- true - p.retryTime = 0 - } + p.achiveMaxRetry() t := time.NewTimer(delay) defer t.Stop() select { case <-t.C: - select { - case params, ok := <-p.newParams: - if ok { - p.params = params + if p.newParams!=nil { + select { + case params, ok := <-p.newParams: + if ok { + p.params = params + } + default: } - default: } // recreate config file in temp dir. @@ -255,8 +254,12 @@ func (p *Process) RequireNewParam() chan bool { return p.requireNewParam } -func (p *Process) NewParams() chan *Params { - return p.newParams +func (p *Process) achiveMaxRetry() { + p.retryTime++ + if p.retryTime >= maxRetry { + p.requireNewParam <- true + p.retryTime = 0 + } } // Logs returns latest process logs. diff --git a/agent/agents/process/process_test.go b/agent/agents/process/process_test.go index d40d555e24..ddb64ea0a1 100644 --- a/agent/agents/process/process_test.go +++ b/agent/agents/process/process_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sys/unix" + "github.com/percona/pmm/agent/utils/backoff" "github.com/percona/pmm/api/inventorypb" ) @@ -71,7 +72,7 @@ func setup(t *testing.T) (context.Context, context.CancelFunc, *logrus.Entry) { func TestProcess(t *testing.T) { t.Run("Normal", func(t *testing.T) { ctx, cancel, l := setup(t) - p := New(&Params{Path: "sleep", Args: []string{"100500"}}, nil, l) + p := New(&Params{Path: "sleep", Args: []string{"100500"}}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING) @@ -81,7 +82,7 @@ func TestProcess(t *testing.T) { t.Run("FailedToStart", func(t *testing.T) { ctx, cancel, l := setup(t) - p := New(&Params{Path: "no_such_command"}, nil, l) + p := New(&Params{Path: "no_such_command"}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING) @@ -92,7 +93,7 @@ func TestProcess(t *testing.T) { t.Run("ExitedEarly", func(t *testing.T) { sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64) ctx, cancel, l := setup(t) - p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l) + p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING) @@ -103,7 +104,7 @@ func TestProcess(t *testing.T) { t.Run("CancelStarting", func(t *testing.T) { sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64) ctx, cancel, l := setup(t) - p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l) + p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING) @@ -114,7 +115,7 @@ func TestProcess(t *testing.T) { t.Run("Exited", func(t *testing.T) { sleep := strconv.FormatFloat(runningT.Seconds()+0.5, 'f', -1, 64) ctx, cancel, l := setup(t) - p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l) + p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING, inventorypb.AgentStatus_WAITING) @@ -133,7 +134,7 @@ func TestProcess(t *testing.T) { build(t, "", "process_noterm.go", f.Name()) ctx, cancel, l := setup(t) - p := New(&Params{Path: f.Name()}, nil, l) + p := New(&Params{Path: f.Name()}, nil, l, nil) go p.Run(ctx) assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING) @@ -187,6 +188,38 @@ func TestProcess(t *testing.T) { err = proc.Signal(unix.Signal(0)) require.EqualError(t, err, "os: process already finished", "child process with pid %v is not killed", pid) }) + +} + +func TestNewParams(t *testing.T) { + t.Run("tryNewParams", func(t *testing.T) { + ctx, cancel, l := setup(t) + newParamsChan := make(chan *Params) + p := New(&Params{Path: "cat", Args: []string{"A_WRONG_PATH"}}, nil, l, newParamsChan) + p.backoff = backoff.New(0*time.Second, 1*time.Second) + + go p.Run(ctx) + + for i := 0; i < 5; i++{ + assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING) + } + go func() { + for value := range p.requireNewParam { + if !value { + continue + } + newParamsChan <- &Params{Path: "sleep", Args: []string{"100500"}} + } + }() + timer := time.NewTimer(time.Second * 6) + defer timer.Stop() + select { + case <-timer.C: + assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING) + cancel() + assertStates(t, p, inventorypb.AgentStatus_STOPPING, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID) + } + }) } func TestExtractLogLevel(t *testing.T) { diff --git a/agent/agents/supervisor/ports_registry.go b/agent/agents/supervisor/ports_registry.go index 984e3c0c13..d0af0d77f0 100644 --- a/agent/agents/supervisor/ports_registry.go +++ b/agent/agents/supervisor/ports_registry.go @@ -33,6 +33,7 @@ type portsRegistry struct { max uint16 last uint16 reserved map[uint16]struct{} + ignoreCheck bool } func newPortsRegistry(min, max uint16, reserved []uint16) *portsRegistry { @@ -45,6 +46,7 @@ func newPortsRegistry(min, max uint16, reserved []uint16) *portsRegistry { max: max, last: min - 1, reserved: make(map[uint16]struct{}, len(reserved)), + ignoreCheck: false, } for _, p := range reserved { r.reserved[p] = struct{}{} @@ -67,12 +69,14 @@ func (r *portsRegistry) Reserve() (uint16, error) { continue } - l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) - if l != nil { - _ = l.Close() - } - if err != nil { - continue + if !r.ignoreCheck { + l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if l != nil { + _ = l.Close() + } + if err != nil { + continue + } } r.reserved[port] = struct{}{} diff --git a/agent/agents/supervisor/ports_registry_test.go b/agent/agents/supervisor/ports_registry_test.go index 4528a35793..3693472976 100644 --- a/agent/agents/supervisor/ports_registry_test.go +++ b/agent/agents/supervisor/ports_registry_test.go @@ -39,11 +39,11 @@ func TestRegistry(t *testing.T) { require.NoError(t, err) defer l2.Close() //nolint:errcheck,gosec - err = r.Release(65000) + err = r.Release(65000, false) assert.NoError(t, err) - err = r.Release(65001) + err = r.Release(65001, false) assert.Equal(t, errPortNotReserved, err) - err = r.Release(65002) + err = r.Release(65002, false) assert.Equal(t, errPortBusy, err) l1.Close() @@ -58,7 +58,7 @@ func TestRegistry(t *testing.T) { _, err = r.Reserve() assert.Equal(t, errNoFreePort, err) - err = r.Release(65002) + err = r.Release(65002, false) assert.NoError(t, err) p, err = r.Reserve() @@ -75,7 +75,7 @@ func TestPreferNewPort(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, 65000, p) - err = r.Release(p) + err = r.Release(p, false) assert.NoError(t, err) p, err = r.Reserve() @@ -101,10 +101,32 @@ func TestSinglePort(t *testing.T) { _, err = r.Reserve() assert.Equal(t, errNoFreePort, err) - err = r.Release(p) + err = r.Release(p, false) assert.NoError(t, err) p, err = r.Reserve() assert.NoError(t, err) assert.EqualValues(t, 65000, p) } + +func TestRegistryRaceCondition(t *testing.T) { + + r := newPortsRegistry(65000, 65002, nil) + + p, err := r.Reserve() + assert.NoError(t, err) + assert.EqualValues(t, 65000, p) + + l1, err := net.Listen("tcp", "127.0.0.1:65000") + require.NoError(t, err) + defer l1.Close() + + err = r.Release(65000, false) + assert.EqualValues(t, errPortBusy, err) + + err = r.Release(65000, true) + assert.NoError(t, err) + + assert.NotContains(t, r.reserved, 65000) + +} diff --git a/agent/agents/supervisor/supervisor.go b/agent/agents/supervisor/supervisor.go index e86afccab1..ef72455ca1 100644 --- a/agent/agents/supervisor/supervisor.go +++ b/agent/agents/supervisor/supervisor.go @@ -23,6 +23,7 @@ import ( "path/filepath" "runtime/pprof" "sort" + "strconv" "strings" "sync" @@ -433,6 +434,7 @@ func filter(existing, ap map[string]agentpb.AgentParams) ([]string, []string, [] //nolint:golint,stylecheck const ( + type_TEST_NC inventorypb.AgentType = 997 type_TEST_SLEEP inventorypb.AgentType = 998 // process type_TEST_NOOP inventorypb.AgentType = 999 // built-in ) @@ -518,8 +520,9 @@ func (s *Supervisor) startNewParamsLister(agentID string, agentProcess *agentpb. // TODO report that error to server continue } - - s.agentProcesses[agentID].newParams <- params + if s.agentProcesses[agentID].newParams != nil { + s.agentProcesses[agentID].newParams <- params + } } }() } @@ -698,6 +701,9 @@ func (s *Supervisor) processParams(agentID string, agentProcess *agentpb.SetStat processParams.Path = cfg.Paths.AzureExporter case type_TEST_SLEEP: processParams.Path = "sleep" + case type_TEST_NC: + processParams.Path = "nc" + agentProcess.Args = append(agentProcess.Args, strconv.Itoa(int(port))) case inventorypb.AgentType_VM_AGENT: // add template params for vmagent. templateParams["server_insecure"] = cfg.Server.InsecureTLS diff --git a/agent/agents/supervisor/supervisor_test.go b/agent/agents/supervisor/supervisor_test.go index 5811e34504..3a87646ec4 100644 --- a/agent/agents/supervisor/supervisor_test.go +++ b/agent/agents/supervisor/supervisor_test.go @@ -20,6 +20,7 @@ import ( "path/filepath" "sort" "testing" + "net" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -259,6 +260,93 @@ func TestSupervisor(t *testing.T) { }) } +func TestSupervisorRetryPorts(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tempDir := t.TempDir() + cfgStorage := config.NewStorage(&config.Config{ + Paths: config.Paths{TempDir: tempDir}, + Ports: config.Ports{Min: 65011, Max: 65099}, + Server: config.Server{Address: "localhost:443"}, + LogLinesCount: 1, + }) + s := NewSupervisor(ctx, nil, cfgStorage) + s.portsRegistry = &portsRegistry{ + min: 65011, + max: 65099, + last: 65011 - 1, + reserved: make(map[uint16]struct{}, 0), + ignoreCheck: true, + } + + go s.Run(ctx) + + t.Run("StartWithConflictPort", func(t *testing.T) { + expectedList := []*agentlocalpb.AgentInfo{} + + // var attr = os.ProcAttr{ + // Dir: ".", + // Env: os.Environ(), + // Files: []*os.File{ + // nil, + // nil, + // nil, + // }, + // } + // process, err := os.StartProcess("/usr/bin/nc", []string{"-lk", "65011"}, &attr) + // require.NoError(t, err) + // defer process.Kill() + + // err = process.Release() + // require.NoError(t, err) + + conn, err := net.Dial("tcp", "localhost:3333") + require.NoError(t, err) + defer conn.Close() + + // l, err := net.Listen("tcp", "127.0.0.1:65011") + // require.NoError(t, err) + // defer l.Close() + + s.SetState(&agentpb.SetStateRequest{ + AgentProcesses: map[string]*agentpb.SetStateRequest_AgentProcess{ + "nc1": {Type: type_TEST_NC, Args: []string{"-lk"}}, + }, + }) + + for i := 0; i < 5; i++ { + assertChanges(t, s, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65011, ProcessExecPath: "sleep 3 ; nc"}) + expectedList = []*agentlocalpb.AgentInfo{ + {AgentType: type_TEST_NC, AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65011, ProcessExecPath: "sleep 3 ; nc"}, + } + assert.Equal(t, expectedList, s.AgentsList()) + + assertChanges(t, s, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_WAITING, ListenPort: 65011, ProcessExecPath: "sleep 3 ; nc"}) + expectedList = []*agentlocalpb.AgentInfo{ + {AgentType: type_TEST_NC, AgentId: "nc1", Status: inventorypb.AgentStatus_WAITING, ListenPort: 65011, ProcessExecPath: "sleep 3 ; nc"}, + } + assert.Equal(t, expectedList, s.AgentsList()) + } + + assertChanges(t, s, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65012, ProcessExecPath: "sleep 3 ; nc"}) + expectedList = []*agentlocalpb.AgentInfo{ + {AgentType: type_TEST_NC, AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65012, ProcessExecPath: "sleep 3 ; nc"}, + } + assert.Equal(t, expectedList, s.AgentsList()) + + assertChanges(t, s, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65012, ProcessExecPath: "sleep 3 ; nc"}) + expectedList = []*agentlocalpb.AgentInfo{ + {AgentType: type_TEST_NC, AgentId: "nc1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65012, ProcessExecPath: "sleep 3 ; nc"}, + } + assert.Equal(t, expectedList, s.AgentsList()) + }) + +} + func TestFilter(t *testing.T) { t.Parallel() From 07523a4a56bdcc660b43896679cf2460ddfd8ec8 Mon Sep 17 00:00:00 2001 From: GuanqunYang193 Date: Wed, 18 Oct 2023 23:55:30 -0400 Subject: [PATCH 3/5] add test and lock --- agent/agents/process/process.go | 12 ++++ agent/agents/supervisor/supervisor.go | 24 +++++-- agent/agents/supervisor/supervisor_test.go | 74 ++++++++-------------- 3 files changed, 56 insertions(+), 54 deletions(-) diff --git a/agent/agents/process/process.go b/agent/agents/process/process.go index f8fee13696..d35e68b3e2 100644 --- a/agent/agents/process/process.go +++ b/agent/agents/process/process.go @@ -20,6 +20,7 @@ import ( "fmt" "os/exec" "strings" + "sync" "time" "github.com/sirupsen/logrus" @@ -64,6 +65,8 @@ type Process struct { requireNewParam chan bool newParams <-chan *Params + rw sync.RWMutex + // recreated on each restart cmd *exec.Cmd cmdDone chan struct{} @@ -78,6 +81,7 @@ type Params struct { Type inventorypb.AgentType TemplateRenderer *templates.TemplateRenderer TemplateParams map[string]interface{} + Port uint16 } func (p *Params) String() string { @@ -191,6 +195,8 @@ func (p *Process) toWaiting() { select { case params, ok := <-p.newParams: if ok { + p.rw.Lock() + defer p.rw.Unlock() p.params = params } default: @@ -267,6 +273,12 @@ func (p *Process) Logs() []string { return p.pl.Latest() } +func (p *Process) GetPort() uint16 { + p.rw.RLock() + defer p.rw.RUnlock() + return p.params.Port +} + // check interfaces. var ( _ fmt.Stringer = (*Params)(nil) diff --git a/agent/agents/supervisor/supervisor.go b/agent/agents/supervisor/supervisor.go index ef72455ca1..cc2d2e8383 100644 --- a/agent/agents/supervisor/supervisor.go +++ b/agent/agents/supervisor/supervisor.go @@ -470,11 +470,11 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState go func() { for status := range process.Changes() { s.storeLastStatus(agentID, status) - l.Infof("Sending status: %s (port %d).", status, port) + l.Infof("Sending status: %s (port %d).", status, process.GetPort()) s.changes <- &agentpb.StateChangedRequest{ AgentId: agentID, Status: status, - ListenPort: uint32(port), + ListenPort: uint32(process.GetPort()), ProcessExecPath: processParams.Path, Version: version, } @@ -498,7 +498,10 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState func (s *Supervisor) startNewParamsLister(agentID string, agentProcess *agentpb.SetStateRequest_AgentProcess) { go func() { - for value := range s.agentProcesses[agentID].requireNewParam { + s.rw.RLock() + process := s.agentProcesses[agentID] + s.rw.RUnlock() + for value := range process.requireNewParam { if !value { continue } @@ -507,7 +510,7 @@ func (s *Supervisor) startNewParamsLister(agentID string, agentProcess *agentpb. s.rw.Lock() defer s.rw.Unlock() - s.portsRegistry.Release(s.agentProcesses[agentID].listenPort, true) + s.portsRegistry.Release(process.listenPort, true) port, err := s.portsRegistry.Reserve() if err != nil { s.l.Errorf("Failed to reserve port during retry: %s.", err) @@ -520,8 +523,9 @@ func (s *Supervisor) startNewParamsLister(agentID string, agentProcess *agentpb. // TODO report that error to server continue } - if s.agentProcesses[agentID].newParams != nil { - s.agentProcesses[agentID].newParams <- params + if process.newParams != nil { + process.listenPort = port + process.newParams <- params } } }() @@ -703,7 +707,11 @@ func (s *Supervisor) processParams(agentID string, agentProcess *agentpb.SetStat processParams.Path = "sleep" case type_TEST_NC: processParams.Path = "nc" - agentProcess.Args = append(agentProcess.Args, strconv.Itoa(int(port))) + if agentProcess.Args[len(agentProcess.Args)-1] == "localhost" { + agentProcess.Args = append(agentProcess.Args, strconv.Itoa(int(port))) + } + agentProcess.Args[len(agentProcess.Args)-1] = strconv.Itoa(int(port)) + case inventorypb.AgentType_VM_AGENT: // add template params for vmagent. templateParams["server_insecure"] = cfg.Server.InsecureTLS @@ -756,6 +764,8 @@ func (s *Supervisor) processParams(agentID string, agentProcess *agentpb.SetStat processParams.Env[i] = string(b) } + processParams.Port = port + return &processParams, nil } diff --git a/agent/agents/supervisor/supervisor_test.go b/agent/agents/supervisor/supervisor_test.go index 3a87646ec4..eb26db87bc 100644 --- a/agent/agents/supervisor/supervisor_test.go +++ b/agent/agents/supervisor/supervisor_test.go @@ -16,14 +16,16 @@ package supervisor import ( "context" + "net" "os" "path/filepath" "sort" + "syscall" "testing" - "net" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sys/unix" "github.com/percona/pmm/agent/agents/process" "github.com/percona/pmm/agent/config" @@ -283,66 +285,44 @@ func TestSupervisorRetryPorts(t *testing.T) { t.Run("StartWithConflictPort", func(t *testing.T) { expectedList := []*agentlocalpb.AgentInfo{} - - // var attr = os.ProcAttr{ - // Dir: ".", - // Env: os.Environ(), - // Files: []*os.File{ - // nil, - // nil, - // nil, - // }, - // } - // process, err := os.StartProcess("/usr/bin/nc", []string{"-lk", "65011"}, &attr) - // require.NoError(t, err) - // defer process.Kill() - - // err = process.Release() - // require.NoError(t, err) - - conn, err := net.Dial("tcp", "localhost:3333") + var lc = net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) error { + var opErr error + if err := c.Control(func(fd uintptr) { + opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 0) + }); err != nil { + return err + } + return opErr + }, + } + l, err := lc.Listen(context.Background(),"tcp", "127.0.0.1:65011") require.NoError(t, err) - defer conn.Close() - - // l, err := net.Listen("tcp", "127.0.0.1:65011") - // require.NoError(t, err) - // defer l.Close() + defer l.Close() s.SetState(&agentpb.SetStateRequest{ AgentProcesses: map[string]*agentpb.SetStateRequest_AgentProcess{ - "nc1": {Type: type_TEST_NC, Args: []string{"-lk"}}, + "nc1": {Type: type_TEST_NC, Args: []string{"-lk4", "localhost"}}, }, }) for i := 0; i < 5; i++ { assertChanges(t, s, - &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65011, ProcessExecPath: "sleep 3 ; nc"}) - expectedList = []*agentlocalpb.AgentInfo{ - {AgentType: type_TEST_NC, AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65011, ProcessExecPath: "sleep 3 ; nc"}, - } - assert.Equal(t, expectedList, s.AgentsList()) - - assertChanges(t, s, - &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_WAITING, ListenPort: 65011, ProcessExecPath: "sleep 3 ; nc"}) - expectedList = []*agentlocalpb.AgentInfo{ - {AgentType: type_TEST_NC, AgentId: "nc1", Status: inventorypb.AgentStatus_WAITING, ListenPort: 65011, ProcessExecPath: "sleep 3 ; nc"}, - } - assert.Equal(t, expectedList, s.AgentsList()) + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65011, ProcessExecPath: "nc"}, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_WAITING, ListenPort: 65011, ProcessExecPath: "nc"}) } assertChanges(t, s, - &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65012, ProcessExecPath: "sleep 3 ; nc"}) - expectedList = []*agentlocalpb.AgentInfo{ - {AgentType: type_TEST_NC, AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65012, ProcessExecPath: "sleep 3 ; nc"}, - } - assert.Equal(t, expectedList, s.AgentsList()) + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65012, ProcessExecPath: "nc"}, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65012, ProcessExecPath: "nc"}) + + cancel() assertChanges(t, s, - &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65012, ProcessExecPath: "sleep 3 ; nc"}) - expectedList = []*agentlocalpb.AgentInfo{ - {AgentType: type_TEST_NC, AgentId: "nc1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65012, ProcessExecPath: "sleep 3 ; nc"}, - } - assert.Equal(t, expectedList, s.AgentsList()) + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_STOPPING, ListenPort: 65012, ProcessExecPath: "nc"}, + &agentpb.StateChangedRequest{AgentId: "nc1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65012, ProcessExecPath: "nc"}) + expectedList = []*agentlocalpb.AgentInfo{} + require.Equal(t, expectedList, s.AgentsList()) }) } From a8177a9d5fdd2f8864c6f5fc905ee59922a2bf7f Mon Sep 17 00:00:00 2001 From: GuanqunYang193 Date: Thu, 19 Oct 2023 00:19:03 -0400 Subject: [PATCH 4/5] add some comments --- agent/agents/process/process.go | 14 ++++----- agent/agents/process/process_test.go | 3 +- agent/agents/supervisor/ports_registry.go | 4 +-- agent/agents/supervisor/supervisor.go | 36 ++++++++++++---------- agent/agents/supervisor/supervisor_test.go | 2 ++ 5 files changed, 32 insertions(+), 27 deletions(-) diff --git a/agent/agents/process/process.go b/agent/agents/process/process.go index d35e68b3e2..4054c96849 100644 --- a/agent/agents/process/process.go +++ b/agent/agents/process/process.go @@ -62,8 +62,8 @@ type Process struct { backoff *backoff.Backoff ctxDone chan struct{} retryTime int8 - requireNewParam chan bool - newParams <-chan *Params + requireNewParams chan bool // send true if need new params + newParams <-chan *Params // for getting updated params rw sync.RWMutex @@ -103,7 +103,7 @@ func New(params *Params, redactWords []string, l *logrus.Entry, newParams <-chan backoff: backoff.New(backoffMinDelay, backoffMaxDelay), ctxDone: make(chan struct{}), retryTime: 0, - requireNewParam: make(chan bool), + requireNewParams: make(chan bool), newParams : newParams, } } @@ -247,7 +247,7 @@ func (p *Process) toDone() { p.l.Trace("Process: done.") p.changes <- inventorypb.AgentStatus_DONE - close(p.requireNewParam) + close(p.requireNewParams) close(p.changes) } @@ -256,14 +256,14 @@ func (p *Process) Changes() <-chan inventorypb.AgentStatus { return p.changes } -func (p *Process) RequireNewParam() chan bool { - return p.requireNewParam +func (p *Process) RequireNewParams() chan bool { + return p.requireNewParams } func (p *Process) achiveMaxRetry() { p.retryTime++ if p.retryTime >= maxRetry { - p.requireNewParam <- true + p.requireNewParams <- true p.retryTime = 0 } } diff --git a/agent/agents/process/process_test.go b/agent/agents/process/process_test.go index ddb64ea0a1..ddfbbf5fef 100644 --- a/agent/agents/process/process_test.go +++ b/agent/agents/process/process_test.go @@ -200,11 +200,12 @@ func TestNewParams(t *testing.T) { go p.Run(ctx) + // the default maxRetry is 5 for i := 0; i < 5; i++{ assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING) } go func() { - for value := range p.requireNewParam { + for value := range p.requireNewParams { if !value { continue } diff --git a/agent/agents/supervisor/ports_registry.go b/agent/agents/supervisor/ports_registry.go index d0af0d77f0..f45059e161 100644 --- a/agent/agents/supervisor/ports_registry.go +++ b/agent/agents/supervisor/ports_registry.go @@ -33,7 +33,7 @@ type portsRegistry struct { max uint16 last uint16 reserved map[uint16]struct{} - ignoreCheck bool + ignoreCheck bool // used for testing } func newPortsRegistry(min, max uint16, reserved []uint16) *portsRegistry { @@ -87,7 +87,7 @@ func (r *portsRegistry) Reserve() (uint16, error) { return 0, errNoFreePort } -// Release releases port. +// Release releases port. func (r *portsRegistry) Release(port uint16, ignoreBusy bool) error { r.m.Lock() defer r.m.Unlock() diff --git a/agent/agents/supervisor/supervisor.go b/agent/agents/supervisor/supervisor.go index cc2d2e8383..ff014d2e78 100644 --- a/agent/agents/supervisor/supervisor.go +++ b/agent/agents/supervisor/supervisor.go @@ -74,14 +74,14 @@ type Supervisor struct { // agentProcessInfo describes Agent process. type agentProcessInfo struct { - cancel func() // to cancel Process.Run(ctx) - done <-chan struct{} // closes when Process.Changes() channel closes - requestedState *agentpb.SetStateRequest_AgentProcess - listenPort uint16 - processExecPath string - logStore *tailog.Store // store logs - requireNewParam <-chan bool - newParams chan<- *process.Params + cancel func() // to cancel Process.Run(ctx) + done <-chan struct{} // closes when Process.Changes() channel closes + requestedState *agentpb.SetStateRequest_AgentProcess + listenPort uint16 + processExecPath string + logStore *tailog.Store // store logs + requireNewParams <-chan bool + newParams chan<- *process.Params } // builtinAgentInfo describes built-in Agent. @@ -484,24 +484,26 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState //nolint:forcetypeassert s.agentProcesses[agentID] = &agentProcessInfo{ - cancel: cancel, - done: done, - requestedState: proto.Clone(agentProcess).(*agentpb.SetStateRequest_AgentProcess), - listenPort: port, - processExecPath: processParams.Path, - logStore: logStore, - requireNewParam: process.RequireNewParam(), - newParams: newParamsChan, + cancel: cancel, + done: done, + requestedState: proto.Clone(agentProcess).(*agentpb.SetStateRequest_AgentProcess), + listenPort: port, + processExecPath: processParams.Path, + logStore: logStore, + requireNewParams: process.RequireNewParams(), + newParams: newParamsChan, } return nil } +// A lister for updating the params of the process. +// The listener will release the previous port, reserve a new one and send new params to the process func (s *Supervisor) startNewParamsLister(agentID string, agentProcess *agentpb.SetStateRequest_AgentProcess) { go func() { s.rw.RLock() process := s.agentProcesses[agentID] s.rw.RUnlock() - for value := range process.requireNewParam { + for value := range process.requireNewParams { if !value { continue } diff --git a/agent/agents/supervisor/supervisor_test.go b/agent/agents/supervisor/supervisor_test.go index eb26db87bc..d3d30b76c7 100644 --- a/agent/agents/supervisor/supervisor_test.go +++ b/agent/agents/supervisor/supervisor_test.go @@ -262,6 +262,8 @@ func TestSupervisor(t *testing.T) { }) } +// Simulate the situation where port assigned to a process is occupied. +// The supervisor is supposed to give the process a new port. func TestSupervisorRetryPorts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From b04a029d2a5cb1114c8020b991e7ab8959e84c6c Mon Sep 17 00:00:00 2001 From: GuanqunYang193 Date: Thu, 26 Oct 2023 22:27:56 -0400 Subject: [PATCH 5/5] nit --- agent/agents/process/process.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/agents/process/process.go b/agent/agents/process/process.go index 4054c96849..640d0279a0 100644 --- a/agent/agents/process/process.go +++ b/agent/agents/process/process.go @@ -185,7 +185,7 @@ func (p *Process) toWaiting() { p.l.Infof("Process: waiting %s.", delay) p.changes <- inventorypb.AgentStatus_WAITING - p.achiveMaxRetry() + p.checkMaxRetry() t := time.NewTimer(delay) defer t.Stop() @@ -260,7 +260,7 @@ func (p *Process) RequireNewParams() chan bool { return p.requireNewParams } -func (p *Process) achiveMaxRetry() { +func (p *Process) checkMaxRetry() { p.retryTime++ if p.retryTime >= maxRetry { p.requireNewParams <- true