Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export number of members and assigned partitions for each topic in a consumer group #106

Merged
merged 2 commits into from
Sep 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ kminion_kafka_topic_high_water_mark_sum{topic_name="__consumer_offsets"} 1.51202
# TYPE kminion_kafka_consumer_group_info gauge
kminion_kafka_consumer_group_info{coordinator_id="0",group_id="bigquery-sink",member_count="2",protocol="range",protocol_type="consumer",state="Stable"} 1

# HELP kminion_kafka_consumer_group_empty_members Consumer Group Empty Members. It will report the number of members in the consumer group with no partition assigned
# TYPE kminion_kafka_consumer_group_empty_members gauge
kminion_kafka_consumer_group_empty_members{group_id="bigquery-sink"} 1

# HELP kminion_kafka_consumer_group_topic_members Consumer Group topic member count metrics. It will report the number of members in the consumer group assigned on a given topic
# TYPE kminion_kafka_consumer_group_topic_members gauge
kminion_kafka_consumer_group_topic_members{group_id="bigquery-sink",topic_name="shop-activity"} 4

# HELP kminion_kafka_consumer_group_topic_assigned_partitions Consumer Group topic partitions count metrics. It will report the number of partitions assigned in the consumer group for a given topic
# TYPE kminion_kafka_consumer_group_topic_assigned_partitions gauge
kminion_kafka_consumer_group_topic_assigned_partitions{group_id="bigquery-sink",topic_name="shop-activity"} 32

# HELP kminion_kafka_consumer_group_topic_offset_sum The sum of all committed group offsets across all partitions in a topic
# TYPE kminion_kafka_consumer_group_topic_offset_sum gauge
kminion_kafka_consumer_group_topic_offset_sum{group_id="bigquery-sink",topic_name="shop-activity"} 4.259513e+06
Expand Down
71 changes: 69 additions & 2 deletions prometheus/collect_consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package prometheus

import (
"context"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"
"strconv"
)

func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- prometheus.Metric) bool {
Expand All @@ -18,7 +20,7 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
return false
}

// The list of groups may be incomplete due to group coordinators that might fail to respond. We do log a error
// The list of groups may be incomplete due to group coordinators that might fail to respond. We do log an error
// message in that case (in the kafka request method) and groups will not be included in this list.
for _, grp := range groups {
coordinator := grp.BrokerMetadata.NodeID
Expand Down Expand Up @@ -49,6 +51,71 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
group.State,
strconv.FormatInt(int64(coordinator), 10),
)

// iterate all members and build two maps:
// - {topic -> number-of-consumers}
// - {topic -> number-of-partitions-assigned}
topicConsumers := make(map[string]int)
topicPartitionsAssigned := make(map[string]int)
membersWithEmptyAssignment := 0
failedAssignmentsDecode := 0
for _, member := range group.Members {
kassignment := kmsg.NewGroupMemberAssignment()
if err := kassignment.ReadFrom(member.MemberAssignment); err != nil {
e.logger.Debug("failed to decode consumer group member assignment, internal kafka error",
zap.Error(err),
zap.String("group_id", group.Group),
zap.String("client_id", member.ClientID),
zap.String("member_id", member.MemberID),
zap.String("client_host", member.ClientHost),
)
failedAssignmentsDecode++
continue
}
if len(kassignment.Topics) == 0 {
membersWithEmptyAssignment++
}
for _, topic := range kassignment.Topics {
topicConsumers[topic.Topic]++
topicPartitionsAssigned[topic.Topic] += len(topic.Partitions)
}
}
if failedAssignmentsDecode > 0 {
e.logger.Error("failed to decode consumer group member assignment, internal kafka error",
zap.Error(err),
zap.String("group_id", group.Group),
zap.Int("assignment_decode_failures", failedAssignmentsDecode),
)
}
// number of members with no assignment in a stable consumer group
if membersWithEmptyAssignment > 0 {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupMembersEmpty,
prometheus.GaugeValue,
float64(membersWithEmptyAssignment),
group.Group,
)
}
// number of members in consumer groups for each topic
for topicName, consumers := range topicConsumers {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupTopicMembers,
prometheus.GaugeValue,
float64(consumers),
group.Group,
topicName,
)
}
// number of partitions assigned in consumer groups for each topic
for topicName, partitions := range topicPartitionsAssigned {
ch <- prometheus.MustNewConstMetric(
e.consumerGroupAssignedTopicPartitions,
prometheus.GaugeValue,
float64(partitions),
group.Group,
topicName,
)
}
}
}
return true
Expand Down
34 changes: 29 additions & 5 deletions prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ type Exporter struct {
partitionLowWaterMark *prometheus.Desc

// Consumer Groups
consumerGroupInfo *prometheus.Desc
consumerGroupTopicOffsetSum *prometheus.Desc
consumerGroupTopicPartitionLag *prometheus.Desc
consumerGroupTopicLag *prometheus.Desc
offsetCommits *prometheus.Desc
consumerGroupInfo *prometheus.Desc
consumerGroupMembersEmpty *prometheus.Desc
consumerGroupTopicMembers *prometheus.Desc
consumerGroupAssignedTopicPartitions *prometheus.Desc
consumerGroupTopicOffsetSum *prometheus.Desc
consumerGroupTopicPartitionLag *prometheus.Desc
consumerGroupTopicLag *prometheus.Desc
offsetCommits *prometheus.Desc
}

func NewExporter(cfg Config, logger *zap.Logger, minionSvc *minion.Service) (*Exporter, error) {
Expand Down Expand Up @@ -147,6 +150,27 @@ func (e *Exporter) InitializeMetrics() {
[]string{"group_id", "member_count", "protocol", "protocol_type", "state", "coordinator_id"},
nil,
)
// Group Empty Memmbers
e.consumerGroupMembersEmpty = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_empty_members"),
"It will report the number of members in the consumer group with no partition assigned",
[]string{"group_id"},
nil,
)
// Group Topic Members
e.consumerGroupTopicMembers = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_members"),
"It will report the number of members in the consumer group assigned on a given topic",
[]string{"group_id", "topic_name"},
nil,
)
// Group Topic Assigned Partitions
e.consumerGroupAssignedTopicPartitions = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_assigned_partitions"),
"It will report the number of partitions assigned in the consumer group for a given topic",
[]string{"group_id", "topic_name"},
nil,
)
// Topic / Partition Offset Sum (useful for calculating the consumed messages / sec on a topic)
e.consumerGroupTopicOffsetSum = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_offset_sum"),
Expand Down