Skip to content

Commit

Permalink
Merge pull request #738 from twmb/692
Browse files Browse the repository at this point in the history
kgo.Offset: add AtCommitted()
  • Loading branch information
twmb authored May 26, 2024
2 parents 051703b + f91cf73 commit 128840a
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 70 deletions.
20 changes: 10 additions & 10 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -1588,19 +1588,19 @@ func CalculateGroupLag(
tend := endOffsets[t.Topic]
for _, p := range t.Partitions {
var (
pcommit OffsetResponse
pend ListedOffset
perr error
ok bool
pcommit = OffsetResponse{Offset: Offset{
Topic: t.Topic,
Partition: p,
At: -1,
}}
pend ListedOffset
perr error
ok bool
)

if tcommit != nil {
if pcommit, ok = tcommit[p]; !ok {
pcommit = OffsetResponse{Offset: Offset{
Topic: t.Topic,
Partition: p,
At: -1,
}}
if actualpcommit, ok := tcommit[p]; ok {
pcommit = actualpcommit
}
}
if tend == nil {
Expand Down
10 changes: 2 additions & 8 deletions pkg/kfake/08_offset_commit.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
package kfake

import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

func init() { regKey(8, 0, 8) }

func (c *Cluster) handleOffsetCommit(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.OffsetCommitRequest)
resp := req.ResponseKind().(*kmsg.OffsetCommitResponse)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
return nil, err
}

if c.groups.handleOffsetCommit(creq) {
return nil, nil
}

fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code)
return resp, nil
c.groups.handleOffsetCommit(creq)
return nil, nil
}
105 changes: 77 additions & 28 deletions pkg/kfake/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,20 @@ func generateMemberID(clientID string, instanceID *string) string {
// GROUPS //
////////////

func (gs *groups) newGroup(name string) *group {
return &group{
c: gs.c,
gs: gs,
name: name,
members: make(map[string]*groupMember),
pending: make(map[string]*groupMember),
protocols: make(map[string]int),
reqCh: make(chan *clientReq),
controlCh: make(chan func()),
quitCh: make(chan struct{}),
}
}

// handleJoin completely hijacks the incoming request.
func (gs *groups) handleJoin(creq *clientReq) {
if gs.gs == nil {
Expand All @@ -141,17 +155,7 @@ func (gs *groups) handleJoin(creq *clientReq) {
start:
g := gs.gs[req.Group]
if g == nil {
g = &group{
c: gs.c,
gs: gs,
name: req.Group,
members: make(map[string]*groupMember),
pending: make(map[string]*groupMember),
protocols: make(map[string]int),
reqCh: make(chan *clientReq),
controlCh: make(chan func()),
quitCh: make(chan struct{}),
}
g = gs.newGroup(req.Group)
waitJoin := make(chan struct{})
gs.gs[req.Group] = g
go g.manage(func() { close(waitJoin) })
Expand Down Expand Up @@ -194,8 +198,25 @@ func (gs *groups) handleLeave(creq *clientReq) bool {
return gs.handleHijack(creq.kreq.(*kmsg.LeaveGroupRequest).Group, creq)
}

func (gs *groups) handleOffsetCommit(creq *clientReq) bool {
return gs.handleHijack(creq.kreq.(*kmsg.OffsetCommitRequest).Group, creq)
func (gs *groups) handleOffsetCommit(creq *clientReq) {
if gs.gs == nil {
gs.gs = make(map[string]*group)
}
req := creq.kreq.(*kmsg.OffsetCommitRequest)
start:
g := gs.gs[req.Group]
if g == nil {
g = gs.newGroup(req.Group)
waitCommit := make(chan struct{})
gs.gs[req.Group] = g
go g.manage(func() { close(waitCommit) })
defer func() { <-waitCommit }()
}
select {
case g.reqCh <- creq:
case <-g.quitCh:
goto start
}
}

func (gs *groups) handleOffsetDelete(creq *clientReq) bool {
Expand Down Expand Up @@ -551,7 +572,9 @@ func (g *group) manage(detachNew func()) {
case *kmsg.LeaveGroupRequest:
kresp = g.handleLeave(creq)
case *kmsg.OffsetCommitRequest:
kresp = g.handleOffsetCommit(creq)
var ok bool
kresp, ok = g.handleOffsetCommit(creq)
firstJoin(ok)
case *kmsg.OffsetDeleteRequest:
kresp = g.handleOffsetDelete(creq)
}
Expand Down Expand Up @@ -807,34 +830,60 @@ func fillOffsetCommit(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResp
}

// Handles a commit.
func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse {
func (g *group) handleOffsetCommit(creq *clientReq) (*kmsg.OffsetCommitResponse, bool) {
req := creq.kreq.(*kmsg.OffsetCommitRequest)
resp := req.ResponseKind().(*kmsg.OffsetCommitResponse)

if kerr := g.c.validateGroup(creq, req.Group); kerr != nil {
fillOffsetCommit(req, resp, kerr.Code)
return resp
return resp, false
}
if req.InstanceID != nil {
fillOffsetCommit(req, resp, kerr.InvalidGroupID.Code)
return resp
}
m, ok := g.members[req.MemberID]
if !ok {
fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code)
return resp
return resp, false
}
if req.Generation != g.generation {
fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code)
return resp

var m *groupMember
if len(g.members) > 0 {
var ok bool
m, ok = g.members[req.MemberID]
if !ok {
fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code)
return resp, false
}
if req.Generation != g.generation {
fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code)
return resp, false
}
} else {
if req.MemberID != "" {
fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code)
return resp, false
}
if req.Generation != -1 {
fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code)
return resp, false
}
if g.state != groupEmpty {
panic("invalid state: no members, but group not empty")
}
}

switch g.state {
default:
fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code)
return resp
return resp, true
case groupEmpty:
// for when we support empty group commits
for _, t := range req.Topics {
for _, p := range t.Partitions {
g.commits.set(t.Topic, p.Partition, offsetCommit{
offset: p.Offset,
leaderEpoch: p.LeaderEpoch,
metadata: p.Metadata,
})
}
}
fillOffsetCommit(req, resp, 0)
case groupPreparingRebalance, groupStable:
for _, t := range req.Topics {
for _, p := range t.Partitions {
Expand All @@ -851,7 +900,7 @@ func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse {
fillOffsetCommit(req, resp, kerr.RebalanceInProgress.Code)
g.updateHeartbeat(m)
}
return resp
return resp, true
}

// Transitions the group to the preparing rebalance state. We first need to
Expand Down
16 changes: 14 additions & 2 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,8 +1237,10 @@ func MaxConcurrentFetches(n int) ConsumerOpt {
// from. For group consumers, this is the offset that partitions begin to
// consume from if a partition has no commits. If partitions have commits, the
// commit offset is used. While fetching, if OffsetOutOfRange is encountered,
// the partition resets to ConsumeResetOffset. Conversely, using NoResetOffset
// stops consuming a partition if the client encounters OffsetOutOfRange.
// the partition resets to ConsumeResetOffset. Using [NoResetOffset] stops
// consuming a partition if the client encounters OffsetOutOfRange. Using
// [Offset.AtCommitted] prevents consuming a partition in a group if the
// partition has no prior commits.
//
// If you use an exact offset or relative offsets and the offset ends up out of
// range, the client chooses the nearest of either the log start offset or the
Expand All @@ -1254,6 +1256,16 @@ func MaxConcurrentFetches(n int) ConsumerOpt {
// reset relative? => the above, + / - the relative amount
// reset exact or relative out of bounds? => nearest boundary (start or end)
// reset after millisec? => high watermark, or first offset after millisec if one exists
//
// To match Kafka's auto.offset.reset,
//
// NewOffset().AtStart() == auto.offset.reset "earliest"
// NewOffset().AtEnd() == auto.offset.reset "latest"
// NewOffset().AtCommitted() == auto.offset.reset "none"
//
// With the above, make sure to use NoResetOffset() if you want to stop
// consuming when you encounter OffsetOutOfRange. It is highly recommended
// to read the docs for all Offset methods to see a few other alternatives.
func ConsumeResetOffset(offset Offset) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.resetOffset = offset }}
}
Expand Down
70 changes: 48 additions & 22 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Offset struct {
afterMilli bool
}

// Random negative, only significant within this package.
const atCommitted = -999

// MarshalJSON implements json.Marshaler.
func (o Offset) MarshalJSON() ([]byte, error) {
if o.relative == 0 {
Expand Down Expand Up @@ -54,8 +57,8 @@ func (o Offset) EpochOffset() EpochOffset {
}
}

// NewOffset creates and returns an offset to use in ConsumePartitions or
// ConsumeResetOffset.
// NewOffset creates and returns an offset to use in [ConsumePartitions] or
// [ConsumeResetOffset].
//
// The default offset begins at the end.
func NewOffset() Offset {
Expand All @@ -66,24 +69,26 @@ func NewOffset() Offset {
}

// NoResetOffset returns an offset that can be used as a "none" option for the
// ConsumeResetOffset option. By default, NoResetOffset starts consuming from
// [ConsumeResetOffset] option. By default, NoResetOffset starts consuming from
// the beginning of partitions (similar to NewOffset().AtStart()). This can be
// changed with AtEnd, Relative, etc.
//
// Using this offset will make it such that if OffsetOutOfRange is ever
// encountered while consuming, rather than trying to recover, the client will
// return the error to the user and enter a fatal state (for the partition).
// return the error to the user and enter a fatal state (for the affected
// partition).
func NoResetOffset() Offset {
return Offset{
at: math.MinInt64,
relative: 0,
noReset: true,
at: -1,
epoch: -1,
noReset: true,
}
}

// AfterMilli returns an offset that consumes from the first offset after a
// given timestamp. This option is not compatible with At/Relative/WithEpoch;
// using any of those will clear the special millisecond state.
// given timestamp. This option is *not* compatible with any At options (nor
// Relative nor WithEpoch); using any of those will clear the special
// millisecond state.
//
// This option can be used to consume at the end of existing partitions, but at
// the start of any new partitions that are created later:
Expand All @@ -93,7 +98,7 @@ func NoResetOffset() Offset {
// By default when using this offset, if consuming encounters an
// OffsetOutOfRange error, consuming will reset to the first offset after this
// timestamp. You can use NoResetOffset().AfterMilli(...) to instead switch the
// client to a fatal state.
// client to a fatal state (for the affected partition).
func (o Offset) AfterMilli(millisec int64) Offset {
o.at = millisec
o.relative = 0
Expand All @@ -102,36 +107,49 @@ func (o Offset) AfterMilli(millisec int64) Offset {
return o
}

// AtStart returns a copy of the calling offset, changing the returned offset
// to begin at the beginning of a partition.
// AtStart copies 'o' and returns an offset starting at the beginning of a
// partition.
func (o Offset) AtStart() Offset {
o.afterMilli = false
o.at = -2
return o
}

// AtEnd returns a copy of the calling offset, changing the returned offset to
// begin at the end of a partition. If you want to consume at the end of the
// topic as it exists right now, but at the beginning of new partitions as they
// are added to the topic later, check out AfterMilli.
// AtEnd copies 'o' and returns an offset starting at the end of a partition.
// If you want to consume at the end of the topic as it exists right now, but
// at the beginning of new partitions as they are added to the topic later,
// check out AfterMilli.
func (o Offset) AtEnd() Offset {
o.afterMilli = false
o.at = -1
return o
}

// Relative returns a copy of the calling offset, changing the returned offset
// to be n relative to what it currently is. If the offset is beginning at the
// end, Relative(-100) will begin 100 before the end.
// AtCommitted copies 'o' and returns an offset that is used *only if*
// there is an existing commit. This is only useful for group consumers.
// If a partition being consumed does not have a commit, the partition will
// enter a fatal state and return an error from PollFetches.
//
// Using this function automatically opts into [NoResetOffset].
func (o Offset) AtCommitted() Offset {
o.noReset = true
o.afterMilli = false
o.at = atCommitted
return o
}

// Relative copies 'o' and returns an offset that starts 'n' relative to what
// 'o' currently is. If 'o' is at the end (from [AtEnd]), Relative(-100) will
// begin 100 before the end.
func (o Offset) Relative(n int64) Offset {
o.afterMilli = false
o.relative = n
return o
}

// WithEpoch returns a copy of the calling offset, changing the returned offset
// to use the given epoch. This epoch is used for truncation detection; the
// default of -1 implies no truncation detection.
// WithEpoch copies 'o' and returns an offset with the given epoch. to use the
// given epoch. This epoch is used for truncation detection; the default of -1
// implies no truncation detection.
func (o Offset) WithEpoch(e int32) Offset {
o.afterMilli = false
if e < 0 {
Expand Down Expand Up @@ -1152,6 +1170,14 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
continue
}

// If the offset is atCommitted, then no offset was
// loaded from FetchOffsets. We inject an error and
// avoid using this partition.
if offset.at == atCommitted {
c.addFakeReadyForDraining(topic, partition, errNoCommittedOffset, "notification of uncommitted partition")
continue
}

loadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
replica: -1,
Offset: offset,
Expand Down
2 changes: 2 additions & 0 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ var (

errMissingMetadataPartition = errors.New("metadata update is missing a partition that we were previously using")

errNoCommittedOffset = errors.New("partition has no prior committed offset")

//////////////
// EXTERNAL //
//////////////
Expand Down

0 comments on commit 128840a

Please sign in to comment.