From 08378a4b25c7447238184863670d94f9251f1b09 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Wed, 11 Oct 2023 15:36:09 +0200 Subject: [PATCH 1/4] start stop server decision based only on metrics.enabled --- internal/pkg/agent/application/monitoring/reload/reload.go | 2 +- .../pkg/agent/application/monitoring/reload/reload_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/reload/reload.go b/internal/pkg/agent/application/monitoring/reload/reload.go index b39d46474f9..685a5fd6fa5 100644 --- a/internal/pkg/agent/application/monitoring/reload/reload.go +++ b/internal/pkg/agent/application/monitoring/reload/reload.go @@ -103,7 +103,7 @@ func (sr *ServerReloader) Reload(rawConfig *aConfig.Config) error { sr.config = newConfig.Settings.MonitoringConfig - shouldRunMetrics := sr.config.Enabled && sr.config.MonitorMetrics + shouldRunMetrics := sr.config.Enabled if shouldRunMetrics && !sr.isServerRunning { sr.start() diff --git a/internal/pkg/agent/application/monitoring/reload/reload_test.go b/internal/pkg/agent/application/monitoring/reload/reload_test.go index b686c5346a8..ee666029f7c 100644 --- a/internal/pkg/agent/application/monitoring/reload/reload_test.go +++ b/internal/pkg/agent/application/monitoring/reload/reload_test.go @@ -59,18 +59,18 @@ agent.monitoring.enabled: false false, false, true, }, { - "stop when running, monitoring.metrics disabled", + "do not stop when running, monitoring.metrics disabled", true, true, true, ` agent.monitoring.metrics: false `, - false, false, true, + true, false, false, }, { "stop stopped server", false, false, false, ` -agent.monitoring.metrics: false +agent.monitoring.enabled: false `, false, false, false, }, From 970c9a3b075c18654708fbb7b65fed3b5c79dcd1 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Wed, 11 Oct 2023 16:10:45 +0200 Subject: [PATCH 2/4] coordinator ut --- .../agent/application/coordinator/coordinator_unit_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index dbcaa8b616e..7e6d2b224a5 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -648,9 +648,8 @@ inputs: coord.runLoopIteration(ctx) assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") - // server is stopped: monitoring.metrics is disabled - assert.True(t, monitoringServer.stopTriggered) - assert.False(t, monitoringServer.isRunning) + // server is running: monitoring.metrics is disabled does not have an effect + assert.True(t, monitoringServer.isRunning) } func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) { From 7de6ef77e7fd02a17e50c759862d0406d4b23714 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 19 Oct 2023 10:09:14 +0200 Subject: [PATCH 3/4] conflicts --- ...ing-server-based-on-monitoring-config.yaml | 31 +++++++++++++++++++ .../application/coordinator/coordinator.go | 16 ++++++++++ .../coordinator/coordinator_unit_test.go | 23 ++++++++++++++ .../agent/application/monitoring/server.go | 21 ++++++++++--- internal/pkg/agent/cmd/run.go | 16 +++++----- 5 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml diff --git a/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml b/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml new file mode 100644 index 00000000000..141c18715bf --- /dev/null +++ b/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml @@ -0,0 +1,31 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: enhancement + +# Change summary; a 80ish characters long description of the change. +summary: Start/stop monitoring server based on monitoring config + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +#description: + +# Affected component; a word indicating the component this changeset affects. +component: elastic-agent + +# PR number; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: 3492 + +# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: 2735 diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index e6a2dbc182f..856c624076f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -158,6 +158,10 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa // CoordinatorShutdownTimeout is how long the coordinator will wait during shutdown to receive a "clean" shutdown from other components var CoordinatorShutdownTimeout = time.Second * 5 +type configReloader interface { + Reload(*config.Config) error +} + // Coordinator manages the entire state of the Elastic Agent. // // All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator. @@ -173,6 +177,8 @@ type Coordinator struct { upgradeMgr UpgradeManager monitorMgr MonitorManager + monitoringServerReloader configReloader + runtimeMgr RuntimeManager configMgr ConfigManager varsMgr VarsManager @@ -365,6 +371,10 @@ func (c *Coordinator) State() State { return c.stateBroadcaster.Get() } +func (c *Coordinator) RegisterMonitoringServer(s configReloader) { + c.monitoringServerReloader = s +} + // StateSubscribe returns a channel that reports changes in Coordinator state. // // bufferLen specifies how many state changes should be queued in addition to @@ -1008,6 +1018,12 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) { } } + if c.monitoringServerReloader != nil { + if err := c.monitoringServerReloader.Reload(cfg); err != nil { + return fmt.Errorf("failed to reload monitor manager configuration: %w", err) + } + } + c.ast = rawAst return nil } diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 1a49d0b7927..7e6d2b224a5 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/config" + monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" @@ -992,3 +993,25 @@ func emptyAST(t *testing.T) *transpiler.AST { require.NoError(t, err, "AST creation must succeed") return ast } + +type fakeMonitoringServer struct { + startTriggered bool + stopTriggered bool + isRunning bool +} + +func (fs *fakeMonitoringServer) Start() { + fs.startTriggered = true + fs.isRunning = true +} + +func (fs *fakeMonitoringServer) Stop() error { + fs.stopTriggered = true + fs.isRunning = false + return nil +} + +func (fs *fakeMonitoringServer) Reset() { + fs.stopTriggered = false + fs.startTriggered = false +} diff --git a/internal/pkg/agent/application/monitoring/server.go b/internal/pkg/agent/application/monitoring/server.go index 390a472d5ed..7f43bf1866b 100644 --- a/internal/pkg/agent/application/monitoring/server.go +++ b/internal/pkg/agent/application/monitoring/server.go @@ -20,6 +20,9 @@ import ( "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -32,7 +35,8 @@ func NewServer( coord *coordinator.Coordinator, enableProcessStats bool, operatingSystem string, -) (*api.Server, error) { + mcfg *monitoringCfg.MonitoringConfig, +) (*reload.ServerReloader, error) { if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil { // log but ignore log.Errorf("failed to create monitoring drop: %v", err) @@ -43,7 +47,7 @@ func NewServer( return nil, err } - return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats, operatingSystem) + return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats, operatingSystem, mcfg) } func exposeMetricsEndpoint( @@ -54,7 +58,8 @@ func exposeMetricsEndpoint( coord *coordinator.Coordinator, enableProcessStats bool, operatingSystem string, -) (*api.Server, error) { + mcfg *monitoringCfg.MonitoringConfig, +) (*reload.ServerReloader, error) { r := mux.NewRouter() if tracer != nil { r.Use(apmgorilla.Middleware(apmgorilla.WithTracer(tracer))) @@ -72,7 +77,15 @@ func exposeMetricsEndpoint( mux := http.NewServeMux() mux.Handle("/", r) - return api.New(log, mux, config) + newServerFn := func() (reload.ServerController, error) { + apiServer, err := api.New(log, mux, config) + if err != nil { + return nil, errors.New(err, "failed to create api server") + } + return apiServer, nil + } + + return reload.NewServerReloader(newServerFn, log, mcfg), nil } func createAgentMonitoringDrop(drop string) error { diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 91d470ac00b..353c9d1e7a7 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" "github.com/elastic/elastic-agent/internal/pkg/agent/application/secret" @@ -248,12 +249,15 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration } defer composable.Close() - serverStopFn, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord) + monitoringServer, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord) if err != nil { return err } + coord.RegisterMonitoringServer(monitoringServer) defer func() { - _ = serverStopFn() + if monitoringServer != nil { + _ = monitoringServer.Stop() + } }() diagHooks := diagnostics.GlobalHooks() @@ -547,7 +551,7 @@ func setupMetrics( cfg *monitoringCfg.MonitoringConfig, tracer *apm.Tracer, coord *coordinator.Coordinator, -) (func() error, error) { +) (*reload.ServerReloader, error) { if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil { return nil, err } @@ -558,14 +562,12 @@ func setupMetrics( Host: monitoring.AgentMonitoringEndpoint(operatingSystem, cfg), } - s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg), operatingSystem) + s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg), operatingSystem, cfg) if err != nil { return nil, errors.New(err, "could not start the HTTP server for the API") } - s.Start() - // return server stopper - return s.Stop, nil + return s, nil } func isProcessStatsEnabled(cfg *monitoringCfg.MonitoringConfig) bool { From ec69b77b6724f1bd057d8118061977b418d89a44 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 24 Oct 2023 10:06:23 +0200 Subject: [PATCH 4/4] use rather atomic --- .../application/monitoring/reload/reload.go | 44 ++++++------------- .../monitoring/reload/reload_test.go | 4 +- 2 files changed, 15 insertions(+), 33 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/reload/reload.go b/internal/pkg/agent/application/monitoring/reload/reload.go index 685a5fd6fa5..430b425d6c6 100644 --- a/internal/pkg/agent/application/monitoring/reload/reload.go +++ b/internal/pkg/agent/application/monitoring/reload/reload.go @@ -5,7 +5,7 @@ package reload import ( - "sync" + "sync/atomic" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" @@ -26,9 +26,8 @@ type ServerReloader struct { log *logger.Logger newServerFn serverConstructor - config *monitoringCfg.MonitoringConfig - isServerRunning bool - isServerRunningLock sync.Mutex + config *monitoringCfg.MonitoringConfig + isServerRunning atomic.Bool } func NewServerReloader(newServerFn serverConstructor, log *logger.Logger, mcfg *monitoringCfg.MonitoringConfig) *ServerReloader { @@ -42,14 +41,7 @@ func NewServerReloader(newServerFn serverConstructor, log *logger.Logger, mcfg * } func (sr *ServerReloader) Start() { - sr.isServerRunningLock.Lock() - defer sr.isServerRunningLock.Unlock() - - sr.start() -} - -func (sr *ServerReloader) start() { - if sr.s != nil && sr.isServerRunning { + if sr.s != nil && sr.isServerRunning.Load() { // server is already running return } @@ -64,25 +56,18 @@ func (sr *ServerReloader) start() { sr.s.Start() sr.log.Debugf("Server started") - sr.isServerRunning = true + sr.isServerRunning.Store(true) } func (sr *ServerReloader) Stop() error { - sr.isServerRunningLock.Lock() - defer sr.isServerRunningLock.Unlock() - - return sr.stop() -} - -func (sr *ServerReloader) stop() error { if sr.s == nil { // stopping not started server - sr.isServerRunning = false + sr.isServerRunning.Store(false) return nil } sr.log.Info("Stopping server") - sr.isServerRunning = false + sr.isServerRunning.Store(false) if err := sr.s.Stop(); err != nil { return err } @@ -93,9 +78,6 @@ func (sr *ServerReloader) stop() error { } func (sr *ServerReloader) Reload(rawConfig *aConfig.Config) error { - sr.isServerRunningLock.Lock() - defer sr.isServerRunningLock.Unlock() - newConfig := configuration.DefaultConfiguration() if err := rawConfig.Unpack(&newConfig); err != nil { return errors.New(err, "failed to unpack monitoring config during reload") @@ -104,16 +86,16 @@ func (sr *ServerReloader) Reload(rawConfig *aConfig.Config) error { sr.config = newConfig.Settings.MonitoringConfig shouldRunMetrics := sr.config.Enabled - if shouldRunMetrics && !sr.isServerRunning { - sr.start() + if shouldRunMetrics && !sr.isServerRunning.Load() { + sr.Start() - sr.isServerRunning = true + sr.isServerRunning.Store(true) return nil } - if !shouldRunMetrics && sr.isServerRunning { - sr.isServerRunning = false - return sr.stop() + if !shouldRunMetrics && sr.isServerRunning.Load() { + sr.isServerRunning.Store(false) + return sr.Stop() } return nil diff --git a/internal/pkg/agent/application/monitoring/reload/reload_test.go b/internal/pkg/agent/application/monitoring/reload/reload_test.go index ee666029f7c..e45eae4d006 100644 --- a/internal/pkg/agent/application/monitoring/reload/reload_test.go +++ b/internal/pkg/agent/application/monitoring/reload/reload_test.go @@ -99,7 +99,7 @@ agent.monitoring.enabled: true log, cfg, ) - r.isServerRunning = tc.currRunning + r.isServerRunning.Store(tc.currRunning) if tc.currRunning { r.s = fsc } @@ -107,7 +107,7 @@ agent.monitoring.enabled: true newCfg := aConfig.MustNewConfigFrom(tc.newConfig) require.NoError(t, r.Reload(newCfg)) - require.Equal(t, tc.expectedRunning, r.isServerRunning) + require.Equal(t, tc.expectedRunning, r.isServerRunning.Load()) require.Equal(t, tc.expectedStart, fsc.startTriggered) require.Equal(t, tc.expectedStop, fsc.stopTriggered) })