From 19c8fd0d5bcdb81170d4fba53862d649e7f7b05d Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Fri, 23 Feb 2024 07:03:54 +0100 Subject: [PATCH] update action model to match fleet-server schema (#4240) * simplify fleetapi.Actions.UnmarshalJSON * add test to ensure the state store is correctly loaded from disk * skip state store migration tests, they will be fixes on a follow-up PR as part of https://github.com/elastic/elastic-agent/issues/3912 --- .../actions/handlers/handler_action_cancel.go | 8 +- .../handlers/handler_action_policy_change.go | 2 +- .../handler_action_policy_change_test.go | 8 +- .../handlers/handler_action_settings.go | 9 +- .../handlers/handler_action_upgrade.go | 15 +- .../handlers/handler_action_upgrade_test.go | 12 +- .../application/dispatcher/dispatcher.go | 4 +- .../application/dispatcher/dispatcher_test.go | 20 +- .../agent/application/upgrade/step_mark.go | 10 +- .../pkg/agent/storage/store/action_store.go | 29 +- .../agent/storage/store/action_store_test.go | 5 +- .../pkg/agent/storage/store/state_store.go | 107 ++++-- .../agent/storage/store/state_store_test.go | 225 ++++++++++- internal/pkg/config/operations/inspector.go | 2 +- internal/pkg/fleetapi/ack_cmd_test.go | 7 +- .../fleetapi/acker/fleet/fleet_acker_test.go | 15 +- internal/pkg/fleetapi/action.go | 353 ++++++------------ internal/pkg/fleetapi/action_test.go | 22 +- internal/pkg/fleetapi/checkin_cmd_test.go | 2 +- 19 files changed, 489 insertions(+), 366 deletions(-) diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_cancel.go b/internal/pkg/agent/application/actions/handlers/handler_action_cancel.go index bb48b2bd753..3086fe8e7a1 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_cancel.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_cancel.go @@ -37,11 +37,13 @@ func (h *Cancel) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acke if !ok { return fmt.Errorf("invalid type, expected ActionCancel and received %T", a) } - n := h.c.Cancel(action.TargetID) + n := h.c.Cancel(action.Data.TargetID) if n == 0 { - h.log.Debugf("Cancel action id: %s target id: %s found no actions in queue.", action.ActionID, action.TargetID) + h.log.Debugf("Cancel action id: %s target id: %s found no actions in queue.", + action.ActionID, action.Data.TargetID) return nil } - h.log.Infof("Cancel action id: %s target id: %s removed %d action(s) from queue.", action.ActionID, action.TargetID, n) + h.log.Infof("Cancel action id: %s target id: %s removed %d action(s) from queue.", + action.ActionID, action.Data.TargetID, n) return nil } diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go index af76e83f6d1..7ce4660531b 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go @@ -98,7 +98,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack // // Cache signature validation key for the next policy handling // h.signatureValidationKey = signatureValidationKey - c, err := config.NewConfigFrom(action.Policy) + c, err := config.NewConfigFrom(action.Data.Policy) if err != nil { return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig) } diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go index b56bb1a1ba4..72cb77d491f 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go @@ -44,7 +44,9 @@ func TestPolicyChange(t *testing.T) { action := &fleetapi.ActionPolicyChange{ ActionID: "abc123", ActionType: "POLICY_CHANGE", - Policy: conf, + Data: fleetapi.ActionPolicyChangeData{ + Policy: conf, + }, } cfg := configuration.DefaultConfiguration() @@ -73,7 +75,9 @@ func TestPolicyAcked(t *testing.T) { action := &fleetapi.ActionPolicyChange{ ActionID: actionID, ActionType: "POLICY_CHANGE", - Policy: config, + Data: fleetapi.ActionPolicyChangeData{ + Policy: config, + }, } cfg := configuration.DefaultConfiguration() diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_settings.go b/internal/pkg/agent/application/actions/handlers/handler_action_settings.go index 68be5715dab..ed59fbe6266 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_settings.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_settings.go @@ -45,17 +45,18 @@ func (h *Settings) Handle(ctx context.Context, a fleetapi.Action, acker acker.Ac return fmt.Errorf("invalid type, expected ActionSettings and received %T", a) } - if !isSupportedLogLevel(action.LogLevel) { - return fmt.Errorf("invalid log level, expected debug|info|warning|error and received '%s'", action.LogLevel) + if !isSupportedLogLevel(action.Data.LogLevel) { + return fmt.Errorf("invalid log level, expected debug|info|warning|error and received '%s'", + action.Data.LogLevel) } lvl := logp.InfoLevel - err := lvl.Unpack(action.LogLevel) + err := lvl.Unpack(action.Data.LogLevel) if err != nil { return fmt.Errorf("failed to unpack log level: %w", err) } - if err := h.agentInfo.SetLogLevel(ctx, action.LogLevel); err != nil { + if err := h.agentInfo.SetLogLevel(ctx, action.Data.LogLevel); err != nil { return fmt.Errorf("failed to update log level: %w", err) } diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade.go b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade.go index 6d58797f37e..ce24f297168 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade.go @@ -74,9 +74,9 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker } go func() { - h.log.Infof("starting upgrade to version %s in background", action.Version) - if err := h.coord.Upgrade(asyncCtx, action.Version, action.SourceURI, action, false, false); err != nil { - h.log.Errorf("upgrade to version %s failed: %v", action.Version, err) + h.log.Infof("starting upgrade to version %s in background", action.Data.Version) + if err := h.coord.Upgrade(asyncCtx, action.Data.Version, action.Data.SourceURI, action, false, false); err != nil { + h.log.Errorf("upgrade to version %s failed: %v", action.Data.Version, err) // If context is cancelled in getAsyncContext, the actions are acked there if !errors.Is(asyncCtx.Err(), context.Canceled) { h.bkgMutex.Lock() @@ -125,14 +125,17 @@ func (h *Upgrade) getAsyncContext(ctx context.Context, action fleetapi.Action, a h.log.Errorf("invalid type, expected ActionUpgrade and received %T", action) return nil, false } - if (upgradeAction.Version == bkgAction.Version) && (upgradeAction.SourceURI == bkgAction.SourceURI) { - h.log.Infof("Duplicate upgrade to version %s received", bkgAction.Version) + if (upgradeAction.Data.Version == bkgAction.Data.Version) && + (upgradeAction.Data.SourceURI == bkgAction.Data.SourceURI) { + h.log.Infof("Duplicate upgrade to version %s received", + bkgAction.Data.Version) h.bkgActions = append(h.bkgActions, action) return nil, false } // Versions must be different, cancel the first upgrade and run the new one - h.log.Infof("Canceling upgrade to version %s received", bkgAction.Version) + h.log.Infof("Canceling upgrade to version %s received", + bkgAction.Data.Version) h.bkgCancel() // Ack here because we have the lock, and we need to clear out the saved actions diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go index 14f4a02c571..e331e477253 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go @@ -81,7 +81,8 @@ func TestUpgradeHandler(t *testing.T) { go c.Run(ctx) u := NewUpgrade(log, c) - a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"} + a := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{ + Version: "8.3.0", SourceURI: "http://localhost"}} ack := noopacker.New() err := u.Handle(ctx, &a, ack) require.NoError(t, err) @@ -114,7 +115,8 @@ func TestUpgradeHandlerSameVersion(t *testing.T) { go c.Run(ctx) u := NewUpgrade(log, c) - a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"} + a := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{ + Version: "8.3.0", SourceURI: "http://localhost"}} ack := noopacker.New() err1 := u.Handle(ctx, &a, ack) err2 := u.Handle(ctx, &a, ack) @@ -149,8 +151,10 @@ func TestUpgradeHandlerNewVersion(t *testing.T) { go c.Run(ctx) u := NewUpgrade(log, c) - a1 := fleetapi.ActionUpgrade{Version: "8.2.0", SourceURI: "http://localhost"} - a2 := fleetapi.ActionUpgrade{Version: "8.5.0", SourceURI: "http://localhost"} + a1 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{ + Version: "8.2.0", SourceURI: "http://localhost"}} + a2 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{ + Version: "8.5.0", SourceURI: "http://localhost"}} ack := noopacker.New() err1 := u.Handle(ctx, &a1, ack) require.NoError(t, err1) diff --git a/internal/pkg/agent/application/dispatcher/dispatcher.go b/internal/pkg/agent/application/dispatcher/dispatcher.go index aef7bff5cdb..b7b97d7e69d 100644 --- a/internal/pkg/agent/application/dispatcher/dispatcher.go +++ b/internal/pkg/agent/application/dispatcher/dispatcher.go @@ -304,7 +304,7 @@ func (ad *ActionDispatcher) handleExpired( version := "unknown" expiration := "unknown" if upgrade, ok := e.(*fleetapi.ActionUpgrade); ok { - version = upgrade.Version + version = upgrade.Data.Version expiration = upgrade.ActionExpiration } ad.lastUpgradeDetails = details.NewDetails(version, details.StateFailed, e.ID()) @@ -356,7 +356,7 @@ func (ad *ActionDispatcher) reportNextScheduledUpgrade(input []fleetapi.Action, } upgradeDetails := details.NewDetails( - nextUpgrade.Version, + nextUpgrade.Data.Version, details.StateScheduled, nextUpgrade.ID()) startTime, err := nextUpgrade.StartTime() diff --git a/internal/pkg/agent/application/dispatcher/dispatcher_test.go b/internal/pkg/agent/application/dispatcher/dispatcher_test.go index 1e63e73e6e9..176f12f7b35 100644 --- a/internal/pkg/agent/application/dispatcher/dispatcher_test.go +++ b/internal/pkg/agent/application/dispatcher/dispatcher_test.go @@ -675,7 +675,9 @@ func TestReportNextScheduledUpgrade(t *testing.T) { actions: []fleetapi.Action{ &fleetapi.ActionUpgrade{ ActionID: "action1", - Version: "8.12.3", + Data: fleetapi.ActionUpgradeData{ + Version: "8.12.3", + }, }, }, expectedErrLogMsg: "failed to get start time for scheduled upgrade action [id = action1]", @@ -685,7 +687,9 @@ func TestReportNextScheduledUpgrade(t *testing.T) { &fleetapi.ActionUpgrade{ ActionID: "action2", ActionStartTime: later.Format(time.RFC3339), - Version: "8.13.0", + Data: fleetapi.ActionUpgradeData{ + Version: "8.13.0", + }, }, }, expectedDetails: &details.Details{ @@ -702,12 +706,16 @@ func TestReportNextScheduledUpgrade(t *testing.T) { &fleetapi.ActionUpgrade{ ActionID: "action3", ActionStartTime: muchLater.Format(time.RFC3339), - Version: "8.14.1", + Data: fleetapi.ActionUpgradeData{ + Version: "8.14.1", + }, }, &fleetapi.ActionUpgrade{ ActionID: "action4", ActionStartTime: later.Format(time.RFC3339), - Version: "8.13.5", + Data: fleetapi.ActionUpgradeData{ + Version: "8.13.5", + }, }, }, expectedDetails: &details.Details{ @@ -723,8 +731,10 @@ func TestReportNextScheduledUpgrade(t *testing.T) { actions: []fleetapi.Action{ &fleetapi.ActionUpgrade{ ActionID: "action1", - Version: "8.13.2", ActionStartTime: "invalid", + Data: fleetapi.ActionUpgradeData{ + Version: "8.13.2", + }, }, }, expectedErrLogMsg: "failed to get start time for scheduled upgrade action [id = action1]", diff --git a/internal/pkg/agent/application/upgrade/step_mark.go b/internal/pkg/agent/application/upgrade/step_mark.go index 44a869ee2dd..23ae5f59948 100644 --- a/internal/pkg/agent/application/upgrade/step_mark.go +++ b/internal/pkg/agent/application/upgrade/step_mark.go @@ -70,8 +70,8 @@ func convertToMarkerAction(a *fleetapi.ActionUpgrade) *MarkerActionUpgrade { return &MarkerActionUpgrade{ ActionID: a.ActionID, ActionType: a.ActionType, - Version: a.Version, - SourceURI: a.SourceURI, + Version: a.Data.Version, + SourceURI: a.Data.SourceURI, } } @@ -82,8 +82,10 @@ func convertToActionUpgrade(a *MarkerActionUpgrade) *fleetapi.ActionUpgrade { return &fleetapi.ActionUpgrade{ ActionID: a.ActionID, ActionType: a.ActionType, - Version: a.Version, - SourceURI: a.SourceURI, + Data: fleetapi.ActionUpgradeData{ + Version: a.Version, + SourceURI: a.SourceURI, + }, } } diff --git a/internal/pkg/agent/storage/store/action_store.go b/internal/pkg/agent/storage/store/action_store.go index 4fc9df8b485..9f40dd678e3 100644 --- a/internal/pkg/agent/storage/store/action_store.go +++ b/internal/pkg/agent/storage/store/action_store.go @@ -9,7 +9,7 @@ import ( "fmt" "io" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" "github.com/elastic/elastic-agent/pkg/core/logger" @@ -19,7 +19,8 @@ import ( // take care of action policy change every other action are discarded. The store will only keep the // last good action on disk, we assume that the action is added to the store after it was ACK with // Fleet. The store is not threadsafe. -// ATTN!!!: THE actionStore is deprecated, please use and extend the stateStore instead. The actionStore will be eventually removed. +// The actionStore is deprecated, please use and extend the stateStore instead. The actionStore will be eventually removed. +// Deprecated. type actionStore struct { log *logger.Logger store storeLoad @@ -86,7 +87,7 @@ func (s *actionStore) save() error { if apc, ok := s.action.(*fleetapi.ActionPolicyChange); ok { serialize := actionPolicyChangeSerializer(*apc) - r, err := yamlToReader(&serialize) + r, err := jsonToReader(&serialize) if err != nil { return err } @@ -95,7 +96,7 @@ func (s *actionStore) save() error { } else if aun, ok := s.action.(*fleetapi.ActionUnenroll); ok { serialize := actionUnenrollSerializer(*aun) - r, err := yamlToReader(&serialize) + r, err := jsonToReader(&serialize) if err != nil { return err } @@ -130,26 +131,26 @@ func (s *actionStore) actions() []action { // There are four ways to achieve the same results: // 1. We create a second struct that map the existing field. // 2. We add the serialization in the fleetapi. -// 3. We move the actual action type outside of the actual fleetapi package. +// 3. We move the actual action type outside the actual fleetapi package. // 4. We have two sets of type. // // This could be done in a refactoring. type actionPolicyChangeSerializer struct { - ActionID string `yaml:"action_id"` - ActionType string `yaml:"action_type"` - Policy map[string]interface{} `yaml:"policy"` + ActionID string `json:"id" yaml:"id"` + ActionType string `json:"type" yaml:"type"` + Data fleetapi.ActionPolicyChangeData `json:"data,omitempty" yaml:"data,omitempty"` } // add a guards between the serializer structs and the original struct. -var _ actionPolicyChangeSerializer = actionPolicyChangeSerializer(fleetapi.ActionPolicyChange{}) +var _ = actionPolicyChangeSerializer(fleetapi.ActionPolicyChange{}) // actionUnenrollSerializer is a struct that adds a YAML serialization, type actionUnenrollSerializer struct { - ActionID string `yaml:"action_id"` - ActionType string `yaml:"action_type"` - IsDetected bool `yaml:"is_detected"` - Signed *fleetapi.Signed `yaml:"signed,omitempty"` + ActionID string `json:"action_id"` + ActionType string `json:"action_type"` + IsDetected bool `json:"is_detected"` + Signed *fleetapi.Signed `json:"signed,omitempty"` } // add a guards between the serializer structs and the original struct. -var _ actionUnenrollSerializer = actionUnenrollSerializer(fleetapi.ActionUnenroll{}) +var _ = actionUnenrollSerializer(fleetapi.ActionUnenroll{}) diff --git a/internal/pkg/agent/storage/store/action_store_test.go b/internal/pkg/agent/storage/store/action_store_test.go index 5a5d56b8056..691e4302c16 100644 --- a/internal/pkg/agent/storage/store/action_store_test.go +++ b/internal/pkg/agent/storage/store/action_store_test.go @@ -58,9 +58,8 @@ func TestActionStore(t *testing.T) { ActionPolicyChange := &fleetapi.ActionPolicyChange{ ActionID: "abc123", ActionType: "POLICY_CHANGE", - Policy: map[string]interface{}{ - "hello": "world", - }, + Data: fleetapi.ActionPolicyChangeData{ + Policy: map[string]interface{}{"hello": "world"}}, } s := storage.NewDiskStore(file) diff --git a/internal/pkg/agent/storage/store/state_store.go b/internal/pkg/agent/storage/store/state_store.go index 1955d479e67..e11837d7f42 100644 --- a/internal/pkg/agent/storage/store/state_store.go +++ b/internal/pkg/agent/storage/store/state_store.go @@ -7,13 +7,12 @@ package store import ( "bytes" "context" + "encoding/json" "errors" "fmt" "io" "sync" - "gopkg.in/yaml.v2" - "github.com/elastic/elastic-agent/internal/pkg/agent/storage" "github.com/elastic/elastic-agent/internal/pkg/conv" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" @@ -43,25 +42,32 @@ type StateStore struct { log *logger.Logger store storeLoad dirty bool - state stateT + state state mx sync.RWMutex } -type stateT struct { +type state struct { action action ackToken string - queue []action + // TODO: the queue is for scheduled actions. Set its type accordingly. + queue []action } // actionSerializer is a combined yml serializer for the ActionPolicyChange and ActionUnenroll -// it is used to read the yaml file and assign the action to stateT.action as we must provide the +// it is used to read the yaml file and assign the action to state.action as we must provide the // underlying struct that provides the action interface. +// TODO: get rid of this type type actionSerializer struct { - ID string `yaml:"action_id"` - Type string `yaml:"action_type"` - Policy map[string]interface{} `yaml:"policy,omitempty"` - IsDetected *bool `yaml:"is_detected,omitempty"` + ID string `json:"action_id"` + Type string `json:"action_type"` + Data actionDataSerializer `json:"data,omitempty"` + IsDetected *bool `json:"is_detected,omitempty"` + Signed *fleetapi.Signed `json:"signed,omitempty"` +} + +type actionDataSerializer struct { + Policy map[string]interface{} `json:"policy" yaml:"policy,omitempty"` } // stateSerializer is used to serialize the state to yaml. @@ -69,9 +75,9 @@ type actionSerializer struct { // queue serialization is handled through yaml struct tags or the actions unmarshaller defined in fleetapi // TODO clean up action serialization (have it be part of the fleetapi?) type stateSerializer struct { - Action *actionSerializer `yaml:"action,omitempty"` - AckToken string `yaml:"ack_token,omitempty"` - Queue fleetapi.Actions `yaml:"action_queue,omitempty"` + Action *actionSerializer `json:"action,omitempty"` + AckToken string `json:"ack_token,omitempty"` + Queue fleetapi.Actions `json:"action_queue,omitempty"` } // NewStateStoreWithMigration creates a new state store and migrates the old one. @@ -102,10 +108,10 @@ func NewStateStore(log *logger.Logger, store storeLoad) (*StateStore, error) { } defer reader.Close() - var sr stateSerializer + var serializer stateSerializer - dec := yaml.NewDecoder(reader) - err = dec.Decode(&sr) + dec := json.NewDecoder(reader) + err = dec.Decode(&serializer) if errors.Is(err, io.EOF) { return &StateStore{ log: log, @@ -117,23 +123,29 @@ func NewStateStore(log *logger.Logger, store storeLoad) (*StateStore, error) { return nil, err } - state := stateT{ - ackToken: sr.AckToken, - queue: sr.Queue, + st := state{ + ackToken: serializer.AckToken, + queue: serializer.Queue, } - if sr.Action != nil { - if sr.Action.IsDetected != nil { - state.action = &fleetapi.ActionUnenroll{ - ActionID: sr.Action.ID, - ActionType: sr.Action.Type, - IsDetected: *sr.Action.IsDetected, + if serializer.Action != nil { + // TODO: use ActionType instead + if serializer.Action.IsDetected != nil { + st.action = &fleetapi.ActionUnenroll{ + ActionID: serializer.Action.ID, + ActionType: serializer.Action.Type, + IsDetected: *serializer.Action.IsDetected, + Signed: serializer.Action.Signed, } } else { - state.action = &fleetapi.ActionPolicyChange{ - ActionID: sr.Action.ID, - ActionType: sr.Action.Type, - Policy: conv.YAMLMapToJSONMap(sr.Action.Policy), // Fix Policy, in order to make it consistent with the policy received from the fleet gateway as nested map[string]interface{} + st.action = &fleetapi.ActionPolicyChange{ + ActionID: serializer.Action.ID, + ActionType: serializer.Action.Type, + Data: fleetapi.ActionPolicyChangeData{ + // Fix Policy, in order to make it consistent with the policy + // received from the fleet gateway as nested map[string]interface{} + Policy: conv.YAMLMapToJSONMap(serializer.Action.Data.Policy), + }, } } } @@ -141,7 +153,7 @@ func NewStateStore(log *logger.Logger, store storeLoad) (*StateStore, error) { return &StateStore{ log: log, store: store, - state: state, + state: st, }, nil } @@ -216,6 +228,11 @@ func migrateStateStore( // Add is only taking care of ActionPolicyChange for now and will only keep the last one it receive, // any other type of action will be silently ignored. +// TODO: fix docs: state: +// - the valid actions, +// - it silently discard invalid actions +// - perhaps rename it as it does not add to the queue, but sets the current +// action func (s *StateStore) Add(a action) { s.mx.Lock() defer s.mx.Unlock() @@ -269,16 +286,28 @@ func (s *StateStore) Save() error { } if s.state.action != nil { - if apc, ok := s.state.action.(*fleetapi.ActionPolicyChange); ok { - serialize.Action = &actionSerializer{apc.ActionID, apc.ActionType, apc.Policy, nil} - } else if aun, ok := s.state.action.(*fleetapi.ActionUnenroll); ok { - serialize.Action = &actionSerializer{aun.ActionID, aun.ActionType, nil, &aun.IsDetected} - } else { - return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.state.action) + switch a := s.state.action.(type) { + case *fleetapi.ActionPolicyChange: + serialize.Action = &actionSerializer{ + ID: a.ActionID, + Type: a.ActionType, + Data: actionDataSerializer{ + Policy: a.Data.Policy, + }} + case *fleetapi.ActionUnenroll: + serialize.Action = &actionSerializer{ + ID: a.ActionID, + Type: a.ActionType, + IsDetected: &a.IsDetected, + Signed: a.Signed, + } + default: + return fmt.Errorf("incompatible type, expected ActionPolicyChange "+ + "or ActionUnenroll but received %T", s.state.action) } } - reader, err := yamlToReader(&serialize) + reader, err := jsonToReader(&serialize) if err != nil { return err } @@ -342,8 +371,8 @@ func (a *StateStoreActionAcker) Commit(ctx context.Context) error { return a.acker.Commit(ctx) } -func yamlToReader(in interface{}) (io.Reader, error) { - data, err := yaml.Marshal(in) +func jsonToReader(in interface{}) (io.Reader, error) { + data, err := json.Marshal(in) if err != nil { return nil, fmt.Errorf("could not marshal to YAML: %w", err) } diff --git a/internal/pkg/agent/storage/store/state_store_test.go b/internal/pkg/agent/storage/store/state_store_test.go index a31cb9006db..0b5a07c5de3 100644 --- a/internal/pkg/agent/storage/store/state_store_test.go +++ b/internal/pkg/agent/storage/store/state_store_test.go @@ -9,6 +9,7 @@ import ( "io" "os" "path/filepath" + "reflect" "runtime" "sync" "testing" @@ -73,9 +74,10 @@ func runTestStateStore(t *testing.T, ackToken string) { ActionPolicyChange := &fleetapi.ActionPolicyChange{ ActionID: "abc123", ActionType: "POLICY_CHANGE", - Policy: map[string]interface{}{ - "hello": "world", - }, + Data: fleetapi.ActionPolicyChangeData{ + Policy: map[string]interface{}{ + "hello": "world", + }}, } storePath := filepath.Join(t.TempDir(), "state.yml") @@ -111,9 +113,10 @@ func runTestStateStore(t *testing.T, ackToken string) { ActionID: "test", ActionType: fleetapi.ActionTypeUpgrade, ActionStartTime: ts.Format(time.RFC3339), - Version: "1.2.3", - SourceURI: "https://example.com", - }} + Data: fleetapi.ActionUpgradeData{ + Version: "1.2.3", + SourceURI: "https://example.com", + }}} storePath := filepath.Join(t.TempDir(), "state.yml") s := storage.NewDiskStore(storePath) @@ -146,15 +149,17 @@ func runTestStateStore(t *testing.T, ackToken string) { ActionID: "test", ActionType: fleetapi.ActionTypeUpgrade, ActionStartTime: ts.Format(time.RFC3339), - Version: "1.2.3", - SourceURI: "https://example.com", - Retry: 1, - }, &fleetapi.ActionPolicyChange{ + Data: fleetapi.ActionUpgradeData{ + Version: "1.2.3", + SourceURI: "https://example.com", + Retry: 1, + }}, &fleetapi.ActionPolicyChange{ ActionID: "abc123", ActionType: "POLICY_CHANGE", - Policy: map[string]interface{}{ - "hello": "world", - }, + Data: fleetapi.ActionPolicyChangeData{ + Policy: map[string]interface{}{ + "hello": "world", + }}, }} storePath := filepath.Join(t.TempDir(), "state.yml") @@ -269,6 +274,11 @@ func runTestStateStore(t *testing.T, ackToken string) { }) t.Run("migrate", func(t *testing.T) { + // TODO: DO NOT MERGE TO MAIN WITHOUT REMOVING THIS SKIP + t.Skip("this test is broken because the migration haven't been" + + " implemented yet. It'll implemented on another PR as part of" + + "https://github.com/elastic/elastic-agent/issues/3912") + if runtime.GOOS == "darwin" { // the original migrate never actually run, so with this at least // there is coverage for linux and windows. @@ -279,10 +289,12 @@ func runTestStateStore(t *testing.T, ackToken string) { want := &fleetapi.ActionPolicyChange{ ActionID: "abc123", ActionType: "POLICY_CHANGE", - Policy: map[string]interface{}{ - "hello": "world", - "phi": 1.618, - "answer": 42, + Data: fleetapi.ActionPolicyChangeData{ + Policy: map[string]interface{}{ + "hello": "world", + "phi": 1.618, + "answer": 42, + }, }, } @@ -337,6 +349,148 @@ func runTestStateStore(t *testing.T, ackToken string) { "queue should be empty, old action store did not have a queue") }) + t.Run("state store is correctly loaded from disk", func(t *testing.T) { + t.Run("ActionPolicyChange", func(t *testing.T) { + storePath := filepath.Join(t.TempDir(), "state.yaml") + want := &fleetapi.ActionPolicyChange{ + ActionID: "abc123", + ActionType: "POLICY_CHANGE", + Data: fleetapi.ActionPolicyChangeData{ + Policy: map[string]interface{}{ + "hello": "world", + "phi": 1.618, + "answer": 42.0, + }, + }, + } + + s := storage.NewDiskStore(storePath) + stateStore, err := NewStateStore(log, s) + require.NoError(t, err, "could not create disk store") + + stateStore.SetAckToken(ackToken) + stateStore.Add(want) + err = stateStore.Save() + require.NoError(t, err, "failed saving state store") + + // to load from disk a new store needs to be created + s = storage.NewDiskStore(storePath) + stateStore, err = NewStateStore(log, s) + require.NoError(t, err, "could not create disk store") + + actions := stateStore.Actions() + require.Len(t, actions, 1, + "should have loaded exactly 1 action") + got, ok := actions[0].(*fleetapi.ActionPolicyChange) + require.True(t, ok, "could not cast action to fleetapi.ActionPolicyChange") + assert.Equal(t, want, got) + + emptyFields := hasEmptyFields(got) + if len(emptyFields) > 0 { + t.Errorf("the following fields of %T are serialized and are empty: %s."+ + " All serialised fields must have a value. Perhaps the action was"+ + " updated but this test was not. Ensure the test covers all"+ + "JSON serialized fields for this action.", + got, emptyFields) + } + }) + + t.Run("ActionUnenroll", func(t *testing.T) { + storePath := filepath.Join(t.TempDir(), "state.yaml") + want := &fleetapi.ActionUnenroll{ + ActionID: "abc123", + ActionType: fleetapi.ActionTypeUnenroll, + IsDetected: true, + Signed: &fleetapi.Signed{ + Data: "some data", + Signature: "a signature", + }, + } + + s := storage.NewDiskStore(storePath) + stateStore, err := NewStateStore(log, s) + require.NoError(t, err, "could not create disk store") + + stateStore.SetAckToken(ackToken) + stateStore.Add(want) + err = stateStore.Save() + require.NoError(t, err, "failed saving state store") + + // to load from disk a new store needs to be created + s = storage.NewDiskStore(storePath) + stateStore, err = NewStateStore(log, s) + require.NoError(t, err, "could not create disk store") + + actions := stateStore.Actions() + require.Len(t, actions, 1, + "should have loaded exactly 1 action") + got, ok := actions[0].(*fleetapi.ActionUnenroll) + require.True(t, ok, "could not cast action to fleetapi.ActionUnenroll") + assert.Equal(t, want, got) + + emptyFields := hasEmptyFields(got) + if len(emptyFields) > 0 { + t.Errorf("the following fields of %T are serialized and are empty: %s."+ + " All serialised fields must have a value. Perhaps the action was"+ + " updated but this test was not. Ensure the test covers all"+ + "JSON serialized fields for this action.", + got, emptyFields) + } + }) + + t.Run("action queue", func(t *testing.T) { + storePath := filepath.Join(t.TempDir(), "state.yaml") + now := time.Now().UTC().Round(time.Second) + want := &fleetapi.ActionUpgrade{ + ActionID: "test", + ActionType: fleetapi.ActionTypeUpgrade, + ActionStartTime: now.Format(time.RFC3339), + ActionExpiration: now.Add(time.Hour).Format(time.RFC3339), + Data: fleetapi.ActionUpgradeData{ + Version: "1.2.3", + SourceURI: "https://example.com", + Retry: 1, + }, + Signed: &fleetapi.Signed{ + Data: "some data", + Signature: "a signature", + }, + } + + t.Logf("state store: %q", storePath) + s := storage.NewDiskStore(storePath) + stateStore, err := NewStateStore(log, s) + require.NoError(t, err, "could not create disk store") + + stateStore.SetAckToken(ackToken) + stateStore.SetQueue([]action{want}) + err = stateStore.Save() + require.NoError(t, err, "failed saving state store") + + // to load from disk a new store needs to be created + s = storage.NewDiskStore(storePath) + stateStore, err = NewStateStore(log, s) + require.NoError(t, err, "could not create disk store") + + queue := stateStore.Queue() + require.Len(t, queue, 1, "action queue should have only 1 action") + got := queue[0] + assert.Equal(t, want, got, + "deserialized action is different from what was saved to disk") + _, ok := got.(*fleetapi.ActionUpgrade) + require.True(t, ok, "could not cast action in the queue to upgradeAction") + + emptyFields := hasEmptyFields(got) + if len(emptyFields) > 0 { + t.Errorf("the following fields of %T are serialized and are empty: %s."+ + " All serialised fields must have a value. Perhaps the action was"+ + " updated but this test was not. Ensure the test covers all"+ + "JSON serialized fields for this action.", + got, emptyFields) + } + }) + }) + } type testAcker struct { @@ -372,3 +526,40 @@ func (t *testAcker) Items() []string { defer t.ackedLock.Unlock() return t.acked } + +// hasEmptyFields will check if action has any empty fields. It returns a string +// slice with any empty field, the field value is the zero value for its type. +// If the json tag of the field is "-", the field is ignored. +// If no field is empty, it returns nil. +func hasEmptyFields(action fleetapi.Action) []string { + var actionValue reflect.Value + actionValue = reflect.ValueOf(action) + // dereference if it's a pointer + if actionValue.Kind() == reflect.Pointer { + actionValue = actionValue.Elem() + } + + var failures []string + for i := 0; i < actionValue.NumField(); i++ { + fieldValue := actionValue.Field(i) + actionType := actionValue.Type() + structField := actionType.Field(i) + + fieldName := structField.Name + tag := structField.Tag.Get("json") + + // If the field isn't serialised, ignore it. + if tag == "-" { + continue + } + + got := fieldValue.Interface() + zeroValue := reflect.Zero(fieldValue.Type()).Interface() + + if reflect.DeepEqual(got, zeroValue) { + failures = append(failures, fieldName) + } + } + + return failures +} diff --git a/internal/pkg/config/operations/inspector.go b/internal/pkg/config/operations/inspector.go index 55e78069292..ef4ab0ab32b 100644 --- a/internal/pkg/config/operations/inspector.go +++ b/internal/pkg/config/operations/inspector.go @@ -124,7 +124,7 @@ func loadFleetConfig(ctx context.Context, l *logger.Logger) (map[string]interfac continue } - return cfgChange.Policy, nil + return cfgChange.Data.Policy, nil } return nil, nil } diff --git a/internal/pkg/fleetapi/ack_cmd_test.go b/internal/pkg/fleetapi/ack_cmd_test.go index 00b85d2ac80..27a26823e5f 100644 --- a/internal/pkg/fleetapi/ack_cmd_test.go +++ b/internal/pkg/fleetapi/ack_cmd_test.go @@ -51,9 +51,12 @@ func TestAck(t *testing.T) { action := &ActionPolicyChange{ ActionID: "my-id", ActionType: "POLICY_CHANGE", - Policy: map[string]interface{}{ + + Data: struct { + Policy map[string]interface{} `json:"policy" yaml:"policy,omitempty"` + }{Policy: map[string]interface{}{ "id": "config_id", - }, + }}, } cmd := NewAckCmd(&agentinfo{}, client) diff --git a/internal/pkg/fleetapi/acker/fleet/fleet_acker_test.go b/internal/pkg/fleetapi/acker/fleet/fleet_acker_test.go index fcde240fe34..8dce010ce61 100644 --- a/internal/pkg/fleetapi/acker/fleet/fleet_acker_test.go +++ b/internal/pkg/fleetapi/acker/fleet/fleet_acker_test.go @@ -125,14 +125,18 @@ func TestAcker_Ack(t *testing.T) { &fleetapi.ActionUpgrade{ ActionID: "upgrade-retry", ActionType: fleetapi.ActionTypeUpgrade, - Retry: 1, - Err: errors.New("upgrade failed"), + Data: fleetapi.ActionUpgradeData{ + Retry: 1, + }, + Err: errors.New("upgrade failed"), }, &fleetapi.ActionUpgrade{ ActionID: "upgrade-failed", ActionType: fleetapi.ActionTypeUpgrade, - Retry: -1, - Err: errors.New("upgrade failed"), + Data: fleetapi.ActionUpgradeData{ + Retry: -1, + }, + Err: errors.New("upgrade failed"), }, }, }, @@ -165,7 +169,8 @@ func TestAcker_Ack(t *testing.T) { } err := json.Unmarshal(req.Events[i].Payload, &pl) require.NoError(t, err) - assert.Equal(t, a.Retry, pl.Attempt, "action ID %s failed", a.ActionID) + assert.Equal(t, a.Data.Retry, pl.Attempt, + "action ID %s failed", a.ActionID) // Check retry flag if pl.Attempt > 0 { assert.True(t, pl.Retry) diff --git a/internal/pkg/fleetapi/action.go b/internal/pkg/fleetapi/action.go index df70677d716..a7d4fdfb82f 100644 --- a/internal/pkg/fleetapi/action.go +++ b/internal/pkg/fleetapi/action.go @@ -11,7 +11,6 @@ import ( "time" "github.com/mitchellh/mapstructure" - "gopkg.in/yaml.v2" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" ) @@ -65,45 +64,27 @@ type ScheduledAction interface { type RetryableAction interface { ScheduledAction // RetryAttempt returns the retry-attempt number of the action - // the retry_attempt number is meant to be an interal counter for the elastic-agent and not communicated to fleet-server or ES. + // the retry_attempt number is meant to be an internal counter for the elastic-agent and not communicated to fleet-server or ES. // If RetryAttempt returns > 1, and GetError is not nil the acker should signal that the action is being retried. // If RetryAttempt returns < 1, and GetError is not nil the acker should signal that the action has failed. RetryAttempt() int // SetRetryAttempt sets the retry-attempt number of the action - // the retry_attempt number is meant to be an interal counter for the elastic-agent and not communicated to fleet-server or ES. + // the retry_attempt number is meant to be an internal counter for the elastic-agent and not communicated to fleet-server or ES. SetRetryAttempt(int) // SetStartTime sets the start_time of the action to the specified value. // this is used by the action-retry mechanism. SetStartTime(t time.Time) // GetError returns the error that is associated with the retry. // If it is a retryable action fleet-server should mark it as such. - // Otherwise fleet-server should mark the action as failed. + // Otherwise, fleet-server should mark the action as failed. GetError() error // SetError sets the retryable action error SetError(error) } type Signed struct { - Data string `yaml:"data" json:"data" mapstructure:"data"` - Signature string `yaml:"signature" json:"signature" mapstructure:"signature"` -} - -// FleetAction represents an action from fleet-server. -// should copy the action definition in fleet-server/model/schema.json -type FleetAction struct { - ActionID string `yaml:"action_id" json:"id"` // NOTE schema defines this as action_id, but fleet-server remaps it to id in the json response to agent check-in. - ActionType string `yaml:"type,omitempty" json:"type,omitempty"` - InputType string `yaml:"input_type,omitempty" json:"input_type,omitempty"` - ActionExpiration string `yaml:"expiration,omitempty" json:"expiration,omitempty"` - ActionStartTime string `yaml:"start_time,omitempty" json:"start_time,omitempty"` - Timeout int64 `yaml:"timeout,omitempty" json:"timeout,omitempty"` - Data json.RawMessage `yaml:"data,omitempty" json:"data,omitempty"` - Retry int `json:"retry_attempt,omitempty" yaml:"retry_attempt,omitempty"` // used internally for serialization by elastic-agent. - //Agents []string // disabled, fleet-server uses this to generate each agent's actions - //Timestamp string // disabled, agent does not care when the document was created - //UserID string // disabled, agent does not care - //MinimumExecutionDuration int64 // disabled, used by fleet-server for scheduling - Signed *Signed `yaml:"signed,omitempty" json:"signed,omitempty"` + Data string `json:"data" yaml:"data" mapstructure:"data"` + Signature string `json:"signature" yaml:"signature" mapstructure:"signature"` } func newAckEvent(id, aType string) AckEvent { @@ -121,9 +102,10 @@ func newAckEvent(id, aType string) AckEvent { // NOTE: We only keep the original type and the action id, the payload of the event is dropped, we // do this to make sure we do not leak any unwanted information. type ActionUnknown struct { - originalType string - ActionID string - ActionType string + ActionID string `json:"id" yaml:"id" mapstructure:"id"` + ActionType string `json:"type,omitempty" yaml:"type,omitempty" mapstructure:"type"` + // OriginalType is the original type of the action as returned by the API. + OriginalType string `json:"original_type,omitempty" yaml:"original_type,omitempty" mapstructure:"original_type"` } // Type returns the type of the Action. @@ -143,30 +125,30 @@ func (a *ActionUnknown) String() string { s.WriteString(", type: ") s.WriteString(a.ActionType) s.WriteString(" (original type: ") - s.WriteString(a.OriginalType()) + s.WriteString(a.OriginalType) s.WriteString(")") return s.String() } -// OriginalType returns the original type of the action as returned by the API. -func (a *ActionUnknown) OriginalType() string { - return a.originalType -} - func (a *ActionUnknown) AckEvent() AckEvent { return AckEvent{ EventType: "ACTION_RESULT", // TODO Discuss EventType/SubType needed - by default only ACTION_RESULT was used - what is (or was) the intended purpose of these attributes? Are they documented? Can we change them to better support acking an error or a retry? SubType: "ACKNOWLEDGED", ActionID: a.ActionID, Message: fmt.Sprintf("Action %q of type %q acknowledged.", a.ActionID, a.ActionType), - Error: fmt.Sprintf("Action %q of type %q is unknown to the elastic-agent", a.ActionID, a.originalType), + Error: fmt.Sprintf("Action %q of type %q is unknown to the elastic-agent", a.ActionID, a.OriginalType), } } -// ActionPolicyReassign is a request to apply a new +// ActionPolicyReassign is a request to apply a new policy type ActionPolicyReassign struct { - ActionID string `yaml:"action_id"` - ActionType string `yaml:"type"` + ActionID string `json:"id" yaml:"id"` + ActionType string `json:"type" yaml:"type"` + Data ActionPolicyReassignData `json:"data,omitempty"` +} + +type ActionPolicyReassignData struct { + PolicyID string `json:"policy_id"` } func (a *ActionPolicyReassign) String() string { @@ -194,9 +176,13 @@ func (a *ActionPolicyReassign) AckEvent() AckEvent { // ActionPolicyChange is a request to apply a new type ActionPolicyChange struct { - ActionID string `yaml:"action_id"` - ActionType string `yaml:"type"` - Policy map[string]interface{} `json:"policy" yaml:"policy,omitempty"` + ActionID string `json:"id" yaml:"id"` + ActionType string `json:"type" yaml:"type"` + Data ActionPolicyChangeData `json:"data,omitempty" yaml:"data,omitempty"` +} + +type ActionPolicyChangeData struct { + Policy map[string]interface{} `json:"policy" yaml:"policy,omitempty"` } func (a *ActionPolicyChange) String() string { @@ -224,15 +210,21 @@ func (a *ActionPolicyChange) AckEvent() AckEvent { // ActionUpgrade is a request for agent to upgrade. type ActionUpgrade struct { - ActionID string `yaml:"action_id" mapstructure:"id"` - ActionType string `yaml:"type" mapstructure:"type"` - ActionStartTime string `json:"start_time" yaml:"start_time,omitempty" mapstructure:"-"` // TODO change to time.Time in unmarshal - ActionExpiration string `json:"expiration" yaml:"expiration,omitempty" mapstructure:"-"` - Version string `json:"version" yaml:"version,omitempty" mapstructure:"-"` - SourceURI string `json:"source_uri,omitempty" yaml:"source_uri,omitempty" mapstructure:"-"` - Retry int `json:"retry_attempt,omitempty" yaml:"retry_attempt,omitempty" mapstructure:"-"` - Signed *Signed `json:"signed,omitempty" yaml:"signed,omitempty" mapstructure:"signed,omitempty"` - Err error `json:"-" yaml:"-" mapstructure:"-"` + ActionID string `json:"id" yaml:"id" mapstructure:"id"` + ActionType string `json:"type" yaml:"type" mapstructure:"type"` + ActionStartTime string `json:"start_time" yaml:"start_time,omitempty" mapstructure:"-"` // TODO change to time.Time in unmarshal + ActionExpiration string `json:"expiration" yaml:"expiration,omitempty" mapstructure:"-"` + // does anyone know why those aren't mapped to mapstructure? + Data ActionUpgradeData `json:"data,omitempty" mapstructure:"-"` + Signed *Signed `json:"signed,omitempty" yaml:"signed,omitempty" mapstructure:"signed,omitempty"` + Err error `json:"-" yaml:"-" mapstructure:"-"` +} + +type ActionUpgradeData struct { + Version string `json:"version" yaml:"version,omitempty" mapstructure:"-"` + SourceURI string `json:"source_uri,omitempty" yaml:"source_uri,omitempty" mapstructure:"-"` + // TODO: update fleet open api schema + Retry int `json:"retry_attempt,omitempty" yaml:"retry_attempt,omitempty" mapstructure:"-"` } func (a *ActionUpgrade) String() string { @@ -254,8 +246,8 @@ func (a *ActionUpgrade) AckEvent() AckEvent { Attempt int `json:"retry_attempt,omitempty"` } payload.Retry = true - payload.Attempt = a.Retry - if a.Retry < 1 { // retry is set to -1 if it will not re attempt + payload.Attempt = a.Data.Retry + if a.Data.Retry < 1 { // retry is set to -1 if it will not re attempt payload.Retry = false } p, _ := json.Marshal(payload) @@ -300,12 +292,12 @@ func (a *ActionUpgrade) Expiration() (time.Time, error) { // RetryAttempt will return the retry_attempt of the action func (a *ActionUpgrade) RetryAttempt() int { - return a.Retry + return a.Data.Retry } // SetRetryAttempt sets the retry_attempt of the action func (a *ActionUpgrade) SetRetryAttempt(n int) { - a.Retry = n + a.Data.Retry = n } // GetError returns the error associated with the attempt to run the action. @@ -332,8 +324,8 @@ func (a *ActionUpgrade) MarshalMap() (map[string]interface{}, error) { // ActionUnenroll is a request for agent to unhook from fleet. type ActionUnenroll struct { - ActionID string `yaml:"action_id" mapstructure:"id"` - ActionType string `yaml:"type" mapstructure:"type"` + ActionID string `json:"id" yaml:"id" mapstructure:"id"` + ActionType string `json:"type" yaml:"type" mapstructure:"type"` IsDetected bool `json:"is_detected,omitempty" yaml:"is_detected,omitempty" mapstructure:"-"` Signed *Signed `json:"signed,omitempty" mapstructure:"signed,omitempty"` } @@ -370,9 +362,15 @@ func (a *ActionUnenroll) MarshalMap() (map[string]interface{}, error) { // ActionSettings is a request to change agent settings. type ActionSettings struct { - ActionID string `yaml:"action_id"` - ActionType string `yaml:"type"` - LogLevel string `json:"log_level" yaml:"log_level,omitempty"` + ActionID string `json:"id" yaml:"id"` + ActionType string `json:"type" yaml:"type"` + Data ActionSettingsData `json:"data,omitempty"` +} + +type ActionSettingsData struct { + // LogLevel can only be one of "debug", "info", "warning", "error" + // TODO: add validation + LogLevel string `json:"log_level" yaml:"log_level,omitempty"` } // ID returns the ID of the Action. @@ -392,7 +390,7 @@ func (a *ActionSettings) String() string { s.WriteString(", type: ") s.WriteString(a.ActionType) s.WriteString(", log_level: ") - s.WriteString(a.LogLevel) + s.WriteString(a.Data.LogLevel) return s.String() } @@ -402,9 +400,13 @@ func (a *ActionSettings) AckEvent() AckEvent { // ActionCancel is a request to cancel an action. type ActionCancel struct { - ActionID string `yaml:"action_id"` - ActionType string `yaml:"type"` - TargetID string `json:"target_id" yaml:"target_id,omitempty"` + ActionID string `json:"id" yaml:"id"` + ActionType string `json:"type" yaml:"type"` + Data ActionCancelData `json:"data,omitempty"` +} + +type ActionCancelData struct { + TargetID string `json:"target_id" yaml:"target_id,omitempty"` } // ID returns the ID of the Action. @@ -424,7 +426,7 @@ func (a *ActionCancel) String() string { s.WriteString(", type: ") s.WriteString(a.ActionType) s.WriteString(", target_id: ") - s.WriteString(a.TargetID) + s.WriteString(a.Data.TargetID) return s.String() } @@ -434,7 +436,7 @@ func (a *ActionCancel) AckEvent() AckEvent { // ActionDiagnostics is a request to gather and upload a diagnostics bundle. type ActionDiagnostics struct { - ActionID string `json:"action_id"` + ActionID string `json:"id"` ActionType string `json:"type"` UploadID string `json:"-"` Err error `json:"-"` @@ -538,99 +540,54 @@ type Actions []Action // UnmarshalJSON takes every raw representation of an action and try to decode them. func (a *Actions) UnmarshalJSON(data []byte) error { - var responses []FleetAction - if err := json.Unmarshal(data, &responses); err != nil { + var typeUnmarshaler []struct { + ActionType string `json:"type,omitempty" yaml:"type,omitempty"` + } + + if err := json.Unmarshal(data, &typeUnmarshaler); err != nil { + return errors.New(err, + "fail to decode actions to read their types", + errors.TypeConfig) + } + + rawActions := make([]json.RawMessage, len(typeUnmarshaler)) + if err := json.Unmarshal(data, &rawActions); err != nil { return errors.New(err, "fail to decode actions", errors.TypeConfig) } - actions := make([]Action, 0, len(responses)) - for _, response := range responses { + actions := make([]Action, 0, len(typeUnmarshaler)) + for i, response := range typeUnmarshaler { var action Action + + // keep the case statements alphabetically sorted switch response.ActionType { - case ActionTypePolicyChange: - action = &ActionPolicyChange{ - ActionID: response.ActionID, - ActionType: response.ActionType, - } - if err := json.Unmarshal(response.Data, action); err != nil { - return errors.New(err, - "fail to decode POLICY_CHANGE action", - errors.TypeConfig) - } - case ActionTypePolicyReassign: - action = &ActionPolicyReassign{ - ActionID: response.ActionID, - ActionType: response.ActionType, - } + case ActionTypeCancel: + action = &ActionCancel{} + case ActionTypeDiagnostics: + action = &ActionDiagnostics{} case ActionTypeInputAction: // Only INPUT_ACTION type actions could possibly be signed https://github.com/elastic/elastic-agent/pull/2348 - action = &ActionApp{ - ActionID: response.ActionID, - ActionType: response.ActionType, - InputType: response.InputType, - Timeout: response.Timeout, - Data: response.Data, - Signed: response.Signed, - } + action = &ActionApp{} + case ActionTypePolicyChange: + action = &ActionPolicyChange{} + case ActionTypePolicyReassign: + action = &ActionPolicyReassign{} + case ActionTypeSettings: + action = &ActionSettings{} case ActionTypeUnenroll: - action = &ActionUnenroll{ - ActionID: response.ActionID, - ActionType: response.ActionType, - Signed: response.Signed, - } + action = &ActionUnenroll{} case ActionTypeUpgrade: - action = &ActionUpgrade{ - ActionID: response.ActionID, - ActionType: response.ActionType, - ActionStartTime: response.ActionStartTime, - ActionExpiration: response.ActionExpiration, - Signed: response.Signed, - } - - if err := json.Unmarshal(response.Data, action); err != nil { - return errors.New(err, - "fail to decode UPGRADE_ACTION action", - errors.TypeConfig) - } - case ActionTypeSettings: - action = &ActionSettings{ - ActionID: response.ActionID, - ActionType: response.ActionType, - } - - if err := json.Unmarshal(response.Data, action); err != nil { - return errors.New(err, - "fail to decode SETTINGS_ACTION action", - errors.TypeConfig) - } - case ActionTypeCancel: - action = &ActionCancel{ - ActionID: response.ActionID, - ActionType: response.ActionType, - } - if err := json.Unmarshal(response.Data, action); err != nil { - return errors.New(err, - "fail to decode CANCEL_ACTION action", - errors.TypeConfig) - } - case ActionTypeDiagnostics: - action = &ActionDiagnostics{ - ActionID: response.ActionID, - ActionType: response.ActionType, - } - if err := json.Unmarshal(response.Data, action); err != nil { - return errors.New(err, - "fail to decode REQUEST_DIAGNOSTICS_ACTION action", - errors.TypeConfig) - } + action = &ActionUpgrade{} default: - action = &ActionUnknown{ - ActionID: response.ActionID, - ActionType: ActionTypeUnknown, - originalType: response.ActionType, - } + action = &ActionUnknown{OriginalType: response.ActionType} + } + + if err := json.Unmarshal(rawActions[i], action); err != nil { + return errors.New(err, + fmt.Sprintf("fail to decode %s action", action.Type()), + errors.TypeConfig) } actions = append(actions, action) } @@ -639,101 +596,13 @@ func (a *Actions) UnmarshalJSON(data []byte) error { return nil } -// UnmarshalYAML attempts to decode yaml actions. -func (a *Actions) UnmarshalYAML(unmarshal func(interface{}) error) error { - var nodes []FleetAction - if err := unmarshal(&nodes); err != nil { - return errors.New(err, - "fail to decode action", - errors.TypeConfig) - } - actions := make([]Action, 0, len(nodes)) - for i := range nodes { - var action Action - n := nodes[i] - switch n.ActionType { - case ActionTypePolicyChange: - action = &ActionPolicyChange{ - ActionID: n.ActionID, - ActionType: n.ActionType, - } - if err := yaml.Unmarshal(n.Data, action); err != nil { - return errors.New(err, - "fail to decode POLICY_CHANGE action", - errors.TypeConfig) - } - case ActionTypePolicyReassign: - action = &ActionPolicyReassign{ - ActionID: n.ActionID, - ActionType: n.ActionType, - } - case ActionTypeInputAction: - action = &ActionApp{ - ActionID: n.ActionID, - ActionType: n.ActionType, - InputType: n.InputType, - Timeout: n.Timeout, - Data: n.Data, - Signed: n.Signed, - } - case ActionTypeUnenroll: - action = &ActionUnenroll{ - ActionID: n.ActionID, - ActionType: n.ActionType, - Signed: n.Signed, - } - case ActionTypeUpgrade: - action = &ActionUpgrade{ - ActionID: n.ActionID, - ActionType: n.ActionType, - ActionStartTime: n.ActionStartTime, - ActionExpiration: n.ActionExpiration, - Retry: n.Retry, - } - if err := yaml.Unmarshal(n.Data, &action); err != nil { - return errors.New(err, - "fail to decode UPGRADE_ACTION action", - errors.TypeConfig) - } - case ActionTypeSettings: - action = &ActionSettings{ - ActionID: n.ActionID, - ActionType: n.ActionType, - } - if err := yaml.Unmarshal(n.Data, action); err != nil { - return errors.New(err, - "fail to decode SETTINGS_ACTION action", - errors.TypeConfig) - } - case ActionTypeCancel: - action = &ActionCancel{ - ActionID: n.ActionID, - ActionType: n.ActionType, - } - if err := yaml.Unmarshal(n.Data, action); err != nil { - return errors.New(err, - "fail to decode CANCEL_ACTION action", - errors.TypeConfig) - } - case ActionTypeDiagnostics: - action = &ActionDiagnostics{ - ActionID: n.ActionID, - ActionType: n.ActionType, - } - if err := yaml.Unmarshal(n.Data, action); err != nil { - return errors.New(err, - "fail to decode REQUEST_DIAGNOSTICS_ACTION action", - errors.TypeConfig) - } - default: - action = &ActionUnknown{ - ActionID: n.ActionID, - ActionType: ActionTypeUnknown, - originalType: n.ActionType, - } - } - actions = append(actions, action) - } - *a = actions - return nil +// UnmarshalYAML prevents to decode actions from . +func (a *Actions) UnmarshalYAML(_ func(interface{}) error) error { + // TODO(AndersonQ): we need this to migrate the store from YAML to JSON + return errors.New("Actions cannot be Unmarshalled from YAML") +} + +// MarshalYAML attempts to decode yaml actions. +func (a *Actions) MarshalYAML() (interface{}, error) { + return nil, errors.New("Actions cannot be Marshaled to YAML") } diff --git a/internal/pkg/fleetapi/action_test.go b/internal/pkg/fleetapi/action_test.go index ac83f31852b..00d132edf92 100644 --- a/internal/pkg/fleetapi/action_test.go +++ b/internal/pkg/fleetapi/action_test.go @@ -96,9 +96,9 @@ func TestActionsUnmarshalJSON(t *testing.T) { assert.Equal(t, ActionTypeUpgrade, action.ActionType) assert.Empty(t, action.ActionStartTime) assert.Empty(t, action.ActionExpiration) - assert.Equal(t, "1.2.3", action.Version) - assert.Equal(t, "http://example.com", action.SourceURI) - assert.Equal(t, 0, action.Retry) + assert.Equal(t, "1.2.3", action.Data.Version) + assert.Equal(t, "http://example.com", action.Data.SourceURI) + assert.Equal(t, 0, action.Data.Retry) }) t.Run("ActionUpgrade with start time", func(t *testing.T) { p := []byte(`[{"id":"testid","type":"UPGRADE","start_time":"2022-01-02T12:00:00Z","expiration":"2022-01-02T13:00:00Z","data":{"version":"1.2.3","source_uri":"http://example.com"}}]`) @@ -111,9 +111,9 @@ func TestActionsUnmarshalJSON(t *testing.T) { assert.Equal(t, ActionTypeUpgrade, action.ActionType) assert.Equal(t, "2022-01-02T12:00:00Z", action.ActionStartTime) assert.Equal(t, "2022-01-02T13:00:00Z", action.ActionExpiration) - assert.Equal(t, "1.2.3", action.Version) - assert.Equal(t, "http://example.com", action.SourceURI) - assert.Equal(t, 0, action.Retry) + assert.Equal(t, "1.2.3", action.Data.Version) + assert.Equal(t, "http://example.com", action.Data.SourceURI) + assert.Equal(t, 0, action.Data.Retry) }) t.Run("ActionPolicyChange no start time", func(t *testing.T) { p := []byte(`[{"id":"testid","type":"POLICY_CHANGE","data":{"policy":{"key":"value"}}}]`) @@ -124,7 +124,7 @@ func TestActionsUnmarshalJSON(t *testing.T) { require.True(t, ok, "unable to cast action to specific type") assert.Equal(t, "testid", action.ActionID) assert.Equal(t, ActionTypePolicyChange, action.ActionType) - assert.NotNil(t, action.Policy) + assert.NotNil(t, action.Data.Policy) }) t.Run("ActionPolicyChange with start time", func(t *testing.T) { p := []byte(`[{"id":"testid","type":"POLICY_CHANGE","start_time":"2022-01-02T12:00:00Z","expiration":"2022-01-02T13:00:00Z","data":{"policy":{"key":"value"}}}]`) @@ -135,7 +135,7 @@ func TestActionsUnmarshalJSON(t *testing.T) { require.True(t, ok, "unable to cast action to specific type") assert.Equal(t, "testid", action.ActionID) assert.Equal(t, ActionTypePolicyChange, action.ActionType) - assert.NotNil(t, action.Policy) + assert.NotNil(t, action.Data.Policy) }) t.Run("ActionUpgrade with retry_attempt", func(t *testing.T) { p := []byte(`[{"id":"testid","type":"UPGRADE","data":{"version":"1.2.3","source_uri":"http://example.com","retry_attempt":1}}]`) @@ -148,9 +148,9 @@ func TestActionsUnmarshalJSON(t *testing.T) { assert.Equal(t, ActionTypeUpgrade, action.ActionType) assert.Empty(t, action.ActionStartTime) assert.Empty(t, action.ActionExpiration) - assert.Equal(t, "1.2.3", action.Version) - assert.Equal(t, "http://example.com", action.SourceURI) - assert.Equal(t, 1, action.Retry) + assert.Equal(t, "1.2.3", action.Data.Version) + assert.Equal(t, "http://example.com", action.Data.SourceURI) + assert.Equal(t, 1, action.Data.Retry) }) } diff --git a/internal/pkg/fleetapi/checkin_cmd_test.go b/internal/pkg/fleetapi/checkin_cmd_test.go index 46a0b4db4d1..568da8601cc 100644 --- a/internal/pkg/fleetapi/checkin_cmd_test.go +++ b/internal/pkg/fleetapi/checkin_cmd_test.go @@ -175,7 +175,7 @@ func TestCheckin(t *testing.T) { // UnknownAction require.Equal(t, "id2", r.Actions[1].ID()) require.Equal(t, "UNKNOWN", r.Actions[1].Type()) - require.Equal(t, "WHAT_TO_DO_WITH_IT", r.Actions[1].(*ActionUnknown).OriginalType()) + require.Equal(t, "WHAT_TO_DO_WITH_IT", r.Actions[1].(*ActionUnknown).OriginalType) }, ))