improve watcher logs and TestWatcher_AgentErrorQuick logs (#5345)
AndersonQ authored Aug 26, 2024
1 parent eb572da commit a9de876
Showing 2 changed files with 31 additions and 14 deletions.
24 changes: 15 additions & 9 deletions internal/pkg/agent/application/upgrade/watcher.go
@@ -93,14 +93,14 @@ func (ch *AgentWatcher) Run(ctx context.Context) {
 					if failedErr == nil {
 						flipFlopCount++
 						failedTimer.Reset(ch.checkInterval)
-						ch.log.Error("Agent reported failure (starting failed timer): %s", err)
+						ch.log.Errorf("Agent reported failure (starting failed timer): %s", err)
 					} else {
-						ch.log.Error("Agent reported failure (failed timer already started): %s", err)
+						ch.log.Errorf("Agent reported failure (failed timer already started): %s", err)
 					}
 				} else {
 					if failedErr != nil {
 						failedTimer.Stop()
-						ch.log.Error("Agent reported healthy (failed timer stopped): %s", err)
+						ch.log.Info("Agent reported healthy (failed timer stopped)")
 					}
 				}
 				failedErr = err
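A note on the Error → Errorf changes in this hunk (and the similar ones below): with a sugared logger such as zap's SugaredLogger, which the Elastic Agent logger is assumed here to wrap, Error does not treat its first argument as a format template, so the "%s" verb was emitted literally instead of the error text. Errorf runs the arguments through fmt.Sprintf. A minimal sketch of the difference, using zap directly rather than the agent's own logger:

package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	log := zap.NewExample().Sugar()
	err := errors.New("agent degraded")

	// Error concatenates its arguments fmt.Sprint-style, so the format verb
	// is kept literally: "Agent reported failure: %sagent degraded"
	log.Error("Agent reported failure: %s", err)

	// Errorf formats through fmt.Sprintf, which is what the watcher now uses:
	// "Agent reported failure: agent degraded"
	log.Errorf("Agent reported failure: %s", err)
}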
@@ -115,7 +115,8 @@ func (ch *AgentWatcher) Run(ctx context.Context) {
 					continue
 				}
 				// error lasted longer than the checkInterval, notify!
-				ch.notifyChan <- failedErr
+				ch.notifyChan <- fmt.Errorf("last error was not cleared before checkInterval (%s) elapsed: %w",
+					ch.checkInterval, failedErr)
 			}
 		}
 	}()
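The wrapped error sent on notifyChan adds context about the elapsed checkInterval while keeping the original failure reachable, since %w preserves the wrapped error for errors.Is/errors.As. A self-contained illustration of that shape (values invented for the example):

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	checkInterval := 100 * time.Millisecond
	failedErr := errors.New("agent reported failure")

	// Same wrapping shape as the watcher change above.
	notified := fmt.Errorf("last error was not cleared before checkInterval (%s) elapsed: %w",
		checkInterval, failedErr)

	fmt.Println(notified)                       // full message with the added context
	fmt.Println(errors.Is(notified, failedErr)) // true: the original error still matches
}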
@@ -138,7 +139,7 @@ LOOP:
 		connectCancel()
 		if err != nil {
 			ch.connectCounter++
-			ch.log.Error("Failed connecting to running daemon: ", err)
+			ch.log.Errorf("Failed connecting to running daemon: %s", err)
 			if ch.checkFailures() {
 				return
 			}
@@ -152,7 +153,7 @@
 			// considered a connect error
 			stateCancel()
 			ch.agentClient.Disconnect()
-			ch.log.Error("Failed to start state watch: ", err)
+			ch.log.Errorf("Failed to start state watch: %s", err)
 			ch.connectCounter++
 			if ch.checkFailures() {
 				return
@@ -178,25 +179,30 @@
 		for {
 			state, err := watch.Recv()
 			if err != nil {
+				ch.log.Debugf("received state: error: %s",
+					err)
+
 				// agent has crashed or exited
 				stateCancel()
 				ch.agentClient.Disconnect()
-				ch.log.Error("Lost connection: failed reading next state: ", err)
+				ch.log.Errorf("Lost connection: failed reading next state: %s", err)
 				ch.lostCounter++
 				if ch.checkFailures() {
 					return
 				}
 				continue LOOP
 			}
+			ch.log.Debugf("received state: %s:%s",
+				state.State, state.Message)
 
 			// gRPC is good at hiding the fact that connection was lost
 			// to ensure that we don't miss a restart a changed PID means
 			// we are now talking to a different spawned Elastic Agent
 			if ch.lastPid == -1 {
 				ch.lastPid = state.Info.PID
-				ch.log.Info(fmt.Sprintf("Communicating with PID %d", ch.lastPid))
+				ch.log.Infof("Communicating with PID %d", ch.lastPid)
 			} else if ch.lastPid != state.Info.PID {
-				ch.log.Error(fmt.Sprintf("Communication with PID %d lost, now communicating with PID %d", ch.lastPid, state.Info.PID))
+				ch.log.Errorf("Communication with PID %d lost, now communicating with PID %d", ch.lastPid, state.Info.PID)
 				ch.lastPid = state.Info.PID
 				// count the PID change as a lost connection, but allow
 				// the communication to continue unless has become a failure
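The comment block above the PID check describes the problem it guards against: gRPC can transparently reconnect to a freshly spawned Elastic Agent, so the watcher compares the PID reported by each state update with the last one it saw and treats a change as a lost connection. A condensed, hypothetical sketch of that pattern (simplified names, not the watcher's actual API):

package main

import "fmt"

// pidTracker records the PID reported by the last state update. A changed PID
// means we are now talking to a different spawned process, even though the
// gRPC stream may look uninterrupted.
type pidTracker struct {
	lastPid int32
}

// observe reports whether the PID changed since the previous state update,
// which the watcher counts as a lost connection.
func (p *pidTracker) observe(currentPid int32) bool {
	if p.lastPid == -1 {
		p.lastPid = currentPid // first state received: just remember the PID
		return false
	}
	changed := p.lastPid != currentPid
	p.lastPid = currentPid
	return changed
}

func main() {
	t := pidTracker{lastPid: -1}
	fmt.Println(t.observe(4242)) // false: first observation
	fmt.Println(t.observe(4242)) // false: same process
	fmt.Println(t.observe(5151)) // true: agent was restarted
}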
21 changes: 16 additions & 5 deletions internal/pkg/agent/application/upgrade/watcher_test.go
@@ -270,14 +270,25 @@ func TestWatcher_AgentError(t *testing.T) {
 }
 
 func TestWatcher_AgentErrorQuick(t *testing.T) {
-	// test tests for success, which only happens when no error comes in
-	// during this time period
+	// Success only happens when no error comes in during this time period
 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
 	defer cancel()
 
 	errCh := make(chan error)
-	logger, _ := logger.NewTesting("watcher")
-	w := NewAgentWatcher(errCh, logger, 100*time.Millisecond)
+	log, obs := logger.NewTesting("watcher")
+	defer func() {
+		if t.Failed() {
+			rawLogs := obs.All()
+			for _, rawLog := range rawLogs {
+				msg := fmt.Sprintf("[%s] %s", rawLog.Level, rawLog.Message)
+				for k, v := range rawLog.ContextMap() {
+					msg += fmt.Sprintf("%s=%v", k, v)
+				}
+				t.Log(msg)
+			}
+		}
+	}()
+	w := NewAgentWatcher(errCh, log, 100*time.Millisecond)
 
 	// reports an error state, followed by a healthy state (should not error)
 	mockHandler := func(srv cproto.ElasticAgentControl_StateWatchServer) error {
@@ -302,7 +313,7 @@ func TestWatcher_AgentErrorQuick(t *testing.T) {
 		return nil
 	}
 	mock := &mockDaemon{watch: mockHandler}
-	require.NoError(t, mock.Start())
+	require.NoError(t, mock.Start(), "could not start mock agent daemon")
 	defer mock.Stop()
 
 	// set client to mock; before running
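The test change follows a common pattern for noisy components: capture log entries with an in-memory observer and replay them only when the test fails. A sketch of that pattern, assuming logger.NewTesting is backed by zap's zaptest/observer package (the helper names below are hypothetical):

package watchertest

import (
	"fmt"
	"testing"

	"go.uber.org/zap"
	"go.uber.org/zap/zaptest/observer"
)

// newObservedLogger mirrors what logger.NewTesting is assumed to return: a
// usable logger plus an observer that records every entry in memory.
func newObservedLogger() (*zap.SugaredLogger, *observer.ObservedLogs) {
	core, obs := observer.New(zap.DebugLevel)
	return zap.New(core).Sugar(), obs
}

// replayOnFailure dumps the captured entries through t.Log, but only when the
// test has already failed, so passing runs stay quiet.
func replayOnFailure(t *testing.T, obs *observer.ObservedLogs) {
	if !t.Failed() {
		return
	}
	for _, entry := range obs.All() {
		msg := fmt.Sprintf("[%s] %s", entry.Level, entry.Message)
		for k, v := range entry.ContextMap() {
			msg += fmt.Sprintf(" %s=%v", k, v)
		}
		t.Log(msg)
	}
}

Wired into the test, this is the same shape as the added log, obs := logger.NewTesting("watcher") followed by the deferred replay block in the hunk above.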
