From 20fe3f32c946c298fda63d91016d1e183de98372 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Sat, 6 Apr 2024 13:46:53 +0200 Subject: [PATCH] (#2150) Support graphite as a metric destination for metric watcher Signed-off-by: R.I.Pienaar --- aagent/watchers/metricwatcher/metric.go | 65 +++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 5 deletions(-) diff --git a/aagent/watchers/metricwatcher/metric.go b/aagent/watchers/metricwatcher/metric.go index b456fd5d..17709fb6 100644 --- a/aagent/watchers/metricwatcher/metric.go +++ b/aagent/watchers/metricwatcher/metric.go @@ -10,8 +10,10 @@ import ( "encoding/json" "fmt" "math/rand" + "net" "os" "os/exec" + "strings" "sync" "time" @@ -37,9 +39,12 @@ type Metric struct { } type properties struct { - Command string - Interval time.Duration - Labels map[string]string + Command string + Interval time.Duration + Labels map[string]string + GraphiteHost string `mapstructure:"graphite_host"` + GraphitePort int `mapstructure:"graphite_port"` + GraphitePrefix string `mapstructure:"graphite_prefix"` } type Watcher struct { @@ -74,6 +79,10 @@ func New(machine model.Machine, name string, states []string, failEvent string, return nil, fmt.Errorf("could not set properties: %s", err) } + if mw.properties.GraphitePrefix == "" { + mw.properties.GraphitePrefix = fmt.Sprintf("choria.%s", strings.ReplaceAll(name, " ", "-")) + } + savePromState(machine.TextFileDirectory(), mw) return mw, nil @@ -187,7 +196,7 @@ func (w *Watcher) performWatch(ctx context.Context) { } metric, err := w.watch(ctx) - err = w.handleCheck(metric, err) + err = w.handleCheck(ctx, metric, err) if err != nil { w.Errorf("could not handle watcher event: %s", err) } @@ -229,7 +238,7 @@ func (w *Watcher) parseNagiosCheck(output []byte) (*Metric, error) { return metric, nil } -func (w *Watcher) handleCheck(output []byte, err error) error { +func (w *Watcher) handleCheck(ctx context.Context, output []byte, err error) error { var metric *Metric if err == nil { @@ -260,6 +269,11 @@ func (w *Watcher) handleCheck(output []byte, err error) error { w.Errorf("Could not update prometheus: %s", err) } + err = w.publishToGraphite(ctx, metric) + if err != nil { + return err + } + w.mu.Lock() w.previousResult = metric w.mu.Unlock() @@ -269,6 +283,47 @@ func (w *Watcher) handleCheck(output []byte, err error) error { return nil } +func (w *Watcher) publishToGraphite(ctx context.Context, metric *Metric) error { + if w.properties.GraphiteHost == "" { + w.Debugf("Skipping graphite publish without a host defined") + return nil + } + + if w.properties.GraphitePort == 0 { + w.Debugf("Skipping graphite publish without a port defined") + return nil + } + + if len(metric.Metrics) == 0 { + w.Debugf("Skipping graphite publish without any metrics") + return nil + } + + connCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + host := fmt.Sprintf("%s:%d", w.properties.GraphiteHost, w.properties.GraphitePort) + w.Debugf("Sending %d metrics to graphite %s", len(metric.Metrics), host) + var d net.Dialer + conn, err := d.DialContext(connCtx, "tcp", host) + if err != nil { + return err + } + defer conn.Close() + + now := time.Now().Unix() + + for k, v := range metric.Metrics { + name := fmt.Sprintf("%s.%s", w.properties.GraphitePrefix, k) + _, err := conn.Write([]byte(fmt.Sprintf("%s %f %d\n", name, v, now))) + if err != nil { + return err + } + } + + return nil +} + func (w *Watcher) CurrentState() any { w.mu.Lock() defer w.mu.Unlock()