Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow agent to monitor endpoint #4789

Merged
merged 39 commits into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
e0621fa
first attempt
fearful-symmetry May 7, 2024
3d21818
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry May 7, 2024
6aad6e1
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry May 7, 2024
f165295
still tinkering
fearful-symmetry May 15, 2024
c869512
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry May 15, 2024
99de46c
first draft change
fearful-symmetry May 21, 2024
30028ff
cleanup
fearful-symmetry May 21, 2024
1359938
clean up
fearful-symmetry May 21, 2024
354b7d0
add changelog
fearful-symmetry May 21, 2024
5f5a7a3
format
fearful-symmetry May 22, 2024
a09b83d
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry May 22, 2024
704d9ea
fix component lookup
fearful-symmetry May 22, 2024
2e08d79
fix state tests
fearful-symmetry May 22, 2024
f806c93
complete integration tests
fearful-symmetry May 23, 2024
71b5815
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry May 23, 2024
04b64fb
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry May 31, 2024
3451d88
basic cleanup
fearful-symmetry Jun 3, 2024
6771de5
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry Jun 3, 2024
824cc7b
update metrics setup
fearful-symmetry Jun 3, 2024
937f7ac
remove mage changes
fearful-symmetry Jun 3, 2024
4853383
spelling
fearful-symmetry Jun 3, 2024
4d417cd
fix test
fearful-symmetry Jun 4, 2024
5f7d633
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry Jun 4, 2024
baafc48
figured out weird test failures
fearful-symmetry Jun 4, 2024
6f17e4e
still fixing unit tests
fearful-symmetry Jun 4, 2024
8aaa074
major test improvements
fearful-symmetry Jun 4, 2024
1c3121d
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry Jun 5, 2024
e15fba3
finish up tests
fearful-symmetry Jun 5, 2024
0e35f59
remove log line
fearful-symmetry Jun 7, 2024
aa4c5fc
update name
fearful-symmetry Jun 7, 2024
7ed6390
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry Jun 10, 2024
76fea5e
fix comments
fearful-symmetry Jun 10, 2024
9bcdb4f
fix bool logic
fearful-symmetry Jun 10, 2024
88d9bba
fix tests
fearful-symmetry Jun 10, 2024
c381be2
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry Jun 12, 2024
3e3aa15
fix merge
fearful-symmetry Jun 12, 2024
f1d0e5a
add warning and mock output to inspect
fearful-symmetry Jun 13, 2024
0346c58
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry Jun 13, 2024
2147418
Merge remote-tracking branch 'upstream/main' into service-monitoring
fearful-symmetry Jun 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions changelog/fragments/1716320508-add-endpoint-monitoring.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add monitoring of endpoint usage metrics

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/4789

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/4083
42 changes: 37 additions & 5 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"reflect"
"strings"
"sync/atomic"
"time"

"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -82,7 +83,7 @@ type MonitorManager interface {
Reload(rawConfig *config.Config) error

// MonitoringConfig injects monitoring configuration into resolved ast tree.
MonitoringConfig(map[string]interface{}, []component.Component, map[string]string) (map[string]interface{}, error)
MonitoringConfig(map[string]interface{}, []component.Component, map[string]string, map[string]uint64) (map[string]interface{}, error)
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
}

// Runner provides interface to run a manager and receive running errors.
Expand Down Expand Up @@ -284,6 +285,14 @@ type Coordinator struct {
// loop in runLoopIteration() is active and listening.
// Should only be interacted with via CoordinatorActive() or runLoopIteration()
heartbeatChan chan struct{}

// if a component (mostly endpoint) has a new PID, we need to update
// the monitoring components so they have a PID to monitor
// however, if endpoint is in some kind of restart loop,
// we could DOS the config system. Instead,
// run a ticker that checks to see if we have a new PID.
componentPIDTicker *time.Ticker
componentPidRequiresUpdate *atomic.Bool
}

// The channels Coordinator reads to receive updates from the various managers.
Expand Down Expand Up @@ -374,10 +383,12 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
// synchronization in the subscriber API, just set the input buffer to 0.
stateBroadcaster: broadcaster.New(state, 64, 32),

