backend: use kadm client to list partition offsets
This should also fix issue #269
weeco committed Nov 15, 2024
1 parent f47c58b commit b32ffb8
Showing 6 changed files with 89 additions and 111 deletions.
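
For orientation before the file-by-file diff: instead of hand-building kmsg.ListOffsetsRequest messages for every topic and partition, the commit delegates that fan-out to franz-go's kadm admin client. The sketch below shows the kadm flow in isolation, assuming a locally reachable broker; the broker address, error handling, and printing are illustrative and not part of the commit.

    package main

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kadm"
        "github.com/twmb/franz-go/pkg/kgo"
    )

    func main() {
        // An existing *kgo.Client can be reused; kadm.NewClient only wraps it.
        client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
        if err != nil {
            panic(err)
        }
        defer client.Close()
        adm := kadm.NewClient(client)

        ctx := context.Background()

        // Low water marks (start offsets) and high water marks (end offsets).
        starts, err := adm.ListStartOffsets(ctx)
        if err != nil {
            panic(err)
        }
        ends, err := adm.ListEndOffsets(ctx)
        if err != nil {
            panic(err)
        }

        // ListedOffsets is a map of topic -> partition -> ListedOffset.
        starts.Each(func(low kadm.ListedOffset) {
            if low.Err != nil {
                return // per-partition failures are reported on the ListedOffset itself
            }
            if high, ok := ends.Lookup(low.Topic, low.Partition); ok && high.Err == nil {
                fmt.Printf("%s/%d: low=%d high=%d\n", low.Topic, low.Partition, low.Offset, high.Offset)
            }
        })
    }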
1 change: 1 addition & 0 deletions go.mod
@@ -43,6 +43,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.60.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/twmb/franz-go/pkg/kadm v1.14.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.30.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -320,6 +320,8 @@ github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs=
github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4=
github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
73 changes: 29 additions & 44 deletions minion/list_offsets.go
@@ -2,21 +2,21 @@ package minion

import (
"context"
"errors"
"fmt"
"github.com/twmb/franz-go/pkg/kerr"
"go.uber.org/zap"
"strconv"
"time"

"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kadm"
"go.uber.org/zap"
)

func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) {
func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
reqId := ctx.Value("requestId").(string)
key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId

if cachedRes, exists := s.getCachedItem(key); exists {
return cachedRes.(*kmsg.ListOffsetsResponse), nil
return cachedRes.(kadm.ListedOffsets), nil
}

res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
@@ -33,70 +33,55 @@ func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg
return nil, err
}

return res.(*kmsg.ListOffsetsResponse), nil
return res.(kadm.ListedOffsets), nil
}

// ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions
func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) {
metadata, err := s.GetMetadataCached(ctx)
func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
listedOffsets, err := s.admClient.ListEndOffsets(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list consumer groups: %w", err)
}

topicReqs := make([]kmsg.ListOffsetsRequestTopic, len(metadata.Topics))
for i, topic := range metadata.Topics {
req := kmsg.NewListOffsetsRequestTopic()
req.Topic = *topic.Topic

partitionReqs := make([]kmsg.ListOffsetsRequestTopicPartition, len(topic.Partitions))
for j, partition := range topic.Partitions {
partitionReqs[j] = kmsg.NewListOffsetsRequestTopicPartition()
partitionReqs[j].Partition = partition.Partition
partitionReqs[j].Timestamp = timestamp
var se *kadm.ShardErrors
if !errors.As(err, &se) {
return nil, fmt.Errorf("failed to list offsets: %w", err)
}
req.Partitions = partitionReqs

topicReqs[i] = req
}

req := kmsg.NewListOffsetsRequest()
req.Topics = topicReqs

res, err := req.RequestWith(ctx, s.client)
if err != nil {
return res, err
if se.AllFailed {
return nil, fmt.Errorf("failed to list offsets, all shard responses failed: %w", err)
}
s.logger.Info("failed to list offset from some shards", zap.Int("failed_shards", len(se.Errs)))
for _, shardErr := range se.Errs {
s.logger.Warn("shard error for listing end offsets",
zap.Int32("broker_id", shardErr.Broker.NodeID),
zap.Error(shardErr.Err))
}
}

// Log inner errors before returning them. We do that inside this function to avoid duplicate logging, as the responses
// are cached for each scrape anyway.
//
// Use two counters to aggregate error logs into a few messages. Logging one message per partition error
// would be too noisy. Typical errors are LEADER_NOT_AVAILABLE etc.
errorCountByErrCode := make(map[int16]int)
errorCountByErrCode := make(map[error]int)
errorCountByTopic := make(map[string]int)

// Iterate on all partitions
for _, topic := range res.Topics {
for _, partition := range topic.Partitions {
err := kerr.TypedErrorForCode(partition.ErrorCode)
if err != nil {
errorCountByErrCode[partition.ErrorCode]++
errorCountByTopic[topic.Topic]++
}
listedOffsets.Each(func(offset kadm.ListedOffset) {
if offset.Err != nil {
errorCountByTopic[offset.Topic]++
errorCountByErrCode[offset.Err]++
}
}
})

// Print log line for each error type
for errCode, count := range errorCountByErrCode {
typedErr := kerr.TypedErrorForCode(errCode)
for err, count := range errorCountByErrCode {
s.logger.Warn("failed to list some partitions watermarks",
zap.Error(typedErr),
zap.Error(err),
zap.Int("error_count", count))
}
if len(errorCountByTopic) > 0 {
s.logger.Warn("some topics had one or more partitions whose watermarks could not be fetched from Kafka",
zap.Int("topics_with_errors", len(errorCountByTopic)))
}

return res, nil
return listedOffsets, nil
}
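
ListOffsetsCached (partly collapsed in the hunk above) wraps ListOffsets with a per-scrape cache and request deduplication; the three-value requestGroup.Do call matches golang.org/x/sync/singleflight, so the sketch below assumes that package. It reduces the cache to a plain map and leaves the key abstract, whereas kminion keys on the timestamp plus a per-request ID and expires entries, so treat it as an outline of the pattern rather than the service's implementation.

    package offsetscache

    import (
        "context"
        "sync"

        "github.com/twmb/franz-go/pkg/kadm"
        "golang.org/x/sync/singleflight"
    )

    // cachedLister dedupes and caches offset listings per cache key.
    // Illustrative only: the real service keys on timestamp plus request ID
    // and stores results in an expiring cache.
    type cachedLister struct {
        adm   *kadm.Client
        group singleflight.Group

        mu    sync.Mutex
        cache map[string]kadm.ListedOffsets
    }

    func newCachedLister(adm *kadm.Client) *cachedLister {
        return &cachedLister{adm: adm, cache: make(map[string]kadm.ListedOffsets)}
    }

    func (c *cachedLister) listCached(ctx context.Context, key string) (kadm.ListedOffsets, error) {
        c.mu.Lock()
        if cached, ok := c.cache[key]; ok {
            c.mu.Unlock()
            return cached, nil
        }
        c.mu.Unlock()

        // Concurrent callers with the same key share one Kafka round trip.
        res, err, _ := c.group.Do(key, func() (interface{}, error) {
            offsets, err := c.adm.ListEndOffsets(ctx)
            if err != nil {
                return nil, err
            }
            c.mu.Lock()
            c.cache[key] = offsets
            c.mu.Unlock()
            return offsets, nil
        })
        if err != nil {
            return nil, err
        }
        return res.(kadm.ListedOffsets), nil
    }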
10 changes: 7 additions & 3 deletions minion/service.go
@@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kversion"
@@ -33,8 +34,9 @@ type Service struct {
AllowedTopicsExpr []*regexp.Regexp
IgnoredTopicsExpr []*regexp.Regexp

client *kgo.Client
storage *Storage
client *kgo.Client
admClient *kadm.Client
storage *Storage
}

func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricsNamespace string, ctx context.Context) (*Service, error) {
@@ -82,7 +84,9 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics
AllowedTopicsExpr: allowedTopicsExpr,
IgnoredTopicsExpr: ignoredTopicsExpr,

client: client,
client: client,
admClient: kadm.NewClient(client),

storage: storage,
}

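
The new admClient field is a thin wrapper around the existing *kgo.Client rather than a second connection pool: kadm.NewClient sends its requests through the client it is given. A minimal sketch of that wiring follows, with the service's constructor details elided and the package and function names invented for the example.

    package minionsketch

    import (
        "github.com/twmb/franz-go/pkg/kadm"
        "github.com/twmb/franz-go/pkg/kgo"
    )

    // Service shows only the two client fields from the diff; everything else is omitted.
    type Service struct {
        client    *kgo.Client
        admClient *kadm.Client
    }

    // newService is a hypothetical constructor illustrating the wiring in NewService.
    func newService(client *kgo.Client) *Service {
        return &Service{
            client:    client,
            admClient: kadm.NewClient(client), // admin requests go through the same underlying client
        }
    }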
74 changes: 31 additions & 43 deletions prometheus/collect_consumer_group_lags.go
@@ -6,8 +6,8 @@ import (
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"

"github.com/cloudhut/kminion/v2/minion"
@@ -211,61 +211,49 @@ func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan
return isOk
}

func (e *Exporter) waterMarksByTopic(lowMarks *kmsg.ListOffsetsResponse, highMarks *kmsg.ListOffsetsResponse) map[string]map[int32]waterMark {
func (e *Exporter) waterMarksByTopic(lowMarks kadm.ListedOffsets, highMarks kadm.ListedOffsets) map[string]map[int32]waterMark {
type partitionID = int32
type topicName = string
waterMarks := make(map[topicName]map[partitionID]waterMark)

for _, topic := range lowMarks.Topics {
_, exists := waterMarks[topic.Topic]
for topic, lowMarksByPartitionID := range lowMarks {
_, exists := waterMarks[topic]
if !exists {
waterMarks[topic.Topic] = make(map[partitionID]waterMark)
waterMarks[topic] = make(map[partitionID]waterMark)
}
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {

for _, lowOffset := range lowMarksByPartitionID {
if lowOffset.Err != nil {
e.logger.Debug("failed to get partition low water mark, inner kafka error",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Error(err))
zap.String("topic_name", lowOffset.Topic),
zap.Int32("partition_id", lowOffset.Partition),
zap.Error(lowOffset.Err))
continue
}
waterMarks[topic.Topic][partition.Partition] = waterMark{
TopicName: topic.Topic,
PartitionID: partition.Partition,
LowWaterMark: partition.Offset,
HighWaterMark: -1,
}
}
}

for _, topic := range highMarks.Topics {
mark, exists := waterMarks[topic.Topic]
if !exists {
e.logger.Error("got high water marks for a topic but no low watermarks", zap.String("topic_name", topic.Topic))
delete(waterMarks, topic.Topic)
continue
}
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
e.logger.Debug("failed to get partition high water mark, inner kafka error",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Error(err))
continue
}
partitionMark, exists := mark[partition.Partition]
higOffset, exists := highMarks.Lookup(lowOffset.Topic, lowOffset.Partition)
if !exists {
e.logger.Error("got high water marks for a topic's partition but no low watermarks",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Int64("offset", partition.Offset))
delete(waterMarks, topic.Topic)
e.logger.Error("got low water marks for a topic's partition but no high watermarks",
zap.String("topic_name", lowOffset.Topic),
zap.Int32("partition_id", lowOffset.Partition),
zap.Int64("offset", lowOffset.Offset))
delete(waterMarks, lowOffset.Topic)
break // Topic watermarks are invalid -> delete & skip this topic
}
partitionMark.HighWaterMark = partition.Offset
waterMarks[topic.Topic][partition.Partition] = partitionMark
if higOffset.Err != nil {
e.logger.Debug("failed to get partition low water mark, inner kafka error",
zap.String("topic_name", lowOffset.Topic),
zap.Int32("partition_id", lowOffset.Partition),
zap.Error(lowOffset.Err))
continue
}

waterMarks[lowOffset.Topic][lowOffset.Partition] = waterMark{
TopicName: lowOffset.Topic,
PartitionID: lowOffset.Partition,
LowWaterMark: lowOffset.Offset,
HighWaterMark: higOffset.Offset,
}
}
}

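
The waterMarksByTopic rework leans on kadm.ListedOffsets being a nested map (topic, then partition, to a ListedOffset that carries Offset and Err) together with its Lookup helper, which lets the low and high watermark results be joined directly. A simplified, self-contained version of that join follows; the waterMark field names come from the diff, the rest is illustrative, and unlike the exporter it skips errored or missing partitions silently instead of logging and invalidating the topic.

    package exportersketch

    import "github.com/twmb/franz-go/pkg/kadm"

    // waterMark mirrors the exporter's struct; field names are taken from the diff.
    type waterMark struct {
        TopicName     string
        PartitionID   int32
        LowWaterMark  int64
        HighWaterMark int64
    }

    // joinWaterMarks pairs low and high water marks per topic partition.
    // Simplified: partitions with errors or missing high marks are skipped silently.
    func joinWaterMarks(lowMarks, highMarks kadm.ListedOffsets) map[string]map[int32]waterMark {
        out := make(map[string]map[int32]waterMark)
        for topic, partitions := range lowMarks {
            out[topic] = make(map[int32]waterMark)
            for _, low := range partitions {
                if low.Err != nil {
                    continue
                }
                high, ok := highMarks.Lookup(low.Topic, low.Partition)
                if !ok || high.Err != nil {
                    continue
                }
                out[topic][low.Partition] = waterMark{
                    TopicName:     low.Topic,
                    PartitionID:   low.Partition,
                    LowWaterMark:  low.Offset,
                    HighWaterMark: high.Offset,
                }
            }
        }
        return out
    }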
40 changes: 19 additions & 21 deletions prometheus/collect_topic_partition_offsets.go
@@ -5,7 +5,6 @@ import (
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"go.uber.org/zap"

"github.com/cloudhut/kminion/v2/minion"
@@ -32,31 +31,31 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
}

// Process Low Watermarks
for _, topic := range lowWaterMarks.Topics {
if !e.minionSvc.IsTopicAllowed(topic.Topic) {

for topicName, partitions := range lowWaterMarks {
if !e.minionSvc.IsTopicAllowed(topicName) {
continue
}

waterMarkSum := int64(0)
hasErrors := false
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
for _, offset := range partitions {
if offset.Err != nil {
hasErrors = true
isOk = false
continue
}
waterMarkSum += partition.Offset
waterMarkSum += offset.Offset
// Let's end here if partition metrics shall not be exposed
if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
continue
}
ch <- prometheus.MustNewConstMetric(
e.partitionLowWaterMark,
prometheus.GaugeValue,
float64(partition.Offset),
topic.Topic,
strconv.Itoa(int(partition.Partition)),
float64(offset.Offset),
topicName,
strconv.Itoa(int(offset.Partition)),
)
}
// We only want to report the sum of all partition marks if we receive watermarks from all partitions
@@ -65,35 +64,34 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
e.topicLowWaterMarkSum,
prometheus.GaugeValue,
float64(waterMarkSum),
topic.Topic,
topicName,
)
}
}

for _, topic := range highWaterMarks.Topics {
if !e.minionSvc.IsTopicAllowed(topic.Topic) {
for topicName, partitions := range highWaterMarks {
if !e.minionSvc.IsTopicAllowed(topicName) {
continue
}
waterMarkSum := int64(0)
hasErrors := false
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
for _, offset := range partitions {
if offset.Err != nil {
hasErrors = true
isOk = false
continue
}
waterMarkSum += partition.Offset
waterMarkSum += offset.Offset
// Let's end here if partition metrics shall not be exposed
if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
continue
}
ch <- prometheus.MustNewConstMetric(
e.partitionHighWaterMark,
prometheus.GaugeValue,
float64(partition.Offset),
topic.Topic,
strconv.Itoa(int(partition.Partition)),
float64(offset.Offset),
topicName,
strconv.Itoa(int(offset.Partition)),
)
}
// We only want to report the sum of all partition marks if we receive watermarks from all partitions
Expand All @@ -102,7 +100,7 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
e.topicHighWaterMarkSum,
prometheus.GaugeValue,
float64(waterMarkSum),
topic.Topic,
topicName,
)
}
}
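
Both watermark loops in this file follow the same shape: sum the per-partition offsets, emit one gauge per partition when partition granularity is enabled, and emit the per-topic sum only if no partition reported an error. The reduction below shows that pattern for a single kadm.ListedOffsets result; the descriptors, the topic allow-list, and the granularity check are simplified relative to the exporter, and the names are invented for the example.

    package collectorsketch

    import (
        "strconv"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/twmb/franz-go/pkg/kadm"
    )

    // emitWaterMarks emits one gauge per partition plus a per-topic sum,
    // and suppresses the sum for topics where any partition reported an error.
    // desc is assumed to take topic and partition labels, sumDesc a topic label.
    func emitWaterMarks(ch chan<- prometheus.Metric, desc, sumDesc *prometheus.Desc, offsets kadm.ListedOffsets) {
        for topic, partitions := range offsets {
            sum := int64(0)
            hasErrors := false
            for _, o := range partitions {
                if o.Err != nil {
                    hasErrors = true
                    continue
                }
                sum += o.Offset
                ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue,
                    float64(o.Offset), topic, strconv.Itoa(int(o.Partition)))
            }
            if !hasErrors {
                ch <- prometheus.MustNewConstMetric(sumDesc, prometheus.GaugeValue, float64(sum), topic)
            }
        }
    }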
