diff --git a/internal/impl/kafka/franz_reader_unordered.go b/internal/impl/kafka/franz_reader_unordered.go index 0e68b9946..4b3602039 100644 --- a/internal/impl/kafka/franz_reader_unordered.go +++ b/internal/impl/kafka/franz_reader_unordered.go @@ -17,6 +17,7 @@ package kafka import ( "context" "errors" + "fmt" "sync" "sync/atomic" "time" @@ -491,6 +492,11 @@ func (f *FranzReaderUnordered) Connect(ctx context.Context) error { return err } + // Check connectivity to cluster + if err = cl.Ping(ctx); err != nil { + return fmt.Errorf("failed to connect to cluster: %s", err) + } + go func() { defer func() { cl.Close()