Skip to content

Commit

Permalink
Add IsStandalone() to agent info, report management mode to gRPC (#…
Browse files Browse the repository at this point in the history
…4911)

* init commit
* update runtime_comm
* fix startup bug
* fix tests, log
  • Loading branch information
fearful-symmetry authored Jun 12, 2024
1 parent 6c20730 commit 249d0b6
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ type fakeAgentInfo struct {
snapshot bool
version string
unprivileged bool
isStandalone bool
}

func (a fakeAgentInfo) AgentID() string {
Expand Down Expand Up @@ -661,5 +662,9 @@ func (a fakeAgentInfo) Unprivileged() bool {
return a.unprivileged
}

func (a fakeAgentInfo) IsStandalone() bool {
return a.isStandalone
}

func (a fakeAgentInfo) ReloadID(ctx context.Context) error { panic("implement me") }
func (a fakeAgentInfo) SetLogLevel(ctx context.Context, level string) error { panic("implement me") }
55 changes: 35 additions & 20 deletions internal/pkg/agent/application/info/agent_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ioStore interface {

// updateLogLevel updates log level and persists it to disk.
func updateLogLevel(ctx context.Context, level string) error {
ai, err := loadAgentInfoWithBackoff(ctx, false, defaultLogLevel, false)
ai, _, err := loadAgentInfoWithBackoff(ctx, false, defaultLogLevel, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -71,51 +71,65 @@ func generateAgentID() (string, error) {
return uid.String(), nil
}

func getInfoFromStore(s ioStore, logLevel string) (*persistentAgentInfo, error) {
// getInfoFromStore uses the IO store to return the config from agent.* fields in the config,
// as well as a bool indicating if agent is running in standalone mode.
func getInfoFromStore(s ioStore, logLevel string) (*persistentAgentInfo, bool, error) {
agentConfigFile := paths.AgentConfigFile()
reader, err := s.Load()
if err != nil {
return nil, fmt.Errorf("failed to load from ioStore: %w", err)
return nil, false, fmt.Errorf("failed to load from ioStore: %w", err)
}

// reader is closed by this function
cfg, err := config.NewConfigFrom(reader)
if err != nil {
return nil, errors.New(err,
return nil, false, errors.New(err,
fmt.Sprintf("fail to read configuration %s for the agent", agentConfigFile),
errors.TypeFilesystem,
errors.M(errors.MetaKeyPath, agentConfigFile))
}

configMap, err := cfg.ToMapStr()
if err != nil {
return nil, errors.New(err,
return nil, false, errors.New(err,
"failed to unpack stored config to map",
errors.TypeFilesystem)
}

// check fleet config. This behavior emulates configuration.IsStandalone
fleetmode, fleetExists := configMap["fleet"]
isStandalone := true
if fleetExists {
fleetCfg, ok := fleetmode.(map[string]interface{})
if ok {
if fleetCfg["enabled"] == true {
isStandalone = false
}
}
}

agentInfoSubMap, found := configMap[agentInfoKey]
if !found {
return &persistentAgentInfo{
LogLevel: logLevel,
MonitoringHTTP: monitoringConfig.DefaultConfig().HTTP,
}, nil
}, isStandalone, nil
}

cc, err := config.NewConfigFrom(agentInfoSubMap)
if err != nil {
return nil, errors.New(err, "failed to create config from agent info submap")
return nil, false, errors.New(err, "failed to create config from agent info submap")
}

pid := &persistentAgentInfo{
LogLevel: logLevel,
MonitoringHTTP: monitoringConfig.DefaultConfig().HTTP,
}
if err := cc.Unpack(&pid); err != nil {
return nil, errors.New(err, "failed to unpack stored config to map")
return nil, false, errors.New(err, "failed to unpack stored config to map")
}

return pid, nil
return pid, isStandalone, nil
}

func updateAgentInfo(s ioStore, agentInfo *persistentAgentInfo) error {
Expand Down Expand Up @@ -177,53 +191,54 @@ func yamlToReader(in interface{}) (io.Reader, error) {
return bytes.NewReader(data), nil
}

func loadAgentInfoWithBackoff(ctx context.Context, forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, error) {
func loadAgentInfoWithBackoff(ctx context.Context, forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, bool, error) {
var err error
var ai *persistentAgentInfo
var isStandalone bool

signal := make(chan struct{})
backExp := backoff.NewExpBackoff(signal, 100*time.Millisecond, 3*time.Second)

for i := 0; i <= maxRetriesloadAgentInfo; i++ {
backExp.Wait()
ai, err = loadAgentInfo(ctx, forceUpdate, logLevel, createAgentID)
ai, isStandalone, err = loadAgentInfo(ctx, forceUpdate, logLevel, createAgentID)
if !errors.Is(err, filelock.ErrAppAlreadyRunning) {
break
}
}

close(signal)
return ai, err
return ai, isStandalone, err
}

func loadAgentInfo(ctx context.Context, forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, error) {
func loadAgentInfo(ctx context.Context, forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, bool, error) {
idLock := paths.AgentConfigFileLock()
if err := idLock.TryLock(); err != nil {
return nil, err
return nil, false, err
}
//nolint:errcheck // keeping the same behavior, and making linter happy
defer idLock.Unlock()

agentConfigFile := paths.AgentConfigFile()
diskStore, err := storage.NewEncryptedDiskStore(ctx, agentConfigFile)
if err != nil {
return nil, fmt.Errorf("error instantiating encrypted disk store: %w", err)
return nil, false, fmt.Errorf("error instantiating encrypted disk store: %w", err)
}

agentInfo, err := getInfoFromStore(diskStore, logLevel)
agentInfo, isStandalone, err := getInfoFromStore(diskStore, logLevel)
if err != nil {
return nil, fmt.Errorf("could not get agent info from store: %w", err)
return nil, false, fmt.Errorf("could not get agent info from store: %w", err)
}

if agentInfo != nil && !forceUpdate && (agentInfo.ID != "" || !createAgentID) {
return agentInfo, nil
return agentInfo, isStandalone, nil
}

if err := updateID(agentInfo, diskStore); err != nil {
return nil, fmt.Errorf("could not update agent ID on disk store: %w", err)
return nil, false, fmt.Errorf("could not update agent ID on disk store: %w", err)
}

return agentInfo, nil
return agentInfo, isStandalone, nil
}

func updateID(agentInfo *persistentAgentInfo, s ioStore) error {
Expand Down
91 changes: 91 additions & 0 deletions internal/pkg/agent/application/info/agent_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package info

import (
"bytes"
"context"
"path/filepath"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/secret"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/vault"
)

func TestAgentIDStandaloneWorks(t *testing.T) {
if runtime.GOOS == "darwin" {
// vault requres extra perms on mac
t.Skip()
}
// create a new encrypted disk store
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

tmpPath := t.TempDir()
paths.SetConfig(tmpPath)

vaultPath := filepath.Join(tmpPath, "vault")
err := secret.CreateAgentSecret(ctx, vault.WithVaultPath(vaultPath))
require.NoError(t, err)

setID := "test-id"
testCfg := map[string]interface{}{
"agent": map[string]interface{}{
"id": setID,
},
}
saveToStateStore(t, tmpPath, testCfg)

got, err := NewAgentInfo(ctx, false)
require.NoError(t, err)
t.Logf("got: %#v", got)

// check the ID to make sure we've opened the fleet config properly
require.Equal(t, setID, got.agentID)

// no fleet config, should be standalone
require.True(t, got.isStandalone)

// update fleet config, this time in managed mode
testCfg = map[string]interface{}{
"agent": map[string]interface{}{
"id": setID,
},
"fleet": map[string]interface{}{
"enabled": true,
},
}
saveToStateStore(t, tmpPath, testCfg)

got, err = NewAgentInfo(ctx, false)
require.NoError(t, err)
t.Logf("got: %#v", got)
require.False(t, got.isStandalone)

}

func saveToStateStore(t *testing.T, tmpPath string, in map[string]interface{}) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

encPath := filepath.Join(tmpPath, "fleet.enc")
store, err := storage.NewEncryptedDiskStore(ctx, encPath)
require.NoError(t, err)

rawYml, err := yaml.Marshal(in)
require.NoError(t, err)

reader := bytes.NewReader(rawYml)

err = store.Save(reader)
require.NoError(t, err)
}
11 changes: 10 additions & 1 deletion internal/pkg/agent/application/info/agent_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@ type Agent interface {

// Unprivileged returns true when this Agent is running unprivileged.
Unprivileged() bool

// IsStandalone returns true is the agent is running in standalone mode, i.e, without fleet
IsStandalone() bool
}

// AgentInfo is a collection of information about agent.
type AgentInfo struct {
agentID string
logLevel string
unprivileged bool
isStandalone bool

// esHeaders will be injected into the headers field of any elasticsearch
// output created by this agent (see component.toIntermediate).
Expand All @@ -60,7 +64,7 @@ type AgentInfo struct {
// If agent config file does not exist it gets created.
// Initiates log level to predefined value.
func NewAgentInfoWithLog(ctx context.Context, level string, createAgentID bool) (*AgentInfo, error) {
agentInfo, err := loadAgentInfoWithBackoff(ctx, false, level, createAgentID)
agentInfo, isStandalone, err := loadAgentInfoWithBackoff(ctx, false, level, createAgentID)
if err != nil {
return nil, err
}
Expand All @@ -74,6 +78,7 @@ func NewAgentInfoWithLog(ctx context.Context, level string, createAgentID bool)
logLevel: agentInfo.LogLevel,
unprivileged: !isRoot,
esHeaders: agentInfo.Headers,
isStandalone: isStandalone,
}, nil
}

Expand Down Expand Up @@ -144,3 +149,7 @@ func (i *AgentInfo) Headers() map[string]string {
func (i *AgentInfo) Unprivileged() bool {
return i.unprivileged
}

func (i *AgentInfo) IsStandalone() bool {
return i.isStandalone
}
10 changes: 10 additions & 0 deletions pkg/component/runtime/runtime_comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (c *runtimeComm) WriteStartUpInfo(w io.Writer, services ...client.Service)
Id: c.agentInfo.AgentID(),
Version: c.agentInfo.Version(),
Snapshot: c.agentInfo.Snapshot(),
Mode: protoAgentMode(c.agentInfo),
},
}
infoBytes, err := protobuf.Marshal(startupInfo)
Expand All @@ -155,6 +156,7 @@ func (c *runtimeComm) CheckinExpected(
Id: c.agentInfo.AgentID(),
Version: c.agentInfo.Version(),
Snapshot: c.agentInfo.Snapshot(),
Mode: protoAgentMode(c.agentInfo),
}
} else {
expected.AgentInfo = nil
Expand Down Expand Up @@ -433,3 +435,11 @@ func sendExpectedChunked(server proto.ElasticAgent_CheckinV2Server, msg *proto.C
}
return nil
}

// protoAgentMode converts the agent info mode bool to the AgentManagedMode enum
func protoAgentMode(agent info.Agent) proto.AgentManagedMode {
if agent.IsStandalone() {
return proto.AgentManagedMode_STANDALONE
}
return proto.AgentManagedMode_MANAGED
}
5 changes: 5 additions & 0 deletions pkg/component/runtime/runtime_comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type agentInfoMock struct {
snapshot bool
version string
unprivileged bool
isStandalone bool
}

func (a agentInfoMock) AgentID() string {
Expand All @@ -38,6 +39,10 @@ func (a agentInfoMock) Unprivileged() bool {
return a.unprivileged
}

func (a agentInfoMock) IsStandalone() bool {
return a.isStandalone
}

func (a agentInfoMock) Headers() map[string]string { panic("implement me") }
func (a agentInfoMock) LogLevel() string { panic("implement me") }
func (a agentInfoMock) RawLogLevel() string { panic("implement me") }
Expand Down
Loading

0 comments on commit 249d0b6

Please sign in to comment.