From 72778cb25e2fa22da30105d81e46504aab694e4f Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 31 Oct 2023 20:50:17 -0600 Subject: [PATCH] kgo: no-op mark functions when not using AutoCommitMarks Closes #598. --- pkg/kgo/consumer_group.go | 51 +++++++++++++-------------------------- 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 43203023..f4475ec7 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -2201,14 +2201,6 @@ func (g *groupConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffse // offsets are always updated on calls to PollFetches. // // If there are no uncommitted offsets, this returns nil. -// -// Note that, if manually committing, you should be careful with committing -// during group rebalances. You must ensure you commit before the group's -// session timeout is reached, otherwise this client will be kicked from the -// group and the commit will fail. -// -// If using a cooperative balancer, commits while consuming during rebalancing -// may fail with REBALANCE_IN_PROGRESS. func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset { if g := cl.consumer.g; g != nil { return g.getUncommitted(true) @@ -2218,27 +2210,14 @@ func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset { // MarkedOffsets returns the latest marked offsets. When autocommitting, a // marked offset is an offset that can be committed, in comparison to a dirty -// offset that cannot yet be committed. You usually see marked offsets with -// AutoCommitMarks and MarkCommitRecords, but you can also use this function to -// grab the current offsets that are candidates for committing from normal -// autocommitting. -// -// If you set a custom OnPartitionsRevoked, marked offsets are not committed -// when partitions are revoked. You can use this function to mark records and -// issue a commit inside your OnPartitionsRevoked. -// -// Note that, if manually committing, you should be careful with committing -// during group rebalances. You must ensure you commit before the group's -// session timeout is reached, otherwise this client will be kicked from the -// group and the commit will fail. -// -// If using a cooperative balancer, commits while consuming during rebalancing -// may fail with REBALANCE_IN_PROGRESS. +// offset that cannot yet be committed. MarkedOffsets returns nil if you are +// not using AutoCommitMarks. func (cl *Client) MarkedOffsets() map[string]map[int32]EpochOffset { - if g := cl.consumer.g; g != nil { - return g.getUncommitted(false) + g := cl.consumer.g + if g == nil || !cl.cfg.autocommitMarks { + return nil } - return nil + return g.getUncommitted(false) } // CommittedOffsets returns the latest committed offsets. Committed offsets are @@ -2464,13 +2443,13 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error { return cl.commitOffsets(ctx, cl.UncommittedOffsets()) } -// CommitMarkedOffsets issues a synchronous offset commit for any -// partition that has been consumed from that has marked offsets. -// Retryable errors are retried up to the configured retry limit, and any -// unretryable error is returned. +// CommitMarkedOffsets issues a synchronous offset commit for any partition +// that has been consumed from that has marked offsets. Retryable errors are +// retried up to the configured retry limit, and any unretryable error is +// returned. // -// This function is useful if you have marked offsets with MarkCommitRecords -// when using AutoCommitMarks. +// This function is only useful if you have marked offsets with +// MarkCommitRecords when using AutoCommitMarks, otherwise this is a no-op. // // The recommended pattern for using this function is to have a poll / process // / commit loop. First PollFetches, then process every record, @@ -2480,7 +2459,11 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error { // As an alternative if you want to commit specific records, see CommitRecords. func (cl *Client) CommitMarkedOffsets(ctx context.Context) error { // This function is just the tail end of CommitRecords just above. - return cl.commitOffsets(ctx, cl.MarkedOffsets()) + marked := cl.MarkedOffsets() + if len(marked) == 0 { + return nil + } + return cl.commitOffsets(ctx, marked) } func (cl *Client) commitOffsets(ctx context.Context, offsets map[string]map[int32]EpochOffset) error {