Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement FetchAllHostsMetrics for SignalFx using new API #25

Merged
merged 3 commits into from
Mar 3, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 241 additions & 25 deletions pkg/watcher/internal/metricsprovider/signalfx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
Expand All @@ -33,17 +34,17 @@ import (

const (
// SignalFX Request Params
DefaultSignalFxAddress = "https://api.signalfx.com"
signalFxMetricsAPI = "/v1/timeserieswindow"
// SignalFx adds a suffix to hostnames if configured
signalFxHostNameSuffix = ".group.region.gcp.com"
signalFxHostFilter = "host:"

DefaultSignalFxAddress = "https://api.signalfx.com"
signalFxMetricsAPI = "/v1/timeserieswindow"
signalFxMetdataAPI = "/v2/metrictimeseries"
signalFxHostFilter = "host:"
signalFxHostNameSuffixKey = "SIGNALFX_HOST_NAME_SUFFIX"
// SignalFX Query Params
oneMinuteResolutionMs = 60000
cpuUtilizationMetric = `sf_metric:"cpu.utilization"`
memoryUtilizationMetric = `sf_metric:"memory.utilization"`
AND = "AND"
resultSetLimit = "10000"

// Miscellaneous
httpClientTimeout = 55 * time.Second
Expand All @@ -53,6 +54,7 @@ type signalFxClient struct {
client http.Client
authToken string
signalFxAddress string
hostNameSuffix string
}

func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProviderClient, error) {
Expand All @@ -62,7 +64,7 @@ func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProvide
tlsConfig := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // TODO(aqadeer): Figure out a secure way to let users add SSL certs
}

hostNameSuffix, _ := os.LookupEnv(signalFxHostNameSuffixKey)
var signalFxAddress, signalFxAuthToken = DefaultSignalFxAddress, ""
if opts.Address != "" {
signalFxAddress = opts.Address
Expand All @@ -77,7 +79,8 @@ func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProvide
Timeout: httpClientTimeout,
Transport: tlsConfig},
authToken: signalFxAuthToken,
signalFxAddress: signalFxAddress}, nil
signalFxAddress: signalFxAddress,
hostNameSuffix: hostNameSuffix}, nil
}

