diff --git a/docs/tracing.md b/docs/tracing.md new file mode 100644 index 00000000000..c2085f699e1 --- /dev/null +++ b/docs/tracing.md @@ -0,0 +1,48 @@ +# Elastic agent APM configuration + + +## Configuration +The APM elastic agent configuration in `elastic-agent.yml` looks like this (the keys under `apm` have the same meaning +and usage as a regular [APM configuration](https://www.elastic.co/guide/en/apm/agent/go/current/configuration.html)) : + ```yaml + agent.monitoring: + traces: true + apm: + hosts: + - + environment: + secret_token: + api_key: + global_labels: + k1: v1 + k2: v2 + tls: + skip_verify: true + server_certificate: + server_ca: + ``` +APM configuration is only available in `elastic-agent.yml` configuration file (Fleet does not support these settings at the moment): +- for a standalone agent the configuration is reloaded by default from file in case of changes while the agent is running (unless the configuration reload mechanism has been disabled using `agent.reload.enabled` setting) +- for a managed agent, the configuration is read once at startup and then added to every policy change coming from Fleet: in this case changes to APM configuration require a restart of agent to be picked up + +## APM config propagation + +APM propagation to components requires agent APM traces to be enabled (`agent.monitoring.traces` must be set to `true`). +Elastic Agent will propagate the APM parameters defined in its configuration to all the components it manages. +APM configuration is sent to the components via the control protocol, specifically in the [APMConfig message](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/elastic-agent-client.proto#L188-L208). + +At the moment the agent supports only Elastic APM configuration but since want to support OTLP protocol the APM configuration +has a dedicated field for Elastic, and we will put support for other protocols side-by-side (see [APMConfig message](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/elastic-agent-client.proto#L188-L208)) + +The components can consume the configuration by using the [`Unit.Expected()`](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/pkg/client/unit.go#L166-L177) +from the [`UnitChanged`](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/pkg/client/client_v2.go#L126-L131) +object published by the elastic-agent-client. The [TriggeredAPMChange](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/pkg/client/client_v2.go#L63) +trigger flag will be set whenever there is a change in APM configuration. + +Components are expected to take appropriate action to reload/re-instantiate their APM instrumentation. +How that happens in detail depends on what sort of APM objects the component uses, for example: +- if the component uses a decorated http server it may be needed to stop (gracefully) the current server, recreate it with the new configuration and start the new one. +- if it uses a custom Tracer object, it will need to create the new one, close the old one and swap them safely. + +The list above is obviously not an exhaustive one, the handling of APM configuration change will probably be specific +to each component/unit. \ No newline at end of file diff --git a/internal/pkg/agent/application/apm_config_modifier.go b/internal/pkg/agent/application/apm_config_modifier.go new file mode 100644 index 00000000000..99d89b748fd --- /dev/null +++ b/internal/pkg/agent/application/apm_config_modifier.go @@ -0,0 +1,151 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "fmt" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" + "github.com/elastic/elastic-agent/internal/pkg/config" + monitoringcfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" + "github.com/elastic/elastic-agent/pkg/component/runtime" + "github.com/elastic/elastic-agent/pkg/core/logger" + + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/utils" +) + +// InjectAPMConfig is a modifier passed to coordinator in order to set the global APM configuration used for the agent +// into each Component coming from input/output configuration +func InjectAPMConfig(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) { + + tracesEnabled, err := getAPMTracesEnabled(cfg) + if err != nil { + return comps, fmt.Errorf("error retrieving APM traces flag: %w", err) + } + + if !tracesEnabled { + // nothing to do + return comps, nil + } + + apmConfig, err := getAPMConfigFromMap(cfg) + if err != nil { + return comps, fmt.Errorf("error retrieving apm config: %w", err) + } + + if apmConfig == nil { + // nothing to do + return comps, nil + } + + for i := range comps { + // We shouldn't really go straight from config datamodel to protobuf datamodel (a core datamodel would be nice to + // abstract from protocol details) + if comps[i].Component == nil { + comps[i].Component = new(proto.Component) + } + comps[i].Component.ApmConfig = runtime.MapAPMConfig(apmConfig) + } + + return comps, nil +} + +func getAPMTracesEnabled(cfg map[string]any) (bool, error) { + + rawTracesEnabled, err := utils.GetNestedMap(cfg, "agent", "monitoring", "traces") + if errors.Is(err, utils.ErrKeyNotFound) { + // We didn't find the key, return false without any error + return false, nil + } + + if err != nil { + return false, fmt.Errorf("error accessing trace flag: %w", err) + } + + traceEnabled, ok := rawTracesEnabled.(bool) + if !ok { + return false, fmt.Errorf("trace flag has unexpected type %T", rawTracesEnabled) + } + + return traceEnabled, nil +} + +func getAPMConfigFromMap(cfg map[string]any) (*monitoringcfg.APMConfig, error) { + nestedValue, err := utils.GetNestedMap(cfg, "agent", "monitoring", "apm") + if errors.Is(err, utils.ErrKeyNotFound) { + // No APM config found, nothing to do + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("error traversing config: %w", err) + } + + rawApmConfig, ok := nestedValue.(map[string]any) + if !ok { + return nil, fmt.Errorf("the retrieved apm configs is not a map: %T", nestedValue) + } + + newConfigFrom, err := config.NewConfigFrom(rawApmConfig) + if err != nil { + return nil, fmt.Errorf("error parsing apm config: %w", err) + } + + monitoringConfig := new(monitoringcfg.APMConfig) + err = newConfigFrom.Unpack(monitoringConfig) + if err != nil { + return nil, fmt.Errorf("error unpacking apm config: %w", err) + } + return monitoringConfig, nil +} + +func noop(change coordinator.ConfigChange) coordinator.ConfigChange { + return change +} + +// PatchAPMConfig is a temporary configuration patcher function (see ConfigPatchManager and ConfigPatch for reference) that +// will patch the configuration coming from Fleet adding the APM parameters from the elastic agent configuration file +// until Fleet supports this config directly +func PatchAPMConfig(log *logger.Logger, rawConfig *config.Config) func(change coordinator.ConfigChange) coordinator.ConfigChange { + configMap, err := rawConfig.ToMapStr() + if err != nil { + log.Errorf("error decoding raw config, patching disabled: %v", err) + return noop + } + + tracesEnabled, err := getAPMTracesEnabled(configMap) + if err != nil { + log.Errorf("error retrieving trace flag, patching disabled: %v", err) + return noop + } + + apmConfig, err := getAPMConfigFromMap(configMap) + if err != nil { + log.Errorf("error retrieving apm config, patching disabled: %v", err) + return noop + } + + if !tracesEnabled && apmConfig == nil { + // traces disabled and no apm config -> no patching happening + log.Debugf("traces disabled and no apm config: no patching necessary") + return noop + } + monitoringPatch := map[string]any{"traces": tracesEnabled} + if apmConfig != nil { + monitoringPatch["apm"] = apmConfig + } + + return func(change coordinator.ConfigChange) coordinator.ConfigChange { + err := change.Config().Merge(map[string]any{"agent": map[string]any{"monitoring": monitoringPatch}}) + if err != nil { + log.Errorf("error patching apm config into configchange: %v", err) + } + + return change + } +} diff --git a/internal/pkg/agent/application/apm_config_modifier_test.go b/internal/pkg/agent/application/apm_config_modifier_test.go new file mode 100644 index 00000000000..11e71c2b820 --- /dev/null +++ b/internal/pkg/agent/application/apm_config_modifier_test.go @@ -0,0 +1,466 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + gproto "google.golang.org/protobuf/proto" + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/elastic-agent/internal/pkg/config" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +type injectedConfigAssertion func(*testing.T, []component.Component) + +func allComponentLvlConfigNil(t *testing.T, components []component.Component) { + for _, comp := range components { + assert.Nil(t, comp.Component) + } +} + +func apmConfigEqual(apmConfig *proto.APMConfig) injectedConfigAssertion { + return func(t *testing.T, components []component.Component) { + for _, comp := range components { + if !assert.NotNil(t, comp.Component) { + // component level config is null, move to the next + continue + } + + assert.Truef(t, gproto.Equal(comp.Component.ApmConfig, apmConfig), "apmConfig (%v, %v) not equal", comp.Component.ApmConfig, apmConfig) + } + } +} + +func TestInjectAPMConfig(t *testing.T) { + + type args struct { + comps []component.Component + cfg map[string]interface{} + } + tests := []struct { + name string + args args + want injectedConfigAssertion + wantErr assert.ErrorAssertionFunc + }{ + { + name: "No apm or component level config set", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "enabled": true, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.NoError, + }, + { + name: "No apm config but traces enabled - no config is propagated", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "traces": true, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.NoError, + }, + { + name: "Apm config set but traces disabled - no config is propagated", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "traces": false, + "apm": map[string]any{ + "hosts": []string{ + "https://apmhost1", + "https://apmhost2", + }, + "environment": "apm-unit-tests", + "api_key": "apik", + "secret_token": "🤫", + "tls": map[string]any{ + "skip_verify": true, + }, + }, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.NoError, + }, + { + name: "Apm config set but no trace flag set - leave components untouched", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "apm": map[string]any{ + "hosts": []string{ + "https://apmhost1", + "https://apmhost2", + }, + "environment": "apm-unit-tests", + "api_key": "apik", + "secret_token": "🤫", + "tls": map[string]any{ + "skip_verify": true, + }, + }, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.NoError, + }, + { + name: "Apm config set but trace flag set to false - leave components untouched", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "traces": false, + "apm": map[string]any{ + "hosts": []string{ + "https://apmhost1", + "https://apmhost2", + }, + "environment": "apm-unit-tests", + "api_key": "apik", + "secret_token": "🤫", + "tls": map[string]any{ + "skip_verify": true, + }, + }, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.NoError, + }, + { + name: "Apm config and trace flag set - fill existing component level config and propagate it to components", + args: args{ + comps: []component.Component{ + { + ID: "some component", + Component: &proto.Component{ + Limits: &proto.ComponentLimits{ + GoMaxProcs: 1, + }, + }, + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "traces": true, + "apm": map[string]any{ + "hosts": []string{ + "https://apmhost1", + "https://apmhost2", + }, + "environment": "apm-unit-tests", + "api_key": "apik", + "secret_token": "🤫", + "tls": map[string]any{ + "skip_verify": true, + }, + }, + }, + }, + }, + }, + want: apmConfigEqual(&proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Environment: "apm-unit-tests", + ApiKey: "apik", + SecretToken: "🤫", + Hosts: []string{ + "https://apmhost1", + "https://apmhost2", + }, + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "", + ServerCa: "", + }, + }, + }), + wantErr: assert.NoError, + }, + { + name: "Wrong traces flag type (string) - Error", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "enabled": true, + "traces": "true", + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.Error, + }, + { + name: "Wrong traces flag type (map) - Error", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "enabled": true, + "traces": map[string]any{"foo": "bar"}, + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.Error, + }, + { + name: "Malformed config - Error", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": "some string value", + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.Error, + }, + { + name: "Malformed apm config (not a map) - Error", + args: args{ + comps: []component.Component{ + { + ID: "some component", + }, + }, + cfg: map[string]interface{}{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "traces": true, + "apm": "some string value", + }, + }, + }, + }, + want: allComponentLvlConfigNil, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := InjectAPMConfig(tt.args.comps, tt.args.cfg) + if !tt.wantErr(t, err, fmt.Sprintf("InjectAPMConfig(%v, %v)", tt.args.comps, tt.args.cfg)) { + return + } + tt.want(t, got) + }) + } +} + +type mockConfigChange struct { + c *config.Config +} + +func (mcc *mockConfigChange) Config() *config.Config { + return mcc.c +} + +func (mcc *mockConfigChange) Ack() error { + return nil +} + +func (mcc *mockConfigChange) Fail(err error) { + // nothing happens +} +func TestPatchAPMConfig(t *testing.T) { + + type args struct { + fleetCfg string + agentFileCfg string + } + tests := []struct { + name string + args args + want string + }{ + { + name: "No apm config or traces flag", + args: args{ + fleetCfg: ` + agent.monitoring: + enabled: true + logs: true + metrics: true + `, + agentFileCfg: ` + agent.monitoring: + enabled: true + `, + }, + want: ` + agent: + monitoring: + enabled: true + logs: true + metrics: true + `, + }, + { + name: "traces flag set but no APM config", + args: args{ + fleetCfg: ` + agent.monitoring: + enabled: true + logs: true + metrics: true + `, + agentFileCfg: ` + agent.monitoring: + enabled: true + traces: true + `, + }, + want: ` + agent: + monitoring: + enabled: true + logs: true + metrics: true + traces: true + `, + }, + { + name: "traces flag and APM config set", + args: args{ + fleetCfg: ` + agent.monitoring: + enabled: true + logs: true + metrics: true + `, + agentFileCfg: ` + agent.monitoring: + enabled: true + traces: true + apm: + hosts: + - https://apmhost1:443 + environment: test-apm + secret_token: secret + global_labels: + key1: value1 + key2: value2 + tls: + skip_verify: true + `, + }, + want: ` + agent: + monitoring: + enabled: true + logs: true + metrics: true + traces: true + apm: + hosts: + - https://apmhost1:443 + environment: test-apm + api_key: "" + secret_token: secret + global_labels: + key1: value1 + key2: value2 + tls: + skip_verify: true + server_ca: "" + server_certificate: "" + `, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fleetConf, err := config.NewConfigFrom(tt.args.fleetCfg) + require.NoError(t, err) + agtConf, err := config.NewConfigFrom(tt.args.agentFileCfg) + require.NoError(t, err) + log, _ := logger.NewTesting(tt.name) + patcher := PatchAPMConfig(log, agtConf) + + mcc := &mockConfigChange{c: fleetConf} + patcher(mcc) + + patchedConf, err := mcc.Config().ToMapStr() + require.NoError(t, err) + patchedConfBytes, err := yaml.Marshal(patchedConf) + require.NoError(t, err) + + assert.YAMLEq(t, tt.want, string(patchedConfBytes)) + }) + } +} diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 445bba8434f..dca8973f79e 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -16,6 +16,7 @@ import ( "go.elastic.co/apm" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring" @@ -118,9 +119,10 @@ func New( var configMgr coordinator.ConfigManager var managed *managedConfigManager - var compModifiers []coordinator.ComponentsModifier + var compModifiers = []coordinator.ComponentsModifier{InjectAPMConfig} var composableManaged bool var isManaged bool + if testingMode { log.Info("Elastic Agent has been started in testing mode and is managed through the control protocol") @@ -145,12 +147,11 @@ func New( if err != nil { return nil, nil, nil, err } - if configuration.IsFleetServerBootstrap(cfg.Fleet) { log.Info("Parsed configuration and determined agent is in Fleet Server bootstrap mode") compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server)) - configMgr = newFleetServerBootstrapManager(log) + configMgr = coordinator.NewConfigPatchManager(newFleetServerBootstrapManager(log), PatchAPMConfig(log, rawConfig)) } else { log.Info("Parsed configuration and determined agent is managed by Fleet") @@ -164,7 +165,7 @@ func New( if err != nil { return nil, nil, nil, err } - configMgr = managed + configMgr = coordinator.NewConfigPatchManager(managed, PatchAPMConfig(log, rawConfig)) } } diff --git a/internal/pkg/agent/application/coordinator/config_patcher.go b/internal/pkg/agent/application/coordinator/config_patcher.go new file mode 100644 index 00000000000..8f926b9fff1 --- /dev/null +++ b/internal/pkg/agent/application/coordinator/config_patcher.go @@ -0,0 +1,49 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package coordinator + +import ( + "context" +) + +type ConfigPatch func(change ConfigChange) ConfigChange + +// ConfigPatchManager is a decorator to restore some agent settings from the elastic agent configuration file +type ConfigPatchManager struct { + inner ConfigManager + outCh chan ConfigChange + patchFn ConfigPatch +} + +func (c ConfigPatchManager) Run(ctx context.Context) error { + go c.patch(c.inner.Watch(), c.outCh) + return c.inner.Run(ctx) +} + +func (c ConfigPatchManager) Errors() <-chan error { + return c.inner.Errors() +} + +func (c ConfigPatchManager) ActionErrors() <-chan error { + return c.inner.ActionErrors() +} + +func (c ConfigPatchManager) Watch() <-chan ConfigChange { + return c.outCh +} + +func (c ConfigPatchManager) patch(src <-chan ConfigChange, dst chan ConfigChange) { + for ccc := range src { + dst <- c.patchFn(ccc) + } +} + +func NewConfigPatchManager(inner ConfigManager, pf ConfigPatch) *ConfigPatchManager { + return &ConfigPatchManager{ + inner: inner, + outCh: make(chan ConfigChange), + patchFn: pf, + } +} diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 546cdc4430c..c6f2dc4fc97 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -13,9 +13,13 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" + monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/internal/pkg/diagnostics" "github.com/elastic/elastic-agent/internal/pkg/remote" "github.com/elastic/elastic-agent/pkg/component" @@ -55,11 +59,58 @@ func TestDiagnosticLocalConfig(t *testing.T) { Protocol: "test-protocol", }, }, + Settings: &configuration.SettingsConfig{ + MonitoringConfig: &monitoringCfg.MonitoringConfig{ + MonitorTraces: true, + APM: monitoringCfg.APMConfig{ + Environment: "diag-unit-test", + APIKey: "apikey", + SecretToken: "secret", + Hosts: []string{"host1", "host2"}, + GlobalLabels: map[string]string{"k1": "v1", "k2": "v2"}, + TLS: monitoringCfg.APMTLS{ + SkipVerify: false, + ServerCertificate: "/path/to/server/cert", + ServerCA: "/path/to/server/ca", + }, + }, + }, + }, } // The YAML we expect to see from the preceding config expectedCfg := ` agent: + download: null + grpc: null + id: "" + path: "" + process: null + reload: null + upgrade: null + v1_monitoring_enabled: false + monitoring: + enabled: false + http: null + logs: false + metrics: false + namespace: "" + pprof: null + traces: true + apm: + hosts: + - host1 + - host2 + environment: diag-unit-test + apikey: apikey + secrettoken: secret + globallabels: + k1: v1 + k2: v2 + tls: + skipverify: false + servercertificate: "/path/to/server/cert" + serverca: "/path/to/server/ca" fleet: enabled: true access_api_key: "test-key" @@ -237,6 +288,65 @@ components: assert.YAMLEq(t, expected, string(result), "components-expected diagnostic returned unexpected value") } +func TestDiagnosticComponentsExpectedWithAPM(t *testing.T) { + // Create a Coordinator with a test component model and make sure it's + // reported by the components-expected diagnostic + components := []component.Component{ + { + ID: "some-apm-aware-component", + InputType: "filestream", + OutputType: "elasticsearch", + Component: &proto.Component{ + ApmConfig: &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Environment: "diag-unit-test", + ApiKey: "apikey", + SecretToken: "st", + Hosts: []string{"host1", "host2"}, + GlobalLabels: "k=v", + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "servercert", + ServerCa: "serverca", + }, + }, + }, + }, + }, + } + + expected := ` +components: + - id: some-apm-aware-component + input_type: filestream + output_type: elasticsearch + units: [] + component: + limits: null + apmconfig: + elastic: + environment: diag-unit-test + apikey: apikey + secrettoken: st + globallabels: "k=v" + hosts: + - host1 + - host2 + tls: + skipverify: true + servercert: servercert + serverca: serverca +` + + coord := &Coordinator{componentModel: components} + + hook, ok := diagnosticHooksMap(coord)["components-expected"] + require.True(t, ok, "diagnostic hooks should have an entry for components-expected") + + result := hook.Hook(context.Background()) + assert.YAMLEq(t, expected, string(result), "components-expected diagnostic returned unexpected value") +} + func TestDiagnosticComponentsActual(t *testing.T) { // Create a Coordinator with observed component data in the state broadcaster // and make sure the components-actual diagnostic reports it @@ -356,6 +466,92 @@ components: assert.YAMLEq(t, expected, string(result), "state diagnostic returned unexpected value") } +func TestDiagnosticStateForAPM(t *testing.T) { + // Create a coordinator with a test state and verify that the state + // diagnostic reports it + + token := "st" + state := State{ + State: agentclient.Starting, + Message: "starting up", + FleetState: agentclient.Configuring, + FleetMessage: "configuring", + LogLevel: 1, + Components: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp-1"}, + State: runtime.ComponentState{ + State: client.UnitStateDegraded, + Message: "degraded message", + VersionInfo: runtime.ComponentVersionInfo{ + Name: "version name", + Version: "version value", + }, + Component: &proto.Component{ + ApmConfig: &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Environment: "diag-state-ut", + SecretToken: token, + Hosts: []string{"apmhost"}, + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "sc", + ServerCa: "sca", + }, + }, + }, + }, + ComponentIdx: 1, + }, + }, + }, + } + + expected := ` +state: 0 +message: "starting up" +fleet_state: 1 +fleet_message: "configuring" +log_level: "warning" +components: + - id: "comp-1" + state: + state: 3 + message: "degraded message" + features_idx: 0 + units: {} + version_info: + name: "version name" + version: "version value" + component: + apmconfig: + elastic: + apikey: "" + environment: diag-state-ut + hosts: [apmhost] + secrettoken: st + globallabels: "" + tls: + skipverify: true + serverca: sca + servercert: sc + limits: null + component_idx: 1 +` + + coord := &Coordinator{ + // This test needs a broadcaster since the components-actual diagnostic + // fetches the state via State(). + stateBroadcaster: broadcaster.New(state, 0, 0), + } + + hook, ok := diagnosticHooksMap(coord)["state"] + require.True(t, ok, "diagnostic hooks should have an entry for state") + + result := hook.Hook(context.Background()) + assert.YAMLEq(t, expected, string(result), "state diagnostic returned unexpected value") +} + // Fetch the diagnostic hooks and add them to a lookup table for // easier verification func diagnosticHooksMap(coord *Coordinator) map[string]diagnostics.Hook { diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 2ab23dae7c2..91d470ac00b 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -26,6 +26,7 @@ import ( monitoringLib "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/service" "github.com/elastic/elastic-agent-system-metrics/report" + "github.com/elastic/elastic-agent/internal/pkg/agent/application" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock" diff --git a/internal/pkg/core/monitoring/config/config.go b/internal/pkg/core/monitoring/config/config.go index 50027b4830f..1f326e29a8d 100644 --- a/internal/pkg/core/monitoring/config/config.go +++ b/internal/pkg/core/monitoring/config/config.go @@ -66,11 +66,12 @@ func DefaultConfig() *MonitoringConfig { // APMConfig configures APM Tracing. type APMConfig struct { - Environment string `config:"environment"` - APIKey string `config:"api_key"` - SecretToken string `config:"secret_token"` - Hosts []string `config:"hosts"` - TLS APMTLS `config:"tls"` + Environment string `config:"environment"` + APIKey string `config:"api_key"` + SecretToken string `config:"secret_token"` + Hosts []string `config:"hosts"` + GlobalLabels map[string]string `config:"global_labels"` + TLS APMTLS `config:"tls"` } // APMTLS contains the configuration options necessary for configuring TLS in diff --git a/pkg/component/component.go b/pkg/component/component.go index e9cdd7ce86d..18060b645fd 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -13,9 +13,11 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/install/pkgmgr" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" + "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/internal/pkg/eql" "github.com/elastic/elastic-agent/pkg/features" "github.com/elastic/elastic-agent/pkg/limits" @@ -141,6 +143,12 @@ func getStringValue(m map[string]interface{}, key string) (string, error) { return s, nil } +type ElasticAPM config.APMConfig + +type APMConfig struct { + Elastic *ElasticAPM `yaml:"elastic"` +} + // Component is a set of units that needs to run. type Component struct { // ID is the unique ID of the component. diff --git a/pkg/component/fake/component/README.md b/pkg/component/fake/component/README.md index 678fb3dd4f2..79b09db284a 100644 --- a/pkg/component/fake/component/README.md +++ b/pkg/component/fake/component/README.md @@ -1,3 +1,53 @@ # Fake Component -Controllable through GRPC control protocol with actions. Allows unit tests to simulate control and communication with a running sub-process. +Controllable through GRPC control protocol with actions. Allows tests to simulate control and communication with a running sub-process. + + +## How to use the fake component +If we need to use the fake component for a manual test, we need to build it using +`mage build:testbinaries` +and then we have to drop the binary and its corresponding spec file (see example below) in the +`elastic-agent-/data/components` directory. + +### Spec file example + +```yaml +version: 2 +inputs: + - name: fake + description: "Fake component input" + platforms: &platforms + - linux/amd64 + - linux/arm64 + - darwin/amd64 + - darwin/arm64 + - windows/amd64 + - container/amd64 + - container/arm64 + outputs: &outputs + - elasticsearch + shippers: &shippers + - shipper + command: &command + restart_monitoring_period: 5s + maximum_restarts_per_period: 1 + timeouts: + restart: 1s + args: [] + - name: fake-apm + description: "Fake component apm traces generator" + platforms: *platforms + outputs: *outputs + shippers: *shippers + command: *command +``` + +### Agent configuration example (APM traces sender) + +```yaml +inputs: + ... + - type: fake-apm + id: fake-apm-traces-generator + use_output: default +``` diff --git a/pkg/component/fake/component/comp/actions.go b/pkg/component/fake/component/comp/actions.go index 7ac7332bc20..9b4dc74c0b2 100644 --- a/pkg/component/fake/component/comp/actions.go +++ b/pkg/component/fake/component/comp/actions.go @@ -12,11 +12,13 @@ import ( "time" "github.com/rs/zerolog" + "google.golang.org/protobuf/encoding/protojson" "github.com/elastic/elastic-agent-client/v7/pkg/client" ) const ActionRetrieveFeatures = "retrieve_features" +const ActionRetrieveAPMConfig = "retrieve_apm_config" type retrieveFeaturesAction struct { input *fakeInput @@ -34,6 +36,10 @@ type killAction struct { logger zerolog.Logger } +type retrieveAPMConfigAction struct { + input *fakeInput +} + func (s *stateSetterAction) Name() string { return "set_state" } @@ -129,7 +135,26 @@ func newRunningUnit(logger zerolog.Logger, manager *StateManager, unit *client.U switch expected.Config.Type { case Fake: return newFakeInput(logger, expected.LogLevel, manager, unit, expected.Config) + case APM: + return newFakeAPMInput(logger, expected.LogLevel, unit) } return nil, fmt.Errorf("unknown input unit config type: %s", expected.Config.Type) } + +func (a *retrieveAPMConfigAction) Name() string { + return ActionRetrieveAPMConfig +} + +func (a *retrieveAPMConfigAction) Execute( + _ context.Context, + _ map[string]interface{}) (map[string]interface{}, error) { + + a.input.logger.Info().Msg("executing " + ActionRetrieveAPMConfig + " action") + a.input.logger.Debug().Msgf("stored apm config %v", a.input.apmConfig) + if a.input.apmConfig == nil { + return map[string]interface{}{"apm": nil}, nil + } + marshaledBytes, err := protojson.Marshal(a.input.apmConfig) + return map[string]interface{}{"apm": string(marshaledBytes)}, err +} diff --git a/pkg/component/fake/component/comp/apm.go b/pkg/component/fake/component/comp/apm.go new file mode 100644 index 00000000000..56882088b12 --- /dev/null +++ b/pkg/component/fake/component/comp/apm.go @@ -0,0 +1,238 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package comp + +import ( + "context" + "fmt" + "net/url" + "os" + "time" + + "github.com/rs/zerolog" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "go.elastic.co/apm" + apmtransport "go.elastic.co/apm/transport" + + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" +) + +var SenderAlreadyRunningError = errors.New("apm sender is already running") + +type apmTracesSender struct { + cfgUpd chan *proto.APMConfig + updateErrCh chan error + ctx context.Context + ctxCancelF context.CancelFunc +} + +func (ats *apmTracesSender) Start(ctx context.Context, cfg *proto.APMConfig) error { + if ats.ctx != nil && ats.ctx.Err() == nil { + // the context is still running, we cannot start another + return SenderAlreadyRunningError + } + + ats.init(ctx) + + go ats.sendTracesLoop(cfg, time.Second, 100*time.Millisecond) + return nil +} + +func (ats *apmTracesSender) init(outerCtx context.Context) { + ats.ctx, ats.ctxCancelF = context.WithCancel(outerCtx) + ats.cfgUpd = make(chan *proto.APMConfig) + ats.updateErrCh = make(chan error) +} + +func (ats *apmTracesSender) cleanup() { + if ats.ctxCancelF != nil { + ats.ctxCancelF() + ats.ctxCancelF = nil + } + + ats.ctx = nil + close(ats.cfgUpd) + ats.cfgUpd = nil + close(ats.updateErrCh) + ats.updateErrCh = nil +} + +func (ats *apmTracesSender) sendTracesLoop(initialCfg *proto.APMConfig, sendInterval time.Duration, traceDuration time.Duration) { + tracer, err := ats.createNewTracer(initialCfg) + if err != nil { + ats.updateErrCh <- fmt.Errorf("error creating tracer from initial config: %w", err) + } + + ticker := time.NewTicker(sendInterval) + for { + select { + case <-ats.ctx.Done(): + return + case <-ticker.C: + ats.sendTrace(tracer, traceDuration) + case updatedCfg := <-ats.cfgUpd: + newTracer, err := ats.createNewTracer(updatedCfg) + if err != nil { + ats.updateErrCh <- fmt.Errorf("error creating tracer from config: %w", err) + continue + } + if tracer != nil { + tracer.Close() + } + tracer = newTracer + ats.updateErrCh <- nil + } + } +} + +func (ats *apmTracesSender) Update(cfg *proto.APMConfig, timeout time.Duration) error { + select { + case ats.cfgUpd <- cfg: + return nil + case <-time.After(timeout): + return fmt.Errorf("config update timed out") + } +} + +func (ats *apmTracesSender) Stop() error { + ats.cleanup() + return nil +} + +func (ats *apmTracesSender) createNewTracer(cfg *proto.APMConfig) (*apm.Tracer, error) { + if cfg == nil { + return nil, nil + } + + const ( + envVerifyServerCert = "ELASTIC_APM_VERIFY_SERVER_CERT" + envServerCert = "ELASTIC_APM_SERVER_CERT" + envCACert = "ELASTIC_APM_SERVER_CA_CERT_FILE" + envGlobalLabels = "ELASTIC_APM_GLOBAL_LABELS" + ) + if cfg.Elastic.Tls.SkipVerify { + os.Setenv(envVerifyServerCert, "false") + defer os.Unsetenv(envVerifyServerCert) + } + if cfg.Elastic.Tls.ServerCert != "" { + os.Setenv(envServerCert, cfg.Elastic.Tls.ServerCert) + defer os.Unsetenv(envServerCert) + } + if cfg.Elastic.Tls.ServerCa != "" { + os.Setenv(envCACert, cfg.Elastic.Tls.ServerCa) + defer os.Unsetenv(envCACert) + } + + ts, err := apmtransport.NewHTTPTransport() + if err != nil { + return nil, err + } + + if len(cfg.Elastic.Hosts) > 0 { + hosts := make([]*url.URL, 0, len(cfg.Elastic.Hosts)) + for _, host := range cfg.Elastic.Hosts { + u, err := url.Parse(host) + if err != nil { + return nil, fmt.Errorf("failed parsing %s: %w", host, err) + } + hosts = append(hosts, u) + } + ts.SetServerURL(hosts...) + } + if cfg.Elastic.ApiKey != "" { + ts.SetAPIKey(cfg.Elastic.ApiKey) + } else if cfg.Elastic.SecretToken != "" { + ts.SetSecretToken(cfg.Elastic.SecretToken) + } + return apm.NewTracerOptions(apm.TracerOptions{ + ServiceName: "fake-apm", + ServiceVersion: "0.1", + ServiceEnvironment: cfg.Elastic.Environment, + Transport: ts, + }) +} + +func (ats *apmTracesSender) sendTrace(tracer *apm.Tracer, duration time.Duration) { + if tracer == nil { + // no tracer configured, skip + return + } + tx := tracer.StartTransaction("faketransaction", "request") + defer tx.End() + span := tx.StartSpan("spanName", "spanType", nil) + defer span.End() + time.Sleep(duration) +} + +type fakeAPMInput struct { + logger zerolog.Logger + unit *client.Unit + sender *apmTracesSender +} + +func (fai *fakeAPMInput) Unit() *client.Unit { + return fai.unit +} +func (fai *fakeAPMInput) Update(u *client.Unit, triggers client.Trigger) error { + if u.Expected().State == client.UnitStateStopped { + fai.logger.Info().Msg("fakeAPMInput stopping APM sender") + // stop apm trace sender + return fai.sender.Stop() + } + + if triggers&client.TriggeredAPMChange != client.TriggeredAPMChange { + // no apm change, nothing to do + return nil + } + + fai.logger.Info().Msgf("Updating sender config with %+v", u.Expected().APMConfig) + + return fai.sender.Update(u.Expected().APMConfig, time.Second) +} + +func newFakeAPMInput(logger zerolog.Logger, logLevel client.UnitLogLevel, unit *client.Unit) (*fakeAPMInput, error) { + logger = logger.Level(toZerologLevel(logLevel)) + apmInput := &fakeAPMInput{ + logger: logger, + unit: unit, + sender: new(apmTracesSender), + } + err := unit.UpdateState(client.UnitStateStarting, "Starting fake APM traces sender", nil) + if err != nil { + return apmInput, fmt.Errorf("error while setting starting state: %w", err) + } + err = apmInput.sender.Start(context.Background(), unit.Expected().APMConfig) + if err != nil { + return apmInput, fmt.Errorf("error starting apm tracer sender: %w", err) + } + go senderErrorLogger(apmInput.sender.ctx, logger, apmInput.sender.updateErrCh, unit) + err = unit.UpdateState(client.UnitStateHealthy, "Fake APM traces sender has started", nil) + if err != nil { + return apmInput, fmt.Errorf("error while setting healthy state: %w", err) + } + return apmInput, err +} + +func senderErrorLogger(ctx context.Context, logger zerolog.Logger, errCh <-chan error, unit *client.Unit) { + for { + select { + case <-ctx.Done(): + logger.Info().Msg("context expired: senderErrorLogger exiting") + return + case err, ok := <-errCh: + if !ok { + logger.Info().Msg("error channel closed: senderErrorLogger exiting") + } + if err != nil { + logger.Err(err).Msg("sender error") + _ = unit.UpdateState(client.UnitStateDegraded, fmt.Sprintf("sender error: %s", err), nil) + } + _ = unit.UpdateState(client.UnitStateHealthy, fmt.Sprintf("sender error: %s", err), nil) + } + } +} diff --git a/pkg/component/fake/component/comp/component.go b/pkg/component/fake/component/comp/component.go index e845ba6c09f..837689f912f 100644 --- a/pkg/component/fake/component/comp/component.go +++ b/pkg/component/fake/component/comp/component.go @@ -29,6 +29,7 @@ import ( const ( Fake = "fake" fakeShipper = "fake-shipper" + APM = "fake-apm" configuringMsg = "Configuring" stoppingMsg = "Stopping" @@ -288,8 +289,8 @@ type fakeInput struct { state client.UnitState stateMsg string - features *proto.Features - + features *proto.Features + apmConfig *proto.APMConfig canceller context.CancelFunc killerCanceller context.CancelFunc } @@ -302,12 +303,13 @@ func newFakeInput(logger zerolog.Logger, logLevel client.UnitLogLevel, manager * } i := &fakeInput{ - logger: logger, - manager: manager, - unit: unit, - cfg: cfg, - state: state, - stateMsg: msg, + logger: logger, + manager: manager, + unit: unit, + cfg: cfg, + state: state, + stateMsg: msg, + apmConfig: unit.Expected().APMConfig, } logger.Trace().Msg("registering set_state action for unit") @@ -318,6 +320,8 @@ func newFakeInput(logger zerolog.Logger, logLevel client.UnitLogLevel, manager * unit.RegisterAction(&killAction{i.logger}) logger.Trace().Msg("registering " + ActionRetrieveFeatures + " action for unit") unit.RegisterAction(&retrieveFeaturesAction{i}) + logger.Trace().Msg("registering " + ActionRetrieveAPMConfig + " action for unit") + unit.RegisterAction(&retrieveAPMConfigAction{i}) logger.Debug(). Str("state", i.state.String()). @@ -411,6 +415,13 @@ func (f *fakeInput) Update(u *client.Unit, triggers client.Trigger) error { } } + if triggers&client.TriggeredAPMChange == client.TriggeredAPMChange { + f.logger.Info(). + Interface("apmConfig", expected.APMConfig). + Msg("updating apm configuration") + f.apmConfig = expected.APMConfig + } + return nil } diff --git a/pkg/component/fake/component/main.go b/pkg/component/fake/component/main.go index 36dfdacba6a..508357fa75b 100644 --- a/pkg/component/fake/component/main.go +++ b/pkg/component/fake/component/main.go @@ -68,25 +68,7 @@ func run() error { case <-ctx.Done(): return nil case change := <-c.UnitChanges(): - if change.Unit != nil { - u := change.Unit - state, _, _ := u.State() - logger.Info(). - Str("state", state.String()). - Str("expectedState", u.Expected().State.String()). - Msg("unit change received") - } else { - logger.Info().Msg("unit change received, but no unit on it") - } - - switch change.Type { - case client.UnitChangedAdded: - s.Added(change.Unit) - case client.UnitChangedModified: - s.Modified(change) - case client.UnitChangedRemoved: - s.Removed(change.Unit) - } + handleChange(logger, s, change) case err := <-c.Errors(): if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) { fmt.Fprintf(os.Stderr, "GRPC client error: %+v\n", err) @@ -94,3 +76,25 @@ func run() error { } } } + +func handleChange(logger zerolog.Logger, s *comp.StateManager, change client.UnitChanged) { + if change.Unit != nil { + u := change.Unit + state, _, _ := u.State() + logger.Info(). + Str("state", state.String()). + Str("expectedState", u.Expected().State.String()). + Msg("unit change received") + } else { + logger.Info().Msg("unit change received, but no unit on it") + } + + switch change.Type { + case client.UnitChangedAdded: + s.Added(change.Unit) + case client.UnitChangedModified: + s.Modified(change) + case client.UnitChangedRemoved: + s.Removed(change.Unit) + } +} diff --git a/pkg/component/runtime/apm_config_mapper.go b/pkg/component/runtime/apm_config_mapper.go new file mode 100644 index 00000000000..921016ad30f --- /dev/null +++ b/pkg/component/runtime/apm_config_mapper.go @@ -0,0 +1,70 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package runtime + +import ( + "sort" + "strings" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" +) + +var zeroElasticAPMTLS = config.APMTLS{} + +func MapAPMConfig(conf *config.APMConfig) *proto.APMConfig { + if conf == nil { + // component apm config is nil, so the protobuf config is nil as well + return nil + } + + elasticAPMConf := &proto.ElasticAPM{ + Environment: conf.Environment, + ApiKey: conf.APIKey, + SecretToken: conf.SecretToken, + Hosts: conf.Hosts, + GlobalLabels: buildGlobalLabelsString(conf.GlobalLabels), + } + + if conf.TLS != zeroElasticAPMTLS { + // we have some TLS config to propagate too + elasticAPMConf.Tls = &proto.ElasticAPMTLS{ + SkipVerify: conf.TLS.SkipVerify, + ServerCert: conf.TLS.ServerCertificate, + ServerCa: conf.TLS.ServerCA, + } + } + + return &proto.APMConfig{Elastic: elasticAPMConf} +} + +func buildGlobalLabelsString(labels map[string]string) string { + const separator = "," + + if len(labels) == 0 { + return "" + } + + //prepare sorted keys to make output deterministic + keys := make([]string, 0, len(labels)) + for k := range labels { + keys = append(keys, k) + } + + sort.Strings(keys) + + // create the key=value string + buf := new(strings.Builder) + for _, k := range keys { + if buf.Len() > 0 { + buf.WriteString(separator) + } + buf.WriteString(k) + buf.WriteString("=") + buf.WriteString(labels[k]) + } + return buf.String() +} diff --git a/pkg/component/runtime/apm_config_mapper_test.go b/pkg/component/runtime/apm_config_mapper_test.go new file mode 100644 index 00000000000..cd26ff714d8 --- /dev/null +++ b/pkg/component/runtime/apm_config_mapper_test.go @@ -0,0 +1,130 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package runtime + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" +) + +func TestMapAPMConfig(t *testing.T) { + type args struct { + conf *config.APMConfig + } + tests := []struct { + name string + args args + want *proto.APMConfig + }{ + { + name: "nil config", + args: args{ + conf: nil, + }, + want: nil, + }, + { + name: "full config", + args: args{ + conf: &config.APMConfig{ + Environment: "environment", + APIKey: "apikey", + SecretToken: "secrettoken", + Hosts: []string{"host1", "host2"}, + GlobalLabels: map[string]string{"k1": "v1", "k2": "v2"}, + TLS: config.APMTLS{ + SkipVerify: true, + ServerCertificate: "servercertificate", + ServerCA: "serverca", + }, + }, + }, + want: &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "servercertificate", + ServerCa: "serverca", + }, + Environment: "environment", + ApiKey: "apikey", + SecretToken: "secrettoken", + Hosts: []string{"host1", "host2"}, + GlobalLabels: "k1=v1,k2=v2", + }, + }, + }, + { + name: "config without global labels", + args: args{ + conf: &config.APMConfig{ + Environment: "environment", + APIKey: "apikey", + SecretToken: "secrettoken", + Hosts: []string{"host1", "host2"}, + GlobalLabels: nil, + TLS: config.APMTLS{ + SkipVerify: true, + ServerCertificate: "servercertificate", + ServerCA: "serverca", + }, + }, + }, + want: &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "servercertificate", + ServerCa: "serverca", + }, + Environment: "environment", + ApiKey: "apikey", + SecretToken: "secrettoken", + Hosts: []string{"host1", "host2"}, + GlobalLabels: "", + }, + }, + }, + { + name: "config without hosts", + args: args{ + conf: &config.APMConfig{ + Environment: "environment", + APIKey: "apikey", + SecretToken: "secrettoken", + GlobalLabels: map[string]string{"k1": "v1", "k2": "v2"}, + TLS: config.APMTLS{ + SkipVerify: true, + ServerCertificate: "servercertificate", + ServerCA: "serverca", + }, + }, + }, + want: &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "servercertificate", + ServerCa: "serverca", + }, + Environment: "environment", + ApiKey: "apikey", + SecretToken: "secrettoken", + Hosts: nil, + GlobalLabels: "k1=v1,k2=v2", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, MapAPMConfig(tt.args.conf), "MapAPMConfig(%v)", tt.args.conf) + }) + } +} diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index 4facc93df3d..df02656b3f2 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -203,7 +203,8 @@ func (c *commandRuntime) Run(ctx context.Context, comm Communicator) error { sendExpected = true } if sendExpected { - comm.CheckinExpected(c.state.toCheckinExpected(), checkin) + checkinExpected := c.state.toCheckinExpected() + comm.CheckinExpected(checkinExpected, checkin) } if changed { c.sendObserved() diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go index e489ad71056..d00df10db64 100644 --- a/pkg/component/runtime/manager_test.go +++ b/pkg/component/runtime/manager_test.go @@ -18,6 +18,9 @@ import ( "testing" "time" + "google.golang.org/protobuf/encoding/protojson" + gproto "google.golang.org/protobuf/proto" + fakecmp "github.com/elastic/elastic-agent/pkg/component/fake/component/comp" "github.com/gofrs/uuid" @@ -28,6 +31,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" @@ -482,6 +486,331 @@ func TestManager_FakeInput_Features(t *testing.T) { } } +func TestManager_FakeInput_APM(t *testing.T) { + testPaths(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + agentInfo, _ := info.NewAgentInfo(ctx, true) + m, err := NewManager( + newDebugLogger(t), + newDebugLogger(t), + "localhost:0", + agentInfo, + apmtest.DiscardTracer, + newTestMonitoringMgr(), + configuration.DefaultGRPCConfig()) + require.NoError(t, err) + + managerErrCh := make(chan error) + go func() { + err := m.Run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + managerErrCh <- err + }() + + waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) + defer waitCancel() + if err := waitForReady(waitCtx, m); err != nil { + require.NoError(t, err) + } + + binaryPath := testBinary(t, "component") + const compID = "fake-default" + comp := component.Component{ + ID: compID, + InputSpec: &component.InputRuntimeSpec{ + InputType: "fake", + BinaryName: "", + BinaryPath: binaryPath, + Spec: fakeInputSpec, + }, + Units: []component.Unit{ + { + ID: "fake-input", + Type: client.UnitTypeInput, + LogLevel: client.UnitLogLevelTrace, + Config: component.MustExpectedConfig(map[string]interface{}{ + "type": "fake", + "state": int(client.UnitStateHealthy), + "message": "Fake Healthy", + }), + }, + }, + } + + subscriptionCtx, subCancel := context.WithCancel(context.Background()) + defer subCancel() + subscriptionErrCh := make(chan error) + doneCh := make(chan struct{}) + + initialAPMConfig := &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Environment: "test", + ApiKey: "apiKey", + SecretToken: "secretToken", + Hosts: []string{"host1", "host2", "host3"}, + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "servercert", + ServerCa: "serverca", + }, + }, + } + + modifiedAPMConfig := &proto.APMConfig{ + Elastic: &proto.ElasticAPM{ + Environment: "test-modified", + ApiKey: "apiKey", + SecretToken: "secretToken", + Hosts: []string{"newhost1", "host2", "differenthost3"}, + Tls: &proto.ElasticAPMTLS{ + SkipVerify: true, + ServerCert: "", + ServerCa: "", + }, + }, + } + + go func() { + sub := m.Subscribe(subscriptionCtx, compID) + var healthIteration int + var retrievedApmConfig *proto.APMConfig + for { + select { + case <-subscriptionCtx.Done(): + return + case componentState := <-sub.Ch(): + t.Logf("component state changed: %+v", componentState) + + if componentState.State == client.UnitStateFailed { + subscriptionErrCh <- fmt.Errorf("component failed: %s", componentState.Message) + return + } + + unit, ok := componentState.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] + if !ok { + subscriptionErrCh <- errors.New("unit missing: fake-input") + return + } + + switch unit.State { + case client.UnitStateFailed: + subscriptionErrCh <- fmt.Errorf("unit failed: %s", unit.Message) + + case client.UnitStateHealthy: + healthIteration++ + t.Logf("Healthy iteration %d starting at %s", healthIteration, time.Now()) + switch healthIteration { + case 1: // yes, it's starting on 1 + comp.Component = &proto.Component{ + ApmConfig: initialAPMConfig, + } + err := m.Update(component.Model{Components: []component.Component{comp}}) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", + healthIteration, err) + return + } + + // check if config sent on iteration 1 was set + case 2: + // In the previous iteration, the (fake) component has received a CheckinExpected + // message to propagate the APM configuration. In this iteration we are about to + // retrieve the APM configuration from the same component via the retrieve_apm_config + // action. Within the component, which is running as a separate process, actions + // and CheckinExpected messages are processed concurrently. We need some way to wait + // a reasonably short amount of time for the CheckinExpected message to be applied by the + // component (thus setting the APM config) before we query the same component + // for apm config information. We accomplish this via assert.Eventually. + // We also send a modified APM config to see that the component updates correctly and + // reports the new config in the next iteration. + assert.Eventuallyf(t, func() bool { + // check the component + res, err := m.PerformAction( + context.Background(), + comp, + comp.Units[0], + fakecmp.ActionRetrieveAPMConfig, + nil) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + return gproto.Equal(initialAPMConfig, retrievedApmConfig) + }, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: %s actual: %s", initialAPMConfig, retrievedApmConfig) + + comp.Component = &proto.Component{ + ApmConfig: modifiedAPMConfig, + } + err := m.Update(component.Model{Components: []component.Component{comp}}) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", + healthIteration, err) + return + } + // Set a new APM config to check that we update correctly + case 3: + // In the previous iteration, the (fake) component has received another CheckinExpected + // message to propagate a modified APM configuration. In this iteration we are about to + // retrieve the APM configuration from the same component via the retrieve_apm_config + // action. + assert.Eventuallyf(t, func() bool { + // check the component + res, err := m.PerformAction( + context.Background(), + comp, + comp.Units[0], + fakecmp.ActionRetrieveAPMConfig, + nil) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + + retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + + return gproto.Equal(modifiedAPMConfig, retrievedApmConfig) + }, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: %s actual: %s", modifiedAPMConfig, retrievedApmConfig) + + comp.Component = &proto.Component{ + ApmConfig: nil, + } + err := m.Update(component.Model{Components: []component.Component{comp}}) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", + healthIteration, err) + return + } + + case 4: + // In the previous iteration, the (fake) component has received another CheckinExpected + // message to propagate a nil APM configuration. In this iteration we are about to + // retrieve the APM configuration from the same component via the retrieve_apm_config + // action. + assert.Eventuallyf(t, func() bool { + // check the component + res, err := m.PerformAction( + context.Background(), + comp, + comp.Units[0], + fakecmp.ActionRetrieveAPMConfig, + nil) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + + retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res) + if err != nil { + subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w", + healthIteration, fakecmp.ActionRetrieveAPMConfig, err) + return false + } + return retrievedApmConfig == nil + }, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: nil actual: %s", retrievedApmConfig) + + doneCh <- struct{}{} + } + + case client.UnitStateStarting: + // acceptable + + case client.UnitStateConfiguring: + // set unit back to healthy, so other cases will run. + comp.Units[0].Config = component.MustExpectedConfig(map[string]interface{}{ + "type": "fake", + "state": int(client.UnitStateHealthy), + "message": "Fake Healthy", + }) + + err := m.Update(component.Model{Components: []component.Component{comp}}) + if err != nil { + t.Logf("error updating component state to health: %v", err) + + subscriptionErrCh <- fmt.Errorf("failed to update component: %w", err) + } + + default: + // unexpected state that should not have occurred + subscriptionErrCh <- fmt.Errorf("unit reported unexpected state: %v", + unit.State) + } + + } + } + }() + + defer drainErrChan(managerErrCh) + defer drainErrChan(subscriptionErrCh) + + err = m.Update(component.Model{Components: []component.Component{comp}}) + require.NoError(t, err) + + timeout := 30 * time.Second + timeoutTimer := time.NewTimer(timeout) + defer timeoutTimer.Stop() + + // Wait for a success, an error or time out + for { + select { + case <-timeoutTimer.C: + t.Fatalf("timed out after %s", timeout) + case err := <-managerErrCh: + require.NoError(t, err) + case err := <-subscriptionErrCh: + require.NoError(t, err) + case <-doneCh: + subCancel() + cancel() + + err = <-managerErrCh + require.NoError(t, err) + return + } + } +} + +func extractAPMConfigFromActionResult(t *testing.T, res map[string]interface{}) (*proto.APMConfig, error) { + apmCfg, ok := res["apm"] + if !ok { + return nil, fmt.Errorf("ActionResult for %s does not contain top level key %s", fakecmp.ActionRetrieveAPMConfig, "apm") + } + if apmCfg == nil { + // the APM config is not set on the component + return nil, nil + } + + jsonApmConfig, ok := apmCfg.(string) + if !ok { + return nil, fmt.Errorf("ActionResult for %s does not contain a string value: %T", fakecmp.ActionRetrieveAPMConfig, apmCfg) + } + + retrievedApmConfig := new(proto.APMConfig) + err := protojson.Unmarshal([]byte(jsonApmConfig), retrievedApmConfig) + if err != nil { + return nil, fmt.Errorf("error unmarshaling apmconfig %s: %w", jsonApmConfig, err) + } + return retrievedApmConfig, nil +} + func TestManager_FakeInput_Limits(t *testing.T) { testPaths(t) diff --git a/pkg/utils/maps.go b/pkg/utils/maps.go new file mode 100644 index 00000000000..eafcefcb0ab --- /dev/null +++ b/pkg/utils/maps.go @@ -0,0 +1,35 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package utils + +import "errors" + +var ErrNoKeys = errors.New("no key provided") +var ErrKeyNotFound = errors.New("key not found") +var ErrValueNotMap = errors.New("value is not a map") + +// GetNestedMap is a utility function to traverse nested maps using a series of key +func GetNestedMap[K comparable](src map[K]any, keys ...K) (any, error) { + if len(keys) == 0 { + return nil, ErrNoKeys + } + if _, ok := src[keys[0]]; !ok { + // no key found + return nil, ErrKeyNotFound + } + + if len(keys) == 1 { + // we reached the final key, return the value + return src[keys[0]], nil + } + + // we have more keys to go through + valueMap, ok := src[keys[0]].(map[K]any) + if !ok { + return nil, ErrValueNotMap + } + + return GetNestedMap[K](valueMap, keys[1:]...) +} diff --git a/testing/integration/apm_propagation_test.go b/testing/integration/apm_propagation_test.go new file mode 100644 index 00000000000..62aa87b0d83 --- /dev/null +++ b/testing/integration/apm_propagation_test.go @@ -0,0 +1,239 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "testing" + "text/template" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext" + "github.com/elastic/go-elasticsearch/v8" + + "github.com/elastic/elastic-agent/pkg/control/v2/client" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" +) + +const agentConfigTemplateString = ` +outputs: + default: + type: fake-action-output + shipper.enabled: true +inputs: + - id: fake-apm + type: fake-apm +agent.monitoring: + traces: true + apm: + hosts: + - {{ .host }} + environment: {{ .environment }} + secret_token: {{ .secret_token }} + global_labels: + test_name: TestAPMConfig + test_type: Agent integration test + tls: + skip_verify: true +` + +func TestAPMConfig(t *testing.T) { + info := define.Require(t, define.Requirements{ + Stack: &define.Stack{}, + }) + f, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + + deadline := time.Now().Add(10 * time.Minute) + ctx, cancel := testcontext.WithDeadline(t, context.Background(), deadline) + defer cancel() + + err = f.Prepare(ctx, fakeComponent, fakeShipper) + require.NoError(t, err) + + name := "fake-apm" + environment := info.Namespace + + agentConfig := generateAgentConfigForAPM(t, agentConfigTemplateString, info, environment) + t.Logf("Rendered agent config:\n%s", agentConfig) + + testAPMTraces := func() error { + state, err := f.Client().State(ctx) + require.NoError(t, err) + + t.Logf("agent state: %+v", state) + + // test that APM traces are being sent using initial configuration + require.Eventually(t, func() bool { + count, errCount := countAPMTraces(t, info.ESClient, name, environment) + if errCount != nil { + t.Logf("Error retrieving APM traces count for service %q and environment %q: %s", name, environment, errCount) + return false + } + return count > 0 + }, 1*time.Minute, time.Second) + + //change the configuration with a new environment and check that the update has been processed + environment = environment + "-changed" + modifiedAgentConfig := generateAgentConfigForAPM(t, agentConfigTemplateString, info, environment) + t.Logf("Rendered agent modified config:\n%s", modifiedAgentConfig) + err = f.Client().Configure(ctx, modifiedAgentConfig) + require.NoError(t, err, "error updating agent config with a new APM environment") + + // check that we receive traces with the new environment string + require.Eventually(t, func() bool { + count, errCount := countAPMTraces(t, info.ESClient, name, environment) + if errCount != nil { + t.Logf("Error retrieving APM traces count for service %q and environment %q: %s", name, environment, errCount) + return false + } + return count > 0 + }, 1*time.Minute, time.Second) + + return nil + } + + err = f.Run(ctx, atesting.State{ + Configure: agentConfig, + AgentState: atesting.NewClientState(client.Healthy), + Components: map[string]atesting.ComponentState{ + "fake-apm-default": { + State: atesting.NewClientState(client.Healthy), + Units: map[atesting.ComponentUnitKey]atesting.ComponentUnitState{ + atesting.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "fake-apm-default"}: { + State: atesting.NewClientState(client.Healthy), + }, + atesting.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-apm-default-fake-apm"}: { + State: atesting.NewClientState(client.Healthy), + }, + }, + }, + }, + After: testAPMTraces, + }) + + require.NoError(t, err) + +} + +func countAPMTraces(t *testing.T, esClient *elasticsearch.Client, serviceName, environment string) (int, error) { + queryRaw := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "filter": []map[string]interface{}{ + { + "term": map[string]interface{}{ + "service.name": map[string]interface{}{ + "value": serviceName, + }, + }, + }, + { + "term": map[string]interface{}{ + "service.environment": map[string]interface{}{ + "value": environment, + }, + }, + }, + }, + }, + }, + } + + buf := new(bytes.Buffer) + err := json.NewEncoder(buf).Encode(queryRaw) + if err != nil { + return 0, fmt.Errorf("error encoding query: %w", err) + } + + count := esClient.Count + + response, err := count( + count.WithIndex("traces-apm-default"), + count.WithBody(buf), + ) + if err != nil { + return 0, fmt.Errorf("error executing query: %w", err) + } + + defer response.Body.Close() + + var body struct { + Count int + } + + //decoder := json.NewDecoder(response.Body) + //err = decoder.Decode(&body) + bodyBytes, _ := io.ReadAll(response.Body) + + t.Logf("received ES response: %s", bodyBytes) + err = json.Unmarshal(bodyBytes, &body) + + return body.Count, err +} + +// types to correctly parse the APM config we get from kibana API +type apmConfigResponse struct { + CloudStandaloneSetup CloudStandaloneSetup `json:"cloudStandaloneSetup,omitempty"` + IsFleetEnabled bool `json:"isFleetEnabled,omitempty"` + FleetAgents []FleetAgents `json:"fleetAgents,omitempty"` +} +type CloudStandaloneSetup struct { + ApmServerURL string `json:"apmServerUrl,omitempty"` + SecretToken string `json:"secretToken,omitempty"` +} +type FleetAgents struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + ApmServerURL string `json:"apmServerUrl,omitempty"` + SecretToken string `json:"secretToken,omitempty"` +} + +func generateAgentConfigForAPM(t *testing.T, configTemplate string, info *define.Info, environment string) string { + t.Helper() + apmConfigData := getAPMConfigFromKibana(t, info.KibanaClient) + + configT, err := template.New("test config").Parse(configTemplate) + require.NoErrorf(t, err, "Error parsing agent config template\n%s", configTemplate) + + buf := new(strings.Builder) + templateData := map[string]any{ + "environment": environment, + "secret_token": apmConfigData.SecretToken, + "host": apmConfigData.ApmServerURL, + } + err = configT.Execute(buf, templateData) + require.NoErrorf(t, err, "Error rendering template\n%s\nwith data %v", configTemplate, templateData) + return buf.String() +} + +func getAPMConfigFromKibana(t *testing.T, kc *kibana.Client) CloudStandaloneSetup { + t.Helper() + response, err := kc.Send(http.MethodGet, "/internal/apm/fleet/agents", nil, nil, nil) + require.NoError(t, err, "Error getting APM connection params from kibana") + defer response.Body.Close() + + responseBytes, err := io.ReadAll(response.Body) + require.NoError(t, err, "Error reading data from http response") + apmConfig := new(apmConfigResponse) + err = json.Unmarshal(responseBytes, apmConfig) + require.NoError(t, err, "Error unmarshalling apm config") + require.NotEmpty(t, apmConfig.CloudStandaloneSetup.ApmServerURL, "APM config URL is empty") + require.NotEmpty(t, apmConfig.CloudStandaloneSetup.SecretToken, "APM config token is empty") + + return apmConfig.CloudStandaloneSetup +} diff --git a/testing/integration/fake_test.go b/testing/integration/fake_test.go index 91c6b2353da..cb685e4cb4a 100644 --- a/testing/integration/fake_test.go +++ b/testing/integration/fake_test.go @@ -8,73 +8,16 @@ package integration import ( "context" - "path/filepath" - "runtime" "testing" "time" "github.com/stretchr/testify/require" - "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/control/v2/client" atesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/testing/define" ) -var fakeComponent = atesting.UsableComponent{ - Name: "fake", - BinaryPath: mustAbs(filepath.Join("..", "..", "pkg", "component", "fake", "component", osExt("component"))), - Spec: &component.Spec{ - Version: 2, - Inputs: []component.InputSpec{ - { - Name: "fake", - Description: "A fake input", - Platforms: []string{ - "container/amd64", - "container/arm64", - "darwin/amd64", - "darwin/arm64", - "linux/amd64", - "linux/arm64", - "windows/amd64", - }, - Shippers: []string{ - "fake-shipper", - }, - Command: &component.CommandSpec{}, - }, - }, - }, -} - -var fakeShipper = atesting.UsableComponent{ - Name: "fake-shipper", - BinaryPath: mustAbs(filepath.Join("..", "..", "pkg", "component", "fake", "shipper", osExt("shipper"))), - Spec: &component.Spec{ - Version: 2, - Shippers: []component.ShipperSpec{ - { - Name: "fake-shipper", - Description: "A fake shipper", - Platforms: []string{ - "container/amd64", - "container/arm64", - "darwin/amd64", - "darwin/arm64", - "linux/amd64", - "linux/arm64", - "windows/amd64", - }, - Outputs: []string{ - "fake-action-output", - }, - Command: &component.CommandSpec{}, - }, - }, - }, -} - var simpleConfig1 = ` outputs: default: @@ -161,18 +104,3 @@ func TestFakeComponent(t *testing.T) { }) require.NoError(t, err) } - -func mustAbs(path string) string { - abs, err := filepath.Abs(path) - if err != nil { - panic(err) - } - return abs -} - -func osExt(name string) string { - if runtime.GOOS == "windows" { - return name + ".exe" - } - return name -} diff --git a/testing/integration/fakes.go b/testing/integration/fakes.go new file mode 100644 index 00000000000..2364b06580d --- /dev/null +++ b/testing/integration/fakes.go @@ -0,0 +1,108 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "path/filepath" + "runtime" + + "github.com/elastic/elastic-agent/pkg/component" + atesting "github.com/elastic/elastic-agent/pkg/testing" +) + +const fakeShipperName = "fake-shipper" + +var fakeComponentPltfs = []string{ + "container/amd64", + "container/arm64", + "darwin/amd64", + "darwin/arm64", + "linux/amd64", + "linux/arm64", + "windows/amd64", +} + +var fakeComponent = atesting.UsableComponent{ + Name: "fake", + BinaryPath: mustAbs(filepath.Join("..", "..", "pkg", "component", "fake", "component", osExt("component"))), + Spec: &component.Spec{ + Version: 2, + Inputs: []component.InputSpec{ + { + Name: "fake", + Description: "A fake input", + Platforms: fakeComponentPltfs, + Shippers: []string{ + fakeShipperName, + }, + Command: &component.CommandSpec{}, + }, + { + Name: "fake-apm", + Description: "Fake component apm traces generator", + Platforms: fakeComponentPltfs, + Shippers: []string{ + fakeShipperName, + }, + Command: &component.CommandSpec{ + Env: []component.CommandEnvSpec{ + { + Name: "ELASTIC_APM_LOG_FILE", + Value: "stderr", + }, + { + Name: "ELASTIC_APM_LOG_LEVEL", + Value: "debug", + }, + }, + }, + }, + }, + }, +} + +var fakeShipper = atesting.UsableComponent{ + Name: fakeShipperName, + BinaryPath: mustAbs(filepath.Join("..", "..", "pkg", "component", "fake", "shipper", osExt("shipper"))), + Spec: &component.Spec{ + Version: 2, + Shippers: []component.ShipperSpec{ + { + Name: fakeShipperName, + Description: "A fake shipper", + Platforms: []string{ + "container/amd64", + "container/arm64", + "darwin/amd64", + "darwin/arm64", + "linux/amd64", + "linux/arm64", + "windows/amd64", + }, + Outputs: []string{ + "fake-action-output", + }, + Command: &component.CommandSpec{}, + }, + }, + }, +} + +func mustAbs(path string) string { + abs, err := filepath.Abs(path) + if err != nil { + panic(err) + } + return abs +} + +func osExt(name string) string { + if runtime.GOOS == "windows" { + return name + ".exe" + } + return name +}