logLevelCh: make(chan logp.Level),
overrideStateChan: make(chan *coordinatorOverrideState),
upgradeDetailsChan: make(chan *details.Details),
heartbeatChan: make(chan struct{}),
logLevelCh: make(chan logp.Level),
overrideStateChan: make(chan *coordinatorOverrideState),
upgradeDetailsChan: make(chan *details.Details),
heartbeatChan: make(chan struct{}),
componentPIDTicker: time.NewTicker(time.Second * 30),
componentPidRequiresUpdate: &atomic.Bool{},
}
// Setup communication channels for any non-nil components. This pattern
// lets us transparently accept nil managers / simulated events during
Expand Down Expand Up @@ -926,6 +937,8 @@ func (c *Coordinator) runner(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

defer c.componentPIDTicker.Stop()

// We run nil checks before starting the various managers so that unit tests
// only have to initialize / mock the specific components they're testing.
// If a manager is nil, we prebuffer its return channel with nil also so
Expand Down Expand Up @@ -1038,6 +1051,19 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {

case c.heartbeatChan <- struct{}{}:

case <-c.componentPIDTicker.C:
// if we hit the ticker and we've got a new PID,
// reload the component model
if c.componentPidRequiresUpdate.Load() {
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
c.componentPidRequiresUpdate.Store(false)
err := c.refreshComponentModel(ctx)
if err != nil {
err = fmt.Errorf("error refreshing component model for PID update: %w", err)
c.setConfigManagerError(err)
c.logger.Errorf("%s", err)
}
}

case componentState := <-c.managerChans.runtimeManagerUpdate:
// New component change reported by the runtime manager via
// Coordinator.watchRuntimeComponents(), merge it with the
Expand Down Expand Up @@ -1277,11 +1303,17 @@ func (c *Coordinator) generateComponentModel() (err error) {
configInjector = c.monitorMgr.MonitoringConfig
}

var existingCompState = make(map[string]uint64, len(c.state.Components))
for _, comp := range c.state.Components {
existingCompState[comp.Component.ID] = comp.State.Pid
}

comps, err := c.specs.ToComponents(
cfg,
configInjector,
c.state.LogLevel,
c.agentInfo,
existingCompState,
)
if err != nil {
return fmt.Errorf("failed to render components: %w", err)
Expand Down
13 changes: 13 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,25 @@ func (c *Coordinator) refreshState() {
// Coordinator state and sets stateNeedsRefresh.
// Must be called on the main Coordinator goroutine.
func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) {

// check for any component updates to the known PID, so we can update the component monitoring
found := false
pidRequiresUpdate := false
for i, other := range c.state.Components {
if other.Component.ID == state.Component.ID {
if other.State.Pid != state.State.Pid {
pidRequiresUpdate = true
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
}
c.state.Components[i] = state
found = true
break
}
}
if !found {
c.state.Components = append(c.state.Components, state)
if state.State.Pid != 0 {
pidRequiresUpdate = true
}
}

// In the case that the component has stopped, it is now removed.
Expand All @@ -160,6 +169,10 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState)
}

c.stateNeedsRefresh = true

if pidRequiresUpdate {
c.componentPidRequiresUpdate.Store(true)
}
}

// generateReportableState aggregates the internal state of the Coordinator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,6 @@ func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) {
resultChan <- ctx.Err()
return
case state := <-subChan:
t.Logf("%+v", state)
if len(state.Components) == 3 {
compState0 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-0")
compState1 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-1")
Expand All @@ -599,6 +598,11 @@ func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) {
(unit1.State == client.UnitStateHealthy && unit1.Message == "Healthy From Fake Isolated Units 1 Config") {
resultChan <- nil
return
} else if unit0.State == client.UnitStateFailed && unit1.State == client.UnitStateFailed {
// if you get a really strange failed state, check to make sure the mock binaries in
// elastic-agent/pkg/component/fake/ are updated
t.Fail()
t.Logf("got units with failed state: %#v / %#v", unit1, unit0)
}
}
}
Expand Down Expand Up @@ -1004,7 +1008,7 @@ func (*testMonitoringManager) Prepare(_ string) error
func (*testMonitoringManager) Cleanup(string) error { return nil }
func (*testMonitoringManager) Enabled() bool { return false }
func (*testMonitoringManager) Reload(rawConfig *config.Config) error { return nil }
func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string) (map[string]interface{}, error) {
func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string, _ map[string]uint64) (map[string]interface{}, error) {
return nil, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestVarsManagerError(t *testing.T) {
managerChans: managerChans{
varsManagerError: varsErrorChan,
},
componentPIDTicker: time.NewTicker(time.Second * 30),
}
// Send an error via the vars manager channel, and let Coordinator update
const errorStr = "force error"
Expand Down Expand Up @@ -110,6 +111,7 @@ func TestCoordinatorReportsUnhealthyComponents(t *testing.T) {
managerChans: managerChans{
runtimeManagerUpdate: runtimeChan,
},
componentPIDTicker: time.NewTicker(time.Second * 30),
}

unhealthyComponent := runtime.ComponentComponentState{
Expand Down Expand Up @@ -186,6 +188,7 @@ func TestCoordinatorComponentStatesAreSeparate(t *testing.T) {
managerChans: managerChans{
runtimeManagerUpdate: runtimeChan,
},
componentPIDTicker: time.NewTicker(time.Second * 30),
}

comp1 := runtime.ComponentComponentState{
Expand Down Expand Up @@ -256,6 +259,7 @@ func TestCoordinatorReportsUnhealthyUnits(t *testing.T) {
managerChans: managerChans{
runtimeManagerUpdate: runtimeChan,
},
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Create a healthy component with healthy input and output units
Expand Down Expand Up @@ -375,8 +379,9 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) {
runtimeMgr: &fakeRuntimeManager{},

// Set valid but empty initial values for ast and vars
vars: emptyVars(t),
ast: emptyAST(t),
vars: emptyVars(t),
ast: emptyAST(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Send an invalid config update and confirm that Coordinator reports
Expand All @@ -389,7 +394,6 @@ agent.download.sourceURI:
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)

assert.True(t, cfgChange.failed, "Policy with invalid field should have reported failed config change")
require.ErrorContainsf(t,
cfgChange.err,
Expand Down Expand Up @@ -420,6 +424,7 @@ agent.download.sourceURI:
// (This check is based on a previous bug in which a vars update could
// discard active policy errors.)
varsChan <- emptyVars(t)
t.Logf("after emptyVars statement")
coord.runLoopIteration(ctx)

assert.Error(t, coord.configErr, "Vars update shouldn't affect configErr")
Expand Down Expand Up @@ -489,8 +494,9 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) {
runtimeMgr: &fakeRuntimeManager{},

// Set valid but empty initial values for ast and vars
vars: emptyVars(t),
ast: emptyAST(t),
vars: emptyVars(t),
ast: emptyAST(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// This configuration produces a valid AST but its EQL condition is
Expand Down Expand Up @@ -583,8 +589,9 @@ func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) {
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
runtimeMgr: runtimeManager,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}
coord.RegisterMonitoringServer(monitoringReloader)

Expand Down Expand Up @@ -711,8 +718,9 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) {
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
runtimeMgr: runtimeManager,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Create a policy with one input and one output
Expand Down Expand Up @@ -798,8 +806,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
// manager, so it receives the update result.
runtimeManagerError: updateErrChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
runtimeMgr: runtimeManager,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Send an empty policy which should forward an empty component model to
Expand Down Expand Up @@ -860,8 +869,9 @@ func TestCoordinatorAppliesVarsToPolicy(t *testing.T) {
configManagerUpdate: configChan,
varsManagerUpdate: varsChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
runtimeMgr: runtimeManager,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Create a policy with one input and one output
Expand Down Expand Up @@ -936,7 +946,8 @@ func TestCoordinatorReportsOverrideState(t *testing.T) {
stateBroadcaster: &broadcaster.Broadcaster[State]{
InputChan: stateChan,
},
overrideStateChan: overrideStateChan,
overrideStateChan: overrideStateChan,
componentPIDTicker: time.NewTicker(time.Second * 30),
}
// Send an error via the vars manager channel, and let Coordinator update
overrideStateChan <- &coordinatorOverrideState{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ log_level: "warning"
components:
- id: "comp-1"
state:
pid: 0
state: 3
message: "degraded message"
features_idx: 0
Expand Down Expand Up @@ -570,6 +571,7 @@ log_level: "warning"
components:
- id: "comp-1"
state:
pid: 0
state: 3
message: "degraded message"
features_idx: 0
Expand Down
Loading