Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix metrics server shutdown #761

Merged
merged 2 commits into from
Feb 19, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 64 additions & 33 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Bootstrap struct {
publishers *Publishers
collector metrics.Collector
server *api.Server
metrics *flowMetrics.Server
metrics *metricsWrapper
events *ingestion.Engine
profiler *api.ProfileServer
db *pebbleDB.DB
Expand Down Expand Up @@ -372,42 +372,13 @@ func (b *Bootstrap) StopAPIServer() {
}

func (b *Bootstrap) StartMetricsServer(ctx context.Context) error {
b.logger.Info().Msg("bootstrap starting metrics server")

b.metrics = flowMetrics.NewServer(b.logger, uint(b.config.MetricsPort))

// this logic is needed since the metric server is a component.
// we need to start and stop it manually here.

ictx, errCh := irrecoverable.WithSignaler(ctx)
b.metrics.Start(ictx)
if err := util.WaitClosed(ctx, b.metrics.Ready()); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
select {
case err := <-errCh:
// there might be an error already if the startup failed
return err
default:
}

go func() {
err := <-errCh
if err != nil {
b.logger.Err(err).Msg("error in metrics server")
panic(err)
}
}()

return nil
b.metrics = newMetricsWrapper(b.logger, b.config.MetricsPort)
return b.metrics.Start(ctx)
}

func (b *Bootstrap) StopMetricsServer() {
if b.metrics == nil {
return
}
<-b.metrics.Done()
b.logger.Warn().Msg("shutting down metrics server")
b.metrics.Stop()
}

func (b *Bootstrap) StartProfilerServer(_ context.Context) error {
Expand Down Expand Up @@ -710,3 +681,63 @@ func Run(ctx context.Context, cfg config.Config, ready component.ReadyFunc) erro

return nil
}

// metricsWrapper is needed since the metric server is a component.
// We need to start and stop it manually.
//
// It pairs the metrics server with the cancel function of the context the
// server was started with, so the server can be shut down via Stop.
type metricsWrapper struct {
// Embedded metrics server. Its methods (e.g. Ready and Done, which Start and
// Stop call directly on the wrapper) are promoted through embedding.
*flowMetrics.Server
// log is used for the wrapper's own start/stop/error messages.
log zerolog.Logger
// stopFN cancels the context the server runs under. It is nil until Start
// completes successfully; Stop treats nil as "nothing to stop".
stopFN func()
}
Comment on lines +685 to +691
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix method accessibility in metricsWrapper.

The wrapper embeds *flowMetrics.Server but doesn't properly expose the Ready() and Done() methods, causing compilation errors. Add method forwarding to fix this.

Apply this diff:

type metricsWrapper struct {
	*flowMetrics.Server
	log    zerolog.Logger
	stopFN func()
}

+// Ready forwards to the embedded Server's Ready method
+func (m *metricsWrapper) Ready() <-chan struct{} {
+	return m.Server.Ready()
+}

+// Done forwards to the embedded Server's Done method
+func (m *metricsWrapper) Done() <-chan struct{} {
+	return m.Server.Done()
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// metricsWrapper is needed since the metric server is a component.
// We need to start and stop it manually.
type metricsWrapper struct {
*flowMetrics.Server
log zerolog.Logger
stopFN func()
}
// metricsWrapper is needed since the metric server is a component.
// We need to start and stop it manually.
type metricsWrapper struct {
*flowMetrics.Server
log zerolog.Logger
stopFN func()
}
// Ready forwards to the embedded Server's Ready method
func (m *metricsWrapper) Ready() <-chan struct{} {
return m.Server.Ready()
}
// Done forwards to the embedded Server's Done method
func (m *metricsWrapper) Done() <-chan struct{} {
return m.Server.Done()
}


// newMetricsWrapper builds a wrapper around a new metrics server listening on
// the given port. The server is only constructed here, not started; call
// Start to run it.
func newMetricsWrapper(logger zerolog.Logger, port int) *metricsWrapper {
	srv := flowMetrics.NewServer(logger, uint(port))

	// stopFN is intentionally left at its zero value (nil): it is only
	// populated once Start has succeeded.
	wrapper := &metricsWrapper{
		Server: srv,
		log:    logger,
	}
	return wrapper
}

// Start launches the embedded metrics server and blocks until it reports
// ready, ctx is cancelled, or startup fails.
//
// The server runs under a child of ctx. On success the child's cancel
// function is stored in stopFN so that Stop can shut the server down
// independently of the caller's context. On any startup failure the child
// context is cancelled before returning and stopFN is left unset.
//
// After a successful start a watcher goroutine waits for either an
// irrecoverable error from the server (which is logged and then panics) or
// for the server to finish shutting down. Waiting on Done as well as on the
// error channel ensures the watcher exits after a clean shutdown instead of
// blocking forever on an error that never arrives.
func (m *metricsWrapper) Start(ctx context.Context) error {
	m.log.Info().Msg("bootstrap starting metrics server")

	ctx, cancel := context.WithCancel(ctx)
	ictx, errCh := irrecoverable.WithSignaler(ctx)

	m.Server.Start(ictx)
	if err := util.WaitClosed(ctx, m.Ready()); err != nil {
		cancel()
		return fmt.Errorf("failed to start metrics server: %w", err)
	}
	select {
	case err := <-errCh:
		// there might be an error already if the startup failed
		cancel()
		return err
	default:
	}

	go func() {
		var err error
		select {
		case err = <-errCh:
		case <-m.Done():
			// The server has fully stopped. Pick up an error that raced with
			// the shutdown, if there is one, without blocking.
			select {
			case err = <-errCh:
			default:
			}
		}
		cancel()
		if err != nil {
			m.log.Err(err).Msg("error in metrics server")
			panic(err)
		}
	}()

	m.stopFN = cancel

	return nil
}

// Stop shuts the metrics server down and blocks until it has finished.
// It is a no-op on a nil wrapper or on a wrapper whose stop function was
// never set.
func (m *metricsWrapper) Stop() {
	if m == nil {
		return
	}

	stop := m.stopFN
	if stop == nil {
		return
	}

	m.log.Warn().Msg("shutting down metrics server")

	// The context may already have been cancelled (for example in tests);
	// cancelling it again is harmless.
	stop()

	// Wait for the server to report that it is done.
	<-m.Done()
}
Loading