Add back PID tracking.
blakerouse committed Oct 18, 2023
1 parent 9acd53a commit d5a02e1
Showing 8 changed files with 418 additions and 176 deletions.
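
The heart of this change is in the upgrade watcher: it records the PID reported over the control protocol and treats any change in that PID as a sign that a different spawned Elastic Agent is now answering on the same gRPC connection. A minimal, self-contained sketch of that bookkeeping, distilled from the watcher.go diff below (the stateInfo type and trackPID function are illustrative stand-ins, not part of this commit):

package main

import "fmt"

// stateInfo stands in for the PID-carrying part of the control
// protocol's StateAgentInfo message (hypothetical, for this sketch only).
type stateInfo struct{ PID int32 }

// trackPID counts how many times the reported PID changes across a
// sequence of state updates, mirroring the lostCounter bookkeeping the
// commit adds to AgentWatcher.
func trackPID(states []stateInfo) int {
	lastPid := int32(-1)
	changes := 0
	for _, s := range states {
		switch {
		case lastPid == -1:
			lastPid = s.PID // first observation of the running agent
		case lastPid != s.PID:
			changes++ // a new PID means a different spawned Elastic Agent
			lastPid = s.PID
		}
	}
	return changes
}

func main() {
	// PIDs 1 -> 2 -> 3 -> 3: two changes, as in the tests below.
	fmt.Println(trackPID([]stateInfo{{1}, {2}, {3}, {3}}))
}
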
2 changes: 2 additions & 0 deletions control_v2.proto
@@ -166,6 +166,8 @@ message StateAgentInfo {
string buildTime = 4;
// Current running version is a snapshot.
bool snapshot = 5;
// Current running PID.
int32 pid = 6;
}

// StateResponse is the current state of Elastic Agent.
37 changes: 31 additions & 6 deletions internal/pkg/agent/application/upgrade/watcher.go
@@ -41,6 +41,7 @@ var (
type AgentWatcher struct {
connectCounter int
lostCounter int
lastPid int32

notifyChan chan error
log *logger.Logger
@@ -52,6 +53,7 @@ type AgentWatcher struct {
func NewAgentWatcher(ch chan error, log *logger.Logger, checkInterval time.Duration) *AgentWatcher {
c := client.New()
ec := &AgentWatcher{
lastPid: -1,
notifyChan: ch,
agentClient: c,
log: log,
@@ -77,21 +79,21 @@ func (ch *AgentWatcher) Run(ctx context.Context) {
failedTimer.Stop() // starts stopped
defer failedTimer.Stop() // stopped on exit always

var failedCount int
var flipFlopCount int
var failedErr error
for {
select {
case <-ctx.Done():
return
case reset := <-failedReset:
if reset {
failedCount = 0
flipFlopCount = 0
failedTimer.Stop()
}
case err := <-failedCh:
if err != nil {
if failedErr == nil {
failedCount++
flipFlopCount++
failedTimer.Reset(ch.checkInterval)
ch.log.Error("Agent reported failure (starting failed timer): %s", err)
} else {
@@ -104,8 +106,8 @@ func (ch *AgentWatcher) Run(ctx context.Context) {
}
}
failedErr = err
if failedCount > statusFailureFlipFlopsAllowed {
err := fmt.Errorf("%w '%d' times in a row", ErrAgentFlipFlopFailed, failedCount)
if flipFlopCount > statusFailureFlipFlopsAllowed {
err := fmt.Errorf("%w '%d' times in a row", ErrAgentFlipFlopFailed, flipFlopCount)
ch.log.Error(err)
ch.notifyChan <- err
}
@@ -143,9 +145,12 @@ LOOP:
// agent is probably not running
continue
}
watch, err := ch.agentClient.StateWatch(ctx)

stateCtx, stateCancel := context.WithCancel(ctx)
watch, err := ch.agentClient.StateWatch(stateCtx)
if err != nil {
// considered a connect error
stateCancel()
ch.agentClient.Disconnect()
ch.log.Error("Failed to start state watch: ", err)
ch.connectCounter++
@@ -174,6 +179,7 @@ LOOP:
state, err := watch.Recv()
if err != nil {
// agent has crashed or exited
stateCancel()
ch.agentClient.Disconnect()
ch.log.Error("Lost connection: failed reading next state: ", err)
ch.lostCounter++
@@ -183,6 +189,25 @@ LOOP:
continue LOOP
}

// gRPC is good at hiding the fact that the connection was lost;
// to ensure we don't miss a restart, a changed PID means we are
// now talking to a different spawned Elastic Agent
if ch.lastPid == -1 {
ch.lastPid = state.Info.PID
ch.log.Info("Communicating with PID %d", ch.lastPid)
} else if ch.lastPid != state.Info.PID {
ch.log.Error("Communication with PID %d lost, now communicating with PID %d", ch.lastPid, state.Info.PID)
ch.lastPid = state.Info.PID
// count the PID change as a lost connection, but allow
// the communication to continue unless it has become a failure
ch.lostCounter++
if ch.checkFailures() {
stateCancel()
ch.agentClient.Disconnect()
return
}
}

if state.State == client.Failed {
// top-level failure (something is really wrong)
failedCh <- fmt.Errorf("%w: %s", ErrAgentStatusFailed, state.Message)
162 changes: 162 additions & 0 deletions internal/pkg/agent/application/upgrade/watcher_test.go
@@ -68,6 +68,168 @@ func TestWatcher_LostConnection(t *testing.T) {
}
}

func TestWatcher_PIDChange(t *testing.T) {
// timeout ensures that if it doesn't work, it doesn't block forever
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

errCh := make(chan error)
logger, _ := logger.NewTesting("watcher")
w := NewAgentWatcher(errCh, logger, 1*time.Millisecond)

// stream healthy states with a changing PID (each change counts as a lost connection)
mockHandler := func(srv cproto.ElasticAgentControl_StateWatchServer) error {
// starts with PID 1
err := srv.Send(&cproto.StateResponse{
Info: &cproto.StateAgentInfo{
Pid: 1,
},
State: cproto.State_HEALTHY,
Message: "healthy",
})
if err != nil {
return err
}
// now with PID 2
err = srv.Send(&cproto.StateResponse{
Info: &cproto.StateAgentInfo{
Pid: 2,
},
State: cproto.State_HEALTHY,
Message: "healthy",
})
if err != nil {
return err
}
// now with PID 3
err = srv.Send(&cproto.StateResponse{
Info: &cproto.StateAgentInfo{
Pid: 3,
},
State: cproto.State_HEALTHY,
Message: "healthy",
})
if err != nil {
return err
}
// now with PID 4
err = srv.Send(&cproto.StateResponse{
Info: &cproto.StateAgentInfo{
Pid: 4,
},
State: cproto.State_HEALTHY,
Message: "healthy",
})
if err != nil {
return err
}
// keep open until end (exiting will count as a lost connection)
<-ctx.Done()
return nil
}
mock := &mockDaemon{watch: mockHandler}
require.NoError(t, mock.Start())
defer mock.Stop()

// set client to mock; before running
w.agentClient = mock.Client()
go w.Run(ctx)

select {
case <-ctx.Done():
require.NoError(t, ctx.Err())
case err := <-errCh:
assert.ErrorIs(t, err, ErrLostConnection)
}
}

func TestWatcher_PIDChangeSuccess(t *testing.T) {
// this test checks for success, which only happens when no error comes in
// during this time period
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

errCh := make(chan error)
logger, _ := logger.NewTesting("watcher")
w := NewAgentWatcher(errCh, logger, 1*time.Millisecond)

// stream healthy states whose PID settles, staying within the allowed number of lost connections
mockHandler := func(srv cproto.ElasticAgentControl_StateWatchServer) error {
// starts with PID 1
err := srv.Send(&cproto.StateResponse{
Info: &cproto.StateAgentInfo{
Pid: 1,
},
State: cproto.State_HEALTHY,
Message: "healthy",
})
if err != nil {
return err
}
// now with PID 2
err = srv.Send(&cproto.StateResponse{
Info: &cproto.StateAgentInfo{
Pid: 2,
},
State: cproto.State_HEALTHY,
Message: "healthy",
})
if err != nil {
return err
}
// now with PID 3
err = srv.Send(&cproto.StateResponse{
Info: &cproto.StateAgentInfo{
Pid: 3,
},
State: cproto.State_HEALTHY,
Message: "healthy",
})
if err != nil {
return err
}
// still with PID 3
err = srv.Send(&cproto.StateResponse{
Info: &cproto.StateAgentInfo{
Pid: 3,
},
State: cproto.State_HEALTHY,
Message: "healthy",
})
if err != nil {
return err
}
// still with PID 3
err = srv.Send(&cproto.StateResponse{
Info: &cproto.StateAgentInfo{
Pid: 3,
},
State: cproto.State_HEALTHY,
Message: "healthy",
})
if err != nil {
return err
}
// keep open until end (exiting will count as a lost connection)
<-ctx.Done()
return nil
}
mock := &mockDaemon{watch: mockHandler}
require.NoError(t, mock.Start())
defer mock.Stop()

// set client to mock; before running
w.agentClient = mock.Client()
go w.Run(ctx)

select {
case <-ctx.Done():
require.ErrorIs(t, ctx.Err(), context.DeadlineExceeded)
case err := <-errCh:
assert.NoError(t, err, "error should not have been reported")
}
}

func TestWatcher_AgentError(t *testing.T) {
// timeout ensures that if it doesn't work, it doesn't block forever
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
11 changes: 11 additions & 0 deletions internal/pkg/agent/install/install.go
@@ -11,6 +11,8 @@ import (
"runtime"
"strings"

"github.com/kardianos/service"

"github.com/jaypipes/ghw"
"github.com/otiai10/copy"

@@ -216,6 +218,15 @@ func RestartService(topPath string) error {
return nil
}

// StatusService returns the status of the service.
func StatusService(topPath string) (service.Status, error) {
svc, err := newService(topPath)
if err != nil {
return service.StatusUnknown, err
}
return svc.Status()
}

// FixPermissions fixes the permissions on the installed system.
func FixPermissions(topPath string) error {
return fixPermissions(topPath)
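
A hedged caller sketch for the new StatusService helper; the ensureServiceRunning function and the use of paths.Top() to locate the install are assumptions for illustration, not part of this commit. It queries the installed service and restarts it only when it is not running:

package example

import (
	"fmt"

	"github.com/kardianos/service"

	"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
	"github.com/elastic/elastic-agent/internal/pkg/agent/install"
)

// ensureServiceRunning queries the installed Elastic Agent service and
// restarts it if it is not currently running.
func ensureServiceRunning() error {
	top := paths.Top()
	status, err := install.StatusService(top)
	if err != nil {
		return fmt.Errorf("failed to query service status: %w", err)
	}
	if status != service.StatusRunning {
		return install.RestartService(top)
	}
	return nil
}
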
2 changes: 2 additions & 0 deletions pkg/control/v2/client/client.go
@@ -106,6 +106,7 @@ type AgentStateInfo struct {
Commit string `json:"commit" yaml:"commit"`
BuildTime string `json:"build_time" yaml:"build_time"`
Snapshot bool `json:"snapshot" yaml:"snapshot"`
PID int32 `json:"pid" yaml:"pid"`
}

// AgentState is the current state of the Elastic Agent.
@@ -472,6 +473,7 @@ func toState(res *cproto.StateResponse) (*AgentState, error) {
Commit: res.Info.Commit,
BuildTime: res.Info.BuildTime,
Snapshot: res.Info.Snapshot,
PID: res.Info.Pid,
},
State: res.State,
Message: res.Message,
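
The new PID also surfaces through the v2 control client, so callers can log or compare it. A hypothetical reader sketch, assuming the client's Connect and State calls (which are not shown in this diff):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/elastic/elastic-agent/pkg/control/v2/client"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Connect to the locally running Elastic Agent over the control protocol.
	c := client.New()
	if err := c.Connect(ctx); err != nil {
		panic(err)
	}
	defer c.Disconnect()

	// Read a single state snapshot and report the PID it carries.
	state, err := c.State(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Elastic Agent reports PID %d (snapshot build: %v)\n",
		state.Info.PID, state.Info.Snapshot)
}
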