Skip to content

Commit

Permalink
coordinator: correct shutdown, report serve errors
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Meyer <[email protected]>
  • Loading branch information
katexochen committed Aug 1, 2024
1 parent 6d0c713 commit 0ef1d88
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main

import (
"context"
"errors"
"fmt"
"log/slog"
"net"
Expand Down Expand Up @@ -38,6 +39,9 @@ func main() {
}

func run() (retErr error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger, err := logger.Default()
if err != nil {
fmt.Fprintf(os.Stderr, "Error: creating logger: %v\n", err)
Expand Down Expand Up @@ -72,8 +76,10 @@ func run() (retErr error) {

userapi.RegisterUserAPIServer(grpcServer, meshAuth)
serverMetrics.InitializeMetrics(grpcServer)
meshAPI := newMeshAPIServer(meshAuth, meshAuth, promRegistry, serverMetrics, logger)
metricsServer := &http.Server{}

eg := errgroup.Group{}
eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() error {
if metricsPort == "" {
Expand All @@ -90,7 +96,10 @@ func run() (retErr error) {
promhttp.HandlerOpts{Registry: promRegistry},
),
))
if err := http.ListenAndServe(":"+metricsPort, mux); err != nil {
metricsServer.Addr = ":" + metricsPort
metricsServer.Handler = mux
if err := metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Error("Serving Prometheus /metrics endpoint", "err", err)
return fmt.Errorf("serving Prometheus endpoint: %w", err)
}
return nil
Expand All @@ -103,20 +112,30 @@ func run() (retErr error) {
return fmt.Errorf("failed to listen: %w", err)
}
if err := grpcServer.Serve(lis); err != nil {
logger.Error("Serving Coordinator API", "err", err)
return fmt.Errorf("serving Coordinator API: %w", err)
}
return nil
})

eg.Go(func() error {
meshAPI := newMeshAPIServer(meshAuth, meshAuth, promRegistry, serverMetrics, logger)
logger.Info("Coordinator mesh API listening")
if err := meshAPI.Serve(net.JoinHostPort("0.0.0.0", meshapi.Port)); err != nil {
logger.Error("Serving mesh API", "err", err)
return fmt.Errorf("serving mesh API: %w", err)
}
return nil
})

eg.Go(func() error {
<-ctx.Done()
logger.Info("Error detected, shutting down")
grpcServer.GracefulStop()
meshAPI.grpc.GracefulStop()
//nolint:contextcheck // fresh context for cleanup
return metricsServer.Shutdown(context.Background())
})

return eg.Wait()
}

Expand Down

0 comments on commit 0ef1d88

Please sign in to comment.