Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into support-serverless-…
Browse files Browse the repository at this point in the history
…and-beats
  • Loading branch information
fearful-symmetry committed Oct 6, 2023
2 parents 9edc32c + 6cc587a commit 0c7d0cb
Show file tree
Hide file tree
Showing 11 changed files with 490 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Start/stop monitoring server based on monitoring config

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
#description:

# Affected component; a word indicating the component this changeset affects.
component: elastic-agent

# PR number; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3492

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 2735
2 changes: 1 addition & 1 deletion docs/test-framework-dev-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Go version should be at least the same than the one in [.go-version](https://git

### Configuration

ESS (QA) API Key to create on https://console.qa.cld.elstc.co/deployment-features/keys
ESS (staging) API Key to create on https://staging.found.no/account/keys

Warning: if you never created a deployment on it, you won't have permission to get this key, so you will need to create one first.

Expand Down
16 changes: 16 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa
// CoordinatorShutdownTimeout is how long the coordinator will wait during shutdown to receive a "clean" shutdown from other components
var CoordinatorShutdownTimeout = time.Second * 5

type configReloader interface {
Reload(*config.Config) error
}

// Coordinator manages the entire state of the Elastic Agent.
//
// All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator.
Expand All @@ -173,6 +177,8 @@ type Coordinator struct {
upgradeMgr UpgradeManager
monitorMgr MonitorManager

monitoringServerReloader configReloader

runtimeMgr RuntimeManager
configMgr ConfigManager
varsMgr VarsManager
Expand Down Expand Up @@ -365,6 +371,10 @@ func (c *Coordinator) State() State {
return c.stateBroadcaster.Get()
}

func (c *Coordinator) RegisterMonitoringServer(s configReloader) {
c.monitoringServerReloader = s
}

// StateSubscribe returns a channel that reports changes in Coordinator state.
//
// bufferLen specifies how many state changes should be queued in addition to
Expand Down Expand Up @@ -1008,6 +1018,12 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
}
}

if c.monitoringServerReloader != nil {
if err := c.monitoringServerReloader.Reload(cfg); err != nil {
return fmt.Errorf("failed to reload monitor manager configuration: %w", err)
}
}

c.ast = rawAst
return nil
}
Expand Down
155 changes: 155 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/config"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
Expand Down Expand Up @@ -520,6 +522,137 @@ inputs:
}
}

func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends it to the
// runtime manager, then send an empty policy and verify it calls
// another runtime manager update with an empty component model.

// Set a one-second timeout -- nothing here should block, but if it
// does let's report a failure instead of timing out the test runner.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
logger := logp.NewLogger("testing")

configChan := make(chan ConfigChange, 1)

// Create a mocked runtime manager that will report the update call
runtimeManager := &fakeRuntimeManager{
updateCallback: func(comp []component.Component) error {
return nil
},
}

monitoringServer := &fakeMonitoringServer{}
newServerFn := func() (reload.ServerController, error) {
return monitoringServer, nil
}
monitoringReloader := reload.NewServerReloader(newServerFn, logger, monitoringCfg.DefaultConfig())

coord := &Coordinator{
logger: logger,
agentInfo: &info.AgentInfo{},
stateBroadcaster: broadcaster.New(State{}, 0, 0),
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
}
coord.RegisterMonitoringServer(monitoringReloader)

// Create a policy with one input and one output
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is started by default
assert.True(t, monitoringServer.startTriggered)
assert.True(t, monitoringServer.isRunning)

// disable monitoring
cfgDisableMonitoring := config.MustNewConfigFrom(`
agent.monitoring.enabled: false
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgDisableMonitoring}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is stopped: monitoring is disabled
assert.True(t, monitoringServer.stopTriggered)
assert.False(t, monitoringServer.isRunning)

// enable monitoring
cfgEnabledMonitoring := config.MustNewConfigFrom(`
agent.monitoring.enabled: true
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgEnabledMonitoring}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is started again
assert.True(t, monitoringServer.startTriggered)
assert.True(t, monitoringServer.isRunning)

