Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and refactor the state store #4441

Merged
merged 68 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
db6c7a6
Fix action store -> state store migration tests (#4235)
AndersonQ Feb 15, 2024
19c8fd0
update action model to match fleet-server schema (#4240)
AndersonQ Feb 23, 2024
a7b7bca
refactor state store (#4253)
AndersonQ Mar 7, 2024
9f7ccc2
add migrations for action and state stores (#4305)
AndersonQ Mar 20, 2024
1af2c59
merge 'upstream/main' into bugfix/3912-borken-stat-store
AndersonQ Mar 27, 2024
1c3dfe0
fix stores conflicts
AndersonQ Mar 27, 2024
f61bbdf
adjust actions in protection package and diagnostics action
AndersonQ Apr 2, 2024
ac8591e
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Apr 2, 2024
9589a5d
fix action diagnostics handler
AndersonQ Apr 2, 2024
4032005
it should not be there
AndersonQ Apr 2, 2024
bd1f715
use .json instead of .yml/.yaml
AndersonQ Apr 2, 2024
27920c3
fix more tests
AndersonQ Apr 2, 2024
cc4d6b4
split actionDiagnosticsData into its own type and fix another test
AndersonQ Apr 2, 2024
85324a5
add changelog
AndersonQ Apr 2, 2024
27a235e
fix signed action data schema
AndersonQ Apr 2, 2024
7fd93e9
making the linter happy
AndersonQ Apr 3, 2024
85ac7b3
PR review changes
AndersonQ Apr 4, 2024
4c8dfe7
move migration tests to its own file and add more tests
AndersonQ Apr 4, 2024
22bbe6e
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Apr 4, 2024
6cdad71
use unprivileged vault
AndersonQ Apr 5, 2024
b4e4077
Merge remote-tracking branch 'upstream/main' into bugfix/3912-borken-…
AndersonQ Apr 5, 2024
ff3d90e
Merge branch 'bugfix/3912-borken-stat-store' of github.com:elastic/el…
AndersonQ Apr 5, 2024
f89a3ab
another mac fix
AndersonQ Apr 5, 2024
ce05de0
remove debug log
AndersonQ Apr 5, 2024
177693d
fix typo and comment
AndersonQ Apr 5, 2024
9c07d92
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Apr 5, 2024
4f05e97
add notice
AndersonQ Apr 5, 2024
0862675
Merge branch 'bugfix/3912-borken-stat-store' of github.com:elastic/el…
AndersonQ Apr 5, 2024
3820c15
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Apr 9, 2024
dde9111
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Apr 9, 2024
c215ade
fix and add test for JSON nested map marshalling
AndersonQ Apr 10, 2024
f5a0438
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Apr 11, 2024
f3b8373
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Jun 10, 2024
d022fa9
Merge branch 'bugfix/3912-borken-stat-store' of github.com:AndersonQ/…
AndersonQ Jun 11, 2024
b1c6863
Merge branch 'main' of github.com:elastic/elastic-agent into bugfix/3…
AndersonQ Jun 11, 2024
469c793
fix after merging main
AndersonQ Jun 11, 2024
b3afffe
Merge remote-tracking branch 'upstream/main' into bugfix/3912-borken-…
AndersonQ Jun 11, 2024
03116c0
fix after merge
AndersonQ Jun 11, 2024
ec9874f
fix after merge
AndersonQ Jun 11, 2024
9447e07
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Jun 11, 2024
97093aa
adjust changelog
AndersonQ Jun 11, 2024
cad7c9e
remove unnecessary empty line
AndersonQ Jun 11, 2024
57538a1
Merge branch 'bugfix/3912-borken-stat-store' of github.com:elastic/el…
AndersonQ Jun 11, 2024
f0ac632
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Jun 12, 2024
806865b
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Jun 12, 2024
f987143
Merge branch 'main' into bugfix/3912-borken-stat-store
jlind23 Jun 13, 2024
1c3fc9a
fix after merge
AndersonQ Jun 13, 2024
f71f7df
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Jun 17, 2024
5221ad4
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Jun 17, 2024
c84451a
fix another merge issue
AndersonQ Jun 17, 2024
4d76b70
Merge branch 'bugfix/3912-borken-stat-store' of github.com:elastic/el…
AndersonQ Jun 17, 2024
cb95c52
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Jun 19, 2024
a9832f8
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Jun 20, 2024
a305fd4
improve docs/comments
AndersonQ Jun 20, 2024
e949aa1
fix store dirty state being cleaned on failed save and add tests
AndersonQ Jun 20, 2024
057fc26
fix docs and remove TODO
AndersonQ Jun 20, 2024
100375f
add tests to load store
AndersonQ Jun 20, 2024
785372b
WIP
AndersonQ Jun 20, 2024
bb44a11
Merge branch 'bugfix/3912-borken-stat-store' of github.com:elastic/el…
AndersonQ Jun 20, 2024
a1ae941
migration: add tests for empty and unknown actions
AndersonQ Jun 21, 2024
113bf22
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Jun 21, 2024
eba70e6
always return the reader.Close error
AndersonQ Jun 21, 2024
f450435
Merge branch 'bugfix/3912-borken-stat-store' of github.com:elastic/el…
AndersonQ Jun 21, 2024
d2b9fda
Merge branch 'main' into bugfix/3912-borken-stat-store
AndersonQ Jun 21, 2024
987c9f6
do not log errors
AndersonQ Jun 21, 2024
57dfe49
Merge branch 'bugfix/3912-borken-stat-store' of github.com:elastic/el…
AndersonQ Jun 21, 2024
e61cf6d
add debug log when dropping actions on SetAction
AndersonQ Jun 21, 2024
57dec1e
fix typos/better logs
AndersonQ Jun 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions changelog/fragments/1712067343-fix-state-store.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix the Elastic Agent state store

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
This change fixes issues when loading data from the Agent's internal state store.
Which include the error `error parsing version ""` the Agent would present after
trying to execute a scheduled upgrade after a restart.

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3912
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ func (h *Cancel) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acke
if !ok {
return fmt.Errorf("invalid type, expected ActionCancel and received %T", a)
}
n := h.c.Cancel(action.TargetID)
n := h.c.Cancel(action.Data.TargetID)
if n == 0 {
h.log.Debugf("Cancel action id: %s target id: %s found no actions in queue.", action.ActionID, action.TargetID)
h.log.Debugf("Cancel action id: %s target id: %s found no actions in queue.",
action.ActionID, action.Data.TargetID)
return nil
}
h.log.Infof("Cancel action id: %s target id: %s removed %d action(s) from queue.", action.ActionID, action.TargetID, n)
h.log.Infof("Cancel action id: %s target id: %s removed %d action(s) from queue.",
action.ActionID, action.Data.TargetID, n)
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ func (h *Diagnostics) collectDiag(ctx context.Context, action *fleetapi.ActionDi
cDiag := h.diagComponents(ctx, action)

var r io.Reader
// attempt to create the a temporary diagnostics file on disk in order to avoid loading a
// potentially large file in memory.
// attempt to create a temporary diagnostics file on disk in order to avoid
// loading a potentially large file in memory.
// if on-disk creation fails an in-memory buffer is used.
f, s, err := h.diagFile(aDiag, uDiag, cDiag, action.ExcludeEventsLog)
f, s, err := h.diagFile(aDiag, uDiag, cDiag, action.Data.ExcludeEventsLog)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we have an integration test for the diagnostics action yet. Now would be a good time to add one, breaking that unintentionally would be Very Bad.

Related: #4384

I'd suggest implementing it separately on main and then updating this PR to include it, since it should pass in both cases. I would not add it to this already large PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, so just to confirm, #4384 is already the ticket for this test, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be if you meet the criteria it proposes. I am mostly interested in ensuring that we can collect diagnostics and send them to fleet here, not verifying the structure of the diagnostics. Re-reading it that issue considers the Fleet diagnostics part optional, I found it via key word search. You can create a new issue with reduced scope if you want.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As author of #4384 , the main idea behind it is that we don't trigger diagnostic on a real agent with real components. The Fleet part was marked as optional as I didn't see that part of the code exercised anywhere yet in the integration tests.

For reference, all we have for diagnostics in integration tests is triggered with an elastic-agent diagnostics command, see:
https://github.com/elastic/elastic-agent/blob/main/testing/integration/diagnostics_test.go

_, err = fixture.Exec(ctx, []string{"diagnostics", "-f", diagAbsPath})

Feel free to keep the issue and make the Fleet part mandatory if you want or create a new one with reduced scope as @cmacknz suggested

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regardless the approach to further test the diagnostics, the state store is not part of the diagnostics. And we already have a test for the diagnostics, therefore I think this minor change on the diagnostics code is covered by tests, also it's just a model change, the compiler also should prevent an error here.

if err != nil {
var b bytes.Buffer
h.log.Warnw("Diagnostics action unable to use temporary file, using buffer instead.", "error.message", err)
Expand All @@ -161,7 +161,7 @@ func (h *Diagnostics) collectDiag(ctx context.Context, action *fleetapi.ActionDi
h.log.Warn(str)
}
}()
err := diagnostics.ZipArchive(&wBuf, &b, h.topPath, aDiag, uDiag, cDiag, action.ExcludeEventsLog)
err := diagnostics.ZipArchive(&wBuf, &b, h.topPath, aDiag, uDiag, cDiag, action.Data.ExcludeEventsLog)
if err != nil {
h.log.Errorw(
"diagnostics action handler failed generate zip archive",
Expand Down Expand Up @@ -202,7 +202,7 @@ func (h *Diagnostics) runHooks(ctx context.Context, action *fleetapi.ActionDiagn
// Currently CPU is the only additional metric we can collect.
// If this changes we would need to change how we scan AdditionalMetrics.
collectCPU := false
for _, metric := range action.AdditionalMetrics {
for _, metric := range action.Data.AdditionalMetrics {
if metric == "CPU" {
h.log.Debug("Diagnostics will collect CPU profile.")
collectCPU = true
Expand Down Expand Up @@ -296,7 +296,7 @@ func (h *Diagnostics) diagComponents(ctx context.Context, action *fleetapi.Actio
h.log.Debugf("Component diagnostics complete. Took: %s", time.Since(startTime))
}()
additionalMetrics := []cproto.AdditionalDiagnosticRequest{}
for _, metric := range action.AdditionalMetrics {
for _, metric := range action.Data.AdditionalMetrics {
if metric == "CPU" {
additionalMetrics = append(additionalMetrics, cproto.AdditionalDiagnosticRequest_CPU)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func TestDiagnosticHandlerWithCPUProfile(t *testing.T) {
mockUploader.EXPECT().UploadDiagnostics(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("upload-id", nil)

diagAction := &fleetapi.ActionDiagnostics{
AdditionalMetrics: []string{"CPU"},
Data: fleetapi.ActionDiagnosticsData{AdditionalMetrics: []string{"CPU"}},
}
handler.collectDiag(context.Background(), diagAction, mockAcker)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack
// // Cache signature validation key for the next policy handling
// h.signatureValidationKey = signatureValidationKey

c, err := config.NewConfigFrom(action.Policy)
c, err := config.NewConfigFrom(action.Data.Policy)
if err != nil {
return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ func TestPolicyChange(t *testing.T) {
action := &fleetapi.ActionPolicyChange{
ActionID: "abc123",
ActionType: "POLICY_CHANGE",
Policy: conf,
Data: fleetapi.ActionPolicyChangeData{
Policy: conf,
},
}

cfg := configuration.DefaultConfiguration()
Expand Down Expand Up @@ -86,7 +88,9 @@ func TestPolicyAcked(t *testing.T) {
action := &fleetapi.ActionPolicyChange{
ActionID: actionID,
ActionType: "POLICY_CHANGE",
Policy: config,
Data: fleetapi.ActionPolicyChangeData{
Policy: config,
},
}

cfg := configuration.DefaultConfiguration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (h *Settings) Handle(ctx context.Context, a fleetapi.Action, acker acker.Ac
return fmt.Errorf("invalid type, expected ActionSettings and received %T", a)
}

logLevel := action.LogLevel
logLevel := action.Data.LogLevel
return h.handleLogLevel(ctx, logLevel, acker, action)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestSettings_handleLogLevel(t *testing.T) {
action: &fleetapi.ActionSettings{
ActionID: "someactionid",
ActionType: fleetapi.ActionTypeSettings,
LogLevel: "debug",
Data: fleetapi.ActionSettingsData{LogLevel: "debug"},
},
},
setupMocks: func(t *testing.T, agent *mockinfo.Agent, setter *mockhandlers.LogLevelSetter, acker *mockfleetacker.Acker) {
Expand All @@ -154,7 +154,8 @@ func TestSettings_handleLogLevel(t *testing.T) {
action: &fleetapi.ActionSettings{
ActionID: "someactionid",
ActionType: fleetapi.ActionTypeSettings,
LogLevel: clearLogLevelValue,
Data: fleetapi.ActionSettingsData{
LogLevel: clearLogLevelValue},
},
},
setupMocks: func(t *testing.T, agent *mockinfo.Agent, setter *mockhandlers.LogLevelSetter, acker *mockfleetacker.Acker) {
Expand All @@ -175,7 +176,8 @@ func TestSettings_handleLogLevel(t *testing.T) {
action: &fleetapi.ActionSettings{
ActionID: "someactionid",
ActionType: fleetapi.ActionTypeSettings,
LogLevel: clearLogLevelValue,
Data: fleetapi.ActionSettingsData{
LogLevel: clearLogLevelValue},
},
},
setupMocks: func(t *testing.T, agent *mockinfo.Agent, setter *mockhandlers.LogLevelSetter, acker *mockfleetacker.Acker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ const (
)

type stateStore interface {
Add(fleetapi.Action)
SetAction(fleetapi.Action)
AckToken() string
SetAckToken(ackToken string)
Save() error
Actions() []fleetapi.Action
Action() fleetapi.Action
}

// Unenroll results in running agent entering idle state, non managed non standalone.
Expand Down Expand Up @@ -94,7 +94,7 @@ func (h *Unenroll) Handle(ctx context.Context, a fleetapi.Action, acker acker.Ac

if h.stateStore != nil {
// backup action for future start to avoid starting fleet gateway loop
h.stateStore.Add(a)
h.stateStore.SetAction(a)
if err := h.stateStore.Save(); err != nil {
h.log.Warnf("Failed to update state store: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
}

go func() {
h.log.Infof("starting upgrade to version %s in background", action.Version)
if err := h.coord.Upgrade(asyncCtx, action.Version, action.SourceURI, action, false, false); err != nil {
h.log.Errorf("upgrade to version %s failed: %v", action.Version, err)
h.log.Infof("starting upgrade to version %s in background", action.Data.Version)
if err := h.coord.Upgrade(asyncCtx, action.Data.Version, action.Data.SourceURI, action, false, false); err != nil {
h.log.Errorf("upgrade to version %s failed: %v", action.Data.Version, err)
// If context is cancelled in getAsyncContext, the actions are acked there
if !errors.Is(asyncCtx.Err(), context.Canceled) {
h.bkgMutex.Lock()
Expand Down Expand Up @@ -125,14 +125,17 @@ func (h *Upgrade) getAsyncContext(ctx context.Context, action fleetapi.Action, a
h.log.Errorf("invalid type, expected ActionUpgrade and received %T", action)
return nil, false
}
if (upgradeAction.Version == bkgAction.Version) && (upgradeAction.SourceURI == bkgAction.SourceURI) {
h.log.Infof("Duplicate upgrade to version %s received", bkgAction.Version)
if (upgradeAction.Data.Version == bkgAction.Data.Version) &&
(upgradeAction.Data.SourceURI == bkgAction.Data.SourceURI) {
h.log.Infof("Duplicate upgrade to version %s received",
bkgAction.Data.Version)
h.bkgActions = append(h.bkgActions, action)
return nil, false
}

// Versions must be different, cancel the first upgrade and run the new one
h.log.Infof("Canceling upgrade to version %s received", bkgAction.Version)
h.log.Infof("Canceling upgrade to version %s received",
bkgAction.Data.Version)
h.bkgCancel()

// Ack here because we have the lock, and we need to clear out the saved actions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func TestUpgradeHandler(t *testing.T) {
go c.Run(ctx)

u := NewUpgrade(log, c)
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
a := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.3.0", SourceURI: "http://localhost"}}
ack := noopacker.New()
err := u.Handle(ctx, &a, ack)
require.NoError(t, err)
Expand Down Expand Up @@ -114,7 +115,8 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
go c.Run(ctx)

u := NewUpgrade(log, c)
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
a := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.3.0", SourceURI: "http://localhost"}}
ack := noopacker.New()
err1 := u.Handle(ctx, &a, ack)
err2 := u.Handle(ctx, &a, ack)
Expand Down Expand Up @@ -149,8 +151,10 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
go c.Run(ctx)

u := NewUpgrade(log, c)
a1 := fleetapi.ActionUpgrade{Version: "8.2.0", SourceURI: "http://localhost"}
a2 := fleetapi.ActionUpgrade{Version: "8.5.0", SourceURI: "http://localhost"}
a1 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.2.0", SourceURI: "http://localhost"}}
a2 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.5.0", SourceURI: "http://localhost"}}
ack := noopacker.New()
err1 := u.Handle(ctx, &a1, ack)
require.NoError(t, err1)
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (ad *ActionDispatcher) handleExpired(
version := "unknown"
expiration := "unknown"
if upgrade, ok := e.(*fleetapi.ActionUpgrade); ok {
version = upgrade.Version
version = upgrade.Data.Version
expiration = upgrade.ActionExpiration
}
ad.lastUpgradeDetails = details.NewDetails(version, details.StateFailed, e.ID())
Expand Down Expand Up @@ -358,7 +358,7 @@ func (ad *ActionDispatcher) reportNextScheduledUpgrade(input []fleetapi.Action,
}

upgradeDetails := details.NewDetails(
nextUpgrade.Version,
nextUpgrade.Data.Version,
details.StateScheduled,
nextUpgrade.ID())
startTime, err := nextUpgrade.StartTime()
Expand Down
20 changes: 15 additions & 5 deletions internal/pkg/agent/application/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,9 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
actions: []fleetapi.Action{
&fleetapi.ActionUpgrade{
ActionID: "action1",
Version: "8.12.3",
Data: fleetapi.ActionUpgradeData{
Version: "8.12.3",
},
},
},
expectedErrLogMsg: "failed to get start time for scheduled upgrade action [id = action1]",
Expand All @@ -685,7 +687,9 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
&fleetapi.ActionUpgrade{
ActionID: "action2",
ActionStartTime: later.Format(time.RFC3339),
Version: "8.13.0",
Data: fleetapi.ActionUpgradeData{
Version: "8.13.0",
},
},
},
expectedDetails: &details.Details{
Expand All @@ -702,12 +706,16 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
&fleetapi.ActionUpgrade{
ActionID: "action3",
ActionStartTime: muchLater.Format(time.RFC3339),
Version: "8.14.1",
Data: fleetapi.ActionUpgradeData{
Version: "8.14.1",
},
},
&fleetapi.ActionUpgrade{
ActionID: "action4",
ActionStartTime: later.Format(time.RFC3339),
Version: "8.13.5",
Data: fleetapi.ActionUpgradeData{
Version: "8.13.5",
},
},
},
expectedDetails: &details.Details{
Expand All @@ -723,8 +731,10 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
actions: []fleetapi.Action{
&fleetapi.ActionUpgrade{
ActionID: "action1",
Version: "8.13.2",
ActionStartTime: "invalid",
Data: fleetapi.ActionUpgradeData{
Version: "8.13.2",
},
},
},
expectedErrLogMsg: "failed to get start time for scheduled upgrade action [id = action1]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,9 @@ type agentInfo interface {
}

type stateStore interface {
Add(fleetapi.Action)
AckToken() string
SetAckToken(ackToken string)
Save() error
Actions() []fleetapi.Action
}

type FleetGateway struct {
Expand Down
17 changes: 6 additions & 11 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newManagedConfigManager(
// Create the state store that will persist the last good policy change on disk.
stateStore, err := store.NewStateStoreWithMigration(ctx, log, paths.AgentActionStoreFile(), paths.AgentStateStoreFile())
if err != nil {
return nil, errors.New(err, fmt.Sprintf("fail to read action store '%s'", paths.AgentActionStoreFile()))
return nil, errors.New(err, fmt.Sprintf("fail to read state store '%s'", paths.AgentStateStoreFile()))
}

actionQueue, err := queue.NewActionQueue(stateStore.Queue(), stateStore)
Expand Down Expand Up @@ -158,14 +158,14 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
close(retrierRun)
}()

actions := m.stateStore.Actions()
action := m.stateStore.Action()
stateRestored := false
if len(actions) > 0 && !m.wasUnenrolled() {
if action != nil && !m.wasUnenrolled() {
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
m.log.Info("restoring current policy from disk")
m.dispatcher.Dispatch(ctx, m.coord.SetUpgradeDetails, actionAcker, actions...)
m.dispatcher.Dispatch(ctx, m.coord.SetUpgradeDetails, actionAcker, action)
stateRestored = true
}

Expand Down Expand Up @@ -269,13 +269,8 @@ func (m *managedConfigManager) Watch() <-chan coordinator.ConfigChange {
}

func (m *managedConfigManager) wasUnenrolled() bool {
actions := m.stateStore.Actions()
for _, a := range actions {
if a.Type() == "UNENROLL" {
return true
}
}
return false
return m.stateStore.Action() != nil &&
m.stateStore.Action().Type() == fleetapi.ActionTypeUnenroll
}

func (m *managedConfigManager) initFleetServer(ctx context.Context, cfg *configuration.FleetServerConfig) error {
Expand Down
Loading