-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature] Start/stop monitoring server based on monitoring config #3492
Changes from 7 commits
13c921f
b87bda3
84bc8be
f5da510
5edbf53
e47be8b
37186bd
7852c42
3372c5f
c733556
cb8464e
c2ceaed
8f90743
8c7d5a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# Kind can be one of: | ||
# - breaking-change: a change to previously-documented behavior | ||
# - deprecation: functionality that is being removed in a later release | ||
# - bug-fix: fixes a problem in a previous version | ||
# - enhancement: extends functionality but does not break or fix existing behavior | ||
# - feature: new functionality | ||
# - known-issue: problems that we are aware of in a given version | ||
# - security: impacts on the security of a product or a user’s deployment. | ||
# - upgrade: important information for someone upgrading from a prior version | ||
# - other: does not fit into any of the other categories | ||
kind: feature | ||
|
||
# Change summary; a 80ish characters long description of the change. | ||
summary: Start/stop monitoring server based on monitoring config | ||
|
||
# Long description; in case the summary is not enough to describe the change | ||
# this field accommodate a description without length limits. | ||
#description: | ||
|
||
# Affected component; a word indicating the component this changeset affects. | ||
component: elastic-agent | ||
|
||
# PR number; optional; the PR number that added the changeset. | ||
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. | ||
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. | ||
# Please provide it if you are adding a fragment for a different PR. | ||
pr: 3492 | ||
|
||
# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). | ||
# If not present is automatically filled by the tooling with the issue linked to the PR number. | ||
issue: 2735 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,10 +24,12 @@ import ( | |
"github.com/elastic/elastic-agent-client/v7/pkg/client" | ||
"github.com/elastic/elastic-agent-libs/logp" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" | ||
"github.com/elastic/elastic-agent/internal/pkg/config" | ||
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" | ||
"github.com/elastic/elastic-agent/pkg/component" | ||
"github.com/elastic/elastic-agent/pkg/component/runtime" | ||
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" | ||
|
@@ -520,6 +522,137 @@ inputs: | |
} | ||
} | ||
|
||
func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) { | ||
// Send a test policy to the Coordinator as a Config Manager update, | ||
// verify it generates the right component model and sends it to the | ||
// runtime manager, then send an empty policy and verify it calls | ||
// another runtime manager update with an empty component model. | ||
|
||
// Set a one-second timeout -- nothing here should block, but if it | ||
// does let's report a failure instead of timing out the test runner. | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | ||
defer cancel() | ||
logger := logp.NewLogger("testing") | ||
|
||
configChan := make(chan ConfigChange, 1) | ||
|
||
// Create a mocked runtime manager that will report the update call | ||
runtimeManager := &fakeRuntimeManager{ | ||
updateCallback: func(comp []component.Component) error { | ||
return nil | ||
}, | ||
} | ||
|
||
monitoringServer := &fakeMonitoringServer{} | ||
newServerFn := func() (reload.ServerController, error) { | ||
return monitoringServer, nil | ||
} | ||
monitoringReloader := reload.NewServerReloader(newServerFn, logger, monitoringCfg.DefaultConfig()) | ||
|
||
coord := &Coordinator{ | ||
logger: logger, | ||
agentInfo: &info.AgentInfo{}, | ||
stateBroadcaster: broadcaster.New(State{}, 0, 0), | ||
managerChans: managerChans{ | ||
configManagerUpdate: configChan, | ||
}, | ||
runtimeMgr: runtimeManager, | ||
vars: emptyVars(t), | ||
} | ||
coord.RegisterMonitoringServer(monitoringReloader) | ||
|
||
// Create a policy with one input and one output | ||
cfg := config.MustNewConfigFrom(` | ||
outputs: | ||
default: | ||
type: elasticsearch | ||
inputs: | ||
- id: test-input | ||
type: filestream | ||
use_output: default | ||
`) | ||
|
||
// Send the policy change and make sure it was acknowledged. | ||
cfgChange := &configChange{cfg: cfg} | ||
configChan <- cfgChange | ||
coord.runLoopIteration(ctx) | ||
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") | ||
|
||
// server is started by default | ||
assert.True(t, monitoringServer.startTriggered) | ||
assert.True(t, monitoringServer.isRunning) | ||
|
||
// disable monitoring | ||
cfgDisableMonitoring := config.MustNewConfigFrom(` | ||
agent.monitoring.enabled: false | ||
outputs: | ||
default: | ||
type: elasticsearch | ||
inputs: | ||
- id: test-input | ||
type: filestream | ||
use_output: default | ||
`) | ||
|
||
// Send the policy change and make sure it was acknowledged. | ||
monitoringServer.Reset() | ||
cfgChange = &configChange{cfg: cfgDisableMonitoring} | ||
configChan <- cfgChange | ||
coord.runLoopIteration(ctx) | ||
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") | ||
|
||
// server is started by default | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment should be changed? |
||
assert.True(t, monitoringServer.stopTriggered) | ||
assert.False(t, monitoringServer.isRunning) | ||
|
||
// enable monitoring | ||
cfgEnabledMonitoring := config.MustNewConfigFrom(` | ||
agent.monitoring.enabled: true | ||
outputs: | ||
default: | ||
type: elasticsearch | ||
inputs: | ||
- id: test-input | ||
type: filestream | ||
use_output: default | ||
`) | ||
|
||
// Send the policy change and make sure it was acknowledged. | ||
monitoringServer.Reset() | ||
cfgChange = &configChange{cfg: cfgEnabledMonitoring} | ||
configChan <- cfgChange | ||
coord.runLoopIteration(ctx) | ||
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") | ||
|
||
// server is started by default | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment should be changed since we are explicitly enabling monitoring in the policy now (as opposed to it being enabled by default)? |
||
assert.True(t, monitoringServer.startTriggered) | ||
assert.True(t, monitoringServer.isRunning) | ||
|
||
// enable monitoring and disable metrics | ||
cfgEnabledMonitoringNoMetrics := config.MustNewConfigFrom(` | ||
agent.monitoring.enabled: true | ||
agent.monitoring.metrics: false | ||
outputs: | ||
default: | ||
type: elasticsearch | ||
inputs: | ||
- id: test-input | ||
type: filestream | ||
use_output: default | ||
`) | ||
|
||
// Send the policy change and make sure it was acknowledged. | ||
monitoringServer.Reset() | ||
cfgChange = &configChange{cfg: cfgEnabledMonitoringNoMetrics} | ||
configChan <- cfgChange | ||
coord.runLoopIteration(ctx) | ||
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") | ||
|
||
// server is started by default | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment should be changed? |
||
assert.True(t, monitoringServer.stopTriggered) | ||
assert.False(t, monitoringServer.isRunning) | ||
} | ||
|
||
func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) { | ||
// Send a test policy to the Coordinator as a Config Manager update, | ||
// verify it generates the right component model and sends it to the | ||
|
@@ -861,3 +994,25 @@ func emptyAST(t *testing.T) *transpiler.AST { | |
require.NoError(t, err, "AST creation must succeed") | ||
return ast | ||
} | ||
|
||
type fakeMonitoringServer struct { | ||
startTriggered bool | ||
stopTriggered bool | ||
isRunning bool | ||
} | ||
|
||
func (fs *fakeMonitoringServer) Start() { | ||
fs.startTriggered = true | ||
fs.isRunning = true | ||
} | ||
|
||
func (fs *fakeMonitoringServer) Stop() error { | ||
fs.stopTriggered = true | ||
fs.isRunning = false | ||
return nil | ||
} | ||
|
||
func (fs *fakeMonitoringServer) Reset() { | ||
fs.stopTriggered = false | ||
fs.startTriggered = false | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package reload | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration" | ||
"github.com/elastic/elastic-agent/internal/pkg/agent/errors" | ||
aConfig "github.com/elastic/elastic-agent/internal/pkg/config" | ||
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" | ||
"github.com/elastic/elastic-agent/pkg/core/logger" | ||
) | ||
|
||
// ServerController controls the server runtime | ||
type ServerController interface { | ||
Start() | ||
Stop() error | ||
} | ||
type serverConstructor func() (ServerController, error) | ||
|
||
type ServerReloader struct { | ||
s ServerController | ||
log *logger.Logger | ||
newServerFn serverConstructor | ||
|
||
config *monitoringCfg.MonitoringConfig | ||
isServerRunning bool | ||
isServerRunningLock sync.Mutex | ||
} | ||
|
||
func NewServerReloader(newServerFn serverConstructor, log *logger.Logger, mcfg *monitoringCfg.MonitoringConfig) *ServerReloader { | ||
sr := &ServerReloader{ | ||
log: log, | ||
config: mcfg, | ||
newServerFn: newServerFn, | ||
} | ||
|
||
return sr | ||
} | ||
|
||
func (sr *ServerReloader) Start() { | ||
sr.isServerRunningLock.Lock() | ||
defer sr.isServerRunningLock.Unlock() | ||
|
||
sr.start() | ||
} | ||
|
||
func (sr *ServerReloader) start() { | ||
sr.log.Info("Starting server") | ||
var err error | ||
sr.s, err = sr.newServerFn() | ||
if err != nil { | ||
sr.log.Errorf("Failed creating a server: %v", err) | ||
return | ||
} | ||
|
||
sr.s.Start() | ||
sr.log.Debugf("Server started") | ||
sr.isServerRunning = true | ||
} | ||
|
||
func (sr *ServerReloader) Stop() error { | ||
sr.isServerRunningLock.Lock() | ||
defer sr.isServerRunningLock.Unlock() | ||
|
||
return sr.stop() | ||
} | ||
|
||
func (sr *ServerReloader) stop() error { | ||
sr.log.Info("Stopping server") | ||
|
||
sr.isServerRunning = false | ||
if err := sr.s.Stop(); err != nil { | ||
return err | ||
} | ||
|
||
sr.log.Debugf("Server stopped") | ||
sr.s = nil | ||
return nil | ||
} | ||
|
||
func (sr *ServerReloader) Reload(rawConfig *aConfig.Config) error { | ||
sr.isServerRunningLock.Lock() | ||
defer sr.isServerRunningLock.Unlock() | ||
|
||
newConfig := configuration.DefaultConfiguration() | ||
if err := rawConfig.Unpack(&newConfig); err != nil { | ||
return errors.New(err, "failed to unpack monitoring config during reload") | ||
} | ||
|
||
sr.config = newConfig.Settings.MonitoringConfig | ||
|
||
shouldRunMetrics := sr.config.Enabled && sr.config.MonitorMetrics | ||
if shouldRunMetrics && !sr.isServerRunning { | ||
sr.start() | ||
|
||
sr.isServerRunning = true | ||
return nil | ||
} | ||
|
||
if !shouldRunMetrics && sr.isServerRunning { | ||
sr.isServerRunning = false | ||
return sr.stop() | ||
} | ||
|
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think strictly speaking this should be an
enhancement
since it's not completely new functionality :)