From 96c0538cfa29c8f3d3ef9cb6d1f689306880579d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Erbrech?= Date: Tue, 23 Aug 2022 14:54:28 +1000 Subject: [PATCH] add lockRenewedFailureCount informer (#85) --- prometheus/listener/types.go | 55 +++++++++++++++++++++++++++++-- prometheus/listener/types_test.go | 35 ++++++++++++++++++++ 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/prometheus/listener/types.go b/prometheus/listener/types.go index 81b675e5..4a3afb55 100644 --- a/prometheus/listener/types.go +++ b/prometheus/listener/types.go @@ -3,9 +3,9 @@ package listener import ( "fmt" - prom "github.com/prometheus/client_golang/prometheus" - servicebus "github.com/Azure/azure-service-bus-go" + prom "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" ) const ( @@ -15,7 +15,11 @@ const ( successLabel = "success" ) -var Metrics Recorder = newRegistry() +var ( + metricsRegistry = newRegistry() + + Metrics Recorder = metricsRegistry +) func newRegistry() *Registry { return &Registry{ @@ -110,3 +114,48 @@ func (m *Registry) IncMessageDeadlineReachedCount(msg *servicebus.Message) { labels := getMessageTypeLabel(msg) m.MessageDeadlineReachedCount.With(labels).Inc() } + +type Informer struct { + registry *Registry +} + +func NewInformer() *Informer { + return &Informer{registry: metricsRegistry} +} + +func (i *Informer) GetMessageLockRenewedFailureCount() (float64, error) { + var total float64 + collect(i.registry.MessageLockRenewedCount, func(m dto.Metric) { + if !hasLabel(m, successLabel, "false") { + return + } + total += m.GetCounter().GetValue() + }) + return total, nil +} + +func hasLabel(m dto.Metric, key string, value string) bool { + for _, pair := range m.Label { + if pair == nil { + continue + } + if pair.GetName() == key && pair.GetValue() == value { + return true + } + } + return false +} + +// collect calls the function for each metric associated with the Collector +func collect(col prom.Collector, do func(dto.Metric)) { + c := make(chan prom.Metric) + go func(c chan prom.Metric) { + col.Collect(c) + close(c) + }(c) + for x := range c { // eg range across distinct label vector values + m := dto.Metric{} + _ = x.Write(&m) + do(m) + } +} diff --git a/prometheus/listener/types_test.go b/prometheus/listener/types_test.go index ad2f06fa..b3517f7f 100644 --- a/prometheus/listener/types_test.go +++ b/prometheus/listener/types_test.go @@ -3,6 +3,7 @@ package listener import ( "testing" + servicebus "github.com/Azure/azure-service-bus-go" . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus" ) @@ -31,3 +32,37 @@ func TestRegistry_Init(t *testing.T) { g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic()) g.Expect(fRegistry.collectors).To(HaveLen(5)) } + +func TestInformer_GetMetric(t *testing.T) { + g := NewWithT(t) + r := newRegistry() + registerer := prometheus.NewRegistry() + informer := &Informer{registry: r} + + // before init + count, err := informer.GetMessageLockRenewedFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(0))) + + // after init, count 0 + g.Expect(func() { r.Init(registerer) }).ToNot(Panic()) + count, err = informer.GetMessageLockRenewedFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(0))) + + // count incremented + msg := &servicebus.Message{} + msg.UserProperties = map[string]interface{}{ + "type": "someType", + } + r.IncMessageLockRenewedFailure(msg) + count, err = informer.GetMessageLockRenewedFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(1))) + + // count failure only + r.IncMessageLockRenewedSuccess(msg) + count, err = informer.GetMessageLockRenewedFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(1))) +}