diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index d8684a57..bbcb94c2 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -72,7 +72,7 @@ type Bootstrap struct { publishers *Publishers collector metrics.Collector server *api.Server - metrics *flowMetrics.Server + metrics *metricsWrapper events *ingestion.Engine profiler *api.ProfileServer db *pebbleDB.DB @@ -372,42 +372,13 @@ func (b *Bootstrap) StopAPIServer() { } func (b *Bootstrap) StartMetricsServer(ctx context.Context) error { - b.logger.Info().Msg("bootstrap starting metrics server") - - b.metrics = flowMetrics.NewServer(b.logger, uint(b.config.MetricsPort)) - - // this logic is needed since the metric server is a component. - // we need to start and stop it manually here. - - ictx, errCh := irrecoverable.WithSignaler(ctx) - b.metrics.Start(ictx) - if err := util.WaitClosed(ctx, b.metrics.Ready()); err != nil { - return fmt.Errorf("failed to start metrics server: %w", err) - } - select { - case err := <-errCh: - // there might be an error already if the startup failed - return err - default: - } - - go func() { - err := <-errCh - if err != nil { - b.logger.Err(err).Msg("error in metrics server") - panic(err) - } - }() - return nil + b.metrics = newMetricsWrapper(b.logger, b.config.MetricsPort) + return b.metrics.Start(ctx) } func (b *Bootstrap) StopMetricsServer() { - if b.metrics == nil { - return - } - <-b.metrics.Done() - b.logger.Warn().Msg("shutting down metrics server") + b.metrics.Stop() } func (b *Bootstrap) StartProfilerServer(_ context.Context) error { @@ -710,3 +681,63 @@ func Run(ctx context.Context, cfg config.Config, ready component.ReadyFunc) erro return nil } + +// metricsWrapper is needed since the metric server is a component. +// We need to start and stop it manually. +type metricsWrapper struct { + *flowMetrics.Server + log zerolog.Logger + stopFN func() +} + +func newMetricsWrapper(logger zerolog.Logger, port int) *metricsWrapper { + return &metricsWrapper{ + Server: flowMetrics.NewServer(logger, uint(port)), + log: logger, + stopFN: nil, + } +} + +func (m *metricsWrapper) Start(ctx context.Context) error { + m.log.Info().Msg("bootstrap starting metrics server") + + ctx, cancel := context.WithCancel(ctx) + ictx, errCh := irrecoverable.WithSignaler(ctx) + + m.Server.Start(ictx) + if err := util.WaitClosed(ctx, m.Ready()); err != nil { + cancel() + return fmt.Errorf("failed to start metrics server: %w", err) + } + select { + case err := <-errCh: + // there might be an error already if the startup failed + cancel() + return err + default: + } + + go func() { + err := <-errCh + cancel() + if err != nil { + m.log.Err(err).Msg("error in metrics server") + panic(err) + } + }() + + m.stopFN = cancel + + return nil +} + +func (m *metricsWrapper) Stop() { + if m == nil || m.stopFN == nil { + return + } + m.log.Warn().Msg("shutting down metrics server") + // context could already be cancelled, but no harm in calling cancel again. + // especially in testing scenarios. + m.stopFN() + <-m.Done() +}