Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor state store #4253

Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ const (
)

type stateStore interface {
Add(fleetapi.Action)
SetAction(fleetapi.Action)
AckToken() string
SetAckToken(ackToken string)
Save() error
Actions() []fleetapi.Action
Action() fleetapi.Action
}

// Unenroll results in running agent entering idle state, non managed non standalone.
Expand Down Expand Up @@ -94,7 +94,7 @@ func (h *Unenroll) Handle(ctx context.Context, a fleetapi.Action, acker acker.Ac

if h.stateStore != nil {
// backup action for future start to avoid starting fleet gateway loop
h.stateStore.Add(a)
h.stateStore.SetAction(a)
if err := h.stateStore.Save(); err != nil {
h.log.Warnf("Failed to update state store: %v", err)
}
Expand Down
2 changes: 0 additions & 2 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,9 @@ type agentInfo interface {
}

type stateStore interface {
Add(fleetapi.Action)
AckToken() string
SetAckToken(ackToken string)
Save() error
Actions() []fleetapi.Action
}

type FleetGateway struct {
Expand Down
15 changes: 5 additions & 10 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
close(retrierRun)
}()

actions := m.stateStore.Actions()
action := m.stateStore.Action()
stateRestored := false
if len(actions) > 0 && !m.wasUnenrolled() {
if action != nil && !m.wasUnenrolled() {
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
m.log.Info("restoring current policy from disk")
m.dispatcher.Dispatch(ctx, m.coord.SetUpgradeDetails, actionAcker, actions...)
m.dispatcher.Dispatch(ctx, m.coord.SetUpgradeDetails, actionAcker, action)
stateRestored = true
}

Expand Down Expand Up @@ -268,13 +268,8 @@ func (m *managedConfigManager) Watch() <-chan coordinator.ConfigChange {
}

func (m *managedConfigManager) wasUnenrolled() bool {
actions := m.stateStore.Actions()
for _, a := range actions {
if a.Type() == "UNENROLL" {
return true
}
}
return false
return m.stateStore.Action() != nil &&
m.stateStore.Action().Type() == fleetapi.ActionTypeUnenroll
}

func (m *managedConfigManager) initFleetServer(ctx context.Context, cfg *configuration.FleetServerConfig) error {
Expand Down
4 changes: 3 additions & 1 deletion internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
}

// the encrypted state does not exist but the unencrypted file does
err = migration.MigrateToEncryptedConfig(ctx, l, paths.AgentStateStoreYmlFile(), paths.AgentStateStoreFile())
err = migration.MigrateToEncryptedConfig(ctx, l,
paths.AgentStateStoreYmlFile(),
paths.AgentStateStoreFile())
if err != nil {
return errors.New(err, "error migrating agent state")
}
Expand Down
14 changes: 7 additions & 7 deletions internal/pkg/agent/storage/store/action_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
// Deprecated.
type actionStore struct {
log *logger.Logger
store storeLoad
store saveLoader
dirty bool
action action
action fleetapi.Action
}

// newActionStore creates a new action store.
func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) {
func newActionStore(log *logger.Logger, store saveLoader) (*actionStore, error) {
// If the store exists we will read it, if an error is returned we log it
// and return an empty store.
reader, err := store.Load()
Expand Down Expand Up @@ -64,7 +64,7 @@ func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) {

// add is only taking care of ActionPolicyChange for now and will only keep the last one it receive,
// any other type of action will be silently ignored.
func (s *actionStore) add(a action) {
func (s *actionStore) add(a fleetapi.Action) {
switch v := a.(type) {
case *fleetapi.ActionPolicyChange, *fleetapi.ActionUnenroll:
// Only persist the action if the action is different.
Expand Down Expand Up @@ -117,12 +117,12 @@ func (s *actionStore) save() error {

// actions returns a slice of action to execute in order, currently only a action policy change is
// persisted.
func (s *actionStore) actions() []action {
func (s *actionStore) actions() fleetapi.Actions {
if s.action == nil {
return []action{}
return fleetapi.Actions{}
}

return []action{s.action}
return fleetapi.Actions{s.action}
}

// actionPolicyChangeSerializer is a struct that adds a YAML serialization, I don't think serialization
Expand Down
Loading
Loading