Skip to content

Commit

Permalink
Merge pull request #761 from onflow/janez/fix-metrics-server-shutdown
Browse files Browse the repository at this point in the history
fix metrics server shutdown
  • Loading branch information
janezpodhostnik authored Feb 19, 2025
2 parents 6f4a6cb + d48c663 commit f8efaff
Showing 1 changed file with 64 additions and 33 deletions.
97 changes: 64 additions & 33 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Bootstrap struct {
publishers *Publishers
collector metrics.Collector
server *api.Server
metrics *flowMetrics.Server
metrics *metricsWrapper
events *ingestion.Engine
profiler *api.ProfileServer
db *pebbleDB.DB
Expand Down Expand Up @@ -372,42 +372,13 @@ func (b *Bootstrap) StopAPIServer() {
}

func (b *Bootstrap) StartMetricsServer(ctx context.Context) error {
b.logger.Info().Msg("bootstrap starting metrics server")

b.metrics = flowMetrics.NewServer(b.logger, uint(b.config.MetricsPort))

// this logic is needed since the metric server is a component.
// we need to start and stop it manually here.

ictx, errCh := irrecoverable.WithSignaler(ctx)
b.metrics.Start(ictx)
if err := util.WaitClosed(ctx, b.metrics.Ready()); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
select {
case err := <-errCh:
// there might be an error already if the startup failed
return err
default:
}

go func() {
err := <-errCh
if err != nil {
b.logger.Err(err).Msg("error in metrics server")
panic(err)
}
}()

return nil
b.metrics = newMetricsWrapper(b.logger, b.config.MetricsPort)
return b.metrics.Start(ctx)
}

func (b *Bootstrap) StopMetricsServer() {
if b.metrics == nil {
return
}
<-b.metrics.Done()
b.logger.Warn().Msg("shutting down metrics server")
b.metrics.Stop()
}

func (b *Bootstrap) StartProfilerServer(_ context.Context) error {
Expand Down Expand Up @@ -710,3 +681,63 @@ func Run(ctx context.Context, cfg config.Config, ready component.ReadyFunc) erro

return nil
}

// metricsWrapper is needed since the metric server is a component.
// We need to start and stop it manually.
type metricsWrapper struct {
*flowMetrics.Server
log zerolog.Logger
stopFN func()
}

func newMetricsWrapper(logger zerolog.Logger, port int) *metricsWrapper {
return &metricsWrapper{
Server: flowMetrics.NewServer(logger, uint(port)),
log: logger,
stopFN: nil,
}
}

func (m *metricsWrapper) Start(ctx context.Context) error {
m.log.Info().Msg("bootstrap starting metrics server")

ctx, cancel := context.WithCancel(ctx)
ictx, errCh := irrecoverable.WithSignaler(ctx)

m.Server.Start(ictx)
if err := util.WaitClosed(ctx, m.Ready()); err != nil {
cancel()
return fmt.Errorf("failed to start metrics server: %w", err)
}
select {
case err := <-errCh:
// there might be an error already if the startup failed
cancel()
return err
default:
}

go func() {
err := <-errCh
cancel()
if err != nil {
m.log.Err(err).Msg("error in metrics server")
panic(err)
}
}()

m.stopFN = cancel

return nil
}

func (m *metricsWrapper) Stop() {
if m == nil || m.stopFN == nil {
return
}
m.log.Warn().Msg("shutting down metrics server")
// context could already be cancelled, but no harm in calling cancel again.
// especially in testing scenarios.
m.stopFN()
<-m.Done()
}

0 comments on commit f8efaff

Please sign in to comment.