Skip to content

Commit

Permalink
Merge pull request #26 from zerodha/randomize-initial
Browse files Browse the repository at this point in the history
Add `randomize_initial` for randomizing the initial connection.
  • Loading branch information
knadh authored Jul 5, 2024
2 parents 43e191d + 0febbc4 commit 5fac087
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
6 changes: 6 additions & 0 deletions config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ backoff_max = "10s"
# whether it's healthy or not.
request_timeout = "100ms"

# Pick a random server from the [[sources]] list to connect first on boot
# instead of the first one from the list. This can be useful for testing
# servers in production environments that may never be consumed from except
# during rare failover events.
randomize_initial = false


[[sources]]
name = "node1"
Expand Down
16 changes: 16 additions & 0 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"log/slog"
"math/rand"
"net/http"
"os"
"plugin"
Expand Down Expand Up @@ -184,6 +185,21 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC
log.Fatalf("error unmarshalling `sources` config: %v", err)
}

log.Printf("read config for %d servers in the source pool", len(src.Sources))
if ko.Bool("source_pool.randomize_initial") {
log.Println("randomizing source pool for initial connection")
r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(src.Sources), func(i, j int) {
src.Sources[i], src.Sources[j] = src.Sources[j], src.Sources[i]
})
}

// If it's single mode, eliminate all servers in the pool except one
// to disable healthcehcks and failover.
if ko.String("mode") == relay.ModeSingle {
src.Sources = src.Sources[:1]
}

// Read target Kafka config.
var prod relay.ProducerCfg
if err := ko.Unmarshal("target", &prod); err != nil {
Expand Down

0 comments on commit 5fac087

Please sign in to comment.