From e0621fa581f732e4b359e59a481ca61351a9d0dd Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 7 May 2024 13:18:06 -0700 Subject: [PATCH 01/26] first attempt --- go.mod | 2 +- go.sum | 2 ++ .../agent/application/coordinator/coordinator.go | 9 ++++++++- .../application/coordinator/coordinator_test.go | 2 +- .../pkg/agent/application/monitoring/v1_monitor.go | 14 ++++++++++++-- .../application/monitoring/v1_monitor_test.go | 4 ++-- internal/pkg/agent/cmd/inspect.go | 6 ++++-- internal/pkg/agent/install/uninstall.go | 3 ++- pkg/component/component.go | 5 +++-- pkg/component/component_test.go | 7 ++++--- pkg/component/runtime/runtime_comm.go | 1 + pkg/component/runtime/state.go | 8 ++++++++ 12 files changed, 48 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 4c25e74a1e9..6791e594a03 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/dolmen-go/contextio v0.0.0-20200217195037-68fc5150bcd5 github.com/elastic/e2e-testing v1.1.0 github.com/elastic/elastic-agent-autodiscover v0.6.8 - github.com/elastic/elastic-agent-client/v7 v7.8.1 + github.com/elastic/elastic-agent-client/v7 v7.9.0 github.com/elastic/elastic-agent-libs v0.9.6 github.com/elastic/elastic-agent-system-metrics v0.9.2 github.com/elastic/elastic-transport-go/v8 v8.5.0 diff --git a/go.sum b/go.sum index 635f24c27b0..d1c91c441b3 100644 --- a/go.sum +++ b/go.sum @@ -795,6 +795,8 @@ github.com/elastic/elastic-agent-autodiscover v0.6.8 h1:BSXz+QwjZAEt08G+T3GDGl14 github.com/elastic/elastic-agent-autodiscover v0.6.8/go.mod h1:hFeFqneS2r4jD0/QzGkrNk0YVdN0JGh7lCWdsH7zcI4= github.com/elastic/elastic-agent-client/v7 v7.8.1 h1:J9wZc/0mUvSEok0X5iR5+n60Jgb+AWooKddb3XgPWqM= github.com/elastic/elastic-agent-client/v7 v7.8.1/go.mod h1:axl1nkdqc84YRFkeJGD9jExKNPUrOrzf3DFo2m653nY= +github.com/elastic/elastic-agent-client/v7 v7.9.0 h1:ryNbISIg4tTRT9KA0MYOa+fxW0CpsF+qxELWWb13rYE= +github.com/elastic/elastic-agent-client/v7 v7.9.0/go.mod h1:/AeiwX9zxG99eUNrLhpApTpwmE71Qwuh4ozObn7a0ss= 
github.com/elastic/elastic-agent-libs v0.9.6 h1:3paTd2JVkxTHH8rnlVZVrTLJgacR2l8jFr+NYHHCNio= github.com/elastic/elastic-agent-libs v0.9.6/go.mod h1:xhHF9jeWhPzKPtEHN+epKjdiZi0bCbACLxwkp1aHMpc= github.com/elastic/elastic-agent-system-metrics v0.9.2 h1:/tvTKOt55EerU0WwGFoDhBlyWLgxyv7d8xCbny0bciw= diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 9b7041d22ef..94a27e12f84 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -82,7 +82,7 @@ type MonitorManager interface { Reload(rawConfig *config.Config) error // MonitoringConfig injects monitoring configuration into resolved ast tree. - MonitoringConfig(map[string]interface{}, []component.Component, map[string]string) (map[string]interface{}, error) + MonitoringConfig(map[string]interface{}, []component.Component, map[string]string, []uint64) (map[string]interface{}, error) } // Runner provides interface to run a manager and receive running errors. 
@@ -1240,11 +1240,18 @@ func (c *Coordinator) generateComponentModel() (err error) { configInjector = c.monitorMgr.MonitoringConfig } + existingState := c.State() + var existingCompState = make([]uint64, len(existingState.Components)) + for i, comp := range existingState.Components { + existingCompState[i] = comp.State.CheckinPid + } + comps, err := c.specs.ToComponents( cfg, configInjector, c.state.LogLevel, c.agentInfo, + existingCompState, ) if err != nil { return fmt.Errorf("failed to render components: %w", err) diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 18c2c3496c3..750366827eb 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -1004,7 +1004,7 @@ func (*testMonitoringManager) Prepare(_ string) error func (*testMonitoringManager) Cleanup(string) error { return nil } func (*testMonitoringManager) Enabled() bool { return false } func (*testMonitoringManager) Reload(rawConfig *config.Config) error { return nil } -func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string) (map[string]interface{}, error) { +func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string, _ []uint64) (map[string]interface{}, error) { return nil, nil } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 82fce6ff29b..b5c0b761b2e 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -15,6 +15,7 @@ import ( "time" "unicode" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/utils" @@ -114,6 +115,7 @@ func (b *BeatsMonitor) 
MonitoringConfig( policy map[string]interface{}, components []component.Component, componentIDToBinary map[string]string, + existingState []uint64, ) (map[string]interface{}, error) { if !b.Enabled() { return nil, nil @@ -157,8 +159,9 @@ func (b *BeatsMonitor) MonitoringConfig( } } + // TODO: existingState will get passed here so we can fetch the PID if b.config.C.MonitorMetrics { - if err := b.injectMetricsInput(cfg, componentIDToBinary, monitoringOutput, components); err != nil { + if err := b.injectMetricsInput(cfg, componentIDToBinary, monitoringOutput, components, existingState); err != nil { return nil, errors.New(err, "failed to inject monitoring output") } } @@ -534,7 +537,7 @@ func (b *BeatsMonitor) monitoringNamespace() string { return defaultMonitoringNamespace } -func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component) error { +func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component, existingStateServicePids []uint64) error { metricsCollectionIntervalString := metricsCollectionInterval.String() monitoringNamespace := b.monitoringNamespace() fixedAgentName := strings.ReplaceAll(agentName, "-", "_") @@ -619,6 +622,13 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, } + // TODO: this is test code, remove/clean up later + for _, comp := range existingStateServicePids { + if comp != 0 { + logp.L().Infof("Got non-zero pid %v, components are: %#v", componentIDToBinary) + } + } + //create a new map with the monitoring beats included componentListWithMonitoring := map[string]string{ fmt.Sprintf("beat/%s", monitoringMetricsUnitID): "metricbeat", diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index 
f18a459b4cc..a08a8da30b2 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -56,7 +56,7 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) { operatingSystem: runtime.GOOS, agentInfo: agentInfo, } - got, err := b.MonitoringConfig(policy, nil, map[string]string{"foobeat": "filebeat"}) // put a componentID/binary mapping to have something in the beats monitoring input + got, err := b.MonitoringConfig(policy, nil, map[string]string{"foobeat": "filebeat"}, []uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input assert.NoError(t, err) rawInputs, ok := got["inputs"] @@ -146,7 +146,7 @@ func TestMonitoringConfigComponentFields(t *testing.T) { }, }, } - monitoringConfig, err := b.MonitoringConfig(policy, components, map[string]string{"filestream-default": "filebeat"}) + monitoringConfig, err := b.MonitoringConfig(policy, components, map[string]string{"filestream-default": "filebeat"}, []uint64{}) if err != nil { t.Fatalf("cannot render monitoring configuration: %s", err) } diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index 6b9155cec21..f7a0b82cce4 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -189,7 +189,8 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, binaryMapping[component.ID] = component.BinaryName() } } - monitorCfg, err := monitorFn(cfg, components, binaryMapping) + // TODO: how do we handle endpoint config? + monitorCfg, err := monitorFn(cfg, components, binaryMapping, []uint64{}) if err != nil { return fmt.Errorf("failed to get monitoring config: %w", err) } @@ -284,7 +285,8 @@ func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponen } // Compute the components from the computed configuration. 
- comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo) + // TODO: how to we deal with the endpoint state here? + comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo, []uint64{}) if err != nil { return fmt.Errorf("failed to render components: %w", err) } diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index efcbafcaaff..4f4d8fc9c51 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -310,7 +310,8 @@ func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Confi if err != nil { return nil, aerrors.New("failed to create a map from config", err) } - allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil) + // TODO: how should this handle endpoint? + allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil, []uint64{}) if err != nil { return nil, fmt.Errorf("failed to render components: %w", err) } diff --git a/pkg/component/component.go b/pkg/component/component.go index 954fc373a13..99435ecd950 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -25,7 +25,7 @@ import ( ) // GenerateMonitoringCfgFn is a function that can inject information into the model generation process. 
-type GenerateMonitoringCfgFn func(map[string]interface{}, []Component, map[string]string) (map[string]interface{}, error) +type GenerateMonitoringCfgFn func(map[string]interface{}, []Component, map[string]string, []uint64) (map[string]interface{}, error) type HeadersProvider interface { Headers() map[string]string @@ -284,6 +284,7 @@ func (r *RuntimeSpecs) ToComponents( monitoringInjector GenerateMonitoringCfgFn, ll logp.Level, headers HeadersProvider, + currentServiceCompInts []uint64, ) ([]Component, error) { components, err := r.PolicyToComponents(policy, ll, headers) if err != nil { @@ -297,7 +298,7 @@ func (r *RuntimeSpecs) ToComponents( for _, component := range components { binaryMapping[component.ID] = component.BinaryName() } - monitoringCfg, err := monitoringInjector(policy, components, binaryMapping) + monitoringCfg, err := monitoringInjector(policy, components, binaryMapping, currentServiceCompInts) if err != nil { return nil, fmt.Errorf("failed to inject monitoring: %w", err) } diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 562ecc1450d..68658405cd4 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -3551,7 +3551,8 @@ func TestToComponents(t *testing.T) { runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), scenario.Platform, SkipBinaryCheck()) require.NoError(t, err) - result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel, scenario.headers) + // TODO: figure out what to do with endpoint + result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel, scenario.headers, []uint64{}) if scenario.Err != "" { assert.Equal(t, scenario.Err, err.Error()) } else { @@ -4062,7 +4063,7 @@ func TestFlattenedDataStream(t *testing.T) { t.Fatalf("cannot load runtime specs: %s", err) } - result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil) + result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil, []uint64{}) if 
err != nil { t.Fatalf("cannot convert policy to component: %s", err) } @@ -4163,7 +4164,7 @@ func TestFlattenedDataStreamIsolatedUnits(t *testing.T) { t.Fatalf("cannot load runtime specs: %s", err) } - result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil) + result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil, []uint64{}) if err != nil { t.Fatalf("cannot convert policy to component: %s", err) } diff --git a/pkg/component/runtime/runtime_comm.go b/pkg/component/runtime/runtime_comm.go index 79eeb9f89c6..f56ae3f0f66 100644 --- a/pkg/component/runtime/runtime_comm.go +++ b/pkg/component/runtime/runtime_comm.go @@ -293,6 +293,7 @@ func (c *runtimeComm) checkin(server proto.ElasticAgent_CheckinV2Server, init *p close(recvDone) return } + c.logger.Infof("got checkin with pid %d", checkin.Pid) c.checkinObserved <- checkin } }() diff --git a/pkg/component/runtime/state.go b/pkg/component/runtime/state.go index 67dc9c028d3..3604317fc65 100644 --- a/pkg/component/runtime/state.go +++ b/pkg/component/runtime/state.go @@ -75,6 +75,8 @@ type ComponentState struct { VersionInfo ComponentVersionInfo `yaml:"version_info"` + CheckinPid uint64 + // internal expectedUnits map[ComponentUnitKey]expectedUnitState @@ -269,6 +271,12 @@ func (s *ComponentState) syncUnits(comp *component.Component) bool { func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool { changed := false + + if s.CheckinPid != checkin.Pid { + changed = true + s.CheckinPid = checkin.Pid + } + touched := make(map[ComponentUnitKey]bool) for _, unit := range checkin.Units { key := ComponentUnitKey{ From f165295e3025037e02e5f23c7df12fad89efab84 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Wed, 15 May 2024 10:56:50 -0700 Subject: [PATCH 02/26] still tinkering --- .../application/coordinator/coordinator.go | 15 +++- .../coordinator/coordinator_state.go | 14 +++ .../coordinator/coordinator_test.go | 2 +- .../application/monitoring/v1_monitor.go | 88 
+++++++++++++++++-- .../application/monitoring/v1_monitor_test.go | 4 +- internal/pkg/agent/cmd/inspect.go | 4 +- internal/pkg/agent/install/uninstall.go | 2 +- magefile.go | 2 +- pkg/component/component.go | 4 +- pkg/component/component_test.go | 6 +- pkg/component/runtime/state.go | 2 + 11 files changed, 118 insertions(+), 25 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 94a27e12f84..5b2f09780f5 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -82,7 +82,7 @@ type MonitorManager interface { Reload(rawConfig *config.Config) error // MonitoringConfig injects monitoring configuration into resolved ast tree. - MonitoringConfig(map[string]interface{}, []component.Component, map[string]string, []uint64) (map[string]interface{}, error) + MonitoringConfig(map[string]interface{}, []component.Component, map[string]string, map[string]uint64) (map[string]interface{}, error) } // Runner provides interface to run a manager and receive running errors. @@ -284,6 +284,8 @@ type Coordinator struct { // loop in runLoopIteration() is active and listening. // Should only be interacted with via CoordinatorActive() or runLoopIteration() heartbeatChan chan struct{} + + servicePidUpdate chan struct{} } // The channels Coordinator reads to receive updates from the various managers. @@ -378,6 +380,7 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. overrideStateChan: make(chan *coordinatorOverrideState), upgradeDetailsChan: make(chan *details.Details), heartbeatChan: make(chan struct{}), + servicePidUpdate: make(chan struct{}, 1), } // Setup communication channels for any non-nil components. 
This pattern // lets us transparently accept nil managers / simulated events during @@ -1001,6 +1004,10 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case c.heartbeatChan <- struct{}{}: + case <-c.servicePidUpdate: + c.logger.Infof("got pid service update, refreshing config") + c.refreshComponentModel(ctx) + case componentState := <-c.managerChans.runtimeManagerUpdate: // New component change reported by the runtime manager via // Coordinator.watchRuntimeComponents(), merge it with the @@ -1241,9 +1248,9 @@ func (c *Coordinator) generateComponentModel() (err error) { } existingState := c.State() - var existingCompState = make([]uint64, len(existingState.Components)) - for i, comp := range existingState.Components { - existingCompState[i] = comp.State.CheckinPid + var existingCompState = make(map[string]uint64, len(existingState.Components)) + for _, comp := range existingState.Components { + existingCompState[comp.Component.ID] = comp.State.CheckinPid } comps, err := c.specs.ToComponents( diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index 0d6ba22bddd..b5bfc98b1a5 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -136,8 +136,14 @@ func (c *Coordinator) refreshState() { // Must be called on the main Coordinator goroutine. func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) { found := false + pidRequiresUpdate := false for i, other := range c.state.Components { if other.Component.ID == state.Component.ID { + //will this work? 
+ c.logger.Infof("got updated component state with pid %d", state.State.CheckinPid) + if other.State.CheckinPid != state.State.CheckinPid { + pidRequiresUpdate = true + } c.state.Components[i] = state found = true break @@ -145,6 +151,9 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) } if !found { c.state.Components = append(c.state.Components, state) + if state.State.CheckinPid != 0 { + pidRequiresUpdate = true + } } // In the case that the component has stopped, it is now removed. @@ -160,6 +169,11 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) } c.stateNeedsRefresh = true + + if pidRequiresUpdate { + c.logger.Infof("got pid update, refreshing config") + c.servicePidUpdate <- struct{}{} + } } // generateReportableState aggregates the internal state of the Coordinator diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 750366827eb..5b81d0f4bb3 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -1004,7 +1004,7 @@ func (*testMonitoringManager) Prepare(_ string) error func (*testMonitoringManager) Cleanup(string) error { return nil } func (*testMonitoringManager) Enabled() bool { return false } func (*testMonitoringManager) Reload(rawConfig *config.Config) error { return nil } -func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string, _ []uint64) (map[string]interface{}, error) { +func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string, _ map[string]uint64) (map[string]interface{}, error) { return nil, nil } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index b5c0b761b2e..c3de042b6a6 100644 --- 
a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -115,7 +115,7 @@ func (b *BeatsMonitor) MonitoringConfig( policy map[string]interface{}, components []component.Component, componentIDToBinary map[string]string, - existingState []uint64, + existingState map[string]uint64, ) (map[string]interface{}, error) { if !b.Enabled() { return nil, nil @@ -537,7 +537,7 @@ func (b *BeatsMonitor) monitoringNamespace() string { return defaultMonitoringNamespace } -func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component, existingStateServicePids []uint64) error { +func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component, existingStateServicePids map[string]uint64) error { metricsCollectionIntervalString := metricsCollectionInterval.String() monitoringNamespace := b.monitoringNamespace() fixedAgentName := strings.ReplaceAll(agentName, "-", "_") @@ -622,13 +622,6 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, } - // TODO: this is test code, remove/clean up later - for _, comp := range existingStateServicePids { - if comp != 0 { - logp.L().Infof("Got non-zero pid %v, components are: %#v", componentIDToBinary) - } - } - //create a new map with the monitoring beats included componentListWithMonitoring := map[string]string{ fmt.Sprintf("beat/%s", monitoringMetricsUnitID): "metricbeat", @@ -918,6 +911,83 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, } + // If there's a checkin PID + for id, comp := range existingStateServicePids { + logp.L().Infof("component/pid monitoring map is: %#v", existingStateServicePids) + if comp != 0 && id == "elastic-endpoint" { + logp.L().Infof("creating system/process watcher 
for pid %d", comp) + // TODO: No idea if these metadata fields are correct + inputs = append(inputs, map[string]interface{}{ + idKey: fmt.Sprintf("%s-endpoint", monitoringMetricsUnitID), + "name": fmt.Sprintf("%s-endpoint", monitoringMetricsUnitID), + "type": "system/metrics", + useOutputKey: monitoringOutput, + "data_stream": map[string]interface{}{ + "namespace": monitoringNamespace, + }, + "streams": map[string]interface{}{ + idKey: fmt.Sprintf("%s-", monitoringMetricsUnitID) + "endpoint", + "data_stream": map[string]interface{}{ + "type": "metrics", + "dataset": "elastic_agent.endpoint", + "namespace": monitoringNamespace, + }, + "metricsets": []interface{}{"process"}, + "period": metricsCollectionIntervalString, + "index": fmt.Sprintf("metrics-elastic_agent.endpoint-%s", monitoringNamespace), + "process.pid": comp, + "processors": []interface{}{ + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "data_stream", + "fields": map[string]interface{}{ + "type": "metrics", + "dataset": "elastic_agent.endpoint", + "namespace": monitoringNamespace, + }, + }, + }, + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "event", + "fields": map[string]interface{}{ + "dataset": "elastic_agent.endpoint", + }, + }, + }, + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "elastic_agent", + "fields": map[string]interface{}{ + "id": b.agentInfo.AgentID(), + "version": b.agentInfo.Version(), + "snapshot": b.agentInfo.Snapshot(), + "process": "endpoint", + }, + }, + }, + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "agent", + "fields": map[string]interface{}{ + "id": b.agentInfo.AgentID(), + }, + }, + }, + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "component", + "fields": map[string]interface{}{ + "binary": "endpoint", + }, + }, + }, + }, + }, + }) + } + } + // if we have shipper data, inject the extra inputs if len(shipperHTTPStreams) > 0 
{ inputs = append(inputs, map[string]interface{}{ diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index a08a8da30b2..db0730b5e7e 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -56,7 +56,7 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) { operatingSystem: runtime.GOOS, agentInfo: agentInfo, } - got, err := b.MonitoringConfig(policy, nil, map[string]string{"foobeat": "filebeat"}, []uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input + got, err := b.MonitoringConfig(policy, nil, map[string]string{"foobeat": "filebeat"}, map[string]uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input assert.NoError(t, err) rawInputs, ok := got["inputs"] @@ -146,7 +146,7 @@ func TestMonitoringConfigComponentFields(t *testing.T) { }, }, } - monitoringConfig, err := b.MonitoringConfig(policy, components, map[string]string{"filestream-default": "filebeat"}, []uint64{}) + monitoringConfig, err := b.MonitoringConfig(policy, components, map[string]string{"filestream-default": "filebeat"}, map[string]uint64{}) if err != nil { t.Fatalf("cannot render monitoring configuration: %s", err) } diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index f7a0b82cce4..a17d3046e3f 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -190,7 +190,7 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, } } // TODO: how do we handle endpoint config? 
- monitorCfg, err := monitorFn(cfg, components, binaryMapping, []uint64{}) + monitorCfg, err := monitorFn(cfg, components, binaryMapping, map[string]uint64{}) if err != nil { return fmt.Errorf("failed to get monitoring config: %w", err) } @@ -286,7 +286,7 @@ func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponen // Compute the components from the computed configuration. // TODO: how to we deal with the endpoint state here? - comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo, []uint64{}) + comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo, map[string]uint64{}) if err != nil { return fmt.Errorf("failed to render components: %w", err) } diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index b041ca1e808..f6cdb445a15 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -311,7 +311,7 @@ func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Confi return nil, aerrors.New("failed to create a map from config", err) } // TODO: how should this handle endpoint? 
- allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil, []uint64{}) + allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil, map[string]uint64{}) if err != nil { return nil, fmt.Errorf("failed to render components: %w", err) } diff --git a/magefile.go b/magefile.go index fb6a795bf63..40ff0c9f97c 100644 --- a/magefile.go +++ b/magefile.go @@ -1160,7 +1160,7 @@ func flattenDependencies(requiredPackages []string, packageVersion, archivePath, log.Printf(">>> Extracting %s to %s", m, versionedFlatPath) } if err := devtools.Extract(m, versionedFlatPath); err != nil { - panic(err) + panic(fmt.Errorf("error extracting %s: %s", m, err)) } } diff --git a/pkg/component/component.go b/pkg/component/component.go index 99435ecd950..a9fb32866ac 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -25,7 +25,7 @@ import ( ) // GenerateMonitoringCfgFn is a function that can inject information into the model generation process. -type GenerateMonitoringCfgFn func(map[string]interface{}, []Component, map[string]string, []uint64) (map[string]interface{}, error) +type GenerateMonitoringCfgFn func(map[string]interface{}, []Component, map[string]string, map[string]uint64) (map[string]interface{}, error) type HeadersProvider interface { Headers() map[string]string @@ -284,7 +284,7 @@ func (r *RuntimeSpecs) ToComponents( monitoringInjector GenerateMonitoringCfgFn, ll logp.Level, headers HeadersProvider, - currentServiceCompInts []uint64, + currentServiceCompInts map[string]uint64, ) ([]Component, error) { components, err := r.PolicyToComponents(policy, ll, headers) if err != nil { diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 68658405cd4..e38d4d3e8ef 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -3552,7 +3552,7 @@ func TestToComponents(t *testing.T) { require.NoError(t, err) // TODO: figure out what to do with endpoint - result, err := 
runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel, scenario.headers, []uint64{}) + result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel, scenario.headers, map[string]uint64{}) if scenario.Err != "" { assert.Equal(t, scenario.Err, err.Error()) } else { @@ -4063,7 +4063,7 @@ func TestFlattenedDataStream(t *testing.T) { t.Fatalf("cannot load runtime specs: %s", err) } - result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil, []uint64{}) + result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil, map[string]uint64{}) if err != nil { t.Fatalf("cannot convert policy to component: %s", err) } @@ -4164,7 +4164,7 @@ func TestFlattenedDataStreamIsolatedUnits(t *testing.T) { t.Fatalf("cannot load runtime specs: %s", err) } - result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil, []uint64{}) + result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil, map[string]uint64{}) if err != nil { t.Fatalf("cannot convert policy to component: %s", err) } diff --git a/pkg/component/runtime/state.go b/pkg/component/runtime/state.go index 3604317fc65..eb0197bcc8b 100644 --- a/pkg/component/runtime/state.go +++ b/pkg/component/runtime/state.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/pkg/component" ) @@ -273,6 +274,7 @@ func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool { changed := false if s.CheckinPid != checkin.Pid { + logp.L().Infof("got updated pid %d for component %s", checkin.Pid, s.Component.String()) changed = true s.CheckinPid = checkin.Pid } From 99de46cd096b8bf8f6f3d5fd1f547588f643d159 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 21 May 2024 11:33:59 -0700 Subject: [PATCH 03/26] first draft change --- dev-tools/mage/settings.go | 7 +- go.sum | 10 - 
.../application/monitoring/v1_monitor.go | 113 +++++----- .../application/monitoring/v1_monitor_test.go | 76 +++++++ magefile.go | 195 ++++++++++-------- testing/integration/endpoint_security_test.go | 112 +--------- testing/integration/helpers.go | 126 +++++++++++ .../integration/monitoring_endpoint_test.go | 150 ++++++++++++++ 8 files changed, 523 insertions(+), 266 deletions(-) create mode 100644 testing/integration/helpers.go create mode 100644 testing/integration/monitoring_endpoint_test.go diff --git a/dev-tools/mage/settings.go b/dev-tools/mage/settings.go index 011c9795373..2f7f2cd18b6 100644 --- a/dev-tools/mage/settings.go +++ b/dev-tools/mage/settings.go @@ -86,7 +86,7 @@ var ( Snapshot bool DevBuild bool - ExternalBuild bool + ExternalBuild string versionQualified bool versionQualifier string @@ -147,10 +147,7 @@ func initGlobals() { panic(fmt.Errorf("failed to parse DEV env value: %w", err)) } - ExternalBuild, err = strconv.ParseBool(EnvOr("EXTERNAL", "false")) - if err != nil { - panic(fmt.Errorf("failed to parse EXTERNAL env value: %w", err)) - } + ExternalBuild = EnvOr("EXTERNAL", "false") versionQualifier, versionQualified = os.LookupEnv("VERSION_QUALIFIER") diff --git a/go.sum b/go.sum index f093ad2883f..13b8ae6152e 100644 --- a/go.sum +++ b/go.sum @@ -789,16 +789,6 @@ github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdf github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/elastic/e2e-testing v1.2.0 h1:XGbsAMb05zdqHm9Hi9TunzcWSM97qv9vMuSaeEaXDi8= -github.com/elastic/e2e-testing v1.2.0/go.mod h1:8q2d8dmwavJXISowwaoreHFBnbR/uK4qanfRGhC/W9A= -github.com/elastic/elastic-agent-autodiscover v0.6.8 h1:BSXz+QwjZAEt08G+T3GDGl14Bh9a6zD8luNCvZut/b8= 
-github.com/elastic/elastic-agent-autodiscover v0.6.8/go.mod h1:hFeFqneS2r4jD0/QzGkrNk0YVdN0JGh7lCWdsH7zcI4= -github.com/elastic/elastic-agent-client/v7 v7.8.1 h1:J9wZc/0mUvSEok0X5iR5+n60Jgb+AWooKddb3XgPWqM= -github.com/elastic/elastic-agent-client/v7 v7.8.1/go.mod h1:axl1nkdqc84YRFkeJGD9jExKNPUrOrzf3DFo2m653nY= -github.com/elastic/elastic-agent-client/v7 v7.9.0 h1:ryNbISIg4tTRT9KA0MYOa+fxW0CpsF+qxELWWb13rYE= -github.com/elastic/elastic-agent-client/v7 v7.9.0/go.mod h1:/AeiwX9zxG99eUNrLhpApTpwmE71Qwuh4ozObn7a0ss= -github.com/elastic/elastic-agent-libs v0.9.6 h1:3paTd2JVkxTHH8rnlVZVrTLJgacR2l8jFr+NYHHCNio= -github.com/elastic/elastic-agent-libs v0.9.6/go.mod h1:xhHF9jeWhPzKPtEHN+epKjdiZi0bCbACLxwkp1aHMpc= github.com/elastic/e2e-testing v1.2.1 h1:jIuikohPtTxtO+bfoVEyKAWmcsAl21lxiiTK8Fj+G8U= github.com/elastic/e2e-testing v1.2.1/go.mod h1:8q2d8dmwavJXISowwaoreHFBnbR/uK4qanfRGhC/W9A= github.com/elastic/elastic-agent-autodiscover v0.6.14 h1:0zJYNyv9GKTOiNqCHqEVboP+WioV73ia17Et+UlFbz8= diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index c3de042b6a6..06fef7ef7a6 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -115,7 +115,7 @@ func (b *BeatsMonitor) MonitoringConfig( policy map[string]interface{}, components []component.Component, componentIDToBinary map[string]string, - existingState map[string]uint64, + existingStatePidMap map[string]uint64, ) (map[string]interface{}, error) { if !b.Enabled() { return nil, nil @@ -159,9 +159,8 @@ func (b *BeatsMonitor) MonitoringConfig( } } - // TODO: existingState will get passed here so we can fetch the PID if b.config.C.MonitorMetrics { - if err := b.injectMetricsInput(cfg, componentIDToBinary, monitoringOutput, components, existingState); err != nil { + if err := b.injectMetricsInput(cfg, componentIDToBinary, components, existingStatePidMap); err != nil { return nil, 
errors.New(err, "failed to inject monitoring output") } } @@ -537,7 +536,7 @@ func (b *BeatsMonitor) monitoringNamespace() string { return defaultMonitoringNamespace } -func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component, existingStateServicePids map[string]uint64) error { +func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, componentList []component.Component, existingStateServicePids map[string]uint64) error { metricsCollectionIntervalString := metricsCollectionInterval.String() monitoringNamespace := b.monitoringNamespace() fixedAgentName := strings.ReplaceAll(agentName, "-", "_") @@ -911,74 +910,78 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, } - // If there's a checkin PID + // add system/process metrics for services that can't be monitored via json/beats metrics + // If there's a checkin PID and it contains the "endpoint" string, assume we want to monitor it for id, comp := range existingStateServicePids { logp.L().Infof("component/pid monitoring map is: %#v", existingStateServicePids) - if comp != 0 && id == "elastic-endpoint" { + if comp != 0 && strings.Contains(id, "endpoint") { logp.L().Infof("creating system/process watcher for pid %d", comp) - // TODO: No idea if these metadata fields are correct inputs = append(inputs, map[string]interface{}{ - idKey: fmt.Sprintf("%s-endpoint", monitoringMetricsUnitID), - "name": fmt.Sprintf("%s-endpoint", monitoringMetricsUnitID), + idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), + "name": fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), "type": "system/metrics", useOutputKey: monitoringOutput, "data_stream": map[string]interface{}{ "namespace": monitoringNamespace, }, - "streams": map[string]interface{}{ - idKey: fmt.Sprintf("%s-", monitoringMetricsUnitID) + 
"endpoint", - "data_stream": map[string]interface{}{ - "type": "metrics", - "dataset": "elastic_agent.endpoint", - "namespace": monitoringNamespace, - }, - "metricsets": []interface{}{"process"}, - "period": metricsCollectionIntervalString, - "index": fmt.Sprintf("metrics-elastic_agent.endpoint-%s", monitoringNamespace), - "process.pid": comp, - "processors": []interface{}{ - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "data_stream", - "fields": map[string]interface{}{ - "type": "metrics", - "dataset": "elastic_agent.endpoint", - "namespace": monitoringNamespace, + "streams": []interface{}{ + map[string]interface{}{ + idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), + "data_stream": map[string]interface{}{ + "type": "metrics", + "dataset": "elastic_agent.endpoint_security", + "namespace": monitoringNamespace, + }, + "metricsets": []interface{}{"process"}, + "period": metricsCollectionIntervalString, + "index": fmt.Sprintf("metrics-elastic_agent.endpoint_security-%s", monitoringNamespace), + "process.pid": comp, + "process.cgroups.enabled": false, + "processors": []interface{}{ + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "data_stream", + "fields": map[string]interface{}{ + "type": "metrics", + "dataset": "elastic_agent.endpoint_security", + "namespace": monitoringNamespace, + }, }, }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "event", - "fields": map[string]interface{}{ - "dataset": "elastic_agent.endpoint", + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "event", + "fields": map[string]interface{}{ + "dataset": "elastic_agent.endpoint_security", + }, }, }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "elastic_agent", - "fields": map[string]interface{}{ - "id": b.agentInfo.AgentID(), - "version": b.agentInfo.Version(), - "snapshot": b.agentInfo.Snapshot(), - "process": 
"endpoint", + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "elastic_agent", + "fields": map[string]interface{}{ + "id": b.agentInfo.AgentID(), + "version": b.agentInfo.Version(), + "snapshot": b.agentInfo.Snapshot(), + "process": "endpoint_security", + }, }, }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "agent", - "fields": map[string]interface{}{ - "id": b.agentInfo.AgentID(), + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "agent", + "fields": map[string]interface{}{ + "id": b.agentInfo.AgentID(), + }, }, }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "component", - "fields": map[string]interface{}{ - "binary": "endpoint", + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "component", + "fields": map[string]interface{}{ + "binary": "endpoint_security", + "id": id, + }, }, }, }, diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index db0730b5e7e..4772cb608af 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -22,6 +22,82 @@ import ( "github.com/elastic/elastic-agent/pkg/component" ) +func TestMonitoringWithEndpoint(t *testing.T) { + + agentInfo, err := info.NewAgentInfo(context.Background(), false) + require.NoError(t, err, "Error creating agent info") + + testMon := BeatsMonitor{ + enabled: true, + config: &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + + Enabled: true, + }, + }, + }, + agentInfo: agentInfo, + } + + policy := map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + }, + }, + "outputs": map[string]any{ + "default": 
map[string]any{}, + }, + } + + // manually declaring all the MonitoringConfig() args since there's a lot of them, and this makes + // the test a little more self-describing + + var compList []component.Component + + compIdToBinary := map[string]string{ + "endpoint-default": "endpoint-security", + "filebeat-default": "filebeat", + } + existingPidStateMap := map[string]uint64{ + "endpoint-default": 1234, + } + + outCfg, err := testMon.MonitoringConfig(policy, compList, compIdToBinary, existingPidStateMap) + require.NoError(t, err) + + inputCfg := outCfg["inputs"].([]interface{}) + + foundConfig := false + + for _, cfg := range inputCfg { + unwrappedCfg := cfg.(map[string]interface{}) + if idName, ok := unwrappedCfg["id"]; ok && idName == "metrics-monitoring-endpoint_security" { + foundConfig = true + for compName, compCfg := range unwrappedCfg { + if compName == "streams" { + streamCfgUnwrapped := compCfg.([]interface{}) + for _, streamCfg := range streamCfgUnwrapped { + streamValues := streamCfg.(map[string]interface{}) + require.Equal(t, []interface{}{"process"}, streamValues["metricsets"]) + require.Equal(t, "metrics-elastic_agent.endpoint_security-default", streamValues["index"]) + require.Equal(t, uint64(1234), streamValues["process.pid"]) + } + } + + } + } + } + + require.True(t, foundConfig) +} + func TestMonitoringConfigMetricsInterval(t *testing.T) { agentInfo, err := info.NewAgentInfo(context.Background(), false) diff --git a/magefile.go b/magefile.go index f86e99ac945..975b4344d24 100644 --- a/magefile.go +++ b/magefile.go @@ -196,7 +196,7 @@ func (Dev) Package() { os.Setenv(devEnv, "true") if _, hasExternal := os.LookupEnv(externalArtifacts); !hasExternal { - devtools.ExternalBuild = true + devtools.ExternalBuild = "true" } devtools.DevBuild = true @@ -813,7 +813,7 @@ func (Cloud) Image() { devtools.SelectedPackageTypes = []devtools.PackageType{devtools.Docker} if _, hasExternal := os.LookupEnv(externalArtifacts); !hasExternal { - devtools.ExternalBuild = 
true + devtools.ExternalBuild = "true" } Package() @@ -1055,7 +1055,7 @@ func collectPackageDependencies(platforms []string, packageVersion string, requi os.Setenv(agentDropPath, dropPath) - if devtools.ExternalBuild == true { + if devtools.ExternalBuild == "true" { // Map of binaries to download to their project name in the unified-release manager. // The project names are used to generate the URLs when downloading binaries. For example: // @@ -1077,101 +1077,126 @@ func collectPackageDependencies(platforms []string, packageVersion string, requi "pf-elastic-symbolizer": "prodfiler", "pf-host-agent": "prodfiler", } + downloadFromExternalBuild(platforms, archivePath, packageVersion, externalBinaries) + } else if devtools.ExternalBuild == "agentbeat" { + // package agentbeat, then also pull external builds for non-agentbeat binaries + externalBinaries := map[string]string{ + "cloudbeat": "cloudbeat", // only supporting linux/amd64 or linux/arm64 + "cloud-defend": "cloud-defend", + "apm-server": "apm-server", // not supported on darwin/aarch64 + "endpoint-security": "endpoint-dev", + "fleet-server": "fleet-server", + "pf-elastic-collector": "prodfiler", + "pf-elastic-symbolizer": "prodfiler", + "pf-host-agent": "prodfiler", + } + downloadFromExternalBuild(platforms, archivePath, packageVersion, externalBinaries) + packageAgentBeat(packageVersion, archivePath, requiredPackages) - // Only log fatal logs for logs produced using logrus. This is the global logger - // used by github.com/elastic/e2e-testing/pkg/downloads which can only be configured globally like this or via - // environment variables. - // - // Using FatalLevel avoids filling the build log with scary looking errors when we attempt to - // download artifacts on unsupported platforms and choose to ignore the errors. - // - // Change this to InfoLevel to see exactly what the downloader is doing. 
- logrus.SetLevel(logrus.FatalLevel) - - errGroup, ctx := errgroup.WithContext(context.Background()) - completedDownloads := &atomic.Int32{} - for binary, project := range externalBinaries { - for _, platform := range platforms { - reqPackage := platformPackages[platform] - targetPath := filepath.Join(archivePath, reqPackage) - os.MkdirAll(targetPath, 0755) - newVersion, packageName := getPackageName(binary, packageVersion, reqPackage) - errGroup.Go(downloadBinary(ctx, project, packageName, binary, platform, newVersion, targetPath, completedDownloads)) - } + } else { + packageAgentBeat(packageVersion, archivePath, requiredPackages) + } + } else { + archivePath = movePackagesToArchive(dropPath, requiredPackages) + } + return archivePath, dropPath +} + +// downloadFromExternalBuild downloads the component binaries and places them in the drop path for the rest of the build +func downloadFromExternalBuild(platforms []string, archivePath string, packageVersion string, externalBinaries map[string]string) { + + // Only log fatal logs for logs produced using logrus. This is the global logger + // used by github.com/elastic/e2e-testing/pkg/downloads which can only be configured globally like this or via + // environment variables. + // + // Using FatalLevel avoids filling the build log with scary looking errors when we attempt to + // download artifacts on unsupported platforms and choose to ignore the errors. + // + // Change this to InfoLevel to see exactly what the downloader is doing. 
+ logrus.SetLevel(logrus.FatalLevel) + + errGroup, ctx := errgroup.WithContext(context.Background()) + completedDownloads := &atomic.Int32{} + for binary, project := range externalBinaries { + for _, platform := range platforms { + reqPackage := platformPackages[platform] + targetPath := filepath.Join(archivePath, reqPackage) + os.MkdirAll(targetPath, 0755) + newVersion, packageName := getPackageName(binary, packageVersion, reqPackage) + errGroup.Go(downloadBinary(ctx, project, packageName, binary, platform, newVersion, targetPath, completedDownloads)) + } + } + + err := errGroup.Wait() + if err != nil { + panic(err) + } + if completedDownloads.Load() == 0 { + panic(fmt.Sprintf("No packages were successfully downloaded. You may be building against an invalid or unreleased version. version=%s. If this is an unreleased version, try SNAPSHOT=true or EXTERNAL=false", packageVersion)) + } +} + +// packageAgentBeat packages the beat from the local code in the beats path +func packageAgentBeat(packageVersion string, archivePath string, requiredPackages []string) { + packedBeats := []string{"agentbeat"} + // build from local repo, will assume beats repo is located on the same root level + for _, b := range packedBeats { + pwd, err := filepath.Abs(filepath.Join("../beats/x-pack", b)) + if err != nil { + panic(err) + } + + packagesCopied := 0 + + if !requiredPackagesPresent(pwd, b, packageVersion, requiredPackages) { + fmt.Printf("--- Package %s\n", pwd) + cmd := exec.Command("mage", "package") + cmd.Dir = pwd + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = append(os.Environ(), fmt.Sprintf("PWD=%s", pwd), "AGENT_PACKAGING=on") + if envVar := selectedPackageTypes(); envVar != "" { + cmd.Env = append(cmd.Env, envVar) } - err = errGroup.Wait() - if err != nil { + if err := cmd.Run(); err != nil { panic(err) } - if completedDownloads.Load() == 0 { - panic(fmt.Sprintf("No packages were successfully downloaded. 
You may be building against an invalid or unreleased version. version=%s. If this is an unreleased version, try SNAPSHOT=true or EXTERNAL=false", packageVersion)) - } - } else { - packedBeats := []string{"agentbeat"} - // build from local repo, will assume beats repo is located on the same root level - for _, b := range packedBeats { - pwd, err := filepath.Abs(filepath.Join("../beats/x-pack", b)) - if err != nil { - panic(err) - } + } - packagesCopied := 0 - - if !requiredPackagesPresent(pwd, b, packageVersion, requiredPackages) { - fmt.Printf("--- Package %s\n", pwd) - cmd := exec.Command("mage", "package") - cmd.Dir = pwd - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Env = append(os.Environ(), fmt.Sprintf("PWD=%s", pwd), "AGENT_PACKAGING=on") - if envVar := selectedPackageTypes(); envVar != "" { - cmd.Env = append(cmd.Env, envVar) - } + // copy to new drop + sourcePath := filepath.Join(pwd, "build", "distributions") + for _, rp := range requiredPackages { + files, err := filepath.Glob(filepath.Join(sourcePath, "*"+rp+"*")) + if err != nil { + panic(err) + } - if err := cmd.Run(); err != nil { - panic(err) - } + targetPath := filepath.Join(archivePath, rp) + os.MkdirAll(targetPath, 0755) + for _, f := range files { + // safety check; if the user has an older version of the beats repo, + // for example right after a release where you've `git pulled` from on repo and not the other, + // they might end up with a mishmash of packages from different versions. + // check to see if we have mismatched versions. + if !strings.Contains(f, packageVersion) { + // if this panic hits weird edge cases where we don't want actual failures, revert to a printf statement. 
+ panic(fmt.Sprintf("the file %s doesn't match agent version %s, beats repo might be out of date", f, packageVersion)) } - // copy to new drop - sourcePath := filepath.Join(pwd, "build", "distributions") - for _, rp := range requiredPackages { - files, err := filepath.Glob(filepath.Join(sourcePath, "*"+rp+"*")) - if err != nil { - panic(err) - } - - targetPath := filepath.Join(archivePath, rp) - os.MkdirAll(targetPath, 0755) - for _, f := range files { - // safety check; if the user has an older version of the beats repo, - // for example right after a release where you've `git pulled` from on repo and not the other, - // they might end up with a mishmash of packages from different versions. - // check to see if we have mismatched versions. - if !strings.Contains(f, packageVersion) { - // if this panic hits weird edge cases where we don't want actual failures, revert to a printf statement. - panic(fmt.Sprintf("the file %s doesn't match agent version %s, beats repo might be out of date", f, packageVersion)) - } - - targetFile := filepath.Join(targetPath, filepath.Base(f)) - packagesCopied += 1 - if err := sh.Copy(targetFile, f); err != nil { - panic(err) - } - } - } - // a very basic footcannon protector; if packages are missing and we need to rebuild them, check to see if those files were copied - // if we needed to repackage beats but still somehow copied nothing, could indicate an issue. Usually due to beats and agent being at different versions. - if packagesCopied == 0 { - fmt.Println(">>> WARNING: no packages were copied, but we repackaged beats anyway. 
Check binary to see if intended beats are there.") + targetFile := filepath.Join(targetPath, filepath.Base(f)) + packagesCopied += 1 + if err := sh.Copy(targetFile, f); err != nil { + panic(err) } } } - } else { - archivePath = movePackagesToArchive(dropPath, requiredPackages) + // a very basic footcannon protector; if packages are missing and we need to rebuild them, check to see if those files were copied + // if we needed to repackage beats but still somehow copied nothing, could indicate an issue. Usually due to beats and agent being at different versions. + if packagesCopied == 0 { + fmt.Println(">>> WARNING: no packages were copied, but we repackaged beats anyway. Check binary to see if intended beats are there.") + } } - return archivePath, dropPath } // flattenDependencies will extract all the required packages collected in archivePath and dropPath in flatPath and diff --git a/testing/integration/endpoint_security_test.go b/testing/integration/endpoint_security_test.go index 15342b3e20f..cd416b560e5 100644 --- a/testing/integration/endpoint_security_test.go +++ b/testing/integration/endpoint_security_test.go @@ -8,10 +8,8 @@ package integration import ( "archive/zip" - "bytes" "context" - _ "embed" - "encoding/json" + "fmt" "io/fs" "os" @@ -20,7 +18,6 @@ import ( "slices" "strings" "testing" - "text/template" "time" "github.com/google/uuid" @@ -39,21 +36,9 @@ import ( ) const ( - // TODO: Setup a GitHub Action to update this for each release of https://github.com/elastic/endpoint-package - endpointPackageVersion = "8.11.0" endpointHealthPollingTimeout = 2 * time.Minute ) -//go:embed endpoint_security_package.json.tmpl -var endpointPackagePolicyTemplate string - -type endpointPackageTemplateVars struct { - ID string - Name string - PolicyID string - Version string -} - var protectionTests = []struct { name string protected bool @@ -451,52 +436,6 @@ type agentPolicyUpdateRequest struct { IsProtected bool `json:"is_protected"` } -// Installs the Elastic Defend 
package to cause the agent to install the endpoint-security service. -func installElasticDefendPackage(t *testing.T, info *define.Info, policyID string) (r kibana.PackagePolicyResponse, err error) { - t.Helper() - - t.Log("Templating endpoint package policy request") - tmpl, err := template.New("pkgpolicy").Parse(endpointPackagePolicyTemplate) - if err != nil { - return r, fmt.Errorf("error creating new template: %w", err) - } - - packagePolicyID := uuid.New().String() - var pkgPolicyBuf bytes.Buffer - - // Need unique name for Endpoint integration otherwise on multiple runs on the same instance you get - // http error response with code 409: {StatusCode:409 Error:Conflict Message:An integration policy with the name Defend-cbomziz4uvn5fov9t1gsrcvdwn2p1s7tefnvgsye already exists. Please rename it or choose a different name.} - err = tmpl.Execute(&pkgPolicyBuf, endpointPackageTemplateVars{ - ID: packagePolicyID, - Name: "Defend-" + packagePolicyID, - PolicyID: policyID, - Version: endpointPackageVersion, - }) - if err != nil { - return r, fmt.Errorf("error executing template: %w", err) - } - - // Make sure the templated value is actually valid JSON before making the API request. - // Using json.Unmarshal will give us the actual syntax error, calling json.Valid() would not. 
- packagePolicyReq := kibana.PackagePolicyRequest{} - err = json.Unmarshal(pkgPolicyBuf.Bytes(), &packagePolicyReq) - if err != nil { - return r, fmt.Errorf("templated package policy is not valid JSON: %s, %w", pkgPolicyBuf.String(), err) - } - - t.Log("POST /api/fleet/package_policies") - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - defer cancel() - - pkgResp, err := info.KibanaClient.InstallFleetPackage(ctx, packagePolicyReq) - if err != nil { - t.Logf("Error installing fleet package: %v", err) - return r, fmt.Errorf("error installing fleet package: %w", err) - } - t.Logf("Endpoint package Policy Response:\n%+v", pkgResp) - return pkgResp, err -} - // Tests that install of Elastic Defend fails if Agent is installed in a base // path other than default func TestEndpointSecurityNonDefaultBasePath(t *testing.T) { @@ -813,55 +752,6 @@ func checkDiagnosticsForEndpointFiles(t *testing.T, diagsPath string, endpointCo } } -func agentAndEndpointAreHealthy(t *testing.T, ctx context.Context, agentClient client.Client) bool { - t.Helper() - - state, err := agentClient.State(ctx) - if err != nil { - t.Logf("Error getting agent state: %s", err) - return false - } - - if state.State != client.Healthy { - t.Logf("local Agent is not Healthy: current state: %+v", state) - return false - } - - foundEndpointInputUnit := false - foundEndpointOutputUnit := false - for _, comp := range state.Components { - isEndpointComponent := strings.Contains(comp.Name, "endpoint") - if comp.State != client.Healthy { - t.Logf("endpoint component is not Healthy: current state: %+v", comp) - return false - } - - for _, unit := range comp.Units { - if isEndpointComponent { - if unit.UnitType == client.UnitTypeInput { - foundEndpointInputUnit = true - } - if unit.UnitType == client.UnitTypeOutput { - foundEndpointOutputUnit = true - } - } - - if unit.State != client.Healthy { - t.Logf("unit %q is not Healthy\n%+v", unit.UnitID, unit) - return false - } - } - } - - // Ensure 
both the endpoint input and output units were found and healthy. - if !foundEndpointInputUnit || !foundEndpointOutputUnit { - t.Logf("State did not contain endpoint units (input: %v/output: %v) state: %+v. ", foundEndpointInputUnit, foundEndpointOutputUnit, state) - return false - } - - return true -} - func agentIsHealthyNoEndpoint(t *testing.T, ctx context.Context, agentClient client.Client) bool { t.Helper() diff --git a/testing/integration/helpers.go b/testing/integration/helpers.go new file mode 100644 index 00000000000..e6b25ec6fe8 --- /dev/null +++ b/testing/integration/helpers.go @@ -0,0 +1,126 @@ +package integration + +import ( + "bytes" + "context" + _ "embed" + "encoding/json" + "fmt" + "strings" + "testing" + "text/template" + "time" + + "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent/pkg/control/v2/client" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/google/uuid" +) + +//go:embed endpoint_security_package.json.tmpl +var endpointPackagePolicyTemplate string + +type endpointPackageTemplateVars struct { + ID string + Name string + PolicyID string + Version string +} + +// TODO: Setup a GitHub Action to update this for each release of https://github.com/elastic/endpoint-package +const endpointPackageVersion = "8.11.0" + +func agentAndEndpointAreHealthy(t *testing.T, ctx context.Context, agentClient client.Client) bool { + t.Helper() + + state, err := agentClient.State(ctx) + if err != nil { + t.Logf("Error getting agent state: %s", err) + return false + } + + if state.State != client.Healthy { + t.Logf("local Agent is not Healthy: current state: %+v", state) + return false + } + + foundEndpointInputUnit := false + foundEndpointOutputUnit := false + for _, comp := range state.Components { + isEndpointComponent := strings.Contains(comp.Name, "endpoint") + if comp.State != client.Healthy { + t.Logf("endpoint component is not Healthy: current state: %+v", comp) + return false + } + + for _, unit := 
range comp.Units { + if isEndpointComponent { + if unit.UnitType == client.UnitTypeInput { + foundEndpointInputUnit = true + } + if unit.UnitType == client.UnitTypeOutput { + foundEndpointOutputUnit = true + } + } + + if unit.State != client.Healthy { + t.Logf("unit %q is not Healthy\n%+v", unit.UnitID, unit) + return false + } + } + } + + // Ensure both the endpoint input and output units were found and healthy. + if !foundEndpointInputUnit || !foundEndpointOutputUnit { + t.Logf("State did not contain endpoint units (input: %v/output: %v) state: %+v. ", foundEndpointInputUnit, foundEndpointOutputUnit, state) + return false + } + + return true +} + +// Installs the Elastic Defend package to cause the agent to install the endpoint-security service. +func installElasticDefendPackage(t *testing.T, info *define.Info, policyID string) (r kibana.PackagePolicyResponse, err error) { + t.Helper() + + t.Log("Templating endpoint package policy request") + tmpl, err := template.New("pkgpolicy").Parse(endpointPackagePolicyTemplate) + if err != nil { + return r, fmt.Errorf("error creating new template: %w", err) + } + + packagePolicyID := uuid.New().String() + var pkgPolicyBuf bytes.Buffer + + // Need unique name for Endpoint integration otherwise on multiple runs on the same instance you get + // http error response with code 409: {StatusCode:409 Error:Conflict Message:An integration policy with the name Defend-cbomziz4uvn5fov9t1gsrcvdwn2p1s7tefnvgsye already exists. Please rename it or choose a different name.} + err = tmpl.Execute(&pkgPolicyBuf, endpointPackageTemplateVars{ + ID: packagePolicyID, + Name: "Defend-" + packagePolicyID, + PolicyID: policyID, + Version: endpointPackageVersion, + }) + if err != nil { + return r, fmt.Errorf("error executing template: %w", err) + } + + // Make sure the templated value is actually valid JSON before making the API request. + // Using json.Unmarshal will give us the actual syntax error, calling json.Valid() would not. 
+ packagePolicyReq := kibana.PackagePolicyRequest{} + err = json.Unmarshal(pkgPolicyBuf.Bytes(), &packagePolicyReq) + if err != nil { + return r, fmt.Errorf("templated package policy is not valid JSON: %s, %w", pkgPolicyBuf.String(), err) + } + + t.Log("POST /api/fleet/package_policies") + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + pkgResp, err := info.KibanaClient.InstallFleetPackage(ctx, packagePolicyReq) + if err != nil { + t.Logf("Error installing fleet package: %v", err) + return r, fmt.Errorf("error installing fleet package: %w", err) + } + t.Logf("Endpoint package Policy Response:\n%+v", pkgResp) + return pkgResp, err +} diff --git a/testing/integration/monitoring_endpoint_test.go b/testing/integration/monitoring_endpoint_test.go new file mode 100644 index 00000000000..d5748c60269 --- /dev/null +++ b/testing/integration/monitoring_endpoint_test.go @@ -0,0 +1,150 @@ +//go:build integration + +package integration + +import ( + "context" + "testing" + "time" + + "github.com/elastic/elastic-agent-libs/kibana" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools" + "github.com/elastic/elastic-agent/pkg/testing/tools/estools" + "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type EndpointMetricsMonRunner struct { + suite.Suite + info *define.Info + fixture *atesting.Fixture +} + +func TestEndpointAgentServiceMonitoring(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: "fleet", + Stack: &define.Stack{}, + Local: false, // requires Agent installation + Sudo: true, // requires Agent installation + OS: []define.OS{ + {Type: define.Linux}, + }, + }) + + // Get path to agent executable. 
+ fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err, "could not create agent fixture") + + runner := &EndpointMetricsMonRunner{ + info: info, + fixture: fixture, + } + + suite.Run(t, runner) +} + +func (runner *EndpointMetricsMonRunner) SetupSuite() { + deadline := time.Now().Add(10 * time.Minute) + ctx, cancel := testcontext.WithDeadline(runner.T(), context.Background(), deadline) + defer cancel() + + runner.T().Log("Enrolling the agent in Fleet") + policyUUID := uuid.New().String() + + createPolicyReq := kibana.AgentPolicy{ + Name: "test-policy-" + policyUUID, + Namespace: "default", + Description: "Test policy " + policyUUID, + MonitoringEnabled: []kibana.MonitoringEnabledOption{ + kibana.MonitoringEnabledLogs, + kibana.MonitoringEnabledMetrics, + }, + } + + installOpts := atesting.InstallOpts{ + NonInteractive: true, + Force: true, + Privileged: true, + } + + policy, err := tools.InstallAgentWithPolicy(ctx, runner.T(), + installOpts, runner.fixture, runner.info.KibanaClient, createPolicyReq) + require.NoError(runner.T(), err, "failed to install agent with policy") + + runner.T().Log("Installing Elastic Defend") + pkgPolicyResp, err := installElasticDefendPackage(runner.T(), runner.info, policy.ID) + require.NoErrorf(runner.T(), err, "Policy Response was: %v", pkgPolicyResp) + + runner.T().Log("Polling for endpoint-security to become Healthy") + ctx, cancel = context.WithTimeout(ctx, time.Minute*3) + defer cancel() + + agentClient := runner.fixture.Client() + err = agentClient.Connect(ctx) + require.NoError(runner.T(), err, "could not connect to local agent") + + require.Eventually(runner.T(), + func() bool { return agentAndEndpointAreHealthy(runner.T(), ctx, agentClient) }, + time.Minute*3, + time.Second, + "Endpoint component or units are not healthy.", + ) +} + +func (runner *EndpointMetricsMonRunner) TestEndpointMetrics() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15) + defer cancel() 
+ + agentStatus, err := runner.fixture.ExecStatus(ctx) + require.NoError(runner.T(), err) + + endpointID := "endpoint-default" + require.Eventually(runner.T(), func() bool { + + query := genESQueryByBinary(agentStatus.Info.ID, endpointID) + res, err := estools.PerformQueryForRawQuery(ctx, query, "metrics-elastic_agent*", runner.info.ESClient) + require.NoError(runner.T(), err) + runner.T().Logf("Fetched metrics for %s, got %d hits", endpointID, res.Hits.Total.Value) + return res.Hits.Total.Value >= 1 + }, time.Minute*10, time.Second*10, "could not fetch component metricsets for endpoint with ID %s and agent ID %s", endpointID, agentStatus.Info.ID) +} + +// TODO: move to helpers.go +func genESQueryByBinary(agentID string, componentID string) map[string]interface{} { + // see https://github.com/elastic/kibana/blob/main/x-pack/plugins/fleet/server/services/agents/agent_metrics.ts + queryRaw := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + // { + // "match": map[string]interface{}{ + // "agent.id": agentID, + // }, + // }, + { + "match": map[string]interface{}{ + "component.id": componentID, + }, + }, + // make sure we fetch documents that have the metric field used by fleet monitoring + // { + // "exists": map[string]interface{}{ + // "field": "system.process.cpu.total.value", + // }, + // }, + // { + // "exists": map[string]interface{}{ + // "field": "system.process.memory.size", + // }, + // }, + }, + }, + }, + } + + return queryRaw +} From 30028fffc37d1d9485e4d1aac5770c14636c5e8b Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 21 May 2024 11:39:50 -0700 Subject: [PATCH 04/26] cleanup --- .../coordinator/coordinator_state.go | 3 -- internal/pkg/agent/install/uninstall.go | 1 - pkg/component/component_test.go | 1 - pkg/component/runtime/state.go | 2 -- .../integration/monitoring_endpoint_test.go | 28 +++++++++---------- 5 files changed, 14 insertions(+), 21 deletions(-) 
diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index b5bfc98b1a5..4fbb26e2914 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -139,8 +139,6 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) pidRequiresUpdate := false for i, other := range c.state.Components { if other.Component.ID == state.Component.ID { - //will this work? - c.logger.Infof("got updated component state with pid %d", state.State.CheckinPid) if other.State.CheckinPid != state.State.CheckinPid { pidRequiresUpdate = true } @@ -171,7 +169,6 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) c.stateNeedsRefresh = true if pidRequiresUpdate { - c.logger.Infof("got pid update, refreshing config") c.servicePidUpdate <- struct{}{} } } diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index f6cdb445a15..10ab519e4a4 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -310,7 +310,6 @@ func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Confi if err != nil { return nil, aerrors.New("failed to create a map from config", err) } - // TODO: how should this handle endpoint? 
allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil, map[string]uint64{}) if err != nil { return nil, fmt.Errorf("failed to render components: %w", err) diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index e38d4d3e8ef..9b4d3943c0e 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -3551,7 +3551,6 @@ func TestToComponents(t *testing.T) { runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), scenario.Platform, SkipBinaryCheck()) require.NoError(t, err) - // TODO: figure out what to do with endpoint result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel, scenario.headers, map[string]uint64{}) if scenario.Err != "" { assert.Equal(t, scenario.Err, err.Error()) diff --git a/pkg/component/runtime/state.go b/pkg/component/runtime/state.go index eb0197bcc8b..3604317fc65 100644 --- a/pkg/component/runtime/state.go +++ b/pkg/component/runtime/state.go @@ -13,7 +13,6 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/pkg/component" ) @@ -274,7 +273,6 @@ func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool { changed := false if s.CheckinPid != checkin.Pid { - logp.L().Infof("got updated pid %d for component %s", checkin.Pid, s.Component.String()) changed = true s.CheckinPid = checkin.Pid } diff --git a/testing/integration/monitoring_endpoint_test.go b/testing/integration/monitoring_endpoint_test.go index d5748c60269..f098f02c73e 100644 --- a/testing/integration/monitoring_endpoint_test.go +++ b/testing/integration/monitoring_endpoint_test.go @@ -120,27 +120,27 @@ func genESQueryByBinary(agentID string, componentID string) map[string]interface "query": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ - // { - // "match": map[string]interface{}{ - // 
"agent.id": agentID, - // }, - // }, { "match": map[string]interface{}{ - "component.id": componentID, + "agent.id": agentID, }, }, - // make sure we fetch documents that have the metric field used by fleet monitoring - // { - // "exists": map[string]interface{}{ - // "field": "system.process.cpu.total.value", - // }, - // }, + // will not work due to https://github.com/elastic/integrations/pull/9928 // { - // "exists": map[string]interface{}{ - // "field": "system.process.memory.size", + // "match": map[string]interface{}{ + // "component.id": componentID, // }, // }, + { + "exists": map[string]interface{}{ + "field": "system.process.cpu.total.value", + }, + }, + { + "exists": map[string]interface{}{ + "field": "system.process.memory.size", + }, + }, }, }, }, From 13599385eb53c6bc32e758894e38a0bc9a00cde5 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 21 May 2024 12:37:10 -0700 Subject: [PATCH 05/26] clean up --- .../pkg/agent/application/coordinator/coordinator.go | 7 ++++++- testing/integration/helpers.go | 9 ++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 96b25682c9d..d907b01c272 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1037,7 +1037,12 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case <-c.servicePidUpdate: c.logger.Infof("got pid service update, refreshing config") - c.refreshComponentModel(ctx) + err := c.refreshComponentModel(ctx) + if err != nil { + err = fmt.Errorf("error refreshing component model for PID update: %w", err) + c.setConfigManagerError(err) + c.logger.Errorf("%s", err) + } case componentState := <-c.managerChans.runtimeManagerUpdate: // New component change reported by the runtime manager via diff --git a/testing/integration/helpers.go b/testing/integration/helpers.go 
index e6b25ec6fe8..db924be4845 100644 --- a/testing/integration/helpers.go +++ b/testing/integration/helpers.go @@ -1,3 +1,9 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + package integration import ( @@ -11,10 +17,11 @@ import ( "text/template" "time" + "github.com/google/uuid" + "github.com/elastic/elastic-agent-libs/kibana" "github.com/elastic/elastic-agent/pkg/control/v2/client" "github.com/elastic/elastic-agent/pkg/testing/define" - "github.com/google/uuid" ) //go:embed endpoint_security_package.json.tmpl From 354b7d09781d241a0e3eca33ba30d87a8cd28fc6 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 21 May 2024 12:48:49 -0700 Subject: [PATCH 06/26] add changelog --- .../1716320508-add-endpoint-monitoring.yaml | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 changelog/fragments/1716320508-add-endpoint-monitoring.yaml diff --git a/changelog/fragments/1716320508-add-endpoint-monitoring.yaml b/changelog/fragments/1716320508-add-endpoint-monitoring.yaml new file mode 100644 index 00000000000..fd052449064 --- /dev/null +++ b/changelog/fragments/1716320508-add-endpoint-monitoring.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. 
+# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add monitoring of endpoint usage metrics + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/4789 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/4083 \ No newline at end of file From 5f5a7a39b485e4b41ef03abed5135b503a4f6011 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Wed, 22 May 2024 10:06:57 -0700 Subject: [PATCH 07/26] format --- testing/integration/monitoring_endpoint_test.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/testing/integration/monitoring_endpoint_test.go b/testing/integration/monitoring_endpoint_test.go index f098f02c73e..1f108251039 100644 --- a/testing/integration/monitoring_endpoint_test.go +++ b/testing/integration/monitoring_endpoint_test.go @@ -1,3 +1,7 @@ +// Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + //go:build integration package integration @@ -7,15 +11,16 @@ import ( "testing" "time" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/elastic/elastic-agent-libs/kibana" atesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/testing/define" "github.com/elastic/elastic-agent/pkg/testing/tools" "github.com/elastic/elastic-agent/pkg/testing/tools/estools" "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext" - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" ) type EndpointMetricsMonRunner struct { From 704d9ea50ebbb6035d82115802e5fb30b1f33363 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Wed, 22 May 2024 14:04:26 -0700 Subject: [PATCH 08/26] fix component lookup --- internal/pkg/agent/application/coordinator/coordinator.go | 5 ++--- .../agent/application/coordinator/coordinator_unit_test.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index d907b01c272..e0b134deb49 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1283,9 +1283,8 @@ func (c *Coordinator) generateComponentModel() (err error) { configInjector = c.monitorMgr.MonitoringConfig } - existingState := c.State() - var existingCompState = make(map[string]uint64, len(existingState.Components)) - for _, comp := range existingState.Components { + var existingCompState = make(map[string]uint64, len(c.state.Components)) + for _, comp := range c.state.Components { existingCompState[comp.Component.ID] = 
comp.State.CheckinPid } diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 2831e3e6a4e..c9117b8c0b9 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -389,7 +389,6 @@ agent.download.sourceURI: cfgChange := &configChange{cfg: cfg} configChan <- cfgChange coord.runLoopIteration(ctx) - assert.True(t, cfgChange.failed, "Policy with invalid field should have reported failed config change") require.ErrorContainsf(t, cfgChange.err, @@ -420,6 +419,7 @@ agent.download.sourceURI: // (This check is based on a previous bug in which a vars update could // discard active policy errors.) varsChan <- emptyVars(t) + t.Logf("after emptyVars statement") coord.runLoopIteration(ctx) assert.Error(t, coord.configErr, "Vars update shouldn't affect configErr") From 2e08d79ad9f62bb02d856be4ec60e9716d5394f4 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Wed, 22 May 2024 15:14:45 -0700 Subject: [PATCH 09/26] fix state tests --- internal/pkg/agent/application/coordinator/diagnostics_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 478a5811586..9c68a20cc1a 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -488,6 +488,7 @@ log_level: "warning" components: - id: "comp-1" state: + checkinpid: 0 state: 3 message: "degraded message" features_idx: 0 @@ -570,6 +571,7 @@ log_level: "warning" components: - id: "comp-1" state: + checkinpid: 0 state: 3 message: "degraded message" features_idx: 0 From f806c9332c36ec14af7b952048ce680e5630cc8d Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Thu, 23 May 2024 09:36:25 -0700 Subject: [PATCH 10/26] 
complete integration tests --- testing/integration/monitoring_endpoint_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/testing/integration/monitoring_endpoint_test.go b/testing/integration/monitoring_endpoint_test.go index 1f108251039..3ed16f43ab3 100644 --- a/testing/integration/monitoring_endpoint_test.go +++ b/testing/integration/monitoring_endpoint_test.go @@ -130,12 +130,11 @@ func genESQueryByBinary(agentID string, componentID string) map[string]interface "agent.id": agentID, }, }, - // will not work due to https://github.com/elastic/integrations/pull/9928 - // { - // "match": map[string]interface{}{ - // "component.id": componentID, - // }, - // }, + { + "match": map[string]interface{}{ + "component.id": componentID, + }, + }, { "exists": map[string]interface{}{ "field": "system.process.cpu.total.value", From 3451d8874a37af9f1ea029ae31f0e9acb424d09e Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Mon, 3 Jun 2024 08:17:43 -0700 Subject: [PATCH 11/26] basic cleanup --- .../application/coordinator/coordinator.go | 7 +- .../coordinator/coordinator_state.go | 2 +- testing/integration/endpoint_test_tools.go | 133 ++++++++++++++++++ .../integration/monitoring_endpoint_test.go | 2 +- 4 files changed, 138 insertions(+), 6 deletions(-) create mode 100644 testing/integration/endpoint_test_tools.go diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index e0b134deb49..8f10778f8ff 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -285,7 +285,7 @@ type Coordinator struct { // Should only be interacted with via CoordinatorActive() or runLoopIteration() heartbeatChan chan struct{} - servicePidUpdate chan struct{} + compPidUpdate chan struct{} } // The channels Coordinator reads to receive updates from the various managers. 
@@ -380,7 +380,7 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. overrideStateChan: make(chan *coordinatorOverrideState), upgradeDetailsChan: make(chan *details.Details), heartbeatChan: make(chan struct{}), - servicePidUpdate: make(chan struct{}, 1), + compPidUpdate: make(chan struct{}, 1), } // Setup communication channels for any non-nil components. This pattern // lets us transparently accept nil managers / simulated events during @@ -1035,8 +1035,7 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case c.heartbeatChan <- struct{}{}: - case <-c.servicePidUpdate: - c.logger.Infof("got pid service update, refreshing config") + case <-c.compPidUpdate: err := c.refreshComponentModel(ctx) if err != nil { err = fmt.Errorf("error refreshing component model for PID update: %w", err) diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index 4fbb26e2914..9914c208abf 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -169,7 +169,7 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) c.stateNeedsRefresh = true if pidRequiresUpdate { - c.servicePidUpdate <- struct{}{} + c.compPidUpdate <- struct{}{} } } diff --git a/testing/integration/endpoint_test_tools.go b/testing/integration/endpoint_test_tools.go new file mode 100644 index 00000000000..db924be4845 --- /dev/null +++ b/testing/integration/endpoint_test_tools.go @@ -0,0 +1,133 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +//go:build integration + +package integration + +import ( + "bytes" + "context" + _ "embed" + "encoding/json" + "fmt" + "strings" + "testing" + "text/template" + "time" + + "github.com/google/uuid" + + "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent/pkg/control/v2/client" + "github.com/elastic/elastic-agent/pkg/testing/define" +) + +//go:embed endpoint_security_package.json.tmpl +var endpointPackagePolicyTemplate string + +type endpointPackageTemplateVars struct { + ID string + Name string + PolicyID string + Version string +} + +// TODO: Setup a GitHub Action to update this for each release of https://github.com/elastic/endpoint-package +const endpointPackageVersion = "8.11.0" + +func agentAndEndpointAreHealthy(t *testing.T, ctx context.Context, agentClient client.Client) bool { + t.Helper() + + state, err := agentClient.State(ctx) + if err != nil { + t.Logf("Error getting agent state: %s", err) + return false + } + + if state.State != client.Healthy { + t.Logf("local Agent is not Healthy: current state: %+v", state) + return false + } + + foundEndpointInputUnit := false + foundEndpointOutputUnit := false + for _, comp := range state.Components { + isEndpointComponent := strings.Contains(comp.Name, "endpoint") + if comp.State != client.Healthy { + t.Logf("endpoint component is not Healthy: current state: %+v", comp) + return false + } + + for _, unit := range comp.Units { + if isEndpointComponent { + if unit.UnitType == client.UnitTypeInput { + foundEndpointInputUnit = true + } + if unit.UnitType == client.UnitTypeOutput { + foundEndpointOutputUnit = true + } + } + + if unit.State != client.Healthy { + t.Logf("unit %q is not Healthy\n%+v", unit.UnitID, unit) + return false + } + } + } + + // Ensure both the endpoint input and output units were found and healthy. + if !foundEndpointInputUnit || !foundEndpointOutputUnit { + t.Logf("State did not contain endpoint units (input: %v/output: %v) state: %+v. 
", foundEndpointInputUnit, foundEndpointOutputUnit, state) + return false + } + + return true +} + +// Installs the Elastic Defend package to cause the agent to install the endpoint-security service. +func installElasticDefendPackage(t *testing.T, info *define.Info, policyID string) (r kibana.PackagePolicyResponse, err error) { + t.Helper() + + t.Log("Templating endpoint package policy request") + tmpl, err := template.New("pkgpolicy").Parse(endpointPackagePolicyTemplate) + if err != nil { + return r, fmt.Errorf("error creating new template: %w", err) + } + + packagePolicyID := uuid.New().String() + var pkgPolicyBuf bytes.Buffer + + // Need unique name for Endpoint integration otherwise on multiple runs on the same instance you get + // http error response with code 409: {StatusCode:409 Error:Conflict Message:An integration policy with the name Defend-cbomziz4uvn5fov9t1gsrcvdwn2p1s7tefnvgsye already exists. Please rename it or choose a different name.} + err = tmpl.Execute(&pkgPolicyBuf, endpointPackageTemplateVars{ + ID: packagePolicyID, + Name: "Defend-" + packagePolicyID, + PolicyID: policyID, + Version: endpointPackageVersion, + }) + if err != nil { + return r, fmt.Errorf("error executing template: %w", err) + } + + // Make sure the templated value is actually valid JSON before making the API request. + // Using json.Unmarshal will give us the actual syntax error, calling json.Valid() would not. 
+ packagePolicyReq := kibana.PackagePolicyRequest{} + err = json.Unmarshal(pkgPolicyBuf.Bytes(), &packagePolicyReq) + if err != nil { + return r, fmt.Errorf("templated package policy is not valid JSON: %s, %w", pkgPolicyBuf.String(), err) + } + + t.Log("POST /api/fleet/package_policies") + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + pkgResp, err := info.KibanaClient.InstallFleetPackage(ctx, packagePolicyReq) + if err != nil { + t.Logf("Error installing fleet package: %v", err) + return r, fmt.Errorf("error installing fleet package: %w", err) + } + t.Logf("Endpoint package Policy Response:\n%+v", pkgResp) + return pkgResp, err +} diff --git a/testing/integration/monitoring_endpoint_test.go b/testing/integration/monitoring_endpoint_test.go index 3ed16f43ab3..9b957060a03 100644 --- a/testing/integration/monitoring_endpoint_test.go +++ b/testing/integration/monitoring_endpoint_test.go @@ -31,7 +31,7 @@ type EndpointMetricsMonRunner struct { func TestEndpointAgentServiceMonitoring(t *testing.T) { info := define.Require(t, define.Requirements{ - Group: "fleet", + Group: Fleet, Stack: &define.Stack{}, Local: false, // requires Agent installation Sudo: true, // requires Agent installation From 824cc7b85e39d3633b18e7e8cbea7fefb58fba36 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Mon, 3 Jun 2024 14:48:06 -0700 Subject: [PATCH 12/26] update metrics setup --- .../application/coordinator/coordinator.go | 39 +++-- .../coordinator/coordinator_state.go | 3 +- .../application/monitoring/v1_monitor.go | 133 +++++++++--------- .../application/monitoring/v1_monitor_test.go | 14 +- testing/integration/helpers.go | 133 ------------------ .../integration/monitoring_endpoint_test.go | 21 +-- 6 files changed, 121 insertions(+), 222 deletions(-) delete mode 100644 testing/integration/helpers.go diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 
8f10778f8ff..a18008b9f96 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -10,6 +10,7 @@ import ( "fmt" "reflect" "strings" + "sync/atomic" "time" "github.com/hashicorp/go-multierror" @@ -285,7 +286,13 @@ type Coordinator struct { // Should only be interacted with via CoordinatorActive() or runLoopIteration() heartbeatChan chan struct{} - compPidUpdate chan struct{} + // if a component (mostly endpoint) has a new PID, we need to update + // the monitoring components so they have a PID to monitor + // however, if endpoint is in some kind of restart loop, + // we could DOS the config system. Instead, + // run a ticker that checks to see if we have a new PID. + componentPIDTicker *time.Ticker + componentPidRequiresUpdate *atomic.Bool } // The channels Coordinator reads to receive updates from the various managers. @@ -376,11 +383,12 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. // synchronization in the subscriber API, just set the input buffer to 0. stateBroadcaster: broadcaster.New(state, 64, 32), - logLevelCh: make(chan logp.Level), - overrideStateChan: make(chan *coordinatorOverrideState), - upgradeDetailsChan: make(chan *details.Details), - heartbeatChan: make(chan struct{}), - compPidUpdate: make(chan struct{}, 1), + logLevelCh: make(chan logp.Level), + overrideStateChan: make(chan *coordinatorOverrideState), + upgradeDetailsChan: make(chan *details.Details), + heartbeatChan: make(chan struct{}), + componentPIDTicker: time.NewTicker(time.Second * 30), + componentPidRequiresUpdate: &atomic.Bool{}, } // Setup communication channels for any non-nil components. 
This pattern // lets us transparently accept nil managers / simulated events during @@ -923,6 +931,8 @@ func (c *Coordinator) runner(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() + defer c.componentPIDTicker.Stop() + // We run nil checks before starting the various managers so that unit tests // only have to initialize / mock the specific components they're testing. // If a manager is nil, we prebuffer its return channel with nil also so @@ -1035,12 +1045,17 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case c.heartbeatChan <- struct{}{}: - case <-c.compPidUpdate: - err := c.refreshComponentModel(ctx) - if err != nil { - err = fmt.Errorf("error refreshing component model for PID update: %w", err) - c.setConfigManagerError(err) - c.logger.Errorf("%s", err) + case <-c.componentPIDTicker.C: + // if we hit the ticker and we've got a new PID, + // reload the component model + if c.componentPidRequiresUpdate.Load() { + c.componentPidRequiresUpdate.Store(false) + err := c.refreshComponentModel(ctx) + if err != nil { + err = fmt.Errorf("error refreshing component model for PID update: %w", err) + c.setConfigManagerError(err) + c.logger.Errorf("%s", err) + } } case componentState := <-c.managerChans.runtimeManagerUpdate: diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index 9914c208abf..aa822105a39 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -136,6 +136,7 @@ func (c *Coordinator) refreshState() { // Must be called on the main Coordinator goroutine. 
func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) { found := false + // check for any component updates to the PID, so we can update the component monitoring pidRequiresUpdate := false for i, other := range c.state.Components { if other.Component.ID == state.Component.ID { @@ -169,7 +170,7 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) c.stateNeedsRefresh = true if pidRequiresUpdate { - c.compPidUpdate <- struct{}{} + c.componentPidRequiresUpdate.Store(true) } } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 06fef7ef7a6..341be15c527 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -910,84 +910,89 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, } + for _, comp := range componentList { + logp.L().Infof("input component %s: %#v", comp.ID, comp.InputSpec.Spec.Service) + } + // add system/process metrics for services that can't be monitored via json/beats metrics - // If there's a checkin PID and it contains the "endpoint" string, assume we want to monitor it - for id, comp := range existingStateServicePids { - logp.L().Infof("component/pid monitoring map is: %#v", existingStateServicePids) - if comp != 0 && strings.Contains(id, "endpoint") { - logp.L().Infof("creating system/process watcher for pid %d", comp) - inputs = append(inputs, map[string]interface{}{ - idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), - "name": fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), - "type": "system/metrics", - useOutputKey: monitoringOutput, - "data_stream": map[string]interface{}{ - "namespace": monitoringNamespace, - }, - "streams": []interface{}{ - map[string]interface{}{ - idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), - "data_stream": 
map[string]interface{}{ - "type": "metrics", - "dataset": "elastic_agent.endpoint_security", - "namespace": monitoringNamespace, - }, - "metricsets": []interface{}{"process"}, - "period": metricsCollectionIntervalString, - "index": fmt.Sprintf("metrics-elastic_agent.endpoint_security-%s", monitoringNamespace), - "process.pid": comp, - "process.cgroups.enabled": false, - "processors": []interface{}{ - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "data_stream", - "fields": map[string]interface{}{ - "type": "metrics", - "dataset": "elastic_agent.endpoint_security", - "namespace": monitoringNamespace, + // If there's a checkin PID and the corresponding component has a service spec section, add a system/process config + for _, compState := range componentList { + if compState.InputSpec != nil && compState.InputSpec.Spec.Service != nil { + if comp, ok := existingStateServicePids[compState.ID]; ok && comp != 0 { + inputs = append(inputs, map[string]interface{}{ + idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), + "name": fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), + "type": "system/metrics", + useOutputKey: monitoringOutput, + "data_stream": map[string]interface{}{ + "namespace": monitoringNamespace, + }, + "streams": []interface{}{ + map[string]interface{}{ + idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), + "data_stream": map[string]interface{}{ + "type": "metrics", + "dataset": "elastic_agent.endpoint_security", + "namespace": monitoringNamespace, + }, + "metricsets": []interface{}{"process"}, + "period": metricsCollectionIntervalString, + "index": fmt.Sprintf("metrics-elastic_agent.endpoint_security-%s", monitoringNamespace), + "process.pid": comp, + "process.cgroups.enabled": false, + "processors": []interface{}{ + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "data_stream", + "fields": map[string]interface{}{ + "type": "metrics", + "dataset": 
"elastic_agent.endpoint_security", + "namespace": monitoringNamespace, + }, }, }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "event", - "fields": map[string]interface{}{ - "dataset": "elastic_agent.endpoint_security", + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "event", + "fields": map[string]interface{}{ + "dataset": "elastic_agent.endpoint_security", + }, }, }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "elastic_agent", - "fields": map[string]interface{}{ - "id": b.agentInfo.AgentID(), - "version": b.agentInfo.Version(), - "snapshot": b.agentInfo.Snapshot(), - "process": "endpoint_security", + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "elastic_agent", + "fields": map[string]interface{}{ + "id": b.agentInfo.AgentID(), + "version": b.agentInfo.Version(), + "snapshot": b.agentInfo.Snapshot(), + "process": "endpoint_security", + }, }, }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "agent", - "fields": map[string]interface{}{ - "id": b.agentInfo.AgentID(), + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "agent", + "fields": map[string]interface{}{ + "id": b.agentInfo.AgentID(), + }, }, }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "component", - "fields": map[string]interface{}{ - "binary": "endpoint_security", - "id": id, + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "target": "component", + "fields": map[string]interface{}{ + "binary": "endpoint_security", + "id": compState.ID, + }, }, }, }, }, }, - }, - }) + }) + } + } } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index 4772cb608af..b3755bb5e3b 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ 
b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -23,7 +23,6 @@ import ( ) func TestMonitoringWithEndpoint(t *testing.T) { - agentInfo, err := info.NewAgentInfo(context.Background(), false) require.NoError(t, err, "Error creating agent info") @@ -59,7 +58,18 @@ func TestMonitoringWithEndpoint(t *testing.T) { // manually declaring all the MonitoringConfig() args since there's a lot of them, and this makes // the test a little more self-describing - var compList []component.Component + compList := []component.Component{ + { + ID: "endpoint-default", + InputSpec: &component.InputRuntimeSpec{ + Spec: component.InputSpec{ + Service: &component.ServiceSpec{ + CPort: 7688, + }, + }, + }, + }, + } compIdToBinary := map[string]string{ "endpoint-default": "endpoint-security", diff --git a/testing/integration/helpers.go b/testing/integration/helpers.go deleted file mode 100644 index db924be4845..00000000000 --- a/testing/integration/helpers.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -//go:build integration - -package integration - -import ( - "bytes" - "context" - _ "embed" - "encoding/json" - "fmt" - "strings" - "testing" - "text/template" - "time" - - "github.com/google/uuid" - - "github.com/elastic/elastic-agent-libs/kibana" - "github.com/elastic/elastic-agent/pkg/control/v2/client" - "github.com/elastic/elastic-agent/pkg/testing/define" -) - -//go:embed endpoint_security_package.json.tmpl -var endpointPackagePolicyTemplate string - -type endpointPackageTemplateVars struct { - ID string - Name string - PolicyID string - Version string -} - -// TODO: Setup a GitHub Action to update this for each release of https://github.com/elastic/endpoint-package -const endpointPackageVersion = "8.11.0" - -func agentAndEndpointAreHealthy(t *testing.T, ctx context.Context, agentClient client.Client) bool { - t.Helper() - - state, err := agentClient.State(ctx) - if err != nil { - t.Logf("Error getting agent state: %s", err) - return false - } - - if state.State != client.Healthy { - t.Logf("local Agent is not Healthy: current state: %+v", state) - return false - } - - foundEndpointInputUnit := false - foundEndpointOutputUnit := false - for _, comp := range state.Components { - isEndpointComponent := strings.Contains(comp.Name, "endpoint") - if comp.State != client.Healthy { - t.Logf("endpoint component is not Healthy: current state: %+v", comp) - return false - } - - for _, unit := range comp.Units { - if isEndpointComponent { - if unit.UnitType == client.UnitTypeInput { - foundEndpointInputUnit = true - } - if unit.UnitType == client.UnitTypeOutput { - foundEndpointOutputUnit = true - } - } - - if unit.State != client.Healthy { - t.Logf("unit %q is not Healthy\n%+v", unit.UnitID, unit) - return false - } - } - } - - // Ensure both the endpoint input and output units were found and healthy. - if !foundEndpointInputUnit || !foundEndpointOutputUnit { - t.Logf("State did not contain endpoint units (input: %v/output: %v) state: %+v. 
", foundEndpointInputUnit, foundEndpointOutputUnit, state) - return false - } - - return true -} - -// Installs the Elastic Defend package to cause the agent to install the endpoint-security service. -func installElasticDefendPackage(t *testing.T, info *define.Info, policyID string) (r kibana.PackagePolicyResponse, err error) { - t.Helper() - - t.Log("Templating endpoint package policy request") - tmpl, err := template.New("pkgpolicy").Parse(endpointPackagePolicyTemplate) - if err != nil { - return r, fmt.Errorf("error creating new template: %w", err) - } - - packagePolicyID := uuid.New().String() - var pkgPolicyBuf bytes.Buffer - - // Need unique name for Endpoint integration otherwise on multiple runs on the same instance you get - // http error response with code 409: {StatusCode:409 Error:Conflict Message:An integration policy with the name Defend-cbomziz4uvn5fov9t1gsrcvdwn2p1s7tefnvgsye already exists. Please rename it or choose a different name.} - err = tmpl.Execute(&pkgPolicyBuf, endpointPackageTemplateVars{ - ID: packagePolicyID, - Name: "Defend-" + packagePolicyID, - PolicyID: policyID, - Version: endpointPackageVersion, - }) - if err != nil { - return r, fmt.Errorf("error executing template: %w", err) - } - - // Make sure the templated value is actually valid JSON before making the API request. - // Using json.Unmarshal will give us the actual syntax error, calling json.Valid() would not. 
- packagePolicyReq := kibana.PackagePolicyRequest{} - err = json.Unmarshal(pkgPolicyBuf.Bytes(), &packagePolicyReq) - if err != nil { - return r, fmt.Errorf("templated package policy is not valid JSON: %s, %w", pkgPolicyBuf.String(), err) - } - - t.Log("POST /api/fleet/package_policies") - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - defer cancel() - - pkgResp, err := info.KibanaClient.InstallFleetPackage(ctx, packagePolicyReq) - if err != nil { - t.Logf("Error installing fleet package: %v", err) - return r, fmt.Errorf("error installing fleet package: %w", err) - } - t.Logf("Endpoint package Policy Response:\n%+v", pkgResp) - return pkgResp, err -} diff --git a/testing/integration/monitoring_endpoint_test.go b/testing/integration/monitoring_endpoint_test.go index 9b957060a03..1a032e2b095 100644 --- a/testing/integration/monitoring_endpoint_test.go +++ b/testing/integration/monitoring_endpoint_test.go @@ -135,16 +135,17 @@ func genESQueryByBinary(agentID string, componentID string) map[string]interface "component.id": componentID, }, }, - { - "exists": map[string]interface{}{ - "field": "system.process.cpu.total.value", - }, - }, - { - "exists": map[string]interface{}{ - "field": "system.process.memory.size", - }, - }, + // see https://github.com/elastic/integrations/pull/10054 + // { + // "exists": map[string]interface{}{ + // "field": "system.process.cpu.total.value", + // }, + // }, + // { + // "exists": map[string]interface{}{ + // "field": "system.process.memory.size", + // }, + // }, }, }, }, From 937f7ac55fb7900118af40c08344d2021fd1a48a Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Mon, 3 Jun 2024 14:49:06 -0700 Subject: [PATCH 13/26] remove mage changes --- dev-tools/mage/settings.go | 7 +- magefile.go | 197 ++++++++++++++++--------------------- 2 files changed, 91 insertions(+), 113 deletions(-) diff --git a/dev-tools/mage/settings.go b/dev-tools/mage/settings.go index 6b22b66ebdb..a44f435fb67 100644 --- 
a/dev-tools/mage/settings.go +++ b/dev-tools/mage/settings.go @@ -88,7 +88,7 @@ var ( Snapshot bool DevBuild bool - ExternalBuild string + ExternalBuild bool versionQualified bool versionQualifier string @@ -149,7 +149,10 @@ func initGlobals() { panic(fmt.Errorf("failed to parse DEV env value: %w", err)) } - ExternalBuild = EnvOr("EXTERNAL", "false") + ExternalBuild, err = strconv.ParseBool(EnvOr("EXTERNAL", "false")) + if err != nil { + panic(fmt.Errorf("failed to parse EXTERNAL env value: %w", err)) + } versionQualifier, versionQualified = os.LookupEnv("VERSION_QUALIFIER") diff --git a/magefile.go b/magefile.go index e0f67280ede..98637eb3191 100644 --- a/magefile.go +++ b/magefile.go @@ -196,7 +196,7 @@ func (Dev) Package() { os.Setenv(devEnv, "true") if _, hasExternal := os.LookupEnv(externalArtifacts); !hasExternal { - devtools.ExternalBuild = "true" + devtools.ExternalBuild = true } devtools.DevBuild = true @@ -813,7 +813,7 @@ func (Cloud) Image() { devtools.SelectedPackageTypes = []devtools.PackageType{devtools.Docker} if _, hasExternal := os.LookupEnv(externalArtifacts); !hasExternal { - devtools.ExternalBuild = "true" + devtools.ExternalBuild = true } Package() @@ -1055,7 +1055,7 @@ func collectPackageDependencies(platforms []string, packageVersion string, requi os.Setenv(agentDropPath, dropPath) - if devtools.ExternalBuild == "true" { + if devtools.ExternalBuild == true { // Map of binaries to download to their project name in the unified-release manager. // The project names are used to generate the URLs when downloading binaries. 
For example: // @@ -1077,126 +1077,101 @@ func collectPackageDependencies(platforms []string, packageVersion string, requi "pf-elastic-symbolizer": "prodfiler", "pf-host-agent": "prodfiler", } - downloadFromExternalBuild(platforms, archivePath, packageVersion, externalBinaries) - } else if devtools.ExternalBuild == "agentbeat" { - // package agentbeat, then also pull external builds for non-agentbeat binaries - externalBinaries := map[string]string{ - "cloudbeat": "cloudbeat", // only supporting linux/amd64 or linux/arm64 - "cloud-defend": "cloud-defend", - "apm-server": "apm-server", // not supported on darwin/aarch64 - "endpoint-security": "endpoint-dev", - "fleet-server": "fleet-server", - "pf-elastic-collector": "prodfiler", - "pf-elastic-symbolizer": "prodfiler", - "pf-host-agent": "prodfiler", - } - downloadFromExternalBuild(platforms, archivePath, packageVersion, externalBinaries) - packageAgentBeat(packageVersion, archivePath, requiredPackages) - - } else { - packageAgentBeat(packageVersion, archivePath, requiredPackages) - } - } else { - archivePath = movePackagesToArchive(dropPath, requiredPackages) - } - return archivePath, dropPath -} - -// downloadFromExternalBuild downloads the component binaries and places them in the drop path for the rest of the build -func downloadFromExternalBuild(platforms []string, archivePath string, packageVersion string, externalBinaries map[string]string) { - - // Only log fatal logs for logs produced using logrus. This is the global logger - // used by github.com/elastic/e2e-testing/pkg/downloads which can only be configured globally like this or via - // environment variables. - // - // Using FatalLevel avoids filling the build log with scary looking errors when we attempt to - // download artifacts on unsupported platforms and choose to ignore the errors. - // - // Change this to InfoLevel to see exactly what the downloader is doing. 
- logrus.SetLevel(logrus.FatalLevel) - - errGroup, ctx := errgroup.WithContext(context.Background()) - completedDownloads := &atomic.Int32{} - for binary, project := range externalBinaries { - for _, platform := range platforms { - reqPackage := platformPackages[platform] - targetPath := filepath.Join(archivePath, reqPackage) - os.MkdirAll(targetPath, 0755) - newVersion, packageName := getPackageName(binary, packageVersion, reqPackage) - errGroup.Go(downloadBinary(ctx, project, packageName, binary, platform, newVersion, targetPath, completedDownloads)) - } - } - - err := errGroup.Wait() - if err != nil { - panic(err) - } - if completedDownloads.Load() == 0 { - panic(fmt.Sprintf("No packages were successfully downloaded. You may be building against an invalid or unreleased version. version=%s. If this is an unreleased version, try SNAPSHOT=true or EXTERNAL=false", packageVersion)) - } -} - -// packageAgentBeat packages the beat from the local code in the beats path -func packageAgentBeat(packageVersion string, archivePath string, requiredPackages []string) { - packedBeats := []string{"agentbeat"} - // build from local repo, will assume beats repo is located on the same root level - for _, b := range packedBeats { - pwd, err := filepath.Abs(filepath.Join("../beats/x-pack", b)) - if err != nil { - panic(err) - } - - packagesCopied := 0 - if !requiredPackagesPresent(pwd, b, packageVersion, requiredPackages) { - fmt.Printf("--- Package %s\n", pwd) - cmd := exec.Command("mage", "package") - cmd.Dir = pwd - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Env = append(os.Environ(), fmt.Sprintf("PWD=%s", pwd), "AGENT_PACKAGING=on") - if envVar := selectedPackageTypes(); envVar != "" { - cmd.Env = append(cmd.Env, envVar) - } - - if err := cmd.Run(); err != nil { - panic(err) + // Only log fatal logs for logs produced using logrus. 
This is the global logger + // used by github.com/elastic/e2e-testing/pkg/downloads which can only be configured globally like this or via + // environment variables. + // + // Using FatalLevel avoids filling the build log with scary looking errors when we attempt to + // download artifacts on unsupported platforms and choose to ignore the errors. + // + // Change this to InfoLevel to see exactly what the downloader is doing. + logrus.SetLevel(logrus.FatalLevel) + + errGroup, ctx := errgroup.WithContext(context.Background()) + completedDownloads := &atomic.Int32{} + for binary, project := range externalBinaries { + for _, platform := range platforms { + reqPackage := platformPackages[platform] + targetPath := filepath.Join(archivePath, reqPackage) + os.MkdirAll(targetPath, 0755) + newVersion, packageName := getPackageName(binary, packageVersion, reqPackage) + errGroup.Go(downloadBinary(ctx, project, packageName, binary, platform, newVersion, targetPath, completedDownloads)) + } } - } - // copy to new drop - sourcePath := filepath.Join(pwd, "build", "distributions") - for _, rp := range requiredPackages { - files, err := filepath.Glob(filepath.Join(sourcePath, "*"+rp+"*")) + err = errGroup.Wait() if err != nil { panic(err) } + if completedDownloads.Load() == 0 { + panic(fmt.Sprintf("No packages were successfully downloaded. You may be building against an invalid or unreleased version. version=%s. 
If this is an unreleased version, try SNAPSHOT=true or EXTERNAL=false", packageVersion)) + } + } else { + packedBeats := []string{"agentbeat"} + // build from local repo, will assume beats repo is located on the same root level + for _, b := range packedBeats { + pwd, err := filepath.Abs(filepath.Join("../beats/x-pack", b)) + if err != nil { + panic(err) + } + + packagesCopied := 0 + + if !requiredPackagesPresent(pwd, b, packageVersion, requiredPackages) { + fmt.Printf("--- Package %s\n", pwd) + cmd := exec.Command("mage", "package") + cmd.Dir = pwd + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = append(os.Environ(), fmt.Sprintf("PWD=%s", pwd), "AGENT_PACKAGING=on") + if envVar := selectedPackageTypes(); envVar != "" { + cmd.Env = append(cmd.Env, envVar) + } - targetPath := filepath.Join(archivePath, rp) - os.MkdirAll(targetPath, 0755) - for _, f := range files { - // safety check; if the user has an older version of the beats repo, - // for example right after a release where you've `git pulled` from on repo and not the other, - // they might end up with a mishmash of packages from different versions. - // check to see if we have mismatched versions. - if !strings.Contains(f, packageVersion) { - // if this panic hits weird edge cases where we don't want actual failures, revert to a printf statement. 
- panic(fmt.Sprintf("the file %s doesn't match agent version %s, beats repo might be out of date", f, packageVersion)) + if err := cmd.Run(); err != nil { + panic(err) + } } - targetFile := filepath.Join(targetPath, filepath.Base(f)) - packagesCopied += 1 - if err := sh.Copy(targetFile, f); err != nil { - panic(err) + // copy to new drop + sourcePath := filepath.Join(pwd, "build", "distributions") + for _, rp := range requiredPackages { + files, err := filepath.Glob(filepath.Join(sourcePath, "*"+rp+"*")) + if err != nil { + panic(err) + } + + targetPath := filepath.Join(archivePath, rp) + os.MkdirAll(targetPath, 0755) + for _, f := range files { + // safety check; if the user has an older version of the beats repo, + // for example right after a release where you've `git pulled` from on repo and not the other, + // they might end up with a mishmash of packages from different versions. + // check to see if we have mismatched versions. + if !strings.Contains(f, packageVersion) { + // if this panic hits weird edge cases where we don't want actual failures, revert to a printf statement. + panic(fmt.Sprintf("the file %s doesn't match agent version %s, beats repo might be out of date", f, packageVersion)) + } + + targetFile := filepath.Join(targetPath, filepath.Base(f)) + packagesCopied += 1 + if err := sh.Copy(targetFile, f); err != nil { + panic(err) + } + } + } + // a very basic footcannon protector; if packages are missing and we need to rebuild them, check to see if those files were copied + // if we needed to repackage beats but still somehow copied nothing, could indicate an issue. Usually due to beats and agent being at different versions. + if packagesCopied == 0 { + fmt.Println(">>> WARNING: no packages were copied, but we repackaged beats anyway. 
Check binary to see if intended beats are there.") } } } - // a very basic footcannon protector; if packages are missing and we need to rebuild them, check to see if those files were copied - // if we needed to repackage beats but still somehow copied nothing, could indicate an issue. Usually due to beats and agent being at different versions. - if packagesCopied == 0 { - fmt.Println(">>> WARNING: no packages were copied, but we repackaged beats anyway. Check binary to see if intended beats are there.") - } + } else { + archivePath = movePackagesToArchive(dropPath, requiredPackages) } + return archivePath, dropPath } // flattenDependencies will extract all the required packages collected in archivePath and dropPath in flatPath and @@ -1240,7 +1215,7 @@ func flattenDependencies(requiredPackages []string, packageVersion, archivePath, log.Printf(">>> Extracting %s to %s", m, versionedFlatPath) } if err := devtools.Extract(m, versionedFlatPath); err != nil { - panic(fmt.Errorf("error extracting %s: %s", m, err)) + panic(err) } } From 4853383ac2f048aa216ddb4f31a38fc828f1da37 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Mon, 3 Jun 2024 15:26:08 -0700 Subject: [PATCH 14/26] spelling --- internal/pkg/agent/application/monitoring/v1_monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 341be15c527..75586667847 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -915,7 +915,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI } // add system/process metrics for services that can't be monitored via json/beats metrics - // If there's a checkin PID and the corrisponding component has a service spec section, add a system/process config + // If there's a checkin PID and the corresponding component has a service spec 
section, add a system/process config for _, compState := range componentList { if compState.InputSpec != nil && compState.InputSpec.Spec.Service != nil { if comp, ok := existingStateServicePids[compState.ID]; ok && comp != 0 { From 4d417cdc82a98276afdfbd2c7b0e26b0c0ec6d53 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 4 Jun 2024 08:22:55 -0700 Subject: [PATCH 15/26] fix test --- .../pkg/agent/application/coordinator/coordinator_unit_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index c9117b8c0b9..8f3223481c7 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -59,6 +59,7 @@ func TestVarsManagerError(t *testing.T) { managerChans: managerChans{ varsManagerError: varsErrorChan, }, + componentPIDTicker: time.NewTicker(time.Second * 30), } // Send an error via the vars manager channel, and let Coordinator update const errorStr = "force error" From baafc48bc519df26e15c7c8517023561f83d57a1 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 4 Jun 2024 11:10:17 -0700 Subject: [PATCH 16/26] figured out weird test failures --- .../pkg/agent/application/coordinator/coordinator_state.go | 3 ++- .../pkg/agent/application/coordinator/coordinator_test.go | 6 +++++- .../agent/application/coordinator/coordinator_unit_test.go | 2 ++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index aa822105a39..ea8cf5bcfb5 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -135,8 +135,9 @@ func (c *Coordinator) refreshState() { // Coordinator state and sets stateNeedsRefresh. 
// Must be called on the main Coordinator goroutine. func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) { + + // check for any component updates to the known PID, so we can update the component monitoring found := false - // check for any component updates to the PID, so we can update the component monitoring pidRequiresUpdate := false for i, other := range c.state.Components { if other.Component.ID == state.Component.ID { diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 5b81d0f4bb3..d53a7ec3b3b 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -587,7 +587,6 @@ func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) { resultChan <- ctx.Err() return case state := <-subChan: - t.Logf("%+v", state) if len(state.Components) == 3 { compState0 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-0") compState1 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-1") @@ -599,6 +598,11 @@ func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) { (unit1.State == client.UnitStateHealthy && unit1.Message == "Healthy From Fake Isolated Units 1 Config") { resultChan <- nil return + } else if unit0.State == client.UnitStateFailed && unit1.State == client.UnitStateFailed { + // if you get a really strange failed state, check to make sure the mock binaries in + // elastic-agent/pkg/component/fake/ are updated + t.Fail() + t.Logf("got units with failed state: %#v / %#v", unit1, unit0) } } } diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 8f3223481c7..81b50343e2f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ 
b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -111,6 +111,7 @@ func TestCoordinatorReportsUnhealthyComponents(t *testing.T) { managerChans: managerChans{ runtimeManagerUpdate: runtimeChan, }, + componentPIDTicker: time.NewTicker(time.Second * 30), } unhealthyComponent := runtime.ComponentComponentState{ @@ -187,6 +188,7 @@ func TestCoordinatorComponentStatesAreSeparate(t *testing.T) { managerChans: managerChans{ runtimeManagerUpdate: runtimeChan, }, + componentPIDTicker: time.NewTicker(time.Second * 30), } comp1 := runtime.ComponentComponentState{ From 6f17e4e987ae20139481402057323b10244ca271 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 4 Jun 2024 11:43:18 -0700 Subject: [PATCH 17/26] still fixing unit tests --- .../coordinator/coordinator_unit_test.go | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 81b50343e2f..5aa8e975872 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -259,6 +259,7 @@ func TestCoordinatorReportsUnhealthyUnits(t *testing.T) { managerChans: managerChans{ runtimeManagerUpdate: runtimeChan, }, + componentPIDTicker: time.NewTicker(time.Second * 30), } // Create a healthy component with healthy input and output units @@ -378,8 +379,9 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) { runtimeMgr: &fakeRuntimeManager{}, // Set valid but empty initial values for ast and vars - vars: emptyVars(t), - ast: emptyAST(t), + vars: emptyVars(t), + ast: emptyAST(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // Send an invalid config update and confirm that Coordinator reports @@ -492,8 +494,9 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) { runtimeMgr: &fakeRuntimeManager{}, // Set valid but 
empty initial values for ast and vars - vars: emptyVars(t), - ast: emptyAST(t), + vars: emptyVars(t), + ast: emptyAST(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // This configuration produces a valid AST but its EQL condition is @@ -586,8 +589,9 @@ func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) { managerChans: managerChans{ configManagerUpdate: configChan, }, - runtimeMgr: runtimeManager, - vars: emptyVars(t), + runtimeMgr: runtimeManager, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } coord.RegisterMonitoringServer(monitoringReloader) @@ -714,8 +718,9 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) { managerChans: managerChans{ configManagerUpdate: configChan, }, - runtimeMgr: runtimeManager, - vars: emptyVars(t), + runtimeMgr: runtimeManager, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // Create a policy with one input and one output @@ -801,8 +806,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { // manager, so it receives the update result. 
runtimeManagerError: updateErrChan, }, - runtimeMgr: runtimeManager, - vars: emptyVars(t), + runtimeMgr: runtimeManager, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // Send an empty policy which should forward an empty component model to @@ -863,8 +869,9 @@ func TestCoordinatorAppliesVarsToPolicy(t *testing.T) { configManagerUpdate: configChan, varsManagerUpdate: varsChan, }, - runtimeMgr: runtimeManager, - vars: emptyVars(t), + runtimeMgr: runtimeManager, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // Create a policy with one input and one output @@ -939,7 +946,8 @@ func TestCoordinatorReportsOverrideState(t *testing.T) { stateBroadcaster: &broadcaster.Broadcaster[State]{ InputChan: stateChan, }, - overrideStateChan: overrideStateChan, + overrideStateChan: overrideStateChan, + componentPIDTicker: time.NewTicker(time.Second * 30), } // Send an error via the vars manager channel, and let Coordinator update overrideStateChan <- &coordinatorOverrideState{ From 8aaa07411c9af3c95dee9e11d6775708542fa50a Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 4 Jun 2024 16:00:48 -0700 Subject: [PATCH 18/26] major test improvements --- .../integration/monitoring_endpoint_test.go | 112 ++++++++++++++++-- 1 file changed, 103 insertions(+), 9 deletions(-) diff --git a/testing/integration/monitoring_endpoint_test.go b/testing/integration/monitoring_endpoint_test.go index 1a032e2b095..1cfc3d553f1 100644 --- a/testing/integration/monitoring_endpoint_test.go +++ b/testing/integration/monitoring_endpoint_test.go @@ -8,6 +8,8 @@ package integration import ( "context" + "os/exec" + "runtime" "testing" "time" @@ -25,8 +27,9 @@ import ( type EndpointMetricsMonRunner struct { suite.Suite - info *define.Info - fixture *atesting.Fixture + info *define.Info + fixture *atesting.Fixture + endpointID string } func TestEndpointAgentServiceMonitoring(t *testing.T) { @@ -45,8 +48,9 @@ func 
TestEndpointAgentServiceMonitoring(t *testing.T) { require.NoError(t, err, "could not create agent fixture") runner := &EndpointMetricsMonRunner{ - info: info, - fixture: fixture, + info: info, + fixture: fixture, + endpointID: "endpoint-default", } suite.Run(t, runner) @@ -98,6 +102,7 @@ func (runner *EndpointMetricsMonRunner) SetupSuite() { time.Second, "Endpoint component or units are not healthy.", ) + } func (runner *EndpointMetricsMonRunner) TestEndpointMetrics() { @@ -107,18 +112,107 @@ func (runner *EndpointMetricsMonRunner) TestEndpointMetrics() { agentStatus, err := runner.fixture.ExecStatus(ctx) require.NoError(runner.T(), err) - endpointID := "endpoint-default" require.Eventually(runner.T(), func() bool { - query := genESQueryByBinary(agentStatus.Info.ID, endpointID) + query := genESQueryByBinary(agentStatus.Info.ID, runner.endpointID) res, err := estools.PerformQueryForRawQuery(ctx, query, "metrics-elastic_agent*", runner.info.ESClient) require.NoError(runner.T(), err) - runner.T().Logf("Fetched metrics for %s, got %d hits", endpointID, res.Hits.Total.Value) + runner.T().Logf("Fetched metrics for %s, got %d hits", runner.endpointID, res.Hits.Total.Value) return res.Hits.Total.Value >= 1 - }, time.Minute*10, time.Second*10, "could not fetch component metricsets for endpoint with ID %s and agent ID %s", endpointID, agentStatus.Info.ID) + }, time.Minute*10, time.Second*10, "could not fetch component metricsets for endpoint with ID %s and agent ID %s", runner.endpointID, agentStatus.Info.ID) + +} + +func (runner *EndpointMetricsMonRunner) TestEndpointMetricsAfterRestart() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15) + defer cancel() + // once we've gotten the first round of metrics, forcibly restart endpoint, see if we still get metrics + // This makes sure that the backend coordinator can deal with properly updating the metrics handlers if there are unexpected state changes + + // confine this to linux; the behavior is 
platform-agnostic, and this way we have `pgrep` + if runtime.GOOS != "linux" { + return + } + + // kill endpoint + cmd := exec.Command("pgrep", "-f", "endpoint") + pgrep, err := cmd.CombinedOutput() + runner.T().Logf("killing pid: %s", string(pgrep)) + + cmd = exec.Command("pkill", "--signal", "SIGKILL", "-f", "endpoint") + _, err = cmd.CombinedOutput() + require.NoError(runner.T(), err) + + // wait for endpoint to come back up. We use `pgrep` + // since the agent health status won't immediately register that the endpoint process itself is gone. + require.Eventually(runner.T(), func() bool { + cmd := exec.Command("pgrep", "-f", "endpoint") + pgrep, err := cmd.CombinedOutput() + runner.T().Logf("found pid: %s", string(pgrep)) + if err == nil { + return true + } + return false + }, time.Minute*2, time.Second) + + // make sure agent still says we're healthy + agentClient := runner.fixture.Client() + err = agentClient.Connect(ctx) + require.NoError(runner.T(), err, "could not connect to local agent") + + require.Eventually(runner.T(), + func() bool { return agentAndEndpointAreHealthy(runner.T(), ctx, agentClient) }, + time.Minute*3, + time.Second, + "Endpoint component or units are not healthy.", + ) + + // catch the time endpoint is restarted, so we can filter for documents after a given time + endpointRestarted := time.Now() + + agentStatus, err := runner.fixture.ExecStatus(ctx) + require.NoError(runner.T(), err) + + // now query again, but make sure we're getting new metrics + require.Eventually(runner.T(), func() bool { + query := genESQueryByDate(agentStatus.Info.ID, runner.endpointID, endpointRestarted.Format(time.RFC3339)) + res, err := estools.PerformQueryForRawQuery(ctx, query, "metrics-elastic_agent*", runner.info.ESClient) + require.NoError(runner.T(), err) + runner.T().Logf("Fetched metrics for %s, got %d hits", runner.endpointID, res.Hits.Total.Value) + return res.Hits.Total.Value >= 1 + }, time.Minute*10, time.Second*10, "could not fetch component 
metricsets for endpoint with ID %s and agent ID %s", runner.endpointID, agentStatus.Info.ID) +} + +func genESQueryByDate(agentID string, componentID string, dateAfter string) map[string]interface{} { + queryRaw := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + { + "match": map[string]interface{}{ + "agent.id": agentID, + }, + }, + { + "match": map[string]interface{}{ + "component.id": componentID, + }, + }, + { + "range": map[string]interface{}{ + "@timestamp": map[string]interface{}{ + "gte": dateAfter, + }, + }, + }, + }, + }, + }, + } + + return queryRaw } -// TODO: move to helpers.go func genESQueryByBinary(agentID string, componentID string) map[string]interface{} { // see https://github.com/elastic/kibana/blob/main/x-pack/plugins/fleet/server/services/agents/agent_metrics.ts queryRaw := map[string]interface{}{ From e15fba3ea6639a35c541a293ac8ab1137f5ae29d Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Wed, 5 Jun 2024 07:59:17 -0700 Subject: [PATCH 19/26] finish up tests --- .../integration/monitoring_endpoint_test.go | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/testing/integration/monitoring_endpoint_test.go b/testing/integration/monitoring_endpoint_test.go index 1cfc3d553f1..b91a0fe55fe 100644 --- a/testing/integration/monitoring_endpoint_test.go +++ b/testing/integration/monitoring_endpoint_test.go @@ -205,6 +205,20 @@ func genESQueryByDate(agentID string, componentID string, dateAfter string) map[ }, }, }, + { + "range": map[string]interface{}{ + "system.process.cpu.total.value": map[string]interface{}{ + "gt": 0, + }, + }, + }, + { + "range": map[string]interface{}{ + "system.process.memory.size": map[string]interface{}{ + "gt": 0, + }, + }, + }, }, }, }, @@ -229,17 +243,20 @@ func genESQueryByBinary(agentID string, componentID string) map[string]interface "component.id": componentID, }, }, - // see 
https://github.com/elastic/integrations/pull/10054 - // { - // "exists": map[string]interface{}{ - // "field": "system.process.cpu.total.value", - // }, - // }, - // { - // "exists": map[string]interface{}{ - // "field": "system.process.memory.size", - // }, - // }, + { + "range": map[string]interface{}{ + "system.process.cpu.total.value": map[string]interface{}{ + "gt": 0, + }, + }, + }, + { + "range": map[string]interface{}{ + "system.process.memory.size": map[string]interface{}{ + "gt": 0, + }, + }, + }, }, }, }, From 0e35f59bedcb0d131d88290dadef9f1c86f972bf Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Fri, 7 Jun 2024 12:00:00 -0700 Subject: [PATCH 20/26] remove log line --- internal/pkg/agent/application/monitoring/v1_monitor.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 75586667847..8b1d9bf50cb 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -15,7 +15,6 @@ import ( "time" "unicode" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/utils" @@ -910,10 +909,6 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, } - for _, comp := range componentList { - logp.L().Infof("input component %s: %#v", comp.ID, comp.InputSpec.Spec.Service) - } - // add system/process metrics for services that can't be monitored via json/beats metrics // If there's a checkin PID and the corresponding component has a service spec section, add a system/process config for _, compState := range componentList { From aa4c5fcc66a726ddfdcc6d306af42eb26d8adff0 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Fri, 7 Jun 2024 13:49:40 -0700 Subject: [PATCH 21/26] update name --- .../pkg/agent/application/coordinator/coordinator.go | 2 +- 
.../agent/application/coordinator/coordinator_state.go | 4 ++-- .../agent/application/coordinator/diagnostics_test.go | 4 ++-- pkg/component/runtime/state.go | 9 ++++++--- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index a18008b9f96..f83d8d03d30 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1299,7 +1299,7 @@ func (c *Coordinator) generateComponentModel() (err error) { var existingCompState = make(map[string]uint64, len(c.state.Components)) for _, comp := range c.state.Components { - existingCompState[comp.Component.ID] = comp.State.CheckinPid + existingCompState[comp.Component.ID] = comp.State.Pid } comps, err := c.specs.ToComponents( diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index ea8cf5bcfb5..68fb0e4feb6 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -141,7 +141,7 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) pidRequiresUpdate := false for i, other := range c.state.Components { if other.Component.ID == state.Component.ID { - if other.State.CheckinPid != state.State.CheckinPid { + if other.State.Pid != state.State.Pid { pidRequiresUpdate = true } c.state.Components[i] = state @@ -151,7 +151,7 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) } if !found { c.state.Components = append(c.state.Components, state) - if state.State.CheckinPid != 0 { + if state.State.Pid != 0 { pidRequiresUpdate = true } } diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 
9c68a20cc1a..6504ace8919 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -488,7 +488,7 @@ log_level: "warning" components: - id: "comp-1" state: - checkinpid: 0 + pid: 0 state: 3 message: "degraded message" features_idx: 0 @@ -571,7 +571,7 @@ log_level: "warning" components: - id: "comp-1" state: - checkinpid: 0 + pid: 0 state: 3 message: "degraded message" features_idx: 0 diff --git a/pkg/component/runtime/state.go b/pkg/component/runtime/state.go index 3604317fc65..6bd927fc68b 100644 --- a/pkg/component/runtime/state.go +++ b/pkg/component/runtime/state.go @@ -75,7 +75,10 @@ type ComponentState struct { VersionInfo ComponentVersionInfo `yaml:"version_info"` - CheckinPid uint64 + // The PID of the process, as obtained *from the Protobuf API* + // As of now, this is only used by Endpoint, as agent doesn't know the PID + // of the endpoint service. If you need the PID for beats, use the coordinator/communicator + Pid uint64 // internal expectedUnits map[ComponentUnitKey]expectedUnitState @@ -272,9 +275,9 @@ func (s *ComponentState) syncUnits(comp *component.Component) bool { func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool { changed := false - if s.CheckinPid != checkin.Pid { + if s.Pid != checkin.Pid { changed = true - s.CheckinPid = checkin.Pid + s.Pid = checkin.Pid } touched := make(map[ComponentUnitKey]bool) From 76fea5ed8892c56b6d171c02a84b1e9c81938b11 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Mon, 10 Jun 2024 08:38:13 -0700 Subject: [PATCH 22/26] fix comments --- .../pkg/agent/application/monitoring/v1_monitor.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 8b1d9bf50cb..7c6883fb89d 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ 
b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -67,7 +67,8 @@ var ( supportedBeatsComponents = []string{"filebeat", "metricbeat", "apm-server", "fleet-server", "auditbeat", "cloudbeat", "heartbeat", "osquerybeat", "packetbeat", "pf-elastic-collector", "pf-elastic-symbolizer"} ) -// BeatsMonitor is providing V1 monitoring support for metrics and logs for endpoint-security only. +// BeatsMonitor provides config values for monitoring of agent clients (beats, endpoint, etc) +// by injecting the monitoring config into an existing fleet config type BeatsMonitor struct { enabled bool // feature flag disabling whole v1 monitoring story config *monitoringConfig @@ -110,11 +111,16 @@ func (b *BeatsMonitor) Reload(rawConfig *config.Config) error { } // MonitoringConfig adds monitoring inputs to a configuration based on retrieved list of components to run. +// args: +// policy: the existing config policy +// components: a list of the expected running components +// componentIDToBinary: a map of component IDs to binary names +// componentIDPidMap: a map of component IDs to the PIDs of the running components. 
func (b *BeatsMonitor) MonitoringConfig( policy map[string]interface{}, components []component.Component, componentIDToBinary map[string]string, - existingStatePidMap map[string]uint64, + componentIDPidMap map[string]uint64, ) (map[string]interface{}, error) { if !b.Enabled() { return nil, nil @@ -159,7 +165,7 @@ func (b *BeatsMonitor) MonitoringConfig( } if b.config.C.MonitorMetrics { - if err := b.injectMetricsInput(cfg, componentIDToBinary, components, existingStatePidMap); err != nil { + if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap); err != nil { return nil, errors.New(err, "failed to inject monitoring output") } } @@ -299,6 +305,7 @@ func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{ return nil } +// injectLogsInput adds logging configs for component monitoring to the `cfg` map func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []component.Component, monitoringOutput string) error { monitoringNamespace := b.monitoringNamespace() logsDrop := filepath.Dir(loggingPath("unit", b.operatingSystem)) @@ -535,6 +542,7 @@ func (b *BeatsMonitor) monitoringNamespace() string { return defaultMonitoringNamespace } +// injectMetricsInput injects monitoring config for agent monitoring to the `cfg` object. 
func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, componentList []component.Component, existingStateServicePids map[string]uint64) error { metricsCollectionIntervalString := metricsCollectionInterval.String() monitoringNamespace := b.monitoringNamespace() From 9bcdb4f175f44f0ac8f3c24ff6e707f4e4d0ff11 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Mon, 10 Jun 2024 10:44:00 -0700 Subject: [PATCH 23/26] fix bool logic --- .../application/coordinator/coordinator.go | 8 ++++++-- .../coordinator/coordinator_state.go | 8 ++------ .../application/monitoring/v1_monitor.go | 19 ++++++++++--------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index b7b17060879..c967b51b732 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -83,6 +83,11 @@ type MonitorManager interface { Reload(rawConfig *config.Config) error // MonitoringConfig injects monitoring configuration into resolved ast tree. + // args: + // - the existing config policy + // - a list of the expected running components + // - a map of component IDs to binary names + // - a map of component IDs to the PIDs of the running components. 
MonitoringConfig(map[string]interface{}, []component.Component, map[string]string, map[string]uint64) (map[string]interface{}, error) } @@ -1054,8 +1059,7 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case <-c.componentPIDTicker.C: // if we hit the ticker and we've got a new PID, // reload the component model - if c.componentPidRequiresUpdate.Load() { - c.componentPidRequiresUpdate.Store(false) + if c.componentPidRequiresUpdate.Swap(false) { err := c.refreshComponentModel(ctx) if err != nil { err = fmt.Errorf("error refreshing component model for PID update: %w", err) diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index 68fb0e4feb6..6e388d961f1 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -138,11 +138,10 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) // check for any component updates to the known PID, so we can update the component monitoring found := false - pidRequiresUpdate := false for i, other := range c.state.Components { if other.Component.ID == state.Component.ID { if other.State.Pid != state.State.Pid { - pidRequiresUpdate = true + c.componentPidRequiresUpdate.Store(true) } c.state.Components[i] = state found = true @@ -152,7 +151,7 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) if !found { c.state.Components = append(c.state.Components, state) if state.State.Pid != 0 { - pidRequiresUpdate = true + c.componentPidRequiresUpdate.Store(true) } } @@ -170,9 +169,6 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) c.stateNeedsRefresh = true - if pidRequiresUpdate { - c.componentPidRequiresUpdate.Store(true) - } } // generateReportableState aggregates the internal state of the Coordinator diff --git 
a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 7c6883fb89d..cf7df39e171 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -922,9 +922,10 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI for _, compState := range componentList { if compState.InputSpec != nil && compState.InputSpec.Spec.Service != nil { if comp, ok := existingStateServicePids[compState.ID]; ok && comp != 0 { + name := strings.ReplaceAll(strings.ReplaceAll(compState.BinaryName(), "-", "_"), "/", "_") inputs = append(inputs, map[string]interface{}{ - idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), - "name": fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), + idKey: fmt.Sprintf("%s-%s", monitoringMetricsUnitID, name), + "name": fmt.Sprintf("%s-%s", monitoringMetricsUnitID, name), "type": "system/metrics", useOutputKey: monitoringOutput, "data_stream": map[string]interface{}{ @@ -932,15 +933,15 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, "streams": []interface{}{ map[string]interface{}{ - idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID), + idKey: fmt.Sprintf("%s-%s", monitoringMetricsUnitID, name), "data_stream": map[string]interface{}{ "type": "metrics", - "dataset": "elastic_agent.endpoint_security", + "dataset": fmt.Sprintf("elastic_agent.%s", name), "namespace": monitoringNamespace, }, "metricsets": []interface{}{"process"}, "period": metricsCollectionIntervalString, - "index": fmt.Sprintf("metrics-elastic_agent.endpoint_security-%s", monitoringNamespace), + "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace), "process.pid": comp, "process.cgroups.enabled": false, "processors": []interface{}{ @@ -949,7 +950,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, 
componentI "target": "data_stream", "fields": map[string]interface{}{ "type": "metrics", - "dataset": "elastic_agent.endpoint_security", + "dataset": fmt.Sprintf("elastic_agent.%s", name), "namespace": monitoringNamespace, }, }, @@ -958,7 +959,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "add_fields": map[string]interface{}{ "target": "event", "fields": map[string]interface{}{ - "dataset": "elastic_agent.endpoint_security", + "dataset": fmt.Sprintf("elastic_agent.%s", name), }, }, }, @@ -969,7 +970,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "id": b.agentInfo.AgentID(), "version": b.agentInfo.Version(), "snapshot": b.agentInfo.Snapshot(), - "process": "endpoint_security", + "process": name, }, }, }, @@ -985,7 +986,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "add_fields": map[string]interface{}{ "target": "component", "fields": map[string]interface{}{ - "binary": "endpoint_security", + "binary": name, "id": compState.ID, }, }, From 88d9bba7602990be9e0f37ccc73aeada0721ca08 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Mon, 10 Jun 2024 11:43:43 -0700 Subject: [PATCH 24/26] fix tests --- internal/pkg/agent/application/monitoring/v1_monitor.go | 1 - internal/pkg/agent/application/monitoring/v1_monitor_test.go | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index cf7df39e171..ce6bda03fcc 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -456,7 +456,6 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components [] // only monitor service inputs that define a log path continue } - fixedBinaryName := strings.ReplaceAll(strings.ReplaceAll(comp.BinaryName(), "-", "_"), "/", "_") // conform with index naming 
policy dataset := fmt.Sprintf("elastic_agent.%s", fixedBinaryName) streams = append(streams, map[string]interface{}{ diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index b3755bb5e3b..b3b9a633aa4 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -63,6 +63,9 @@ func TestMonitoringWithEndpoint(t *testing.T) { ID: "endpoint-default", InputSpec: &component.InputRuntimeSpec{ Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Name: "endpoint-security", + }, Service: &component.ServiceSpec{ CPort: 7688, }, From 3e3aa15ef91cb7528864812e7d85b6f9eca4b799 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Wed, 12 Jun 2024 14:05:01 -0700 Subject: [PATCH 25/26] fix merge --- internal/pkg/agent/cmd/inspect.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index 28e4ba2703f..c271b436912 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -189,7 +189,6 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, binaryMapping[component.ID] = component.BinaryName() } } - // TODO: how do we handle endpoint config? monitorCfg, err := monitorFn(cfg, components, binaryMapping, map[string]uint64{}) if err != nil { return fmt.Errorf("failed to get monitoring config: %w", err) @@ -352,7 +351,7 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri } // Compute the components from the computed configuration. 
- comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo) + comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo, map[string]uint64{}) if err != nil { return nil, fmt.Errorf("failed to render components: %w", err) } From f1d0e5a5b15989720df8048983c026e35bab10df Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Thu, 13 Jun 2024 08:27:56 -0700 Subject: [PATCH 26/26] add warning and mock output to inspect --- internal/pkg/agent/cmd/inspect.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index c271b436912..848141ecba9 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -181,20 +181,40 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, return fmt.Errorf("failed to get binary mappings: %w", err) } + // service units like endpoint are special; they require a PID to monitor. + // however, `inspect` doesn't talk to the coordinator backend, which means it can't know their actual PID from this point in the code + // instead, we look for service units and create a fake PID, so we print the monitoring config anyway. + serviceUnitExists := false + fakeServicePids := map[string]uint64{} + // The monitoring config depends on a map from component id to // binary name. 
binaryMapping := make(map[string]string) for _, component := range components { if spec := component.InputSpec; spec != nil { binaryMapping[component.ID] = component.BinaryName() + if spec.Spec.Service != nil { + serviceUnitExists = true + fakeServicePids[component.ID] = 1234 + } } } - monitorCfg, err := monitorFn(cfg, components, binaryMapping, map[string]uint64{}) + monitorCfg, err := monitorFn(cfg, components, binaryMapping, fakeServicePids) if err != nil { return fmt.Errorf("failed to get monitoring config: %w", err) } if monitorCfg != nil { + + // see above comment; because we don't know endpoint's actual PID, we need to make a fake one. Warn the user. + if serviceUnitExists { + keys := make([]string, 0, len(fakeServicePids)) + for k := range fakeServicePids { + keys = append(keys, k) + } + fmt.Fprintf(streams.Err, "WARNING: the inspect command can't accurately produce monitoring configs for service units: %v. Use the diagnostics command to get the real config used for monitoring these components\n", keys) + } + rawCfg := config.MustNewConfigFrom(cfg) if err := rawCfg.Merge(monitorCfg); err != nil { @@ -205,7 +225,9 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, if err != nil { return fmt.Errorf("failed to convert monitoring config: %w", err) } + } + } return printMapStringConfig(cfg, streams)