diff --git a/changelog/fragments/1716320508-add-endpoint-monitoring.yaml b/changelog/fragments/1716320508-add-endpoint-monitoring.yaml new file mode 100644 index 00000000000..fd052449064 --- /dev/null +++ b/changelog/fragments/1716320508-add-endpoint-monitoring.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add monitoring of endpoint usage metrics + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/4789 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/4083 \ No newline at end of file diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 1cf6945d3c8..c967b51b732 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -10,6 +10,7 @@ import ( "fmt" "reflect" "strings" + "sync/atomic" "time" "github.com/hashicorp/go-multierror" @@ -82,7 +83,12 @@ type MonitorManager interface { Reload(rawConfig *config.Config) error // MonitoringConfig injects monitoring configuration into resolved ast tree. - MonitoringConfig(map[string]interface{}, []component.Component, map[string]string) (map[string]interface{}, error) + // args: + // - the existing config policy + // - a list of the expected running components + // - a map of component IDs to binary names + // - a map of component IDs to the PIDs of the running components. + MonitoringConfig(map[string]interface{}, []component.Component, map[string]string, map[string]uint64) (map[string]interface{}, error) } // Runner provides interface to run a manager and receive running errors. @@ -284,6 +290,14 @@ type Coordinator struct { // loop in runLoopIteration() is active and listening. 
// Should only be interacted with via CoordinatorActive() or runLoopIteration() heartbeatChan chan struct{} + + // if a component (mostly endpoint) has a new PID, we need to update + // the monitoring components so they have a PID to monitor + // however, if endpoint is in some kind of restart loop, + // we could DOS the config system. Instead, + // run a ticker that checks to see if we have a new PID. + componentPIDTicker *time.Ticker + componentPidRequiresUpdate *atomic.Bool } // The channels Coordinator reads to receive updates from the various managers. @@ -374,10 +388,12 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. // synchronization in the subscriber API, just set the input buffer to 0. stateBroadcaster: broadcaster.New(state, 64, 32), - logLevelCh: make(chan logp.Level), - overrideStateChan: make(chan *coordinatorOverrideState), - upgradeDetailsChan: make(chan *details.Details), - heartbeatChan: make(chan struct{}), + logLevelCh: make(chan logp.Level), + overrideStateChan: make(chan *coordinatorOverrideState), + upgradeDetailsChan: make(chan *details.Details), + heartbeatChan: make(chan struct{}), + componentPIDTicker: time.NewTicker(time.Second * 30), + componentPidRequiresUpdate: &atomic.Bool{}, } // Setup communication channels for any non-nil components. This pattern // lets us transparently accept nil managers / simulated events during @@ -926,6 +942,8 @@ func (c *Coordinator) runner(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() + defer c.componentPIDTicker.Stop() + // We run nil checks before starting the various managers so that unit tests // only have to initialize / mock the specific components they're testing. // If a manager is nil, we prebuffer its return channel with nil also so @@ -1038,6 +1056,18 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case c.heartbeatChan <- struct{}{}: + case <-c.componentPIDTicker.C: + // if we hit the ticker and we've got a new PID, + // reload the component model + if c.componentPidRequiresUpdate.Swap(false) { + err := c.refreshComponentModel(ctx) + if err != nil { + err = fmt.Errorf("error refreshing component model for PID update: %w", err) + c.setConfigManagerError(err) + c.logger.Errorf("%s", err) + } + } + case componentState := <-c.managerChans.runtimeManagerUpdate: // New component change reported by the runtime manager via // Coordinator.watchRuntimeComponents(), merge it with the @@ -1277,11 +1307,17 @@ func (c *Coordinator) generateComponentModel() (err error) { configInjector = c.monitorMgr.MonitoringConfig } + var existingCompState = make(map[string]uint64, len(c.state.Components)) + for _, comp := range c.state.Components { + existingCompState[comp.Component.ID] = comp.State.Pid + } + comps, err := c.specs.ToComponents( cfg, configInjector, c.state.LogLevel, c.agentInfo, + existingCompState, ) if err != nil { return fmt.Errorf("failed to render components: %w", err) diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index 0d6ba22bddd..6e388d961f1 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -135,9 +135,14 @@ func (c *Coordinator) refreshState() { // Coordinator state and sets stateNeedsRefresh. // Must be called on the main Coordinator goroutine. 
func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) { + + // check for any component updates to the known PID, so we can update the component monitoring found := false for i, other := range c.state.Components { if other.Component.ID == state.Component.ID { + if other.State.Pid != state.State.Pid { + c.componentPidRequiresUpdate.Store(true) + } c.state.Components[i] = state found = true break @@ -145,6 +150,9 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) } if !found { c.state.Components = append(c.state.Components, state) + if state.State.Pid != 0 { + c.componentPidRequiresUpdate.Store(true) + } } // In the case that the component has stopped, it is now removed. @@ -160,6 +168,7 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) } c.stateNeedsRefresh = true + } // generateReportableState aggregates the internal state of the Coordinator diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 0b76ce4a5c7..d522638a2c4 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -587,7 +587,6 @@ func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) { resultChan <- ctx.Err() return case state := <-subChan: - t.Logf("%+v", state) if len(state.Components) == 3 { compState0 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-0") compState1 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-1") @@ -599,6 +598,11 @@ func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) { (unit1.State == client.UnitStateHealthy && unit1.Message == "Healthy From Fake Isolated Units 1 Config") { resultChan <- nil return + } else if unit0.State == client.UnitStateFailed && unit1.State == client.UnitStateFailed { + // if you get a really strange failed state, check to make sure the mock binaries in + // elastic-agent/pkg/component/fake/ are updated + t.Fail() + t.Logf("got units with failed state: %#v / %#v", unit1, unit0) } } } @@ -1007,7 +1011,7 @@ func (*testMonitoringManager) Prepare(_ string) error func (*testMonitoringManager) Cleanup(string) error { return nil } func (*testMonitoringManager) Enabled() bool { return false } func (*testMonitoringManager) Reload(rawConfig *config.Config) error { return nil } -func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string) (map[string]interface{}, error) { +func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string, _ map[string]uint64) (map[string]interface{}, error) { return nil, nil } diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 2831e3e6a4e..5aa8e975872 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -59,6 +59,7 @@ func TestVarsManagerError(t *testing.T) { managerChans: managerChans{ varsManagerError: varsErrorChan, }, + componentPIDTicker: time.NewTicker(time.Second * 30), } // Send an error via the vars manager channel, and let Coordinator update const errorStr = "force error" @@ -110,6 +111,7 @@ func TestCoordinatorReportsUnhealthyComponents(t *testing.T) { managerChans: 
managerChans{ runtimeManagerUpdate: runtimeChan, }, + componentPIDTicker: time.NewTicker(time.Second * 30), } unhealthyComponent := runtime.ComponentComponentState{ @@ -186,6 +188,7 @@ func TestCoordinatorComponentStatesAreSeparate(t *testing.T) { managerChans: managerChans{ runtimeManagerUpdate: runtimeChan, }, + componentPIDTicker: time.NewTicker(time.Second * 30), } comp1 := runtime.ComponentComponentState{ @@ -256,6 +259,7 @@ func TestCoordinatorReportsUnhealthyUnits(t *testing.T) { managerChans: managerChans{ runtimeManagerUpdate: runtimeChan, }, + componentPIDTicker: time.NewTicker(time.Second * 30), } // Create a healthy component with healthy input and output units @@ -375,8 +379,9 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) { runtimeMgr: &fakeRuntimeManager{}, // Set valid but empty initial values for ast and vars - vars: emptyVars(t), - ast: emptyAST(t), + vars: emptyVars(t), + ast: emptyAST(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // Send an invalid config update and confirm that Coordinator reports @@ -389,7 +394,6 @@ agent.download.sourceURI: cfgChange := &configChange{cfg: cfg} configChan <- cfgChange coord.runLoopIteration(ctx) - assert.True(t, cfgChange.failed, "Policy with invalid field should have reported failed config change") require.ErrorContainsf(t, cfgChange.err, @@ -420,6 +424,7 @@ agent.download.sourceURI: // (This check is based on a previous bug in which a vars update could // discard active policy errors.) varsChan <- emptyVars(t) + t.Logf("after emptyVars statement") coord.runLoopIteration(ctx) assert.Error(t, coord.configErr, "Vars update shouldn't affect configErr") @@ -489,8 +494,9 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) { runtimeMgr: &fakeRuntimeManager{}, // Set valid but empty initial values for ast and vars - vars: emptyVars(t), - ast: emptyAST(t), + vars: emptyVars(t), + ast: emptyAST(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // This configuration produces a valid AST but its EQL condition is @@ -583,8 +589,9 @@ func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) { managerChans: managerChans{ configManagerUpdate: configChan, }, - runtimeMgr: runtimeManager, - vars: emptyVars(t), + runtimeMgr: runtimeManager, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } coord.RegisterMonitoringServer(monitoringReloader) @@ -711,8 +718,9 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) { managerChans: managerChans{ configManagerUpdate: configChan, }, - runtimeMgr: runtimeManager, - vars: emptyVars(t), + runtimeMgr: runtimeManager, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // Create a policy with one input and one output @@ -798,8 +806,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { // manager, so it receives the update result. 
runtimeManagerError: updateErrChan, }, - runtimeMgr: runtimeManager, - vars: emptyVars(t), + runtimeMgr: runtimeManager, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // Send an empty policy which should forward an empty component model to @@ -860,8 +869,9 @@ func TestCoordinatorAppliesVarsToPolicy(t *testing.T) { configManagerUpdate: configChan, varsManagerUpdate: varsChan, }, - runtimeMgr: runtimeManager, - vars: emptyVars(t), + runtimeMgr: runtimeManager, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // Create a policy with one input and one output @@ -936,7 +946,8 @@ func TestCoordinatorReportsOverrideState(t *testing.T) { stateBroadcaster: &broadcaster.Broadcaster[State]{ InputChan: stateChan, }, - overrideStateChan: overrideStateChan, + overrideStateChan: overrideStateChan, + componentPIDTicker: time.NewTicker(time.Second * 30), } // Send an error via the vars manager channel, and let Coordinator update overrideStateChan <- &coordinatorOverrideState{ diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 9a9126aa5e5..8d0b65e0df1 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -488,6 +488,7 @@ log_level: "warning" components: - id: "comp-1" state: + pid: 0 state: 3 message: "degraded message" features_idx: 0 @@ -570,6 +571,7 @@ log_level: "warning" components: - id: "comp-1" state: + pid: 0 state: 3 message: "degraded message" features_idx: 0 diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 82fce6ff29b..ce6bda03fcc 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -67,7 +67,8 @@ var ( supportedBeatsComponents = []string{"filebeat", "metricbeat", "apm-server", "fleet-server", "auditbeat", "cloudbeat", "heartbeat", "osquerybeat", "packetbeat", "pf-elastic-collector", "pf-elastic-symbolizer"} ) -// BeatsMonitor is providing V1 monitoring support for metrics and logs for endpoint-security only. +// BeatsMonitor provides config values for monitoring of agent clients (beats, endpoint, etc) +// by injecting the monitoring config into an existing fleet config type BeatsMonitor struct { enabled bool // feature flag disabling whole v1 monitoring story config *monitoringConfig @@ -110,10 +111,16 @@ func (b *BeatsMonitor) Reload(rawConfig *config.Config) error { } // MonitoringConfig adds monitoring inputs to a configuration based on retrieved list of components to run. +// args: +// policy: the existing config policy +// components: a list of the expected running components +// componentIDToBinary: a map of component IDs to binary names +// componentIDPidMap: a map of component IDs to the PIDs of the running components. 
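+// componentIDPidMap may be empty; process-level metrics inputs are only generated for service components that have reported a non-zero PID.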
func (b *BeatsMonitor) MonitoringConfig( policy map[string]interface{}, components []component.Component, componentIDToBinary map[string]string, + componentIDPidMap map[string]uint64, ) (map[string]interface{}, error) { if !b.Enabled() { return nil, nil @@ -158,7 +165,7 @@ func (b *BeatsMonitor) MonitoringConfig( } if b.config.C.MonitorMetrics { - if err := b.injectMetricsInput(cfg, componentIDToBinary, monitoringOutput, components); err != nil { + if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap); err != nil { return nil, errors.New(err, "failed to inject monitoring output") } } @@ -298,6 +305,7 @@ func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{ return nil } +// injectLogsInput adds logging configs for component monitoring to the `cfg` map func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []component.Component, monitoringOutput string) error { monitoringNamespace := b.monitoringNamespace() logsDrop := filepath.Dir(loggingPath("unit", b.operatingSystem)) @@ -448,7 +456,6 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components [] // only monitor service inputs that define a log path continue } - fixedBinaryName := strings.ReplaceAll(strings.ReplaceAll(comp.BinaryName(), "-", "_"), "/", "_") // conform with index naming policy dataset := fmt.Sprintf("elastic_agent.%s", fixedBinaryName) streams = append(streams, map[string]interface{}{ @@ -534,7 +541,8 @@ func (b *BeatsMonitor) monitoringNamespace() string { return defaultMonitoringNamespace } -func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component) error { +// injectMetricsInput injects monitoring config for agent monitoring to the `cfg` object. 
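+// existingStateServicePids maps component IDs to the PIDs reported at checkin for service components (currently only endpoint); entries with a zero PID are skipped.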
+func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, componentList []component.Component, existingStateServicePids map[string]uint64) error { metricsCollectionIntervalString := metricsCollectionInterval.String() monitoringNamespace := b.monitoringNamespace() fixedAgentName := strings.ReplaceAll(agentName, "-", "_") @@ -908,6 +916,89 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, } + // add system/process metrics for services that can't be monitored via json/beats metrics + // If there's a checkin PID and the corresponding component has a service spec section, add a system/process config + for _, compState := range componentList { + if compState.InputSpec != nil && compState.InputSpec.Spec.Service != nil { + if comp, ok := existingStateServicePids[compState.ID]; ok && comp != 0 { + name := strings.ReplaceAll(strings.ReplaceAll(compState.BinaryName(), "-", "_"), "/", "_") + inputs = append(inputs, map[string]interface{}{ + idKey: fmt.Sprintf("%s-%s", monitoringMetricsUnitID, name), + "name": fmt.Sprintf("%s-%s", monitoringMetricsUnitID, name), + "type": "system/metrics", + useOutputKey: monitoringOutput, + "data_stream": map[string]interface{}{ + "namespace": monitoringNamespace, + }, + "streams": []interface{}{ + map[string]interface{}{ + idKey: fmt.Sprintf("%s-%s", monitoringMetricsUnitID, name), + "data_stream": map[string]interface{}{ + "type": "metrics", + "dataset": fmt.Sprintf("elastic_agent.%s", name), + "namespace": monitoringNamespace, + }, + "metricsets": []interface{}{"process"}, + "period": metricsCollectionIntervalString, + "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace), + "process.pid": comp, + "process.cgroups.enabled": false, + "processors": []interface{}{ + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "data_stream", + "fields": map[string]interface{}{ + "type": "metrics", + "dataset": fmt.Sprintf("elastic_agent.%s", name), + "namespace": monitoringNamespace, + }, + }, + }, + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "event", + "fields": map[string]interface{}{ + "dataset": fmt.Sprintf("elastic_agent.%s", name), + }, + }, + }, + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "elastic_agent", + "fields": map[string]interface{}{ + "id": b.agentInfo.AgentID(), + "version": b.agentInfo.Version(), + "snapshot": b.agentInfo.Snapshot(), + "process": name, + }, + }, + }, + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "agent", + "fields": map[string]interface{}{ + "id": b.agentInfo.AgentID(), + }, + }, + }, + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "component", + "fields": map[string]interface{}{ + "binary": name, + "id": compState.ID, + }, + }, + }, + }, + }, + }, + }) + } + + } + } + // if we have shipper data, inject the extra inputs if len(shipperHTTPStreams) > 0 { inputs = append(inputs, map[string]interface{}{ diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index f18a459b4cc..b3b9a633aa4 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -22,6 +22,95 @@ import ( "github.com/elastic/elastic-agent/pkg/component" ) +func TestMonitoringWithEndpoint(t *testing.T) { + agentInfo, err := 
info.NewAgentInfo(context.Background(), false) + require.NoError(t, err, "Error creating agent info") + + testMon := BeatsMonitor{ + enabled: true, + config: &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + + Enabled: true, + }, + }, + }, + agentInfo: agentInfo, + } + + policy := map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + }, + }, + "outputs": map[string]any{ + "default": map[string]any{}, + }, + } + + // manually declaring all the MonitoringConfig() args since there's a lot of them, and this makes + // the test a little more self-describing + + compList := []component.Component{ + { + ID: "endpoint-default", + InputSpec: &component.InputRuntimeSpec{ + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Name: "endpoint-security", + }, + Service: &component.ServiceSpec{ + CPort: 7688, + }, + }, + }, + }, + } + + compIdToBinary := map[string]string{ + "endpoint-default": "endpoint-security", + "filebeat-default": "filebeat", + } + existingPidStateMap := map[string]uint64{ + "endpoint-default": 1234, + } + + outCfg, err := testMon.MonitoringConfig(policy, compList, compIdToBinary, existingPidStateMap) + require.NoError(t, err) + + inputCfg := outCfg["inputs"].([]interface{}) + + foundConfig := false + + for _, cfg := range inputCfg { + unwrappedCfg := cfg.(map[string]interface{}) + if idName, ok := unwrappedCfg["id"]; ok && idName == "metrics-monitoring-endpoint_security" { + foundConfig = true + for compName, compCfg := range unwrappedCfg { + if compName == "streams" { + streamCfgUnwrapped := compCfg.([]interface{}) + for _, streamCfg := range streamCfgUnwrapped { + streamValues := streamCfg.(map[string]interface{}) + require.Equal(t, []interface{}{"process"}, streamValues["metricsets"]) + require.Equal(t, "metrics-elastic_agent.endpoint_security-default", streamValues["index"]) + require.Equal(t, uint64(1234), streamValues["process.pid"]) + } + } + + } + } + } + + require.True(t, foundConfig) +} + func TestMonitoringConfigMetricsInterval(t *testing.T) { agentInfo, err := info.NewAgentInfo(context.Background(), false) @@ -56,7 +145,7 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) { operatingSystem: runtime.GOOS, agentInfo: agentInfo, } - got, err := b.MonitoringConfig(policy, nil, map[string]string{"foobeat": "filebeat"}) // put a componentID/binary mapping to have something in the beats monitoring input + got, err := b.MonitoringConfig(policy, nil, map[string]string{"foobeat": "filebeat"}, map[string]uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input assert.NoError(t, err) rawInputs, ok := got["inputs"] @@ -146,7 +235,7 @@ func TestMonitoringConfigComponentFields(t *testing.T) { }, }, } - monitoringConfig, err := b.MonitoringConfig(policy, components, map[string]string{"filestream-default": "filebeat"}) + monitoringConfig, err := b.MonitoringConfig(policy, components, map[string]string{"filestream-default": "filebeat"}, map[string]uint64{}) if err != nil { t.Fatalf("cannot render monitoring configuration: %s", err) } diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index 271aa26eedc..848141ecba9 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -181,20 +181,40 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, return 
fmt.Errorf("failed to get binary mappings: %w", err) } + // service units like endpoint are special; they require a PID to monitor. + // however, `inspect` doesn't talk to the coordinator backend, which means it can't know their actual PID from this point in the code + // instead, we look for service units and create a fake PID, so we print the monitoring config anyway. + serviceUnitExists := false + fakeServicePids := map[string]uint64{} + // The monitoring config depends on a map from component id to // binary name. binaryMapping := make(map[string]string) for _, component := range components { if spec := component.InputSpec; spec != nil { binaryMapping[component.ID] = component.BinaryName() + if spec.Spec.Service != nil { + serviceUnitExists = true + fakeServicePids[component.ID] = 1234 + } } } - monitorCfg, err := monitorFn(cfg, components, binaryMapping) + monitorCfg, err := monitorFn(cfg, components, binaryMapping, fakeServicePids) if err != nil { return fmt.Errorf("failed to get monitoring config: %w", err) } if monitorCfg != nil { + + // see above comment; because we don't know endpoint's actual PID, we need to make a fake one. Warn the user. + if serviceUnitExists { + keys := make([]string, 0, len(fakeServicePids)) + for k := range fakeServicePids { + keys = append(keys, k) + } + fmt.Fprintf(streams.Err, "WARNING: the inspect command can't accurately produce monitoring configs for service units: %v. Use the diagnostics command to get the real config used for monitoring these components\n", keys) + } + rawCfg := config.MustNewConfigFrom(cfg) if err := rawCfg.Merge(monitorCfg); err != nil { @@ -205,7 +225,9 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, if err != nil { return fmt.Errorf("failed to convert monitoring config: %w", err) } + } + } return printMapStringConfig(cfg, streams) @@ -351,7 +373,7 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri } // Compute the components from the computed configuration. - comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo) + comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo, map[string]uint64{}) if err != nil { return nil, fmt.Errorf("failed to render components: %w", err) } diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index a2b2240ea26..ed8902d9ba2 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -300,7 +300,7 @@ func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Confi if err != nil { return nil, aerrors.New("failed to create a map from config", err) } - allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil) + allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil, map[string]uint64{}) if err != nil { return nil, fmt.Errorf("failed to render components: %w", err) } diff --git a/pkg/component/component.go b/pkg/component/component.go index 1f3579f1ce5..f7b7195d1cd 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -24,7 +24,7 @@ import ( ) // GenerateMonitoringCfgFn is a function that can inject information into the model generation process. 
-type GenerateMonitoringCfgFn func(map[string]interface{}, []Component, map[string]string) (map[string]interface{}, error) +type GenerateMonitoringCfgFn func(map[string]interface{}, []Component, map[string]string, map[string]uint64) (map[string]interface{}, error) type HeadersProvider interface { Headers() map[string]string @@ -283,6 +283,7 @@ func (r *RuntimeSpecs) ToComponents( monitoringInjector GenerateMonitoringCfgFn, ll logp.Level, headers HeadersProvider, + currentServiceCompInts map[string]uint64, ) ([]Component, error) { components, err := r.PolicyToComponents(policy, ll, headers) if err != nil { @@ -296,7 +297,7 @@ func (r *RuntimeSpecs) ToComponents( for _, component := range components { binaryMapping[component.ID] = component.BinaryName() } - monitoringCfg, err := monitoringInjector(policy, components, binaryMapping) + monitoringCfg, err := monitoringInjector(policy, components, binaryMapping, currentServiceCompInts) if err != nil { return nil, fmt.Errorf("failed to inject monitoring: %w", err) } diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index f13a2767b4b..bf09c2be41e 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -3555,7 +3555,7 @@ func TestToComponents(t *testing.T) { runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), scenario.Platform, SkipBinaryCheck()) require.NoError(t, err) - result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel, scenario.headers) + result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel, scenario.headers, map[string]uint64{}) if scenario.Err != "" { assert.Equal(t, scenario.Err, err.Error()) } else { @@ -4066,7 +4066,7 @@ func TestFlattenedDataStream(t *testing.T) { t.Fatalf("cannot load runtime specs: %s", err) } - result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil) + result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil, map[string]uint64{}) if err != nil { t.Fatalf("cannot convert policy to component: %s", err) } @@ -4167,7 +4167,7 @@ func TestFlattenedDataStreamIsolatedUnits(t *testing.T) { t.Fatalf("cannot load runtime specs: %s", err) } - result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil) + result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil, map[string]uint64{}) if err != nil { t.Fatalf("cannot convert policy to component: %s", err) } diff --git a/pkg/component/runtime/runtime_comm.go b/pkg/component/runtime/runtime_comm.go index a3285ffa065..55ff74f9089 100644 --- a/pkg/component/runtime/runtime_comm.go +++ b/pkg/component/runtime/runtime_comm.go @@ -295,6 +295,7 @@ func (c *runtimeComm) checkin(server proto.ElasticAgent_CheckinV2Server, init *p close(recvDone) return } + c.logger.Infof("got checkin with pid %d", checkin.Pid) c.checkinObserved <- checkin } }() diff --git a/pkg/component/runtime/state.go b/pkg/component/runtime/state.go index 67dc9c028d3..6bd927fc68b 100644 --- a/pkg/component/runtime/state.go +++ b/pkg/component/runtime/state.go @@ -75,6 +75,11 @@ type ComponentState struct { VersionInfo ComponentVersionInfo `yaml:"version_info"` + // The PID of the process, as obtained from the Protobuf API. + // As of now, this is only used by Endpoint, as the agent doesn't know the PID + // of the endpoint service.
If you need the PID for beats, use the coordinator/communicator + Pid uint64 + // internal expectedUnits map[ComponentUnitKey]expectedUnitState @@ -269,6 +274,12 @@ func (s *ComponentState) syncUnits(comp *component.Component) bool { func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool { changed := false + + if s.Pid != checkin.Pid { + changed = true + s.Pid = checkin.Pid + } + touched := make(map[ComponentUnitKey]bool) for _, unit := range checkin.Units { key := ComponentUnitKey{ diff --git a/testing/integration/endpoint_security_test.go b/testing/integration/endpoint_security_test.go index ccd79a90aec..c0a1bc47dee 100644 --- a/testing/integration/endpoint_security_test.go +++ b/testing/integration/endpoint_security_test.go @@ -8,10 +8,8 @@ package integration import ( "archive/zip" - "bytes" "context" - _ "embed" - "encoding/json" + "fmt" "io/fs" "os" @@ -20,7 +18,6 @@ import ( "slices" "strings" "testing" - "text/template" "time" "github.com/google/uuid" @@ -39,21 +36,9 @@ import ( ) const ( - // TODO: Setup a GitHub Action to update this for each release of https://github.com/elastic/endpoint-package - endpointPackageVersion = "8.11.0" endpointHealthPollingTimeout = 2 * time.Minute ) -//go:embed endpoint_security_package.json.tmpl -var endpointPackagePolicyTemplate string - -type endpointPackageTemplateVars struct { - ID string - Name string - PolicyID string - Version string -} - var protectionTests = []struct { name string protected bool @@ -451,52 +436,6 @@ type agentPolicyUpdateRequest struct { IsProtected bool `json:"is_protected"` } -// Installs the Elastic Defend package to cause the agent to install the endpoint-security service. -func installElasticDefendPackage(t *testing.T, info *define.Info, policyID string) (r kibana.PackagePolicyResponse, err error) { - t.Helper() - - t.Log("Templating endpoint package policy request") - tmpl, err := template.New("pkgpolicy").Parse(endpointPackagePolicyTemplate) - if err != nil { - return r, fmt.Errorf("error creating new template: %w", err) - } - - packagePolicyID := uuid.New().String() - var pkgPolicyBuf bytes.Buffer - - // Need unique name for Endpoint integration otherwise on multiple runs on the same instance you get - // http error response with code 409: {StatusCode:409 Error:Conflict Message:An integration policy with the name Defend-cbomziz4uvn5fov9t1gsrcvdwn2p1s7tefnvgsye already exists. Please rename it or choose a different name.} - err = tmpl.Execute(&pkgPolicyBuf, endpointPackageTemplateVars{ - ID: packagePolicyID, - Name: "Defend-" + packagePolicyID, - PolicyID: policyID, - Version: endpointPackageVersion, - }) - if err != nil { - return r, fmt.Errorf("error executing template: %w", err) - } - - // Make sure the templated value is actually valid JSON before making the API request. - // Using json.Unmarshal will give us the actual syntax error, calling json.Valid() would not. 
- packagePolicyReq := kibana.PackagePolicyRequest{} - err = json.Unmarshal(pkgPolicyBuf.Bytes(), &packagePolicyReq) - if err != nil { - return r, fmt.Errorf("templated package policy is not valid JSON: %s, %w", pkgPolicyBuf.String(), err) - } - - t.Log("POST /api/fleet/package_policies") - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - defer cancel() - - pkgResp, err := info.KibanaClient.InstallFleetPackage(ctx, packagePolicyReq) - if err != nil { - t.Logf("Error installing fleet package: %v", err) - return r, fmt.Errorf("error installing fleet package: %w", err) - } - t.Logf("Endpoint package Policy Response:\n%+v", pkgResp) - return pkgResp, err -} - // Tests that install of Elastic Defend fails if Agent is installed in a base // path other than default func TestEndpointSecurityNonDefaultBasePath(t *testing.T) { @@ -883,55 +822,6 @@ func checkDiagnosticsForEndpointFiles(t *testing.T, diagsPath string, endpointCo } } -func agentAndEndpointAreHealthy(t *testing.T, ctx context.Context, agentClient client.Client) bool { - t.Helper() - - state, err := agentClient.State(ctx) - if err != nil { - t.Logf("Error getting agent state: %s", err) - return false - } - - if state.State != client.Healthy { - t.Logf("local Agent is not Healthy: current state: %+v", state) - return false - } - - foundEndpointInputUnit := false - foundEndpointOutputUnit := false - for _, comp := range state.Components { - isEndpointComponent := strings.Contains(comp.Name, "endpoint") - if comp.State != client.Healthy { - t.Logf("endpoint component is not Healthy: current state: %+v", comp) - return false - } - - for _, unit := range comp.Units { - if isEndpointComponent { - if unit.UnitType == client.UnitTypeInput { - foundEndpointInputUnit = true - } - if unit.UnitType == client.UnitTypeOutput { - foundEndpointOutputUnit = true - } - } - - if unit.State != client.Healthy { - t.Logf("unit %q is not Healthy\n%+v", unit.UnitID, unit) - return false - } - } - } - - // Ensure both the endpoint input and output units were found and healthy. - if !foundEndpointInputUnit || !foundEndpointOutputUnit { - t.Logf("State did not contain endpoint units (input: %v/output: %v) state: %+v. ", foundEndpointInputUnit, foundEndpointOutputUnit, state) - return false - } - - return true -} - func agentIsHealthyNoEndpoint(t *testing.T, ctx context.Context, agentClient client.Client) bool { t.Helper() diff --git a/testing/integration/endpoint_test_tools.go b/testing/integration/endpoint_test_tools.go new file mode 100644 index 00000000000..db924be4845 --- /dev/null +++ b/testing/integration/endpoint_test_tools.go @@ -0,0 +1,133 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +//go:build integration + +package integration + +import ( + "bytes" + "context" + _ "embed" + "encoding/json" + "fmt" + "strings" + "testing" + "text/template" + "time" + + "github.com/google/uuid" + + "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent/pkg/control/v2/client" + "github.com/elastic/elastic-agent/pkg/testing/define" +) + +//go:embed endpoint_security_package.json.tmpl +var endpointPackagePolicyTemplate string + +type endpointPackageTemplateVars struct { + ID string + Name string + PolicyID string + Version string +} + +// TODO: Setup a GitHub Action to update this for each release of https://github.com/elastic/endpoint-package +const endpointPackageVersion = "8.11.0" + +func agentAndEndpointAreHealthy(t *testing.T, ctx context.Context, agentClient client.Client) bool { + t.Helper() + + state, err := agentClient.State(ctx) + if err != nil { + t.Logf("Error getting agent state: %s", err) + return false + } + + if state.State != client.Healthy { + t.Logf("local Agent is not Healthy: current state: %+v", state) + return false + } + + foundEndpointInputUnit := false + foundEndpointOutputUnit := false + for _, comp := range state.Components { + isEndpointComponent := strings.Contains(comp.Name, "endpoint") + if comp.State != client.Healthy { + t.Logf("endpoint component is not Healthy: current state: %+v", comp) + return false + } + + for _, unit := range comp.Units { + if isEndpointComponent { + if unit.UnitType == client.UnitTypeInput { + foundEndpointInputUnit = true + } + if unit.UnitType == client.UnitTypeOutput { + foundEndpointOutputUnit = true + } + } + + if unit.State != client.Healthy { + t.Logf("unit %q is not Healthy\n%+v", unit.UnitID, unit) + return false + } + } + } + + // Ensure both the endpoint input and output units were found and healthy. + if !foundEndpointInputUnit || !foundEndpointOutputUnit { + t.Logf("State did not contain endpoint units (input: %v/output: %v) state: %+v. ", foundEndpointInputUnit, foundEndpointOutputUnit, state) + return false + } + + return true +} + +// Installs the Elastic Defend package to cause the agent to install the endpoint-security service. +func installElasticDefendPackage(t *testing.T, info *define.Info, policyID string) (r kibana.PackagePolicyResponse, err error) { + t.Helper() + + t.Log("Templating endpoint package policy request") + tmpl, err := template.New("pkgpolicy").Parse(endpointPackagePolicyTemplate) + if err != nil { + return r, fmt.Errorf("error creating new template: %w", err) + } + + packagePolicyID := uuid.New().String() + var pkgPolicyBuf bytes.Buffer + + // Need unique name for Endpoint integration otherwise on multiple runs on the same instance you get + // http error response with code 409: {StatusCode:409 Error:Conflict Message:An integration policy with the name Defend-cbomziz4uvn5fov9t1gsrcvdwn2p1s7tefnvgsye already exists. Please rename it or choose a different name.} + err = tmpl.Execute(&pkgPolicyBuf, endpointPackageTemplateVars{ + ID: packagePolicyID, + Name: "Defend-" + packagePolicyID, + PolicyID: policyID, + Version: endpointPackageVersion, + }) + if err != nil { + return r, fmt.Errorf("error executing template: %w", err) + } + + // Make sure the templated value is actually valid JSON before making the API request. + // Using json.Unmarshal will give us the actual syntax error, calling json.Valid() would not. 
+ packagePolicyReq := kibana.PackagePolicyRequest{} + err = json.Unmarshal(pkgPolicyBuf.Bytes(), &packagePolicyReq) + if err != nil { + return r, fmt.Errorf("templated package policy is not valid JSON: %s, %w", pkgPolicyBuf.String(), err) + } + + t.Log("POST /api/fleet/package_policies") + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + pkgResp, err := info.KibanaClient.InstallFleetPackage(ctx, packagePolicyReq) + if err != nil { + t.Logf("Error installing fleet package: %v", err) + return r, fmt.Errorf("error installing fleet package: %w", err) + } + t.Logf("Endpoint package Policy Response:\n%+v", pkgResp) + return pkgResp, err +} diff --git a/testing/integration/monitoring_endpoint_test.go b/testing/integration/monitoring_endpoint_test.go new file mode 100644 index 00000000000..b91a0fe55fe --- /dev/null +++ b/testing/integration/monitoring_endpoint_test.go @@ -0,0 +1,266 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "context" + "os/exec" + "runtime" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/elastic/elastic-agent-libs/kibana" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools" + "github.com/elastic/elastic-agent/pkg/testing/tools/estools" + "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext" +) + +type EndpointMetricsMonRunner struct { + suite.Suite + info *define.Info + fixture *atesting.Fixture + endpointID string +} + +func TestEndpointAgentServiceMonitoring(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: Fleet, + Stack: &define.Stack{}, + Local: false, // requires Agent installation + Sudo: true, // requires Agent installation + OS: []define.OS{ + {Type: define.Linux}, + }, + }) + + // Get path to agent executable. 
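+ // the fixture wraps the locally built agent so the suite can install it and later query its status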
+ fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err, "could not create agent fixture") + + runner := &EndpointMetricsMonRunner{ + info: info, + fixture: fixture, + endpointID: "endpoint-default", + } + + suite.Run(t, runner) +} + +func (runner *EndpointMetricsMonRunner) SetupSuite() { + deadline := time.Now().Add(10 * time.Minute) + ctx, cancel := testcontext.WithDeadline(runner.T(), context.Background(), deadline) + defer cancel() + + runner.T().Log("Enrolling the agent in Fleet") + policyUUID := uuid.New().String() + + createPolicyReq := kibana.AgentPolicy{ + Name: "test-policy-" + policyUUID, + Namespace: "default", + Description: "Test policy " + policyUUID, + MonitoringEnabled: []kibana.MonitoringEnabledOption{ + kibana.MonitoringEnabledLogs, + kibana.MonitoringEnabledMetrics, + }, + } + + installOpts := atesting.InstallOpts{ + NonInteractive: true, + Force: true, + Privileged: true, + } + + policy, err := tools.InstallAgentWithPolicy(ctx, runner.T(), + installOpts, runner.fixture, runner.info.KibanaClient, createPolicyReq) + require.NoError(runner.T(), err, "failed to install agent with policy") + + runner.T().Log("Installing Elastic Defend") + pkgPolicyResp, err := installElasticDefendPackage(runner.T(), runner.info, policy.ID) + require.NoErrorf(runner.T(), err, "Policy Response was: %v", pkgPolicyResp) + + runner.T().Log("Polling for endpoint-security to become Healthy") + ctx, cancel = context.WithTimeout(ctx, time.Minute*3) + defer cancel() + + agentClient := runner.fixture.Client() + err = agentClient.Connect(ctx) + require.NoError(runner.T(), err, "could not connect to local agent") + + require.Eventually(runner.T(), + func() bool { return agentAndEndpointAreHealthy(runner.T(), ctx, agentClient) }, + time.Minute*3, + time.Second, + "Endpoint component or units are not healthy.", + ) + +} + +func (runner *EndpointMetricsMonRunner) TestEndpointMetrics() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15) + defer cancel() + + agentStatus, err := runner.fixture.ExecStatus(ctx) + require.NoError(runner.T(), err) + + require.Eventually(runner.T(), func() bool { + + query := genESQueryByBinary(agentStatus.Info.ID, runner.endpointID) + res, err := estools.PerformQueryForRawQuery(ctx, query, "metrics-elastic_agent*", runner.info.ESClient) + require.NoError(runner.T(), err) + runner.T().Logf("Fetched metrics for %s, got %d hits", runner.endpointID, res.Hits.Total.Value) + return res.Hits.Total.Value >= 1 + }, time.Minute*10, time.Second*10, "could not fetch component metricsets for endpoint with ID %s and agent ID %s", runner.endpointID, agentStatus.Info.ID) + +} + +func (runner *EndpointMetricsMonRunner) TestEndpointMetricsAfterRestart() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15) + defer cancel() + // once we've gotten the first round of metrics, forcibly restart endpoint and see if we still get metrics + // This makes sure that the backend coordinator can properly update the metrics handlers when there are unexpected state changes + + // confine this to linux; the behavior is platform-agnostic, and this way we have `pgrep` + if runtime.GOOS != "linux" { + return + } + + // kill endpoint + cmd := exec.Command("pgrep", "-f", "endpoint") + pgrep, err := cmd.CombinedOutput() + runner.T().Logf("killing pid: %s", string(pgrep)) + + cmd = exec.Command("pkill", "--signal", "SIGKILL", "-f", "endpoint") + _, err = cmd.CombinedOutput() + require.NoError(runner.T(), err) + + // wait for
endpoint to come back up. We use `pgrep` + // since the agent health status won't immediately register that the endpoint process itself is gone. + require.Eventually(runner.T(), func() bool { + cmd := exec.Command("pgrep", "-f", "endpoint") + pgrep, err := cmd.CombinedOutput() + runner.T().Logf("found pid: %s", string(pgrep)) + if err == nil { + return true + } + return false + }, time.Minute*2, time.Second) + + // make sure agent still says we're healthy + agentClient := runner.fixture.Client() + err = agentClient.Connect(ctx) + require.NoError(runner.T(), err, "could not connect to local agent") + + require.Eventually(runner.T(), + func() bool { return agentAndEndpointAreHealthy(runner.T(), ctx, agentClient) }, + time.Minute*3, + time.Second, + "Endpoint component or units are not healthy.", + ) + + // catch the time endpoint is restarted, so we can filter for documents after a given time + endpointRestarted := time.Now() + + agentStatus, err := runner.fixture.ExecStatus(ctx) + require.NoError(runner.T(), err) + + // now query again, but make sure we're getting new metrics + require.Eventually(runner.T(), func() bool { + query := genESQueryByDate(agentStatus.Info.ID, runner.endpointID, endpointRestarted.Format(time.RFC3339)) + res, err := estools.PerformQueryForRawQuery(ctx, query, "metrics-elastic_agent*", runner.info.ESClient) + require.NoError(runner.T(), err) + runner.T().Logf("Fetched metrics for %s, got %d hits", runner.endpointID, res.Hits.Total.Value) + return res.Hits.Total.Value >= 1 + }, time.Minute*10, time.Second*10, "could not fetch component metricsets for endpoint with ID %s and agent ID %s", runner.endpointID, agentStatus.Info.ID) +} + +func genESQueryByDate(agentID string, componentID string, dateAfter string) map[string]interface{} { + queryRaw := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + { + "match": map[string]interface{}{ + "agent.id": agentID, + }, + }, + { + "match": map[string]interface{}{ + "component.id": componentID, + }, + }, + { + "range": map[string]interface{}{ + "@timestamp": map[string]interface{}{ + "gte": dateAfter, + }, + }, + }, + { + "range": map[string]interface{}{ + "system.process.cpu.total.value": map[string]interface{}{ + "gt": 0, + }, + }, + }, + { + "range": map[string]interface{}{ + "system.process.memory.size": map[string]interface{}{ + "gt": 0, + }, + }, + }, + }, + }, + }, + } + + return queryRaw +} + +func genESQueryByBinary(agentID string, componentID string) map[string]interface{} { + // see https://github.com/elastic/kibana/blob/main/x-pack/plugins/fleet/server/services/agents/agent_metrics.ts + queryRaw := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + { + "match": map[string]interface{}{ + "agent.id": agentID, + }, + }, + { + "match": map[string]interface{}{ + "component.id": componentID, + }, + }, + { + "range": map[string]interface{}{ + "system.process.cpu.total.value": map[string]interface{}{ + "gt": 0, + }, + }, + }, + { + "range": map[string]interface{}{ + "system.process.memory.size": map[string]interface{}{ + "gt": 0, + }, + }, + }, + }, + }, + }, + } + + return queryRaw +}