From da17d702c86c9724a345041c0a65805d290cb24d Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Fri, 6 Dec 2024 18:54:07 +0100 Subject: [PATCH] [extension/opampagent] use status subscription for fine granular health reporting (#35892) --- .../opamp-extension-health-reporting.yaml | 27 ++ cmd/opampsupervisor/e2e_test.go | 4 +- extension/opampextension/factory_test.go | 2 + extension/opampextension/go.mod | 3 + extension/opampextension/opamp_agent.go | 180 +++++++- extension/opampextension/opamp_agent_test.go | 419 +++++++++++++++++- 6 files changed, 625 insertions(+), 10 deletions(-) create mode 100644 .chloggen/opamp-extension-health-reporting.yaml diff --git a/.chloggen/opamp-extension-health-reporting.yaml b/.chloggen/opamp-extension-health-reporting.yaml new file mode 100644 index 000000000000..b0a0e7f209dc --- /dev/null +++ b/.chloggen/opamp-extension-health-reporting.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use status subscription for fine granular component health reporting + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35856] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 56f0b8cae361..50003fbb2d4f 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -940,7 +940,7 @@ func TestSupervisorRestartCommand(t *testing.T) { return health.Healthy && health.LastError == "" } return false - }, 10*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart") + }, 30*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart") } func TestSupervisorOpAMPConnectionSettings(t *testing.T) { @@ -1348,7 +1348,7 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) { } // Verify the collector is not running after 250 ms by checking the healthcheck endpoint - time.Sleep(250 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) _, err := http.DefaultClient.Get("http://localhost:12345") if runtime.GOOS != "windows" { require.ErrorContains(t, err, "connection refused") diff --git a/extension/opampextension/factory_test.go b/extension/opampextension/factory_test.go index 5f763ab06f9f..eaa4d40c6d29 100644 --- a/extension/opampextension/factory_test.go +++ b/extension/opampextension/factory_test.go @@ -21,6 +21,7 @@ func TestFactory_CreateDefaultConfig(t *testing.T) { ext, err := createExtension(context.Background(), extensiontest.NewNopSettings(), cfg) require.NoError(t, err) require.NotNil(t, ext) + require.NoError(t, ext.Shutdown(context.Background())) } func TestFactory_Create(t *testing.T) { @@ -28,4 +29,5 @@ func TestFactory_Create(t *testing.T) { ext, err := createExtension(context.Background(), extensiontest.NewNopSettings(), cfg) require.NoError(t, err) require.NotNil(t, ext) + require.NoError(t, ext.Shutdown(context.Background())) } diff --git a/extension/opampextension/go.mod b/extension/opampextension/go.mod index d3f52d4cdffd..f8ca3811d77c 100644 --- a/extension/opampextension/go.mod +++ b/extension/opampextension/go.mod @@ -7,6 +7,7 @@ require ( github.com/oklog/ulid/v2 v2.1.0 github.com/open-telemetry/opamp-go v0.17.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.115.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status v0.115.0 github.com/shirou/gopsutil/v4 v4.24.10 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.115.0 @@ -67,3 +68,5 @@ require ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../opampcustommessages + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status => ../../pkg/status diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 3482dc4fc071..c638e8727b05 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -33,9 +33,18 @@ import ( "gopkg.in/yaml.v3" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" ) -var _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) +type statusAggregator interface { + Subscribe(scope status.Scope, verbosity status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc) + RecordStatus(source *componentstatus.InstanceID, event *componentstatus.Event) +} + +type eventSourcePair struct { + source *componentstatus.InstanceID + event *componentstatus.Event +} type opampAgent struct { cfg *Config @@ -62,12 +71,21 @@ type opampAgent struct { opampClient client.OpAMPClient customCapabilityRegistry *customCapabilityRegistry + + statusAggregator statusAggregator + statusSubscriptionWg *sync.WaitGroup + componentHealthWg *sync.WaitGroup + startTimeUnixNano uint64 + componentStatusCh chan *eventSourcePair + readyCh chan struct{} } var ( _ opampcustommessages.CustomCapabilityRegistry = (*opampAgent)(nil) _ extensioncapabilities.Dependent = (*opampAgent)(nil) _ extensioncapabilities.ConfigWatcher = (*opampAgent)(nil) + _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) + _ componentstatus.Watcher = (*opampAgent)(nil) ) func (o *opampAgent) Start(ctx context.Context, host component.Host) error { @@ -85,8 +103,6 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error { return err } - o.lifetimeCtx, o.lifetimeCtxCancel = context.WithCancel(context.Background()) - if o.cfg.PPID != 0 { go monitorPPID(o.lifetimeCtx, o.cfg.PPIDPollInterval, o.cfg.PPID, o.reportFunc) } @@ -128,8 +144,6 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error { return err } - o.setHealth(&protobufs.ComponentHealth{Healthy: false}) - o.logger.Debug("Starting OpAMP client...") if err := o.opampClient.Start(context.Background(), settings); err != nil { @@ -146,6 +160,9 @@ func (o *opampAgent) Shutdown(ctx context.Context) error { o.lifetimeCtxCancel() } + o.statusSubscriptionWg.Wait() + o.componentHealthWg.Wait() + o.logger.Debug("OpAMP agent shutting down...") if o.opampClient == nil { return nil @@ -190,6 +207,7 @@ func (o *opampAgent) Register(capability string, opts ...opampcustommessages.Cus func (o *opampAgent) Ready() error { o.setHealth(&protobufs.ComponentHealth{Healthy: true}) + close(o.readyCh) return nil } @@ -198,6 +216,27 @@ func (o *opampAgent) NotReady() error { return nil } +// ComponentStatusChanged implements the componentstatus.Watcher interface. +func (o *opampAgent) ComponentStatusChanged( + source *componentstatus.InstanceID, + event *componentstatus.Event, +) { + // There can be late arriving events after shutdown. We need to close + // the event channel so that this function doesn't block and we release all + // goroutines, but attempting to write to a closed channel will panic; log + // and recover. + defer func() { + if r := recover(); r != nil { + o.logger.Info( + "discarding event received after shutdown", + zap.Any("source", source), + zap.Any("event", event), + ) + } + }() + o.componentStatusCh <- &eventSourcePair{source: source, event: event} +} + func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) { o.eclk.Lock() defer o.eclk.Unlock() @@ -249,9 +288,18 @@ func newOpampAgent(cfg *Config, set extension.Settings) (*opampAgent, error) { instanceID: uid, capabilities: cfg.Capabilities, opampClient: opampClient, + statusSubscriptionWg: &sync.WaitGroup{}, + componentHealthWg: &sync.WaitGroup{}, + readyCh: make(chan struct{}), customCapabilityRegistry: newCustomCapabilityRegistry(set.Logger, opampClient), } + agent.lifetimeCtx, agent.lifetimeCtxCancel = context.WithCancel(context.Background()) + + if agent.capabilities.ReportsHealth { + agent.initHealthReporting() + } + return agent, nil } @@ -372,6 +420,11 @@ func (o *opampAgent) onMessage(_ context.Context, msg *types.MessageData) { func (o *opampAgent) setHealth(ch *protobufs.ComponentHealth) { if o.capabilities.ReportsHealth && o.opampClient != nil { + if ch.Healthy && o.startTimeUnixNano == 0 { + ch.StartTimeUnixNano = ch.StatusTimeUnixNano + } else { + ch.StartTimeUnixNano = o.startTimeUnixNano + } if err := o.opampClient.SetHealth(ch); err != nil { o.logger.Error("Could not report health to OpAMP server", zap.Error(err)) } @@ -395,3 +448,120 @@ func getOSDescription(logger *zap.Logger) string { return runtime.GOOS } } + +func (o *opampAgent) initHealthReporting() { + if !o.capabilities.ReportsHealth { + return + } + o.setHealth(&protobufs.ComponentHealth{Healthy: false}) + + if o.statusAggregator == nil { + o.statusAggregator = status.NewAggregator(status.PriorityPermanent) + } + statusChan, unsubscribeFunc := o.statusAggregator.Subscribe(status.ScopeAll, status.Verbose) + o.statusSubscriptionWg.Add(1) + go o.statusAggregatorEventLoop(unsubscribeFunc, statusChan) + + // Start processing events in the background so that our status watcher doesn't + // block others before the extension starts. + o.componentStatusCh = make(chan *eventSourcePair) + o.componentHealthWg.Add(1) + go o.componentHealthEventLoop() +} + +func (o *opampAgent) componentHealthEventLoop() { + // Record events with component.StatusStarting, but queue other events until + // PipelineWatcher.Ready is called. This prevents aggregate statuses from + // flapping between StatusStarting and StatusOK as components are started + // individually by the service. + var eventQueue []*eventSourcePair + + defer o.componentHealthWg.Done() + for loop := true; loop; { + select { + case esp, ok := <-o.componentStatusCh: + if !ok { + return + } + if esp.event.Status() != componentstatus.StatusStarting { + eventQueue = append(eventQueue, esp) + continue + } + o.statusAggregator.RecordStatus(esp.source, esp.event) + case <-o.readyCh: + for _, esp := range eventQueue { + o.statusAggregator.RecordStatus(esp.source, esp.event) + } + eventQueue = nil + loop = false + case <-o.lifetimeCtx.Done(): + return + } + } + + // After PipelineWatcher.Ready, record statuses as they are received. + for { + select { + case esp, ok := <-o.componentStatusCh: + if !ok { + return + } + o.statusAggregator.RecordStatus(esp.source, esp.event) + case <-o.lifetimeCtx.Done(): + return + } + } +} + +func (o *opampAgent) statusAggregatorEventLoop(unsubscribeFunc status.UnsubscribeFunc, statusChan <-chan *status.AggregateStatus) { + defer func() { + unsubscribeFunc() + o.statusSubscriptionWg.Done() + }() + for { + select { + case <-o.lifetimeCtx.Done(): + return + case statusUpdate, ok := <-statusChan: + if !ok { + return + } + + if statusUpdate == nil || statusUpdate.Status() == componentstatus.StatusNone { + continue + } + + componentHealth := convertComponentHealth(statusUpdate) + + o.setHealth(componentHealth) + } + } +} + +func convertComponentHealth(statusUpdate *status.AggregateStatus) *protobufs.ComponentHealth { + var isHealthy bool + if statusUpdate.Status() == componentstatus.StatusOK { + isHealthy = true + } else { + isHealthy = false + } + + componentHealth := &protobufs.ComponentHealth{ + Healthy: isHealthy, + Status: statusUpdate.Status().String(), + StatusTimeUnixNano: uint64(statusUpdate.Timestamp().UnixNano()), + } + + if statusUpdate.Err() != nil { + componentHealth.LastError = statusUpdate.Err().Error() + } + + if len(statusUpdate.ComponentStatusMap) > 0 { + componentHealth.ComponentHealthMap = map[string]*protobufs.ComponentHealth{} + for comp, compState := range statusUpdate.ComponentStatusMap { + componentHealth.ComponentHealthMap[comp] = convertComponentHealth(compState) + } + } + + return componentHealth +} diff --git a/extension/opampextension/opamp_agent_test.go b/extension/opampextension/opamp_agent_test.go index b9ee21e0e905..7921bd767470 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -5,21 +5,30 @@ package opampextension import ( "context" + "fmt" "os" "path/filepath" "runtime" + "sync" "testing" + "time" "github.com/google/uuid" + "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensiontest" semconv "go.opentelemetry.io/collector/semconv/v1.27.0" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status/testhelpers" ) func TestNewOpampAgent(t *testing.T) { @@ -35,6 +44,7 @@ func TestNewOpampAgent(t *testing.T) { assert.True(t, o.capabilities.ReportsHealth) assert.Empty(t, o.effectiveConfig) assert.Nil(t, o.agentDescription) + assert.NoError(t, o.Shutdown(context.Background())) } func TestNewOpampAgentAttributes(t *testing.T) { @@ -49,6 +59,7 @@ func TestNewOpampAgentAttributes(t *testing.T) { assert.Equal(t, "otelcol-distro", o.agentType) assert.Equal(t, "distro.0", o.agentVersion) assert.Equal(t, "f8999bc1-4c9b-4619-9bae-7f009d2411ec", o.instanceID.String()) + assert.NoError(t, o.Shutdown(context.Background())) } func TestCreateAgentDescription(t *testing.T) { @@ -147,6 +158,7 @@ func TestCreateAgentDescription(t *testing.T) { err = o.createAgentDescription() assert.NoError(t, err) require.Equal(t, tc.expected, o.agentDescription) + assert.NoError(t, o.Shutdown(context.Background())) }) } } @@ -165,6 +177,7 @@ func TestUpdateAgentIdentity(t *testing.T) { o.updateAgentIdentity(uid) assert.Equal(t, o.instanceID, uid) + assert.NoError(t, o.Shutdown(context.Background())) } func TestComposeEffectiveConfig(t *testing.T) { @@ -188,6 +201,8 @@ func TestComposeEffectiveConfig(t *testing.T) { assert.NotNil(t, ec) assert.YAMLEq(t, string(expected), string(ec.ConfigMap.ConfigMap[""].Body)) assert.Equal(t, "text/yaml", ec.ConfigMap.ConfigMap[""].ContentType) + + assert.NoError(t, o.Shutdown(context.Background())) } func TestShutdown(t *testing.T) { @@ -197,7 +212,7 @@ func TestShutdown(t *testing.T) { assert.NoError(t, err) // Shutdown with no OpAMP client - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Shutdown(context.Background())) } func TestStart(t *testing.T) { @@ -206,8 +221,295 @@ func TestStart(t *testing.T) { o, err := newOpampAgent(cfg.(*Config), set) assert.NoError(t, err) - assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, o.Shutdown(context.Background())) +} + +func TestHealthReportingReceiveUpdateFromAggregator(t *testing.T) { + cfg := createDefaultConfig().(*Config) + set := extensiontest.NewNopSettings() + + statusUpdateChannel := make(chan *status.AggregateStatus) + + mtx := &sync.RWMutex{} + now := time.Now() + expectedHealthUpdates := []*protobufs.ComponentHealth{ + { + Healthy: false, + }, + { + Healthy: true, + StartTimeUnixNano: uint64(now.UnixNano()), + Status: "StatusOK", + StatusTimeUnixNano: uint64(now.UnixNano()), + ComponentHealthMap: map[string]*protobufs.ComponentHealth{ + "test-receiver": { + Healthy: true, + Status: "StatusOK", + StatusTimeUnixNano: uint64(now.UnixNano()), + }, + }, + }, + { + Healthy: false, + Status: "StatusPermanentError", + StatusTimeUnixNano: uint64(now.UnixNano()), + LastError: "unexpected error", + ComponentHealthMap: map[string]*protobufs.ComponentHealth{ + "test-receiver": { + Healthy: false, + Status: "StatusPermanentError", + StatusTimeUnixNano: uint64(now.UnixNano()), + LastError: "unexpected error", + }, + }, + }, + } + receivedHealthUpdates := 0 + + mockOpampClient := &mockOpAMPClient{ + setHealthFunc: func(health *protobufs.ComponentHealth) error { + mtx.Lock() + defer mtx.Unlock() + require.Equal(t, expectedHealthUpdates[receivedHealthUpdates], health) + receivedHealthUpdates++ + return nil + }, + } + + sa := &mockStatusAggregator{ + statusChan: statusUpdateChannel, + } + + o := newTestOpampAgent(cfg, set, mockOpampClient, sa) + + o.initHealthReporting() + + assert.NoError(t, o.Start(context.Background(), componenttest.NewNopHost())) + + statusUpdateChannel <- nil + statusUpdateChannel <- &status.AggregateStatus{ + Event: &mockStatusEvent{ + status: componentstatus.StatusOK, + err: nil, + timestamp: now, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "test-receiver": { + Event: &mockStatusEvent{ + status: componentstatus.StatusOK, + err: nil, + timestamp: now, + }, + }, + }, + } + statusUpdateChannel <- &status.AggregateStatus{ + Event: &mockStatusEvent{ + status: componentstatus.StatusPermanentError, + err: fmt.Errorf("unexpected error"), + timestamp: now, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "test-receiver": { + Event: &mockStatusEvent{ + status: componentstatus.StatusPermanentError, + err: fmt.Errorf("unexpected error"), + timestamp: now, + }, + }, + }, + } + + close(statusUpdateChannel) + + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return receivedHealthUpdates == len(expectedHealthUpdates) + }, 1*time.Second, 100*time.Millisecond) + + assert.NoError(t, o.Shutdown(context.Background())) + require.True(t, sa.unsubscribed) +} + +func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { + cfg := createDefaultConfig().(*Config) + set := extensiontest.NewNopSettings() + + mtx := &sync.RWMutex{} + + sa := &mockStatusAggregator{ + mtx: mtx, + } + + o := newTestOpampAgent( + cfg, + set, + &mockOpAMPClient{ + setHealthFunc: func(_ *protobufs.ComponentHealth) error { + return nil + }, + }, sa) + + o.initHealthReporting() + + assert.NoError(t, o.Start(context.Background(), componenttest.NewNopHost())) + + traces := testhelpers.NewPipelineMetadata("traces") + + // StatusStarting will be sent immediately. + for _, id := range traces.InstanceIDs() { + o.ComponentStatusChanged(id, componentstatus.NewEvent(componentstatus.StatusStarting)) + } + + // StatusOK will be queued until the PipelineWatcher Ready method is called. + for _, id := range traces.InstanceIDs() { + o.ComponentStatusChanged(id, componentstatus.NewEvent(componentstatus.StatusOK)) + } + + // verify we have received the StatusStarting events + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return len(sa.receivedEvents) == len(traces.InstanceIDs()) + }, 5*time.Second, 100*time.Millisecond) + + for _, event := range sa.receivedEvents { + require.Equal(t, componentstatus.NewEvent(componentstatus.StatusStarting).Status(), event.event.Status()) + } + + // clean the received events of the mocked status aggregator + sa.receivedEvents = nil + + err := o.Ready() + require.NoError(t, err) + + // verify we have received the StatusOK events that have been queued while the agent has not been ready + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return len(sa.receivedEvents) == len(traces.InstanceIDs()) + }, 5*time.Second, 100*time.Millisecond) + + for _, event := range sa.receivedEvents { + require.Equal(t, componentstatus.NewEvent(componentstatus.StatusOK).Status(), event.event.Status()) + } + + // clean the received events of the mocked status aggregator + sa.receivedEvents = nil + + // send another set of events - these should be passed through immediately + for _, id := range traces.InstanceIDs() { + o.ComponentStatusChanged(id, componentstatus.NewEvent(componentstatus.StatusStopping)) + } + + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return len(sa.receivedEvents) == len(traces.InstanceIDs()) + }, 5*time.Second, 100*time.Millisecond) + + for _, event := range sa.receivedEvents { + require.Equal(t, componentstatus.NewEvent(componentstatus.StatusStopping).Status(), event.event.Status()) + } + + assert.NoError(t, o.Shutdown(context.Background())) + require.True(t, sa.unsubscribed) +} + +func TestHealthReportingExitsOnClosedContext(t *testing.T) { + cfg := createDefaultConfig().(*Config) + set := extensiontest.NewNopSettings() + + statusUpdateChannel := make(chan *status.AggregateStatus) + sa := &mockStatusAggregator{ + statusChan: statusUpdateChannel, + } + + mtx := &sync.RWMutex{} + now := time.Now() + expectedHealthUpdates := []*protobufs.ComponentHealth{ + { + Healthy: false, + }, + { + Healthy: true, + StartTimeUnixNano: uint64(now.UnixNano()), + Status: "StatusOK", + StatusTimeUnixNano: uint64(now.UnixNano()), + ComponentHealthMap: map[string]*protobufs.ComponentHealth{ + "test-receiver": { + Healthy: true, + Status: "StatusOK", + StatusTimeUnixNano: uint64(now.UnixNano()), + }, + }, + }, + } + receivedHealthUpdates := 0 + + mockOpampClient := &mockOpAMPClient{ + setHealthFunc: func(health *protobufs.ComponentHealth) error { + mtx.Lock() + defer mtx.Unlock() + require.Equal(t, expectedHealthUpdates[receivedHealthUpdates], health) + receivedHealthUpdates++ + return nil + }, + } + + o := newTestOpampAgent(cfg, set, mockOpampClient, sa) + + o.initHealthReporting() + + assert.NoError(t, o.Start(context.Background(), componenttest.NewNopHost())) + + statusUpdateChannel <- nil + statusUpdateChannel <- &status.AggregateStatus{ + Event: &mockStatusEvent{ + status: componentstatus.StatusOK, + err: nil, + timestamp: now, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "test-receiver": { + Event: &mockStatusEvent{ + status: componentstatus.StatusOK, + err: nil, + timestamp: now, + }, + }, + }, + } + + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return receivedHealthUpdates == len(expectedHealthUpdates) + }, 1*time.Second, 100*time.Millisecond) + + // invoke Shutdown before health update channel has been closed + assert.NoError(t, o.Shutdown(context.Background())) + require.True(t, sa.unsubscribed) +} + +func TestHealthReportingDisabled(t *testing.T) { + cfg := createDefaultConfig() + set := extensiontest.NewNopSettings() + o, err := newOpampAgent(cfg.(*Config), set) + assert.NoError(t, err) + + o.capabilities.ReportsHealth = false + o.opampClient = &mockOpAMPClient{ + setHealthFunc: func(_ *protobufs.ComponentHealth) error { + t.Errorf("setHealth is not supposed to be called with deactivated ReportsHealth capability") + return nil + }, + } + + assert.NoError(t, o.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, o.Shutdown(context.Background())) } func TestParseInstanceIDString(t *testing.T) { @@ -283,3 +585,114 @@ func TestOpAMPAgent_Dependencies(t *testing.T) { require.Equal(t, []component.ID{authID}, o.Dependencies()) }) } + +type mockStatusAggregator struct { + statusChan chan *status.AggregateStatus + receivedEvents []eventSourcePair + unsubscribed bool + mtx *sync.RWMutex +} + +func (m *mockStatusAggregator) Subscribe(_ status.Scope, _ status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc) { + return m.statusChan, func() { + m.unsubscribed = true + } +} + +func (m *mockStatusAggregator) RecordStatus(source *componentstatus.InstanceID, event *componentstatus.Event) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.receivedEvents = append(m.receivedEvents, eventSourcePair{ + source: source, + event: event, + }) +} + +type mockOpAMPClient struct { + setHealthFunc func(health *protobufs.ComponentHealth) error +} + +func (m mockOpAMPClient) Start(_ context.Context, _ types.StartSettings) error { + return nil +} + +func (m mockOpAMPClient) Stop(_ context.Context) error { + return nil +} + +func (m mockOpAMPClient) SetAgentDescription(_ *protobufs.AgentDescription) error { + return nil +} + +func (m mockOpAMPClient) AgentDescription() *protobufs.AgentDescription { + return nil +} + +func (m mockOpAMPClient) SetHealth(health *protobufs.ComponentHealth) error { + return m.setHealthFunc(health) +} + +func (m mockOpAMPClient) UpdateEffectiveConfig(_ context.Context) error { + return nil +} + +func (m mockOpAMPClient) SetRemoteConfigStatus(_ *protobufs.RemoteConfigStatus) error { + return nil +} + +func (m mockOpAMPClient) SetPackageStatuses(_ *protobufs.PackageStatuses) error { + return nil +} + +func (m mockOpAMPClient) RequestConnectionSettings(_ *protobufs.ConnectionSettingsRequest) error { + return nil +} + +func (m mockOpAMPClient) SetCustomCapabilities(_ *protobufs.CustomCapabilities) error { + return nil +} + +func (m mockOpAMPClient) SendCustomMessage(_ *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { + return nil, nil +} + +func (m mockOpAMPClient) SetFlags(_ protobufs.AgentToServerFlags) {} + +type mockStatusEvent struct { + status componentstatus.Status + err error + timestamp time.Time +} + +func (m mockStatusEvent) Status() componentstatus.Status { + return m.status +} + +func (m mockStatusEvent) Err() error { + return m.err +} + +func (m mockStatusEvent) Timestamp() time.Time { + return m.timestamp +} + +func newTestOpampAgent(cfg *Config, set extension.Settings, mockOpampClient *mockOpAMPClient, sa *mockStatusAggregator) *opampAgent { + uid := uuid.New() + o := &opampAgent{ + cfg: cfg, + logger: set.Logger, + agentType: set.BuildInfo.Command, + agentVersion: set.BuildInfo.Version, + instanceID: uid, + capabilities: cfg.Capabilities, + opampClient: mockOpampClient, + statusSubscriptionWg: &sync.WaitGroup{}, + componentHealthWg: &sync.WaitGroup{}, + readyCh: make(chan struct{}), + customCapabilityRegistry: newCustomCapabilityRegistry(set.Logger, mockOpampClient), + statusAggregator: sa, + } + + o.lifetimeCtx, o.lifetimeCtxCancel = context.WithCancel(context.Background()) + return o +}