Skip to content

Commit

Permalink
update action model to match fleet-server schema (#4240)
Browse files Browse the repository at this point in the history
* simplify fleetapi.Actions.UnmarshalJSON
* add test to ensure the state store is correctly loaded from disk
* skip state store migration tests, they will be fixes on a follow-up PR as part of #3912
  • Loading branch information
AndersonQ authored Feb 23, 2024
1 parent db6c7a6 commit 19c8fd0
Show file tree
Hide file tree
Showing 19 changed files with 489 additions and 366 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ func (h *Cancel) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acke
if !ok {
return fmt.Errorf("invalid type, expected ActionCancel and received %T", a)
}
n := h.c.Cancel(action.TargetID)
n := h.c.Cancel(action.Data.TargetID)
if n == 0 {
h.log.Debugf("Cancel action id: %s target id: %s found no actions in queue.", action.ActionID, action.TargetID)
h.log.Debugf("Cancel action id: %s target id: %s found no actions in queue.",
action.ActionID, action.Data.TargetID)
return nil
}
h.log.Infof("Cancel action id: %s target id: %s removed %d action(s) from queue.", action.ActionID, action.TargetID, n)
h.log.Infof("Cancel action id: %s target id: %s removed %d action(s) from queue.",
action.ActionID, action.Data.TargetID, n)
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack
// // Cache signature validation key for the next policy handling
// h.signatureValidationKey = signatureValidationKey

c, err := config.NewConfigFrom(action.Policy)
c, err := config.NewConfigFrom(action.Data.Policy)
if err != nil {
return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ func TestPolicyChange(t *testing.T) {
action := &fleetapi.ActionPolicyChange{
ActionID: "abc123",
ActionType: "POLICY_CHANGE",
Policy: conf,
Data: fleetapi.ActionPolicyChangeData{
Policy: conf,
},
}

cfg := configuration.DefaultConfiguration()
Expand Down Expand Up @@ -73,7 +75,9 @@ func TestPolicyAcked(t *testing.T) {
action := &fleetapi.ActionPolicyChange{
ActionID: actionID,
ActionType: "POLICY_CHANGE",
Policy: config,
Data: fleetapi.ActionPolicyChangeData{
Policy: config,
},
}

cfg := configuration.DefaultConfiguration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ func (h *Settings) Handle(ctx context.Context, a fleetapi.Action, acker acker.Ac
return fmt.Errorf("invalid type, expected ActionSettings and received %T", a)
}

if !isSupportedLogLevel(action.LogLevel) {
return fmt.Errorf("invalid log level, expected debug|info|warning|error and received '%s'", action.LogLevel)
if !isSupportedLogLevel(action.Data.LogLevel) {
return fmt.Errorf("invalid log level, expected debug|info|warning|error and received '%s'",
action.Data.LogLevel)
}

lvl := logp.InfoLevel
err := lvl.Unpack(action.LogLevel)
err := lvl.Unpack(action.Data.LogLevel)
if err != nil {
return fmt.Errorf("failed to unpack log level: %w", err)
}

if err := h.agentInfo.SetLogLevel(ctx, action.LogLevel); err != nil {
if err := h.agentInfo.SetLogLevel(ctx, action.Data.LogLevel); err != nil {
return fmt.Errorf("failed to update log level: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
}

go func() {
h.log.Infof("starting upgrade to version %s in background", action.Version)
if err := h.coord.Upgrade(asyncCtx, action.Version, action.SourceURI, action, false, false); err != nil {
h.log.Errorf("upgrade to version %s failed: %v", action.Version, err)
h.log.Infof("starting upgrade to version %s in background", action.Data.Version)
if err := h.coord.Upgrade(asyncCtx, action.Data.Version, action.Data.SourceURI, action, false, false); err != nil {
h.log.Errorf("upgrade to version %s failed: %v", action.Data.Version, err)
// If context is cancelled in getAsyncContext, the actions are acked there
if !errors.Is(asyncCtx.Err(), context.Canceled) {
h.bkgMutex.Lock()
Expand Down Expand Up @@ -125,14 +125,17 @@ func (h *Upgrade) getAsyncContext(ctx context.Context, action fleetapi.Action, a
h.log.Errorf("invalid type, expected ActionUpgrade and received %T", action)
return nil, false
}
if (upgradeAction.Version == bkgAction.Version) && (upgradeAction.SourceURI == bkgAction.SourceURI) {
h.log.Infof("Duplicate upgrade to version %s received", bkgAction.Version)
if (upgradeAction.Data.Version == bkgAction.Data.Version) &&
(upgradeAction.Data.SourceURI == bkgAction.Data.SourceURI) {
h.log.Infof("Duplicate upgrade to version %s received",
bkgAction.Data.Version)
h.bkgActions = append(h.bkgActions, action)
return nil, false
}

// Versions must be different, cancel the first upgrade and run the new one
h.log.Infof("Canceling upgrade to version %s received", bkgAction.Version)
h.log.Infof("Canceling upgrade to version %s received",
bkgAction.Data.Version)
h.bkgCancel()

// Ack here because we have the lock, and we need to clear out the saved actions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func TestUpgradeHandler(t *testing.T) {
go c.Run(ctx)

u := NewUpgrade(log, c)
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
a := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.3.0", SourceURI: "http://localhost"}}
ack := noopacker.New()
err := u.Handle(ctx, &a, ack)
require.NoError(t, err)
Expand Down Expand Up @@ -114,7 +115,8 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
go c.Run(ctx)

u := NewUpgrade(log, c)
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
a := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.3.0", SourceURI: "http://localhost"}}
ack := noopacker.New()
err1 := u.Handle(ctx, &a, ack)
err2 := u.Handle(ctx, &a, ack)
Expand Down Expand Up @@ -149,8 +151,10 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
go c.Run(ctx)

u := NewUpgrade(log, c)
a1 := fleetapi.ActionUpgrade{Version: "8.2.0", SourceURI: "http://localhost"}
a2 := fleetapi.ActionUpgrade{Version: "8.5.0", SourceURI: "http://localhost"}
a1 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.2.0", SourceURI: "http://localhost"}}
a2 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.5.0", SourceURI: "http://localhost"}}
ack := noopacker.New()
err1 := u.Handle(ctx, &a1, ack)
require.NoError(t, err1)
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (ad *ActionDispatcher) handleExpired(
version := "unknown"
expiration := "unknown"
if upgrade, ok := e.(*fleetapi.ActionUpgrade); ok {
version = upgrade.Version
version = upgrade.Data.Version
expiration = upgrade.ActionExpiration
}
ad.lastUpgradeDetails = details.NewDetails(version, details.StateFailed, e.ID())
Expand Down Expand Up @@ -356,7 +356,7 @@ func (ad *ActionDispatcher) reportNextScheduledUpgrade(input []fleetapi.Action,
}

upgradeDetails := details.NewDetails(
nextUpgrade.Version,
nextUpgrade.Data.Version,
details.StateScheduled,
nextUpgrade.ID())
startTime, err := nextUpgrade.StartTime()
Expand Down
20 changes: 15 additions & 5 deletions internal/pkg/agent/application/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,9 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
actions: []fleetapi.Action{
&fleetapi.ActionUpgrade{
ActionID: "action1",
Version: "8.12.3",
Data: fleetapi.ActionUpgradeData{
Version: "8.12.3",
},
},
},
expectedErrLogMsg: "failed to get start time for scheduled upgrade action [id = action1]",
Expand All @@ -685,7 +687,9 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
&fleetapi.ActionUpgrade{
ActionID: "action2",
ActionStartTime: later.Format(time.RFC3339),
Version: "8.13.0",
Data: fleetapi.ActionUpgradeData{
Version: "8.13.0",
},
},
},
expectedDetails: &details.Details{
Expand All @@ -702,12 +706,16 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
&fleetapi.ActionUpgrade{
ActionID: "action3",
ActionStartTime: muchLater.Format(time.RFC3339),
Version: "8.14.1",
Data: fleetapi.ActionUpgradeData{
Version: "8.14.1",
},
},
&fleetapi.ActionUpgrade{
ActionID: "action4",
ActionStartTime: later.Format(time.RFC3339),
Version: "8.13.5",
Data: fleetapi.ActionUpgradeData{
Version: "8.13.5",
},
},
},
expectedDetails: &details.Details{
Expand All @@ -723,8 +731,10 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
actions: []fleetapi.Action{
&fleetapi.ActionUpgrade{
ActionID: "action1",
Version: "8.13.2",
ActionStartTime: "invalid",
Data: fleetapi.ActionUpgradeData{
Version: "8.13.2",
},
},
},
expectedErrLogMsg: "failed to get start time for scheduled upgrade action [id = action1]",
Expand Down
10 changes: 6 additions & 4 deletions internal/pkg/agent/application/upgrade/step_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func convertToMarkerAction(a *fleetapi.ActionUpgrade) *MarkerActionUpgrade {
return &MarkerActionUpgrade{
ActionID: a.ActionID,
ActionType: a.ActionType,
Version: a.Version,
SourceURI: a.SourceURI,
Version: a.Data.Version,
SourceURI: a.Data.SourceURI,
}
}

Expand All @@ -82,8 +82,10 @@ func convertToActionUpgrade(a *MarkerActionUpgrade) *fleetapi.ActionUpgrade {
return &fleetapi.ActionUpgrade{
ActionID: a.ActionID,
ActionType: a.ActionType,
Version: a.Version,
SourceURI: a.SourceURI,
Data: fleetapi.ActionUpgradeData{
Version: a.Version,
SourceURI: a.SourceURI,
},
}
}

Expand Down
29 changes: 15 additions & 14 deletions internal/pkg/agent/storage/store/action_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"fmt"
"io"

yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand All @@ -19,7 +19,8 @@ import (
// take care of action policy change every other action are discarded. The store will only keep the
// last good action on disk, we assume that the action is added to the store after it was ACK with
// Fleet. The store is not threadsafe.
// ATTN!!!: THE actionStore is deprecated, please use and extend the stateStore instead. The actionStore will be eventually removed.
// The actionStore is deprecated, please use and extend the stateStore instead. The actionStore will be eventually removed.
// Deprecated.
type actionStore struct {
log *logger.Logger
store storeLoad
Expand Down Expand Up @@ -86,7 +87,7 @@ func (s *actionStore) save() error {
if apc, ok := s.action.(*fleetapi.ActionPolicyChange); ok {
serialize := actionPolicyChangeSerializer(*apc)

r, err := yamlToReader(&serialize)
r, err := jsonToReader(&serialize)
if err != nil {
return err
}
Expand All @@ -95,7 +96,7 @@ func (s *actionStore) save() error {
} else if aun, ok := s.action.(*fleetapi.ActionUnenroll); ok {
serialize := actionUnenrollSerializer(*aun)

r, err := yamlToReader(&serialize)
r, err := jsonToReader(&serialize)
if err != nil {
return err
}
Expand Down Expand Up @@ -130,26 +131,26 @@ func (s *actionStore) actions() []action {
// There are four ways to achieve the same results:
// 1. We create a second struct that map the existing field.
// 2. We add the serialization in the fleetapi.
// 3. We move the actual action type outside of the actual fleetapi package.
// 3. We move the actual action type outside the actual fleetapi package.
// 4. We have two sets of type.
//
// This could be done in a refactoring.
type actionPolicyChangeSerializer struct {
ActionID string `yaml:"action_id"`
ActionType string `yaml:"action_type"`
Policy map[string]interface{} `yaml:"policy"`
ActionID string `json:"id" yaml:"id"`
ActionType string `json:"type" yaml:"type"`
Data fleetapi.ActionPolicyChangeData `json:"data,omitempty" yaml:"data,omitempty"`
}

// add a guards between the serializer structs and the original struct.
var _ actionPolicyChangeSerializer = actionPolicyChangeSerializer(fleetapi.ActionPolicyChange{})
var _ = actionPolicyChangeSerializer(fleetapi.ActionPolicyChange{})

// actionUnenrollSerializer is a struct that adds a YAML serialization,
type actionUnenrollSerializer struct {
ActionID string `yaml:"action_id"`
ActionType string `yaml:"action_type"`
IsDetected bool `yaml:"is_detected"`
Signed *fleetapi.Signed `yaml:"signed,omitempty"`
ActionID string `json:"action_id"`
ActionType string `json:"action_type"`
IsDetected bool `json:"is_detected"`
Signed *fleetapi.Signed `json:"signed,omitempty"`
}

// add a guards between the serializer structs and the original struct.
var _ actionUnenrollSerializer = actionUnenrollSerializer(fleetapi.ActionUnenroll{})
var _ = actionUnenrollSerializer(fleetapi.ActionUnenroll{})
5 changes: 2 additions & 3 deletions internal/pkg/agent/storage/store/action_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ func TestActionStore(t *testing.T) {
ActionPolicyChange := &fleetapi.ActionPolicyChange{
ActionID: "abc123",
ActionType: "POLICY_CHANGE",
Policy: map[string]interface{}{
"hello": "world",
},
Data: fleetapi.ActionPolicyChangeData{
Policy: map[string]interface{}{"hello": "world"}},
}

s := storage.NewDiskStore(file)
Expand Down
Loading

0 comments on commit 19c8fd0

Please sign in to comment.