Skip to content

Commit

Permalink
health config dynamic update
Browse files Browse the repository at this point in the history
  • Loading branch information
zakhar1i committed Dec 13, 2024
1 parent a005e6d commit 3da8ff8
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 13 deletions.
27 changes: 24 additions & 3 deletions example/health/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

const (
mercedesTarget = "mercedes.citaro"
bogdanTarget = "bogdan.A091"

speedMetricID = "speed"
stationsCounterID = "stations"
Expand Down Expand Up @@ -57,13 +58,13 @@ func main() {

wg := &sync.WaitGroup{}
wg.Add(1)
go bus(wg)
go bus(manager, wg)
wg.Wait()

frameworkStop()
}

func bus(wg *sync.WaitGroup) {
func bus(manager health.Manager, wg *sync.WaitGroup) {
log := log.GetLogger()
defer wg.Done()

Expand Down Expand Up @@ -122,6 +123,26 @@ func bus(wg *sync.WaitGroup) {
collector.AddMetricValue(mercedesTarget, speedMetricID, 5.0)
collector.AddMetricValue(mercedesTarget, speedMetricID, 25.0)

time.Sleep(sleeps / 2)

log.Info().Msg("An unregistered vehicle appears at a crossroads")
collector.AddMetricValue(bogdanTarget, speedMetricID, 2)
time.Sleep(sleeps / 2)

log.Info().Msg("Updating the config to monitor it")
cfg := health.NewConfig()
cfg.RegistrationOnCollect = true
cfg.CollectionCycle = 4 * time.Second
manager.UpdateConfig(cfg)
sleeps = 4 * time.Second
time.Sleep(sleeps / 2)

collector.AddMetricValue(bogdanTarget, speedMetricID, 4.0)
collector.AddMetricValue(mercedesTarget, speedMetricID, 5.0)
time.Sleep(sleeps)

collector.AddMetricValue(bogdanTarget, speedMetricID, 7.0)
collector.AddMetricValue(mercedesTarget, speedMetricID, 11.0)
time.Sleep(sleeps)
log.Info().Msg("Bus is keep moving but we don't need to monitor it anymore")
log.Info().Msg("Buses keep moving but we don't need to monitor them anymore")
}
9 changes: 8 additions & 1 deletion health/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

logging "github.com/zenoss/zenoss-go-sdk/health/log"
"github.com/zenoss/zenoss-go-sdk/health/target"
"github.com/zenoss/zenoss-go-sdk/health/utils"
)
Expand Down Expand Up @@ -33,9 +34,10 @@ type Collector interface {
}

// NewCollector creates a new healthCollector instance.
func NewCollector(cycleDuration time.Duration, metricsIn chan<- *TargetMeasurement) Collector {
func NewCollector(cycleDuration time.Duration, configUpd <-chan *Config, metricsIn chan<- *TargetMeasurement) Collector {
return &healthCollector{
cycleDuration: cycleDuration,
configUpd: configUpd,
metricsIn: metricsIn,
done: make(chan struct{}),
}
Expand Down Expand Up @@ -79,6 +81,7 @@ func ResetCollectorSingleton() {

type healthCollector struct {
cycleDuration time.Duration
configUpd <-chan *Config
metricsIn chan<- *TargetMeasurement
done chan struct{}
doneOnce sync.Once
Expand All @@ -101,6 +104,10 @@ func (hc *healthCollector) HeartBeat(targetID string) (context.CancelFunc, error
return
case <-hc.done:
return
case cfg := <-hc.configUpd:
hc.cycleDuration = cfg.CollectionCycle
ticker.Reset(hc.cycleDuration)
logging.GetLogger().Info().Msgf("Updated heartbeat interval to %v", hc.cycleDuration)
case <-ticker.C:
measure := &TargetMeasurement{
TargetID: targetID,
Expand Down
43 changes: 34 additions & 9 deletions health/manager.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
Package health implements a simple tool for health data + metrics collection.
We defines three main parts of the health monitorin tool: manager, collector and writer.
- Writer description is avilable in its own package.
We define three main parts of the health monitoring tool: manager, collector and writer.
- Writer description is available in its own package.
- Collector is stored in collector.go file. It provides a simple interface that allows you to collect
different type of health data per target. All methods should have targetID as a parameter. Collector
will automatically send data to health manager.
- Manger is a heart of the health collection tool. It initialize all comunication channels and has
- Manger is a heart of the health collection tool. It initializes all communication channels and has
a control over collector and writer. Manager keeps all data collected by collector, calculates every
target health once per cycle and sends calculated data to writer.
*/
Expand Down Expand Up @@ -36,11 +36,12 @@ func FrameworkStart(ctx context.Context, cfg *Config, m Manager, writer w.Health
logging.SetLogLevel(cfg.LogLevel)
}

configCh := make(chan *Config)
measurementsCh := make(chan *TargetMeasurement)
healthCh := make(chan *target.Health)
targetCh := make(chan *target.Target)

c := NewCollector(cfg.CollectionCycle, measurementsCh)
c := NewCollector(cfg.CollectionCycle, configCh, measurementsCh)
SetCollectorSingleton(c)

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -60,7 +61,7 @@ func FrameworkStart(ctx context.Context, cfg *Config, m Manager, writer w.Health
defer doneWg.Done()
defer cancel()

m.Start(ctx, measurementsCh, healthCh, targetCh)
m.Start(ctx, configCh, measurementsCh, healthCh, targetCh)
<-m.Done()
}()

Expand Down Expand Up @@ -90,9 +91,11 @@ type Manager interface {
// require readers and measureOut requires a writer.
// Make sure to close measureOut before manager shutdown to not lose any data.
Start(
ctx context.Context, measureOut <-chan *TargetMeasurement,
ctx context.Context, configCh chan *Config, measureOut <-chan *TargetMeasurement,
healthIn chan<- *target.Health, targetIn chan<- *target.Target,
)
// UpdateConfig applies the new configuration
UpdateConfig(config *Config)
// Shutdown method closes manager's channels and terminates goroutines
Shutdown()
// IsStarted return the status of the manager
Expand All @@ -119,6 +122,7 @@ func NewManager(_ context.Context, config *Config) Manager {
type healthManager struct {
registry healthRegistry
config *Config
configIn chan<- *Config
healthIn chan<- *target.Health
targetIn chan<- *target.Target

Expand All @@ -134,10 +138,11 @@ type healthManager struct {
}

func (hm *healthManager) Start(
ctx context.Context, measureOut <-chan *TargetMeasurement,
ctx context.Context, configCh chan *Config, measureOut <-chan *TargetMeasurement,
healthIn chan<- *target.Health, targetIn chan<- *target.Target,
) {
hm.mu.Lock()
hm.configIn = configCh
hm.targetIn = targetIn
hm.healthIn = healthIn
hm.stopSig = make(chan struct{})
Expand All @@ -152,7 +157,7 @@ func (hm *healthManager) Start(
hm.wg.Add(1)
go func() {
defer hm.wg.Done()
hm.healthForwarder(ctx, healthIn)
hm.healthForwarder(ctx, configCh, healthIn)
}()

hm.started.Store(true)
Expand All @@ -166,12 +171,29 @@ func (hm *healthManager) Start(
hm.sendTargetsInfo() // if some targets were added before start we need to register them
}

func (hm *healthManager) UpdateConfig(newConfig *Config) {
hm.mu.Lock()
if hm.config.CollectionCycle != newConfig.CollectionCycle {
defer func() {
go func() {
// need 2 copies: for healthForwarder and healthCollector.HeartBeat
for range 2 {
hm.configIn <- newConfig
}
}()
}()
}
hm.config = newConfig
hm.mu.Unlock()
}

func (hm *healthManager) Shutdown() {
hm.mu.Lock()
defer hm.mu.Unlock()
close(hm.stopSig)
<-hm.stopWait
close(hm.targetIn)
close(hm.configIn)
}

func (hm *healthManager) Done() <-chan struct{} {
Expand Down Expand Up @@ -337,7 +359,7 @@ func (*healthManager) buildTargetFromMeasure(measure *TargetMeasurement) (*rawHe
}

// Calculates raw health data from the registry and forwards all health data to the writer once per cycle
func (hm *healthManager) healthForwarder(ctx context.Context, healthIn chan<- *target.Health) {
func (hm *healthManager) healthForwarder(ctx context.Context, configUpd <-chan *Config, healthIn chan<- *target.Health) {
log := logging.GetLogger()
log.Info().Msgf("Start to send health data to a writer with cycle %v", hm.config.CollectionCycle)
defer func() { log.Info().Msg("Finish to send health data to a writer") }()
Expand All @@ -346,6 +368,9 @@ func (hm *healthManager) healthForwarder(ctx context.Context, healthIn chan<- *t
select {
case <-ticker.C:
hm.writeHealthResult(healthIn)
case cfg := <-configUpd:
ticker.Reset(cfg.CollectionCycle)
logging.GetLogger().Info().Msgf("Updated collection interval to %v", cfg.CollectionCycle)
case <-hm.stopSig:
hm.writeHealthResult(healthIn)
close(healthIn)
Expand Down

0 comments on commit 3da8ff8

Please sign in to comment.