diff --git a/internal/pkg/agent/storage/store/state_store.go b/internal/pkg/agent/storage/store/state_store.go index 3e794c3547b..1955d479e67 100644 --- a/internal/pkg/agent/storage/store/state_store.go +++ b/internal/pkg/agent/storage/store/state_store.go @@ -76,7 +76,9 @@ type stateSerializer struct { // NewStateStoreWithMigration creates a new state store and migrates the old one. func NewStateStoreWithMigration(ctx context.Context, log *logger.Logger, actionStorePath, stateStorePath string) (*StateStore, error) { - err := migrateStateStore(ctx, log, actionStorePath, stateStorePath) + + stateDiskStore := storage.NewEncryptedDiskStore(ctx, stateStorePath) + err := migrateStateStore(log, actionStorePath, stateDiskStore) if err != nil { return nil, err } @@ -143,20 +145,23 @@ func NewStateStore(log *logger.Logger, store storeLoad) (*StateStore, error) { }, nil } -func migrateStateStore(ctx context.Context, log *logger.Logger, actionStorePath, stateStorePath string) (err error) { +func migrateStateStore( + log *logger.Logger, + actionStorePath string, + stateDiskStore storage.Storage) (err error) { + log = log.Named("state_migration") actionDiskStore := storage.NewDiskStore(actionStorePath) - stateDiskStore := storage.NewEncryptedDiskStore(ctx, stateStorePath) stateStoreExits, err := stateDiskStore.Exists() if err != nil { - log.Errorf("failed to check if state store %s exists: %v", stateStorePath, err) + log.Errorf("failed to check if state store exists: %v", err) return err } // do not migrate if the state store already exists if stateStoreExits { - log.Debugf("state store %s already exists", stateStorePath) + log.Debugf("state store already exists") return nil } @@ -204,7 +209,7 @@ func migrateStateStore(ctx context.Context, log *logger.Logger, actionStorePath, err = stateStore.Save() if err != nil { - log.Debugf("failed to save agent state store %s, err: %v", stateStorePath, err) + log.Debugf("failed to save agent state store, err: %v", err) } return err } diff --git a/internal/pkg/agent/storage/store/state_store_test.go b/internal/pkg/agent/storage/store/state_store_test.go index 0e969a7525e..a31cb9006db 100644 --- a/internal/pkg/agent/storage/store/state_store_test.go +++ b/internal/pkg/agent/storage/store/state_store_test.go @@ -6,26 +6,31 @@ package store import ( "context" + "io" + "os" "path/filepath" + "runtime" "sync" "testing" "time" - "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/secret" "github.com/elastic/elastic-agent/internal/pkg/agent/storage" + "github.com/elastic/elastic-agent/internal/pkg/agent/vault" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" "github.com/elastic/elastic-agent/pkg/core/logger" ) func TestStateStore(t *testing.T) { t.Run("ack token", func(t *testing.T) { - runTestStateStore(t, "") + runTestStateStore(t, "czlV93YBwdkt5lYhBY7S") }) t.Run("no ack token", func(t *testing.T) { - runTestStateStore(t, "czlV93YBwdkt5lYhBY7S") + runTestStateStore(t, "") }) } @@ -35,264 +40,302 @@ func runTestStateStore(t *testing.T, ackToken string) { ctx, cn := context.WithCancel(context.Background()) defer cn() - withFile := func(fn func(t *testing.T, file string)) func(*testing.T) { - return func(t *testing.T) { - dir := t.TempDir() - file := filepath.Join(dir, "state.yml") - fn(t, file) + t.Run("action returns empty when no action is saved on disk", func(t *testing.T) { + storePath := filepath.Join(t.TempDir(), "state.yml") + s := storage.NewDiskStore(storePath) + store, err := NewStateStore(log, s) + require.NoError(t, err) + require.Empty(t, store.Actions()) + require.Empty(t, store.Queue()) + }) + + t.Run("will discard silently unknown action", func(t *testing.T) { + actionPolicyChange := &fleetapi.ActionUnknown{ + ActionID: "abc123", } - } - t.Run("action returns empty when no action is saved on disk", - withFile(func(t *testing.T, file string) { - s := storage.NewDiskStore(file) - store, err := NewStateStore(log, s) - require.NoError(t, err) - require.Empty(t, store.Actions()) - require.Empty(t, store.Queue()) - })) - - t.Run("will discard silently unknown action", - withFile(func(t *testing.T, file string) { - actionPolicyChange := &fleetapi.ActionUnknown{ - ActionID: "abc123", - } - - s := storage.NewDiskStore(file) - store, err := NewStateStore(log, s) - require.NoError(t, err) - - require.Equal(t, 0, len(store.Actions())) - store.Add(actionPolicyChange) - store.SetAckToken(ackToken) - err = store.Save() - require.NoError(t, err) - require.Empty(t, store.Actions()) - require.Empty(t, store.Queue()) - require.Equal(t, ackToken, store.AckToken()) - })) - - t.Run("can save to disk known action type", - withFile(func(t *testing.T, file string) { - ActionPolicyChange := &fleetapi.ActionPolicyChange{ - ActionID: "abc123", - ActionType: "POLICY_CHANGE", - Policy: map[string]interface{}{ - "hello": "world", - }, - } - - s := storage.NewDiskStore(file) - store, err := NewStateStore(log, s) - require.NoError(t, err) - - require.Empty(t, store.Actions()) - require.Empty(t, store.Queue()) - store.Add(ActionPolicyChange) - store.SetAckToken(ackToken) - err = store.Save() - require.NoError(t, err) - require.Len(t, store.Actions(), 1) - require.Empty(t, store.Queue()) - require.Equal(t, ackToken, store.AckToken()) - - s = storage.NewDiskStore(file) - store1, err := NewStateStore(log, s) - require.NoError(t, err) - - actions := store1.Actions() - require.Len(t, actions, 1) - require.Empty(t, store1.Queue()) - - require.Equal(t, ActionPolicyChange, actions[0]) - require.Equal(t, ackToken, store.AckToken()) - })) - - t.Run("can save a queue with one upgrade action", - withFile(func(t *testing.T, file string) { - ts := time.Now().UTC().Round(time.Second) - queue := []action{&fleetapi.ActionUpgrade{ - ActionID: "test", - ActionType: fleetapi.ActionTypeUpgrade, - ActionStartTime: ts.Format(time.RFC3339), - Version: "1.2.3", - SourceURI: "https://example.com", - }} - - s := storage.NewDiskStore(file) - store, err := NewStateStore(log, s) - require.NoError(t, err) - - require.Empty(t, store.Actions()) - store.SetQueue(queue) - err = store.Save() - require.NoError(t, err) - require.Empty(t, store.Actions()) - require.Len(t, store.Queue(), 1) - - s = storage.NewDiskStore(file) - store1, err := NewStateStore(log, s) - require.NoError(t, err) - require.Empty(t, store1.Actions()) - require.Len(t, store1.Queue(), 1) - require.Equal(t, "test", store1.Queue()[0].ID()) - scheduledAction, ok := store1.Queue()[0].(fleetapi.ScheduledAction) - require.True(t, ok, "expected to be able to cast Action as ScheduledAction") - start, err := scheduledAction.StartTime() - require.NoError(t, err) - require.Equal(t, ts, start) - })) - - t.Run("can save a queue with two actions", - withFile(func(t *testing.T, file string) { - ts := time.Now().UTC().Round(time.Second) - queue := []action{&fleetapi.ActionUpgrade{ - ActionID: "test", - ActionType: fleetapi.ActionTypeUpgrade, - ActionStartTime: ts.Format(time.RFC3339), - Version: "1.2.3", - SourceURI: "https://example.com", - Retry: 1, - }, &fleetapi.ActionPolicyChange{ - ActionID: "abc123", - ActionType: "POLICY_CHANGE", - Policy: map[string]interface{}{ - "hello": "world", - }, - }} - - s := storage.NewDiskStore(file) - store, err := NewStateStore(log, s) - require.NoError(t, err) - - require.Empty(t, store.Actions()) - store.SetQueue(queue) - err = store.Save() - require.NoError(t, err) - require.Empty(t, store.Actions()) - require.Len(t, store.Queue(), 2) - - s = storage.NewDiskStore(file) - store1, err := NewStateStore(log, s) - require.NoError(t, err) - require.Empty(t, store1.Actions()) - require.Len(t, store1.Queue(), 2) - - require.Equal(t, "test", store1.Queue()[0].ID()) - scheduledAction, ok := store1.Queue()[0].(fleetapi.ScheduledAction) - require.True(t, ok, "expected to be able to cast Action as ScheduledAction") - start, err := scheduledAction.StartTime() - require.NoError(t, err) - require.Equal(t, ts, start) - retryableAction, ok := store1.Queue()[0].(fleetapi.RetryableAction) - require.True(t, ok, "expected to be able to cast Action as RetryableAction") - require.Equal(t, 1, retryableAction.RetryAttempt()) - - require.Equal(t, "abc123", store1.Queue()[1].ID()) - _, ok = store1.Queue()[1].(fleetapi.ScheduledAction) - require.False(t, ok, "expected cast to ScheduledAction to fail") - })) - - t.Run("can save to disk unenroll action type", - withFile(func(t *testing.T, file string) { - action := &fleetapi.ActionUnenroll{ - ActionID: "abc123", - ActionType: "UNENROLL", - } - - s := storage.NewDiskStore(file) - store, err := NewStateStore(log, s) - require.NoError(t, err) - - require.Empty(t, store.Actions()) - require.Empty(t, store.Queue()) - store.Add(action) - store.SetAckToken(ackToken) - err = store.Save() - require.NoError(t, err) - require.Len(t, store.Actions(), 1) - require.Empty(t, store.Queue()) - require.Equal(t, ackToken, store.AckToken()) - - s = storage.NewDiskStore(file) - store1, err := NewStateStore(log, s) - require.NoError(t, err) - - actions := store1.Actions() - require.Len(t, actions, 1) - require.Empty(t, store1.Queue()) - require.Equal(t, action, actions[0]) - require.Equal(t, ackToken, store.AckToken()) - })) - - t.Run("when we ACK we save to disk", - withFile(func(t *testing.T, file string) { - ActionPolicyChange := &fleetapi.ActionPolicyChange{ - ActionID: "abc123", - } - - s := storage.NewDiskStore(file) - store, err := NewStateStore(log, s) - require.NoError(t, err) - store.SetAckToken(ackToken) - - acker := NewStateStoreActionAcker(&testAcker{}, store) - require.Empty(t, store.Actions()) - - require.NoError(t, acker.Ack(context.Background(), ActionPolicyChange)) - require.Len(t, store.Actions(), 1) - require.Empty(t, store.Queue()) - require.Equal(t, ackToken, store.AckToken()) - })) - - t.Run("migrate actions file does not exists", - withFile(func(t *testing.T, actionStorePath string) { - withFile(func(t *testing.T, stateStorePath string) { - err := migrateStateStore(ctx, log, actionStorePath, stateStorePath) - require.NoError(t, err) - stateStore, err := NewStateStore(log, storage.NewDiskStore(stateStorePath)) - require.NoError(t, err) - stateStore.SetAckToken(ackToken) - require.Empty(t, stateStore.Actions()) - require.Equal(t, ackToken, stateStore.AckToken()) - require.Empty(t, stateStore.Queue()) - }) - })) - - t.Run("migrate", - withFile(func(t *testing.T, actionStorePath string) { - ActionPolicyChange := &fleetapi.ActionPolicyChange{ - ActionID: "abc123", - ActionType: "POLICY_CHANGE", - Policy: map[string]interface{}{ - "hello": "world", - }, - } - - actionStore, err := newActionStore(log, storage.NewDiskStore(actionStorePath)) - require.NoError(t, err) - - require.Empty(t, actionStore.actions()) - actionStore.add(ActionPolicyChange) - err = actionStore.save() - require.NoError(t, err) - require.Len(t, actionStore.actions(), 1) - - withFile(func(t *testing.T, stateStorePath string) { - err = migrateStateStore(ctx, log, actionStorePath, stateStorePath) - require.NoError(t, err) - - stateStore, err := NewStateStore(log, storage.NewDiskStore(stateStorePath)) - require.NoError(t, err) - stateStore.SetAckToken(ackToken) - diff := cmp.Diff(actionStore.actions(), stateStore.Actions()) - if diff != "" { - t.Error(diff) - } - require.Equal(t, ackToken, stateStore.AckToken()) - require.Empty(t, stateStore.Queue()) - }) - })) + storePath := filepath.Join(t.TempDir(), "state.yml") + s := storage.NewDiskStore(storePath) + store, err := NewStateStore(log, s) + require.NoError(t, err) + + require.Equal(t, 0, len(store.Actions())) + store.Add(actionPolicyChange) + store.SetAckToken(ackToken) + err = store.Save() + require.NoError(t, err) + require.Empty(t, store.Actions()) + require.Empty(t, store.Queue()) + require.Equal(t, ackToken, store.AckToken()) + }) + + t.Run("can save to disk known action type", func(t *testing.T) { + ActionPolicyChange := &fleetapi.ActionPolicyChange{ + ActionID: "abc123", + ActionType: "POLICY_CHANGE", + Policy: map[string]interface{}{ + "hello": "world", + }, + } + + storePath := filepath.Join(t.TempDir(), "state.yml") + s := storage.NewDiskStore(storePath) + store, err := NewStateStore(log, s) + require.NoError(t, err) + + require.Empty(t, store.Actions()) + require.Empty(t, store.Queue()) + store.Add(ActionPolicyChange) + store.SetAckToken(ackToken) + err = store.Save() + require.NoError(t, err) + require.Len(t, store.Actions(), 1) + require.Empty(t, store.Queue()) + require.Equal(t, ackToken, store.AckToken()) + + s = storage.NewDiskStore(storePath) + store1, err := NewStateStore(log, s) + require.NoError(t, err) + + actions := store1.Actions() + require.Len(t, actions, 1) + require.Empty(t, store1.Queue()) + + require.Equal(t, ActionPolicyChange, actions[0]) + require.Equal(t, ackToken, store.AckToken()) + }) + + t.Run("can save a queue with one upgrade action", func(t *testing.T) { + ts := time.Now().UTC().Round(time.Second) + queue := []action{&fleetapi.ActionUpgrade{ + ActionID: "test", + ActionType: fleetapi.ActionTypeUpgrade, + ActionStartTime: ts.Format(time.RFC3339), + Version: "1.2.3", + SourceURI: "https://example.com", + }} + + storePath := filepath.Join(t.TempDir(), "state.yml") + s := storage.NewDiskStore(storePath) + store, err := NewStateStore(log, s) + require.NoError(t, err) + + require.Empty(t, store.Actions()) + store.SetQueue(queue) + err = store.Save() + require.NoError(t, err) + require.Empty(t, store.Actions()) + require.Len(t, store.Queue(), 1) + + s = storage.NewDiskStore(storePath) + store1, err := NewStateStore(log, s) + require.NoError(t, err) + require.Empty(t, store1.Actions()) + require.Len(t, store1.Queue(), 1) + require.Equal(t, "test", store1.Queue()[0].ID()) + scheduledAction, ok := store1.Queue()[0].(fleetapi.ScheduledAction) + require.True(t, ok, "expected to be able to cast Action as ScheduledAction") + start, err := scheduledAction.StartTime() + require.NoError(t, err) + require.Equal(t, ts, start) + }) + + t.Run("can save a queue with two actions", func(t *testing.T) { + ts := time.Now().UTC().Round(time.Second) + queue := []action{&fleetapi.ActionUpgrade{ + ActionID: "test", + ActionType: fleetapi.ActionTypeUpgrade, + ActionStartTime: ts.Format(time.RFC3339), + Version: "1.2.3", + SourceURI: "https://example.com", + Retry: 1, + }, &fleetapi.ActionPolicyChange{ + ActionID: "abc123", + ActionType: "POLICY_CHANGE", + Policy: map[string]interface{}{ + "hello": "world", + }, + }} + + storePath := filepath.Join(t.TempDir(), "state.yml") + s := storage.NewDiskStore(storePath) + store, err := NewStateStore(log, s) + require.NoError(t, err) + + require.Empty(t, store.Actions()) + store.SetQueue(queue) + err = store.Save() + require.NoError(t, err) + require.Empty(t, store.Actions()) + require.Len(t, store.Queue(), 2) + + s = storage.NewDiskStore(storePath) + store1, err := NewStateStore(log, s) + require.NoError(t, err) + require.Empty(t, store1.Actions()) + require.Len(t, store1.Queue(), 2) + + require.Equal(t, "test", store1.Queue()[0].ID()) + scheduledAction, ok := store1.Queue()[0].(fleetapi.ScheduledAction) + require.True(t, ok, "expected to be able to cast Action as ScheduledAction") + start, err := scheduledAction.StartTime() + require.NoError(t, err) + require.Equal(t, ts, start) + retryableAction, ok := store1.Queue()[0].(fleetapi.RetryableAction) + require.True(t, ok, "expected to be able to cast Action as RetryableAction") + require.Equal(t, 1, retryableAction.RetryAttempt()) + + require.Equal(t, "abc123", store1.Queue()[1].ID()) + _, ok = store1.Queue()[1].(fleetapi.ScheduledAction) + require.False(t, ok, "expected cast to ScheduledAction to fail") + }) + + t.Run("can save to disk unenroll action type", func(t *testing.T) { + action := &fleetapi.ActionUnenroll{ + ActionID: "abc123", + ActionType: "UNENROLL", + } + + storePath := filepath.Join(t.TempDir(), "state.yml") + s := storage.NewDiskStore(storePath) + store, err := NewStateStore(log, s) + require.NoError(t, err) + + require.Empty(t, store.Actions()) + require.Empty(t, store.Queue()) + store.Add(action) + store.SetAckToken(ackToken) + err = store.Save() + require.NoError(t, err) + require.Len(t, store.Actions(), 1) + require.Empty(t, store.Queue()) + require.Equal(t, ackToken, store.AckToken()) + + s = storage.NewDiskStore(storePath) + store1, err := NewStateStore(log, s) + require.NoError(t, err) + + actions := store1.Actions() + require.Len(t, actions, 1) + require.Empty(t, store1.Queue()) + require.Equal(t, action, actions[0]) + require.Equal(t, ackToken, store.AckToken()) + }) + + t.Run("when we ACK we save to disk", func(t *testing.T) { + ActionPolicyChange := &fleetapi.ActionPolicyChange{ + ActionID: "abc123", + } + + storePath := filepath.Join(t.TempDir(), "state.yml") + s := storage.NewDiskStore(storePath) + store, err := NewStateStore(log, s) + require.NoError(t, err) + store.SetAckToken(ackToken) + + acker := NewStateStoreActionAcker(&testAcker{}, store) + require.Empty(t, store.Actions()) + + require.NoError(t, acker.Ack(context.Background(), ActionPolicyChange)) + require.Len(t, store.Actions(), 1) + require.Empty(t, store.Queue()) + require.Equal(t, ackToken, store.AckToken()) + }) + + t.Run("migrate actions file does not exists", func(t *testing.T) { + if runtime.GOOS == "darwin" { + // the original test never actually run, so with this at least + // there is coverage for linux and windows. + t.Skipf("needs https://github.com/elastic/elastic-agent/issues/3866" + + "to be merged so this test can work on darwin") + } + + tempDir := t.TempDir() + oldActionStorePath := filepath.Join(tempDir, "action_store.yml") + newStateStorePath := filepath.Join(tempDir, "state_store.yml") + + newStateStore := storage.NewEncryptedDiskStore(ctx, newStateStorePath) + err := migrateStateStore(log, oldActionStorePath, newStateStore) + require.NoError(t, err, "migration action store -> state store failed") + + // to load from disk a new store needs to be created, it loads the file + // to memory during the store creation. + stateStore, err := NewStateStore(log, storage.NewDiskStore(newStateStorePath)) + require.NoError(t, err) + stateStore.SetAckToken(ackToken) + require.Empty(t, stateStore.Actions()) + require.Equal(t, ackToken, stateStore.AckToken()) + require.Empty(t, stateStore.Queue()) + }) + + t.Run("migrate", func(t *testing.T) { + if runtime.GOOS == "darwin" { + // the original migrate never actually run, so with this at least + // there is coverage for linux and windows. + t.Skipf("needs https://github.com/elastic/elastic-agent/issues/3866" + + "to be merged so this test can work on darwin") + } + + want := &fleetapi.ActionPolicyChange{ + ActionID: "abc123", + ActionType: "POLICY_CHANGE", + Policy: map[string]interface{}{ + "hello": "world", + "phi": 1.618, + "answer": 42, + }, + } + + tempDir := t.TempDir() + vaultPath := filepath.Join(tempDir, "vault") + err := os.MkdirAll(vaultPath, 0o750) + require.NoError(t, err, + "could not create directory for the agent's vault") + _, err = vault.New(ctx, vaultPath) + require.NoError(t, err, "could not create agent's vault") + err = secret.CreateAgentSecret( + context.Background(), secret.WithVaultPath(vaultPath)) + require.NoError(t, err, "could not create agent secret") + + // Copy the golden file as the migration deletes the old store. + goldenActionStoreFile, err := os.Open( + filepath.Join("testdata", "7.17.18-action_store.yml")) + require.NoError(t, err, "could not open action store golden file") + defer goldenActionStoreFile.Close() + + oldActionStorePath := filepath.Join(tempDir, "action_store.yml") + storeFile, err := os.Create(oldActionStorePath) + require.NoError(t, err, "could not create action store file") + + _, err = io.Copy(storeFile, goldenActionStoreFile) + require.NoError(t, err, "could not copy action store golden file") + err = storeFile.Close() + // It needs to be closed now otherwise on windows the store will fail to + // open the file. + require.NoError(t, err, "could not close store file") + + newStateStorePath := filepath.Join(tempDir, "state_store.yaml") + newStateStore := storage.NewEncryptedDiskStore(ctx, newStateStorePath, + storage.WithVaultPath(vaultPath)) + err = migrateStateStore(log, oldActionStorePath, newStateStore) + require.NoError(t, err, "migration action store -> state store failed") + + // to load from disk a new store needs to be created, it loads the file + // to memory during the store creation. + newStateStore = storage.NewEncryptedDiskStore(ctx, newStateStorePath, + storage.WithVaultPath(vaultPath)) + stateStore, err := NewStateStore(log, newStateStore) + require.NoError(t, err, "could not create state store") + + actions := stateStore.Actions() + require.Len(t, actions, 1, "state store should load exactly 1 action") + got := actions[0] + + assert.Equalf(t, want, got, + "loaded action differs from action on the old action store") + assert.Empty(t, stateStore.Queue(), + "queue should be empty, old action store did not have a queue") + }) } diff --git a/internal/pkg/agent/storage/store/testdata/7.17.18-action_store.yml b/internal/pkg/agent/storage/store/testdata/7.17.18-action_store.yml new file mode 100644 index 00000000000..8f559ec80d9 --- /dev/null +++ b/internal/pkg/agent/storage/store/testdata/7.17.18-action_store.yml @@ -0,0 +1,6 @@ +action_id: abc123 +action_type: POLICY_CHANGE +policy: + answer: 42 + hello: world + phi: 1.618