func (s signalFxClient) Name() string {
Expand All @@ -87,7 +90,7 @@ func (s signalFxClient) Name() string {
func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) {
log.Debugf("fetching metrics for host %v", host)
var metrics []watcher.Metric
hostQuery := signalFxHostFilter + host + signalFxHostNameSuffix
hostQuery := signalFxHostFilter + host + s.hostNameSuffix

for _, metric := range []string{cpuUtilizationMetric, memoryUtilizationMetric} {
uri, err := s.buildMetricURL(hostQuery, metric, window)
Expand All @@ -113,16 +116,7 @@ func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([
}

var fetchedMetric watcher.Metric
// Added default operator and rollup for signalfx client.
fetchedMetric.Operator = watcher.Average
fetchedMetric.Rollup = window.Duration
if metric == cpuUtilizationMetric {
fetchedMetric.Name = cpuUtilizationMetric
fetchedMetric.Type = watcher.CPU
} else {
fetchedMetric.Name = memoryUtilizationMetric
fetchedMetric.Type = watcher.Memory
}
addMetadata(&fetchedMetric, metric)
fetchedMetric.Value, err = decodeMetricsPayload(res)
if err != nil {
return metrics, err
Expand All @@ -133,9 +127,74 @@ func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([
return metrics, nil
}

// TODO(aqadeer): Fetching metrics for all hosts is not possible currently via timeserieswindow SignalFx API
func (s signalFxClient) FetchAllHostsMetrics(*watcher.Window) (map[string][]watcher.Metric, error) {
return nil, errors.New("This function is not yet implemented")
// FetchAllHostsMetrics fetches CPU and memory utilization for every host
// matching the filter "host:*<hostNameSuffix>" over the given window.
//
// For each metric two GET requests are issued: one to the URL produced by
// buildMetricURL (datapoints keyed by timeseries ID) and one to the URL
// produced by buildMetadataURL (metadata that maps those IDs to hosts). The
// two payloads are joined by getMetricsFromPayloads.
//
// The returned map is keyed by host name. On error, the map holds whatever
// was collected before the failure, together with the error.
func (s signalFxClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]watcher.Metric, error) {
	hostQuery := signalFxHostFilter + "*" + s.hostNameSuffix
	metrics := make(map[string][]watcher.Metric)
	for _, metric := range []string{cpuUtilizationMetric, memoryUtilizationMetric} {
		uri, err := s.buildMetricURL(hostQuery, metric, window)
		if err != nil {
			return metrics, err
		}
		metricPayload, err := s.fetchJSONPayload(uri)
		if err != nil {
			return metrics, err
		}

		uri, err = s.buildMetadataURL(hostQuery, metric)
		if err != nil {
			return metrics, err
		}
		metadataPayload, err := s.fetchJSONPayload(uri)
		if err != nil {
			return metrics, err
		}

		mappedMetrics, err := getMetricsFromPayloads(metricPayload, metadataPayload)
		if err != nil {
			return metrics, err
		}
		for host, hostMetric := range mappedMetrics {
			// hostMetric is a per-iteration copy; it is annotated and then appended.
			addMetadata(&hostMetric, metric)
			metrics[host] = append(metrics[host], hostMetric)
		}
	}
	return metrics, nil
}

// fetchJSONPayload performs an authenticated GET against uri and decodes the
// JSON response body into a generic value. Keeping the request in its own
// function means the response body is closed as soon as each call returns,
// instead of piling up deferred closes inside the caller's loop.
func (s signalFxClient) fetchJSONPayload(uri *url.URL) (interface{}, error) {
	req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-SF-Token", s.authToken)
	req.Header.Set("Content-Type", "application/json")

	resp, err := s.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("received status code: %v", resp.StatusCode)
	}

	var payload interface{}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// addMetadata fills in the descriptive fields of metric for the given
// SignalFx metric query string. The operator is always watcher.Average.
// A metricType equal to cpuUtilizationMetric is labelled as CPU; any other
// value is labelled as memory.
func addMetadata(metric *watcher.Metric, metricType string) {
	metric.Operator = watcher.Average
	switch metricType {
	case cpuUtilizationMetric:
		metric.Name, metric.Type = cpuUtilizationMetric, watcher.CPU
	default:
		metric.Name, metric.Type = memoryUtilizationMetric, watcher.Memory
	}
}

func (s signalFxClient) buildMetricURL(host string, metric string, window *watcher.Window) (uri *url.URL, err error) {
Expand All @@ -150,14 +209,30 @@ func (s signalFxClient) buildMetricURL(host string, metric string, window *watch
builder.WriteString(fmt.Sprintf(" %v ", AND))
builder.WriteString(metric)
q.Set("query", builder.String())

q.Set("startMs", strconv.FormatInt(window.Start, 10))
q.Set("endMs", strconv.FormatInt(window.End, 10))
q.Set("startMs", strconv.FormatInt(window.Start*1000, 10))
q.Set("endMs", strconv.FormatInt(window.End*1000, 10))
q.Set("resolution", strconv.Itoa(oneMinuteResolutionMs))
uri.RawQuery = q.Encode()
return
}

// buildMetadataURL returns the metadata endpoint URL (signalFxAddress +
// signalFxMetdataAPI) carrying two query parameters: "query", set to
// "<host> AND <metric>", and "limit", set to resultSetLimit.
// It returns a nil URL and the parse error if the base address is invalid.
func (s signalFxClient) buildMetadataURL(host string, metric string) (uri *url.URL, err error) {
	uri, err = url.Parse(s.signalFxAddress + signalFxMetdataAPI)
	if err != nil {
		return nil, err
	}

	params := uri.Query()
	params.Set("query", strings.Join([]string{host, AND, metric}, " "))
	params.Set("limit", resultSetLimit)
	uri.RawQuery = params.Encode()
	return uri, nil
}

/**
Sample payload:
{
Expand Down Expand Up @@ -206,3 +281,144 @@ func decodeMetricsPayload(payload interface{}) (float64, error) {
}
return timestampUtilisation[1].(float64), nil
}

/**
Sample metricData payload:
{
"data": {
"Ehql_bxBgAc": [
[
1600213380000,
84.64246793530153
]
],
"EuXgJm7BkAA": [
[
1614634260000,
5.450946379084264
]
],
....
....
},
"errors": []
}

https://dev.splunk.com/observability/reference/api/metrics_metadata/latest#endpoint-retrieve-metric-timeseries-metadata
Sample metaData payload:
{
"count": 5,
"partialCount": false,
"results": [
{
"active": true,
"created": 1614534848000,
"creator": null,
"dimensions": {
"host": "test.dev.com",
"sf_metric": null
},
"id": "EvVH6P7BgAA",
"lastUpdated": 0,
"lastUpdatedBy": null,
"metric": "cpu.utilization"
},
....
....
]
}
*/
func getMetricsFromPayloads(metricData interface{}, metadata interface{}) (map[string]watcher.Metric, error) {
keyHostMap := make(map[string]string)
hostMetricMap := make(map[string]watcher.Metric)
if _, ok := metadata.(map[string]interface{}); !ok {
return hostMetricMap, fmt.Errorf("type conversion failed, found %T", metadata)
}
results := metadata.(map[string]interface{})["results"]
if results == nil {
return hostMetricMap, errors.New("unexpected payload: missing results field")
}

for _, v := range results.([]interface{}) {
_, ok := v.(map[string]interface{})
if !ok {
log.Errorf("type conversion failed, found %T", v)
continue
}
id := v.(map[string]interface{})["id"]
if id == nil {
log.Errorf("id not found in %v", v)
continue
}
_, ok = id.(string)
if !ok {
log.Errorf("id not expected type string, found %T", id)
continue
}
dimensions := v.(map[string]interface{})["dimensions"]
if dimensions == nil {
log.Errorf("no dimensions found in %v", v)
continue
}
_, ok = dimensions.(map[string]interface{})
if !ok {
log.Errorf("type conversion failed, found %T", dimensions)
continue
}
host := dimensions.(map[string]interface{})["host"]
if host == nil {
log.Errorf("no host found in %v", dimensions)
continue
}
if _, ok := host.(string); !ok {
log.Errorf("host not expected type string, found %T", host)
}
keyHostMap[id.(string)] = host.(string)
}

var data interface{}
data = metricData.(map[string]interface{})["data"]
if data == nil {
return hostMetricMap, errors.New("unexpected payload: missing data field")
}
keyMetricMap, ok := data.(map[string]interface{})
if !ok {
return hostMetricMap, errors.New("unable to deserialise data field")
}
for key, metric := range keyMetricMap {
if _, ok := keyHostMap[key]; !ok {
log.Errorf("no metadata found for key %v", key)
continue
}
values, ok := metric.([]interface{})
if !ok {
log.Errorf("unable to deserialise values for key %v", key)
continue
}
if len(values) == 0 {
log.Errorf("no metric value array could be decoded for key %v", key)
continue
}
// Find the average across returned values per 1 minute resolution
var sum float64
var count float64
for _, value := range values {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For testing, we may want to break this arithmetic mean calculation into its own function but not a blocker for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure will do it in future PR when I add tests #26

var timestampUtilisation []interface{}
timestampUtilisation, ok = value.([]interface{})
if !ok || len(timestampUtilisation) < 2 {
log.Errorf("unable to deserialise metric values for key %v", key)
continue
}
if _, ok := timestampUtilisation[1].(float64); !ok {
log.Errorf("unable to typecast value to float64: %v of type %T", timestampUtilisation, timestampUtilisation)
}
sum += timestampUtilisation[1].(float64)
count += 1
}

fetchedMetric := watcher.Metric{Value: sum / count}
hostMetricMap[keyHostMap[key]] = fetchedMetric
}

return hostMetricMap, nil
}