Skip to content

Commit

Permalink
[lockrenewer] Add prom metric for lock renewal timeouts (#244)
Browse files Browse the repository at this point in the history
* add prom metric for renewal timeouts

* fix ut
  • Loading branch information
karenychen authored Aug 7, 2024
1 parent 30771b5 commit ae4c59a
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 9 deletions.
2 changes: 2 additions & 0 deletions v2/lockrenewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ func (plr *peekLockRenewer) renewMessageLock(ctx context.Context, message *azser
if !errors.Is(renewErr, context.DeadlineExceeded) {
return renewErr
}
// renewErr is context.DeadlineExceeded, increment metric and retry
plr.metrics.IncMessageLockRenewalTimeoutCount(message)
}
// lock is expired or message context is done
if ctx.Err() != nil {
Expand Down
37 changes: 30 additions & 7 deletions v2/metrics/processor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,14 @@ func NewRegistry() *Registry {
Help: "total number of message lock renewal",
Subsystem: subsystem,
}, []string{messageTypeLabel, successLabel}),
MessageLockRenewalTimeoutCount: prom.NewCounterVec(prom.CounterOpts{
Name: "message_lock_renewal_timeout_total",
Help: "total number of message lock renewal calls that timed out",
Subsystem: subsystem,
}, []string{messageTypeLabel}),
MessageDeadlineReachedCount: prom.NewCounterVec(prom.CounterOpts{
Name: "message_deadline_reached_total",
Help: "total number of message lock renewal",
Help: "total number of message lock renewal that reached deadline",
Subsystem: subsystem,
}, []string{messageTypeLabel}),
HealthCheckCount: prom.NewCounterVec(prom.CounterOpts{
Expand Down Expand Up @@ -76,19 +81,21 @@ func (m *Registry) Init(reg prom.Registerer) {
m.MessageReceivedCount,
m.MessageHandledCount,
m.MessageLockRenewedCount,
m.MessageLockRenewalTimeoutCount,
m.MessageDeadlineReachedCount,
m.HealthCheckCount,
m.ConcurrentMessageCount)
}

// Registry provides the prometheus metrics for the message processor
type Registry struct {
MessageReceivedCount *prom.CounterVec
MessageHandledCount *prom.CounterVec
MessageLockRenewedCount *prom.CounterVec
MessageDeadlineReachedCount *prom.CounterVec
HealthCheckCount *prom.CounterVec
ConcurrentMessageCount *prom.GaugeVec
MessageReceivedCount *prom.CounterVec
MessageHandledCount *prom.CounterVec
MessageLockRenewedCount *prom.CounterVec
MessageLockRenewalTimeoutCount *prom.CounterVec
MessageDeadlineReachedCount *prom.CounterVec
HealthCheckCount *prom.CounterVec
ConcurrentMessageCount *prom.GaugeVec
}

// Recorder allows to initialize the metric registry and increase/decrease the registered metrics at runtime.
Expand All @@ -97,6 +104,7 @@ type Recorder interface {
IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage)
IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage)
IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage)
IncMessageLockRenewalTimeoutCount(msg *azservicebus.ReceivedMessage)
IncMessageHandled(receiverName string, msg *azservicebus.ReceivedMessage)
IncMessageReceived(receiverName string, count float64)
IncHealthCheckSuccessCount(namespace, entity, subscription string)
Expand Down Expand Up @@ -141,6 +149,12 @@ func (m *Registry) DecConcurrentMessageCount(receiverName string, msg *azservice
m.ConcurrentMessageCount.With(labels).Dec()
}

// IncMessageDeadlineReachedCount increases the message deadline reached counter
func (m *Registry) IncMessageLockRenewalTimeoutCount(msg *azservicebus.ReceivedMessage) {
labels := getMessageTypeLabel(msg)
m.MessageLockRenewalTimeoutCount.With(labels).Inc()
}

// IncMessageDeadlineReachedCount increases the message deadline reached counter
func (m *Registry) IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage) {
labels := getMessageTypeLabel(msg)
Expand Down Expand Up @@ -201,6 +215,15 @@ func (i *Informer) GetMessageLockRenewedFailureCount() (float64, error) {
return total, nil
}

// GetMessageLockRenewalTimeoutCount retrieves the current value of the MessageLockRenewalTimeoutCount metric
func (i *Informer) GetMessageLockRenewalTimeoutCount() (float64, error) {
var total float64
common.Collect(i.registry.MessageLockRenewalTimeoutCount, func(m *dto.Metric) {
total += m.GetCounter().GetValue()
})
return total, nil
}

// GetHealthCheckSuccessCount retrieves the current value of the HealthCheckSuccessCount metric
func (i *Informer) GetHealthCheckSuccessCount(namespace, entity, subscription string) (float64, error) {
var total float64
Expand Down
12 changes: 11 additions & 1 deletion v2/metrics/processor/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestRegistry_Init(t *testing.T) {
fRegistry := &fakeRegistry{}
g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic())
g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic())
g.Expect(fRegistry.collectors).To(HaveLen(6))
g.Expect(fRegistry.collectors).To(HaveLen(7))
Metric.IncMessageReceived("testReceiverName", 10)
}

Expand Down Expand Up @@ -68,18 +68,28 @@ func TestLockRenewalMetrics(t *testing.T) {
count, err := informer.GetMessageLockRenewedFailureCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(0)))
count, err = informer.GetMessageLockRenewalTimeoutCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(0)))

// after init, count 0
g.Expect(func() { r.Init(registerer) }).ToNot(Panic())
count, err = informer.GetMessageLockRenewedFailureCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(0)))
count, err = informer.GetMessageLockRenewalTimeoutCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(0)))

// count incremented
r.IncMessageLockRenewedFailure(tc.msg)
count, err = informer.GetMessageLockRenewedFailureCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(1)))
r.IncMessageLockRenewalTimeoutCount(tc.msg)
count, err = informer.GetMessageLockRenewalTimeoutCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(1)))

// count failure only
r.IncMessageLockRenewedSuccess(tc.msg)
Expand Down
2 changes: 1 addition & 1 deletion v2/metrics/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ func TestRegister(t *testing.T) {
g := NewWithT(t)
reg := &fakeRegistry{}
g.Expect(func() { Register(reg) }).ToNot(Panic())
g.Expect(reg.collectors).To(HaveLen(8))
g.Expect(reg.collectors).To(HaveLen(9))
}

0 comments on commit ae4c59a

Please sign in to comment.