Skip to content

Commit

Permalink
backend: use kadm client to list partition offsets
Browse files Browse the repository at this point in the history
This should also fix issue #269
  • Loading branch information
weeco committed Nov 3, 2024
1 parent 5d646d0 commit 13a12f6
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 132 deletions.
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.15.1
github.com/stretchr/testify v1.8.1
github.com/twmb/franz-go v1.16.1
github.com/twmb/franz-go/pkg/kmsg v1.7.0
github.com/twmb/franz-go v1.18.0
github.com/twmb/franz-go/pkg/kadm v1.14.0
github.com/twmb/franz-go/pkg/kmsg v1.9.0
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.24.0
Expand All @@ -31,21 +32,21 @@ require (
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.1 // indirect
github.com/pierrec/lz4/v4 v4.1.19 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.43.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sys v0.26.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
30 changes: 16 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs=
github.com/knadh/koanf v1.5.0/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -233,8 +233,8 @@ github.com/pelletier/go-toml v1.9.1 h1:a6qW1EVNZWH9WGI6CsYdD8WAylkoXBS5yv0XHlh17
github.com/pelletier/go-toml v1.9.1/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4=
github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -293,11 +293,13 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/twmb/franz-go v1.7.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro=
github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs=
github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4=
github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 h1:alKdbddkPw3rDh+AwmUEwh6HNYgTvDSFIe/GWYRR9RM=
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0/go.mod h1:k8BoBjyUbFj34f0rRbn+Ky12sZFAPbmShrg0karAIMo=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -328,8 +330,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
Expand Down Expand Up @@ -366,8 +368,8 @@ golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY
golang.org/x/net v0.0.0-20220812174116-3211cb980234/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -420,8 +422,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
Expand Down
73 changes: 29 additions & 44 deletions minion/list_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package minion

import (
"context"
"errors"
"fmt"
"github.com/twmb/franz-go/pkg/kerr"
"go.uber.org/zap"
"strconv"
"time"

"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kadm"
"go.uber.org/zap"
)

func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) {
func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
reqId := ctx.Value("requestId").(string)
key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId

if cachedRes, exists := s.getCachedItem(key); exists {
return cachedRes.(*kmsg.ListOffsetsResponse), nil
return cachedRes.(kadm.ListedOffsets), nil
}

res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
Expand All @@ -33,70 +33,55 @@ func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg
return nil, err
}

return res.(*kmsg.ListOffsetsResponse), nil
return res.(kadm.ListedOffsets), nil
}

// ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions
func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) {
metadata, err := s.GetMetadataCached(ctx)
func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
listedOffsets, err := s.admClient.ListEndOffsets(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list consumer groups: %w", err)
}

topicReqs := make([]kmsg.ListOffsetsRequestTopic, len(metadata.Topics))
for i, topic := range metadata.Topics {
req := kmsg.NewListOffsetsRequestTopic()
req.Topic = *topic.Topic

partitionReqs := make([]kmsg.ListOffsetsRequestTopicPartition, len(topic.Partitions))
for j, partition := range topic.Partitions {
partitionReqs[j] = kmsg.NewListOffsetsRequestTopicPartition()
partitionReqs[j].Partition = partition.Partition
partitionReqs[j].Timestamp = timestamp
var se *kadm.ShardErrors
if !errors.As(err, &se) {
return nil, fmt.Errorf("failed to list offsets: %w", err)
}
req.Partitions = partitionReqs

topicReqs[i] = req
}

req := kmsg.NewListOffsetsRequest()
req.Topics = topicReqs

res, err := req.RequestWith(ctx, s.client)
if err != nil {
return res, err
if se.AllFailed {
return nil, fmt.Errorf("failed to list offsets, all shard responses failed: %w", err)
}
s.logger.Info("failed to list offset from some shards", zap.Int("failed_shards", len(se.Errs)))
for _, shardErr := range se.Errs {
s.logger.Warn("shard error for listing end offsets",
zap.Int32("broker_id", shardErr.Broker.NodeID),
zap.Error(shardErr.Err))
}
}

// Log inner errors before returning them. We do that inside of this function to avoid duplicate logging as the response
// are cached for each scrape anyways.
//
// Create two metrics to aggregate error logs in few messages. Logging one message per occured partition error
// is too much. Typical errors are LEADER_NOT_AVAILABLE etc.
errorCountByErrCode := make(map[int16]int)
errorCountByErrCode := make(map[error]int)
errorCountByTopic := make(map[string]int)

// Iterate on all partitions
for _, topic := range res.Topics {
for _, partition := range topic.Partitions {
err := kerr.TypedErrorForCode(partition.ErrorCode)
if err != nil {
errorCountByErrCode[partition.ErrorCode]++
errorCountByTopic[topic.Topic]++
}
listedOffsets.Each(func(offset kadm.ListedOffset) {
if offset.Err != nil {
errorCountByTopic[offset.Topic]++
errorCountByErrCode[offset.Err]++
}
}
})

// Print log line for each error type
for errCode, count := range errorCountByErrCode {
typedErr := kerr.TypedErrorForCode(errCode)
for err, count := range errorCountByErrCode {
s.logger.Warn("failed to list some partitions watermarks",
zap.Error(typedErr),
zap.Error(err),
zap.Int("error_count", count))
}
if len(errorCountByTopic) > 0 {
s.logger.Warn("some topics had one or more partitions whose watermarks could not be fetched from Kafka",
zap.Int("topics_with_errors", len(errorCountByTopic)))
}

return res, nil
return listedOffsets, nil
}
10 changes: 7 additions & 3 deletions minion/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kversion"
Expand All @@ -33,8 +34,9 @@ type Service struct {
AllowedTopicsExpr []*regexp.Regexp
IgnoredTopicsExpr []*regexp.Regexp

client *kgo.Client
storage *Storage
client *kgo.Client
admClient *kadm.Client
storage *Storage
}

func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricsNamespace string, ctx context.Context) (*Service, error) {
Expand Down Expand Up @@ -82,7 +84,9 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics
AllowedTopicsExpr: allowedTopicsExpr,
IgnoredTopicsExpr: ignoredTopicsExpr,

client: client,
client: client,
admClient: kadm.NewClient(client),

storage: storage,
}

Expand Down
74 changes: 31 additions & 43 deletions prometheus/collect_consumer_group_lags.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"

"github.com/cloudhut/kminion/v2/minion"
Expand Down Expand Up @@ -211,61 +211,49 @@ func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan
return isOk
}

func (e *Exporter) waterMarksByTopic(lowMarks *kmsg.ListOffsetsResponse, highMarks *kmsg.ListOffsetsResponse) map[string]map[int32]waterMark {
func (e *Exporter) waterMarksByTopic(lowMarks kadm.ListedOffsets, highMarks kadm.ListedOffsets) map[string]map[int32]waterMark {
type partitionID = int32
type topicName = string
waterMarks := make(map[topicName]map[partitionID]waterMark)

for _, topic := range lowMarks.Topics {
_, exists := waterMarks[topic.Topic]
for topic, lowMarksByPartitionID := range lowMarks {
_, exists := waterMarks[topic]
if !exists {
waterMarks[topic.Topic] = make(map[partitionID]waterMark)
waterMarks[topic] = make(map[partitionID]waterMark)
}
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {

for _, lowOffset := range lowMarksByPartitionID {
if lowOffset.Err != nil {
e.logger.Debug("failed to get partition low water mark, inner kafka error",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Error(err))
zap.String("topic_name", lowOffset.Topic),
zap.Int32("partition_id", lowOffset.Partition),
zap.Error(lowOffset.Err))
continue
}
waterMarks[topic.Topic][partition.Partition] = waterMark{
TopicName: topic.Topic,
PartitionID: partition.Partition,
LowWaterMark: partition.Offset,
HighWaterMark: -1,
}
}
}

for _, topic := range highMarks.Topics {
mark, exists := waterMarks[topic.Topic]
if !exists {
e.logger.Error("got high water marks for a topic but no low watermarks", zap.String("topic_name", topic.Topic))
delete(waterMarks, topic.Topic)
continue
}
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
e.logger.Debug("failed to get partition high water mark, inner kafka error",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Error(err))
continue
}
partitionMark, exists := mark[partition.Partition]
higOffset, exists := highMarks.Lookup(lowOffset.Topic, lowOffset.Partition)
if !exists {
e.logger.Error("got high water marks for a topic's partition but no low watermarks",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Int64("offset", partition.Offset))
delete(waterMarks, topic.Topic)
e.logger.Error("got low water marks for a topic's partition but no high watermarks",
zap.String("topic_name", lowOffset.Topic),
zap.Int32("partition_id", lowOffset.Partition),
zap.Int64("offset", lowOffset.Offset))
delete(waterMarks, lowOffset.Topic)
break // Topic watermarks are invalid -> delete & skip this topic
}
partitionMark.HighWaterMark = partition.Offset
waterMarks[topic.Topic][partition.Partition] = partitionMark
if higOffset.Err != nil {
e.logger.Debug("failed to get partition low water mark, inner kafka error",
zap.String("topic_name", lowOffset.Topic),
zap.Int32("partition_id", lowOffset.Partition),
zap.Error(lowOffset.Err))
continue
}

waterMarks[lowOffset.Topic][lowOffset.Partition] = waterMark{
TopicName: lowOffset.Topic,
PartitionID: lowOffset.Partition,
LowWaterMark: lowOffset.Offset,
HighWaterMark: higOffset.Offset,
}
}
}

Expand Down
Loading

0 comments on commit 13a12f6

Please sign in to comment.