diff --git a/pkg/kadm/groups.go b/pkg/kadm/groups.go index bb9c4656..844673e4 100644 --- a/pkg/kadm/groups.go +++ b/pkg/kadm/groups.go @@ -830,6 +830,14 @@ func (cl *Client) FetchOffsets(ctx context.Context, group string) (OffsetRespons return rs, nil } +// FetchAllGroupTopics is a kadm "internal" topic name that can be used in +// [FetchOffsetsForTopics]. By default, [FetchOffsetsForTopics] only returns +// topics that are explicitly requested. Other topics that may be committed to +// in the group are not returned. Using FetchAllRequestedTopics switches the +// behavior to return the union of all committed topics and all requested +// topics. +const FetchAllGroupTopics = "|fetch-all-group-topics|" + // FetchOffsetsForTopics is a helper function that returns the currently // committed offsets for the given group, as well as default -1 offsets for any // topic/partition that does not yet have a commit. @@ -838,9 +846,31 @@ func (cl *Client) FetchOffsets(ctx context.Context, group string) (OffsetRespons // error. The returned offset responses are ready to be used or converted // directly to pure offsets with `Into`, and again into kgo offsets with // another `Into`. +// +// By default, this function returns offsets for only the requested topics. You +// can use the special "topic" [FetchAllGroupTopics] to return all committed-to +// topics in addition to all requested topics. func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topics ...string) (OffsetResponses, error) { os := make(Offsets) + var all bool + keept := topics[:0] + for _, topic := range topics { + if topic == FetchAllGroupTopics { + all = true + continue + } + keept = append(keept, topic) + } + topics = keept + + if !all && len(topics) == 0 { + return make(OffsetResponses), nil + } + + // We have to request metadata to learn all partitions in all the + // topics. The default returned offset for all partitions is filled in + // to be -1. if len(topics) > 0 { listed, err := cl.ListTopics(ctx, topics...) if err != nil { @@ -865,11 +895,29 @@ func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topic if err := resps.Error(); err != nil { return nil, fmt.Errorf("offset fetches had a load error, first error: %w", err) } + + // For any topic (and any partition) we explicitly asked for, if the + // partition does not exist in the response, we fill the default -1 + // from above. os.Each(func(o Offset) { if _, ok := resps.Lookup(o.Topic, o.Partition); !ok { resps.Add(OffsetResponse{Offset: o}) } }) + + // If we are not requesting all group offsets, then we strip any topic + // that was not explicitly requested. + if !all { + tset := make(map[string]struct{}) + for _, t := range topics { + tset[t] = struct{}{} + } + for t := range resps { + if _, ok := tset[t]; !ok { + delete(resps, t) + } + } + } return resps, nil }