Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[Feature] Start/stop monitoring server based on monitoring config" #3583

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

16 changes: 0 additions & 16 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa
// CoordinatorShutdownTimeout is how long the coordinator will wait during shutdown to receive a "clean" shutdown from other components
var CoordinatorShutdownTimeout = time.Second * 5

type configReloader interface {
Reload(*config.Config) error
}

// Coordinator manages the entire state of the Elastic Agent.
//
// All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator.
Expand All @@ -177,8 +173,6 @@ type Coordinator struct {
upgradeMgr UpgradeManager
monitorMgr MonitorManager

monitoringServerReloader configReloader

runtimeMgr RuntimeManager
configMgr ConfigManager
varsMgr VarsManager
Expand Down Expand Up @@ -371,10 +365,6 @@ func (c *Coordinator) State() State {
return c.stateBroadcaster.Get()
}

func (c *Coordinator) RegisterMonitoringServer(s configReloader) {
c.monitoringServerReloader = s
}

// StateSubscribe returns a channel that reports changes in Coordinator state.
//
// bufferLen specifies how many state changes should be queued in addition to
Expand Down Expand Up @@ -1018,12 +1008,6 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
}
}

if c.monitoringServerReloader != nil {
if err := c.monitoringServerReloader.Reload(cfg); err != nil {
return fmt.Errorf("failed to reload monitor manager configuration: %w", err)
}
}

c.ast = rawAst
return nil
}
Expand Down
155 changes: 0 additions & 155 deletions internal/pkg/agent/application/coordinator/coordinator_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/config"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
Expand Down Expand Up @@ -522,137 +520,6 @@ inputs:
}
}

func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends it to the
// runtime manager, then send an empty policy and verify it calls
// another runtime manager update with an empty component model.

// Set a one-second timeout -- nothing here should block, but if it
// does let's report a failure instead of timing out the test runner.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
logger := logp.NewLogger("testing")

configChan := make(chan ConfigChange, 1)

// Create a mocked runtime manager that will report the update call
runtimeManager := &fakeRuntimeManager{
updateCallback: func(comp []component.Component) error {
return nil
},
}

monitoringServer := &fakeMonitoringServer{}
newServerFn := func() (reload.ServerController, error) {
return monitoringServer, nil
}
monitoringReloader := reload.NewServerReloader(newServerFn, logger, monitoringCfg.DefaultConfig())

coord := &Coordinator{
logger: logger,
agentInfo: &info.AgentInfo{},
stateBroadcaster: broadcaster.New(State{}, 0, 0),
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
}
coord.RegisterMonitoringServer(monitoringReloader)

// Create a policy with one input and one output
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is started by default
assert.True(t, monitoringServer.startTriggered)
assert.True(t, monitoringServer.isRunning)

// disable monitoring
cfgDisableMonitoring := config.MustNewConfigFrom(`
agent.monitoring.enabled: false
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgDisableMonitoring}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is stopped: monitoring is disabled
assert.True(t, monitoringServer.stopTriggered)
assert.False(t, monitoringServer.isRunning)

// enable monitoring
cfgEnabledMonitoring := config.MustNewConfigFrom(`
agent.monitoring.enabled: true
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgEnabledMonitoring}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is started again
assert.True(t, monitoringServer.startTriggered)
assert.True(t, monitoringServer.isRunning)

// enable monitoring and disable metrics
cfgEnabledMonitoringNoMetrics := config.MustNewConfigFrom(`
agent.monitoring.enabled: true
agent.monitoring.metrics: false
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgEnabledMonitoringNoMetrics}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is stopped: monitoring.metrics is disabled
assert.True(t, monitoringServer.stopTriggered)
assert.False(t, monitoringServer.isRunning)
}

func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends it to the
Expand Down Expand Up @@ -994,25 +861,3 @@ func emptyAST(t *testing.T) *transpiler.AST {
require.NoError(t, err, "AST creation must succeed")
return ast
}

type fakeMonitoringServer struct {
startTriggered bool
stopTriggered bool
isRunning bool
}

func (fs *fakeMonitoringServer) Start() {
fs.startTriggered = true
fs.isRunning = true
}

func (fs *fakeMonitoringServer) Stop() error {
fs.stopTriggered = true
fs.isRunning = false
return nil
}

func (fs *fakeMonitoringServer) Reset() {
fs.stopTriggered = false
fs.startTriggered = false
}
120 changes: 0 additions & 120 deletions internal/pkg/agent/application/monitoring/reload/reload.go

This file was deleted.

Loading