diff --git a/pkg/kadm/groups.go b/pkg/kadm/groups.go index de925658..a72c1065 100644 --- a/pkg/kadm/groups.go +++ b/pkg/kadm/groups.go @@ -1228,7 +1228,8 @@ type GroupMemberLag struct { Partition int32 // Partition is the partition this lag is for. Commit Offset // Commit is this member's current offset commit. - End ListedOffset // EndOffset is a reference to the end offset of this partition. + Start ListedOffset // Start is a reference to the start of this partition, if provided. Start offsets are optional; if not provided, Start.Err is a non-nil error saying this partition is missing from list offsets. This is always present if lag is calculated via Client.Lag. + End ListedOffset // End is a reference to the end offset of this partition. Lag int64 // Lag is how far behind this member is, or -1 if there is a commit error or list offset error. Err error // Err is either the commit error, or the list end offsets error, or nil. @@ -1404,12 +1405,12 @@ func (ls DescribedGroupLags) Ok() bool { } // Lag returns the lag for all input groups. This function is a shortcut for -// the steps required to use CalculateGroupLag properly, with some opinionated -// choices for error handling since calculating lag is multi-request process. -// If a group cannot be described or the offsets cannot be fetched, an error is -// returned for the group. If any topic cannot have its end offsets listed, the -// lag for the partition has a corresponding error. If any request fails with -// an auth error, this returns *AuthError. +// the steps required to use CalculateGroupLagWithStartOffsets properly, with +// some opinionated choices for error handling since calculating lag is +// multi-request process. If a group cannot be described or the offsets cannot +// be fetched, an error is returned for the group. If any topic cannot have its +// end offsets listed, the lag for the partition has a corresponding error. If +// any request fails with an auth error, this returns *AuthError. func (cl *Client) Lag(ctx context.Context, groups ...string) (DescribedGroupLags, error) { set := make(map[string]struct{}, len(groups)) for _, g := range groups { @@ -1486,42 +1487,51 @@ func (cl *Client) Lag(ctx context.Context, groups ...string) (DescribedGroupLags return lags, nil } - // Lastly, we have to list the end offset for all assigned and + // We have to list the start & end offset for all assigned and // committed partitions. - var listed ListedOffsets + var startOffsets, endOffsets ListedOffsets listPartitions := described.AssignedPartitions() listPartitions.Merge(fetched.CommittedPartitions()) if topics := listPartitions.Topics(); len(topics) > 0 { - listed, err = cl.ListEndOffsets(ctx, topics...) - // As above: return on auth error. If there are shard errors, - // the topics will be missing in the response and then - // CalculateGroupLag will return UnknownTopicOrPartition. - switch { - case errors.As(err, &ae): - return nil, err - case errors.As(err, &se): - // do nothing: these show up as errListMissing - case err != nil: - return nil, err - } - // For anything that lists with a single -1 partition, the - // topic does not exist. We add an UnknownTopicOrPartition - // error for all partitions that were committed to, so that - // this shows up in the lag output as UnknownTopicOrPartition - // rather than errListMissing. - for t, ps := range listed { - if len(ps) != 1 { - continue - } - if _, ok := ps[-1]; !ok { - continue + for _, list := range []struct { + fn func(context.Context, ...string) (ListedOffsets, error) + dst *ListedOffsets + }{ + {cl.ListStartOffsets, &startOffsets}, + {cl.ListEndOffsets, &endOffsets}, + } { + listed, err := list.fn(ctx, topics...) + *list.dst = listed + // As above: return on auth error. If there are shard errors, + // the topics will be missing in the response and then + // CalculateGroupLag will return UnknownTopicOrPartition. + switch { + case errors.As(err, &ae): + return nil, err + case errors.As(err, &se): + // do nothing: these show up as errListMissing + case err != nil: + return nil, err } - delete(ps, -1) - for p := range listPartitions[t] { - ps[p] = ListedOffset{ - Topic: t, - Partition: p, - Err: kerr.UnknownTopicOrPartition, + // For anything that lists with a single -1 partition, the + // topic does not exist. We add an UnknownTopicOrPartition + // error for all partitions that were committed to, so that + // this shows up in the lag output as UnknownTopicOrPartition + // rather than errListMissing. + for t, ps := range listed { + if len(ps) != 1 { + continue + } + if _, ok := ps[-1]; !ok { + continue + } + delete(ps, -1) + for p := range listPartitions[t] { + ps[p] = ListedOffset{ + Topic: t, + Partition: p, + Err: kerr.UnknownTopicOrPartition, + } } } } @@ -1529,46 +1539,46 @@ func (cl *Client) Lag(ctx context.Context, groups ...string) (DescribedGroupLags for _, g := range described { l := lags[g.Group] - l.Lag = CalculateGroupLag(g, fetched[g.Group].Fetched, listed) + l.Lag = CalculateGroupLagWithStartOffsets(g, fetched[g.Group].Fetched, startOffsets, endOffsets) lags[g.Group] = l } return lags, nil } -// CalculateGroupLag returns the per-partition lag of all members in a group. -// The input to this method is the returns from the following methods (make -// sure to check shard errors): +var noOffsets = make(ListedOffsets) + +// CalculateGroupLagWithStartOffsets returns the per-partition lag of all +// members in a group. This function slightly expands on [CalculateGroupLag] to +// handle calculating lag for partitions that (1) have no commits AND (2) have +// some segments deleted (cleanup.policy=delete) such that the log start offset +// is non-zero. // -// // Note that FetchOffsets exists to fetch only one group's offsets, -// // but some of the code below slightly changes. -// groups := DescribeGroups(ctx, group) -// commits := FetchManyOffsets(ctx, group) -// var endOffsets ListedOffsets -// listPartitions := described.AssignedPartitions() -// listPartitions.Merge(commits.CommittedPartitions() -// if topics := listPartitions.Topics(); len(topics) > 0 { -// endOffsets = ListEndOffsets(ctx, listPartitions.Topics()) -// } -// for _, group := range groups { -// lag := CalculateGroupLag(group, commits[group.Group].Fetched, endOffsets) -// } +// As an example, if a group is consuming a partition with log end offset 30 +// and log start offset 10 and has not yet committed to the group, this +// function can correctly tell you that the lag is 20, whereas +// CalculateGroupLag would tell you the lag is 30. // -// If assigned partitions are missing in the listed end offsets, the partition -// will have an error indicating it is missing. A missing topic or partition in -// the commits is assumed to be nothing committing yet. -func CalculateGroupLag( +// This function accepts 'nil' for startOffsets, which will result in the same +// behavior as CalculateGroupLag. This function is useful if you have +// infrequently committing groups against topics that have segments being +// deleted. +func CalculateGroupLagWithStartOffsets( group DescribedGroup, commit OffsetResponses, + startOffsets ListedOffsets, endOffsets ListedOffsets, ) GroupLag { - if group.State == "Empty" { - return calculateEmptyLag(commit, endOffsets) - } if commit == nil { // avoid panics below commit = make(OffsetResponses) } + if startOffsets == nil { + startOffsets = noOffsets + } if endOffsets == nil { - endOffsets = make(ListedOffsets) + endOffsets = noOffsets + } + if group.State == "Empty" { + return calculateEmptyLag(commit, startOffsets, endOffsets) } l := make(map[string]map[int32]GroupMemberLag) @@ -1585,6 +1595,7 @@ func CalculateGroupLag( } tcommit := commit[t.Topic] + tstart := startOffsets[t.Topic] tend := endOffsets[t.Topic] for _, p := range t.Partitions { var ( @@ -1593,36 +1604,55 @@ func CalculateGroupLag( Partition: p, At: -1, }} - pend ListedOffset - perr error - ok bool + pend = ListedOffset{ + Topic: t.Topic, + Partition: p, + Err: errListMissing, + } + pstart = pend + perr error ) if tcommit != nil { - if actualpcommit, ok := tcommit[p]; ok { - pcommit = actualpcommit + if pcommitActual, ok := tcommit[p]; ok { + pcommit = pcommitActual } } - if tend == nil { - perr = errListMissing - } else { - if pend, ok = tend[p]; !ok { - perr = errListMissing + perr = errListMissing + if tend != nil { + if pendActual, ok := tend[p]; ok { + pend = pendActual + perr = nil } } - if perr == nil { if perr = pcommit.Err; perr == nil { perr = pend.Err } } + if tstart != nil { + if pstartActual, ok := tstart[p]; ok { + pstart = pstartActual + } + } lag := int64(-1) if perr == nil { lag = pend.Offset + if pstart.Err == nil { + lag = pend.Offset - pstart.Offset + } if pcommit.At >= 0 { lag = pend.Offset - pcommit.At } + // It is possible for a commit to be after the + // end, in which case we will round to 0. We do + // this check here to also handle a potential non-commit + // weird pend < pstart scenario where a segment + // was deleted between listing offsets. + if lag < 0 { + lag = 0 + } } lt[p] = GroupMemberLag{ @@ -1630,6 +1660,7 @@ func CalculateGroupLag( Topic: t.Topic, Partition: p, Commit: pcommit.Offset, + Start: pstart, End: pend, Lag: lag, Err: perr, @@ -1642,7 +1673,36 @@ func CalculateGroupLag( return l } -func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLag { +// CalculateGroupLag returns the per-partition lag of all members in a group. +// The input to this method is the returns from the following methods (make +// sure to check shard errors): +// +// // Note that FetchOffsets exists to fetch only one group's offsets, +// // but some of the code below slightly changes. +// groups := DescribeGroups(ctx, group) +// commits := FetchManyOffsets(ctx, group) +// var endOffsets ListedOffsets +// listPartitions := described.AssignedPartitions() +// listPartitions.Merge(commits.CommittedPartitions() +// if topics := listPartitions.Topics(); len(topics) > 0 { +// endOffsets = ListEndOffsets(ctx, listPartitions.Topics()) +// } +// for _, group := range groups { +// lag := CalculateGroupLag(group, commits[group.Group].Fetched, endOffsets) +// } +// +// If assigned partitions are missing in the listed end offsets, the partition +// will have an error indicating it is missing. A missing topic or partition in +// the commits is assumed to be nothing committing yet. +func CalculateGroupLag( + group DescribedGroup, + commit OffsetResponses, + endOffsets ListedOffsets, +) GroupLag { + return CalculateGroupLagWithStartOffsets(group, commit, nil, endOffsets) +} + +func calculateEmptyLag(commit OffsetResponses, startOffsets, endOffsets ListedOffsets) GroupLag { l := make(map[string]map[int32]GroupMemberLag) for t, ps := range commit { lt := l[t] @@ -1650,18 +1710,36 @@ func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLa lt = make(map[int32]GroupMemberLag) l[t] = lt } + tstart := startOffsets[t] tend := endOffsets[t] for p, pcommit := range ps { var ( - pend ListedOffset - perr error - ok bool + pend = ListedOffset{ + Topic: t, + Partition: p, + Err: errListMissing, + } + pstart = pend + perr error ) - if tend == nil { - perr = errListMissing - } else { - if pend, ok = tend[p]; !ok { - perr = errListMissing + + // In order of priority, perr (the error on the Lag + // calculation) is non-nil if: + // + // * The topic is missing from end ListOffsets + // * The partition is missing from end ListOffsets + // * OffsetFetch has an error on the partition + // * ListOffsets has an error on the partition + // + // If we have no error, then we can calculate lag. + // We *do* allow an error on start ListedOffsets; + // if there are no start offsets or the start offset + // has an error, it is not used for lag calculation. + perr = errListMissing + if tend != nil { + if pendActual, ok := tend[p]; ok { + pend = pendActual + perr = nil } } if perr == nil { @@ -1669,19 +1747,31 @@ func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLa perr = pend.Err } } + if tstart != nil { + if pstartActual, ok := tstart[p]; ok { + pstart = pstartActual + } + } lag := int64(-1) if perr == nil { lag = pend.Offset + if pstart.Err == nil { + lag = pend.Offset - pstart.Offset + } if pcommit.At >= 0 { lag = pend.Offset - pcommit.At } + if lag < 0 { + lag = 0 + } } lt[p] = GroupMemberLag{ Topic: t, Partition: p, Commit: pcommit.Offset, + Start: pstart, End: pend, Lag: lag, Err: perr, @@ -1694,6 +1784,7 @@ func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLa // lag calculations above, the partitions were not committed to and we // count that as entirely lagging. for t, lt := range l { + tstart := startOffsets[t] tend := endOffsets[t] for p, pend := range tend { if _, ok := lt[p]; ok { @@ -1710,10 +1801,27 @@ func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLa if perr == nil { lag = pend.Offset } + pstart := ListedOffset{ + Topic: t, + Partition: p, + Err: errListMissing, + } + if tstart != nil { + if pstartActual, ok := tstart[p]; ok { + pstart = pstartActual + if pstart.Err == nil { + lag = pend.Offset - pstart.Offset + if lag < 0 { + lag = 0 + } + } + } + } lt[p] = GroupMemberLag{ Topic: t, Partition: p, Commit: pcommit, + Start: pstart, End: pend, Lag: lag, Err: perr,