Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor]: Supervisor waits for configurable healthchecks to report remote config status #34907

Merged
merged 29 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
697f96e
[cmd/opampsupervisor]: Supervisor waits for configurable healthchecks…
srikanthccv Aug 28, 2024
9259bf6
Merge branch 'main' into issue_21079
srikanthccv Aug 28, 2024
a74ef23
Update agent config validation
srikanthccv Aug 30, 2024
9f83104
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Aug 30, 2024
136b2b6
Review comments
srikanthccv Sep 6, 2024
8e85f7b
Frequent checks for subsequent asserts
srikanthccv Sep 6, 2024
3cabc23
Resolve conflicts
srikanthccv Sep 6, 2024
8208ca2
Merge branch 'main' into issue_21079
srikanthccv Sep 11, 2024
fd57ea9
resolve conflicts
srikanthccv Oct 8, 2024
88da95f
resolve conflicts again
srikanthccv Oct 8, 2024
04261f3
Fix tests
srikanthccv Oct 8, 2024
6c7f617
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 8, 2024
dedd6a0
Merge branch 'main' into issue_21079
srikanthccv Oct 8, 2024
169d25f
Merge branch 'main' into issue_21079
srikanthccv Oct 16, 2024
8d56b88
Fix tests
srikanthccv Oct 16, 2024
6752571
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 16, 2024
a149e5f
Remove unnecessary check
srikanthccv Oct 16, 2024
873c072
Add CHANGELOG entry
srikanthccv Oct 16, 2024
b9b4d20
Merge branch 'main' into issue_21079
srikanthccv Oct 17, 2024
38016ff
Merge branch 'main' into issue_21079
srikanthccv Oct 20, 2024
e317745
Use agent health from opamp extension for config status report
srikanthccv Oct 20, 2024
fde058d
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 20, 2024
165f1a2
go mod tidy
srikanthccv Oct 20, 2024
b7dbd63
Merge branch 'main' into issue_21079
srikanthccv Oct 30, 2024
2e0908d
Remove health check interval option
srikanthccv Oct 30, 2024
3a61771
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 30, 2024
4c43fed
Update config_test
srikanthccv Oct 30, 2024
89f143e
Remove removed interval refs
srikanthccv Oct 30, 2024
6e3f678
Resolve conflicts
srikanthccv Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,129 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) {
require.FileExists(t, filepath.Join(storageDir, "effective.yaml"))
}

func TestSupervisorRemoteConfigApplyStatus(t *testing.T) {
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
var agentConfig atomic.Value
var healthReport atomic.Value
var remoteConfigStatus atomic.Value
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
agentConfig.Store(string(config.Body))
}
}
if message.Health != nil {
healthReport.Store(message.Health)
}
if message.RemoteConfigStatus != nil {
remoteConfigStatus.Store(message.RemoteConfigStatus)
}

return &protobufs.ServerToAgent{}
},
})

s := newSupervisor(t, "report_status", map[string]string{
"url": server.addr,
"successful_health_checks": "2",
"config_apply_timeout": "10s",
})
require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t)

server.sendToSupervisor(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: cfg.Bytes()},
},
},
ConfigHash: hash,
},
})

// Check that the status is set to APPLYING
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING
}, 5*time.Second, 500*time.Millisecond, "Remote config status was not set to APPLYING")

// Wait for the required number of successful health checks
require.Eventually(t, func() bool {
health, ok := healthReport.Load().(*protobufs.ComponentHealth)
return ok && health.Healthy
}, 30*time.Second, 500*time.Millisecond, "Collector did not become healthy")

// Check that the status is set to APPLIED after successful health checks
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED
}, 5*time.Second, 500*time.Millisecond, "Remote config status was not set to APPLIED")

require.Eventually(t, func() bool {
cfg, ok := agentConfig.Load().(string)
if ok {
// The effective config may be structurally different compared to what was sent,
// and will also have some data redacted,
// so just check that it includes the filelog receiver
return strings.Contains(cfg, "filelog")
}

return false
}, 5*time.Second, 500*time.Millisecond, "Collector was not started with remote config")

n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n")
require.NotZero(t, n, "Could not write to input file")
require.NoError(t, err)

require.Eventually(t, func() bool {
logRecord := make([]byte, 1024)
n, _ := outputFile.Read(logRecord)

return n != 0
}, 10*time.Second, 500*time.Millisecond, "Log never appeared in output")

// Test with bad configuration
badCfg, badHash := createBadCollectorConf(t)

server.sendToSupervisor(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: badCfg.Bytes()},
},
},
ConfigHash: badHash,
},
})

// Check that the status is set to APPLYING
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING
}, 5*time.Second, 500*time.Millisecond, "Remote config status was not set to APPLYING for bad config")

// Wait for the health checks to fail
require.Eventually(t, func() bool {
health, ok := healthReport.Load().(*protobufs.ComponentHealth)
return ok && !health.Healthy
}, 30*time.Second, 500*time.Millisecond, "Collector did not become unhealthy with bad config")

// Check that the status is set to FAILED after failed health checks
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED
}, 15*time.Second, 500*time.Millisecond, "Remote config status was not set to FAILED for bad config")
}

func findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

