Propagate apm config (#3223)
This change propagates the APM configuration set up for the agent to the components it manages.

Only Elastic APM is supported at the moment, and it can only be configured in the elastic-agent.yml configuration file.

For Fleet-managed agents we include a workaround that injects this configuration from the config file into every config change received from Fleet. Hot reloading is not supported in this mode: any change to the APM configuration takes effect only after a restart.



* Pass apm config to components
* Add config patcher for apm injection in fleet managed agents
* Add global labels to apm config
pchila authored Sep 29, 2023
1 parent bc1982a commit 123ba9c
Showing 22 changed files with 2,199 additions and 110 deletions.
48 changes: 48 additions & 0 deletions docs/tracing.md
@@ -0,0 +1,48 @@
# Elastic agent APM configuration


## Configuration
The Elastic Agent APM configuration in `elastic-agent.yml` looks like this (the keys under `apm` have the same meaning
and usage as in a regular [APM configuration](https://www.elastic.co/guide/en/apm/agent/go/current/configuration.html)):
```yaml
agent.monitoring:
  traces: true
  apm:
    hosts:
      - <apm host url>
    environment: <apm environment>
    secret_token: <redacted>
    api_key: <redacted>
    global_labels:
      k1: v1
      k2: v2
    tls:
      skip_verify: true
      server_certificate: <path to the server certificate>
      server_ca: <path to the server CA>
```
APM configuration is only available in the `elastic-agent.yml` configuration file (Fleet does not support these settings at the moment):
- for a standalone agent, the configuration is reloaded from the file by default when it changes while the agent is running (unless the configuration reload mechanism has been disabled using the `agent.reload.enabled` setting)
- for a managed agent, the configuration is read once at startup and then added to every policy change coming from Fleet; in this case, changes to the APM configuration require an agent restart to be picked up
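
The managed-agent behaviour described above can be sketched with plain maps. This is only an illustration under stated assumptions: `mergeMaps` and `newAPMPatcher` are hypothetical helpers, and a policy is modelled as a `map[string]any` rather than the agent's own configuration types. The point it shows is that the APM settings are captured once, so later edits to the file are not seen until the process restarts.

```go
package main

import "fmt"

// mergeMaps recursively copies src into dst; values from src win on conflict.
// Nested maps are copied, so dst never shares map instances with src.
func mergeMaps(dst, src map[string]any) {
	for k, v := range src {
		if srcMap, ok := v.(map[string]any); ok {
			dstMap, dstIsMap := dst[k].(map[string]any)
			if !dstIsMap {
				dstMap = map[string]any{}
				dst[k] = dstMap
			}
			mergeMaps(dstMap, srcMap)
			continue
		}
		dst[k] = v
	}
}

// newAPMPatcher (hypothetical) captures the monitoring settings read at
// startup and returns a function that injects them into every policy.
func newAPMPatcher(startup map[string]any) func(policy map[string]any) map[string]any {
	patch := map[string]any{"agent": map[string]any{"monitoring": startup}}
	return func(policy map[string]any) map[string]any {
		mergeMaps(policy, patch)
		return policy
	}
}

func main() {
	// Read once at startup (values are placeholders).
	patcher := newAPMPatcher(map[string]any{
		"traces": true,
		"apm":    map[string]any{"environment": "staging"},
	})

	// A policy arriving later from Fleet, without any APM settings.
	policy := map[string]any{
		"agent": map[string]any{"monitoring": map[string]any{"metrics": true}},
	}
	fmt.Println(patcher(policy))
}
```

Because the patch is built when the patcher is created, every policy receives the same settings for the lifetime of the process.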

## APM config propagation

APM propagation to components requires agent APM traces to be enabled (`agent.monitoring.traces` must be set to `true`).
Elastic Agent will propagate the APM parameters defined in its configuration to all the components it manages.
APM configuration is sent to the components via the control protocol, specifically in the [APMConfig message](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/elastic-agent-client.proto#L188-L208).

At the moment the agent supports only Elastic APM configuration, but since we also want to support the OTLP protocol, the APM configuration
has a dedicated field for Elastic, and support for other protocols will be added side by side (see [APMConfig message](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/elastic-agent-client.proto#L188-L208)).

The components can consume the configuration by using the [`Unit.Expected()`](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/pkg/client/unit.go#L166-L177)
from the [`UnitChanged`](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/pkg/client/client_v2.go#L126-L131)
object published by the elastic-agent-client. The [TriggeredAPMChange](https://github.com/elastic/elastic-agent-client/blob/5c7929a9889af5047137fabcb8f16ea38653ab97/pkg/client/client_v2.go#L63)
trigger flag will be set whenever there is a change in APM configuration.
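
A minimal sketch of reacting to the trigger flag is shown here. It deliberately uses local stand-in types: `Trigger`, `UnitChanged`, `APMConfig` and `handleChange` are hypothetical and only mimic the general shape of the linked client types, and the flag values are illustrative. In the real client the expected APM configuration is obtained through `Unit.Expected()`; the sketch carries it directly on the change for brevity.

```go
package main

import "fmt"

// Trigger is a local stand-in for a bitmask of change reasons.
// It is NOT the elastic-agent-client type; the values are illustrative.
type Trigger uint

const (
	TriggeredConfigChange Trigger = 1 << iota
	TriggeredFeatureChange
	TriggeredAPMChange
)

// APMConfig is a hypothetical, simplified view of the propagated settings.
type APMConfig struct {
	Environment string
	Hosts       []string
}

// UnitChanged is a hypothetical stand-in carrying the trigger flags and the
// expected APM configuration for a unit.
type UnitChanged struct {
	Triggers Trigger
	APM      *APMConfig
}

// handleChange calls reload only when the APM trigger bit is set and
// reports whether it did so.
func handleChange(change UnitChanged, reload func(*APMConfig)) bool {
	if change.Triggers&TriggeredAPMChange == 0 {
		return false
	}
	reload(change.APM)
	return true
}

func main() {
	change := UnitChanged{
		Triggers: TriggeredConfigChange | TriggeredAPMChange,
		APM:      &APMConfig{Environment: "staging", Hosts: []string{"http://localhost:8200"}},
	}
	handled := handleChange(change, func(cfg *APMConfig) {
		fmt.Println("reloading APM instrumentation for", cfg.Environment)
	})
	fmt.Println("handled:", handled)
}
```

Testing the bit rather than comparing the whole value matters because several triggers may be set on the same change.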

Components are expected to take appropriate action to reload/re-instantiate their APM instrumentation.
How that happens in detail depends on what sort of APM objects the component uses, for example:
- if the component uses a decorated HTTP server, it may need to gracefully stop the current server, recreate it with the new configuration, and start the new one.
- if it uses a custom Tracer object, it will need to create the new one, close the old one, and swap them safely.

The list above is not exhaustive; the handling of an APM configuration change will probably be specific
to each component/unit.
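
One possible way to swap a tracer safely, as mentioned in the list above, is to keep it behind an atomic pointer. The sketch below assumes Go 1.19 or later for `atomic.Pointer`; `tracer` and `tracerHolder` are hypothetical stand-ins, not types from any APM library, and a real component would substitute its own tracer type and shutdown call.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// tracer is a hypothetical stand-in for the component's APM tracer.
type tracer struct {
	environment string
	closed      atomic.Bool
}

// Close marks the tracer as shut down; a real tracer would flush and
// release its resources here.
func (t *tracer) Close() { t.closed.Store(true) }

// tracerHolder lets concurrent readers fetch the current tracer while a
// configuration reload replaces it.
type tracerHolder struct {
	current atomic.Pointer[tracer]
}

// Get returns the tracer currently in use, or nil if none is installed.
func (h *tracerHolder) Get() *tracer { return h.current.Load() }

// Replace installs next and closes the previous tracer, if there was one.
func (h *tracerHolder) Replace(next *tracer) {
	if old := h.current.Swap(next); old != nil {
		old.Close()
	}
}

func main() {
	holder := &tracerHolder{}
	holder.Replace(&tracer{environment: "staging"})

	previous := holder.Get()
	// An APM configuration change arrives: build the new tracer, then swap.
	holder.Replace(&tracer{environment: "production"})

	fmt.Println("current:", holder.Get().environment, "previous closed:", previous.closed.Load())
}
```

Note that a goroutine that fetched the old tracer just before the swap may still hold it after `Close` is called, so the tracer type needs to tolerate use after close or the component needs additional coordination.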
151 changes: 151 additions & 0 deletions internal/pkg/agent/application/apm_config_modifier.go
@@ -0,0 +1,151 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
	"fmt"

	"github.com/elastic/elastic-agent-client/v7/pkg/proto"

	"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
	"github.com/elastic/elastic-agent/internal/pkg/config"
	monitoringcfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
	"github.com/elastic/elastic-agent/pkg/component/runtime"
	"github.com/elastic/elastic-agent/pkg/core/logger"

	"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
	"github.com/elastic/elastic-agent/pkg/component"
	"github.com/elastic/elastic-agent/pkg/utils"
)

// InjectAPMConfig is a modifier passed to coordinator in order to set the global APM configuration used for the agent
// into each Component coming from input/output configuration
func InjectAPMConfig(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) {

	tracesEnabled, err := getAPMTracesEnabled(cfg)
	if err != nil {
		return comps, fmt.Errorf("error retrieving APM traces flag: %w", err)
	}

	if !tracesEnabled {
		// nothing to do
		return comps, nil
	}

	apmConfig, err := getAPMConfigFromMap(cfg)
	if err != nil {
		return comps, fmt.Errorf("error retrieving apm config: %w", err)
	}

	if apmConfig == nil {
		// nothing to do
		return comps, nil
	}

	for i := range comps {
		// We shouldn't really go straight from config datamodel to protobuf datamodel (a core datamodel would be nice to
		// abstract from protocol details)
		if comps[i].Component == nil {
			comps[i].Component = new(proto.Component)
		}
		comps[i].Component.ApmConfig = runtime.MapAPMConfig(apmConfig)
	}

	return comps, nil
}

func getAPMTracesEnabled(cfg map[string]any) (bool, error) {

	rawTracesEnabled, err := utils.GetNestedMap(cfg, "agent", "monitoring", "traces")
	if errors.Is(err, utils.ErrKeyNotFound) {
		// We didn't find the key, return false without any error
		return false, nil
	}

	if err != nil {
		return false, fmt.Errorf("error accessing trace flag: %w", err)
	}

	traceEnabled, ok := rawTracesEnabled.(bool)
	if !ok {
		return false, fmt.Errorf("trace flag has unexpected type %T", rawTracesEnabled)
	}

	return traceEnabled, nil
}

func getAPMConfigFromMap(cfg map[string]any) (*monitoringcfg.APMConfig, error) {
	nestedValue, err := utils.GetNestedMap(cfg, "agent", "monitoring", "apm")
	if errors.Is(err, utils.ErrKeyNotFound) {
		// No APM config found, nothing to do
		return nil, nil
	}
	if err != nil {
		return nil, fmt.Errorf("error traversing config: %w", err)
	}

	rawApmConfig, ok := nestedValue.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("the retrieved apm configs is not a map: %T", nestedValue)
	}

	newConfigFrom, err := config.NewConfigFrom(rawApmConfig)
	if err != nil {
		return nil, fmt.Errorf("error parsing apm config: %w", err)
	}

	monitoringConfig := new(monitoringcfg.APMConfig)
	err = newConfigFrom.Unpack(monitoringConfig)
	if err != nil {
		return nil, fmt.Errorf("error unpacking apm config: %w", err)
	}
	return monitoringConfig, nil
}

func noop(change coordinator.ConfigChange) coordinator.ConfigChange {
	return change
}

// PatchAPMConfig is a temporary configuration patcher function (see ConfigPatchManager and ConfigPatch for reference) that
// will patch the configuration coming from Fleet adding the APM parameters from the elastic agent configuration file
// until Fleet supports this config directly
func PatchAPMConfig(log *logger.Logger, rawConfig *config.Config) func(change coordinator.ConfigChange) coordinator.ConfigChange {
	configMap, err := rawConfig.ToMapStr()
	if err != nil {
		log.Errorf("error decoding raw config, patching disabled: %v", err)
		return noop
	}

	tracesEnabled, err := getAPMTracesEnabled(configMap)
	if err != nil {
		log.Errorf("error retrieving trace flag, patching disabled: %v", err)
		return noop
	}

	apmConfig, err := getAPMConfigFromMap(configMap)
	if err != nil {
		log.Errorf("error retrieving apm config, patching disabled: %v", err)
		return noop
	}

	if !tracesEnabled && apmConfig == nil {
		// traces disabled and no apm config -> no patching happening
		log.Debugf("traces disabled and no apm config: no patching necessary")
		return noop
	}
	monitoringPatch := map[string]any{"traces": tracesEnabled}
	if apmConfig != nil {
		monitoringPatch["apm"] = apmConfig
	}

	return func(change coordinator.ConfigChange) coordinator.ConfigChange {
		err := change.Config().Merge(map[string]any{"agent": map[string]any{"monitoring": monitoringPatch}})
		if err != nil {
			log.Errorf("error patching apm config into configchange: %v", err)
		}

		return change
	}
}