Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#2150) Support graphite as a metric destination for metric watcher #2151

Merged
merged 1 commit into from
Apr 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 60 additions & 5 deletions aagent/watchers/metricwatcher/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"encoding/json"
"fmt"
"math/rand"
"net"
"os"
"os/exec"
"strings"
"sync"
"time"

Expand All @@ -37,9 +39,12 @@ type Metric struct {
}

type properties struct {
Command string
Interval time.Duration
Labels map[string]string
Command string
Interval time.Duration
Labels map[string]string
GraphiteHost string `mapstructure:"graphite_host"`
GraphitePort int `mapstructure:"graphite_port"`
GraphitePrefix string `mapstructure:"graphite_prefix"`
}

type Watcher struct {
Expand Down Expand Up @@ -74,6 +79,10 @@ func New(machine model.Machine, name string, states []string, failEvent string,
return nil, fmt.Errorf("could not set properties: %s", err)
}

if mw.properties.GraphitePrefix == "" {
mw.properties.GraphitePrefix = fmt.Sprintf("choria.%s", strings.ReplaceAll(name, " ", "-"))
}

savePromState(machine.TextFileDirectory(), mw)

return mw, nil
Expand Down Expand Up @@ -187,7 +196,7 @@ func (w *Watcher) performWatch(ctx context.Context) {
}

metric, err := w.watch(ctx)
err = w.handleCheck(metric, err)
err = w.handleCheck(ctx, metric, err)
if err != nil {
w.Errorf("could not handle watcher event: %s", err)
}
Expand Down Expand Up @@ -229,7 +238,7 @@ func (w *Watcher) parseNagiosCheck(output []byte) (*Metric, error) {
return metric, nil
}

func (w *Watcher) handleCheck(output []byte, err error) error {
func (w *Watcher) handleCheck(ctx context.Context, output []byte, err error) error {
var metric *Metric

if err == nil {
Expand Down Expand Up @@ -260,6 +269,11 @@ func (w *Watcher) handleCheck(output []byte, err error) error {
w.Errorf("Could not update prometheus: %s", err)
}

err = w.publishToGraphite(ctx, metric)
if err != nil {
return err
}

w.mu.Lock()
w.previousResult = metric
w.mu.Unlock()
Expand All @@ -269,6 +283,47 @@ func (w *Watcher) handleCheck(output []byte, err error) error {
return nil
}

func (w *Watcher) publishToGraphite(ctx context.Context, metric *Metric) error {
if w.properties.GraphiteHost == "" {
w.Debugf("Skipping graphite publish without a host defined")
return nil
}

if w.properties.GraphitePort == 0 {
w.Debugf("Skipping graphite publish without a port defined")
return nil
}

if len(metric.Metrics) == 0 {
w.Debugf("Skipping graphite publish without any metrics")
return nil
}

connCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

host := fmt.Sprintf("%s:%d", w.properties.GraphiteHost, w.properties.GraphitePort)
w.Debugf("Sending %d metrics to graphite %s", len(metric.Metrics), host)
var d net.Dialer
conn, err := d.DialContext(connCtx, "tcp", host)
if err != nil {
return err
}
defer conn.Close()

now := time.Now().Unix()

for k, v := range metric.Metrics {
name := fmt.Sprintf("%s.%s", w.properties.GraphitePrefix, k)
_, err := conn.Write([]byte(fmt.Sprintf("%s %f %d\n", name, v, now)))
if err != nil {
return err
}
}

return nil
}

func (w *Watcher) CurrentState() any {
w.mu.Lock()
defer w.mu.Unlock()
Expand Down
Loading