From 123ba9ce80c9865f72fa3659b5cafe9b51954f49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paolo=20Chil=C3=A0?= Date: Fri, 29 Sep 2023 09:19:59 +0200 Subject: [PATCH] Propagate apm config (#3223) This change propagates the APM configuration set up for agent to the components that are managed. We only support Elastic APM at the moment and at this time it can only be configured in elastic-agent.yml configuration file. For fleet-managed agents we include a workaround that will inject this configuration from the config file; this workaround inject the configuration from the config file in all the config changes received from Fleet, however it does not support hot reloading in this configuration: any changes to the apm configuration will take effect after a restart. * Pass apm config to components * Add config patcher for apm injection in fleet managed agents * Add global labels to apm config --- docs/tracing.md | 48 ++ .../agent/application/apm_config_modifier.go | 151 ++++++ .../application/apm_config_modifier_test.go | 466 ++++++++++++++++++ internal/pkg/agent/application/application.go | 9 +- .../application/coordinator/config_patcher.go | 49 ++ .../coordinator/diagnostics_test.go | 196 ++++++++ internal/pkg/agent/cmd/run.go | 1 + internal/pkg/core/monitoring/config/config.go | 11 +- pkg/component/component.go | 8 + pkg/component/fake/component/README.md | 52 +- pkg/component/fake/component/comp/actions.go | 25 + pkg/component/fake/component/comp/apm.go | 238 +++++++++ .../fake/component/comp/component.go | 27 +- pkg/component/fake/component/main.go | 42 +- pkg/component/runtime/apm_config_mapper.go | 70 +++ .../runtime/apm_config_mapper_test.go | 130 +++++ pkg/component/runtime/command.go | 3 +- pkg/component/runtime/manager_test.go | 329 +++++++++++++ pkg/utils/maps.go | 35 ++ testing/integration/apm_propagation_test.go | 239 +++++++++ testing/integration/fake_test.go | 72 --- testing/integration/fakes.go | 108 ++++ 22 files changed, 2199 insertions(+), 110 deletions(-) create mode 100644 docs/tracing.md create mode 100644 internal/pkg/agent/application/apm_config_modifier.go create mode 100644 internal/pkg/agent/application/apm_config_modifier_test.go create mode 100644 internal/pkg/agent/application/coordinator/config_patcher.go create mode 100644 pkg/component/fake/component/comp/apm.go create mode 100644 pkg/component/runtime/apm_config_mapper.go create mode 100644 pkg/component/runtime/apm_config_mapper_test.go create mode 100644 pkg/utils/maps.go create mode 100644 testing/integration/apm_propagation_test.go create mode 100644 testing/integration/fakes.go diff --git a/docs/tracing.md b/docs/tracing.md new file mode 100644 index 00000000000..c2085f699e1 --- /dev/null +++ b/docs/tracing.md @@ -0,0 +1,48 @@ +# Elastic agent APM configuration + + +## Configuration +The APM elastic agent configuration in `elastic-agent.yml` looks like this (the keys under `apm` have the same meaning +and usage as a regular [APM configuration](https://www.elastic.co/guide/en/apm/agent/go/current/configuration.html)) : + ```yaml + agent.monitoring: + traces: true + apm: + hosts: + - + environment: + secret_token: + api_key: + global_labels: + k1: v1 + k2: v2 + tls: + skip_verify: true + server_certificate: + server_ca: + ``` +APM configuration is only available in `elastic-agent.yml` configuration file (Fleet does not support these settings at the moment): +- for a standalone agent the configuration is reloaded by default from file in case of changes while the agent is running (unless the configuration reload mechanism has been disabled using `agent.reload.enabled` setting) +- for a managed agent, the configuration is read once at startup and then added to every policy change coming from Fleet: in this case changes to APM configuration require a restart of agent to be picked up + +## APM config propagation + +APM propagation to components requires agent APM traces to be enabled (`agent.monitoring.traces` must be set to `true`). +Elastic Agent will propagate the APM parameters defined in its configuration to all the components it manages. +APM configuration is sent to the components via the control protocol, specifically in the [APMConfig message](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/elastic-agent-client.proto#L188-L208). + +At the moment the agent supports only Elastic APM configuration but since want to support OTLP protocol the APM configuration +has a dedicated field for Elastic, and we will put support for other protocols side-by-side (see [APMConfig message](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/elastic-agent-client.proto#L188-L208)) + +The components can consume the configuration by using the [`Unit.Expected()`](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/pkg/client/unit.go#L166-L177) +from the [`UnitChanged`](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/pkg/client/client_v2.go#L126-L131) +object published by the elastic-agent-client. The [TriggeredAPMChange](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/pkg/client/client_v2.go#L63) +trigger flag will be set whenever there is a change in APM configuration. + +Components are expected to take appropriate action to reload/re-instantiate their APM instrumentation. +How that happens in detail depends on what sort of APM objects the component uses, for example: +- if the component uses a decorated http server it may be needed to stop (gracefully) the current server, recreate it with the new configuration and start the new one. +- if it uses a custom Tracer object, it will need to create the new one, close the old one and swap them safely. + +The list above is obviously not an exhaustive one, the handling of APM configuration change will probably be specific +to each component/unit. \ No newline at end of file diff --git a/internal/pkg/agent/application/apm_config_modifier.go b/internal/pkg/agent/application/apm_config_modifier.go new file mode 100644 index 00000000000..99d89b748fd --- /dev/null +++ b/internal/pkg/agent/application/apm_config_modifier.go @@ -0,0 +1,151 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "fmt" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" + "github.com/elastic/elastic-agent/internal/pkg/config" + monitoringcfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" + "github.com/elastic/elastic-agent/pkg/component/runtime" + "github.com/elastic/elastic-agent/pkg/core/logger" + + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/utils" +) + +// InjectAPMConfig is a modifier passed to coordinator in order to set the global APM configuration used for the agent +// into each Component coming from input/output configuration +func InjectAPMConfig(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) { + + tracesEnabled, err := getAPMTracesEnabled(cfg) + if err != nil { + return comps, fmt.Errorf("error retrieving APM traces flag: %w", err) + } + + if !tracesEnabled { + // nothing to do + return comps, nil + } + + apmConfig, err := getAPMConfigFromMap(cfg) + if err != nil { + return comps, fmt.Errorf("error retrieving apm config: %w", err) + } + + if apmConfig == nil { + // nothing to do + return comps, nil + } + + for i := range comps { + // We shouldn't really go straight from config datamodel to protobuf datamodel (a core datamodel would be nice to + // abstract from protocol details) + if comps[i].Component == nil { + comps[i].Component = new(proto.Component) + } + comps[i].Component.ApmConfig = runtime.MapAPMConfig(apmConfig) + } + + return comps, nil +} + +func getAPMTracesEnabled(cfg map[string]any) (bool, error) { + + rawTracesEnabled, err := utils.GetNestedMap(cfg, "agent", "monitoring", "traces") + if errors.Is(err, utils.ErrKeyNotFound) { + // We didn't find the key, return false without any error + return false, nil + } + + if err != nil { + return false, fmt.Errorf("error accessing trace flag: %w", err) + } + + traceEnabled, ok := rawTracesEnabled.(bool) + if !ok { + return false, fmt.Errorf("trace flag has unexpected type %T", rawTracesEnabled) + } + + return traceEnabled, nil +} + +func getAPMConfigFromMap(cfg map[string]any) (*monitoringcfg.APMConfig, error) { + nestedValue, err := utils.GetNestedMap(cfg, "agent", "monitoring", "apm") + if errors.Is(err, utils.ErrKeyNotFound) { + // No APM config found, nothing to do + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("error traversing config: %w", err) + } + + rawApmConfig, ok := nestedValue.(map[string]any) + if !ok { + return nil, fmt.Errorf("the retrieved apm configs is not a map: %T", nestedValue) + } + + newConfigFrom, err := config.NewConfigFrom(rawApmConfig) + if err != nil { + return nil, fmt.Errorf("error parsing apm config: %w", err) + } + + monitoringConfig := new(monitoringcfg.APMConfig) + err = newConfigFrom.Unpack(monitoringConfig) + if err != nil { + return nil, fmt.Errorf("error unpacking apm config: %w", err) + } + return monitoringConfig, nil +} + +func noop(change coordinator.ConfigChange) coordinator.ConfigChange { + return change +} + +// PatchAPMConfig is a temporary configuration patcher function (see ConfigPatchManager and ConfigPatch for reference) that +// will patch the configuration coming from Fleet adding the APM parameters from the elastic agent configuration file +// until Fleet supports this config directly +func PatchAPMConfig(log *logger.Logger, rawConfig *config.Config) func(change coordinator.ConfigChange) coordinator.ConfigChange { + configMap, err := rawConfig.ToMapStr() + if err != nil { + log.Errorf("error decoding raw config, patching disabled: %v", err) + return noop + } + + tracesEnabled, err := getAPMTracesEnabled(configMap) + if err != nil { + log.Errorf("error retrieving trace flag, patching disabled: %v", err) + return noop + } + + apmConfig, err := getAPMConfigFromMap(configMap) + if err != nil { + log.Errorf("error retrieving apm config, patching disabled: %v", err) + return noop + } + + if !tracesEnabled && apmConfig == nil { + // traces disabled and no apm config -> no patching happening + log.Debugf("traces disabled and no apm config: no patching necessary") + return noop + } + monitoringPatch := map[string]any{"traces": tracesEnabled} + if apmConfig != nil { + monitoringPatch["apm"] = apmConfig + } + + return func(change coordinator.ConfigChange) coordinator.ConfigChange { + err := change.Config().Merge(map[string]any{"agent": map[string]any{"monitoring": monitoringPatch}}) + if err != nil { + log.Errorf("error patching apm config into configchange: %v", err) + } + + return change + } +} diff --git a/internal/pkg/agent/application/apm_config_modifier_test.go b/internal/pkg/agent/application/apm_config_modifier_test.go new file mode 100644 index 00000000000..11e71c2b820 --- /dev/null +++ b/internal/pkg/agent/application/apm_config_modifier_test.go @@ -0,0 +1,466 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + gproto "google.golang.org/protobuf/proto" + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/elastic-agent/internal/pkg/config" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +type injectedConfigAssertion func(*testing.T, []component.Component) + +func allComponentLvlConfigNil(t *testing.T, components []component.Component) { + for _, comp := range components { + assert.Nil(t, comp.Component) + } +} + +func apmConfigEqual(apmConfig *proto.APMConfig) injectedConfigAssertion { + return func(t *testing.T, components []component.Component) { + for _, comp := range components { + if !assert.NotNil(t, comp.Component) { + // component level config is null, move to the next + continue + } + + assert.Truef(t, gproto.Equal(comp.Component.ApmConfig, apmConfig), "apmConfig (%v, %v) not equal", comp.Component.ApmConfig, apmConfig) + } + } +} + +func TestInjectAPMConfig(t *testing.T) { + + type args struct { + comps []component.Component + cfg map[string]interface{} + } + tests := []struct { + name string + args args + want injectedConfigAssertion + wantErr assert.ErrorAssertionFunc + }{ + { + name: "No apm or component level config set", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "enabled": true, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.NoError, + }, + { + name: "No apm config but traces enabled - no config is propagated", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "traces": true, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.NoError, + }, + { + name: "Apm config set but traces disabled - no config is propagated", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "traces": false, + "apm": map[string]any{ + "hosts": []string{ + "https://apmhost1", + "https://apmhost2", + }, + "environment": "apm-unit-tests", + "api_key": "apik", + "secret_token": "🤫", + "tls": map[string]any{ + "skip_verify": true, + }, + }, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.NoError, + }, + { + name: "Apm config set but no trace flag set - leave components untouched", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "apm": map[string]any{ + "hosts": []string{ + "https://apmhost1", + "https://apmhost2", + }, + "environment": "apm-unit-tests", + "api_key": "apik", + "secret_token": "🤫", + "tls": map[string]any{ + "skip_verify": true, + }, + }, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.NoError, + }, + { + name: "Apm config set but trace flag set to false - leave components untouched", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "traces": false, + "apm": map[string]any{ + "hosts": []string{ + "https://apmhost1", + "https://apmhost2", + }, + "environment": "apm-unit-tests", + "api_key": "apik", + "secret_token": "🤫", + "tls": map[string]any{ + "skip_verify": true, + }, + }, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.NoError, + }, + { + name: "Apm config and trace flag set - fill existing component level config and propagate it to components", + args: args{ + comps: []component.Component{ + { + ID: "some component", + Component: &proto.Component{ + Limits: &proto.ComponentLimits{ + GoMaxProcs: 1, + }, + }, + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "traces": true, + "apm": map[string]any{ + "hosts": []string{ + "https://apmhost1", + "https://apmhost2", + }, + "environment": "apm-unit-tests", + "api_key": "apik", + "secret_token": "🤫", + "tls": map[string]any{ + "skip_verify": true, + }, + }, + }, + }, + }, + }, + want: apmConfigEqual(&proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Environment: "apm-unit-tests", + ApiKey: "apik", + SecretToken: "🤫", + Hosts: []string{ + "https://apmhost1", + "https://apmhost2", + }, + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "", + ServerCa: "", + }, + }, + }), + wantErr: assert.NoError, + }, + { + name: "Wrong traces flag type (string) - Error", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "enabled": true, + "traces": "true", + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.Error, + }, + { + name: "Wrong traces flag type (map) - Error", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "enabled": true, + "traces": map[string]any{"foo": "bar"}, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.Error, + }, + { + name: "Malformed config - Error", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": "some string value", + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.Error, + }, + { + name: "Malformed apm config (not a map) - Error", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "traces": true, + "apm": "some string value", + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := InjectAPMConfig(tt.args.comps, tt.args.cfg) + if !tt.wantErr(t, err, fmt.Sprintf("InjectAPMConfig(%v, %v)", tt.args.comps, tt.args.cfg)) { + return + } + tt.want(t, got) + }) + } +} + +type mockConfigChange struct { + c *config.Config +} + +func (mcc *mockConfigChange) Config() *config.Config { + return mcc.c +} + +func (mcc *mockConfigChange) Ack() error { + return nil +} + +func (mcc *mockConfigChange) Fail(err error) { + // nothing happens +} +func TestPatchAPMConfig(t *testing.T) { + + type args struct { + fleetCfg string + agentFileCfg string + } + tests := []struct { + name string + args args + want string + }{ + { + name: "No apm config or traces flag", + args: args{ + fleetCfg: ` + agent.monitoring: + enabled: true + logs: true + metrics: true + `, + agentFileCfg: ` + agent.monitoring: + enabled: true + `, + }, + want: ` + agent: + monitoring: + enabled: true + logs: true + metrics: true + `, + }, + { + name: "traces flag set but no APM config", + args: args{ + fleetCfg: ` + agent.monitoring: + enabled: true + logs: true + metrics: true + `, + agentFileCfg: ` + agent.monitoring: + enabled: true + traces: true + `, + }, + want: ` + agent: + monitoring: + enabled: true + logs: true + metrics: true + traces: true + `, + }, + { + name: "traces flag and APM config set", + args: args{ + fleetCfg: ` + agent.monitoring: + enabled: true + logs: true + metrics: true + `, + agentFileCfg: ` + agent.monitoring: + enabled: true + traces: true + apm: + hosts: + - https://apmhost1:443 + environment: test-apm + secret_token: secret + global_labels: + key1: value1 + key2: value2 + tls: + skip_verify: true + `, + }, + want: ` + agent: + monitoring: + enabled: true + logs: true + metrics: true + traces: true + apm: + hosts: + - https://apmhost1:443 + environment: test-apm + api_key: "" + secret_token: secret + global_labels: + key1: value1 + key2: value2 + tls: + skip_verify: true + server_ca: "" + server_certificate: "" + `, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fleetConf, err := config.NewConfigFrom(tt.args.fleetCfg) + require.NoError(t, err) + agtConf, err := config.NewConfigFrom(tt.args.agentFileCfg) + require.NoError(t, err) + log, _ := logger.NewTesting(tt.name) + patcher := PatchAPMConfig(log, agtConf) + + mcc := &mockConfigChange{c: fleetConf} + patcher(mcc) + + patchedConf, err := mcc.Config().ToMapStr() + require.NoError(t, err) + patchedConfBytes, err := yaml.Marshal(patchedConf) + require.NoError(t, err) + + assert.YAMLEq(t, tt.want, string(patchedConfBytes)) + }) + } +} diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 445bba8434f..dca8973f79e 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -16,6 +16,7 @@ import ( "go.elastic.co/apm" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring" @@ -118,9 +119,10 @@ func New( var configMgr coordinator.ConfigManager var managed *managedConfigManager - var compModifiers []coordinator.ComponentsModifier + var compModifiers = []coordinator.ComponentsModifier{InjectAPMConfig} var composableManaged bool var isManaged bool + if testingMode { log.Info("Elastic Agent has been started in testing mode and is managed through the control protocol") @@ -145,12 +147,11 @@ func New( if err != nil { return nil, nil, nil, err } - if configuration.IsFleetServerBootstrap(cfg.Fleet) { log.Info("Parsed configuration and determined agent is in Fleet Server bootstrap mode") compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server)) - configMgr = newFleetServerBootstrapManager(log) + configMgr = coordinator.NewConfigPatchManager(newFleetServerBootstrapManager(log), PatchAPMConfig(log, rawConfig)) } else { log.Info("Parsed configuration and determined agent is managed by Fleet") @@ -164,7 +165,7 @@ func New( if err != nil { return nil, nil, nil, err } - configMgr = managed + configMgr = coordinator.NewConfigPatchManager(managed, PatchAPMConfig(log, rawConfig)) } } diff --git a/internal/pkg/agent/application/coordinator/config_patcher.go b/internal/pkg/agent/application/coordinator/config_patcher.go new file mode 100644 index 00000000000..8f926b9fff1 --- /dev/null +++ b/internal/pkg/agent/application/coordinator/config_patcher.go @@ -0,0 +1,49 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package coordinator + +import ( + "context" +) + +type ConfigPatch func(change ConfigChange) ConfigChange + +// ConfigPatchManager is a decorator to restore some agent settings from the elastic agent configuration file +type ConfigPatchManager struct { + inner ConfigManager + outCh chan ConfigChange + patchFn ConfigPatch +} + +func (c ConfigPatchManager) Run(ctx context.Context) error { + go c.patch(c.inner.Watch(), c.outCh) + return c.inner.Run(ctx) +} + +func (c ConfigPatchManager) Errors() <-chan error { + return c.inner.Errors() +} + +func (c ConfigPatchManager) ActionErrors() <-chan error { + return c.inner.ActionErrors() +} + +func (c ConfigPatchManager) Watch() <-chan ConfigChange { + return c.outCh +} + +func (c ConfigPatchManager) patch(src <-chan ConfigChange, dst chan ConfigChange) { + for ccc := range src { + dst <- c.patchFn(ccc) + } +} + +func NewConfigPatchManager(inner ConfigManager, pf ConfigPatch) *ConfigPatchManager { + return &ConfigPatchManager{ + inner: inner, + outCh: make(chan ConfigChange), + patchFn: pf, + } +} diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 546cdc4430c..c6f2dc4fc97 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -13,9 +13,13 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" + monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/internal/pkg/diagnostics" "github.com/elastic/elastic-agent/internal/pkg/remote" "github.com/elastic/elastic-agent/pkg/component" @@ -55,11 +59,58 @@ func TestDiagnosticLocalConfig(t *testing.T) { Protocol: "test-protocol", }, }, + Settings: &configuration.SettingsConfig{ + MonitoringConfig: &monitoringCfg.MonitoringConfig{ + MonitorTraces: true, + APM: monitoringCfg.APMConfig{ + Environment: "diag-unit-test", + APIKey: "apikey", + SecretToken: "secret", + Hosts: []string{"host1", "host2"}, + GlobalLabels: map[string]string{"k1": "v1", "k2": "v2"}, + TLS: monitoringCfg.APMTLS{ + SkipVerify: false, + ServerCertificate: "/path/to/server/cert", + ServerCA: "/path/to/server/ca", + }, + }, + }, + }, } // The YAML we expect to see from the preceding config expectedCfg := ` agent: + download: null + grpc: null + id: "" + path: "" + process: null + reload: null + upgrade: null + v1_monitoring_enabled: false + monitoring: + enabled: false + http: null + logs: false + metrics: false + namespace: "" + pprof: null + traces: true + apm: + hosts: + - host1 + - host2 + environment: diag-unit-test + apikey: apikey + secrettoken: secret + globallabels: + k1: v1 + k2: v2 + tls: + skipverify: false + servercertificate: "/path/to/server/cert" + serverca: "/path/to/server/ca" fleet: enabled: true access_api_key: "test-key" @@ -237,6 +288,65 @@ components: assert.YAMLEq(t, expected, string(result), "components-expected diagnostic returned unexpected value") } +func TestDiagnosticComponentsExpectedWithAPM(t *testing.T) { + // Create a Coordinator with a test component model and make sure it's + // reported by the components-expected diagnostic + components := []component.Component{ + { + ID: "some-apm-aware-component", + InputType: "filestream", + OutputType: "elasticsearch", + Component: &proto.Component{ + ApmConfig: &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Environment: "diag-unit-test", + ApiKey: "apikey", + SecretToken: "st", + Hosts: []string{"host1", "host2"}, + GlobalLabels: "k=v", + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "servercert", + ServerCa: "serverca", + }, + }, + }, + }, + }, + } + + expected := ` +components: + - id: some-apm-aware-component + input_type: filestream + output_type: elasticsearch + units: [] + component: + limits: null + apmconfig: + elastic: + environment: diag-unit-test + apikey: apikey + secrettoken: st + globallabels: "k=v" + hosts: + - host1 + - host2 + tls: + skipverify: true + servercert: servercert + serverca: serverca +` + + coord := &Coordinator{componentModel: components} + + hook, ok := diagnosticHooksMap(coord)["components-expected"] + require.True(t, ok, "diagnostic hooks should have an entry for components-expected") + + result := hook.Hook(context.Background()) + assert.YAMLEq(t, expected, string(result), "components-expected diagnostic returned unexpected value") +} + func TestDiagnosticComponentsActual(t *testing.T) { // Create a Coordinator with observed component data in the state broadcaster // and make sure the components-actual diagnostic reports it @@ -356,6 +466,92 @@ components: assert.YAMLEq(t, expected, string(result), "state diagnostic returned unexpected value") } +func TestDiagnosticStateForAPM(t *testing.T) { + // Create a coordinator with a test state and verify that the state + // diagnostic reports it + + token := "st" + state := State{ + State: agentclient.Starting, + Message: "starting up", + FleetState: agentclient.Configuring, + FleetMessage: "configuring", + LogLevel: 1, + Components: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp-1"}, + State: runtime.ComponentState{ + State: client.UnitStateDegraded, + Message: "degraded message", + VersionInfo: runtime.ComponentVersionInfo{ + Name: "version name", + Version: "version value", + }, + Component: &proto.Component{ + ApmConfig: &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Environment: "diag-state-ut", + SecretToken: token, + Hosts: []string{"apmhost"}, + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "sc", + ServerCa: "sca", + }, + }, + }, + }, + ComponentIdx: 1, + }, + }, + }, + } + + expected := ` +state: 0 +message: "starting up" +fleet_state: 1 +fleet_message: "configuring" +log_level: "warning" +components: + - id: "comp-1" + state: + state: 3 + message: "degraded message" + features_idx: 0 + units: {} + version_info: + name: "version name" + version: "version value" + component: + apmconfig: + elastic: + apikey: "" + environment: diag-state-ut + hosts: [apmhost] + secrettoken: st + globallabels: "" + tls: + skipverify: true + serverca: sca + servercert: sc + limits: null + component_idx: 1 +` + + coord := &Coordinator{ + // This test needs a broadcaster since the components-actual diagnostic + // fetches the state via State(). + stateBroadcaster: broadcaster.New(state, 0, 0), + } + + hook, ok := diagnosticHooksMap(coord)["state"] + require.True(t, ok, "diagnostic hooks should have an entry for state") + + result := hook.Hook(context.Background()) + assert.YAMLEq(t, expected, string(result), "state diagnostic returned unexpected value") +} + // Fetch the diagnostic hooks and add them to a lookup table for // easier verification func diagnosticHooksMap(coord *Coordinator) map[string]diagnostics.Hook { diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 2ab23dae7c2..91d470ac00b 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -26,6 +26,7 @@ import ( monitoringLib "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/service" "github.com/elastic/elastic-agent-system-metrics/report" + "github.com/elastic/elastic-agent/internal/pkg/agent/application" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock" diff --git a/internal/pkg/core/monitoring/config/config.go b/internal/pkg/core/monitoring/config/config.go index 50027b4830f..1f326e29a8d 100644 --- a/internal/pkg/core/monitoring/config/config.go +++ b/internal/pkg/core/monitoring/config/config.go @@ -66,11 +66,12 @@ func DefaultConfig() *MonitoringConfig { // APMConfig configures APM Tracing. type APMConfig struct { - Environment string `config:"environment"` - APIKey string `config:"api_key"` - SecretToken string `config:"secret_token"` - Hosts []string `config:"hosts"` - TLS APMTLS `config:"tls"` + Environment string `config:"environment"` + APIKey string `config:"api_key"` + SecretToken string `config:"secret_token"` + Hosts []string `config:"hosts"` + GlobalLabels map[string]string `config:"global_labels"` + TLS APMTLS `config:"tls"` } // APMTLS contains the configuration options necessary for configuring TLS in diff --git a/pkg/component/component.go b/pkg/component/component.go index e9cdd7ce86d..18060b645fd 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -13,9 +13,11 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/install/pkgmgr" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" + "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/internal/pkg/eql" "github.com/elastic/elastic-agent/pkg/features" "github.com/elastic/elastic-agent/pkg/limits" @@ -141,6 +143,12 @@ func getStringValue(m map[string]interface{}, key string) (string, error) { return s, nil } +type ElasticAPM config.APMConfig + +type APMConfig struct { + Elastic *ElasticAPM `yaml:"elastic"` +} + // Component is a set of units that needs to run. type Component struct { // ID is the unique ID of the component. diff --git a/pkg/component/fake/component/README.md b/pkg/component/fake/component/README.md index 678fb3dd4f2..79b09db284a 100644 --- a/pkg/component/fake/component/README.md +++ b/pkg/component/fake/component/README.md @@ -1,3 +1,53 @@ # Fake Component -Controllable through GRPC control protocol with actions. Allows unit tests to simulate control and communication with a running sub-process. +Controllable through GRPC control protocol with actions. Allows tests to simulate control and communication with a running sub-process. + + +## How to use the fake component +If we need to use the fake component for a manual test, we need to build it using +`mage build:testbinaries` +and then we have to drop the binary and its corresponding spec file (see example below) in the +`elastic-agent-/data/components` directory. + +### Spec file example + +```yaml +version: 2 +inputs: + - name: fake + description: "Fake component input" + platforms: &platforms + - linux/amd64 + - linux/arm64 + - darwin/amd64 + - darwin/arm64 + - windows/amd64 + - container/amd64 + - container/arm64 + outputs: &outputs + - elasticsearch + shippers: &shippers + - shipper + command: &command + restart_monitoring_period: 5s + maximum_restarts_per_period: 1 + timeouts: + restart: 1s + args: [] + - name: fake-apm + description: "Fake component apm traces generator" + platforms: *platforms + outputs: *outputs + shippers: *shippers + command: *command +``` + +### Agent configuration example (APM traces sender) + +```yaml +inputs: + ... + - type: fake-apm + id: fake-apm-traces-generator + use_output: default +``` diff --git a/pkg/component/fake/component/comp/actions.go b/pkg/component/fake/component/comp/actions.go index 7ac7332bc20..9b4dc74c0b2 100644 --- a/pkg/component/fake/component/comp/actions.go +++ b/pkg/component/fake/component/comp/actions.go @@ -12,11 +12,13 @@ import ( "time" "github.com/rs/zerolog" + "google.golang.org/protobuf/encoding/protojson" "github.com/elastic/elastic-agent-client/v7/pkg/client" ) const ActionRetrieveFeatures = "retrieve_features" +const ActionRetrieveAPMConfig = "retrieve_apm_config" type retrieveFeaturesAction struct { input *fakeInput @@ -34,6 +36,10 @@ type killAction struct { logger zerolog.Logger } +type retrieveAPMConfigAction struct { + input *fakeInput +} + func (s *stateSetterAction) Name() string { return "set_state" } @@ -129,7 +135,26 @@ func newRunningUnit(logger zerolog.Logger, manager *StateManager, unit *client.U switch expected.Config.Type { case Fake: return newFakeInput(logger, expected.LogLevel, manager, unit, expected.Config) + case APM: + return newFakeAPMInput(logger, expected.LogLevel, unit) } return nil, fmt.Errorf("unknown input unit config type: %s", expected.Config.Type) } + +func (a *retrieveAPMConfigAction) Name() string { + return ActionRetrieveAPMConfig +} + +func (a *retrieveAPMConfigAction) Execute( + _ context.Context, + _ map[string]interface{}) (map[string]interface{}, error) { + + a.input.logger.Info().Msg("executing " + ActionRetrieveAPMConfig + " action") + a.input.logger.Debug().Msgf("stored apm config %v", a.input.apmConfig) + if a.input.apmConfig == nil { + return map[string]interface{}{"apm": nil}, nil + } + marshaledBytes, err := protojson.Marshal(a.input.apmConfig) + return map[string]interface{}{"apm": string(marshaledBytes)}, err +} diff --git a/pkg/component/fake/component/comp/apm.go b/pkg/component/fake/component/comp/apm.go new file mode 100644 index 00000000000..56882088b12 --- /dev/null +++ b/pkg/component/fake/component/comp/apm.go @@ -0,0 +1,238 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package comp + +import ( + "context" + "fmt" + "net/url" + "os" + "time" + + "github.com/rs/zerolog" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "go.elastic.co/apm" + apmtransport "go.elastic.co/apm/transport" + + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" +) + +var SenderAlreadyRunningError = errors.New("apm sender is already running") + +type apmTracesSender struct { + cfgUpd chan *proto.APMConfig + updateErrCh chan error + ctx context.Context + ctxCancelF context.CancelFunc +} + +func (ats *apmTracesSender) Start(ctx context.Context, cfg *proto.APMConfig) error { + if ats.ctx != nil && ats.ctx.Err() == nil { + // the context is still running, we cannot start another + return SenderAlreadyRunningError + } + + ats.init(ctx) + + go ats.sendTracesLoop(cfg, time.Second, 100*time.Millisecond) + return nil +} + +func (ats *apmTracesSender) init(outerCtx context.Context) { + ats.ctx, ats.ctxCancelF = context.WithCancel(outerCtx) + ats.cfgUpd = make(chan *proto.APMConfig) + ats.updateErrCh = make(chan error) +} + +func (ats *apmTracesSender) cleanup() { + if ats.ctxCancelF != nil { + ats.ctxCancelF() + ats.ctxCancelF = nil + } + + ats.ctx = nil + close(ats.cfgUpd) + ats.cfgUpd = nil + close(ats.updateErrCh) + ats.updateErrCh = nil +} + +func (ats *apmTracesSender) sendTracesLoop(initialCfg *proto.APMConfig, sendInterval time.Duration, traceDuration time.Duration) { + tracer, err := ats.createNewTracer(initialCfg) + if err != nil { + ats.updateErrCh <- fmt.Errorf("error creating tracer from initial config: %w", err) + } + + ticker := time.NewTicker(sendInterval) + for { + select { + case <-ats.ctx.Done(): + return + case <-ticker.C: + ats.sendTrace(tracer, traceDuration) + case updatedCfg := <-ats.cfgUpd: + newTracer, err := ats.createNewTracer(updatedCfg) + if err != nil { + ats.updateErrCh <- fmt.Errorf("error creating tracer from config: %w", err) + continue + } + if tracer != nil { + tracer.Close() + } + tracer = newTracer + ats.updateErrCh <- nil + } + } +} + +func (ats *apmTracesSender) Update(cfg *proto.APMConfig, timeout time.Duration) error { + select { + case ats.cfgUpd <- cfg: + return nil + case <-time.After(timeout): + return fmt.Errorf("config update timed out") + } +} + +func (ats *apmTracesSender) Stop() error { + ats.cleanup() + return nil +} + +func (ats *apmTracesSender) createNewTracer(cfg *proto.APMConfig) (*apm.Tracer, error) { + if cfg == nil { + return nil, nil + } + + const ( + envVerifyServerCert = "ELASTIC_APM_VERIFY_SERVER_CERT" + envServerCert = "ELASTIC_APM_SERVER_CERT" + envCACert = "ELASTIC_APM_SERVER_CA_CERT_FILE" + envGlobalLabels = "ELASTIC_APM_GLOBAL_LABELS" + ) + if cfg.Elastic.Tls.SkipVerify { + os.Setenv(envVerifyServerCert, "false") + defer os.Unsetenv(envVerifyServerCert) + } + if cfg.Elastic.Tls.ServerCert != "" { + os.Setenv(envServerCert, cfg.Elastic.Tls.ServerCert) + defer os.Unsetenv(envServerCert) + } + if cfg.Elastic.Tls.ServerCa != "" { + os.Setenv(envCACert, cfg.Elastic.Tls.ServerCa) + defer os.Unsetenv(envCACert) + } + + ts, err := apmtransport.NewHTTPTransport() + if err != nil { + return nil, err + } + + if len(cfg.Elastic.Hosts) > 0 { + hosts := make([]*url.URL, 0, len(cfg.Elastic.Hosts)) + for _, host := range cfg.Elastic.Hosts { + u, err := url.Parse(host) + if err != nil { + return nil, fmt.Errorf("failed parsing %s: %w", host, err) + } + hosts = append(hosts, u) + } + ts.SetServerURL(hosts...) + } + if cfg.Elastic.ApiKey != "" { + ts.SetAPIKey(cfg.Elastic.ApiKey) + } else if cfg.Elastic.SecretToken != "" { + ts.SetSecretToken(cfg.Elastic.SecretToken) + } + return apm.NewTracerOptions(apm.TracerOptions{ + ServiceName: "fake-apm", + ServiceVersion: "0.1", + ServiceEnvironment: cfg.Elastic.Environment, + Transport: ts, + }) +} + +func (ats *apmTracesSender) sendTrace(tracer *apm.Tracer, duration time.Duration) { + if tracer == nil { + // no tracer configured, skip + return + } + tx := tracer.StartTransaction("faketransaction", "request") + defer tx.End() + span := tx.StartSpan("spanName", "spanType", nil) + defer span.End() + time.Sleep(duration) +} + +type fakeAPMInput struct { + logger zerolog.Logger + unit *client.Unit + sender *apmTracesSender +} + +func (fai *fakeAPMInput) Unit() *client.Unit { + return fai.unit +} +func (fai *fakeAPMInput) Update(u *client.Unit, triggers client.Trigger) error { + if u.Expected().State == client.UnitStateStopped { + fai.logger.Info().Msg("fakeAPMInput stopping APM sender") + // stop apm trace sender + return fai.sender.Stop() + } + + if triggers&client.TriggeredAPMChange != client.TriggeredAPMChange { + // no apm change, nothing to do + return nil + } + + fai.logger.Info().Msgf("Updating sender config with %+v", u.Expected().APMConfig) + + return fai.sender.Update(u.Expected().APMConfig, time.Second) +} + +func newFakeAPMInput(logger zerolog.Logger, logLevel client.UnitLogLevel, unit *client.Unit) (*fakeAPMInput, error) { + logger = logger.Level(toZerologLevel(logLevel)) + apmInput := &fakeAPMInput{ + logger: logger, + unit: unit, + sender: new(apmTracesSender), + } + err := unit.UpdateState(client.UnitStateStarting, "Starting fake APM traces sender", nil) + if err != nil { + return apmInput, fmt.Errorf("error while setting starting state: %w", err) + } + err = apmInput.sender.Start(context.Background(), unit.Expected().APMConfig) + if err != nil { + return apmInput, fmt.Errorf("error starting apm tracer sender: %w", err) + } + go senderErrorLogger(apmInput.sender.ctx, logger, apmInput.sender.updateErrCh, unit) + err = unit.UpdateState(client.UnitStateHealthy, "Fake APM traces sender has started", nil) + if err != nil { + return apmInput, fmt.Errorf("error while setting healthy state: %w", err) + } + return apmInput, err +} + +func senderErrorLogger(ctx context.Context, logger zerolog.Logger, errCh <-chan error, unit *client.Unit) { + for { + select { + case <-ctx.Done(): + logger.Info().Msg("context expired: senderErrorLogger exiting") + return + case err, ok := <-errCh: + if !ok { + logger.Info().Msg("error channel closed: senderErrorLogger exiting") + } + if err != nil { + logger.Err(err).Msg("sender error") + _ = unit.UpdateState(client.UnitStateDegraded, fmt.Sprintf("sender error: %s", err), nil) + } + _ = unit.UpdateState(client.UnitStateHealthy, fmt.Sprintf("sender error: %s", err), nil) + } + } +} diff --git a/pkg/component/fake/component/comp/component.go b/pkg/component/fake/component/comp/component.go index e845ba6c09f..837689f912f 100644 --- a/pkg/component/fake/component/comp/component.go +++ b/pkg/component/fake/component/comp/component.go @@ -29,6 +29,7 @@ import ( const ( Fake = "fake" fakeShipper = "fake-shipper" + APM = "fake-apm" configuringMsg = "Configuring" stoppingMsg = "Stopping" @@ -288,8 +289,8 @@ type fakeInput struct { state client.UnitState stateMsg string - features *proto.Features - + features *proto.Features + apmConfig *proto.APMConfig canceller context.CancelFunc killerCanceller context.CancelFunc } @@ -302,12 +303,13 @@ func newFakeInput(logger zerolog.Logger, logLevel client.UnitLogLevel, manager * } i := &fakeInput{ - logger: logger, - manager: manager, - unit: unit, - cfg: cfg, - state: state, - stateMsg: msg, + logger: logger, + manager: manager, + unit: unit, + cfg: cfg, + state: state, + stateMsg: msg, + apmConfig: unit.Expected().APMConfig, } logger.Trace().Msg("registering set_state action for unit") @@ -318,6 +320,8 @@ func newFakeInput(logger zerolog.Logger, logLevel client.UnitLogLevel, manager * unit.RegisterAction(&killAction{i.logger}) logger.Trace().Msg("registering " + ActionRetrieveFeatures + " action for unit") unit.RegisterAction(&retrieveFeaturesAction{i}) + logger.Trace().Msg("registering " + ActionRetrieveAPMConfig + " action for unit") + unit.RegisterAction(&retrieveAPMConfigAction{i}) logger.Debug(). Str("state", i.state.String()). @@ -411,6 +415,13 @@ func (f *fakeInput) Update(u *client.Unit, triggers client.Trigger) error { } } + if triggers&client.TriggeredAPMChange == client.TriggeredAPMChange { + f.logger.Info(). + Interface("apmConfig", expected.APMConfig). + Msg("updating apm configuration") + f.apmConfig = expected.APMConfig + } + return nil } diff --git a/pkg/component/fake/component/main.go b/pkg/component/fake/component/main.go index 36dfdacba6a..508357fa75b 100644 --- a/pkg/component/fake/component/main.go +++ b/pkg/component/fake/component/main.go @@ -68,25 +68,7 @@ func run() error { case <-ctx.Done(): return nil case change := <-c.UnitChanges(): - if change.Unit != nil { - u := change.Unit - state, _, _ := u.State() - logger.Info(). - Str("state", state.String()). - Str("expectedState", u.Expected().State.String()). - Msg("unit change received") - } else { - logger.Info().Msg("unit change received, but no unit on it") - } - - switch change.Type { - case client.UnitChangedAdded: - s.Added(change.Unit) - case client.UnitChangedModified: - s.Modified(change) - case client.UnitChangedRemoved: - s.Removed(change.Unit) - } + handleChange(logger, s, change) case err := <-c.Errors(): if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) { fmt.Fprintf(os.Stderr, "GRPC client error: %+v\n", err) @@ -94,3 +76,25 @@ func run() error { } } } + +func handleChange(logger zerolog.Logger, s *comp.StateManager, change client.UnitChanged) { + if change.Unit != nil { + u := change.Unit + state, _, _ := u.State() + logger.Info(). + Str("state", state.String()). + Str("expectedState", u.Expected().State.String()). + Msg("unit change received") + } else { + logger.Info().Msg("unit change received, but no unit on it") + } + + switch change.Type { + case client.UnitChangedAdded: + s.Added(change.Unit) + case client.UnitChangedModified: + s.Modified(change) + case client.UnitChangedRemoved: + s.Removed(change.Unit) + } +} diff --git a/pkg/component/runtime/apm_config_mapper.go b/pkg/component/runtime/apm_config_mapper.go new file mode 100644 index 00000000000..921016ad30f --- /dev/null +++ b/pkg/component/runtime/apm_config_mapper.go @@ -0,0 +1,70 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package runtime + +import ( + "sort" + "strings" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" +) + +var zeroElasticAPMTLS = config.APMTLS{} + +func MapAPMConfig(conf *config.APMConfig) *proto.APMConfig { + if conf == nil { + // component apm config is nil, so the protobuf config is nil as well + return nil + } + + elasticAPMConf := &proto.ElasticAPM{ + Environment: conf.Environment, + ApiKey: conf.APIKey, + SecretToken: conf.SecretToken, + Hosts: conf.Hosts, + GlobalLabels: buildGlobalLabelsString(conf.GlobalLabels), + } + + if conf.TLS != zeroElasticAPMTLS { + // we have some TLS config to propagate too + elasticAPMConf.Tls = &proto.ElasticAPMTLS{ + SkipVerify: conf.TLS.SkipVerify, + ServerCert: conf.TLS.ServerCertificate, + ServerCa: conf.TLS.ServerCA, + } + } + + return &proto.APMConfig{Elastic: elasticAPMConf} +} + +func buildGlobalLabelsString(labels map[string]string) string { + const separator = "," + + if len(labels) == 0 { + return "" + } + + //prepare sorted keys to make output deterministic + keys := make([]string, 0, len(labels)) + for k := range labels { + keys = append(keys, k) + } + + sort.Strings(keys) + + // create the key=value string + buf := new(strings.Builder) + for _, k := range keys { + if buf.Len() > 0 { + buf.WriteString(separator) + } + buf.WriteString(k) + buf.WriteString("=") + buf.WriteString(labels[k]) + } + return buf.String() +} diff --git a/pkg/component/runtime/apm_config_mapper_test.go b/pkg/component/runtime/apm_config_mapper_test.go new file mode 100644 index 00000000000..cd26ff714d8 --- /dev/null +++ b/pkg/component/runtime/apm_config_mapper_test.go @@ -0,0 +1,130 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package runtime + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" +) + +func TestMapAPMConfig(t *testing.T) { + type args struct { + conf *config.APMConfig + } + tests := []struct { + name string + args args + want *proto.APMConfig + }{ + { + name: "nil config", + args: args{ + conf: nil, + }, + want: nil, + }, + { + name: "full config", + args: args{ + conf: &config.APMConfig{ + Environment: "environment", + APIKey: "apikey", + SecretToken: "secrettoken", + Hosts: []string{"host1", "host2"}, + GlobalLabels: map[string]string{"k1": "v1", "k2": "v2"}, + TLS: config.APMTLS{ + SkipVerify: true, + ServerCertificate: "servercertificate", + ServerCA: "serverca", + }, + }, + }, + want: &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "servercertificate", + ServerCa: "serverca", + }, + Environment: "environment", + ApiKey: "apikey", + SecretToken: "secrettoken", + Hosts: []string{"host1", "host2"}, + GlobalLabels: "k1=v1,k2=v2", + }, + }, + }, + { + name: "config without global labels", + args: args{ + conf: &config.APMConfig{ + Environment: "environment", + APIKey: "apikey", + SecretToken: "secrettoken", + Hosts: []string{"host1", "host2"}, + GlobalLabels: nil, + TLS: config.APMTLS{ + SkipVerify: true, + ServerCertificate: "servercertificate", + ServerCA: "serverca", + }, + }, + }, + want: &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "servercertificate", + ServerCa: "serverca", + }, + Environment: "environment", + ApiKey: "apikey", + SecretToken: "secrettoken", + Hosts: []string{"host1", "host2"}, + GlobalLabels: "", + }, + }, + }, + { + name: "config without hosts", + args: args{ + conf: &config.APMConfig{ + Environment: "environment", + APIKey: "apikey", + SecretToken: "secrettoken", + GlobalLabels: map[string]string{"k1": "v1", "k2": "v2"}, + TLS: config.APMTLS{ + SkipVerify: true, + ServerCertificate: "servercertificate", + ServerCA: "serverca", + }, + }, + }, + want: &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "servercertificate", + ServerCa: "serverca", + }, + Environment: "environment", + ApiKey: "apikey", + SecretToken: "secrettoken", + Hosts: nil, + GlobalLabels: "k1=v1,k2=v2", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, MapAPMConfig(tt.args.conf), "MapAPMConfig(%v)", tt.args.conf) + }) + } +} diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index 4facc93df3d..df02656b3f2 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -203,7 +203,8 @@ func (c *commandRuntime) Run(ctx context.Context, comm Communicator) error { sendExpected = true } if sendExpected { - comm.CheckinExpected(c.state.toCheckinExpected(), checkin) + checkinExpected := c.state.toCheckinExpected() + comm.CheckinExpected(checkinExpected, checkin) } if changed { c.sendObserved() diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go index e489ad71056..d00df10db64 100644 --- a/pkg/component/runtime/manager_test.go +++ b/pkg/component/runtime/manager_test.go @@ -18,6 +18,9 @@ import ( "testing" "time" + "google.golang.org/protobuf/encoding/protojson" + gproto "google.golang.org/protobuf/proto" + fakecmp "github.com/elastic/elastic-agent/pkg/component/fake/component/comp" "github.com/gofrs/uuid" @@ -28,6 +31,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" @@ -482,6 +486,331 @@ func TestManager_FakeInput_Features(t *testing.T) { } } +func TestManager_FakeInput_APM(t *testing.T) { + testPaths(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + agentInfo, _ := info.NewAgentInfo(ctx, true) + m, err := NewManager( + newDebugLogger(t), + newDebugLogger(t), + "localhost:0", + agentInfo, + apmtest.DiscardTracer, + newTestMonitoringMgr(), + configuration.DefaultGRPCConfig()) + require.NoError(t, err) + + managerErrCh := make(chan error) + go func() { + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + managerErrCh <- err + }() + + waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) + defer waitCancel() + if err := waitForReady(waitCtx, m); err != nil { + require.NoError(t, err) + } + + binaryPath := testBinary(t, "component") + const compID = "fake-default" + comp := component.Component{ + ID: compID, + InputSpec: &component.InputRuntimeSpec{ + InputType: "fake", + BinaryName: "", + BinaryPath: binaryPath, + Spec: fakeInputSpec, + }, + Units: []component.Unit{ + { + ID: "fake-input", + Type: client.UnitTypeInput, + LogLevel: client.UnitLogLevelTrace, + Config: component.MustExpectedConfig(map[string]interface{}{ + "type": "fake", + "state": int(client.UnitStateHealthy), + "message": "Fake Healthy", + }), + }, + }, + } + + subscriptionCtx, subCancel := context.WithCancel(context.Background()) + defer subCancel() + subscriptionErrCh := make(chan error) + doneCh := make(chan struct{}) + + initialAPMConfig := &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Environment: "test", + ApiKey: "apiKey", + SecretToken: "secretToken", + Hosts: []string{"host1", "host2", "host3"}, + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "servercert", + ServerCa: "serverca", + }, + }, + } + + modifiedAPMConfig := &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Environment: "test-modified", + ApiKey: "apiKey", + SecretToken: "secretToken", + Hosts: []string{"newhost1", "host2", "differenthost3"}, + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "", + ServerCa: "", + }, + }, + } + + go func() { + sub := m.Subscribe(subscriptionCtx, compID) + var healthIteration int + var retrievedApmConfig *proto.APMConfig + for { + select { + case <-subscriptionCtx.Done(): + return + case componentState := <-sub.Ch(): + t.Logf("component state changed: %+v", componentState) + + if componentState.State == client.UnitStateFailed { + subscriptionErrCh <- fmt.Errorf("component failed: %s", componentState.Message) + return + } + + unit, ok := componentState.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] + if !ok { + subscriptionErrCh <- errors.New("unit missing: fake-input") + return + } + + switch unit.State { + case client.UnitStateFailed: + subscriptionErrCh <- fmt.Errorf("unit failed: %s", unit.Message) + + case client.UnitStateHealthy: + healthIteration++ + t.Logf("Healthy iteration %d starting at %s", healthIteration, time.Now()) + switch healthIteration { + case 1: // yes, it's starting on 1 + comp.Component = &proto.Component{ + ApmConfig: initialAPMConfig, + } + err := m.Update(component.Model{Components: []component.Component{comp}}) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", + healthIteration, err) + return + } + + // check if config sent on iteration 1 was set + case 2: + // In the previous iteration, the (fake) component has received a CheckinExpected + // message to propagate the APM configuration. In this iteration we are about to + // retrieve the APM configuration from the same component via the retrieve_apm_config + // action. Within the component, which is running as a separate process, actions + // and CheckinExpected messages are processed concurrently. We need some way to wait + // a reasonably short amount of time for the CheckinExpected message to be applied by the + // component (thus setting the APM config) before we query the same component + // for apm config information. We accomplish this via assert.Eventually. + // We also send a modified APM config to see that the component updates correctly and + // reports the new config in the next iteration. + assert.Eventuallyf(t, func() bool { + // check the component + res, err := m.PerformAction( + context.Background(), + comp, + comp.Units[0], + fakecmp.ActionRetrieveAPMConfig, + nil) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + return gproto.Equal(initialAPMConfig, retrievedApmConfig) + }, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: %s actual: %s", initialAPMConfig, retrievedApmConfig) + + comp.Component = &proto.Component{ + ApmConfig: modifiedAPMConfig, + } + err := m.Update(component.Model{Components: []component.Component{comp}}) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", + healthIteration, err) + return + } + // Set a new APM config to check that we update correctly + case 3: + // In the previous iteration, the (fake) component has received another CheckinExpected + // message to propagate a modified APM configuration. In this iteration we are about to + // retrieve the APM configuration from the same component via the retrieve_apm_config + // action. + assert.Eventuallyf(t, func() bool { + // check the component + res, err := m.PerformAction( + context.Background(), + comp, + comp.Units[0], + fakecmp.ActionRetrieveAPMConfig, + nil) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + + retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + + return gproto.Equal(modifiedAPMConfig, retrievedApmConfig) + }, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: %s actual: %s", modifiedAPMConfig, retrievedApmConfig) + + comp.Component = &proto.Component{ + ApmConfig: nil, + } + err := m.Update(component.Model{Components: []component.Component{comp}}) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", + healthIteration, err) + return + } + + case 4: + // In the previous iteration, the (fake) component has received another CheckinExpected + // message to propagate a nil APM configuration. In this iteration we are about to + // retrieve the APM configuration from the same component via the retrieve_apm_config + // action. + assert.Eventuallyf(t, func() bool { + // check the component + res, err := m.PerformAction( + context.Background(), + comp, + comp.Units[0], + fakecmp.ActionRetrieveAPMConfig, + nil) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + + retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + return retrievedApmConfig == nil + }, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: nil actual: %s", retrievedApmConfig) + + doneCh <- struct{}{} + } + + case client.UnitStateStarting: + // acceptable + + case client.UnitStateConfiguring: + // set unit back to healthy, so other cases will run. + comp.Units[0].Config = component.MustExpectedConfig(map[string]interface{}{ + "type": "fake", + "state": int(client.UnitStateHealthy), + "message": "Fake Healthy", + }) + + err := m.Update(component.Model{Components: []component.Component{comp}}) + if err != nil { + t.Logf("error updating component state to health: %v", err) + + subscriptionErrCh <- fmt.Errorf("failed to update component: %w", err) + } + + default: + // unexpected state that should not have occurred + subscriptionErrCh <- fmt.Errorf("unit reported unexpected state: %v", + unit.State) + } + + } + } + }() + + defer drainErrChan(managerErrCh) + defer drainErrChan(subscriptionErrCh) + + err = m.Update(component.Model{Components: []component.Component{comp}}) + require.NoError(t, err) + + timeout := 30 * time.Second + timeoutTimer := time.NewTimer(timeout) + defer timeoutTimer.Stop() + + // Wait for a success, an error or time out + for { + select { + case <-timeoutTimer.C: + t.Fatalf("timed out after %s", timeout) + case err := <-managerErrCh: + require.NoError(t, err) + case err := <-subscriptionErrCh: + require.NoError(t, err) + case <-doneCh: + subCancel() + cancel() + + err = <-managerErrCh + require.NoError(t, err) + return + } + } +} + +func extractAPMConfigFromActionResult(t *testing.T, res map[string]interface{}) (*proto.APMConfig, error) { + apmCfg, ok := res["apm"] + if !ok { + return nil, fmt.Errorf("ActionResult for %s does not contain top level key %s", fakecmp.ActionRetrieveAPMConfig, "apm") + } + if apmCfg == nil { + // the APM config is not set on the component + return nil, nil + } + + jsonApmConfig, ok := apmCfg.(string) + if !ok { + return nil, fmt.Errorf("ActionResult for %s does not contain a string value: %T", fakecmp.ActionRetrieveAPMConfig, apmCfg) + } + + retrievedApmConfig := new(proto.APMConfig) + err := protojson.Unmarshal([]byte(jsonApmConfig), retrievedApmConfig) + if err != nil { + return nil, fmt.Errorf("error unmarshaling apmconfig %s: %w", jsonApmConfig, err) + } + return retrievedApmConfig, nil +} + func TestManager_FakeInput_Limits(t *testing.T) { testPaths(t) diff --git a/pkg/utils/maps.go b/pkg/utils/maps.go new file mode 100644 index 00000000000..eafcefcb0ab --- /dev/null +++ b/pkg/utils/maps.go @@ -0,0 +1,35 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package utils + +import "errors" + +var ErrNoKeys = errors.New("no key provided") +var ErrKeyNotFound = errors.New("key not found") +var ErrValueNotMap = errors.New("value is not a map") + +// GetNestedMap is a utility function to traverse nested maps using a series of key +func GetNestedMap[K comparable](src map[K]any, keys ...K) (any, error) { + if len(keys) == 0 { + return nil, ErrNoKeys + } + if _, ok := src[keys[0]]; !ok { + // no key found + return nil, ErrKeyNotFound + } + + if len(keys) == 1 { + // we reached the final key, return the value + return src[keys[0]], nil + } + + // we have more keys to go through + valueMap, ok := src[keys[0]].(map[K]any) + if !ok { + return nil, ErrValueNotMap + } + + return GetNestedMap[K](valueMap, keys[1:]...) +} diff --git a/testing/integration/apm_propagation_test.go b/testing/integration/apm_propagation_test.go new file mode 100644 index 00000000000..62aa87b0d83 --- /dev/null +++ b/testing/integration/apm_propagation_test.go @@ -0,0 +1,239 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "testing" + "text/template" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext" + "github.com/elastic/go-elasticsearch/v8" + + "github.com/elastic/elastic-agent/pkg/control/v2/client" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" +) + +const agentConfigTemplateString = ` +outputs: + default: + type: fake-action-output + shipper.enabled: true +inputs: + - id: fake-apm + type: fake-apm +agent.monitoring: + traces: true + apm: + hosts: + - {{ .host }} + environment: {{ .environment }} + secret_token: {{ .secret_token }} + global_labels: + test_name: TestAPMConfig + test_type: Agent integration test + tls: + skip_verify: true +` + +func TestAPMConfig(t *testing.T) { + info := define.Require(t, define.Requirements{ + Stack: &define.Stack{}, + }) + f, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + + deadline := time.Now().Add(10 * time.Minute) + ctx, cancel := testcontext.WithDeadline(t, context.Background(), deadline) + defer cancel() + + err = f.Prepare(ctx, fakeComponent, fakeShipper) + require.NoError(t, err) + + name := "fake-apm" + environment := info.Namespace + + agentConfig := generateAgentConfigForAPM(t, agentConfigTemplateString, info, environment) + t.Logf("Rendered agent config:\n%s", agentConfig) + + testAPMTraces := func() error { + state, err := f.Client().State(ctx) + require.NoError(t, err) + + t.Logf("agent state: %+v", state) + + // test that APM traces are being sent using initial configuration + require.Eventually(t, func() bool { + count, errCount := countAPMTraces(t, info.ESClient, name, environment) + if errCount != nil { + t.Logf("Error retrieving APM traces count for service %q and environment %q: %s", name, environment, errCount) + return false + } + return count > 0 + }, 1*time.Minute, time.Second) + + //change the configuration with a new environment and check that the update has been processed + environment = environment + "-changed" + modifiedAgentConfig := generateAgentConfigForAPM(t, agentConfigTemplateString, info, environment) + t.Logf("Rendered agent modified config:\n%s", modifiedAgentConfig) + err = f.Client().Configure(ctx, modifiedAgentConfig) + require.NoError(t, err, "error updating agent config with a new APM environment") + + // check that we receive traces with the new environment string + require.Eventually(t, func() bool { + count, errCount := countAPMTraces(t, info.ESClient, name, environment) + if errCount != nil { + t.Logf("Error retrieving APM traces count for service %q and environment %q: %s", name, environment, errCount) + return false + } + return count > 0 + }, 1*time.Minute, time.Second) + + return nil + } + + err = f.Run(ctx, atesting.State{ + Configure: agentConfig, + AgentState: atesting.NewClientState(client.Healthy), + Components: map[string]atesting.ComponentState{ + "fake-apm-default": { + State: atesting.NewClientState(client.Healthy), + Units: map[atesting.ComponentUnitKey]atesting.ComponentUnitState{ + atesting.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "fake-apm-default"}: { + State: atesting.NewClientState(client.Healthy), + }, + atesting.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-apm-default-fake-apm"}: { + State: atesting.NewClientState(client.Healthy), + }, + }, + }, + }, + After: testAPMTraces, + }) + + require.NoError(t, err) + +} + +func countAPMTraces(t *testing.T, esClient *elasticsearch.Client, serviceName, environment string) (int, error) { + queryRaw := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "filter": []map[string]interface{}{ + { + "term": map[string]interface{}{ + "service.name": map[string]interface{}{ + "value": serviceName, + }, + }, + }, + { + "term": map[string]interface{}{ + "service.environment": map[string]interface{}{ + "value": environment, + }, + }, + }, + }, + }, + }, + } + + buf := new(bytes.Buffer) + err := json.NewEncoder(buf).Encode(queryRaw) + if err != nil { + return 0, fmt.Errorf("error encoding query: %w", err) + } + + count := esClient.Count + + response, err := count( + count.WithIndex("traces-apm-default"), + count.WithBody(buf), + ) + if err != nil { + return 0, fmt.Errorf("error executing query: %w", err) + } + + defer response.Body.Close() + + var body struct { + Count int + } + + //decoder := json.NewDecoder(response.Body) + //err = decoder.Decode(&body) + bodyBytes, _ := io.ReadAll(response.Body) + + t.Logf("received ES response: %s", bodyBytes) + err = json.Unmarshal(bodyBytes, &body) + + return body.Count, err +} + +// types to correctly parse the APM config we get from kibana API +type apmConfigResponse struct { + CloudStandaloneSetup CloudStandaloneSetup `json:"cloudStandaloneSetup,omitempty"` + IsFleetEnabled bool `json:"isFleetEnabled,omitempty"` + FleetAgents []FleetAgents `json:"fleetAgents,omitempty"` +} +type CloudStandaloneSetup struct { + ApmServerURL string `json:"apmServerUrl,omitempty"` + SecretToken string `json:"secretToken,omitempty"` +} +type FleetAgents struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + ApmServerURL string `json:"apmServerUrl,omitempty"` + SecretToken string `json:"secretToken,omitempty"` +} + +func generateAgentConfigForAPM(t *testing.T, configTemplate string, info *define.Info, environment string) string { + t.Helper() + apmConfigData := getAPMConfigFromKibana(t, info.KibanaClient) + + configT, err := template.New("test config").Parse(configTemplate) + require.NoErrorf(t, err, "Error parsing agent config template\n%s", configTemplate) + + buf := new(strings.Builder) + templateData := map[string]any{ + "environment": environment, + "secret_token": apmConfigData.SecretToken, + "host": apmConfigData.ApmServerURL, + } + err = configT.Execute(buf, templateData) + require.NoErrorf(t, err, "Error rendering template\n%s\nwith data %v", configTemplate, templateData) + return buf.String() +} + +func getAPMConfigFromKibana(t *testing.T, kc *kibana.Client) CloudStandaloneSetup { + t.Helper() + response, err := kc.Send(http.MethodGet, "/internal/apm/fleet/agents", nil, nil, nil) + require.NoError(t, err, "Error getting APM connection params from kibana") + defer response.Body.Close() + + responseBytes, err := io.ReadAll(response.Body) + require.NoError(t, err, "Error reading data from http response") + apmConfig := new(apmConfigResponse) + err = json.Unmarshal(responseBytes, apmConfig) + require.NoError(t, err, "Error unmarshalling apm config") + require.NotEmpty(t, apmConfig.CloudStandaloneSetup.ApmServerURL, "APM config URL is empty") + require.NotEmpty(t, apmConfig.CloudStandaloneSetup.SecretToken, "APM config token is empty") + + return apmConfig.CloudStandaloneSetup +} diff --git a/testing/integration/fake_test.go b/testing/integration/fake_test.go index 91c6b2353da..cb685e4cb4a 100644 --- a/testing/integration/fake_test.go +++ b/testing/integration/fake_test.go @@ -8,73 +8,16 @@ package integration import ( "context" - "path/filepath" - "runtime" "testing" "time" "github.com/stretchr/testify/require" - "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/control/v2/client" atesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/testing/define" ) -var fakeComponent = atesting.UsableComponent{ - Name: "fake", - BinaryPath: mustAbs(filepath.Join("..", "..", "pkg", "component", "fake", "component", osExt("component"))), - Spec: &component.Spec{ - Version: 2, - Inputs: []component.InputSpec{ - { - Name: "fake", - Description: "A fake input", - Platforms: []string{ - "container/amd64", - "container/arm64", - "darwin/amd64", - "darwin/arm64", - "linux/amd64", - "linux/arm64", - "windows/amd64", - }, - Shippers: []string{ - "fake-shipper", - }, - Command: &component.CommandSpec{}, - }, - }, - }, -} - -var fakeShipper = atesting.UsableComponent{ - Name: "fake-shipper", - BinaryPath: mustAbs(filepath.Join("..", "..", "pkg", "component", "fake", "shipper", osExt("shipper"))), - Spec: &component.Spec{ - Version: 2, - Shippers: []component.ShipperSpec{ - { - Name: "fake-shipper", - Description: "A fake shipper", - Platforms: []string{ - "container/amd64", - "container/arm64", - "darwin/amd64", - "darwin/arm64", - "linux/amd64", - "linux/arm64", - "windows/amd64", - }, - Outputs: []string{ - "fake-action-output", - }, - Command: &component.CommandSpec{}, - }, - }, - }, -} - var simpleConfig1 = ` outputs: default: @@ -161,18 +104,3 @@ func TestFakeComponent(t *testing.T) { }) require.NoError(t, err) } - -func mustAbs(path string) string { - abs, err := filepath.Abs(path) - if err != nil { - panic(err) - } - return abs -} - -func osExt(name string) string { - if runtime.GOOS == "windows" { - return name + ".exe" - } - return name -} diff --git a/testing/integration/fakes.go b/testing/integration/fakes.go new file mode 100644 index 00000000000..2364b06580d --- /dev/null +++ b/testing/integration/fakes.go @@ -0,0 +1,108 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "path/filepath" + "runtime" + + "github.com/elastic/elastic-agent/pkg/component" + atesting "github.com/elastic/elastic-agent/pkg/testing" +) + +const fakeShipperName = "fake-shipper" + +var fakeComponentPltfs = []string{ + "container/amd64", + "container/arm64", + "darwin/amd64", + "darwin/arm64", + "linux/amd64", + "linux/arm64", + "windows/amd64", +} + +var fakeComponent = atesting.UsableComponent{ + Name: "fake", + BinaryPath: mustAbs(filepath.Join("..", "..", "pkg", "component", "fake", "component", osExt("component"))), + Spec: &component.Spec{ + Version: 2, + Inputs: []component.InputSpec{ + { + Name: "fake", + Description: "A fake input", + Platforms: fakeComponentPltfs, + Shippers: []string{ + fakeShipperName, + }, + Command: &component.CommandSpec{}, + }, + { + Name: "fake-apm", + Description: "Fake component apm traces generator", + Platforms: fakeComponentPltfs, + Shippers: []string{ + fakeShipperName, + }, + Command: &component.CommandSpec{ + Env: []component.CommandEnvSpec{ + { + Name: "ELASTIC_APM_LOG_FILE", + Value: "stderr", + }, + { + Name: "ELASTIC_APM_LOG_LEVEL", + Value: "debug", + }, + }, + }, + }, + }, + }, +} + +var fakeShipper = atesting.UsableComponent{ + Name: fakeShipperName, + BinaryPath: mustAbs(filepath.Join("..", "..", "pkg", "component", "fake", "shipper", osExt("shipper"))), + Spec: &component.Spec{ + Version: 2, + Shippers: []component.ShipperSpec{ + { + Name: fakeShipperName, + Description: "A fake shipper", + Platforms: []string{ + "container/amd64", + "container/arm64", + "darwin/amd64", + "darwin/arm64", + "linux/amd64", + "linux/arm64", + "windows/amd64", + }, + Outputs: []string{ + "fake-action-output", + }, + Command: &component.CommandSpec{}, + }, + }, + }, +} + +func mustAbs(path string) string { + abs, err := filepath.Abs(path) + if err != nil { + panic(err) + } + return abs +} + +func osExt(name string) string { + if runtime.GOOS == "windows" { + return name + ".exe" + } + return name +}