From e6919bd378b232daf06e78cb1bdc68b00d9b76e3 Mon Sep 17 00:00:00 2001 From: Andrew Chorny Date: Fri, 4 Oct 2024 13:56:44 +0300 Subject: [PATCH] Health monitoring: Start method should block --- health/manager.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/health/manager.go b/health/manager.go index b586566..395fccb 100644 --- a/health/manager.go +++ b/health/manager.go @@ -61,6 +61,7 @@ func FrameworkStart(ctx context.Context, cfg *Config, m Manager, writer w.Health defer cancel() m.Start(ctx, measurementsCh, healthCh, targetCh) + <-m.Done() }() doneWg.Add(1) @@ -98,6 +99,8 @@ type Manager interface { IsStarted() bool // AddTargets provides a simple interface to register monitored targets AddTargets(targets []*target.Target) + + Done() <-chan struct{} } // NewManager creates a new Manager instance with internal target registry. @@ -131,10 +134,8 @@ type healthManager struct { } func (hm *healthManager) Start( - ctx context.Context, - measureOut <-chan *TargetMeasurement, - healthIn chan<- *target.Health, - targetIn chan<- *target.Target, + ctx context.Context, measureOut <-chan *TargetMeasurement, + healthIn chan<- *target.Health, targetIn chan<- *target.Target, ) { hm.mu.Lock() hm.targetIn = targetIn @@ -155,13 +156,14 @@ func (hm *healthManager) Start( }() hm.started.Store(true) + go func() { hm.wg.Wait() hm.started.Store(false) - hm.stopWait <- struct{}{} + close(hm.stopWait) }() - hm.sendTargetsInfo() // if some targets where added before start we need to register them + hm.sendTargetsInfo() // if some targets were added before start we need to register them } func (hm *healthManager) Shutdown() { @@ -172,6 +174,10 @@ func (hm *healthManager) Shutdown() { close(hm.targetIn) } +func (hm *healthManager) Done() <-chan struct{} { + return hm.stopWait +} + func (hm *healthManager) IsStarted() bool { return hm.started.Load() } @@ -239,7 +245,7 @@ func (hm *healthManager) updateTargetHealthData(measure *TargetMeasurement) erro } targetHealth, err = hm.buildTargetFromMeasure(measure) if err != nil { - return fmt.Errorf("Unable to register target automatically: %w", err) + return fmt.Errorf("unable to register target automatically: %w", err) } hm.registry.setRawHealthForTarget(targetHealth) } @@ -250,12 +256,12 @@ func (hm *healthManager) updateTargetHealthData(measure *TargetMeasurement) erro case Metric: err := hm.updateTargetsMetric(targetHealth, measure) if err != nil { - return fmt.Errorf("Unable to update target metric: %w", err) + return fmt.Errorf("unable to update target metric: %w", err) } case CounterChange: err := hm.updateTargetsCounter(targetHealth, measure) if err != nil { - return fmt.Errorf("Unable to update target counter: %w", err) + return fmt.Errorf("unable to update target counter: %w", err) } case Message: if measure.Message.AffectHealth {