diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 5c3f94e3e6a..a161bf679f2 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -12,6 +12,7 @@ import ( "path/filepath" "runtime" "strings" + "time" "unicode" "github.com/elastic/elastic-agent/pkg/component" @@ -517,6 +518,8 @@ func (b *BeatsMonitor) monitoringNamespace() string { } func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component) error { + metricsCollectionInterval := 60 * time.Second + metricsCollectionIntervalString := metricsCollectionInterval.String() monitoringNamespace := b.monitoringNamespace() fixedAgentName := strings.ReplaceAll(agentName, "-", "_") beatsStreams := make([]interface{}, 0, len(componentIDToBinary)) @@ -532,7 +535,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/stats", "hosts": []interface{}{HttpPlusAgentMonitoringEndpoint(b.operatingSystem, b.config.C)}, "namespace": "agent", - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fixedAgentName, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -608,7 +611,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, "metricsets": []interface{}{"stats", "state"}, "hosts": endpoints, - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -663,7 +666,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "hosts": endpoints, "path": "/stats", "namespace": "agent", - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fixedAgentName, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -725,7 +728,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/inputs/", "namespace": fbDataStreamName, "json.is_array": true, - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fbDataStreamName, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -799,7 +802,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/shipper", "hosts": endpoints, "namespace": "application", - "period": "10s", + "period": metricsCollectionIntervalString, "processors": createProcessorsForJSONInput(name, monitoringNamespace, b.agentInfo), }, map[string]interface{}{ @@ -813,7 +816,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/stats", "hosts": endpoints, "namespace": "agent", - "period": "10s", + "period": metricsCollectionIntervalString, "processors": createProcessorsForJSONInput(name, monitoringNamespace, b.agentInfo), }) } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go new file mode 100644 index 00000000000..2bc71cddb1e --- /dev/null +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -0,0 +1,92 @@ +package monitoring + +import ( + "context" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + monitoringcfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" +) + +func TestMonitoringConfigMetricsInterval(t *testing.T) { + + agentInfo, err := info.NewAgentInfo(context.Background(), false) + require.NoError(t, err, "Error creating agent info") + + mCfg := &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + Enabled: false, + }, + }, + } + + policy := map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + }, + }, + "outputs": map[string]any{ + "default": map[string]any{}, + }, + } + b := &BeatsMonitor{ + enabled: true, + config: mCfg, + operatingSystem: runtime.GOOS, + agentInfo: agentInfo, + } + got, err := b.MonitoringConfig(policy, nil, map[string]string{"foobeat": "filebeat"}) // put a componentID/binary mapping to have something in the beats monitoring input + assert.NoError(t, err) + + rawInputs, ok := got["inputs"] + require.True(t, ok, "monitoring config contains no input") + inputs, ok := rawInputs.([]any) + require.True(t, ok, "monitoring inputs are not a list") + marshaledInputs, err := yaml.Marshal(inputs) + if assert.NoError(t, err, "error marshaling monitoring inputs") { + t.Logf("marshaled monitoring inputs:\n%s\n", marshaledInputs) + } + + // loop over the created inputs + for _, i := range inputs { + input, ok := i.(map[string]any) + if assert.Truef(t, ok, "input is not represented as a map: %v", i) { + inputID := input["id"] + t.Logf("input %q", inputID) + // check the streams created for the input, should be a list of objects + if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) && + assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) { + // loop over streams and cast to map[string]any to access keys + for _, rawStream := range input["streams"].([]any) { + if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) { + stream := rawStream.(map[string]any) + // check period and assert its value + streamID := stream["id"] + if assert.Containsf(t, stream, "period", "stream %q for input %q does not contain a period", streamID, inputID) && + assert.IsType(t, "", stream["period"], "period for stream %q of input %q is not represented as a string", streamID, inputID) { + periodString := stream["period"].(string) + duration, err := time.ParseDuration(periodString) + if assert.NoErrorf(t, err, "Unparseable period duration %s for stream %q of input %q", periodString, streamID, inputID) { + assert.Equalf(t, duration, 60*time.Second, "unexpected duration for stream %q of input %q", streamID, inputID) + } + } + } + } + } + } + + } +}