Expand Down
4 changes: 4 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type Agent struct {
Executable string
OrphanDetectionInterval time.Duration `mapstructure:"orphan_detection_interval"`
Description AgentDescription `mapstructure:"description"`
SuccessfulHealthChecks int32 `mapstructure:"successful_health_checks"`
ConfigApplyTimeout time.Duration `mapstructure:"config_apply_timeout"`
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
}

func (a Agent) Validate() error {
Expand Down Expand Up @@ -175,6 +177,8 @@ func DefaultSupervisor() Supervisor {
},
Agent: Agent{
OrphanDetectionInterval: 5 * time.Second,
SuccessfulHealthChecks: 3,
ConfigApplyTimeout: 30 * time.Second,
},
}
}
66 changes: 45 additions & 21 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ type Supervisor struct {
remoteConfig *protobufs.AgentRemoteConfig

// A channel to indicate there is a new config to apply.
hasNewConfig chan struct{}
hasNewConfig chan struct{}
waitingForHealthCheck atomic.Bool
successfulHealthChecks atomic.Int32
requiredHealthChecks int32
configApplyTimeout time.Duration
lastConfigChangeTime atomic.Value // stores time.Time
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved

// The OpAMP client to connect to the OpAMP Server.
opampClient client.OpAMPClient
Expand Down Expand Up @@ -165,6 +170,9 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
return nil, fmt.Errorf("error creating storage dir: %w", err)
}

s.requiredHealthChecks = s.config.Agent.SuccessfulHealthChecks
s.configApplyTimeout = s.config.Agent.ConfigApplyTimeout

return s, nil
}

Expand Down Expand Up @@ -1011,9 +1019,12 @@ func (s *Supervisor) healthCheck() {
err := s.healthChecker.Check(ctx)
cancel()

if errors.Is(err, s.lastHealthCheckErr) {
// No difference from last check. Nothing new to report.
return
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
if s.waitingForHealthCheck.Load() {
timeSinceChange := time.Since(s.lastConfigChangeTime.Load().(time.Time))
if timeSinceChange > s.configApplyTimeout {
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, "Config apply timeout exceeded")
s.waitingForHealthCheck.Store(false)
}
}

// Prepare OpAMP health report.
Expand All @@ -1022,15 +1033,26 @@ func (s *Supervisor) healthCheck() {
}

if err != nil {
health.Healthy = false
if s.waitingForHealthCheck.Load() {
s.successfulHealthChecks.Store(0) // Reset successful checks on error
}

if !s.agentHasStarted && s.agentStartHealthCheckAttempts < 10 {
health.LastError = "Agent is starting"
s.agentStartHealthCheckAttempts++
} else {
health.Healthy = false
health.LastError = err.Error()
s.logger.Error("Agent is not healthy", zap.Error(err))
}
} else {
if s.waitingForHealthCheck.Load() {
count := s.successfulHealthChecks.Add(1)
if count >= s.requiredHealthChecks {
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, "")
s.waitingForHealthCheck.Store(false)
}
}

s.agentHasStarted = true
health.Healthy = true
s.logger.Debug("Agent is healthy.")
Expand Down Expand Up @@ -1185,6 +1207,17 @@ func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.Telemet
return os.WriteFile(filepath.Join(s.config.Storage.Directory, filePath), cfg, 0600)
}

func (s *Supervisor) reportConfigStatus(status protobufs.RemoteConfigStatuses, errorMessage string) {
err := s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: s.remoteConfig.ConfigHash,
Status: status,
ErrorMessage: errorMessage,
})
if err != nil {
s.logger.Error("Could not report OpAMP remote config status", zap.Error(err))
}
}

func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) {
configChanged := false
if msg.RemoteConfig != nil {
Expand All @@ -1201,6 +1234,10 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) {

// Update the agent config if any messages have touched the config
if configChanged {
s.waitingForHealthCheck.Store(true)
s.successfulHealthChecks.Store(0)
s.lastConfigChangeTime.Store(time.Now())

err := s.opampClient.UpdateEffectiveConfig(ctx)
if err != nil {
s.logger.Error("The OpAMP client failed to update the effective config", zap.Error(err))
Expand Down Expand Up @@ -1255,22 +1292,9 @@ func (s *Supervisor) processRemoteConfigMessage(msg *protobufs.AgentRemoteConfig
configChanged, err := s.composeMergedConfig(s.remoteConfig)
if err != nil {
s.logger.Error("Error composing merged config. Reporting failed remote config status.", zap.Error(err))
err = s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: msg.ConfigHash,
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
ErrorMessage: err.Error(),
})
if err != nil {
s.logger.Error("Could not report failed OpAMP remote config status", zap.Error(err))
}
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, err.Error())
} else {
err = s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: msg.ConfigHash,
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
})
if err != nil {
s.logger.Error("Could not report applied OpAMP remote config status", zap.Error(err))
}
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING, "")
}

return configChanged
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
server:
endpoint: ws://{{.url}}/v1/opamp
tls:
insecure: true

capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_restart_command: true

storage:
directory: "{{.storage_dir}}"

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
successful_health_checks: {{.successful_health_checks}}
config_apply_timeout: {{.config_apply_timeout}}
Loading