Skip to content

Commit

Permalink
feat:support claim the broker not support OffsetForLeaderEpoch
Browse files Browse the repository at this point in the history
  • Loading branch information
Max-Cheng committed Mar 28, 2024
1 parent aff441d commit e6ca39d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 20 deletions.
10 changes: 8 additions & 2 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,6 @@ func (cl *Client) OptValues(opt any) []any {
return []any{cfg.txnBackoff}
case namefn(ConsiderMissingTopicDeletedAfter):
return []any{cfg.missingTopicDelete}
case namefn(ForceUsingListOffset):
return []any{cfg.forceListOffsets}

case namefn(DefaultProduceTopic):
return []any{cfg.defaultProduceTopic}
Expand Down Expand Up @@ -358,6 +356,8 @@ func (cl *Client) OptValues(opt any) []any {
return []any{cfg.rack}
case namefn(KeepRetryableFetchErrors):
return []any{cfg.keepRetryableFetchErrors}
case namefn(DisableOffsetForLeaderEpoch):
return []any{cfg.disableOffsetForLeaderEpoch}

case namefn(AdjustFetchOffsetsFn):
return []any{cfg.adjustOffsetsBeforeAssign}
Expand Down Expand Up @@ -815,6 +815,12 @@ func (cl *Client) waitTries(ctx context.Context, backoff time.Duration) bool {
// not support (even though one in the cluster does), we will loop fail until
// the rest of the cluster is upgraded and supports the request.
func (cl *Client) supportsOffsetForLeaderEpoch() bool {
if cl.cfg.disableOffsetForLeaderEpoch {
cl.cfg.logger.Log(LogLevelDebug,
"reason", "disable OffsetForLeaderEpoch",
)
return false
}
return cl.supportsKeyVersion(int16(kmsg.OffsetForLeaderEpoch), 2)
}

Expand Down
35 changes: 17 additions & 18 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ type cfg struct {
setLost bool
setCommitCallback bool

autocommitDisable bool // true if autocommit was disabled or we are transactional
autocommitGreedy bool
autocommitMarks bool
autocommitInterval time.Duration
commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)
forceListOffsets bool
autocommitDisable bool // true if autocommit was disabled or we are transactional
autocommitGreedy bool
autocommitMarks bool
autocommitInterval time.Duration
commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)
disableOffsetForLeaderEpoch bool
}

func (cfg *cfg) validate() error {
Expand Down Expand Up @@ -515,8 +515,8 @@ func defaultCfg() cfg {
resetOffset: NewOffset().AtStart(),
isolationLevel: 0,

maxConcurrentFetches: 0, // unbounded default
forceListOffsets: false,
maxConcurrentFetches: 0, // unbounded default
disableOffsetForLeaderEpoch: false,

///////////
// group //
Expand Down Expand Up @@ -848,16 +848,7 @@ func ConsiderMissingTopicDeletedAfter(t time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.missingTopicDelete = t }}
}

// ForceUsingListOffset set the group consumer to use ListOffsets for fetching
// offsets rather than the default OffsetFetch. This is useful if you are
// seeing issues with OffsetFetch and want to use ListOffsets instead.
func ForceUsingListOffset(enable bool) ConsumerOpt {
return consumerOpt{func(cfg *cfg) {
cfg.forceListOffsets = enable
}}
}

////////////////////////////
// //////////////////////////
// PRODUCER CONFIGURATION //
////////////////////////////

Expand Down Expand Up @@ -1406,6 +1397,14 @@ func KeepRetryableFetchErrors() ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.keepRetryableFetchErrors = true }}
}

// DisableOffsetForLeaderEpoch sets the default behavior claim the broker not support
// OffsetForLeaderEpoch
func DisableOffsetForLeaderEpoch(enable bool) ConsumerOpt {
return consumerOpt{func(cfg *cfg) {
cfg.disableOffsetForLeaderEpoch = enable
}}
}

//////////////////////////////////
// CONSUMER GROUP CONFIGURATION //
//////////////////////////////////
Expand Down

0 comments on commit e6ca39d

Please sign in to comment.