Skip to content

Commit

Permalink
Revert "[Feature] Start/stop monitoring server based on monitoring co…
Browse files Browse the repository at this point in the history
…nfig (#3492)" (#3583)

This reverts commit 6cc587a.
  • Loading branch information
michalpristas authored Oct 11, 2023
1 parent fd06296 commit ef598c5
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 478 deletions.

This file was deleted.

16 changes: 0 additions & 16 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa
// CoordinatorShutdownTimeout is how long the coordinator will wait during shutdown to receive a "clean" shutdown from other components
var CoordinatorShutdownTimeout = time.Second * 5

type configReloader interface {
Reload(*config.Config) error
}

// Coordinator manages the entire state of the Elastic Agent.
//
// All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator.
Expand All @@ -177,8 +173,6 @@ type Coordinator struct {
upgradeMgr UpgradeManager
monitorMgr MonitorManager

monitoringServerReloader configReloader

runtimeMgr RuntimeManager
configMgr ConfigManager
varsMgr VarsManager
Expand Down Expand Up @@ -371,10 +365,6 @@ func (c *Coordinator) State() State {
return c.stateBroadcaster.Get()
}

func (c *Coordinator) RegisterMonitoringServer(s configReloader) {
c.monitoringServerReloader = s
}

// StateSubscribe returns a channel that reports changes in Coordinator state.
//
// bufferLen specifies how many state changes should be queued in addition to
Expand Down Expand Up @@ -1018,12 +1008,6 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
}
}

if c.monitoringServerReloader != nil {
if err := c.monitoringServerReloader.Reload(cfg); err != nil {
return fmt.Errorf("failed to reload monitor manager configuration: %w", err)
}
}

c.ast = rawAst
return nil
}
Expand Down
155 changes: 0 additions & 155 deletions internal/pkg/agent/application/coordinator/coordinator_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/config"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
Expand Down Expand Up @@ -522,137 +520,6 @@ inputs:
}
}

func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends it to the
// runtime manager, then send an empty policy and verify it calls
// another runtime manager update with an empty component model.

// Set a one-second timeout -- nothing here should block, but if it
// does let's report a failure instead of timing out the test runner.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
logger := logp.NewLogger("testing")

configChan := make(chan ConfigChange, 1)

// Create a mocked runtime manager that will report the update call
runtimeManager := &fakeRuntimeManager{
updateCallback: func(comp []component.Component) error {
return nil
},
}

monitoringServer := &fakeMonitoringServer{}
newServerFn := func() (reload.ServerController, error) {
return monitoringServer, nil
}
monitoringReloader := reload.NewServerReloader(newServerFn, logger, monitoringCfg.DefaultConfig())

coord := &Coordinator{
logger: logger,
agentInfo: &info.AgentInfo{},
stateBroadcaster: broadcaster.New(State{}, 0, 0),
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
}
coord.RegisterMonitoringServer(monitoringReloader)

// Create a policy with one input and one output
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is started by default
assert.True(t, monitoringServer.startTriggered)
assert.True(t, monitoringServer.isRunning)

// disable monitoring
cfgDisableMonitoring := config.MustNewConfigFrom(`
agent.monitoring.enabled: false
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgDisableMonitoring}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is stopped: monitoring is disabled
assert.True(t, monitoringServer.stopTriggered)
assert.False(t, monitoringServer.isRunning)

// enable monitoring
cfgEnabledMonitoring := config.MustNewConfigFrom(`
agent.monitoring.enabled: true
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgEnabledMonitoring}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is started again
assert.True(t, monitoringServer.startTriggered)
assert.True(t, monitoringServer.isRunning)

// enable monitoring and disable metrics
cfgEnabledMonitoringNoMetrics := config.MustNewConfigFrom(`
agent.monitoring.enabled: true
agent.monitoring.metrics: false
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgEnabledMonitoringNoMetrics}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is stopped: monitoring.metrics is disabled
assert.True(t, monitoringServer.stopTriggered)
assert.False(t, monitoringServer.isRunning)
}

func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends it to the
Expand Down Expand Up @@ -994,25 +861,3 @@ func emptyAST(t *testing.T) *transpiler.AST {
require.NoError(t, err, "AST creation must succeed")
return ast
}

type fakeMonitoringServer struct {
startTriggered bool
stopTriggered bool
isRunning bool
}

func (fs *fakeMonitoringServer) Start() {
fs.startTriggered = true
fs.isRunning = true
}

func (fs *fakeMonitoringServer) Stop() error {
fs.stopTriggered = true
fs.isRunning = false
return nil
}

func (fs *fakeMonitoringServer) Reset() {
fs.stopTriggered = false
fs.startTriggered = false
}
120 changes: 0 additions & 120 deletions internal/pkg/agent/application/monitoring/reload/reload.go

This file was deleted.

Loading

0 comments on commit ef598c5

Please sign in to comment.