From ef598c5edbe7c038a8655379c0b6496edc0a9fdd Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Wed, 11 Oct 2023 18:00:01 +0200 Subject: [PATCH] Revert "[Feature] Start/stop monitoring server based on monitoring config (#3492)" (#3583) This reverts commit 6cc587afda0b04e9f2209217fbb41910dbc4916f. --- ...ing-server-based-on-monitoring-config.yaml | 31 ---- .../application/coordinator/coordinator.go | 16 -- .../coordinator/coordinator_unit_test.go | 155 ------------------ .../application/monitoring/reload/reload.go | 120 -------------- .../monitoring/reload/reload_test.go | 130 --------------- .../agent/application/monitoring/server.go | 21 +-- internal/pkg/agent/cmd/run.go | 16 +- 7 files changed, 11 insertions(+), 478 deletions(-) delete mode 100644 changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml delete mode 100644 internal/pkg/agent/application/monitoring/reload/reload.go delete mode 100644 internal/pkg/agent/application/monitoring/reload/reload_test.go diff --git a/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml b/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml deleted file mode 100644 index 141c18715bf..00000000000 --- a/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml +++ /dev/null @@ -1,31 +0,0 @@ -# Kind can be one of: -# - breaking-change: a change to previously-documented behavior -# - deprecation: functionality that is being removed in a later release -# - bug-fix: fixes a problem in a previous version -# - enhancement: extends functionality but does not break or fix existing behavior -# - feature: new functionality -# - known-issue: problems that we are aware of in a given version -# - security: impacts on the security of a product or a user’s deployment. -# - upgrade: important information for someone upgrading from a prior version -# - other: does not fit into any of the other categories -kind: enhancement - -# Change summary; a 80ish characters long description of the change. -summary: Start/stop monitoring server based on monitoring config - -# Long description; in case the summary is not enough to describe the change -# this field accommodate a description without length limits. -#description: - -# Affected component; a word indicating the component this changeset affects. -component: elastic-agent - -# PR number; optional; the PR number that added the changeset. -# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. -# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. -# Please provide it if you are adding a fragment for a different PR. -pr: 3492 - -# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). -# If not present is automatically filled by the tooling with the issue linked to the PR number. -issue: 2735 diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 856c624076f..e6a2dbc182f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -158,10 +158,6 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa // CoordinatorShutdownTimeout is how long the coordinator will wait during shutdown to receive a "clean" shutdown from other components var CoordinatorShutdownTimeout = time.Second * 5 -type configReloader interface { - Reload(*config.Config) error -} - // Coordinator manages the entire state of the Elastic Agent. // // All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator. @@ -177,8 +173,6 @@ type Coordinator struct { upgradeMgr UpgradeManager monitorMgr MonitorManager - monitoringServerReloader configReloader - runtimeMgr RuntimeManager configMgr ConfigManager varsMgr VarsManager @@ -371,10 +365,6 @@ func (c *Coordinator) State() State { return c.stateBroadcaster.Get() } -func (c *Coordinator) RegisterMonitoringServer(s configReloader) { - c.monitoringServerReloader = s -} - // StateSubscribe returns a channel that reports changes in Coordinator state. // // bufferLen specifies how many state changes should be queued in addition to @@ -1018,12 +1008,6 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) { } } - if c.monitoringServerReloader != nil { - if err := c.monitoringServerReloader.Reload(cfg); err != nil { - return fmt.Errorf("failed to reload monitor manager configuration: %w", err) - } - } - c.ast = rawAst return nil } diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index dbcaa8b616e..805139f26e8 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -24,12 +24,10 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/config" - monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" @@ -522,137 +520,6 @@ inputs: } } -func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) { - // Send a test policy to the Coordinator as a Config Manager update, - // verify it generates the right component model and sends it to the - // runtime manager, then send an empty policy and verify it calls - // another runtime manager update with an empty component model. - - // Set a one-second timeout -- nothing here should block, but if it - // does let's report a failure instead of timing out the test runner. - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - logger := logp.NewLogger("testing") - - configChan := make(chan ConfigChange, 1) - - // Create a mocked runtime manager that will report the update call - runtimeManager := &fakeRuntimeManager{ - updateCallback: func(comp []component.Component) error { - return nil - }, - } - - monitoringServer := &fakeMonitoringServer{} - newServerFn := func() (reload.ServerController, error) { - return monitoringServer, nil - } - monitoringReloader := reload.NewServerReloader(newServerFn, logger, monitoringCfg.DefaultConfig()) - - coord := &Coordinator{ - logger: logger, - agentInfo: &info.AgentInfo{}, - stateBroadcaster: broadcaster.New(State{}, 0, 0), - managerChans: managerChans{ - configManagerUpdate: configChan, - }, - runtimeMgr: runtimeManager, - vars: emptyVars(t), - } - coord.RegisterMonitoringServer(monitoringReloader) - - // Create a policy with one input and one output - cfg := config.MustNewConfigFrom(` -outputs: - default: - type: elasticsearch -inputs: - - id: test-input - type: filestream - use_output: default -`) - - // Send the policy change and make sure it was acknowledged. - cfgChange := &configChange{cfg: cfg} - configChan <- cfgChange - coord.runLoopIteration(ctx) - assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") - - // server is started by default - assert.True(t, monitoringServer.startTriggered) - assert.True(t, monitoringServer.isRunning) - - // disable monitoring - cfgDisableMonitoring := config.MustNewConfigFrom(` -agent.monitoring.enabled: false -outputs: - default: - type: elasticsearch -inputs: - - id: test-input - type: filestream - use_output: default -`) - - // Send the policy change and make sure it was acknowledged. - monitoringServer.Reset() - cfgChange = &configChange{cfg: cfgDisableMonitoring} - configChan <- cfgChange - coord.runLoopIteration(ctx) - assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") - - // server is stopped: monitoring is disabled - assert.True(t, monitoringServer.stopTriggered) - assert.False(t, monitoringServer.isRunning) - - // enable monitoring - cfgEnabledMonitoring := config.MustNewConfigFrom(` -agent.monitoring.enabled: true -outputs: - default: - type: elasticsearch -inputs: - - id: test-input - type: filestream - use_output: default -`) - - // Send the policy change and make sure it was acknowledged. - monitoringServer.Reset() - cfgChange = &configChange{cfg: cfgEnabledMonitoring} - configChan <- cfgChange - coord.runLoopIteration(ctx) - assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") - - // server is started again - assert.True(t, monitoringServer.startTriggered) - assert.True(t, monitoringServer.isRunning) - - // enable monitoring and disable metrics - cfgEnabledMonitoringNoMetrics := config.MustNewConfigFrom(` -agent.monitoring.enabled: true -agent.monitoring.metrics: false -outputs: - default: - type: elasticsearch -inputs: - - id: test-input - type: filestream - use_output: default -`) - - // Send the policy change and make sure it was acknowledged. - monitoringServer.Reset() - cfgChange = &configChange{cfg: cfgEnabledMonitoringNoMetrics} - configChan <- cfgChange - coord.runLoopIteration(ctx) - assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") - - // server is stopped: monitoring.metrics is disabled - assert.True(t, monitoringServer.stopTriggered) - assert.False(t, monitoringServer.isRunning) -} - func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) { // Send a test policy to the Coordinator as a Config Manager update, // verify it generates the right component model and sends it to the @@ -994,25 +861,3 @@ func emptyAST(t *testing.T) *transpiler.AST { require.NoError(t, err, "AST creation must succeed") return ast } - -type fakeMonitoringServer struct { - startTriggered bool - stopTriggered bool - isRunning bool -} - -func (fs *fakeMonitoringServer) Start() { - fs.startTriggered = true - fs.isRunning = true -} - -func (fs *fakeMonitoringServer) Stop() error { - fs.stopTriggered = true - fs.isRunning = false - return nil -} - -func (fs *fakeMonitoringServer) Reset() { - fs.stopTriggered = false - fs.startTriggered = false -} diff --git a/internal/pkg/agent/application/monitoring/reload/reload.go b/internal/pkg/agent/application/monitoring/reload/reload.go deleted file mode 100644 index b39d46474f9..00000000000 --- a/internal/pkg/agent/application/monitoring/reload/reload.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package reload - -import ( - "sync" - - "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" - "github.com/elastic/elastic-agent/internal/pkg/agent/errors" - aConfig "github.com/elastic/elastic-agent/internal/pkg/config" - monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" - "github.com/elastic/elastic-agent/pkg/core/logger" -) - -// ServerController controls the server runtime -type ServerController interface { - Start() - Stop() error -} -type serverConstructor func() (ServerController, error) - -type ServerReloader struct { - s ServerController - log *logger.Logger - newServerFn serverConstructor - - config *monitoringCfg.MonitoringConfig - isServerRunning bool - isServerRunningLock sync.Mutex -} - -func NewServerReloader(newServerFn serverConstructor, log *logger.Logger, mcfg *monitoringCfg.MonitoringConfig) *ServerReloader { - sr := &ServerReloader{ - log: log, - config: mcfg, - newServerFn: newServerFn, - } - - return sr -} - -func (sr *ServerReloader) Start() { - sr.isServerRunningLock.Lock() - defer sr.isServerRunningLock.Unlock() - - sr.start() -} - -func (sr *ServerReloader) start() { - if sr.s != nil && sr.isServerRunning { - // server is already running - return - } - - sr.log.Info("Starting server") - var err error - sr.s, err = sr.newServerFn() - if err != nil { - sr.log.Errorf("Failed creating a server: %v", err) - return - } - - sr.s.Start() - sr.log.Debugf("Server started") - sr.isServerRunning = true -} - -func (sr *ServerReloader) Stop() error { - sr.isServerRunningLock.Lock() - defer sr.isServerRunningLock.Unlock() - - return sr.stop() -} - -func (sr *ServerReloader) stop() error { - if sr.s == nil { - // stopping not started server - sr.isServerRunning = false - return nil - } - sr.log.Info("Stopping server") - - sr.isServerRunning = false - if err := sr.s.Stop(); err != nil { - return err - } - - sr.log.Debugf("Server stopped") - sr.s = nil - return nil -} - -func (sr *ServerReloader) Reload(rawConfig *aConfig.Config) error { - sr.isServerRunningLock.Lock() - defer sr.isServerRunningLock.Unlock() - - newConfig := configuration.DefaultConfiguration() - if err := rawConfig.Unpack(&newConfig); err != nil { - return errors.New(err, "failed to unpack monitoring config during reload") - } - - sr.config = newConfig.Settings.MonitoringConfig - - shouldRunMetrics := sr.config.Enabled && sr.config.MonitorMetrics - if shouldRunMetrics && !sr.isServerRunning { - sr.start() - - sr.isServerRunning = true - return nil - } - - if !shouldRunMetrics && sr.isServerRunning { - sr.isServerRunning = false - return sr.stop() - } - - return nil -} diff --git a/internal/pkg/agent/application/monitoring/reload/reload_test.go b/internal/pkg/agent/application/monitoring/reload/reload_test.go deleted file mode 100644 index b686c5346a8..00000000000 --- a/internal/pkg/agent/application/monitoring/reload/reload_test.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package reload - -import ( - "testing" - - "github.com/stretchr/testify/require" - - aConfig "github.com/elastic/elastic-agent/internal/pkg/config" - monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" - "github.com/elastic/elastic-agent/pkg/core/logger" -) - -func TestReload(t *testing.T) { - tcs := []struct { - name string - currEnabled bool - currMetrics bool - currRunning bool - - newConfig string - expectedRunning bool - expectedStart bool - expectedStop bool - }{ - { - "start with default config", - false, false, false, - ``, - true, true, false, - }, - { - "start when not running, monitoring enabled", - false, false, false, - ` -agent.monitoring.enabled: true -`, - true, true, false, - }, - { - "do not start when not running, only metrics enabled", - false, false, false, - ` -agent.monitoring.enabled: false -agent.monitoring.metrics: true -`, - false, false, false, - }, - - { - "stop when running, monitoring disabled", - true, true, true, - ` -agent.monitoring.enabled: false -`, - false, false, true, - }, - { - "stop when running, monitoring.metrics disabled", - true, true, true, - ` -agent.monitoring.metrics: false -`, - false, false, true, - }, - { - "stop stopped server", - false, false, false, - ` -agent.monitoring.metrics: false -`, - false, false, false, - }, - { - "start started server", - true, true, true, - ` -agent.monitoring.enabled: true -`, - true, false, false, - }, - } - - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - fsc := &fakeServerController{} - log, _ := logger.NewTesting(tc.name) - cfg := &monitoringCfg.MonitoringConfig{ - Enabled: tc.currEnabled, - MonitorMetrics: tc.currMetrics, - } - r := NewServerReloader( - func() (ServerController, error) { - return fsc, nil - }, - log, - cfg, - ) - r.isServerRunning = tc.currRunning - if tc.currRunning { - r.s = fsc - } - - newCfg := aConfig.MustNewConfigFrom(tc.newConfig) - require.NoError(t, r.Reload(newCfg)) - - require.Equal(t, tc.expectedRunning, r.isServerRunning) - require.Equal(t, tc.expectedStart, fsc.startTriggered) - require.Equal(t, tc.expectedStop, fsc.stopTriggered) - }) - } -} - -type fakeServerController struct { - startTriggered bool - stopTriggered bool -} - -func (fsc *fakeServerController) Start() { fsc.startTriggered = true } -func (fsc *fakeServerController) Stop() error { - fsc.stopTriggered = true - return nil -} -func (fsc *fakeServerController) Reset() { - fsc.startTriggered = false - fsc.stopTriggered = false -} diff --git a/internal/pkg/agent/application/monitoring/server.go b/internal/pkg/agent/application/monitoring/server.go index 7f43bf1866b..390a472d5ed 100644 --- a/internal/pkg/agent/application/monitoring/server.go +++ b/internal/pkg/agent/application/monitoring/server.go @@ -20,9 +20,6 @@ import ( "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" - "github.com/elastic/elastic-agent/internal/pkg/agent/errors" - monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -35,8 +32,7 @@ func NewServer( coord *coordinator.Coordinator, enableProcessStats bool, operatingSystem string, - mcfg *monitoringCfg.MonitoringConfig, -) (*reload.ServerReloader, error) { +) (*api.Server, error) { if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil { // log but ignore log.Errorf("failed to create monitoring drop: %v", err) @@ -47,7 +43,7 @@ func NewServer( return nil, err } - return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats, operatingSystem, mcfg) + return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats, operatingSystem) } func exposeMetricsEndpoint( @@ -58,8 +54,7 @@ func exposeMetricsEndpoint( coord *coordinator.Coordinator, enableProcessStats bool, operatingSystem string, - mcfg *monitoringCfg.MonitoringConfig, -) (*reload.ServerReloader, error) { +) (*api.Server, error) { r := mux.NewRouter() if tracer != nil { r.Use(apmgorilla.Middleware(apmgorilla.WithTracer(tracer))) @@ -77,15 +72,7 @@ func exposeMetricsEndpoint( mux := http.NewServeMux() mux.Handle("/", r) - newServerFn := func() (reload.ServerController, error) { - apiServer, err := api.New(log, mux, config) - if err != nil { - return nil, errors.New(err, "failed to create api server") - } - return apiServer, nil - } - - return reload.NewServerReloader(newServerFn, log, mcfg), nil + return api.New(log, mux, config) } func createAgentMonitoringDrop(drop string) error { diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 353c9d1e7a7..91d470ac00b 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -32,7 +32,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" "github.com/elastic/elastic-agent/internal/pkg/agent/application/secret" @@ -249,15 +248,12 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration } defer composable.Close() - monitoringServer, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord) + serverStopFn, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord) if err != nil { return err } - coord.RegisterMonitoringServer(monitoringServer) defer func() { - if monitoringServer != nil { - _ = monitoringServer.Stop() - } + _ = serverStopFn() }() diagHooks := diagnostics.GlobalHooks() @@ -551,7 +547,7 @@ func setupMetrics( cfg *monitoringCfg.MonitoringConfig, tracer *apm.Tracer, coord *coordinator.Coordinator, -) (*reload.ServerReloader, error) { +) (func() error, error) { if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil { return nil, err } @@ -562,12 +558,14 @@ func setupMetrics( Host: monitoring.AgentMonitoringEndpoint(operatingSystem, cfg), } - s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg), operatingSystem, cfg) + s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg), operatingSystem) if err != nil { return nil, errors.New(err, "could not start the HTTP server for the API") } + s.Start() - return s, nil + // return server stopper + return s.Stop, nil } func isProcessStatsEnabled(cfg *monitoringCfg.MonitoringConfig) bool {