Skip to content

Commit

Permalink
harden linux/windows vault implementation for concurrent access. (#3240)
Browse files Browse the repository at this point in the history
* Harden linux/windows vault implementation for concurrent access

* Spit off some platform-specific code to make linter happy

* Add missing field for windows implementation

* Fix return params for Get on windows

* Replace deprecated ioutil with os package API for windows implementation

* Reorganise platform specific code

* Refactored vault code, extracted shared code, addressed the code review comments

* Split off platform specific options

* Remove wrong build directive from the common options

* do not try to generate HTML test report if test XML output isn't present

---------

Co-authored-by: Anderson Queiroz <[email protected]>
  • Loading branch information
aleksmaus and AndersonQ authored Aug 24, 2023
1 parent 5fe4fcd commit c4c314a
Show file tree
Hide file tree
Showing 60 changed files with 690 additions and 1,016 deletions.
10 changes: 8 additions & 2 deletions .buildkite/scripts/steps/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ TESTS_EXIT_STATUS=$?
set -e

# HTML report
go install github.com/alexec/junit2html@latest
junit2html < build/TEST-go-integration.xml > build/TEST-report.html
outputXML="build/TEST-go-integration.xml"

if [ -f "$outputXML" ]; then
go install github.com/alexec/junit2html@latest
junit2html < "$outputXML" > build/TEST-report.html
else
echo "Cannot generate HTML test report: $outputXML not found"
fi

exit $TESTS_EXIT_STATUS
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ import (
func TestPolicyChange(t *testing.T) {
log, _ := logger.New("", false)
ack := noopacker.New()
agentInfo, _ := info.NewAgentInfo(true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

agentInfo, _ := info.NewAgentInfo(ctx, true)
nullStore := &storage.NullStore{}

t.Run("Receive a config change and successfully emits a raw configuration", func(t *testing.T) {
Expand All @@ -59,7 +63,10 @@ func TestPolicyChange(t *testing.T) {

func TestPolicyAcked(t *testing.T) {
log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(true)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

agentInfo, _ := info.NewAgentInfo(ctx, true)
nullStore := &storage.NullStore{}

t.Run("Config change should ACK", func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (h *Settings) Handle(ctx context.Context, a fleetapi.Action, acker acker.Ac
return fmt.Errorf("failed to unpack log level: %w", err)
}

if err := h.agentInfo.SetLogLevel(action.LogLevel); err != nil {
if err := h.agentInfo.SetLogLevel(ctx, action.LogLevel); err != nil {
return fmt.Errorf("failed to update log level: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestUpgradeHandler(t *testing.T) {
defer cancel()

log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(true)
agentInfo, _ := info.NewAgentInfo(ctx, true)
msgChan := make(chan string)

// Create and start the coordinator
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
defer cancel()

log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(true)
agentInfo, _ := info.NewAgentInfo(ctx, true)
msgChan := make(chan string)

// Create and start the Coordinator
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
defer cancel()

log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(true)
agentInfo, _ := info.NewAgentInfo(ctx, true)
msgChan := make(chan string)

// Create and start the Coordinator
Expand Down
10 changes: 6 additions & 4 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package application

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -34,6 +35,7 @@ import (

// New creates a new Agent and bootstrap the required subsystem.
func New(
ctx context.Context,
log *logger.Logger,
baseLogger *logger.Logger,
logLevel logp.Level,
Expand Down Expand Up @@ -139,7 +141,7 @@ func New(
} else {
isManaged = true
var store storage.Store
store, cfg, err = mergeFleetConfig(rawConfig)
store, cfg, err = mergeFleetConfig(ctx, rawConfig)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -158,7 +160,7 @@ func New(
EndpointSignedComponentModifier(),
)

managed, err = newManagedConfigManager(log, agentInfo, cfg, store, runtime, fleetInitTimeout)
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -196,9 +198,9 @@ func New(
return coord, configMgr, composable, nil
}

func mergeFleetConfig(rawConfig *config.Config) (storage.Store, *configuration.Configuration, error) {
func mergeFleetConfig(ctx context.Context, rawConfig *config.Config) (storage.Store, *configuration.Configuration, error) {
path := paths.AgentConfigFile()
store := storage.NewEncryptedDiskStore(path)
store := storage.NewEncryptedDiskStore(ctx, path)

reader, err := store.Load()
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package application

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -37,7 +38,7 @@ func TestMergeFleetConfig(t *testing.T) {
}

rawConfig := config.MustNewConfigFrom(cfg)
storage, conf, err := mergeFleetConfig(rawConfig)
storage, conf, err := mergeFleetConfig(context.Background(), rawConfig)
require.NoError(t, err)
assert.NotNil(t, storage)
assert.NotNil(t, conf)
Expand All @@ -48,7 +49,11 @@ func TestMergeFleetConfig(t *testing.T) {

func TestLimitsLog(t *testing.T) {
log, obs := logger.NewTesting("TestLimitsLog")
ctx, cn := context.WithCancel(context.Background())
defer cn()

_, _, _, err := New(
ctx,
log,
log,
logp.DebugLevel,
Expand Down
16 changes: 8 additions & 8 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestCoordinator_State_Starting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t)
coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
stateChan := coord.StateSubscribe(ctx, 32)
go func() {
err := coord.Run(ctx)
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestCoordinator_State_ConfigError_NotManaged(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t)
coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestCoordinator_State_ConfigError_Managed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t, ManagedCoordinator(true))
coord, cfgMgr, varsMgr := createCoordinator(t, ctx, ManagedCoordinator(true))
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestCoordinator_StateSubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t)
coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -392,7 +392,7 @@ func TestCoordinator_ReExec(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t)
coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestCoordinator_Upgrade(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t)
coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -472,7 +472,7 @@ func ManagedCoordinator(managed bool) CoordinatorOpt {
// createCoordinator creates a coordinator that using a fake config manager and a fake vars manager.
//
// The runtime specifications is set up to use both the fake component and fake shipper.
func createCoordinator(t *testing.T, opts ...CoordinatorOpt) (*Coordinator, *fakeConfigManager, *fakeVarsManager) {
func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt) (*Coordinator, *fakeConfigManager, *fakeVarsManager) {
t.Helper()

o := &createCoordinatorOpts{}
Expand All @@ -482,7 +482,7 @@ func createCoordinator(t *testing.T, opts ...CoordinatorOpt) (*Coordinator, *fak

l := newErrorLogger(t)

ai, err := info.NewAgentInfo(false)
ai, err := info.NewAgentInfo(ctx, false)
require.NoError(t, err)

componentSpec := component.InputRuntimeSpec{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (f *fleetGateway) convertToCheckinComponents(components []runtime.Component
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
ecsMeta, err := info.Metadata(f.log)
ecsMeta, err := info.Metadata(ctx, f.log)
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
}
Expand Down
15 changes: 8 additions & 7 deletions internal/pkg/agent/application/info/agent_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package info

import (
"bytes"
"context"
"fmt"
"io"
"time"
Expand Down Expand Up @@ -40,8 +41,8 @@ type ioStore interface {
}

// updateLogLevel updates log level and persists it to disk.
func updateLogLevel(level string) error {
ai, err := loadAgentInfoWithBackoff(false, defaultLogLevel, false)
func updateLogLevel(ctx context.Context, level string) error {
ai, err := loadAgentInfoWithBackoff(ctx, false, defaultLogLevel, false)
if err != nil {
return err
}
Expand All @@ -52,7 +53,7 @@ func updateLogLevel(level string) error {
}

agentConfigFile := paths.AgentConfigFile()
diskStore := storage.NewEncryptedDiskStore(agentConfigFile)
diskStore := storage.NewEncryptedDiskStore(ctx, agentConfigFile)

ai.LogLevel = level
return updateAgentInfo(diskStore, ai)
Expand Down Expand Up @@ -173,7 +174,7 @@ func yamlToReader(in interface{}) (io.Reader, error) {
return bytes.NewReader(data), nil
}

func loadAgentInfoWithBackoff(forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, error) {
func loadAgentInfoWithBackoff(ctx context.Context, forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, error) {
var err error
var ai *persistentAgentInfo

Expand All @@ -182,7 +183,7 @@ func loadAgentInfoWithBackoff(forceUpdate bool, logLevel string, createAgentID b

for i := 0; i <= maxRetriesloadAgentInfo; i++ {
backExp.Wait()
ai, err = loadAgentInfo(forceUpdate, logLevel, createAgentID)
ai, err = loadAgentInfo(ctx, forceUpdate, logLevel, createAgentID)
if !errors.Is(err, filelock.ErrAppAlreadyRunning) {
break
}
Expand All @@ -192,7 +193,7 @@ func loadAgentInfoWithBackoff(forceUpdate bool, logLevel string, createAgentID b
return ai, err
}

func loadAgentInfo(forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, error) {
func loadAgentInfo(ctx context.Context, forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, error) {
idLock := paths.AgentConfigFileLock()
if err := idLock.TryLock(); err != nil {
return nil, err
Expand All @@ -201,7 +202,7 @@ func loadAgentInfo(forceUpdate bool, logLevel string, createAgentID bool) (*pers
defer idLock.Unlock()

agentConfigFile := paths.AgentConfigFile()
diskStore := storage.NewEncryptedDiskStore(agentConfigFile)
diskStore := storage.NewEncryptedDiskStore(ctx, agentConfigFile)

agentInfo, err := getInfoFromStore(diskStore, logLevel)
if err != nil {
Expand Down
18 changes: 10 additions & 8 deletions internal/pkg/agent/application/info/agent_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package info

import (
"context"

"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
Expand All @@ -25,8 +27,8 @@ type AgentInfo struct {
// new unique identifier for agent.
// If agent config file does not exist it gets created.
// Initiates log level to predefined value.
func NewAgentInfoWithLog(level string, createAgentID bool) (*AgentInfo, error) {
agentInfo, err := loadAgentInfoWithBackoff(false, level, createAgentID)
func NewAgentInfoWithLog(ctx context.Context, level string, createAgentID bool) (*AgentInfo, error) {
agentInfo, err := loadAgentInfoWithBackoff(ctx, false, level, createAgentID)
if err != nil {
return nil, err
}
Expand All @@ -43,8 +45,8 @@ func NewAgentInfoWithLog(level string, createAgentID bool) (*AgentInfo, error) {
// this created ID otherwise it generates
// new unique identifier for agent.
// If agent config file does not exist it gets created.
func NewAgentInfo(createAgentID bool) (*AgentInfo, error) {
return NewAgentInfoWithLog(defaultLogLevel, createAgentID)
func NewAgentInfo(ctx context.Context, createAgentID bool) (*AgentInfo, error) {
return NewAgentInfoWithLog(ctx, defaultLogLevel, createAgentID)
}

// LogLevel retrieves a log level.
Expand All @@ -56,8 +58,8 @@ func (i *AgentInfo) LogLevel() string {
}

// SetLogLevel updates log level of agent.
func (i *AgentInfo) SetLogLevel(level string) error {
if err := updateLogLevel(level); err != nil {
func (i *AgentInfo) SetLogLevel(ctx context.Context, level string) error {
if err := updateLogLevel(ctx, level); err != nil {
return err
}

Expand All @@ -66,8 +68,8 @@ func (i *AgentInfo) SetLogLevel(level string) error {
}

// ReloadID reloads agent info ID from configuration file.
func (i *AgentInfo) ReloadID() error {
newInfo, err := NewAgentInfoWithLog(i.logLevel, false)
func (i *AgentInfo) ReloadID(ctx context.Context) error {
newInfo, err := NewAgentInfoWithLog(ctx, i.logLevel, false)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/agent/application/info/agent_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package info

import (
"context"
"fmt"
"runtime"
"strings"
Expand Down Expand Up @@ -125,8 +126,8 @@ const (
)

// Metadata loads metadata from disk.
func Metadata(l *logger.Logger) (*ECSMeta, error) {
agentInfo, err := NewAgentInfo(false)
func Metadata(ctx context.Context, l *logger.Logger) (*ECSMeta, error) {
agentInfo, err := NewAgentInfo(ctx, false)
if err != nil {
return nil, fmt.Errorf("failed to create new agent info: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type managedConfigManager struct {
}

func newManagedConfigManager(
ctx context.Context,
log *logger.Logger,
agentInfo *info.AgentInfo,
cfg *configuration.Configuration,
Expand All @@ -72,7 +73,7 @@ func newManagedConfigManager(
}

// Create the state store that will persist the last good policy change on disk.
stateStore, err := store.NewStateStoreWithMigration(log, paths.AgentActionStoreFile(), paths.AgentStateStoreFile())
stateStore, err := store.NewStateStoreWithMigration(ctx, log, paths.AgentActionStoreFile(), paths.AgentStateStoreFile())
if err != nil {
return nil, errors.New(err, fmt.Sprintf("fail to read action store '%s'", paths.AgentActionStoreFile()))
}
Expand Down Expand Up @@ -116,7 +117,7 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
}

// Reload ID because of win7 sync issue
if err := m.agentInfo.ReloadID(); err != nil {
if err := m.agentInfo.ReloadID(ctx); err != nil {
return err
}

Expand Down
Loading

0 comments on commit c4c314a

Please sign in to comment.