Skip to content

Commit

Permalink
[extension/opampagent] use status subscription for fine granular heal…
Browse files Browse the repository at this point in the history
…th reporting (open-telemetry#35892)
  • Loading branch information
bacherfl authored and ZenoCC-Peng committed Dec 6, 2024
1 parent 683d0f3 commit da17d70
Show file tree
Hide file tree
Showing 6 changed files with 625 additions and 10 deletions.
27 changes: 27 additions & 0 deletions .chloggen/opamp-extension-health-reporting.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use status subscription for fine granular component health reporting

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35856]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 2 additions & 2 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ func TestSupervisorRestartCommand(t *testing.T) {
return health.Healthy && health.LastError == ""
}
return false
}, 10*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart")
}, 30*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart")
}

func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
Expand Down Expand Up @@ -1348,7 +1348,7 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
}

// Verify the collector is not running after 250 ms by checking the healthcheck endpoint
time.Sleep(250 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
_, err := http.DefaultClient.Get("http://localhost:12345")
if runtime.GOOS != "windows" {
require.ErrorContains(t, err, "connection refused")
Expand Down
2 changes: 2 additions & 0 deletions extension/opampextension/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
ext, err := createExtension(context.Background(), extensiontest.NewNopSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, ext)
require.NoError(t, ext.Shutdown(context.Background()))
}

func TestFactory_Create(t *testing.T) {
cfg := NewFactory().CreateDefaultConfig()
ext, err := createExtension(context.Background(), extensiontest.NewNopSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, ext)
require.NoError(t, ext.Shutdown(context.Background()))
}
3 changes: 3 additions & 0 deletions extension/opampextension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/oklog/ulid/v2 v2.1.0
github.com/open-telemetry/opamp-go v0.17.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.115.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status v0.115.0
github.com/shirou/gopsutil/v4 v4.24.10
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.115.0
Expand Down Expand Up @@ -67,3 +68,5 @@ require (
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../opampcustommessages

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status => ../../pkg/status
180 changes: 175 additions & 5 deletions extension/opampextension/opamp_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,18 @@ import (
"gopkg.in/yaml.v3"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
)

var _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil)
type statusAggregator interface {
Subscribe(scope status.Scope, verbosity status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc)
RecordStatus(source *componentstatus.InstanceID, event *componentstatus.Event)
}

type eventSourcePair struct {
source *componentstatus.InstanceID
event *componentstatus.Event
}

type opampAgent struct {
cfg *Config
Expand All @@ -62,12 +71,21 @@ type opampAgent struct {
opampClient client.OpAMPClient

customCapabilityRegistry *customCapabilityRegistry

statusAggregator statusAggregator
statusSubscriptionWg *sync.WaitGroup
componentHealthWg *sync.WaitGroup
startTimeUnixNano uint64
componentStatusCh chan *eventSourcePair
readyCh chan struct{}
}

var (
_ opampcustommessages.CustomCapabilityRegistry = (*opampAgent)(nil)
_ extensioncapabilities.Dependent = (*opampAgent)(nil)
_ extensioncapabilities.ConfigWatcher = (*opampAgent)(nil)
_ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil)
_ componentstatus.Watcher = (*opampAgent)(nil)
)

func (o *opampAgent) Start(ctx context.Context, host component.Host) error {
Expand All @@ -85,8 +103,6 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error {
return err
}

o.lifetimeCtx, o.lifetimeCtxCancel = context.WithCancel(context.Background())

if o.cfg.PPID != 0 {
go monitorPPID(o.lifetimeCtx, o.cfg.PPIDPollInterval, o.cfg.PPID, o.reportFunc)
}
Expand Down Expand Up @@ -128,8 +144,6 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error {
return err
}

o.setHealth(&protobufs.ComponentHealth{Healthy: false})

o.logger.Debug("Starting OpAMP client...")

if err := o.opampClient.Start(context.Background(), settings); err != nil {
Expand All @@ -146,6 +160,9 @@ func (o *opampAgent) Shutdown(ctx context.Context) error {
o.lifetimeCtxCancel()
}

o.statusSubscriptionWg.Wait()
o.componentHealthWg.Wait()

o.logger.Debug("OpAMP agent shutting down...")
if o.opampClient == nil {
return nil
Expand Down Expand Up @@ -190,6 +207,7 @@ func (o *opampAgent) Register(capability string, opts ...opampcustommessages.Cus

func (o *opampAgent) Ready() error {
o.setHealth(&protobufs.ComponentHealth{Healthy: true})
close(o.readyCh)
return nil
}

Expand All @@ -198,6 +216,27 @@ func (o *opampAgent) NotReady() error {
return nil
}

// ComponentStatusChanged implements the componentstatus.Watcher interface.
func (o *opampAgent) ComponentStatusChanged(
source *componentstatus.InstanceID,
event *componentstatus.Event,
) {
// There can be late arriving events after shutdown. We need to close
// the event channel so that this function doesn't block and we release all
// goroutines, but attempting to write to a closed channel will panic; log
// and recover.
defer func() {
if r := recover(); r != nil {
o.logger.Info(
"discarding event received after shutdown",
zap.Any("source", source),
zap.Any("event", event),
)
}
}()
o.componentStatusCh <- &eventSourcePair{source: source, event: event}
}

func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) {
o.eclk.Lock()
defer o.eclk.Unlock()
Expand Down Expand Up @@ -249,9 +288,18 @@ func newOpampAgent(cfg *Config, set extension.Settings) (*opampAgent, error) {
instanceID: uid,
capabilities: cfg.Capabilities,
opampClient: opampClient,
statusSubscriptionWg: &sync.WaitGroup{},
componentHealthWg: &sync.WaitGroup{},
readyCh: make(chan struct{}),
customCapabilityRegistry: newCustomCapabilityRegistry(set.Logger, opampClient),
}

agent.lifetimeCtx, agent.lifetimeCtxCancel = context.WithCancel(context.Background())

if agent.capabilities.ReportsHealth {
agent.initHealthReporting()
}

return agent, nil
}

Expand Down Expand Up @@ -372,6 +420,11 @@ func (o *opampAgent) onMessage(_ context.Context, msg *types.MessageData) {

func (o *opampAgent) setHealth(ch *protobufs.ComponentHealth) {
if o.capabilities.ReportsHealth && o.opampClient != nil {
if ch.Healthy && o.startTimeUnixNano == 0 {
ch.StartTimeUnixNano = ch.StatusTimeUnixNano
} else {
ch.StartTimeUnixNano = o.startTimeUnixNano
}
if err := o.opampClient.SetHealth(ch); err != nil {
o.logger.Error("Could not report health to OpAMP server", zap.Error(err))
}
Expand All @@ -395,3 +448,120 @@ func getOSDescription(logger *zap.Logger) string {
return runtime.GOOS
}
}

func (o *opampAgent) initHealthReporting() {
if !o.capabilities.ReportsHealth {
return
}
o.setHealth(&protobufs.ComponentHealth{Healthy: false})

if o.statusAggregator == nil {
o.statusAggregator = status.NewAggregator(status.PriorityPermanent)
}
statusChan, unsubscribeFunc := o.statusAggregator.Subscribe(status.ScopeAll, status.Verbose)
o.statusSubscriptionWg.Add(1)
go o.statusAggregatorEventLoop(unsubscribeFunc, statusChan)

// Start processing events in the background so that our status watcher doesn't
// block others before the extension starts.
o.componentStatusCh = make(chan *eventSourcePair)
o.componentHealthWg.Add(1)
go o.componentHealthEventLoop()
}

func (o *opampAgent) componentHealthEventLoop() {
// Record events with component.StatusStarting, but queue other events until
// PipelineWatcher.Ready is called. This prevents aggregate statuses from
// flapping between StatusStarting and StatusOK as components are started
// individually by the service.
var eventQueue []*eventSourcePair

defer o.componentHealthWg.Done()
for loop := true; loop; {
select {
case esp, ok := <-o.componentStatusCh:
if !ok {
return
}
if esp.event.Status() != componentstatus.StatusStarting {
eventQueue = append(eventQueue, esp)
continue
}
o.statusAggregator.RecordStatus(esp.source, esp.event)
case <-o.readyCh:
for _, esp := range eventQueue {
o.statusAggregator.RecordStatus(esp.source, esp.event)
}
eventQueue = nil
loop = false
case <-o.lifetimeCtx.Done():
return
}
}

// After PipelineWatcher.Ready, record statuses as they are received.
for {
select {
case esp, ok := <-o.componentStatusCh:
if !ok {
return
}
o.statusAggregator.RecordStatus(esp.source, esp.event)
case <-o.lifetimeCtx.Done():
return
}
}
}

func (o *opampAgent) statusAggregatorEventLoop(unsubscribeFunc status.UnsubscribeFunc, statusChan <-chan *status.AggregateStatus) {
defer func() {
unsubscribeFunc()
o.statusSubscriptionWg.Done()
}()
for {
select {
case <-o.lifetimeCtx.Done():
return
case statusUpdate, ok := <-statusChan:
if !ok {
return
}

if statusUpdate == nil || statusUpdate.Status() == componentstatus.StatusNone {
continue
}

componentHealth := convertComponentHealth(statusUpdate)

o.setHealth(componentHealth)
}
}
}

func convertComponentHealth(statusUpdate *status.AggregateStatus) *protobufs.ComponentHealth {
var isHealthy bool
if statusUpdate.Status() == componentstatus.StatusOK {
isHealthy = true
} else {
isHealthy = false
}

componentHealth := &protobufs.ComponentHealth{
Healthy: isHealthy,
Status: statusUpdate.Status().String(),
StatusTimeUnixNano: uint64(statusUpdate.Timestamp().UnixNano()),
}

if statusUpdate.Err() != nil {
componentHealth.LastError = statusUpdate.Err().Error()
}

if len(statusUpdate.ComponentStatusMap) > 0 {
componentHealth.ComponentHealthMap = map[string]*protobufs.ComponentHealth{}
for comp, compState := range statusUpdate.ComponentStatusMap {
componentHealth.ComponentHealthMap[comp] = convertComponentHealth(compState)
}
}

return componentHealth
}
Loading

0 comments on commit da17d70

Please sign in to comment.