diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 5483f5cf..09a853d9 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -348,6 +348,7 @@ func (g *groupConsumer) manage() { h.OnGroupManageError(err) } }) + g.c.addFakeReadyForDraining("", 0, &ErrGroupSession{err}) } // If we are eager, we should have invalidated everything diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 78a49ad7..3d6777cf 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -286,3 +286,16 @@ func (e *errUnknownCoordinator) Error() string { " but did not reply with that broker in the broker list", e.key.name, e.key.typ, e.coordinator) } } + +// ErrGroupSession is injected into a poll if an error occurred such that your +// consumer group member was kicked from the group or was never able to join +// the group. +type ErrGroupSession struct { + err error +} + +func (e *ErrGroupSession) Error() string { + return fmt.Sprintf("unable to join group session: %v", e.err) +} + +func (e *ErrGroupSession) Unwrap() error { return e.err } diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 55c71a84..7edff28b 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -324,7 +324,7 @@ type FetchError struct { // Errors returns all errors in a fetch with the topic and partition that // errored. // -// There are four classes of errors possible: +// There are a few classes of errors possible: // // 1. a normal kerr.Error; these are usually the non-retryable kerr.Errors, // but theoretically a non-retryable error can be fixed at runtime (auth @@ -345,6 +345,17 @@ type FetchError struct { // is returned from every Poll call if the client has been closed. // A corresponding helper function IsClientClosed can be used to detect // this error. +// +// 5. an injected context error; this can be present if the context you were +// using for polling timed out or was canceled. +// +// 6. an injected ErrGroupSession; this is an informational error that is +// injected once a group session is lost in a way that is not the standard +// rebalance. This error can signify that your consumer member is not able +// to connect to the group (ACL problems, unreachable broker), or you +// blocked rebalancing for too long, or your callbacks took too long. +// +// This list may grow over time. func (fs Fetches) Errors() []FetchError { var errs []FetchError fs.EachError(func(t string, p int32, err error) {