diff --git a/pkg/kadm/groups.go b/pkg/kadm/groups.go index 6adca804..de925658 100644 --- a/pkg/kadm/groups.go +++ b/pkg/kadm/groups.go @@ -1588,19 +1588,19 @@ func CalculateGroupLag( tend := endOffsets[t.Topic] for _, p := range t.Partitions { var ( - pcommit OffsetResponse - pend ListedOffset - perr error - ok bool + pcommit = OffsetResponse{Offset: Offset{ + Topic: t.Topic, + Partition: p, + At: -1, + }} + pend ListedOffset + perr error + ok bool ) if tcommit != nil { - if pcommit, ok = tcommit[p]; !ok { - pcommit = OffsetResponse{Offset: Offset{ - Topic: t.Topic, - Partition: p, - At: -1, - }} + if actualpcommit, ok := tcommit[p]; ok { + pcommit = actualpcommit } } if tend == nil { diff --git a/pkg/kfake/08_offset_commit.go b/pkg/kfake/08_offset_commit.go index 692f4cc0..b82400e0 100644 --- a/pkg/kfake/08_offset_commit.go +++ b/pkg/kfake/08_offset_commit.go @@ -1,7 +1,6 @@ package kfake import ( - "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kmsg" ) @@ -9,16 +8,11 @@ func init() { regKey(8, 0, 8) } func (c *Cluster) handleOffsetCommit(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.OffsetCommitRequest) - resp := req.ResponseKind().(*kmsg.OffsetCommitResponse) if err := checkReqVersion(req.Key(), req.Version); err != nil { return nil, err } - if c.groups.handleOffsetCommit(creq) { - return nil, nil - } - - fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code) - return resp, nil + c.groups.handleOffsetCommit(creq) + return nil, nil } diff --git a/pkg/kfake/groups.go b/pkg/kfake/groups.go index a0e5a98e..eec79391 100644 --- a/pkg/kfake/groups.go +++ b/pkg/kfake/groups.go @@ -132,6 +132,20 @@ func generateMemberID(clientID string, instanceID *string) string { // GROUPS // //////////// +func (gs *groups) newGroup(name string) *group { + return &group{ + c: gs.c, + gs: gs, + name: name, + members: make(map[string]*groupMember), + pending: make(map[string]*groupMember), + protocols: make(map[string]int), + reqCh: make(chan *clientReq), + controlCh: make(chan func()), + quitCh: make(chan struct{}), + } +} + // handleJoin completely hijacks the incoming request. func (gs *groups) handleJoin(creq *clientReq) { if gs.gs == nil { @@ -141,17 +155,7 @@ func (gs *groups) handleJoin(creq *clientReq) { start: g := gs.gs[req.Group] if g == nil { - g = &group{ - c: gs.c, - gs: gs, - name: req.Group, - members: make(map[string]*groupMember), - pending: make(map[string]*groupMember), - protocols: make(map[string]int), - reqCh: make(chan *clientReq), - controlCh: make(chan func()), - quitCh: make(chan struct{}), - } + g = gs.newGroup(req.Group) waitJoin := make(chan struct{}) gs.gs[req.Group] = g go g.manage(func() { close(waitJoin) }) @@ -194,8 +198,25 @@ func (gs *groups) handleLeave(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.LeaveGroupRequest).Group, creq) } -func (gs *groups) handleOffsetCommit(creq *clientReq) bool { - return gs.handleHijack(creq.kreq.(*kmsg.OffsetCommitRequest).Group, creq) +func (gs *groups) handleOffsetCommit(creq *clientReq) { + if gs.gs == nil { + gs.gs = make(map[string]*group) + } + req := creq.kreq.(*kmsg.OffsetCommitRequest) +start: + g := gs.gs[req.Group] + if g == nil { + g = gs.newGroup(req.Group) + waitCommit := make(chan struct{}) + gs.gs[req.Group] = g + go g.manage(func() { close(waitCommit) }) + defer func() { <-waitCommit }() + } + select { + case g.reqCh <- creq: + case <-g.quitCh: + goto start + } } func (gs *groups) handleOffsetDelete(creq *clientReq) bool { @@ -551,7 +572,9 @@ func (g *group) manage(detachNew func()) { case *kmsg.LeaveGroupRequest: kresp = g.handleLeave(creq) case *kmsg.OffsetCommitRequest: - kresp = g.handleOffsetCommit(creq) + var ok bool + kresp, ok = g.handleOffsetCommit(creq) + firstJoin(ok) case *kmsg.OffsetDeleteRequest: kresp = g.handleOffsetDelete(creq) } @@ -807,34 +830,60 @@ func fillOffsetCommit(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResp } // Handles a commit. -func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse { +func (g *group) handleOffsetCommit(creq *clientReq) (*kmsg.OffsetCommitResponse, bool) { req := creq.kreq.(*kmsg.OffsetCommitRequest) resp := req.ResponseKind().(*kmsg.OffsetCommitResponse) if kerr := g.c.validateGroup(creq, req.Group); kerr != nil { fillOffsetCommit(req, resp, kerr.Code) - return resp + return resp, false } if req.InstanceID != nil { fillOffsetCommit(req, resp, kerr.InvalidGroupID.Code) - return resp - } - m, ok := g.members[req.MemberID] - if !ok { - fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code) - return resp + return resp, false } - if req.Generation != g.generation { - fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code) - return resp + + var m *groupMember + if len(g.members) > 0 { + var ok bool + m, ok = g.members[req.MemberID] + if !ok { + fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code) + return resp, false + } + if req.Generation != g.generation { + fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code) + return resp, false + } + } else { + if req.MemberID != "" { + fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code) + return resp, false + } + if req.Generation != -1 { + fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code) + return resp, false + } + if g.state != groupEmpty { + panic("invalid state: no members, but group not empty") + } } switch g.state { default: fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code) - return resp + return resp, true case groupEmpty: - // for when we support empty group commits + for _, t := range req.Topics { + for _, p := range t.Partitions { + g.commits.set(t.Topic, p.Partition, offsetCommit{ + offset: p.Offset, + leaderEpoch: p.LeaderEpoch, + metadata: p.Metadata, + }) + } + } + fillOffsetCommit(req, resp, 0) case groupPreparingRebalance, groupStable: for _, t := range req.Topics { for _, p := range t.Partitions { @@ -851,7 +900,7 @@ func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse { fillOffsetCommit(req, resp, kerr.RebalanceInProgress.Code) g.updateHeartbeat(m) } - return resp + return resp, true } // Transitions the group to the preparing rebalance state. We first need to diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 1c93f619..92ebeaa3 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -1237,8 +1237,10 @@ func MaxConcurrentFetches(n int) ConsumerOpt { // from. For group consumers, this is the offset that partitions begin to // consume from if a partition has no commits. If partitions have commits, the // commit offset is used. While fetching, if OffsetOutOfRange is encountered, -// the partition resets to ConsumeResetOffset. Conversely, using NoResetOffset -// stops consuming a partition if the client encounters OffsetOutOfRange. +// the partition resets to ConsumeResetOffset. Using [NoResetOffset] stops +// consuming a partition if the client encounters OffsetOutOfRange. Using +// [Offset.AtCommitted] prevents consuming a partition in a group if the +// partition has no prior commits. // // If you use an exact offset or relative offsets and the offset ends up out of // range, the client chooses the nearest of either the log start offset or the @@ -1254,6 +1256,16 @@ func MaxConcurrentFetches(n int) ConsumerOpt { // reset relative? => the above, + / - the relative amount // reset exact or relative out of bounds? => nearest boundary (start or end) // reset after millisec? => high watermark, or first offset after millisec if one exists +// +// To match Kafka's auto.offset.reset, +// +// NewOffset().AtStart() == auto.offset.reset "earliest" +// NewOffset().AtEnd() == auto.offset.reset "latest" +// NewOffset().AtCommitted() == auto.offset.reset "none" +// +// With the above, make sure to use NoResetOffset() if you want to stop +// consuming when you encounter OffsetOutOfRange. It is highly recommended +// to read the docs for all Offset methods to see a few other alternatives. func ConsumeResetOffset(offset Offset) ConsumerOpt { return consumerOpt{func(cfg *cfg) { cfg.resetOffset = offset }} } diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 0733d85f..15ae2693 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -27,6 +27,9 @@ type Offset struct { afterMilli bool } +// Random negative, only significant within this package. +const atCommitted = -999 + // MarshalJSON implements json.Marshaler. func (o Offset) MarshalJSON() ([]byte, error) { if o.relative == 0 { @@ -54,8 +57,8 @@ func (o Offset) EpochOffset() EpochOffset { } } -// NewOffset creates and returns an offset to use in ConsumePartitions or -// ConsumeResetOffset. +// NewOffset creates and returns an offset to use in [ConsumePartitions] or +// [ConsumeResetOffset]. // // The default offset begins at the end. func NewOffset() Offset { @@ -66,24 +69,26 @@ func NewOffset() Offset { } // NoResetOffset returns an offset that can be used as a "none" option for the -// ConsumeResetOffset option. By default, NoResetOffset starts consuming from +// [ConsumeResetOffset] option. By default, NoResetOffset starts consuming from // the beginning of partitions (similar to NewOffset().AtStart()). This can be // changed with AtEnd, Relative, etc. // // Using this offset will make it such that if OffsetOutOfRange is ever // encountered while consuming, rather than trying to recover, the client will -// return the error to the user and enter a fatal state (for the partition). +// return the error to the user and enter a fatal state (for the affected +// partition). func NoResetOffset() Offset { return Offset{ - at: math.MinInt64, - relative: 0, - noReset: true, + at: -1, + epoch: -1, + noReset: true, } } // AfterMilli returns an offset that consumes from the first offset after a -// given timestamp. This option is not compatible with At/Relative/WithEpoch; -// using any of those will clear the special millisecond state. +// given timestamp. This option is *not* compatible with any At options (nor +// Relative nor WithEpoch); using any of those will clear the special +// millisecond state. // // This option can be used to consume at the end of existing partitions, but at // the start of any new partitions that are created later: @@ -93,7 +98,7 @@ func NoResetOffset() Offset { // By default when using this offset, if consuming encounters an // OffsetOutOfRange error, consuming will reset to the first offset after this // timestamp. You can use NoResetOffset().AfterMilli(...) to instead switch the -// client to a fatal state. +// client to a fatal state (for the affected partition). func (o Offset) AfterMilli(millisec int64) Offset { o.at = millisec o.relative = 0 @@ -102,36 +107,49 @@ func (o Offset) AfterMilli(millisec int64) Offset { return o } -// AtStart returns a copy of the calling offset, changing the returned offset -// to begin at the beginning of a partition. +// AtStart copies 'o' and returns an offset starting at the beginning of a +// partition. func (o Offset) AtStart() Offset { o.afterMilli = false o.at = -2 return o } -// AtEnd returns a copy of the calling offset, changing the returned offset to -// begin at the end of a partition. If you want to consume at the end of the -// topic as it exists right now, but at the beginning of new partitions as they -// are added to the topic later, check out AfterMilli. +// AtEnd copies 'o' and returns an offset starting at the end of a partition. +// If you want to consume at the end of the topic as it exists right now, but +// at the beginning of new partitions as they are added to the topic later, +// check out AfterMilli. func (o Offset) AtEnd() Offset { o.afterMilli = false o.at = -1 return o } -// Relative returns a copy of the calling offset, changing the returned offset -// to be n relative to what it currently is. If the offset is beginning at the -// end, Relative(-100) will begin 100 before the end. +// AtCommitted copies 'o' and returns an offset that is used *only if* +// there is an existing commit. This is only useful for group consumers. +// If a partition being consumed does not have a commit, the partition will +// enter a fatal state and return an error from PollFetches. +// +// Using this function automatically opts into [NoResetOffset]. +func (o Offset) AtCommitted() Offset { + o.noReset = true + o.afterMilli = false + o.at = atCommitted + return o +} + +// Relative copies 'o' and returns an offset that starts 'n' relative to what +// 'o' currently is. If 'o' is at the end (from [AtEnd]), Relative(-100) will +// begin 100 before the end. func (o Offset) Relative(n int64) Offset { o.afterMilli = false o.relative = n return o } -// WithEpoch returns a copy of the calling offset, changing the returned offset -// to use the given epoch. This epoch is used for truncation detection; the -// default of -1 implies no truncation detection. +// WithEpoch copies 'o' and returns an offset with the given epoch. to use the +// given epoch. This epoch is used for truncation detection; the default of -1 +// implies no truncation detection. func (o Offset) WithEpoch(e int32) Offset { o.afterMilli = false if e < 0 { @@ -1152,6 +1170,14 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how continue } + // If the offset is atCommitted, then no offset was + // loaded from FetchOffsets. We inject an error and + // avoid using this partition. + if offset.at == atCommitted { + c.addFakeReadyForDraining(topic, partition, errNoCommittedOffset, "notification of uncommitted partition") + continue + } + loadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{ replica: -1, Offset: offset, diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 8d574b90..37c4f8fc 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -173,6 +173,8 @@ var ( errMissingMetadataPartition = errors.New("metadata update is missing a partition that we were previously using") + errNoCommittedOffset = errors.New("partition has no prior committed offset") + ////////////// // EXTERNAL // //////////////