From ae4c59aa5e0d7c2730d215d28d9d8e42bbbb09ef Mon Sep 17 00:00:00 2001 From: Karen Chen Date: Tue, 6 Aug 2024 20:42:45 -0400 Subject: [PATCH] [lockrenewer] Add prom metric for lock renewal timeouts (#244) * add prom metric for renewal timeouts * fix ut --- v2/lockrenewer.go | 2 ++ v2/metrics/processor/types.go | 37 ++++++++++++++++++++++++------ v2/metrics/processor/types_test.go | 12 +++++++++- v2/metrics/registry_test.go | 2 +- 4 files changed, 44 insertions(+), 9 deletions(-) diff --git a/v2/lockrenewer.go b/v2/lockrenewer.go index 1e866ec5..2ce8d1cf 100644 --- a/v2/lockrenewer.go +++ b/v2/lockrenewer.go @@ -206,6 +206,8 @@ func (plr *peekLockRenewer) renewMessageLock(ctx context.Context, message *azser if !errors.Is(renewErr, context.DeadlineExceeded) { return renewErr } + // renewErr is context.DeadlineExceeded, increment metric and retry + plr.metrics.IncMessageLockRenewalTimeoutCount(message) } // lock is expired or message context is done if ctx.Err() != nil { diff --git a/v2/metrics/processor/types.go b/v2/metrics/processor/types.go index 0fc62be7..b7d7ed22 100644 --- a/v2/metrics/processor/types.go +++ b/v2/metrics/processor/types.go @@ -45,9 +45,14 @@ func NewRegistry() *Registry { Help: "total number of message lock renewal", Subsystem: subsystem, }, []string{messageTypeLabel, successLabel}), + MessageLockRenewalTimeoutCount: prom.NewCounterVec(prom.CounterOpts{ + Name: "message_lock_renewal_timeout_total", + Help: "total number of message lock renewal calls that timed out", + Subsystem: subsystem, + }, []string{messageTypeLabel}), MessageDeadlineReachedCount: prom.NewCounterVec(prom.CounterOpts{ Name: "message_deadline_reached_total", - Help: "total number of message lock renewal", + Help: "total number of message lock renewal that reached deadline", Subsystem: subsystem, }, []string{messageTypeLabel}), HealthCheckCount: prom.NewCounterVec(prom.CounterOpts{ @@ -76,6 +81,7 @@ func (m *Registry) Init(reg prom.Registerer) { m.MessageReceivedCount, m.MessageHandledCount, m.MessageLockRenewedCount, + m.MessageLockRenewalTimeoutCount, m.MessageDeadlineReachedCount, m.HealthCheckCount, m.ConcurrentMessageCount) @@ -83,12 +89,13 @@ func (m *Registry) Init(reg prom.Registerer) { // Registry provides the prometheus metrics for the message processor type Registry struct { - MessageReceivedCount *prom.CounterVec - MessageHandledCount *prom.CounterVec - MessageLockRenewedCount *prom.CounterVec - MessageDeadlineReachedCount *prom.CounterVec - HealthCheckCount *prom.CounterVec - ConcurrentMessageCount *prom.GaugeVec + MessageReceivedCount *prom.CounterVec + MessageHandledCount *prom.CounterVec + MessageLockRenewedCount *prom.CounterVec + MessageLockRenewalTimeoutCount *prom.CounterVec + MessageDeadlineReachedCount *prom.CounterVec + HealthCheckCount *prom.CounterVec + ConcurrentMessageCount *prom.GaugeVec } // Recorder allows to initialize the metric registry and increase/decrease the registered metrics at runtime. @@ -97,6 +104,7 @@ type Recorder interface { IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage) IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage) IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage) + IncMessageLockRenewalTimeoutCount(msg *azservicebus.ReceivedMessage) IncMessageHandled(receiverName string, msg *azservicebus.ReceivedMessage) IncMessageReceived(receiverName string, count float64) IncHealthCheckSuccessCount(namespace, entity, subscription string) @@ -141,6 +149,12 @@ func (m *Registry) DecConcurrentMessageCount(receiverName string, msg *azservice m.ConcurrentMessageCount.With(labels).Dec() } +// IncMessageDeadlineReachedCount increases the message deadline reached counter +func (m *Registry) IncMessageLockRenewalTimeoutCount(msg *azservicebus.ReceivedMessage) { + labels := getMessageTypeLabel(msg) + m.MessageLockRenewalTimeoutCount.With(labels).Inc() +} + // IncMessageDeadlineReachedCount increases the message deadline reached counter func (m *Registry) IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage) { labels := getMessageTypeLabel(msg) @@ -201,6 +215,15 @@ func (i *Informer) GetMessageLockRenewedFailureCount() (float64, error) { return total, nil } +// GetMessageLockRenewalTimeoutCount retrieves the current value of the MessageLockRenewalTimeoutCount metric +func (i *Informer) GetMessageLockRenewalTimeoutCount() (float64, error) { + var total float64 + common.Collect(i.registry.MessageLockRenewalTimeoutCount, func(m *dto.Metric) { + total += m.GetCounter().GetValue() + }) + return total, nil +} + // GetHealthCheckSuccessCount retrieves the current value of the HealthCheckSuccessCount metric func (i *Informer) GetHealthCheckSuccessCount(namespace, entity, subscription string) (float64, error) { var total float64 diff --git a/v2/metrics/processor/types_test.go b/v2/metrics/processor/types_test.go index e24ab39f..bd4f7e8e 100644 --- a/v2/metrics/processor/types_test.go +++ b/v2/metrics/processor/types_test.go @@ -30,7 +30,7 @@ func TestRegistry_Init(t *testing.T) { fRegistry := &fakeRegistry{} g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic()) g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic()) - g.Expect(fRegistry.collectors).To(HaveLen(6)) + g.Expect(fRegistry.collectors).To(HaveLen(7)) Metric.IncMessageReceived("testReceiverName", 10) } @@ -68,18 +68,28 @@ func TestLockRenewalMetrics(t *testing.T) { count, err := informer.GetMessageLockRenewedFailureCount() g.Expect(err).ToNot(HaveOccurred()) g.Expect(count).To(Equal(float64(0))) + count, err = informer.GetMessageLockRenewalTimeoutCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(0))) // after init, count 0 g.Expect(func() { r.Init(registerer) }).ToNot(Panic()) count, err = informer.GetMessageLockRenewedFailureCount() g.Expect(err).ToNot(HaveOccurred()) g.Expect(count).To(Equal(float64(0))) + count, err = informer.GetMessageLockRenewalTimeoutCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(0))) // count incremented r.IncMessageLockRenewedFailure(tc.msg) count, err = informer.GetMessageLockRenewedFailureCount() g.Expect(err).ToNot(HaveOccurred()) g.Expect(count).To(Equal(float64(1))) + r.IncMessageLockRenewalTimeoutCount(tc.msg) + count, err = informer.GetMessageLockRenewalTimeoutCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(1))) // count failure only r.IncMessageLockRenewedSuccess(tc.msg) diff --git a/v2/metrics/registry_test.go b/v2/metrics/registry_test.go index 96909d6a..8a4948f0 100644 --- a/v2/metrics/registry_test.go +++ b/v2/metrics/registry_test.go @@ -27,5 +27,5 @@ func TestRegister(t *testing.T) { g := NewWithT(t) reg := &fakeRegistry{} g.Expect(func() { Register(reg) }).ToNot(Panic()) - g.Expect(reg.collectors).To(HaveLen(8)) + g.Expect(reg.collectors).To(HaveLen(9)) }