Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Shutdown to gracefully halt HTTP servers #3165

Merged
merged 9 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Drain HTTP connections on shutdown

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Attempt to safely drain HTTP connections on shutdown by using the http server's Shutdown method.
Add a new timeout.Drain config attribute that specifies how long the shutdown will wait (default 10s).

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3165

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 2902
2 changes: 2 additions & 0 deletions fleet-server.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ fleet:
# checkin_jitter: 30s
# # checkin_max_poll is the maximum long_poll value a client can request.
# checkin_max_poll: 1h
# # drain is the amount of time fleet-server will wait for HTTP connections to terminate on a shutdown signal before forcing all connections closed
# drain: 10s
#
# # profiler will bind Go's pprof endpoints to a new listener if enabled.
# profiler:
Expand Down
11 changes: 10 additions & 1 deletion internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,16 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
for {
select {
case <-ctx.Done():
span.End()
defer span.End()
// If the request context is canceled, the API server is shutting down.
// We want to immediately stop the long-poll and return a 200 with the ackToken and no actions.
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
if errors.Is(ctx.Err(), context.Canceled) {
resp := CheckinResponse{
AckToken: &ackToken,
Action: "checkin",
}
return ct.writeResponse(zlog, w, r, agent, resp)
}
return ctx.Err()
case acdocs := <-actCh:
var acs []Action
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/api/handleStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ func (st StatusT) handleStatus(zlog zerolog.Logger, sm policy.SelfMonitor, bi bu
span, _ = apm.StartSpan(r.Context(), "response", "write")
defer span.End()

// If the request context has been cancelled, such as when the server is stopping, we should return a 503
// Note that the API server uses Shutdown, so no new requests should be accepted and this edge case will be rare.
if errors.Is(r.Context().Err(), context.Canceled) {
state = client.UnitStateStopping
}

data, err := json.Marshal(&resp)
if err != nil {
return err
Expand Down
37 changes: 13 additions & 24 deletions internal/pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
slog "log"
"net"
"net/http"
Expand Down Expand Up @@ -75,25 +76,7 @@ func (s *server) Run(ctx context.Context) error {
ConnState: diagConn,
}

forceCh := make(chan struct{})
defer close(forceCh)

// handler to close server
go func() {
select {
case <-ctx.Done():
zerolog.Ctx(ctx).Debug().Msg("force server close on ctx.Done()")
err := srv.Close()
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("error while closing server")
}
case <-forceCh:
zerolog.Ctx(ctx).Debug().Msg("go routine forced closed on exit")
}
}()

var listenCfg net.ListenConfig

ln, err := listenCfg.Listen(ctx, "tcp", s.addr)
if err != nil {
return err
Expand Down Expand Up @@ -130,23 +113,29 @@ func (s *server) Run(ctx context.Context) error {
zerolog.Ctx(ctx).Warn().Msg("Exposed over insecure HTTP; enablement of TLS is strongly recommended")
}

// Start the API server on another goroutine and return any non ErrServerClosed errors through a channel.
errCh := make(chan error)
baseCtx, cancel := context.WithCancel(ctx)
defer cancel()

go func(ctx context.Context, errCh chan error, ln net.Listener) {
zerolog.Ctx(ctx).Info().Msgf("Listening on %s", s.addr)
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- err
}
}(baseCtx, errCh, ln)
}(ctx, errCh, ln)

select {
// Listen and return any errors that occur from the server listener
case err := <-errCh:
if !errors.Is(err, context.Canceled) {
return err
return fmt.Errorf("error while serving API listener: %w", err)
}
// Do a clean shutdown if the context is cancelled
case <-ctx.Done():
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
sCtx, cancel := context.WithTimeout(context.Background(), s.cfg.Timeouts.Drain)
defer cancel()
if err := srv.Shutdown(sCtx); err != nil {
cErr := srv.Close() // force it closed
return errors.Join(fmt.Errorf("error while shutting down api listener: %w", err), cErr)
}
case <-baseCtx.Done():
}

return nil
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/checkin/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (bc *Bulk) timestamp() string {

// CheckIn will add the agent (identified by id) to the pending set.
// The pending agents are sent to elasticsearch as a bulk update at each flush interval.
// NOTE: If Checkin is called after Run has returned it will just add the entry to the pending map and not do any operations, this may occur when the fleet-server is shutting down.
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
func (bc *Bulk) CheckIn(id string, status string, message string, meta []byte, components []byte, seqno sqn.SeqNo, newVer string) error {
// Separate out the extra data to minimize
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func TestConfig(t *testing.T) {
CheckinLongPoll: 5 * time.Minute,
CheckinJitter: 30 * time.Second,
CheckinMaxPoll: 10 * time.Minute,
Drain: 10 * time.Second,
},
Profiler: ServerProfiler{
Enabled: false,
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/config/timeouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type ServerTimeouts struct {
CheckinLongPoll time.Duration `config:"checkin_long_poll"`
CheckinJitter time.Duration `config:"checkin_jitter"`
CheckinMaxPoll time.Duration `config:"checkin_max_poll"`
Drain time.Duration `config:"drain"`
}

// InitDefaults initializes the defaults for the configuration.
Expand Down Expand Up @@ -64,4 +65,9 @@ func (c *ServerTimeouts) InitDefaults() {
// The long poll value is poll_timeout-2m, and the request's write timeout is set to poll_timeout-1m
// CheckinMaxPoll values of less than 1m are effectively ignored and a 1m limit is used.
c.CheckinMaxPoll = time.Hour

// Drain is the max duration that a server will keep connections open when a shutdown signal is received in order to gracefully handle in-progress requests.
// It is used as a context timeout value for server.Shutdown(ctx).
// A long-poll checkin connection should immediately return with a 200 status and the same ackToken it was sent, the same as if the long-poll completed with no changes detected.
c.Drain = 10 * time.Second
}
21 changes: 18 additions & 3 deletions internal/pkg/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package profile

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/http/pprof"
Expand All @@ -17,7 +19,6 @@ import (

// RunProfiler exposes /debug/pprof on the passed address by starting a server.
func RunProfiler(ctx context.Context, addr string) error {

if addr == "" {
zerolog.Ctx(ctx).Info().Msg("Profiler disabled")
return nil
Expand Down Expand Up @@ -47,10 +48,24 @@ func RunProfiler(ctx context.Context, addr string) error {
}

zerolog.Ctx(ctx).Info().Str("bind", addr).Msg("Installing profiler")
if err := server.ListenAndServe(); err != nil {
errCh := make(chan error)
go func() {
if err := server.ListenAndServe(); err != nil {
errCh <- err
}
}()

select {
case err := <-errCh:
zerolog.Ctx(ctx).Error().Err(err).Str("bind", addr).Msg("Fail install profiler")
return err
case <-ctx.Done():
sCtx, cancel := context.WithTimeout(context.Background(), cfg.Drain)
defer cancel()
if err := server.Shutdown(sCtx); err != nil {
cErr := server.Close() // force it closed
return errors.Join(fmt.Errorf("error while shutting down profile listener: %w", err), cErr)
}
}

return nil
}
56 changes: 56 additions & 0 deletions internal/pkg/profile/profile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package profile

import (
"context"
"net"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestRunProfiler(t *testing.T) {
ln, err := net.Listen("tcp", "localhost:8081")
if err != nil {
t.Skip("Port 8081 must be free to run this test")
}
ln.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errCh := make(chan error)

go func() {
errCh <- RunProfiler(ctx, "localhost:8081")
}()

req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost:8081/debug/pprof", nil)
require.NoError(t, err)

var resp *http.Response
for i := 0; i < 10; i++ {
resp, err = http.DefaultClient.Do(req) //nolint:bodyclose // closed outside the loop
if err == nil {
Comment on lines +38 to +39
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Could it be closed here?

Suggested change
resp, err = http.DefaultClient.Do(req) //nolint:bodyclose // closed outside the loop
if err == nil {
resp, err = http.DefaultClient.Do(req)
if err == nil {
resp.Body.Close()

break
}
t.Logf("profile request %d failed with: %v, retrying...", i, err)
time.Sleep(time.Millisecond * 200)
}
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)

cancel()

select {
case err := <-errCh:
require.NoError(t, err)
default:
}
}
6 changes: 3 additions & 3 deletions internal/pkg/server/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ LOOP:

// Server is coming down; wait for the server group to exit cleanly.
// Timeout if something is locked up.
err = safeWait(srvEg, time.Second)
err = safeWait(srvEg, curCfg.Inputs[0].Server.Timeouts.Drain)

// Eat cancel error to minimize confusion in logs
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -506,10 +506,10 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro
if err != nil {
return err
}
g.Go(loggedRunFunc(ctx, "Revision monitor", am.Run))
g.Go(loggedRunFunc(ctx, "Action monitor", am.Run))

ad = action.NewDispatcher(am, cfg.Inputs[0].Server.Limits.ActionLimit.Interval, cfg.Inputs[0].Server.Limits.ActionLimit.Burst)
g.Go(loggedRunFunc(ctx, "Revision dispatcher", ad.Run))
g.Go(loggedRunFunc(ctx, "Action dispatcher", ad.Run))
tr, err = action.NewTokenResolver(bulker)
if err != nil {
return err
Expand Down
Loading
Loading