Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve port conflict when starting multiple PMM agents #2573

Closed
wants to merge 12 commits into from
74 changes: 59 additions & 15 deletions agent/agents/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os/exec"
"strings"
"sync"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -38,6 +39,7 @@ const (
backoffMinDelay = 1 * time.Second
backoffMaxDelay = 30 * time.Second

maxRetry = 5
keepLogLines = 100
)

Expand All @@ -53,13 +55,18 @@ const (
// implements its own logic, and then switches to the next state via "go toXXX()". "go" statement is used
// only to avoid stack overflow; there are no extra goroutines for states.
type Process struct {
params *Params
l *logrus.Entry
pl *processLogger
changes chan inventorypb.AgentStatus
backoff *backoff.Backoff
ctxDone chan struct{}

params *Params
l *logrus.Entry
pl *processLogger
changes chan inventorypb.AgentStatus
backoff *backoff.Backoff
ctxDone chan struct{}
retryTime int8
requireNewParams chan bool // send true if need new params
newParams <-chan *Params // for getting updated params

rw sync.RWMutex

// recreated on each restart
cmd *exec.Cmd
cmdDone chan struct{}
Expand All @@ -74,6 +81,7 @@ type Params struct {
Type inventorypb.AgentType
TemplateRenderer *templates.TemplateRenderer
TemplateParams map[string]interface{}
Port uint16
}

func (p *Params) String() string {
Expand All @@ -86,14 +94,17 @@ func (p *Params) String() string {
}

// New creates new process.
func New(params *Params, redactWords []string, l *logrus.Entry) *Process {
func New(params *Params, redactWords []string, l *logrus.Entry, newParams <-chan *Params) *Process {
return &Process{
params: params,
l: l,
pl: newProcessLogger(l, keepLogLines, redactWords),
changes: make(chan inventorypb.AgentStatus, 10),
backoff: backoff.New(backoffMinDelay, backoffMaxDelay),
ctxDone: make(chan struct{}),
params: params,
l: l,
pl: newProcessLogger(l, keepLogLines, redactWords),
changes: make(chan inventorypb.AgentStatus, 10),
backoff: backoff.New(backoffMinDelay, backoffMaxDelay),
ctxDone: make(chan struct{}),
retryTime: 0,
requireNewParams: make(chan bool),
newParams : newParams,
}
}

Expand Down Expand Up @@ -174,10 +185,24 @@ func (p *Process) toWaiting() {
p.l.Infof("Process: waiting %s.", delay)
p.changes <- inventorypb.AgentStatus_WAITING

p.checkMaxRetry()

t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
if p.newParams!=nil {
select {
case params, ok := <-p.newParams:
if ok {
p.rw.Lock()
defer p.rw.Unlock()
p.params = params
}
default:
}
}

// recreate config file in temp dir.
if p.params.TemplateRenderer != nil {
_, err := p.params.TemplateRenderer.RenderFiles(p.params.TemplateParams)
Expand Down Expand Up @@ -221,7 +246,8 @@ func (p *Process) toStopping() {
// toDone is the final state: it publishes AgentStatus_DONE on the changes
// channel and then closes both channels this Process sends on.
func (p *Process) toDone() {
p.l.Trace("Process: done.")
// Must happen before close(p.changes) below; sending on a closed channel panics.
p.changes <- inventorypb.AgentStatus_DONE


// Closing lets any receiver ranging over RequireNewParams() terminate.
close(p.requireNewParams)
// Closing signals readers of Changes() that no further statuses will arrive.
close(p.changes)
}

Expand All @@ -230,11 +256,29 @@ func (p *Process) Changes() <-chan inventorypb.AgentStatus {
return p.changes
}

// RequireNewParams returns the channel on which the process sends true when
// checkMaxRetry decides that new params are needed. The channel is closed by
// toDone, so receivers may range over it.
func (p *Process) RequireNewParams() chan bool {
return p.requireNewParams
}

// checkMaxRetry records one more restart attempt. Once maxRetry attempts have
// accumulated it resets the counter and sends true on requireNewParams to ask
// the owner of the process for updated params.
func (p *Process) checkMaxRetry() {
	p.retryTime++
	if p.retryTime < maxRetry {
		return
	}
	p.retryTime = 0

	// requireNewParams is unbuffered, so a bare send blocks until somebody
	// receives. Also watch ctxDone so the state machine is not stuck here
	// forever when nobody is listening and the process is being stopped.
	// NOTE(review): assumes ctxDone is closed when the Run context is
	// cancelled — confirm against Run.
	select {
	case p.requireNewParams <- true:
	case <-p.ctxDone:
	}
}

// Logs returns the latest process log lines, as reported by the process
// logger's Latest method.
func (p *Process) Logs() []string {
return p.pl.Latest()
}

// GetPort returns the Port field of the params currently in use. The read is
// done under the read lock because toWaiting may replace p.params while
// holding the write lock.
func (p *Process) GetPort() uint16 {
	p.rw.RLock()
	defer p.rw.RUnlock()

	current := p.params
	return current.Port
}

// check interfaces.
var (
_ fmt.Stringer = (*Params)(nil)
Expand Down
46 changes: 40 additions & 6 deletions agent/agents/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"

"github.com/percona/pmm/agent/utils/backoff"
"github.com/percona/pmm/api/inventorypb"
)

Expand Down Expand Up @@ -71,7 +72,7 @@ func setup(t *testing.T) (context.Context, context.CancelFunc, *logrus.Entry) {
func TestProcess(t *testing.T) {
t.Run("Normal", func(t *testing.T) {
ctx, cancel, l := setup(t)
p := New(&Params{Path: "sleep", Args: []string{"100500"}}, nil, l)
p := New(&Params{Path: "sleep", Args: []string{"100500"}}, nil, l, nil)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING)
Expand All @@ -81,7 +82,7 @@ func TestProcess(t *testing.T) {

t.Run("FailedToStart", func(t *testing.T) {
ctx, cancel, l := setup(t)
p := New(&Params{Path: "no_such_command"}, nil, l)
p := New(&Params{Path: "no_such_command"}, nil, l, nil)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING)
Expand All @@ -92,7 +93,7 @@ func TestProcess(t *testing.T) {
t.Run("ExitedEarly", func(t *testing.T) {
sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64)
ctx, cancel, l := setup(t)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l, nil)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING)
Expand All @@ -103,7 +104,7 @@ func TestProcess(t *testing.T) {
t.Run("CancelStarting", func(t *testing.T) {
sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64)
ctx, cancel, l := setup(t)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l, nil)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING)
Expand All @@ -114,7 +115,7 @@ func TestProcess(t *testing.T) {
t.Run("Exited", func(t *testing.T) {
sleep := strconv.FormatFloat(runningT.Seconds()+0.5, 'f', -1, 64)
ctx, cancel, l := setup(t)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l, nil)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING, inventorypb.AgentStatus_WAITING)
Expand All @@ -133,7 +134,7 @@ func TestProcess(t *testing.T) {
build(t, "", "process_noterm.go", f.Name())

ctx, cancel, l := setup(t)
p := New(&Params{Path: f.Name()}, nil, l)
p := New(&Params{Path: f.Name()}, nil, l, nil)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING)
Expand Down Expand Up @@ -187,6 +188,39 @@ func TestProcess(t *testing.T) {
err = proc.Signal(unix.Signal(0))
require.EqualError(t, err, "os: process already finished", "child process with pid %v is not killed", pid)
})

}

// TestNewParams starts a process with params that keep it cycling through
// STARTING/WAITING, answers its request for new params with a long-running
// "sleep" command, and expects the process to reach RUNNING and then stop
// cleanly on cancel.
func TestNewParams(t *testing.T) {
	t.Run("tryNewParams", func(t *testing.T) {
		ctx, cancel, l := setup(t)
		newParamsChan := make(chan *Params)
		p := New(&Params{Path: "cat", Args: []string{"A_WRONG_PATH"}}, nil, l, newParamsChan)
		p.backoff = backoff.New(0*time.Second, 1*time.Second)

		go p.Run(ctx)

		// the default maxRetry is 5
		for i := 0; i < 5; i++ {
			assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING)
		}

		// Answer every request for new params with a command that keeps running.
		// The loop ends when toDone closes requireNewParams.
		go func() {
			for value := range p.requireNewParams {
				if !value {
					continue
				}
				newParamsChan <- &Params{Path: "sleep", Args: []string{"100500"}}
			}
		}()

		// Give the process time to pick up the new params and restart.
		timer := time.NewTimer(time.Second * 6)
		defer timer.Stop()
		<-timer.C

		assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_RUNNING)
		cancel()
		assertStates(t, p, inventorypb.AgentStatus_STOPPING, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
	})
}

func TestExtractLogLevel(t *testing.T) {
Expand Down
34 changes: 20 additions & 14 deletions agent/agents/supervisor/ports_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type portsRegistry struct {
max uint16
last uint16
reserved map[uint16]struct{}
ignoreCheck bool // used for testing
}

func newPortsRegistry(min, max uint16, reserved []uint16) *portsRegistry {
Expand All @@ -45,6 +46,7 @@ func newPortsRegistry(min, max uint16, reserved []uint16) *portsRegistry {
max: max,
last: min - 1,
reserved: make(map[uint16]struct{}, len(reserved)),
ignoreCheck: false,
}
for _, p := range reserved {
r.reserved[p] = struct{}{}
Expand All @@ -67,12 +69,14 @@ func (r *portsRegistry) Reserve() (uint16, error) {
continue
}

l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if l != nil {
_ = l.Close()
}
if err != nil {
continue
if !r.ignoreCheck {
l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if l != nil {
_ = l.Close()
}
if err != nil {
continue
}
}

r.reserved[port] = struct{}{}
Expand All @@ -83,21 +87,23 @@ func (r *portsRegistry) Reserve() (uint16, error) {
return 0, errNoFreePort
}

// Release releases port.
func (r *portsRegistry) Release(port uint16) error {
// Release releases port.
func (r *portsRegistry) Release(port uint16, ignoreBusy bool) error {
r.m.Lock()
defer r.m.Unlock()

if _, ok := r.reserved[port]; !ok {
return errPortNotReserved
}

l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if l != nil {
_ = l.Close()
}
if err != nil {
return errPortBusy
if !ignoreBusy {
l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if l != nil {
_ = l.Close()
}
if err != nil {
return errPortBusy
}
}

delete(r.reserved, port)
Expand Down
34 changes: 28 additions & 6 deletions agent/agents/supervisor/ports_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func TestRegistry(t *testing.T) {
require.NoError(t, err)
defer l2.Close() //nolint:errcheck,gosec

err = r.Release(65000)
err = r.Release(65000, false)
assert.NoError(t, err)
err = r.Release(65001)
err = r.Release(65001, false)
assert.Equal(t, errPortNotReserved, err)
err = r.Release(65002)
err = r.Release(65002, false)
assert.Equal(t, errPortBusy, err)

l1.Close() //nolint:errcheck
Expand All @@ -58,7 +58,7 @@ func TestRegistry(t *testing.T) {
_, err = r.Reserve()
assert.Equal(t, errNoFreePort, err)

err = r.Release(65002)
err = r.Release(65002, false)
assert.NoError(t, err)

p, err = r.Reserve()
Expand All @@ -75,7 +75,7 @@ func TestPreferNewPort(t *testing.T) {
assert.NoError(t, err)
assert.EqualValues(t, 65000, p)

err = r.Release(p)
err = r.Release(p, false)
assert.NoError(t, err)

p, err = r.Reserve()
Expand All @@ -101,10 +101,32 @@ func TestSinglePort(t *testing.T) {
_, err = r.Reserve()
assert.Equal(t, errNoFreePort, err)

err = r.Release(p)
err = r.Release(p, false)
assert.NoError(t, err)

p, err = r.Reserve()
assert.NoError(t, err)
assert.EqualValues(t, 65000, p)
}

// TestRegistryRaceCondition checks that a reserved port which is currently
// bound by a listener cannot be released with ignoreBusy=false, but is
// released (and removed from the registry) with ignoreBusy=true.
func TestRegistryRaceCondition(t *testing.T) {
	r := newPortsRegistry(65000, 65002, nil)

	p, err := r.Reserve()
	assert.NoError(t, err)
	assert.EqualValues(t, 65000, p)

	// Occupy the reserved port so that Release sees it as busy.
	l1, err := net.Listen("tcp", "127.0.0.1:65000")
	require.NoError(t, err)
	defer l1.Close() //nolint:errcheck,gosec

	err = r.Release(65000, false)
	assert.Equal(t, errPortBusy, err)

	err = r.Release(65000, true)
	assert.NoError(t, err)

	// The key must be a uint16: testify compares map keys with
	// reflect.DeepEqual, so an untyped 65000 (int) would never match and the
	// assertion would pass even if the port were still reserved.
	assert.NotContains(t, r.reserved, uint16(65000))
}
Loading