From 929d564562ccb61da7d00c2a86b83ea10fc4497f Mon Sep 17 00:00:00 2001 From: "Zyad A. Ali" Date: Thu, 1 Feb 2024 23:01:47 +0200 Subject: [PATCH] pkg/kgo: allow marking using an offset Currently marking is only available via records, expose a new function, MarkCommitOffsets, to allow marking using offsets only. --- pkg/kgo/consumer_group.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index e71fcb52..01e00fd4 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -2433,6 +2433,44 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) { } } +// MarkCommitOffsets marks offsets to be available for autocommitting. This +// function is only useful if you use the AutoCommitMarks config option, see +// the documentation on that option for more details. This function does not +// allow rewinds. +func (cl *Client) MarkCommitOffsets(unmarked map[string]map[int32]EpochOffset) { + g := cl.consumer.g + if g == nil || !cl.cfg.autocommitMarks { + return + } + + // protect g.uncommitted map + g.mu.Lock() + defer g.mu.Unlock() + + if g.uncommitted == nil { + g.uncommitted = make(uncommitted) + } + + for topic, partitions := range unmarked { + curPartitions := g.uncommitted[topic] + if curPartitions == nil { + curPartitions = make(map[int32]uncommit) + g.uncommitted[topic] = curPartitions + } + + for partition, newHead := range partitions { + current := curPartitions[partition] + if current.head.Less(newHead) { + curPartitions[partition] = uncommit{ + dirty: current.dirty, + committed: current.committed, + head: newHead, + } + } + } + } +} + // CommitUncommittedOffsets issues a synchronous offset commit for any // partition that has been consumed from that has uncommitted offsets. // Retryable errors are retried up to the configured retry limit, and any