Skip to content

Commit

Permalink
add additional label consumer name on dispatching metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
k15r authored and muralov committed Nov 13, 2023
1 parent 565f0e2 commit f0a63d9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
13 changes: 9 additions & 4 deletions pkg/backend/jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,11 @@ func (js *JetStream) getCallback(subKeyPrefix, subscriptionName, subscriptionNam
js.namedLogger().Errorw("Failed to convert sink value to string", "sinkValue", sinkValue)
return
}
ci, err := msg.Sub.ConsumerInfo()
if err != nil {
js.namedLogger().Errorw("Failed to extract consumer info", "error", err)
return
}
ce, err := backendutils.ConvertMsgToCE(msg)
if err != nil {
js.namedLogger().Errorw("Failed to convert JetStream message to CloudEvent", "error", err)
Expand Down Expand Up @@ -533,8 +538,8 @@ func (js *JetStream) getCallback(subKeyPrefix, subscriptionName, subscriptionNam
status = res.StatusCode
}

js.metricsCollector.RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, ce.Type(), sink, status)
js.metricsCollector.RecordLatencyPerSubscription(duration, subscriptionName, subscriptionNamespace, ce.Type(), sink, status)
js.metricsCollector.RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, ce.Type(), ci.Config.Name, sink, status)
js.metricsCollector.RecordLatencyPerSubscription(duration, subscriptionName, subscriptionNamespace, ce.Type(), ci.Config.Name, sink, status)

// NAK the msg with a delay so it is redelivered after jsConsumerNakDelay period.
if err := msg.NakWithDelay(jsConsumerNakDelay); err != nil {
Expand All @@ -556,8 +561,8 @@ func (js *JetStream) getCallback(subKeyPrefix, subscriptionName, subscriptionNam
status = res.StatusCode
}

js.metricsCollector.RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, ce.Type(), sink, status)
js.metricsCollector.RecordLatencyPerSubscription(duration, subscriptionName, subscriptionNamespace, ce.Type(), sink, status)
js.metricsCollector.RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, ce.Type(), ci.Config.Name, sink, status)
js.metricsCollector.RecordLatencyPerSubscription(duration, subscriptionName, subscriptionNamespace, ce.Type(), ci.Config.Name, sink, status)
ceLogger.Debugw("CloudEvent was dispatched")
}
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/backend/metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ func NewCollector() *Collector {
Name: deliveryMetricKey,
Help: deliveryMetricHelp,
},
[]string{subscriptionNameLabel, subscriptionNamespaceLabel, eventTypeLabel, sinkLabel, responseCodeLabel},
[]string{subscriptionNameLabel, subscriptionNamespaceLabel, eventTypeLabel, sinkLabel, responseCodeLabel, consumerNameLabel},
),
latencyPerSubscriber: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: latencyMetricKey,
Help: latencyMetricHelp,
Buckets: prometheus.ExponentialBuckets(0.002, 2, 10),
},
[]string{subscriptionNameLabel, subscriptionNamespaceLabel, eventTypeLabel, sinkLabel, responseCodeLabel},
[]string{subscriptionNameLabel, subscriptionNamespaceLabel, eventTypeLabel, sinkLabel, responseCodeLabel, consumerNameLabel},
),
eventTypes: prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -127,26 +127,28 @@ func (c *Collector) RegisterMetrics() {
}

// RecordDeliveryPerSubscription records an eventing_ec_nats_delivery_per_subscription_total metric.
func (c *Collector) RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, eventType, sink string, statusCode int) {
func (c *Collector) RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, eventType, consumerName, sink string, statusCode int) {
c.deliveryPerSubscription.WithLabelValues(
subscriptionName,
subscriptionNamespace,
eventType,
fmt.Sprintf("%v", sink),
fmt.Sprintf("%v", statusCode)).Inc()
fmt.Sprintf("%v", statusCode),
consumerName).Inc()
}

// RecordLatencyPerSubscription records an eventing_ec_nats_subscriber_dispatch_duration_seconds.
func (c *Collector) RecordLatencyPerSubscription(
duration time.Duration,
subscriptionName, subscriptionNamespace, eventType, sink string,
subscriptionName, subscriptionNamespace, eventType, consumerName, sink string,
statusCode int) {
c.latencyPerSubscriber.WithLabelValues(
subscriptionName,
subscriptionNamespace,
eventType,
fmt.Sprintf("%v", sink),
fmt.Sprintf("%v", statusCode)).Observe(duration.Seconds())
fmt.Sprintf("%v", statusCode),
consumerName).Observe(duration.Seconds())
}

// RecordEventTypes records a eventing_ec_event_type_subscribed_total metric.
Expand Down

0 comments on commit f0a63d9

Please sign in to comment.