Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reworks the prometheus metrics to adhere to best practices #5687

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Here is an overview of all new **experimental** features:
- **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
- **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441))
- **General**: Add `validations.keda.sh/hpa-ownership` annotation to HPA to disable ownership validation ([#5516](https://github.com/kedacore/keda/issues/5516))
- **General**: Improve Prometheus metrics to align with best practices ([#4854](https://github.com/kedacore/keda/issues/4854))
- **General**: Support csv-format for WATCH_NAMESPACE env var ([#5670](https://github.com/kedacore/keda/issues/5670))
- **Azure Event Hub Scaler**: Remove usage of checkpoint offsets to account for SDK checkpointing implementation changes ([#5574](https://github.com/kedacore/keda/issues/5574))
- **GCP Stackdriver Scaler**: Add missing parameters 'rate' and 'count' for GCP Stackdriver Scaler alignment ([#5633](https://github.com/kedacore/keda/issues/5633))
Expand All @@ -95,7 +96,7 @@ You can find all deprecations in [this overview](https://github.com/kedacore/ked

New deprecation(s):

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- Various Prometheus metrics have been renamed to follow the preferred naming conventions. The old ones are still available, but will be removed in the future ([#4854](https://github.com/kedacore/keda/issues/4854)).
wozniakjan marked this conversation as resolved.
Show resolved Hide resolved

### Breaking Changes

Expand Down
6 changes: 3 additions & 3 deletions config/grafana/keda-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "sum by(job) (rate(keda_scaler_errors{}[5m]))",
"expr": "sum by(job) (rate(keda_scaler_detail_errors_total{}[5m]))",
"legendFormat": "{{ job }}",
"range": true,
"refId": "A"
Expand Down Expand Up @@ -313,7 +313,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "sum by(scaler) (rate(keda_scaler_errors{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\", scaler=~\"$scaler\"}[5m]))",
"expr": "sum by(scaler) (rate(keda_scaler_detail_errors_total{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\", scaler=~\"$scaler\"}[5m]))",
"legendFormat": "{{ scaler }}",
"range": true,
"refId": "A"
Expand Down Expand Up @@ -423,7 +423,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "sum by(scaledObject) (rate(keda_scaled_object_errors{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\"}[5m]))",
"expr": "sum by(scaledObject) (rate(keda_scaled_object_errors_total{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\"}[5m]))",
"legendFormat": "{{ scaledObject }}",
"range": true,
"refId": "A"
Expand Down
10 changes: 6 additions & 4 deletions pkg/metricscollector/metricscollectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package metricscollector

import (
"time"

grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
)

Expand All @@ -39,10 +41,10 @@ type MetricsCollector interface {
RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64)

// RecordScalerLatency creates a measurement of the latency of retrieving an external metric
RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64)
RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration)

// RecordScalableObjectLatency creates a measurement of the latency of executing the scalable object loop
RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64)
RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration)

// RecordScalerActive creates a measurement of the activity of the scaler
RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool)
Expand Down Expand Up @@ -101,14 +103,14 @@ func RecordScalerMetric(namespace string, scaledObject string, scaler string, tr
}

// RecordScalerLatency creates a measurement of the latency of retrieving an external metric
func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
	// Forward the measurement, unchanged, to every entry in collectors.
	for _, collector := range collectors {
		collector.RecordScalerLatency(
			namespace,
			scaledObject,
			scaler,
			triggerIndex,
			metric,
			isScaledObject,
			value,
		)
	}
}

// RecordScalableObjectLatency creates a measurement of the latency of executing the scalable object loop
func RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
func RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration) {
	// Forward the measurement, unchanged, to every entry in collectors.
	for _, collector := range collectors {
		collector.RecordScalableObjectLatency(
			namespace,
			name,
			isScaledObject,
			value,
		)
	}
Expand Down
107 changes: 82 additions & 25 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"runtime"
"strconv"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -22,18 +23,22 @@ const meterName = "keda-open-telemetry-metrics"
const defaultNamespace = "default"

var (
meterProvider *metric.MeterProvider
meter api.Meter
otScalerErrorsCounter api.Int64Counter
otScaledObjectErrorsCounter api.Int64Counter
otScaledJobErrorsCounter api.Int64Counter
otTriggerTotalsCounter api.Int64UpDownCounter
otCrdTotalsCounter api.Int64UpDownCounter

otelScalerMetricVal OtelMetricFloat64Val
otelScalerMetricsLatencyVal OtelMetricFloat64Val
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val
meterProvider *metric.MeterProvider
meter api.Meter
otScalerErrorsCounter api.Int64Counter
otScaledObjectErrorsCounter api.Int64Counter
otScaledJobErrorsCounter api.Int64Counter
otTriggerTotalsCounterDeprecated api.Int64UpDownCounter
otCrdTotalsCounterDeprecated api.Int64UpDownCounter
otTriggerRegisteredTotalsCounter api.Int64UpDownCounter
otCrdRegisteredTotalsCounter api.Int64UpDownCounter

otelScalerMetricVal OtelMetricFloat64Val
otelScalerMetricsLatencyVal OtelMetricFloat64Val
otelScalerMetricsLatencyValDeprecated OtelMetricFloat64Val
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelInternalLoopLatencyValDeprecated OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

otCloudEventEmittedCounter api.Int64Counter
otCloudEventQueueStatusVal OtelMetricFloat64Val
Expand Down Expand Up @@ -95,19 +100,29 @@ func initMeters() {
otLog.Error(err, msg)
}

otTriggerTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("Total triggers"))
otTriggerTotalsCounterDeprecated, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("DEPRECATED - will be removed in 2.16 - use 'keda.trigger.registered.count' instead"))
if err != nil {
otLog.Error(err, msg)
}

otCrdTotalsCounter, err = meter.Int64UpDownCounter("keda.resource.totals", api.WithDescription("Total resources"))
otTriggerRegisteredTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.registered.count", api.WithDescription("Total number of triggers per trigger type registered"))
if err != nil {
otLog.Error(err, msg)
}

otCrdTotalsCounterDeprecated, err = meter.Int64UpDownCounter("keda.resource.totals", api.WithDescription("DEPRECATED - will be removed in 2.16 - use 'keda.resource.registered.count' instead"))
if err != nil {
otLog.Error(err, msg)
}

otCrdRegisteredTotalsCounter, err = meter.Int64UpDownCounter("keda.resource.registered.count", api.WithDescription("Total number of KEDA custom resources per namespace for each custom resource type (CRD) registered"))
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.value",
api.WithDescription("Metric Value used for HPA"),
api.WithDescription("The current value for each scaler's metric that would be used by the HPA in computing the target average"),
api.WithFloat64Callback(ScalerMetricValueCallback),
)
if err != nil {
Expand All @@ -116,7 +131,16 @@ func initMeters() {

_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.latency",
api.WithDescription("Scaler Metrics Latency"),
api.WithDescription("DEPRECATED - use `keda_scaler_metrics_latency_seconds` instead"),
api.WithFloat64Callback(ScalerMetricsLatencyCallbackDeprecated),
)
if err != nil {
otLog.Error(err, msg)
}
_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.latency.seconds",
api.WithDescription("The latency of retrieving current metric from each scaler"),
api.WithUnit("s"),
api.WithFloat64Callback(ScalerMetricsLatencyCallback),
)
if err != nil {
Expand All @@ -125,7 +149,16 @@ func initMeters() {

_, err = meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency",
api.WithDescription("DEPRECATED - use `keda_internal_scale_loop_latency_seconds` instead"),
api.WithFloat64Callback(ScalableObjectLatencyCallbackDeprecated),
)
if err != nil {
otLog.Error(err, msg)
}
_, err = meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency.seconds",
api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
api.WithUnit("s"),
api.WithFloat64Callback(ScalableObjectLatencyCallback),
)
if err != nil {
Expand All @@ -134,7 +167,7 @@ func initMeters() {

_, err = meter.Float64ObservableGauge(
"keda.scaler.active",
api.WithDescription("Activity of a Scaler Metric"),
api.WithDescription("Indicates whether a scaler is active (1), or not (0)"),
api.WithFloat64Callback(ScalerActiveCallback),
)
if err != nil {
Expand Down Expand Up @@ -207,10 +240,20 @@ func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer)
return nil
}

// ScalerMetricsLatencyCallbackDeprecated reports the value currently held in
// otelScalerMetricsLatencyValDeprecated to the observer, provided a
// measurement option has been recorded for it, and then resets that holder to
// its zero value so the same sample is not observed again.
// It always returns nil.
func ScalerMetricsLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error {
	latest := &otelScalerMetricsLatencyValDeprecated
	// A nil option means nothing has been recorded since the last reset.
	if opt := latest.measurementOption; opt != nil {
		obsrv.Observe(latest.val, opt)
	}
	*latest = OtelMetricFloat64Val{}
	return nil
}

// RecordScalerLatency creates a measurement of the latency of retrieving an external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
otelScalerMetricsLatencyVal.val = value
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
	// Both holders describe the same scaler/trigger, so the measurement option
	// is built once and shared instead of being computed twice from identical
	// arguments.
	opt := getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)

	// Current holder: the duration expressed in (fractional) seconds.
	otelScalerMetricsLatencyVal.val = value.Seconds()
	otelScalerMetricsLatencyVal.measurementOption = opt

	// Deprecated holder: the duration truncated to whole milliseconds.
	otelScalerMetricsLatencyValDeprecated.val = float64(value.Milliseconds())
	otelScalerMetricsLatencyValDeprecated.measurementOption = opt

	// NOTE(review): these package-level holders are written here and read/reset
	// by the gauge callbacks without any visible synchronization — confirm
	// whether callers serialize access.
}

func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
Expand All @@ -221,8 +264,16 @@ func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer)
return nil
}

// ScalableObjectLatencyCallbackDeprecated reports the value currently held in
// otelInternalLoopLatencyValDeprecated to the observer, provided a
// measurement option has been recorded for it, and then resets that holder to
// its zero value so the same sample is not observed again.
// It always returns nil.
func ScalableObjectLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error {
	latest := &otelInternalLoopLatencyValDeprecated
	// A nil option means nothing has been recorded since the last reset.
	if opt := latest.measurementOption; opt != nil {
		obsrv.Observe(latest.val, opt)
	}
	*latest = OtelMetricFloat64Val{}
	return nil
}

// RecordScalableObjectLatency creates a measurement of the latency of executing the scalable object loop
func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration) {
resourceType := "scaledjob"
if isScaledObject {
resourceType = "scaledobject"
Expand All @@ -233,8 +284,10 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
attribute.Key("type").String(resourceType),
attribute.Key("name").String(name))

otelInternalLoopLatencyVal.val = value
otelInternalLoopLatencyVal.val = value.Seconds()
otelInternalLoopLatencyVal.measurementOption = opt
otelInternalLoopLatencyValDeprecated.val = float64(value.Milliseconds())
otelInternalLoopLatencyValDeprecated.measurementOption = opt
}

func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
Expand Down Expand Up @@ -315,13 +368,15 @@ func (o *OtelMetrics) RecordScaledJobError(namespace string, scaledJob string, e

func (o *OtelMetrics) IncrementTriggerTotal(triggerType string) {
if triggerType != "" {
otTriggerTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
otTriggerTotalsCounterDeprecated.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
otTriggerRegisteredTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
}
}

func (o *OtelMetrics) DecrementTriggerTotal(triggerType string) {
if triggerType != "" {
otTriggerTotalsCounter.Add(context.Background(), -1, api.WithAttributes(attribute.Key("type").String(triggerType)))
otTriggerTotalsCounterDeprecated.Add(context.Background(), -1, api.WithAttributes(attribute.Key("type").String(triggerType)))
otTriggerRegisteredTotalsCounter.Add(context.Background(), -1, api.WithAttributes(attribute.Key("type").String(triggerType)))
}
}

Expand All @@ -334,7 +389,8 @@ func (o *OtelMetrics) IncrementCRDTotal(crdType, namespace string) {
attribute.Key("type").String(crdType),
)

otCrdTotalsCounter.Add(context.Background(), 1, opt)
otCrdTotalsCounterDeprecated.Add(context.Background(), 1, opt)
otCrdRegisteredTotalsCounter.Add(context.Background(), 1, opt)
}

func (o *OtelMetrics) DecrementCRDTotal(crdType, namespace string) {
Expand All @@ -346,7 +402,8 @@ func (o *OtelMetrics) DecrementCRDTotal(crdType, namespace string) {
attribute.Key("namespace").String(namespace),
attribute.Key("type").String(crdType),
)
otCrdTotalsCounter.Add(context.Background(), -1, opt)
otCrdTotalsCounterDeprecated.Add(context.Background(), -1, opt)
otCrdRegisteredTotalsCounter.Add(context.Background(), -1, opt)
}

func getScalerMeasurementOption(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool) api.MeasurementOption {
Expand Down
35 changes: 29 additions & 6 deletions pkg/metricscollector/opentelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metricscollector
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/sdk/metric"
Expand Down Expand Up @@ -59,11 +60,11 @@ func TestIncrementTriggerTotal(t *testing.T) {
assert.Nil(t, err)
scopeMetrics := got.ScopeMetrics[0]
assert.NotEqual(t, len(scopeMetrics.Metrics), 0)
buildInfo := retrieveMetric(scopeMetrics.Metrics, "keda.trigger.totals")
triggercount := retrieveMetric(scopeMetrics.Metrics, "keda.trigger.registered.count")

assert.NotNil(t, buildInfo)
assert.NotNil(t, triggercount)

data := buildInfo.Data.(metricdata.Sum[int64]).DataPoints[0]
data := triggercount.Data.(metricdata.Sum[int64]).DataPoints[0]
assert.Equal(t, data.Value, int64(1))

testOtel.DecrementTriggerTotal("testtrigger")
Expand All @@ -72,10 +73,32 @@ func TestIncrementTriggerTotal(t *testing.T) {
assert.Nil(t, err)
scopeMetrics = got.ScopeMetrics[0]
assert.NotEqual(t, len(scopeMetrics.Metrics), 0)
buildInfo = retrieveMetric(scopeMetrics.Metrics, "keda.trigger.totals")
triggercount = retrieveMetric(scopeMetrics.Metrics, "keda.trigger.registered.count")

assert.NotNil(t, buildInfo)
assert.NotNil(t, triggercount)

data = buildInfo.Data.(metricdata.Sum[int64]).DataPoints[0]
data = triggercount.Data.(metricdata.Sum[int64]).DataPoints[0]
assert.Equal(t, data.Value, int64(0))
}

// TestLoopLatency checks that one RecordScalableObjectLatency call of 500ms is
// exported through both loop-latency gauges: "keda.internal.scale.loop.latency"
// (value 500, empty unit) and "keda.internal.scale.loop.latency.seconds"
// (value 0.5, unit "s").
func TestLoopLatency(t *testing.T) {
	testOtel.RecordScalableObjectLatency("namespace", "name", true, 500*time.Millisecond)

	got := metricdata.ResourceMetrics{}
	if err := testReader.Collect(context.Background(), &got); !assert.NoError(t, err) {
		return
	}
	// assert.* does not abort the test, so stop explicitly rather than panic on
	// the index below when nothing was collected.
	if !assert.NotEmpty(t, got.ScopeMetrics) {
		return
	}
	scopeMetrics := got.ScopeMetrics[0]
	assert.NotEmpty(t, scopeMetrics.Metrics)

	// assertGauge looks up a float64 gauge by name and compares its unit and
	// first data point with the expected values. Each step is guarded so a
	// missing metric, an unexpected data type or an empty data-point list is
	// reported as a test failure instead of a panic.
	assertGauge := func(name string, wantUnit string, wantValue float64) {
		m := retrieveMetric(scopeMetrics.Metrics, name)
		if !assert.NotNil(t, m, name) {
			return
		}
		// testify expects (expected, actual); keeping that order makes the
		// failure output read correctly.
		assert.Equal(t, wantUnit, m.Unit, name)

		gauge, ok := m.Data.(metricdata.Gauge[float64])
		if !assert.True(t, ok, name) || !assert.NotEmpty(t, gauge.DataPoints, name) {
			return
		}
		assert.Equal(t, wantValue, gauge.DataPoints[0].Value, name)
	}

	assertGauge("keda.internal.scale.loop.latency", "", 500)
	assertGauge("keda.internal.scale.loop.latency.seconds", "s", 0.5)
}
Loading
Loading