Skip to content

Commit

Permalink
[8.x](backport #4168) Replace most context.TODO calls, comment on oth…
Browse files Browse the repository at this point in the history
…er Background and TODOs (#4180)

* Replace most context.TODO calls, comment on other Background and TODOs (#4168)

(cherry picked from commit cf41f38)

# Conflicts:
#	internal/pkg/config/config.go
#	internal/pkg/server/fleet.go

* Fix merge conflict

* Fix linter

* fix deprecated options check

---------

Co-authored-by: Michel Laterman <[email protected]>
Co-authored-by: michel-laterman <[email protected]>
  • Loading branch information
3 people authored Dec 10, 2024
1 parent 1483e72 commit 14b40c9
Show file tree
Hide file tree
Showing 29 changed files with 241 additions and 189 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1733164018-Replace-use-of-context.TODO.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: other

# Change summary; a 80ish characters long description of the change.
summary: Replace use of context.TODO

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: fleet-server

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/fleet-server/issues/3087
6 changes: 4 additions & 2 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/signal"
"github.com/elastic/fleet-server/v7/internal/pkg/state"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -117,12 +118,13 @@ func getRunCommand(bi build.Info) func(cmd *cobra.Command, args []string) error
return err
}

srv, err := server.NewFleet(bi, state.NewLog(), true)
ctx := installSignalHandler()
srv, err := server.NewFleet(bi, state.NewLog(zerolog.Ctx(ctx)), true)
if err != nil {
return err
}

if err := srv.Run(installSignalHandler(), cfg); err != nil && !errors.Is(err, context.Canceled) {
if err := srv.Run(ctx, cfg); err != nil && !errors.Is(err, context.Canceled) {
log.Error().Err(err).Msg("Exiting")
l.Sync()
return err
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/action/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {

// Subscribe generates a new subscription with the Dispatcher using the provided agentID and seqNo.
// There is no check to ensure that the agentID has not been used; using the same one twice results in undefined behaviour.
func (d *Dispatcher) Subscribe(agentID string, seqNo sqn.SeqNo) *Sub {
func (d *Dispatcher) Subscribe(log zerolog.Logger, agentID string, seqNo sqn.SeqNo) *Sub {
cbCh := make(chan []model.Action, 1)

sub := Sub{
Expand All @@ -84,14 +84,14 @@ func (d *Dispatcher) Subscribe(agentID string, seqNo sqn.SeqNo) *Sub {
sz := len(d.subs)
d.mx.Unlock()

zerolog.Ctx(context.TODO()).Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher")
log.Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher")

return &sub
}

// Unsubscribe removes the given subscription from the dispatcher.
// Note that the channel sub.Ch() provides is not closed in this event.
func (d *Dispatcher) Unsubscribe(sub *Sub) {
func (d *Dispatcher) Unsubscribe(log zerolog.Logger, sub *Sub) {
if sub == nil {
return
}
Expand All @@ -101,7 +101,7 @@ func (d *Dispatcher) Unsubscribe(sub *Sub) {
sz := len(d.subs)
d.mx.Unlock()

zerolog.Ctx(context.TODO()).Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher")
log.Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher")
}

// process gathers actions from the monitor and dispatches them to the corresponding subscriptions.
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleArtifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ func (at ArtifactT) fetchArtifact(ctx context.Context, zlog zerolog.Logger, iden
span, ctx := apm.StartSpan(ctx, "fetchArtifact", "search")
defer span.End()
// Throttle prevents more than N outstanding requests to elastic globally and per sha2.
if token := at.esThrottle.Acquire(sha2, defaultThrottleTTL); token == nil {
if token := at.esThrottle.Acquire(zlog, sha2, defaultThrottleTTL); token == nil {
return nil, ErrorThrottle
} else {
defer token.Release()
defer token.Release(zlog)
}

start := time.Now()
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
}

// Subscribe to actions dispatcher
aSub := ct.ad.Subscribe(agent.Id, seqno)
defer ct.ad.Unsubscribe(aSub)
aSub := ct.ad.Subscribe(zlog, agent.Id, seqno)
defer ct.ad.Unsubscribe(zlog, aSub)
actCh := aSub.Ch()

// use revision_idx=0 if the agent has a single output where no API key is defined
Expand Down
9 changes: 2 additions & 7 deletions internal/pkg/api/handleFileDelivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@
package api

import (
"context"
"errors"
"net/http"
"strconv"

"github.com/rs/zerolog"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/file/delivery"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/go-elasticsearch/v8"
"github.com/rs/zerolog"
)

type FileDeliveryT struct {
Expand All @@ -28,11 +28,6 @@ type FileDeliveryT struct {
}

func NewFileDeliveryT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *FileDeliveryT {
zerolog.Ctx(context.TODO()).Info().
Interface("limits", cfg.Limits.ArtifactLimit).
Int64("maxFileSize", maxFileSize).
Msg("upload limits")

return &FileDeliveryT{
chunkClient: chunkClient,
bulker: bulker,
Expand Down
10 changes: 3 additions & 7 deletions internal/pkg/api/handleUpload.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"strings"
"time"

"github.com/rs/zerolog"
"go.elastic.co/apm/v2"

"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
Expand All @@ -25,8 +28,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/file/uploader"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/go-elasticsearch/v8"
"github.com/rs/zerolog"
"go.elastic.co/apm/v2"
)

const (
Expand All @@ -53,11 +54,6 @@ type UploadT struct {
}

func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *UploadT {
zerolog.Ctx(context.TODO()).Info().
Interface("limits", cfg.Limits.ArtifactLimit).
Int64("maxFileSize", maxFileSize).
Msg("upload limits")

return &UploadT{
chunkClient: chunkClient,
bulker: bulker,
Expand Down
11 changes: 6 additions & 5 deletions internal/pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (
"fmt"
"sync"

"github.com/elastic/elastic-agent-libs/api"
cfglib "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-system-metrics/report"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
apmprometheus "go.elastic.co/apm/module/apmprometheus/v2"
"go.elastic.co/apm/v2"

"github.com/elastic/elastic-agent-libs/api"
cfglib "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-system-metrics/report"

"github.com/elastic/fleet-server/v7/internal/pkg/build"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
Expand Down Expand Up @@ -56,7 +57,7 @@ var (
func init() {
err := report.SetupMetrics(logger.NewZapStub("instance-metrics"), build.ServiceName, version.DefaultVersion)
if err != nil {
zerolog.Ctx(context.TODO()).Error().Err(err).Msg("unable to initialize metrics")
zerolog.Ctx(context.TODO()).Error().Err(err).Msg("unable to initialize metrics") // TODO is used because this may logged during the package load
}

registry = newMetricsRegistry("http_server")
Expand Down
42 changes: 22 additions & 20 deletions internal/pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *server) Run(ctx context.Context) error {
MaxHeaderBytes: mhbz,
BaseContext: func(net.Listener) context.Context { return ctx },
ErrorLog: errLogger(ctx),
ConnState: diagConn,
ConnState: getDiagConnFunc(ctx),
}

var listenCfg net.ListenConfig
Expand Down Expand Up @@ -117,7 +117,7 @@ func (s *server) Run(ctx context.Context) error {
}
// Do a clean shutdown if the context is cancelled
case <-ctx.Done():
sCtx, cancel := context.WithTimeout(context.Background(), s.cfg.Timeouts.Drain)
sCtx, cancel := context.WithTimeout(context.Background(), s.cfg.Timeouts.Drain) // Background context to allow connections to drain when server context is cancelled.
defer cancel()
if err := srv.Shutdown(sCtx); err != nil {
cErr := srv.Close() // force it closed
Expand All @@ -128,24 +128,26 @@ func (s *server) Run(ctx context.Context) error {
return nil
}

func diagConn(c net.Conn, s http.ConnState) {
if c == nil {
return
}
func getDiagConnFunc(ctx context.Context) func(c net.Conn, s http.ConnState) {
return func(c net.Conn, s http.ConnState) {
if c == nil {
return
}

zerolog.Ctx(context.TODO()).Trace().
Str("local", c.LocalAddr().String()).
Str("remote", c.RemoteAddr().String()).
Str("state", s.String()).
Msg("connection state change")

switch s {
case http.StateNew:
cntHTTPNew.Inc()
cntHTTPActive.Inc()
case http.StateClosed:
cntHTTPClose.Inc()
cntHTTPActive.Dec()
zerolog.Ctx(ctx).Trace().
Str("local", c.LocalAddr().String()).
Str("remote", c.RemoteAddr().String()).
Str("state", s.String()).
Msg("connection state change")

switch s {
case http.StateNew:
cntHTTPNew.Inc()
cntHTTPActive.Inc()
case http.StateClosed:
cntHTTPClose.Inc()
cntHTTPActive.Dec()
}
}
}

Expand All @@ -157,7 +159,7 @@ func wrapConnLimitter(ctx context.Context, ln net.Listener, cfg *config.Server)
Int("hardConnLimit", hardLimit).
Msg("server hard connection limiter installed")

ln = limit.Listener(ln, hardLimit)
ln = limit.Listener(ln, hardLimit, zerolog.Ctx(ctx))
} else {
zerolog.Ctx(ctx).Info().Msg("server hard connection limiter disabled")
}
Expand Down
7 changes: 4 additions & 3 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/go-ucfg"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/rs/zerolog"
"go.elastic.co/apm/v2"
"golang.org/x/sync/semaphore"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

type APIKey = apikey.APIKey
Expand Down Expand Up @@ -175,7 +176,7 @@ func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, ou
cancelFn()
}
}
bulkCtx, bulkCancel := context.WithCancel(context.Background())
bulkCtx, bulkCancel := context.WithCancel(context.Background()) // background context used to allow bulker to flush on exit, exits when config changes or primary bulker exits.
es, err := b.createRemoteEsClient(bulkCtx, outputName, outputMap)
if err != nil {
defer bulkCancel()
Expand Down
Loading

0 comments on commit 14b40c9

Please sign in to comment.