diff --git a/example/health/demo.go b/example/health/demo.go index 63fc762..f868a32 100644 --- a/example/health/demo.go +++ b/example/health/demo.go @@ -14,6 +14,7 @@ import ( const ( mercedesTarget = "mercedes.citaro" + bogdanTarget = "bogdan.A091" speedMetricID = "speed" stationsCounterID = "stations" @@ -57,13 +58,13 @@ func main() { wg := &sync.WaitGroup{} wg.Add(1) - go bus(wg) + go bus(manager, wg) wg.Wait() frameworkStop() } -func bus(wg *sync.WaitGroup) { +func bus(manager health.Manager, wg *sync.WaitGroup) { log := log.GetLogger() defer wg.Done() @@ -122,6 +123,26 @@ func bus(wg *sync.WaitGroup) { collector.AddMetricValue(mercedesTarget, speedMetricID, 5.0) collector.AddMetricValue(mercedesTarget, speedMetricID, 25.0) + time.Sleep(sleeps / 2) + + log.Info().Msg("An unregistered vehicle appears at a crossroads") + collector.AddMetricValue(bogdanTarget, speedMetricID, 2) + time.Sleep(sleeps / 2) + + log.Info().Msg("Updating the config to monitor it") + cfg := health.NewConfig() + cfg.RegistrationOnCollect = true + cfg.CollectionCycle = 4 * time.Second + manager.UpdateConfig(cfg) + sleeps = 4 * time.Second + time.Sleep(sleeps / 2) + + collector.AddMetricValue(bogdanTarget, speedMetricID, 4.0) + collector.AddMetricValue(mercedesTarget, speedMetricID, 5.0) + time.Sleep(sleeps) + + collector.AddMetricValue(bogdanTarget, speedMetricID, 7.0) + collector.AddMetricValue(mercedesTarget, speedMetricID, 11.0) time.Sleep(sleeps) - log.Info().Msg("Bus is keep moving but we don't need to monitor it anymore") + log.Info().Msg("Buses keep moving but we don't need to monitor them anymore") } diff --git a/health/collector.go b/health/collector.go index 68bb948..15098da 100644 --- a/health/collector.go +++ b/health/collector.go @@ -5,6 +5,7 @@ import ( "sync" "time" + logging "github.com/zenoss/zenoss-go-sdk/health/log" "github.com/zenoss/zenoss-go-sdk/health/target" "github.com/zenoss/zenoss-go-sdk/health/utils" ) @@ -33,9 +34,10 @@ type Collector interface { } // NewCollector creates a new healthCollector instance. -func NewCollector(cycleDuration time.Duration, metricsIn chan<- *TargetMeasurement) Collector { +func NewCollector(cycleDuration time.Duration, configUpd <-chan *Config, metricsIn chan<- *TargetMeasurement) Collector { return &healthCollector{ cycleDuration: cycleDuration, + configUpd: configUpd, metricsIn: metricsIn, done: make(chan struct{}), } @@ -79,6 +81,7 @@ func ResetCollectorSingleton() { type healthCollector struct { cycleDuration time.Duration + configUpd <-chan *Config metricsIn chan<- *TargetMeasurement done chan struct{} doneOnce sync.Once @@ -101,6 +104,10 @@ func (hc *healthCollector) HeartBeat(targetID string) (context.CancelFunc, error return case <-hc.done: return + case cfg := <-hc.configUpd: + hc.cycleDuration = cfg.CollectionCycle + ticker.Reset(hc.cycleDuration) + logging.GetLogger().Info().Msgf("Updated heartbeat interval to %v", hc.cycleDuration) case <-ticker.C: measure := &TargetMeasurement{ TargetID: targetID, diff --git a/health/manager.go b/health/manager.go index 395fccb..90e753d 100644 --- a/health/manager.go +++ b/health/manager.go @@ -1,12 +1,12 @@ /* Package health implements a simple tool for health data + metrics collection. -We defines three main parts of the health monitorin tool: manager, collector and writer. -- Writer description is avilable in its own package. +We define three main parts of the health monitoring tool: manager, collector and writer. +- Writer description is available in its own package. - Collector is stored in collector.go file. It provides a simple interface that allows you to collect different type of health data per target. All methods should have targetID as a parameter. Collector will automatically send data to health manager. -- Manger is a heart of the health collection tool. It initialize all comunication channels and has +- Manger is a heart of the health collection tool. It initializes all communication channels and has a control over collector and writer. Manager keeps all data collected by collector, calculates every target health once per cycle and sends calculated data to writer. */ @@ -36,11 +36,12 @@ func FrameworkStart(ctx context.Context, cfg *Config, m Manager, writer w.Health logging.SetLogLevel(cfg.LogLevel) } + configCh := make(chan *Config) measurementsCh := make(chan *TargetMeasurement) healthCh := make(chan *target.Health) targetCh := make(chan *target.Target) - c := NewCollector(cfg.CollectionCycle, measurementsCh) + c := NewCollector(cfg.CollectionCycle, configCh, measurementsCh) SetCollectorSingleton(c) ctx, cancel := context.WithCancel(ctx) @@ -60,7 +61,7 @@ func FrameworkStart(ctx context.Context, cfg *Config, m Manager, writer w.Health defer doneWg.Done() defer cancel() - m.Start(ctx, measurementsCh, healthCh, targetCh) + m.Start(ctx, configCh, measurementsCh, healthCh, targetCh) <-m.Done() }() @@ -90,9 +91,11 @@ type Manager interface { // require readers and measureOut requires a writer. // Make sure to close measureOut before manager shutdown to not lose any data. Start( - ctx context.Context, measureOut <-chan *TargetMeasurement, + ctx context.Context, configCh chan *Config, measureOut <-chan *TargetMeasurement, healthIn chan<- *target.Health, targetIn chan<- *target.Target, ) + // UpdateConfig applies the new configuration + UpdateConfig(config *Config) // Shutdown method closes manager's channels and terminates goroutines Shutdown() // IsStarted return the status of the manager @@ -119,6 +122,7 @@ func NewManager(_ context.Context, config *Config) Manager { type healthManager struct { registry healthRegistry config *Config + configIn chan<- *Config healthIn chan<- *target.Health targetIn chan<- *target.Target @@ -134,10 +138,11 @@ type healthManager struct { } func (hm *healthManager) Start( - ctx context.Context, measureOut <-chan *TargetMeasurement, + ctx context.Context, configCh chan *Config, measureOut <-chan *TargetMeasurement, healthIn chan<- *target.Health, targetIn chan<- *target.Target, ) { hm.mu.Lock() + hm.configIn = configCh hm.targetIn = targetIn hm.healthIn = healthIn hm.stopSig = make(chan struct{}) @@ -152,7 +157,7 @@ func (hm *healthManager) Start( hm.wg.Add(1) go func() { defer hm.wg.Done() - hm.healthForwarder(ctx, healthIn) + hm.healthForwarder(ctx, configCh, healthIn) }() hm.started.Store(true) @@ -166,12 +171,29 @@ func (hm *healthManager) Start( hm.sendTargetsInfo() // if some targets were added before start we need to register them } +func (hm *healthManager) UpdateConfig(newConfig *Config) { + hm.mu.Lock() + if hm.config.CollectionCycle != newConfig.CollectionCycle { + defer func() { + go func() { + // need 2 copies: for healthForwarder and healthCollector.HeartBeat + for range 2 { + hm.configIn <- newConfig + } + }() + }() + } + hm.config = newConfig + hm.mu.Unlock() +} + func (hm *healthManager) Shutdown() { hm.mu.Lock() defer hm.mu.Unlock() close(hm.stopSig) <-hm.stopWait close(hm.targetIn) + close(hm.configIn) } func (hm *healthManager) Done() <-chan struct{} { @@ -337,7 +359,7 @@ func (*healthManager) buildTargetFromMeasure(measure *TargetMeasurement) (*rawHe } // Calculates raw health data from the registry and forwards all health data to the writer once per cycle -func (hm *healthManager) healthForwarder(ctx context.Context, healthIn chan<- *target.Health) { +func (hm *healthManager) healthForwarder(ctx context.Context, configUpd <-chan *Config, healthIn chan<- *target.Health) { log := logging.GetLogger() log.Info().Msgf("Start to send health data to a writer with cycle %v", hm.config.CollectionCycle) defer func() { log.Info().Msg("Finish to send health data to a writer") }() @@ -346,6 +368,9 @@ func (hm *healthManager) healthForwarder(ctx context.Context, healthIn chan<- *t select { case <-ticker.C: hm.writeHealthResult(healthIn) + case cfg := <-configUpd: + ticker.Reset(cfg.CollectionCycle) + logging.GetLogger().Info().Msgf("Updated collection interval to %v", cfg.CollectionCycle) case <-hm.stopSig: hm.writeHealthResult(healthIn) close(healthIn)