Skip to content

Commit

Permalink
Merge pull request twmb#387 from twmb/group_error
Browse files Browse the repository at this point in the history
kgo: inject the group lost error into polling
  • Loading branch information
twmb authored Mar 13, 2023
2 parents 768cc7f + b487a15 commit 3a0fca1
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ func (g *groupConsumer) manage() {
h.OnGroupManageError(err)
}
})
g.c.addFakeReadyForDraining("", 0, &ErrGroupSession{err})
}

// If we are eager, we should have invalidated everything
Expand Down
13 changes: 13 additions & 0 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,16 @@ func (e *errUnknownCoordinator) Error() string {
" but did not reply with that broker in the broker list", e.key.name, e.key.typ, e.coordinator)
}
}

// ErrGroupSession is injected into a poll if an error occurred such that your
// consumer group member was kicked from the group or was never able to join
// the group.
type ErrGroupSession struct {
err error
}

func (e *ErrGroupSession) Error() string {
return fmt.Sprintf("unable to join group session: %v", e.err)
}

func (e *ErrGroupSession) Unwrap() error { return e.err }
13 changes: 12 additions & 1 deletion pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ type FetchError struct {
// Errors returns all errors in a fetch with the topic and partition that
// errored.
//
// There are four classes of errors possible:
// There are a few classes of errors possible:
//
// 1. a normal kerr.Error; these are usually the non-retryable kerr.Errors,
// but theoretically a non-retryable error can be fixed at runtime (auth
Expand All @@ -345,6 +345,17 @@ type FetchError struct {
// is returned from every Poll call if the client has been closed.
// A corresponding helper function IsClientClosed can be used to detect
// this error.
//
// 5. an injected context error; this can be present if the context you were
// using for polling timed out or was canceled.
//
// 6. an injected ErrGroupSession; this is an informational error that is
// injected once a group session is lost in a way that is not the standard
// rebalance. This error can signify that your consumer member is not able
// to connect to the group (ACL problems, unreachable broker), or you
// blocked rebalancing for too long, or your callbacks took too long.
//
// This list may grow over time.
func (fs Fetches) Errors() []FetchError {
var errs []FetchError
fs.EachError(func(t string, p int32, err error) {
Expand Down

0 comments on commit 3a0fca1

Please sign in to comment.