Skip to content

Commit

Permalink
Use Shutdown to gracefully halt HTTP servers
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-laterman committed Dec 12, 2023
1 parent bee6ad4 commit 2839a6c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 27 deletions.
11 changes: 10 additions & 1 deletion internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,16 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
for {
select {
case <-ctx.Done():
span.End()
defer span.End()
// If the request context is canceled, the API server is shutting down.
// We want to immediatly stop the long-poll and return a 200 with the ackToken and no actions.
if errors.Is(ctx.Err(), context.Canceled) {
resp := CheckinResponse{
AckToken: &ackToken,
Action: "checkin",
}
return ct.writeResponse(zlog, w, r, agent, resp)
}
return ctx.Err()
case acdocs := <-actCh:
var acs []Action
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/api/handleStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ func (st StatusT) handleStatus(zlog zerolog.Logger, sm policy.SelfMonitor, bi bu
span, _ = apm.StartSpan(r.Context(), "response", "write")
defer span.End()

// If the request context has been cancelled, such as the case when the server is stopping we should return a 503
// Note that the API server uses Shutdown, so no new requests should be accepted and this edge case will be rare.
if errors.Is(r.Context().Err(), context.Canceled) {
state = client.UnitStateStopping
}

data, err := json.Marshal(&resp)
if err != nil {
return err
Expand Down
35 changes: 11 additions & 24 deletions internal/pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
slog "log"
"net"
"net/http"
Expand Down Expand Up @@ -75,25 +76,7 @@ func (s *server) Run(ctx context.Context) error {
ConnState: diagConn,
}

forceCh := make(chan struct{})
defer close(forceCh)

// handler to close server
go func() {
select {
case <-ctx.Done():
zerolog.Ctx(ctx).Debug().Msg("force server close on ctx.Done()")
err := srv.Close()
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("error while closing server")
}
case <-forceCh:
zerolog.Ctx(ctx).Debug().Msg("go routine forced closed on exit")
}
}()

var listenCfg net.ListenConfig

ln, err := listenCfg.Listen(ctx, "tcp", s.addr)
if err != nil {
return err
Expand Down Expand Up @@ -130,23 +113,27 @@ func (s *server) Run(ctx context.Context) error {
zerolog.Ctx(ctx).Warn().Msg("Exposed over insecure HTTP; enablement of TLS is strongly recommended")
}

// Start the API server on another goroutine and return any non ErrServerClosed errors through a channel.
errCh := make(chan error)
baseCtx, cancel := context.WithCancel(ctx)
defer cancel()

go func(ctx context.Context, errCh chan error, ln net.Listener) {
zerolog.Ctx(ctx).Info().Msgf("Listening on %s", s.addr)
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- err
}
}(baseCtx, errCh, ln)
}(ctx, errCh, ln)

select {
// Listen and return any errors that occur from the server listener
case err := <-errCh:
if !errors.Is(err, context.Canceled) {
return err
return fmt.Errorf("error while serving API listener: %w", err)
}
// Do a clean shutdown if the context is cancelled
// Note that is will wait for connections to close, we may want to use a new timeout value here.
case <-ctx.Done():
if err := srv.Shutdown(context.TODO()); err != nil {
return fmt.Errorf("error while shutting down api listener: %w", err)
}
case <-baseCtx.Done():
}

return nil
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/checkin/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (bc *Bulk) timestamp() string {

// CheckIn will add the agent (identified by id) to the pending set.
// The pending agents are sent to elasticsearch as a bulk update at each flush interval.
// NOTE: If Checkin is called after Run has returned it will just add the entry to the pending map and not do any operations, this may occur when the fleet-server is shutting down.
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
func (bc *Bulk) CheckIn(id string, status string, message string, meta []byte, components []byte, seqno sqn.SeqNo, newVer string) error {
// Separate out the extra data to minimize
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/server/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ LOOP:
}
}

// FIXME cancelling the context will break out of the above loop and wait 1s, we should instead have a cancel context or some way to ensure that the API servers have shut down correctly.

// Server is coming down; wait for the server group to exit cleanly.
// Timeout if something is locked up.
err = safeWait(srvEg, time.Second)
Expand Down Expand Up @@ -376,6 +378,7 @@ func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) {
// unexpectedly (ie. not cancelled by the bulkCancel context).
errCh := make(chan error)

// FIXME why not run this in the errgroup?
go func() {
runFunc := loggedRunFunc(bulkCtx, "Bulker", bulker.Run)

Expand Down Expand Up @@ -505,10 +508,10 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro
if err != nil {
return err
}
g.Go(loggedRunFunc(ctx, "Revision monitor", am.Run))
g.Go(loggedRunFunc(ctx, "Action monitor", am.Run))

ad = action.NewDispatcher(am, cfg.Inputs[0].Server.Limits.ActionLimit.Interval, cfg.Inputs[0].Server.Limits.ActionLimit.Burst)
g.Go(loggedRunFunc(ctx, "Revision dispatcher", ad.Run))
g.Go(loggedRunFunc(ctx, "Action dispatcher", ad.Run))
tr, err = action.NewTokenResolver(bulker)
if err != nil {
return err
Expand Down

0 comments on commit 2839a6c

Please sign in to comment.