Skip to content

Commit

Permalink
Only run providers that are referenced in the policy (elastic#6169)
Browse files Browse the repository at this point in the history
  • Loading branch information
blakerouse authored Dec 2, 2024
1 parent 5401628 commit b811364
Show file tree
Hide file tree
Showing 12 changed files with 838 additions and 204 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Only run providers referenced in the policy

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6169

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3609
30 changes: 30 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ type ConfigManager interface {
type VarsManager interface {
Runner

// Observe instructs the variables to observe.
Observe([]string)

// Watch returns the chanel to watch for variable changes.
Watch() <-chan []*transpiler.Vars
}
Expand Down Expand Up @@ -1235,6 +1238,9 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config
return err
}

// pass the observed vars from the AST to the varsMgr
c.observeASTVars()

// Disabled for 8.8.0 release in order to limit the surface
// https://github.com/elastic/security-team/issues/6501

Expand Down Expand Up @@ -1313,6 +1319,30 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
return nil
}

// observeASTVars identifies the variables that are referenced in the computed AST and passed to
// the varsMgr so it knows what providers are being referenced. If a providers is not being
// referenced then the provider does not need to be running.
func (c *Coordinator) observeASTVars() {
if c.varsMgr == nil {
// No varsMgr (only happens in testing)
return
}
if c.ast == nil {
// No AST; no vars
c.varsMgr.Observe(nil)
return
}
inputs, ok := transpiler.Lookup(c.ast, "inputs")
if !ok {
// No inputs; no vars
c.varsMgr.Observe(nil)
return
}
var vars []string
vars = inputs.Vars(vars)
c.varsMgr.Observe(vars)
}

// processVars updates the transpiler vars in the Coordinator.
// Called on the main Coordinator goroutine.
func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) {
Expand Down
81 changes: 81 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
goruntime "runtime"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -327,6 +328,77 @@ func mustNewStruct(t *testing.T, v map[string]interface{}) *structpb.Struct {
return str
}

func TestCoordinator_VarsMgr_Observe(t *testing.T) {
coordCh := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
stateChan := coord.StateSubscribe(ctx, 32)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
// allowed error
err = nil
}
coordCh <- err
}()

// wait for it to be in starting state
waitForState(t, stateChan, func(state State) bool {
return state.State == agentclient.Starting &&
state.Message == "Waiting for initial configuration and composable variables"
}, 3*time.Second)

// set vars state should stay same (until config)
varsMgr.Vars(ctx, []*transpiler.Vars{{}})

// State changes happen asynchronously in the Coordinator goroutine, so
// wait a little bit to make sure no changes are reported; if the Vars
// call does trigger a change, it should happen relatively quickly.
select {
case <-stateChan:
assert.Fail(t, "Vars call shouldn't cause a state change")
case <-time.After(50 * time.Millisecond):
}

// set configuration that has variables present
cfg, err := config.NewConfigFrom(map[string]interface{}{
"inputs": []interface{}{
map[string]interface{}{
"type": "filestream",
"paths": []interface{}{
"${env.filestream_path|env.log_path|'/var/log/syslog'}",
},
},
map[string]interface{}{
"type": "windows",
"condition": "${host.platform} == 'windows'",
},
},
})
require.NoError(t, err)
cfgMgr.Config(ctx, cfg)

// healthy signals that the configuration has been computed
waitForState(t, stateChan, func(state State) bool {
return state.State == agentclient.Healthy && state.Message == "Running"
}, 3*time.Second)

// get the set observed vars from the fake vars manager
varsMgr.observedMx.Lock()
observed := varsMgr.observed
varsMgr.observedMx.Unlock()

// stop the coordinator
cancel()
err = <-coordCh
require.NoError(t, err)

// verify that the observed vars are the expected vars
assert.Equal(t, []string{"env.filestream_path", "env.log_path", "host.platform"}, observed)
}

