diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index cd76684b..c60e5f4f 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -2433,6 +2433,44 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) { } } +// MarkCommitOffsets marks offsets to be available for autocommitting. This +// function is only useful if you use the AutoCommitMarks config option, see +// the documentation on that option for more details. This function does not +// allow rewinds. +func (cl *Client) MarkCommitOffsets(unmarked map[string]map[int32]EpochOffset) { + g := cl.consumer.g + if g == nil || !cl.cfg.autocommitMarks { + return + } + + // protect g.uncommitted map + g.mu.Lock() + defer g.mu.Unlock() + + if g.uncommitted == nil { + g.uncommitted = make(uncommitted) + } + + for topic, partitions := range unmarked { + curPartitions := g.uncommitted[topic] + if curPartitions == nil { + curPartitions = make(map[int32]uncommit) + g.uncommitted[topic] = curPartitions + } + + for partition, newHead := range partitions { + current := curPartitions[partition] + if current.head.Less(newHead) { + curPartitions[partition] = uncommit{ + dirty: current.dirty, + committed: current.committed, + head: newHead, + } + } + } + } +} + // CommitUncommittedOffsets issues a synchronous offset commit for any // partition that has been consumed from that has uncommitted offsets. // Retryable errors are retried up to the configured retry limit, and any