Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Start/stop monitoring server based on monitoring config #3584

Merged
merged 12 commits into from
Oct 30, 2023
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Start/stop monitoring server based on monitoring config

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
#description:

# Affected component; a word indicating the component this changeset affects.
component: elastic-agent

# PR number; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3492

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 2735
16 changes: 16 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa
// CoordinatorShutdownTimeout is how long the coordinator will wait during shutdown to receive a "clean" shutdown from other components
var CoordinatorShutdownTimeout = time.Second * 5

type configReloader interface {
Reload(*config.Config) error
}

// Coordinator manages the entire state of the Elastic Agent.
//
// All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator.
Expand All @@ -175,6 +179,8 @@ type Coordinator struct {
upgradeMgr UpgradeManager
monitorMgr MonitorManager

monitoringServerReloader configReloader

runtimeMgr RuntimeManager
configMgr ConfigManager
varsMgr VarsManager
Expand Down Expand Up @@ -376,6 +382,10 @@ func (c *Coordinator) State() State {
return c.stateBroadcaster.Get()
}

func (c *Coordinator) RegisterMonitoringServer(s configReloader) {
c.monitoringServerReloader = s
}

// StateSubscribe returns a channel that reports changes in Coordinator state.
//
// bufferLen specifies how many state changes should be queued in addition to
Expand Down Expand Up @@ -1038,6 +1048,12 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
}
}

if c.monitoringServerReloader != nil {
if err := c.monitoringServerReloader.Reload(cfg); err != nil {
return fmt.Errorf("failed to reload monitor manager configuration: %w", err)
}
}

c.ast = rawAst
return nil
}
Expand Down
154 changes: 154 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/config"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
Expand Down Expand Up @@ -521,6 +523,136 @@ inputs:
}
}

func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends it to the
// runtime manager, then send an empty policy and verify it calls
// another runtime manager update with an empty component model.

// Set a one-second timeout -- nothing here should block, but if it
// does let's report a failure instead of timing out the test runner.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
logger := logp.NewLogger("testing")

configChan := make(chan ConfigChange, 1)

// Create a mocked runtime manager that will report the update call
runtimeManager := &fakeRuntimeManager{
updateCallback: func(comp []component.Component) error {
return nil
},
}

monitoringServer := &fakeMonitoringServer{}
newServerFn := func() (reload.ServerController, error) {
return monitoringServer, nil
}
monitoringReloader := reload.NewServerReloader(newServerFn, logger, monitoringCfg.DefaultConfig())

coord := &Coordinator{
logger: logger,
agentInfo: &info.AgentInfo{},
stateBroadcaster: broadcaster.New(State{}, 0, 0),
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
}
coord.RegisterMonitoringServer(monitoringReloader)

// Create a policy with one input and one output
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is started by default
assert.True(t, monitoringServer.startTriggered)
assert.True(t, monitoringServer.isRunning)

// disable monitoring
cfgDisableMonitoring := config.MustNewConfigFrom(`
agent.monitoring.enabled: false
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgDisableMonitoring}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is stopped: monitoring is disabled
assert.True(t, monitoringServer.stopTriggered)
assert.False(t, monitoringServer.isRunning)

// enable monitoring
cfgEnabledMonitoring := config.MustNewConfigFrom(`
agent.monitoring.enabled: true
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgEnabledMonitoring}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is started again
assert.True(t, monitoringServer.startTriggered)
assert.True(t, monitoringServer.isRunning)

// enable monitoring and disable metrics
cfgEnabledMonitoringNoMetrics := config.MustNewConfigFrom(`
agent.monitoring.enabled: true
agent.monitoring.metrics: false
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgEnabledMonitoringNoMetrics}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is running: monitoring.metrics is disabled does not have an effect
assert.True(t, monitoringServer.isRunning)
}

func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends it to the
Expand Down Expand Up @@ -867,3 +999,25 @@ func emptyAST(t *testing.T) *transpiler.AST {
require.NoError(t, err, "AST creation must succeed")
return ast
}

type fakeMonitoringServer struct {
startTriggered bool
stopTriggered bool
isRunning bool
}

func (fs *fakeMonitoringServer) Start() {
fs.startTriggered = true
fs.isRunning = true
}

func (fs *fakeMonitoringServer) Stop() error {
fs.stopTriggered = true
fs.isRunning = false
return nil
}

func (fs *fakeMonitoringServer) Reset() {
fs.stopTriggered = false
fs.startTriggered = false
}
102 changes: 102 additions & 0 deletions internal/pkg/agent/application/monitoring/reload/reload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package reload

import (
"sync/atomic"

"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
aConfig "github.com/elastic/elastic-agent/internal/pkg/config"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// ServerController controls the server runtime
type ServerController interface {
Start()
Stop() error
}
type serverConstructor func() (ServerController, error)

type ServerReloader struct {
s ServerController
log *logger.Logger
newServerFn serverConstructor

config *monitoringCfg.MonitoringConfig
isServerRunning atomic.Bool
}

func NewServerReloader(newServerFn serverConstructor, log *logger.Logger, mcfg *monitoringCfg.MonitoringConfig) *ServerReloader {
sr := &ServerReloader{
log: log,
config: mcfg,
newServerFn: newServerFn,
}

return sr
}

func (sr *ServerReloader) Start() {
if sr.s != nil && sr.isServerRunning.Load() {
// server is already running
return
}

sr.log.Info("Starting server")
var err error
sr.s, err = sr.newServerFn()
if err != nil {
sr.log.Errorf("Failed creating a server: %v", err)
return
}

sr.s.Start()
sr.log.Debugf("Server started")
sr.isServerRunning.Store(true)
}

func (sr *ServerReloader) Stop() error {
if sr.s == nil {
// stopping not started server
sr.isServerRunning.Store(false)
return nil
}
sr.log.Info("Stopping server")

sr.isServerRunning.Store(false)
if err := sr.s.Stop(); err != nil {
return err
}

sr.log.Debugf("Server stopped")
sr.s = nil
return nil
}

func (sr *ServerReloader) Reload(rawConfig *aConfig.Config) error {
newConfig := configuration.DefaultConfiguration()
if err := rawConfig.Unpack(&newConfig); err != nil {
return errors.New(err, "failed to unpack monitoring config during reload")
}

sr.config = newConfig.Settings.MonitoringConfig

shouldRunMetrics := sr.config.Enabled
if shouldRunMetrics && !sr.isServerRunning.Load() {
sr.Start()

sr.isServerRunning.Store(true)
return nil
}

if !shouldRunMetrics && sr.isServerRunning.Load() {
sr.isServerRunning.Store(false)
return sr.Stop()
}

return nil
}
Loading