From 89de365ccb89e3ae7d11b59a6b4de28bbdaeed3c Mon Sep 17 00:00:00 2001 From: Adi Muraru Date: Mon, 2 Aug 2021 14:14:39 +0300 Subject: [PATCH] Export number of members and assigned partitions for each topic in a consumer group Added consumer_group_empty_members, consumer_group_topic_members and consumer_group_topic_assigned_partitions metrics. Fixes #104 --- docs/metrics.md | 12 +++++ prometheus/collect_consumer_groups.go | 71 ++++++++++++++++++++++++++- prometheus/exporter.go | 37 ++++++++++++-- 3 files changed, 113 insertions(+), 7 deletions(-) diff --git a/docs/metrics.md b/docs/metrics.md index ea29791..6e390ab 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -71,6 +71,18 @@ kminion_kafka_topic_high_water_mark_sum{topic_name="__consumer_offsets"} 1.51202 # TYPE kminion_kafka_consumer_group_info gauge kminion_kafka_consumer_group_info{coordinator_id="0",group_id="bigquery-sink",member_count="2",protocol="range",protocol_type="consumer",state="Stable"} 1 +# HELP kminion_kafka_consumer_group_empty_members Consumer Group Empty Members. It will report the number of members in the consumer group with no partition assigned +# TYPE kminion_kafka_consumer_group_empty_members gauge +kminion_kafka_consumer_group_empty_members{group_id="bigquery-sink"} 1 + +# HELP kminion_kafka_consumer_group_topic_members Consumer Group topic member count metrics. It will report the number of members in the consumer group assigned on a given topic +# TYPE kminion_kafka_consumer_group_topic_members gauge +kminion_kafka_consumer_group_topic_members{group_id="bigquery-sink",topic_name="shop-activity"} 4 + +# HELP kminion_kafka_consumer_group_topic_assigned_partitions Consumer Group topic partitions count metrics. It will report the number of partitions assigned in the consumer group for a given topic +# TYPE kminion_kafka_consumer_group_topic_assigned_partitions gauge +kminion_kafka_consumer_group_topic_assigned_partitions{group_id="bigquery-sink",topic_name="shop-activity"} 32 + # HELP kminion_kafka_consumer_group_topic_offset_sum The sum of all committed group offsets across all partitions in a topic # TYPE kminion_kafka_consumer_group_topic_offset_sum gauge kminion_kafka_consumer_group_topic_offset_sum{group_id="bigquery-sink",topic_name="shop-activity"} 4.259513e+06 diff --git a/prometheus/collect_consumer_groups.go b/prometheus/collect_consumer_groups.go index a3c600b..a1fdbe0 100644 --- a/prometheus/collect_consumer_groups.go +++ b/prometheus/collect_consumer_groups.go @@ -2,10 +2,12 @@ package prometheus import ( "context" + "strconv" + "github.com/prometheus/client_golang/prometheus" "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kmsg" "go.uber.org/zap" - "strconv" ) func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- prometheus.Metric) bool { @@ -18,7 +20,7 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe return false } - // The list of groups may be incomplete due to group coordinators that might fail to respond. We do log a error + // The list of groups may be incomplete due to group coordinators that might fail to respond. We do log an error // message in that case (in the kafka request method) and groups will not be included in this list. for _, grp := range groups { coordinator := grp.BrokerMetadata.NodeID @@ -49,6 +51,71 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe group.State, strconv.FormatInt(int64(coordinator), 10), ) + + // iterate all members and build two maps: + // - {topic -> number-of-consumers} + // - {topic -> number-of-partitions-assigned} + topicConsumers := make(map[string]int) + topicPartitionsAssigned := make(map[string]int) + membersWithEmptyAssignment := 0 + failedAssignmentsDecode := int64(0) + for _, member := range group.Members { + kassignment := kmsg.NewGroupMemberAssignment() + if err := kassignment.ReadFrom(member.MemberAssignment); err != nil { + e.logger.Debug("failed to decode consumer group member assignment, internal kafka error", + zap.Error(err), + zap.String("group_id", group.Group), + zap.String("client_id", member.ClientID), + zap.String("member_id", member.MemberID), + zap.String("client_host", member.ClientHost), + ) + failedAssignmentsDecode++ + continue + } + if len(kassignment.Topics) == 0 { + membersWithEmptyAssignment++ + } + for _, topic := range kassignment.Topics { + topicConsumers[topic.Topic]++ + topicPartitionsAssigned[topic.Topic] += len(topic.Partitions) + } + } + if failedAssignmentsDecode > 0 { + e.logger.Error("failed to decode consumer group member assignment, internal kafka error", + zap.Error(err), + zap.String("group_id", group.Group), + zap.Int64("assignment_decode_failures", failedAssignmentsDecode), + ) + } + // number of members with no assignment in a stable consumer group + if membersWithEmptyAssignment > 0 && group.State == "Stable" { + ch <- prometheus.MustNewConstMetric( + e.consumerGroupMembersEmpty, + prometheus.GaugeValue, + float64(membersWithEmptyAssignment), + group.Group, + ) + } + // number of members in consumer groups for each topic + for topicName, consumers := range topicConsumers { + ch <- prometheus.MustNewConstMetric( + e.consumerGroupTopicMembers, + prometheus.GaugeValue, + float64(consumers), + group.Group, + topicName, + ) + } + // number of partitions assigned in consumer groups for each topic + for topicName, partitions := range topicPartitionsAssigned { + ch <- prometheus.MustNewConstMetric( + e.consumerGroupAssignedTopicPartitions, + prometheus.GaugeValue, + float64(partitions), + group.Group, + topicName, + ) + } } } return true diff --git a/prometheus/exporter.go b/prometheus/exporter.go index 0795c34..b1c19ee 100644 --- a/prometheus/exporter.go +++ b/prometheus/exporter.go @@ -39,11 +39,14 @@ type Exporter struct { partitionLowWaterMark *prometheus.Desc // Consumer Groups - consumerGroupInfo *prometheus.Desc - consumerGroupTopicOffsetSum *prometheus.Desc - consumerGroupTopicPartitionLag *prometheus.Desc - consumerGroupTopicLag *prometheus.Desc - offsetCommits *prometheus.Desc + consumerGroupInfo *prometheus.Desc + consumerGroupMembersEmpty *prometheus.Desc + consumerGroupTopicMembers *prometheus.Desc + consumerGroupAssignedTopicPartitions *prometheus.Desc + consumerGroupTopicOffsetSum *prometheus.Desc + consumerGroupTopicPartitionLag *prometheus.Desc + consumerGroupTopicLag *prometheus.Desc + offsetCommits *prometheus.Desc } func NewExporter(cfg Config, logger *zap.Logger, minionSvc *minion.Service) (*Exporter, error) { @@ -147,6 +150,30 @@ func (e *Exporter) InitializeMetrics() { []string{"group_id", "member_count", "protocol", "protocol_type", "state", "coordinator_id"}, nil, ) + // Group Empty Memmbers + e.consumerGroupMembersEmpty = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_empty_members"), + "Consumer Group Empty Members. "+ + "It will report the number of members in the consumer group with no partition assigned", + []string{"group_id"}, + nil, + ) + // Group Topic Members + e.consumerGroupTopicMembers = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_members"), + "Consumer Group topic member count metrics. "+ + "It will report the number of members in the consumer group assigned on a given topic", + []string{"group_id", "topic_name"}, + nil, + ) + // Group Topic Assigned Partitions + e.consumerGroupAssignedTopicPartitions = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_assigned_partitions"), + "Consumer Group topic partitions count metrics. "+ + "It will report the number of partitions assigned in the consumer group for a given topic", + []string{"group_id", "topic_name"}, + nil, + ) // Topic / Partition Offset Sum (useful for calculating the consumed messages / sec on a topic) e.consumerGroupTopicOffsetSum = prometheus.NewDesc( prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_offset_sum"),