// enable monitoring and disable metrics
cfgEnabledMonitoringNoMetrics := config.MustNewConfigFrom(`
agent.monitoring.enabled: true
agent.monitoring.metrics: false
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgEnabledMonitoringNoMetrics}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is stopped: monitoring.metrics is disabled
assert.True(t, monitoringServer.stopTriggered)
assert.False(t, monitoringServer.isRunning)
}

func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends it to the
Expand Down Expand Up @@ -861,3 +994,25 @@ func emptyAST(t *testing.T) *transpiler.AST {
require.NoError(t, err, "AST creation must succeed")
return ast
}

type fakeMonitoringServer struct {
startTriggered bool
stopTriggered bool
isRunning bool
}

func (fs *fakeMonitoringServer) Start() {
fs.startTriggered = true
fs.isRunning = true
}

func (fs *fakeMonitoringServer) Stop() error {
fs.stopTriggered = true
fs.isRunning = false
return nil
}

func (fs *fakeMonitoringServer) Reset() {
fs.stopTriggered = false
fs.startTriggered = false
}
120 changes: 120 additions & 0 deletions internal/pkg/agent/application/monitoring/reload/reload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package reload

import (
"sync"

"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
aConfig "github.com/elastic/elastic-agent/internal/pkg/config"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// ServerController controls the server runtime
type ServerController interface {
Start()
Stop() error
}
type serverConstructor func() (ServerController, error)

type ServerReloader struct {
s ServerController
log *logger.Logger
newServerFn serverConstructor

config *monitoringCfg.MonitoringConfig
isServerRunning bool
isServerRunningLock sync.Mutex
}

func NewServerReloader(newServerFn serverConstructor, log *logger.Logger, mcfg *monitoringCfg.MonitoringConfig) *ServerReloader {
sr := &ServerReloader{
log: log,
config: mcfg,
newServerFn: newServerFn,
}

return sr
}

func (sr *ServerReloader) Start() {
sr.isServerRunningLock.Lock()
defer sr.isServerRunningLock.Unlock()

sr.start()
}

func (sr *ServerReloader) start() {
if sr.s != nil && sr.isServerRunning {
// server is already running
return
}

sr.log.Info("Starting server")
var err error
sr.s, err = sr.newServerFn()
if err != nil {
sr.log.Errorf("Failed creating a server: %v", err)
return
}

sr.s.Start()
sr.log.Debugf("Server started")
sr.isServerRunning = true
}

func (sr *ServerReloader) Stop() error {
sr.isServerRunningLock.Lock()
defer sr.isServerRunningLock.Unlock()

return sr.stop()
}

func (sr *ServerReloader) stop() error {
if sr.s == nil {
// stopping not started server
sr.isServerRunning = false
return nil
}
sr.log.Info("Stopping server")

sr.isServerRunning = false
if err := sr.s.Stop(); err != nil {
return err
}

sr.log.Debugf("Server stopped")
sr.s = nil
return nil
}

func (sr *ServerReloader) Reload(rawConfig *aConfig.Config) error {
sr.isServerRunningLock.Lock()
defer sr.isServerRunningLock.Unlock()

newConfig := configuration.DefaultConfiguration()
if err := rawConfig.Unpack(&newConfig); err != nil {
return errors.New(err, "failed to unpack monitoring config during reload")
}

sr.config = newConfig.Settings.MonitoringConfig

shouldRunMetrics := sr.config.Enabled && sr.config.MonitorMetrics
if shouldRunMetrics && !sr.isServerRunning {
sr.start()

sr.isServerRunning = true
return nil
}

if !shouldRunMetrics && sr.isServerRunning {
sr.isServerRunning = false
return sr.stop()
}

return nil
}
Loading

0 comments on commit 0c7d0cb

Please sign in to comment.