Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kgo.Offset: add AtCommitted() #738

Merged
merged 3 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -1588,19 +1588,19 @@ func CalculateGroupLag(
tend := endOffsets[t.Topic]
for _, p := range t.Partitions {
var (
pcommit OffsetResponse
pend ListedOffset
perr error
ok bool
pcommit = OffsetResponse{Offset: Offset{
Topic: t.Topic,
Partition: p,
At: -1,
}}
pend ListedOffset
perr error
ok bool
)

if tcommit != nil {
if pcommit, ok = tcommit[p]; !ok {
pcommit = OffsetResponse{Offset: Offset{
Topic: t.Topic,
Partition: p,
At: -1,
}}
if actualpcommit, ok := tcommit[p]; ok {
pcommit = actualpcommit
}
}
if tend == nil {
Expand Down
10 changes: 2 additions & 8 deletions pkg/kfake/08_offset_commit.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
package kfake

import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

func init() { regKey(8, 0, 8) }

func (c *Cluster) handleOffsetCommit(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.OffsetCommitRequest)
resp := req.ResponseKind().(*kmsg.OffsetCommitResponse)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
return nil, err
}

if c.groups.handleOffsetCommit(creq) {
return nil, nil
}

fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code)
return resp, nil
c.groups.handleOffsetCommit(creq)
return nil, nil
}
105 changes: 77 additions & 28 deletions pkg/kfake/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,20 @@ func generateMemberID(clientID string, instanceID *string) string {
// GROUPS //
////////////

// newGroup allocates the in-memory state for a group called name. All maps
// start empty and all channels are unbuffered. The group is only constructed
// here: it is neither stored in gs.gs nor given a running goroutine, so the
// caller is responsible for registering and starting it.
func (gs *groups) newGroup(name string) *group {
	g := new(group)

	// Identity and back-references.
	g.c, g.gs, g.name = gs.c, gs, name

	// Membership bookkeeping.
	g.members = map[string]*groupMember{}
	g.pending = map[string]*groupMember{}
	g.protocols = map[string]int{}

	// Channels are unbuffered.
	g.reqCh = make(chan *clientReq)
	g.controlCh = make(chan func())
	g.quitCh = make(chan struct{})

	return g
}

// handleJoin completely hijacks the incoming request.
func (gs *groups) handleJoin(creq *clientReq) {
if gs.gs == nil {
Expand All @@ -141,17 +155,7 @@ func (gs *groups) handleJoin(creq *clientReq) {
start:
g := gs.gs[req.Group]
if g == nil {
g = &group{
c: gs.c,
gs: gs,
name: req.Group,
members: make(map[string]*groupMember),
pending: make(map[string]*groupMember),
protocols: make(map[string]int),
reqCh: make(chan *clientReq),
controlCh: make(chan func()),
quitCh: make(chan struct{}),
}
g = gs.newGroup(req.Group)
waitJoin := make(chan struct{})
gs.gs[req.Group] = g
go g.manage(func() { close(waitJoin) })
Expand Down Expand Up @@ -194,8 +198,25 @@ func (gs *groups) handleLeave(creq *clientReq) bool {
return gs.handleHijack(creq.kreq.(*kmsg.LeaveGroupRequest).Group, creq)
}

func (gs *groups) handleOffsetCommit(creq *clientReq) bool {
return gs.handleHijack(creq.kreq.(*kmsg.OffsetCommitRequest).Group, creq)
// handleOffsetCommit hands an OffsetCommit request to the group it targets
// and returns nothing. If the group named in the request does not exist
// yet, it is created with newGroup, registered in gs.gs, and its manage
// goroutine is started before the request is forwarded on the group's reqCh.
func (gs *groups) handleOffsetCommit(creq *clientReq) {
// Lazily allocate the group registry on first use.
if gs.gs == nil {
gs.gs = make(map[string]*group)
}
req := creq.kreq.(*kmsg.OffsetCommitRequest)
start:
g := gs.gs[req.Group]
if g == nil {
// Unknown group: build it, register it, and start its manage goroutine.
g = gs.newGroup(req.Group)
waitCommit := make(chan struct{})
gs.gs[req.Group] = g
go g.manage(func() { close(waitCommit) })
// Block our return until the callback given to manage has closed
// waitCommit. NOTE(review): when manage invokes that callback is not
// visible from here — confirm against manage.
defer func() { <-waitCommit }()
}
select {
case g.reqCh <- creq:
case <-g.quitCh:
// quitCh became readable before the group accepted the request
// (presumably the group shut down — verify); look the group up again.
goto start
}
}

func (gs *groups) handleOffsetDelete(creq *clientReq) bool {
Expand Down Expand Up @@ -551,7 +572,9 @@ func (g *group) manage(detachNew func()) {
case *kmsg.LeaveGroupRequest:
kresp = g.handleLeave(creq)
case *kmsg.OffsetCommitRequest:
kresp = g.handleOffsetCommit(creq)
var ok bool
kresp, ok = g.handleOffsetCommit(creq)
firstJoin(ok)
case *kmsg.OffsetDeleteRequest:
kresp = g.handleOffsetDelete(creq)
}
Expand Down Expand Up @@ -807,34 +830,60 @@ func fillOffsetCommit(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResp
}

// Handles a commit.
func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse {
func (g *group) handleOffsetCommit(creq *clientReq) (*kmsg.OffsetCommitResponse, bool) {
req := creq.kreq.(*kmsg.OffsetCommitRequest)
resp := req.ResponseKind().(*kmsg.OffsetCommitResponse)

if kerr := g.c.validateGroup(creq, req.Group); kerr != nil {
fillOffsetCommit(req, resp, kerr.Code)
return resp
return resp, false
}
if req.InstanceID != nil {
fillOffsetCommit(req, resp, kerr.InvalidGroupID.Code)
return resp
}
m, ok := g.members[req.MemberID]
if !ok {
fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code)
return resp
return resp, false
}
if req.Generation != g.generation {
fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code)
return resp

var m *groupMember
if len(g.members) > 0 {
var ok bool
m, ok = g.members[req.MemberID]
if !ok {
fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code)
return resp, false
}
if req.Generation != g.generation {
fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code)
return resp, false
}
} else {
if req.MemberID != "" {
fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code)
return resp, false
}
if req.Generation != -1 {
fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code)
return resp, false
}
if g.state != groupEmpty {
panic("invalid state: no members, but group not empty")
}
}

switch g.state {
default:
fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code)
return resp
return resp, true
case groupEmpty:
// for when we support empty group commits
for _, t := range req.Topics {
for _, p := range t.Partitions {
g.commits.set(t.Topic, p.Partition, offsetCommit{
offset: p.Offset,
leaderEpoch: p.LeaderEpoch,
metadata: p.Metadata,
})
}
}
fillOffsetCommit(req, resp, 0)
case groupPreparingRebalance, groupStable:
for _, t := range req.Topics {
for _, p := range t.Partitions {
Expand All @@ -851,7 +900,7 @@ func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse {
fillOffsetCommit(req, resp, kerr.RebalanceInProgress.Code)
g.updateHeartbeat(m)
}
return resp
return resp, true
}

// Transitions the group to the preparing rebalance state. We first need to
Expand Down
16 changes: 14 additions & 2 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,8 +1237,10 @@ func MaxConcurrentFetches(n int) ConsumerOpt {
// from. For group consumers, this is the offset that partitions begin to
// consume from if a partition has no commits. If partitions have commits, the
// commit offset is used. While fetching, if OffsetOutOfRange is encountered,
// the partition resets to ConsumeResetOffset. Conversely, using NoResetOffset
// stops consuming a partition if the client encounters OffsetOutOfRange.
// the partition resets to ConsumeResetOffset. Using [NoResetOffset] stops
// consuming a partition if the client encounters OffsetOutOfRange. Using
// [Offset.AtCommitted] prevents consuming a partition in a group if the
// partition has no prior commits.
//
// If you use an exact offset or relative offsets and the offset ends up out of
// range, the client chooses the nearest of either the log start offset or the
Expand All @@ -1254,6 +1256,16 @@ func MaxConcurrentFetches(n int) ConsumerOpt {
// reset relative? => the above, + / - the relative amount
// reset exact or relative out of bounds? => nearest boundary (start or end)
// reset after millisec? => high watermark, or first offset after millisec if one exists
//
// To match Kafka's auto.offset.reset,
//
// NewOffset().AtStart() == auto.offset.reset "earliest"
// NewOffset().AtEnd() == auto.offset.reset "latest"
// NewOffset().AtCommitted() == auto.offset.reset "none"
//
// With the above, make sure to use NoResetOffset() if you want to stop
// consuming when you encounter OffsetOutOfRange. It is highly recommended
// to read the docs for all Offset methods to see a few other alternatives.
func ConsumeResetOffset(offset Offset) ConsumerOpt {
	// The option does nothing but record the offset on the client config.
	setReset := func(cfg *cfg) { cfg.resetOffset = offset }
	return consumerOpt{setReset}
}
Expand Down
70 changes: 48 additions & 22 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Offset struct {
afterMilli bool
}

// Random negative, only significant within this package.
const atCommitted = -999

// MarshalJSON implements json.Marshaler.
func (o Offset) MarshalJSON() ([]byte, error) {
if o.relative == 0 {
Expand Down Expand Up @@ -54,8 +57,8 @@ func (o Offset) EpochOffset() EpochOffset {
}
}

// NewOffset creates and returns an offset to use in ConsumePartitions or
// ConsumeResetOffset.
// NewOffset creates and returns an offset to use in [ConsumePartitions] or
// [ConsumeResetOffset].
//
// The default offset begins at the end.
func NewOffset() Offset {
Expand All @@ -66,24 +69,26 @@ func NewOffset() Offset {
}

// NoResetOffset returns an offset that can be used as a "none" option for the
// ConsumeResetOffset option. By default, NoResetOffset starts consuming from
// [ConsumeResetOffset] option. By default, NoResetOffset starts consuming from
// the beginning of partitions (similar to NewOffset().AtStart()). This can be
// changed with AtEnd, Relative, etc.
//
// Using this offset will make it such that if OffsetOutOfRange is ever
// encountered while consuming, rather than trying to recover, the client will
// return the error to the user and enter a fatal state (for the partition).
// return the error to the user and enter a fatal state (for the affected
// partition).
func NoResetOffset() Offset {
return Offset{
at: math.MinInt64,
relative: 0,
noReset: true,
at: -1,
epoch: -1,
noReset: true,
}
}

// AfterMilli returns an offset that consumes from the first offset after a
// given timestamp. This option is not compatible with At/Relative/WithEpoch;
// using any of those will clear the special millisecond state.
// given timestamp. This option is *not* compatible with any At options (nor
// Relative nor WithEpoch); using any of those will clear the special
// millisecond state.
//
// This option can be used to consume at the end of existing partitions, but at
// the start of any new partitions that are created later:
Expand All @@ -93,7 +98,7 @@ func NoResetOffset() Offset {
// By default when using this offset, if consuming encounters an
// OffsetOutOfRange error, consuming will reset to the first offset after this
// timestamp. You can use NoResetOffset().AfterMilli(...) to instead switch the
// client to a fatal state.
// client to a fatal state (for the affected partition).
func (o Offset) AfterMilli(millisec int64) Offset {
o.at = millisec
o.relative = 0
Expand All @@ -102,36 +107,49 @@ func (o Offset) AfterMilli(millisec int64) Offset {
return o
}

// AtStart returns a copy of the calling offset, changing the returned offset
// to begin at the beginning of a partition.
// AtStart copies 'o' and returns an offset starting at the beginning of a
// partition.
func (o Offset) AtStart() Offset {
	// o is a value receiver, so only the returned copy is changed. Choosing
	// an At position clears any AfterMilli state.
	o.at, o.afterMilli = -2, false
	return o
}

// AtEnd returns a copy of the calling offset, changing the returned offset to
// begin at the end of a partition. If you want to consume at the end of the
// topic as it exists right now, but at the beginning of new partitions as they
// are added to the topic later, check out AfterMilli.
// AtEnd copies 'o' and returns an offset starting at the end of a partition.
// If you want to consume at the end of the topic as it exists right now, but
// at the beginning of new partitions as they are added to the topic later,
// check out AfterMilli.
func (o Offset) AtEnd() Offset {
	// o is a value receiver, so only the returned copy is changed. Choosing
	// an At position clears any AfterMilli state.
	o.at, o.afterMilli = -1, false
	return o
}

// Relative returns a copy of the calling offset, changing the returned offset
// to be n relative to what it currently is. If the offset is beginning at the
// end, Relative(-100) will begin 100 before the end.
// AtCommitted returns a copy of 'o' that is used *only if* the partition
// already has a commit, which makes it useful for group consumers only. When
// a partition being consumed has no commit, that partition enters a fatal
// state and an error is returned from PollFetches.
//
// Calling this function automatically opts into [NoResetOffset].
func (o Offset) AtCommitted() Offset {
	o.at = atCommitted
	// Drop any AfterMilli state and turn on no-reset behavior.
	o.afterMilli, o.noReset = false, true
	return o
}

// Relative copies 'o' and returns an offset that starts 'n' relative to what
// 'o' currently is. If 'o' is at the end (from [Offset.AtEnd]), Relative(-100)
// will begin 100 before the end.
func (o Offset) Relative(n int64) Offset {
	// o is a value receiver, so only the returned copy is changed. Setting
	// a relative amount clears any AfterMilli state.
	o.relative, o.afterMilli = n, false
	return o
}

// WithEpoch returns a copy of the calling offset, changing the returned offset
// to use the given epoch. This epoch is used for truncation detection; the
// default of -1 implies no truncation detection.
// WithEpoch copies 'o' and returns an offset that uses the given epoch. This
// epoch is used for truncation detection; the default of -1 implies no
// truncation detection.
func (o Offset) WithEpoch(e int32) Offset {
o.afterMilli = false
if e < 0 {
Expand Down Expand Up @@ -1129,6 +1147,14 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
continue
}

// If the offset is atCommitted, then no offset was
// loaded from FetchOffsets. We inject an error and
// avoid using this partition.
if offset.at == atCommitted {
c.addFakeReadyForDraining(topic, partition, errNoCommittedOffset, "notification of uncommitted partition")
continue
}

loadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
replica: -1,
Offset: offset,
Expand Down
2 changes: 2 additions & 0 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ var (

errMissingMetadataPartition = errors.New("metadata update is missing a partition that we were previously using")

errNoCommittedOffset = errors.New("partition has no prior committed offset")

//////////////
// EXTERNAL //
//////////////
Expand Down
Loading