Skip to content

Commit

Permalink
Check for cluster connectivity in the kafka_franz input
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor committed Dec 16, 2024
1 parent 8c7dc47 commit ad2cd0a
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions internal/impl/kafka/franz_reader_unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kafka
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -491,6 +492,11 @@ func (f *FranzReaderUnordered) Connect(ctx context.Context) error {
return err
}

// Check connectivity to cluster
if err = cl.Ping(ctx); err != nil {
return fmt.Errorf("failed to connect to cluster: %s", err)
}

go func() {
defer func() {
cl.Close()
Expand Down

0 comments on commit ad2cd0a

Please sign in to comment.