From 13a12f655339e00355219ad1c32e1eb42f7d9eab Mon Sep 17 00:00:00 2001 From: Martin Schneppenheim <23424570+weeco@users.noreply.github.com> Date: Sun, 3 Nov 2024 23:06:16 +0000 Subject: [PATCH] backend: use kadm client to list partition offsets This should also fix issue #269 --- go.mod | 15 ++-- go.sum | 30 ++++---- minion/list_offsets.go | 73 ++++++++---------- minion/service.go | 10 ++- prometheus/collect_consumer_group_lags.go | 74 ++++++++----------- prometheus/collect_topic_partition_offsets.go | 40 +++++----- 6 files changed, 110 insertions(+), 132 deletions(-) diff --git a/go.mod b/go.mod index 9c4831e..006450a 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,9 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.15.1 github.com/stretchr/testify v1.8.1 - github.com/twmb/franz-go v1.16.1 - github.com/twmb/franz-go/pkg/kmsg v1.7.0 + github.com/twmb/franz-go v1.18.0 + github.com/twmb/franz-go/pkg/kadm v1.14.0 + github.com/twmb/franz-go/pkg/kmsg v1.9.0 github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 go.uber.org/atomic v1.11.0 go.uber.org/zap v1.24.0 @@ -31,21 +32,21 @@ require ( github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.17.4 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pelletier/go-toml v1.9.1 // indirect - github.com/pierrec/lz4/v4 v4.1.19 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.43.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/crypto v0.28.0 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sys v0.26.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 9a2ce1c..3ca5daf 100644 --- a/go.sum +++ b/go.sum @@ -172,8 +172,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= -github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= github.com/knadh/koanf v1.5.0/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -233,8 +233,8 @@ github.com/pelletier/go-toml v1.9.1 h1:a6qW1EVNZWH9WGI6CsYdD8WAylkoXBS5yv0XHlh17 github.com/pelletier/go-toml v1.9.1/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4= -github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -293,11 +293,13 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/twmb/franz-go v1.7.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro= -github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE= -github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA= +github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= +github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= +github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs= +github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4= github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= -github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= -github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= +github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= +github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 h1:alKdbddkPw3rDh+AwmUEwh6HNYgTvDSFIe/GWYRR9RM= github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0/go.mod h1:k8BoBjyUbFj34f0rRbn+Ky12sZFAPbmShrg0karAIMo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -328,8 +330,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -366,8 +368,8 @@ golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY golang.org/x/net v0.0.0-20220812174116-3211cb980234/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -420,8 +422,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/minion/list_offsets.go b/minion/list_offsets.go index 2ab38e1..ae8617f 100644 --- a/minion/list_offsets.go +++ b/minion/list_offsets.go @@ -2,21 +2,21 @@ package minion import ( "context" + "errors" "fmt" - "github.com/twmb/franz-go/pkg/kerr" - "go.uber.org/zap" "strconv" "time" - "github.com/twmb/franz-go/pkg/kmsg" + "github.com/twmb/franz-go/pkg/kadm" + "go.uber.org/zap" ) -func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) { +func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) { reqId := ctx.Value("requestId").(string) key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId if cachedRes, exists := s.getCachedItem(key); exists { - return cachedRes.(*kmsg.ListOffsetsResponse), nil + return cachedRes.(kadm.ListedOffsets), nil } res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) { @@ -33,38 +33,27 @@ func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg return nil, err } - return res.(*kmsg.ListOffsetsResponse), nil + return res.(kadm.ListedOffsets), nil } // ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions -func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) { - metadata, err := s.GetMetadataCached(ctx) +func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) { + listedOffsets, err := s.admClient.ListEndOffsets(ctx) if err != nil { - return nil, fmt.Errorf("failed to list consumer groups: %w", err) - } - - topicReqs := make([]kmsg.ListOffsetsRequestTopic, len(metadata.Topics)) - for i, topic := range metadata.Topics { - req := kmsg.NewListOffsetsRequestTopic() - req.Topic = *topic.Topic - - partitionReqs := make([]kmsg.ListOffsetsRequestTopicPartition, len(topic.Partitions)) - for j, partition := range topic.Partitions { - partitionReqs[j] = kmsg.NewListOffsetsRequestTopicPartition() - partitionReqs[j].Partition = partition.Partition - partitionReqs[j].Timestamp = timestamp + var se *kadm.ShardErrors + if !errors.As(err, &se) { + return nil, fmt.Errorf("failed to list offsets: %w", err) } - req.Partitions = partitionReqs - - topicReqs[i] = req - } - req := kmsg.NewListOffsetsRequest() - req.Topics = topicReqs - - res, err := req.RequestWith(ctx, s.client) - if err != nil { - return res, err + if se.AllFailed { + return nil, fmt.Errorf("failed to list offsets, all shard responses failed: %w", err) + } + s.logger.Info("failed to list offset from some shards", zap.Int("failed_shards", len(se.Errs))) + for _, shardErr := range se.Errs { + s.logger.Warn("shard error for listing end offsets", + zap.Int32("broker_id", shardErr.Broker.NodeID), + zap.Error(shardErr.Err)) + } } // Log inner errors before returning them. We do that inside of this function to avoid duplicate logging as the response @@ -72,25 +61,21 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListO // // Create two metrics to aggregate error logs in few messages. Logging one message per occured partition error // is too much. Typical errors are LEADER_NOT_AVAILABLE etc. - errorCountByErrCode := make(map[int16]int) + errorCountByErrCode := make(map[error]int) errorCountByTopic := make(map[string]int) // Iterate on all partitions - for _, topic := range res.Topics { - for _, partition := range topic.Partitions { - err := kerr.TypedErrorForCode(partition.ErrorCode) - if err != nil { - errorCountByErrCode[partition.ErrorCode]++ - errorCountByTopic[topic.Topic]++ - } + listedOffsets.Each(func(offset kadm.ListedOffset) { + if offset.Err != nil { + errorCountByTopic[offset.Topic]++ + errorCountByErrCode[offset.Err]++ } - } + }) // Print log line for each error type - for errCode, count := range errorCountByErrCode { - typedErr := kerr.TypedErrorForCode(errCode) + for err, count := range errorCountByErrCode { s.logger.Warn("failed to list some partitions watermarks", - zap.Error(typedErr), + zap.Error(err), zap.Int("error_count", count)) } if len(errorCountByTopic) > 0 { @@ -98,5 +83,5 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListO zap.Int("topics_with_errors", len(errorCountByTopic))) } - return res, nil + return listedOffsets, nil } diff --git a/minion/service.go b/minion/service.go index b30b6c9..003bef2 100644 --- a/minion/service.go +++ b/minion/service.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kversion" @@ -33,8 +34,9 @@ type Service struct { AllowedTopicsExpr []*regexp.Regexp IgnoredTopicsExpr []*regexp.Regexp - client *kgo.Client - storage *Storage + client *kgo.Client + admClient *kadm.Client + storage *Storage } func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricsNamespace string, ctx context.Context) (*Service, error) { @@ -82,7 +84,9 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics AllowedTopicsExpr: allowedTopicsExpr, IgnoredTopicsExpr: ignoredTopicsExpr, - client: client, + client: client, + admClient: kadm.NewClient(client), + storage: storage, } diff --git a/prometheus/collect_consumer_group_lags.go b/prometheus/collect_consumer_group_lags.go index 1629d29..b641766 100644 --- a/prometheus/collect_consumer_group_lags.go +++ b/prometheus/collect_consumer_group_lags.go @@ -6,8 +6,8 @@ import ( "strconv" "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kerr" - "github.com/twmb/franz-go/pkg/kmsg" "go.uber.org/zap" "github.com/cloudhut/kminion/v2/minion" @@ -211,61 +211,49 @@ func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan return isOk } -func (e *Exporter) waterMarksByTopic(lowMarks *kmsg.ListOffsetsResponse, highMarks *kmsg.ListOffsetsResponse) map[string]map[int32]waterMark { +func (e *Exporter) waterMarksByTopic(lowMarks kadm.ListedOffsets, highMarks kadm.ListedOffsets) map[string]map[int32]waterMark { type partitionID = int32 type topicName = string waterMarks := make(map[topicName]map[partitionID]waterMark) - for _, topic := range lowMarks.Topics { - _, exists := waterMarks[topic.Topic] + for topic, lowMarksByPartitionID := range lowMarks { + _, exists := waterMarks[topic] if !exists { - waterMarks[topic.Topic] = make(map[partitionID]waterMark) + waterMarks[topic] = make(map[partitionID]waterMark) } - for _, partition := range topic.Partitions { - err := kerr.ErrorForCode(partition.ErrorCode) - if err != nil { + + for _, lowOffset := range lowMarksByPartitionID { + if lowOffset.Err != nil { e.logger.Debug("failed to get partition low water mark, inner kafka error", - zap.String("topic_name", topic.Topic), - zap.Int32("partition_id", partition.Partition), - zap.Error(err)) + zap.String("topic_name", lowOffset.Topic), + zap.Int32("partition_id", lowOffset.Partition), + zap.Error(lowOffset.Err)) continue } - waterMarks[topic.Topic][partition.Partition] = waterMark{ - TopicName: topic.Topic, - PartitionID: partition.Partition, - LowWaterMark: partition.Offset, - HighWaterMark: -1, - } - } - } - for _, topic := range highMarks.Topics { - mark, exists := waterMarks[topic.Topic] - if !exists { - e.logger.Error("got high water marks for a topic but no low watermarks", zap.String("topic_name", topic.Topic)) - delete(waterMarks, topic.Topic) - continue - } - for _, partition := range topic.Partitions { - err := kerr.ErrorForCode(partition.ErrorCode) - if err != nil { - e.logger.Debug("failed to get partition high water mark, inner kafka error", - zap.String("topic_name", topic.Topic), - zap.Int32("partition_id", partition.Partition), - zap.Error(err)) - continue - } - partitionMark, exists := mark[partition.Partition] + higOffset, exists := highMarks.Lookup(lowOffset.Topic, lowOffset.Partition) if !exists { - e.logger.Error("got high water marks for a topic's partition but no low watermarks", - zap.String("topic_name", topic.Topic), - zap.Int32("partition_id", partition.Partition), - zap.Int64("offset", partition.Offset)) - delete(waterMarks, topic.Topic) + e.logger.Error("got low water marks for a topic's partition but no high watermarks", + zap.String("topic_name", lowOffset.Topic), + zap.Int32("partition_id", lowOffset.Partition), + zap.Int64("offset", lowOffset.Offset)) + delete(waterMarks, lowOffset.Topic) break // Topic watermarks are invalid -> delete & skip this topic } - partitionMark.HighWaterMark = partition.Offset - waterMarks[topic.Topic][partition.Partition] = partitionMark + if higOffset.Err != nil { + e.logger.Debug("failed to get partition low water mark, inner kafka error", + zap.String("topic_name", lowOffset.Topic), + zap.Int32("partition_id", lowOffset.Partition), + zap.Error(lowOffset.Err)) + continue + } + + waterMarks[lowOffset.Topic][lowOffset.Partition] = waterMark{ + TopicName: lowOffset.Topic, + PartitionID: lowOffset.Partition, + LowWaterMark: lowOffset.Offset, + HighWaterMark: higOffset.Offset, + } } } diff --git a/prometheus/collect_topic_partition_offsets.go b/prometheus/collect_topic_partition_offsets.go index ef03aba..4ca6695 100644 --- a/prometheus/collect_topic_partition_offsets.go +++ b/prometheus/collect_topic_partition_offsets.go @@ -5,7 +5,6 @@ import ( "strconv" "github.com/prometheus/client_golang/prometheus" - "github.com/twmb/franz-go/pkg/kerr" "go.uber.org/zap" "github.com/cloudhut/kminion/v2/minion" @@ -32,21 +31,21 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p } // Process Low Watermarks - for _, topic := range lowWaterMarks.Topics { - if !e.minionSvc.IsTopicAllowed(topic.Topic) { + + for topicName, partitions := range lowWaterMarks { + if !e.minionSvc.IsTopicAllowed(topicName) { continue } waterMarkSum := int64(0) hasErrors := false - for _, partition := range topic.Partitions { - err := kerr.ErrorForCode(partition.ErrorCode) - if err != nil { + for _, offset := range partitions { + if offset.Err != nil { hasErrors = true isOk = false continue } - waterMarkSum += partition.Offset + waterMarkSum += offset.Offset // Let's end here if partition metrics shall not be exposed if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic { continue @@ -54,9 +53,9 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p ch <- prometheus.MustNewConstMetric( e.partitionLowWaterMark, prometheus.GaugeValue, - float64(partition.Offset), - topic.Topic, - strconv.Itoa(int(partition.Partition)), + float64(offset.Offset), + topicName, + strconv.Itoa(int(offset.Partition)), ) } // We only want to report the sum of all partition marks if we receive watermarks from all partition @@ -65,25 +64,24 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p e.topicLowWaterMarkSum, prometheus.GaugeValue, float64(waterMarkSum), - topic.Topic, + topicName, ) } } - for _, topic := range highWaterMarks.Topics { - if !e.minionSvc.IsTopicAllowed(topic.Topic) { + for topicName, partitions := range highWaterMarks { + if !e.minionSvc.IsTopicAllowed(topicName) { continue } waterMarkSum := int64(0) hasErrors := false - for _, partition := range topic.Partitions { - err := kerr.ErrorForCode(partition.ErrorCode) - if err != nil { + for _, offset := range partitions { + if offset.Err != nil { hasErrors = true isOk = false continue } - waterMarkSum += partition.Offset + waterMarkSum += offset.Offset // Let's end here if partition metrics shall not be exposed if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic { continue @@ -91,9 +89,9 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p ch <- prometheus.MustNewConstMetric( e.partitionHighWaterMark, prometheus.GaugeValue, - float64(partition.Offset), - topic.Topic, - strconv.Itoa(int(partition.Partition)), + float64(offset.Offset), + topicName, + strconv.Itoa(int(offset.Partition)), ) } // We only want to report the sum of all partition marks if we receive watermarks from all partitions @@ -102,7 +100,7 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p e.topicHighWaterMarkSum, prometheus.GaugeValue, float64(waterMarkSum), - topic.Topic, + topicName, ) } }