diff --git a/pkg/backend/jetstream/jetstream.go b/pkg/backend/jetstream/jetstream.go index 4cf052a4..ded9e7ac 100644 --- a/pkg/backend/jetstream/jetstream.go +++ b/pkg/backend/jetstream/jetstream.go @@ -502,6 +502,11 @@ func (js *JetStream) getCallback(subKeyPrefix, subscriptionName, subscriptionNam js.namedLogger().Errorw("Failed to convert sink value to string", "sinkValue", sinkValue) return } + ci, err := msg.Sub.ConsumerInfo() + if err != nil { + js.namedLogger().Errorw("Failed to extract consumer info", "error", err) + return + } ce, err := backendutils.ConvertMsgToCE(msg) if err != nil { js.namedLogger().Errorw("Failed to convert JetStream message to CloudEvent", "error", err) @@ -533,8 +538,8 @@ func (js *JetStream) getCallback(subKeyPrefix, subscriptionName, subscriptionNam status = res.StatusCode } - js.metricsCollector.RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, ce.Type(), sink, status) - js.metricsCollector.RecordLatencyPerSubscription(duration, subscriptionName, subscriptionNamespace, ce.Type(), sink, status) + js.metricsCollector.RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, ce.Type(), ci.Config.Name, sink, status) + js.metricsCollector.RecordLatencyPerSubscription(duration, subscriptionName, subscriptionNamespace, ce.Type(), ci.Config.Name, sink, status) // NAK the msg with a delay so it is redelivered after jsConsumerNakDelay period. if err := msg.NakWithDelay(jsConsumerNakDelay); err != nil { @@ -556,8 +561,8 @@ func (js *JetStream) getCallback(subKeyPrefix, subscriptionName, subscriptionNam status = res.StatusCode } - js.metricsCollector.RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, ce.Type(), sink, status) - js.metricsCollector.RecordLatencyPerSubscription(duration, subscriptionName, subscriptionNamespace, ce.Type(), sink, status) + js.metricsCollector.RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, ce.Type(), ci.Config.Name, sink, status) + js.metricsCollector.RecordLatencyPerSubscription(duration, subscriptionName, subscriptionNamespace, ce.Type(), ci.Config.Name, sink, status) ceLogger.Debugw("CloudEvent was dispatched") } } diff --git a/pkg/backend/metrics/collector.go b/pkg/backend/metrics/collector.go index 4df23a51..7c9a2c4f 100644 --- a/pkg/backend/metrics/collector.go +++ b/pkg/backend/metrics/collector.go @@ -62,7 +62,7 @@ func NewCollector() *Collector { Name: deliveryMetricKey, Help: deliveryMetricHelp, }, - []string{subscriptionNameLabel, subscriptionNamespaceLabel, eventTypeLabel, sinkLabel, responseCodeLabel}, + []string{subscriptionNameLabel, subscriptionNamespaceLabel, eventTypeLabel, sinkLabel, responseCodeLabel, consumerNameLabel}, ), latencyPerSubscriber: prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -70,7 +70,7 @@ func NewCollector() *Collector { Help: latencyMetricHelp, Buckets: prometheus.ExponentialBuckets(0.002, 2, 10), }, - []string{subscriptionNameLabel, subscriptionNamespaceLabel, eventTypeLabel, sinkLabel, responseCodeLabel}, + []string{subscriptionNameLabel, subscriptionNamespaceLabel, eventTypeLabel, sinkLabel, responseCodeLabel, consumerNameLabel}, ), eventTypes: prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -127,26 +127,28 @@ func (c *Collector) RegisterMetrics() { } // RecordDeliveryPerSubscription records an eventing_ec_nats_delivery_per_subscription_total metric. -func (c *Collector) RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, eventType, sink string, statusCode int) { +func (c *Collector) RecordDeliveryPerSubscription(subscriptionName, subscriptionNamespace, eventType, consumerName, sink string, statusCode int) { c.deliveryPerSubscription.WithLabelValues( subscriptionName, subscriptionNamespace, eventType, fmt.Sprintf("%v", sink), - fmt.Sprintf("%v", statusCode)).Inc() + fmt.Sprintf("%v", statusCode), + consumerName).Inc() } // RecordLatencyPerSubscription records an eventing_ec_nats_subscriber_dispatch_duration_seconds. func (c *Collector) RecordLatencyPerSubscription( duration time.Duration, - subscriptionName, subscriptionNamespace, eventType, sink string, + subscriptionName, subscriptionNamespace, eventType, consumerName, sink string, statusCode int) { c.latencyPerSubscriber.WithLabelValues( subscriptionName, subscriptionNamespace, eventType, fmt.Sprintf("%v", sink), - fmt.Sprintf("%v", statusCode)).Observe(duration.Seconds()) + fmt.Sprintf("%v", statusCode), + consumerName).Observe(duration.Seconds()) } // RecordEventTypes records a eventing_ec_event_type_subscribed_total metric.