func TestCoordinator_State_Starting(t *testing.T) {
coordCh := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1072,6 +1144,9 @@ func (l *configChange) Fail(err error) {
type fakeVarsManager struct {
varsCh chan []*transpiler.Vars
errCh chan error

observedMx sync.RWMutex
observed []string
}

func newFakeVarsManager() *fakeVarsManager {
Expand Down Expand Up @@ -1101,6 +1176,12 @@ func (f *fakeVarsManager) Watch() <-chan []*transpiler.Vars {
return f.varsCh
}

func (f *fakeVarsManager) Observe(observed []string) {
f.observedMx.Lock()
defer f.observedMx.Unlock()
f.observed = observed
}

func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) {
select {
case <-ctx.Done():
Expand Down
7 changes: 1 addition & 6 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,10 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
l.Info("APM instrumentation disabled")
}

coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
coord, configMgr, _, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
if err != nil {
return logReturn(l, err)
}
defer func() {
if composable != nil {
composable.Close()
}
}()

monitoringServer, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord)
if err != nil {
Expand Down
64 changes: 64 additions & 0 deletions internal/pkg/agent/transpiler/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type Node interface {
// Hash compute a sha256 hash of the current node and recursively call any children.
Hash() []byte

// Vars adds to the array with the variables identified in the node. Returns the array in-case
// the capacity of the array had to be changed.
Vars([]string) []string

// Apply apply the current vars, returning the new value for the node.
Apply(*Vars) (Node, error)

Expand Down Expand Up @@ -162,6 +166,15 @@ func (d *Dict) Hash() []byte {
return h.Sum(nil)
}

// Vars returns a list of all variables referenced in the dictionary.
func (d *Dict) Vars(vars []string) []string {
for _, v := range d.value {
k := v.(*Key)
vars = k.Vars(vars)
}
return vars
}

// Apply applies the vars to all the nodes in the dictionary.
func (d *Dict) Apply(vars *Vars) (Node, error) {
nodes := make([]Node, 0, len(d.value))
Expand Down Expand Up @@ -277,6 +290,14 @@ func (k *Key) Hash() []byte {
return h.Sum(nil)
}

// Vars returns a list of all variables referenced in the value.
func (k *Key) Vars(vars []string) []string {
if k.value == nil {
return vars
}
return k.value.Vars(vars)
}

// Apply applies the vars to the value.
func (k *Key) Apply(vars *Vars) (Node, error) {
if k.value == nil {
Expand Down Expand Up @@ -397,6 +418,14 @@ func (l *List) ShallowClone() Node {
return &List{value: nodes}
}

// Vars returns a list of all variables referenced in the list.
func (l *List) Vars(vars []string) []string {
for _, v := range l.value {
vars = v.Vars(vars)
}
return vars
}

// Apply applies the vars to all nodes in the list.
func (l *List) Apply(vars *Vars) (Node, error) {
nodes := make([]Node, 0, len(l.value))
Expand Down Expand Up @@ -472,6 +501,16 @@ func (s *StrVal) Hash() []byte {
return []byte(s.value)
}

// Vars returns a list of all variables referenced in the string.
func (s *StrVal) Vars(vars []string) []string {
// errors are ignored (if there is an error determine the vars it will also error computing the policy)
_, _ = replaceVars(s.value, func(variable string) (Node, Processors, bool) {
vars = append(vars, variable)
return nil, nil, false
}, false)
return vars
}

// Apply applies the vars to the string value.
func (s *StrVal) Apply(vars *Vars) (Node, error) {
return vars.Replace(s.value)
Expand Down Expand Up @@ -523,6 +562,11 @@ func (s *IntVal) ShallowClone() Node {
return s.Clone()
}

// Vars does nothing. Cannot have variable in an IntVal.
func (s *IntVal) Vars(vars []string) []string {
return vars
}

// Apply does nothing.
func (s *IntVal) Apply(_ *Vars) (Node, error) {
return s, nil
Expand Down Expand Up @@ -584,6 +628,11 @@ func (s *UIntVal) Hash() []byte {
return []byte(s.String())
}

// Vars does nothing. Cannot have variable in an UIntVal.
func (s *UIntVal) Vars(vars []string) []string {
return vars
}

// Apply does nothing.
func (s *UIntVal) Apply(_ *Vars) (Node, error) {
return s, nil
Expand Down Expand Up @@ -641,6 +690,11 @@ func (s *FloatVal) Hash() []byte {
return []byte(strconv.FormatFloat(s.value, 'f', -1, 64))
}

// Vars does nothing. Cannot have variable in an FloatVal.
func (s *FloatVal) Vars(vars []string) []string {
return vars
}

// Apply does nothing.
func (s *FloatVal) Apply(_ *Vars) (Node, error) {
return s, nil
Expand Down Expand Up @@ -703,6 +757,11 @@ func (s *BoolVal) Hash() []byte {
return falseVal
}

// Vars does nothing. Cannot have variable in an BoolVal.
func (s *BoolVal) Vars(vars []string) []string {
return vars
}

// Apply does nothing.
func (s *BoolVal) Apply(_ *Vars) (Node, error) {
return s, nil
Expand Down Expand Up @@ -982,6 +1041,11 @@ func attachProcessors(node Node, processors Processors) Node {

// Lookup accept an AST and a selector and return the matching Node at that position.
func Lookup(a *AST, selector Selector) (Node, bool) {
// Be defensive and ensure that the ast is usable.
if a == nil || a.root == nil {
return nil, false
}

// Run through the graph and find matching nodes.
current := a.root
for _, part := range splitPath(selector) {
Expand Down
Loading

0 comments on commit b811364

Please sign in to comment.