Skip to content

Commit

Permalink
Merge pull request #28 from kalbhor/main
Browse files Browse the repository at this point in the history
refactor: upgrade franz-go and minor lint changes
  • Loading branch information
knadh authored Jul 29, 2024
2 parents 05fce61 + b31e40b commit 87710cc
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 22 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/knadh/koanf/providers/posflag v0.1.0
github.com/knadh/koanf/v2 v2.0.1
github.com/spf13/pflag v1.0.5
github.com/twmb/franz-go v1.17.0
github.com/twmb/franz-go v1.17.1
github.com/twmb/franz-go/pkg/kmsg v1.8.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk=
github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ=
github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kadm v1.8.1 h1:SrzL855I7gQTGdMtOYGTHhebs7TPgPN29FPtjusqwlE=
github.com/twmb/franz-go/pkg/kadm v1.8.1/go.mod h1:qUSM7pxoMCU1UNu5H4USE64ODcVmeG9LS96mysv1nu8=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
Expand Down
11 changes: 1 addition & 10 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,19 +276,10 @@ func initMetricsServer(metrics *metrics.Set, ko *koanf.Koanf) *http.Server {
buf.WriteTo(w)
})

srv := &http.Server{
return &http.Server{
Addr: ko.MustString("app.metrics_server_addr"),
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

go func() {
err := srv.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
log.Printf("error starting server: %v", err)
}
}()

return srv
}
20 changes: 11 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -39,13 +40,20 @@ func main() {
log.Fatalf("error initializing filter provider: %v", err)
}

// Initialize metrics.
metr := metrics.NewSet()

// Create a global context with interrupts signals.
globalCtx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

// Initialize metrics set and start the HTTP server.
metr := metrics.NewSet()
metrSrv := initMetricsServer(metr, ko)
go func() {
if err := metrSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Printf("error starting server: %v", err)
}
}()
defer metrSrv.Shutdown(globalCtx)

// Initialize the source and target Kafka config.
consumerCfgs, prodConfig := initKafkaConfig(ko)

Expand Down Expand Up @@ -78,16 +86,10 @@ func main() {
log.Fatalf("error initializing relay controller: %v", err)
}

// Start the metrSrv HTTP server.
metrSrv := initMetricsServer(metr, ko)

// Start the relay. This is an indefinitely blocking call.
if err := relay.Start(globalCtx); err != nil {
log.Fatalf("error starting relay controller: %v", err)
}

if metrSrv != nil {
metrSrv.Shutdown(globalCtx)
}
lo.Info("bye")
}

0 comments on commit 87710cc

Please sign in to comment.