Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #4168) Replace most context.TODO calls, comment on other Background and TODOs #4180

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions changelog/fragments/1733164018-Replace-use-of-context.TODO.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: other

# Change summary; a 80ish characters long description of the change.
summary: Replace use of context.TODO

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: fleet-server

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/fleet-server/issues/3087
6 changes: 4 additions & 2 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/signal"
"github.com/elastic/fleet-server/v7/internal/pkg/state"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -117,12 +118,13 @@ func getRunCommand(bi build.Info) func(cmd *cobra.Command, args []string) error
return err
}

srv, err := server.NewFleet(bi, state.NewLog(), true)
ctx := installSignalHandler()
srv, err := server.NewFleet(bi, state.NewLog(zerolog.Ctx(ctx)), true)
if err != nil {
return err
}

if err := srv.Run(installSignalHandler(), cfg); err != nil && !errors.Is(err, context.Canceled) {
if err := srv.Run(ctx, cfg); err != nil && !errors.Is(err, context.Canceled) {
log.Error().Err(err).Msg("Exiting")
l.Sync()
return err
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/action/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {

// Subscribe generates a new subscription with the Dispatcher using the provided agentID and seqNo.
// There is no check to ensure that the agentID has not been used; using the same one twice results in undefined behaviour.
func (d *Dispatcher) Subscribe(agentID string, seqNo sqn.SeqNo) *Sub {
func (d *Dispatcher) Subscribe(log zerolog.Logger, agentID string, seqNo sqn.SeqNo) *Sub {
cbCh := make(chan []model.Action, 1)

sub := Sub{
Expand All @@ -84,14 +84,14 @@ func (d *Dispatcher) Subscribe(agentID string, seqNo sqn.SeqNo) *Sub {
sz := len(d.subs)
d.mx.Unlock()

zerolog.Ctx(context.TODO()).Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher")
log.Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher")

return &sub
}

// Unsubscribe removes the given subscription from the dispatcher.
// Note that the channel sub.Ch() provides is not closed in this event.
func (d *Dispatcher) Unsubscribe(sub *Sub) {
func (d *Dispatcher) Unsubscribe(log zerolog.Logger, sub *Sub) {
if sub == nil {
return
}
Expand All @@ -101,7 +101,7 @@ func (d *Dispatcher) Unsubscribe(sub *Sub) {
sz := len(d.subs)
d.mx.Unlock()

zerolog.Ctx(context.TODO()).Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher")
log.Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher")
}

// process gathers actions from the monitor and dispatches them to the corresponding subscriptions.
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleArtifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ func (at ArtifactT) fetchArtifact(ctx context.Context, zlog zerolog.Logger, iden
span, ctx := apm.StartSpan(ctx, "fetchArtifact", "search")
defer span.End()
// Throttle prevents more than N outstanding requests to elastic globally and per sha2.
if token := at.esThrottle.Acquire(sha2, defaultThrottleTTL); token == nil {
if token := at.esThrottle.Acquire(zlog, sha2, defaultThrottleTTL); token == nil {
return nil, ErrorThrottle
} else {
defer token.Release()
defer token.Release(zlog)
}

start := time.Now()
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
}

// Subscribe to actions dispatcher
aSub := ct.ad.Subscribe(agent.Id, seqno)
defer ct.ad.Unsubscribe(aSub)
aSub := ct.ad.Subscribe(zlog, agent.Id, seqno)
defer ct.ad.Unsubscribe(zlog, aSub)
actCh := aSub.Ch()

// use revision_idx=0 if the agent has a single output where no API key is defined
Expand Down
9 changes: 2 additions & 7 deletions internal/pkg/api/handleFileDelivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@
package api

import (
"context"
"errors"
"net/http"
"strconv"

"github.com/rs/zerolog"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/file/delivery"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/go-elasticsearch/v8"
"github.com/rs/zerolog"
)

type FileDeliveryT struct {
Expand All @@ -28,11 +28,6 @@ type FileDeliveryT struct {
}

func NewFileDeliveryT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *FileDeliveryT {
zerolog.Ctx(context.TODO()).Info().
Interface("limits", cfg.Limits.ArtifactLimit).
Int64("maxFileSize", maxFileSize).
Msg("upload limits")

return &FileDeliveryT{
chunkClient: chunkClient,
bulker: bulker,
Expand Down
10 changes: 3 additions & 7 deletions internal/pkg/api/handleUpload.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"strings"
"time"

"github.com/rs/zerolog"
"go.elastic.co/apm/v2"

"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
Expand All @@ -25,8 +28,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/file/uploader"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/go-elasticsearch/v8"
"github.com/rs/zerolog"
"go.elastic.co/apm/v2"
)

const (
Expand All @@ -53,11 +54,6 @@ type UploadT struct {
}

func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *UploadT {
zerolog.Ctx(context.TODO()).Info().
Interface("limits", cfg.Limits.ArtifactLimit).
Int64("maxFileSize", maxFileSize).
Msg("upload limits")

return &UploadT{
chunkClient: chunkClient,
bulker: bulker,
Expand Down
11 changes: 6 additions & 5 deletions internal/pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (
"fmt"
"sync"

"github.com/elastic/elastic-agent-libs/api"
cfglib "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-system-metrics/report"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
apmprometheus "go.elastic.co/apm/module/apmprometheus/v2"
"go.elastic.co/apm/v2"

"github.com/elastic/elastic-agent-libs/api"
cfglib "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-system-metrics/report"

"github.com/elastic/fleet-server/v7/internal/pkg/build"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
Expand Down Expand Up @@ -56,7 +57,7 @@ var (
func init() {
err := report.SetupMetrics(logger.NewZapStub("instance-metrics"), build.ServiceName, version.DefaultVersion)
if err != nil {
zerolog.Ctx(context.TODO()).Error().Err(err).Msg("unable to initialize metrics")
zerolog.Ctx(context.TODO()).Error().Err(err).Msg("unable to initialize metrics") // TODO is used because this may logged during the package load
}

registry = newMetricsRegistry("http_server")
Expand Down
42 changes: 22 additions & 20 deletions internal/pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *server) Run(ctx context.Context) error {
MaxHeaderBytes: mhbz,
BaseContext: func(net.Listener) context.Context { return ctx },
ErrorLog: errLogger(ctx),
ConnState: diagConn,
ConnState: getDiagConnFunc(ctx),
}

var listenCfg net.ListenConfig
Expand Down Expand Up @@ -117,7 +117,7 @@ func (s *server) Run(ctx context.Context) error {
}
// Do a clean shutdown if the context is cancelled
case <-ctx.Done():
sCtx, cancel := context.WithTimeout(context.Background(), s.cfg.Timeouts.Drain)
sCtx, cancel := context.WithTimeout(context.Background(), s.cfg.Timeouts.Drain) // Background context to allow connections to drain when server context is cancelled.
defer cancel()
if err := srv.Shutdown(sCtx); err != nil {
cErr := srv.Close() // force it closed
Expand All @@ -128,24 +128,26 @@ func (s *server) Run(ctx context.Context) error {
return nil
}

func diagConn(c net.Conn, s http.ConnState) {
if c == nil {
return
}
func getDiagConnFunc(ctx context.Context) func(c net.Conn, s http.ConnState) {
return func(c net.Conn, s http.ConnState) {
if c == nil {
return
}

zerolog.Ctx(context.TODO()).Trace().
Str("local", c.LocalAddr().String()).
Str("remote", c.RemoteAddr().String()).
Str("state", s.String()).
Msg("connection state change")

switch s {
case http.StateNew:
cntHTTPNew.Inc()
cntHTTPActive.Inc()
case http.StateClosed:
cntHTTPClose.Inc()
cntHTTPActive.Dec()
zerolog.Ctx(ctx).Trace().
Str("local", c.LocalAddr().String()).
Str("remote", c.RemoteAddr().String()).
Str("state", s.String()).
Msg("connection state change")

switch s {
case http.StateNew:
cntHTTPNew.Inc()
cntHTTPActive.Inc()
case http.StateClosed:
cntHTTPClose.Inc()
cntHTTPActive.Dec()
}
}
}

Expand All @@ -157,7 +159,7 @@ func wrapConnLimitter(ctx context.Context, ln net.Listener, cfg *config.Server)
Int("hardConnLimit", hardLimit).
Msg("server hard connection limiter installed")

ln = limit.Listener(ln, hardLimit)
ln = limit.Listener(ln, hardLimit, zerolog.Ctx(ctx))
} else {
zerolog.Ctx(ctx).Info().Msg("server hard connection limiter disabled")
}
Expand Down
7 changes: 4 additions & 3 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/go-ucfg"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/rs/zerolog"
"go.elastic.co/apm/v2"
"golang.org/x/sync/semaphore"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

type APIKey = apikey.APIKey
Expand Down Expand Up @@ -175,7 +176,7 @@ func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, ou
cancelFn()
}
}
bulkCtx, bulkCancel := context.WithCancel(context.Background())
bulkCtx, bulkCancel := context.WithCancel(context.Background()) // background context used to allow bulker to flush on exit, exits when config changes or primary bulker exits.
es, err := b.createRemoteEsClient(bulkCtx, outputName, outputMap)
if err != nil {
defer bulkCancel()
Expand Down
Loading
Loading