From 92acf08344bd8d91eaf4de7dfce34527517307c1 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 19 Oct 2023 16:28:23 -0700 Subject: [PATCH] Track upgrade details (#3527) * Remove context and handle cancellation internally instead * More optimizations * Add back context * Adding FSM for upgrades * Implementing TODO * WIP * WIP * Reorganizing imports * Running go mod tidy * Resolve deadlock * Add unit tests * Fix type * Renaming variable to avoid conflict with package name * Handle failures in one place * Set UPG_RESTARTING state * Remove Fleet changes * Add guard for action * Immediately notify observer when registered * Add UpgradeCompleted effect to observer doc * Fix initialization * Adding details progress observer and unit tests * Fixing booboos introduced during conflict resolution * Add unit test * Add assertion on error * Add comment on stateNeedsRefresh * Add comment linking to Fleet Server OpenAPI spec for UPG_* values * Use public accessor for setting upgrade details on coordinator to prevent data race * Use buffered channel for upgradeDetailsChan in test so test can run in single goroutine * Fixing unit test * Add mutex to prevent data race * Clarify assertion's intent * Make copy of details before notifying observer with it. 
* Add setter for setting download percent * Remove unnecessary struct tags * Change mutex type * Document FailedState and ErrorMsg fields * Track download rate as well * Change data type of time field * Rename struct to avoid stutter in naming * Log upgrade details when they change * Add nil guard * Setting logger in test * Use sentinel value for encoding +Inf download rate in JSON * Fix up comment * Set omitempty on failed_state and error_msg * Add units to download rate --- .../handlers/handler_action_upgrade_test.go | 3 +- .../application/coordinator/coordinator.go | 42 ++++- .../coordinator/coordinator_state.go | 21 ++- .../coordinator/coordinator_test.go | 61 ++++++- .../coordinator/coordinator_unit_test.go | 12 +- .../artifact/download/http/downloader.go | 24 +-- .../artifact/download/http/downloader_test.go | 13 +- .../download/http/progress_observer.go | 35 ++++ .../download/http/progress_observer_test.go | 36 ++++ .../artifact/download/http/verifier_test.go | 4 +- .../download/localremote/downloader.go | 7 +- .../artifact/download/snapshot/downloader.go | 5 +- .../application/upgrade/details/details.go | 166 ++++++++++++++++++ .../upgrade/details/details_test.go | 95 ++++++++++ .../application/upgrade/details/state.go | 22 +++ .../application/upgrade/step_download.go | 19 +- .../application/upgrade/step_download_test.go | 21 ++- .../pkg/agent/application/upgrade/upgrade.go | 15 +- 18 files changed, 547 insertions(+), 54 deletions(-) create mode 100644 internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go create mode 100644 internal/pkg/agent/application/upgrade/details/details.go create mode 100644 internal/pkg/agent/application/upgrade/details/details_test.go create mode 100644 internal/pkg/agent/application/upgrade/details/state.go diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go index 
17de63af699..b917b00b900 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" @@ -35,7 +36,7 @@ func (u *mockUpgradeManager) Reload(rawConfig *config.Config) error { return nil } -func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { +func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { select { case <-time.After(2 * time.Second): u.msgChan <- "completed " + version diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index a17d3bf6199..0da3546ffa8 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -17,8 +17,10 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" + 
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/capabilities" @@ -59,7 +61,7 @@ type UpgradeManager interface { Reload(rawConfig *config.Config) error // Upgrade upgrades running agent. - Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) + Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) // Ack is used on startup to check if the agent has upgraded and needs to send an ack for the action Ack(ctx context.Context, acker acker.Acker) error @@ -192,8 +194,12 @@ type Coordinator struct { // state should never be directly read or written outside the Coordinator // goroutine. Callers who need to access or modify the state should use the // public accessors like State(), SetLogLevel(), etc. - state State - stateBroadcaster *broadcaster.Broadcaster[State] + state State + stateBroadcaster *broadcaster.Broadcaster[State] + + // If you get a race detector error while accessing this field, it probably + // means you're calling private Coordinator methods from outside the + // Coordinator goroutine. stateNeedsRefresh bool // overrideState is used during the update process to report the overall @@ -204,6 +210,10 @@ type Coordinator struct { // SetOverrideState helper to the Coordinator goroutine. overrideStateChan chan *coordinatorOverrideState + // upgradeDetailsChan forwards upgrade details from the publicly accessible + // SetUpgradeDetails helper to the Coordinator goroutine. 
+ upgradeDetailsChan chan *details.Details + // loglevelCh forwards log level changes from the public API (SetLogLevel) // to the run loop in Coordinator's main goroutine. logLevelCh chan logp.Level @@ -326,8 +336,9 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. // synchronization in the subscriber API, just set the input buffer to 0. stateBroadcaster: broadcaster.New(state, 64, 32), - logLevelCh: make(chan logp.Level), - overrideStateChan: make(chan *coordinatorOverrideState), + logLevelCh: make(chan logp.Level), + overrideStateChan: make(chan *coordinatorOverrideState), + upgradeDetailsChan: make(chan *details.Details), } // Setup communication channels for any non-nil components. This pattern // lets us transparently accept nil managers / simulated events during @@ -445,17 +456,33 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str // override the overall state to upgrading until the re-execution is complete c.SetOverrideState(agentclient.Upgrading, fmt.Sprintf("Upgrading to version %s", version)) - cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, skipVerifyOverride, skipDefaultPgp, pgpBytes...) + + // initialize upgrade details + actionID := "" + if action != nil { + actionID = action.ActionID + } + det := details.NewDetails(version, details.StateRequested, actionID) + det.RegisterObserver(c.SetUpgradeDetails) + det.RegisterObserver(c.logUpgradeDetails) + + cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...) if err != nil { c.ClearOverrideState() + det.Fail(err) return err } if cb != nil { + det.SetState(details.StateRestarting) c.ReExec(cb) } return nil } +func (c *Coordinator) logUpgradeDetails(details *details.Details) { + c.logger.Infow("updated upgrade details", "upgrade_details", details) +} + // AckUpgrade is the method used on startup to ack a previously successful upgrade action. 
// Called from external goroutines. func (c *Coordinator) AckUpgrade(ctx context.Context, acker acker.Acker) error { @@ -878,6 +905,9 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case overrideState := <-c.overrideStateChan: c.setOverrideState(overrideState) + case upgradeDetails := <-c.upgradeDetailsChan: + c.setUpgradeDetails(upgradeDetails) + case componentState := <-c.managerChans.runtimeManagerUpdate: // New component change reported by the runtime manager via // Coordinator.watchRuntimeComponents(), merge it with the diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index f896f024733..6e645c3a06b 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -7,11 +7,13 @@ package coordinator import ( "fmt" - agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" - "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-libs/logp" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/component/runtime" + agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" ) // State provides the current state of the coordinator along with all the current states of components and units. @@ -30,6 +32,8 @@ type State struct { Components []runtime.ComponentComponentState `yaml:"components"` LogLevel logp.Level `yaml:"log_level"` + + UpgradeDetails *details.Details `yaml:"upgrade_details,omitempty"` } type coordinatorOverrideState struct { @@ -54,6 +58,11 @@ func (c *Coordinator) ClearOverrideState() { c.overrideStateChan <- nil } +// SetUpgradeDetails sets upgrade details. This is used during upgrades. 
+func (c *Coordinator) SetUpgradeDetails(upgradeDetails *details.Details) { + c.upgradeDetailsChan <- upgradeDetails +} + // setRuntimeManagerError updates the error state for the runtime manager. // Called on the main Coordinator goroutine. func (c *Coordinator) setRuntimeManagerError(err error) { @@ -114,6 +123,13 @@ func (c *Coordinator) setOverrideState(overrideState *coordinatorOverrideState) c.stateNeedsRefresh = true } +// setUpgradeDetails is the internal helper to set upgrade details and set stateNeedsRefresh. +// Must be called on the main Coordinator goroutine. +func (c *Coordinator) setUpgradeDetails(upgradeDetails *details.Details) { + c.state.UpgradeDetails = upgradeDetails + c.stateNeedsRefresh = true +} + // Forward the current state to the broadcaster and clear the stateNeedsRefresh // flag. Must be called on the main Coordinator goroutine. func (c *Coordinator) refreshState() { @@ -163,6 +179,7 @@ func (c *Coordinator) generateReportableState() (s State) { s.FleetState = c.state.FleetState s.FleetMessage = c.state.FleetMessage s.LogLevel = c.state.LogLevel + s.UpgradeDetails = c.state.UpgradeDetails s.Components = make([]runtime.ComponentComponentState, len(c.state.Components)) copy(s.Components, c.state.Components) diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index e84b43f182c..131b91b447a 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -15,6 +15,8 @@ import ( "testing" "time" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/stretchr/testify/assert" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -471,8 +473,50 @@ func TestCoordinator_Upgrade(t *testing.T) { require.NoError(t, err) } +func TestCoordinator_UpgradeDetails(t *testing.T) { + coordCh := make(chan error) + ctx, cancel := 
context.WithCancel(context.Background()) + defer cancel() + + expectedErr := errors.New("some upgrade error") + upgradeManager := &fakeUpgradeManager{ + upgradeable: true, + upgradeErr: expectedErr, + } + coord, cfgMgr, varsMgr := createCoordinator(t, ctx, WithUpgradeManager(upgradeManager)) + require.Nil(t, coord.state.UpgradeDetails) + go func() { + err := coord.Run(ctx) + if errors.Is(err, context.Canceled) { + // allowed error + err = nil + } + coordCh <- err + }() + + // no vars used by the config + varsMgr.Vars(ctx, []*transpiler.Vars{{}}) + + // no need for anything to really run + cfg, err := config.NewConfigFrom(nil) + require.NoError(t, err) + cfgMgr.Config(ctx, cfg) + + err = coord.Upgrade(ctx, "9.0.0", "", nil, true, false) + require.ErrorIs(t, expectedErr, err) + cancel() + + err = <-coordCh + require.NoError(t, err) + + require.Equal(t, details.StateFailed, coord.state.UpgradeDetails.State) + require.Equal(t, details.StateRequested, coord.state.UpgradeDetails.Metadata.FailedState) + require.Equal(t, expectedErr.Error(), coord.state.UpgradeDetails.Metadata.ErrorMsg) +} + type createCoordinatorOpts struct { - managed bool + managed bool + upgradeManager UpgradeManager } type CoordinatorOpt func(o *createCoordinatorOpts) @@ -483,6 +527,12 @@ func ManagedCoordinator(managed bool) CoordinatorOpt { } } +func WithUpgradeManager(upgradeManager UpgradeManager) CoordinatorOpt { + return func(o *createCoordinatorOpts) { + o.upgradeManager = upgradeManager + } +} + // createCoordinator creates a coordinator that using a fake config manager and a fake vars manager. // // The runtime specifications is set up to use both the fake component and fake shipper. 
@@ -527,7 +577,12 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt cfgMgr := newFakeConfigManager() varsMgr := newFakeVarsManager() - coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, &fakeUpgradeManager{}, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed) + upgradeManager := o.upgradeManager + if upgradeManager == nil { + upgradeManager = &fakeUpgradeManager{} + } + + coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed) return coord, cfgMgr, varsMgr } @@ -574,7 +629,7 @@ func (f *fakeUpgradeManager) Reload(cfg *config.Config) error { return nil } -func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { +func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { f.upgradeCalled = true if f.upgradeErr != nil { return nil, f.upgradeErr diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 805139f26e8..5752c100a41 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" 
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/pkg/component" @@ -811,6 +812,9 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) { // since a successful upgrade sets the override state twice. overrideStateChan := make(chan *coordinatorOverrideState, 2) + // similarly, upgradeDetailsChan is a buffered channel as well. + upgradeDetailsChan := make(chan *details.Details, 2) + // Create a manager that will allow upgrade attempts but return a failure // from Upgrade itself (success requires testing ReExec and we aren't // quite ready to do that yet). @@ -820,9 +824,11 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) { } coord := &Coordinator{ - stateBroadcaster: broadcaster.New(State{}, 0, 0), - overrideStateChan: overrideStateChan, - upgradeMgr: upgradeMgr, + stateBroadcaster: broadcaster.New(State{}, 0, 0), + overrideStateChan: overrideStateChan, + upgradeDetailsChan: upgradeDetailsChan, + upgradeMgr: upgradeMgr, + logger: logp.NewLogger("testing"), } // Call upgrade and make sure the upgrade manager receives an Upgrade call diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go index a02585e5938..50fc6849f21 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go @@ -20,6 +20,7 @@ import ( "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -43,13 
+44,14 @@ const ( // Downloader is a downloader able to fetch artifacts from elastic.co web page. type Downloader struct { - log *logger.Logger - config *artifact.Config - client http.Client + log *logger.Logger + config *artifact.Config + client http.Client + upgradeDetails *details.Details } // NewDownloader creates and configures Elastic Downloader -func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config, upgradeDetails *details.Details) (*Downloader, error) { client, err := config.HTTPTransportSettings.Client( httpcommon.WithAPMHTTPInstrumentation(), httpcommon.WithKeepaliveSettings{Disable: false, IdleConnTimeout: 30 * time.Second}, @@ -59,15 +61,16 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, er } client.Transport = download.WithHeaders(client.Transport, download.Headers) - return NewDownloaderWithClient(log, config, *client), nil + return NewDownloaderWithClient(log, config, *client, upgradeDetails), nil } // NewDownloaderWithClient creates Elastic Downloader with specific client used -func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client) *Downloader { +func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client, upgradeDetails *details.Details) *Downloader { return &Downloader{ - log: log, - config: config, - client: client, + log: log, + config: config, + client: client, + upgradeDetails: upgradeDetails, } } @@ -206,7 +209,8 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f } loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout) - dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver) + detailsObserver := newDetailsProgressObserver(e.upgradeDetails) + dp := newDownloadProgressReporter(sourceURI, 
e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver, detailsObserver) dp.Report(ctx) _, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp)) if err != nil { diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go index 4c341a2aa5e..94e3ce856e2 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/docker/go-units" @@ -56,7 +57,8 @@ func TestDownload(t *testing.T) { config.OperatingSystem = testCase.system config.Architecture = testCase.arch - testClient := NewDownloaderWithClient(log, config, elasticClient) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, elasticClient, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) if err != nil { t.Fatal(err) @@ -105,7 +107,8 @@ func TestDownloadBodyError(t *testing.T) { } log, obs := logger.NewTesting("downloader") - testClient := NewDownloaderWithClient(log, config, *client) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, *client, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) if err == nil { @@ -161,7 +164,8 @@ func TestDownloadLogProgressWithLength(t *testing.T) { } log, obs := logger.NewTesting("downloader") - testClient := NewDownloaderWithClient(log, config, *client) + upgradeDetails := 
details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, *client, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) require.NoError(t, err, "Download should not have errored") @@ -243,7 +247,8 @@ func TestDownloadLogProgressWithoutLength(t *testing.T) { } log, obs := logger.NewTesting("downloader") - testClient := NewDownloaderWithClient(log, config, *client) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, *client, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) require.NoError(t, err, "Download should not have errored") diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go index 4eef0682a50..ca024c53c88 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go @@ -5,10 +5,12 @@ package http import ( + "sync" "time" "github.com/docker/go-units" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -95,3 +97,36 @@ func (lpObs *loggingProgressObserver) ReportFailed(sourceURI string, timePast ti lpObs.log.Warnf(msg, args...) 
} } + +type detailsProgressObserver struct { + upgradeDetails *details.Details + mu sync.RWMutex +} + +func newDetailsProgressObserver(upgradeDetails *details.Details) *detailsProgressObserver { + upgradeDetails.SetState(details.StateDownloading) + return &detailsProgressObserver{ + upgradeDetails: upgradeDetails, + } +} + +func (dpObs *detailsProgressObserver) Report(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRateBytesPerSecond float64) { + dpObs.mu.Lock() + defer dpObs.mu.Unlock() + + dpObs.upgradeDetails.SetDownloadProgress(percentComplete, downloadRateBytesPerSecond) +} + +func (dpObs *detailsProgressObserver) ReportCompleted(sourceURI string, timePast time.Duration, downloadRateBytesPerSecond float64) { + dpObs.mu.Lock() + defer dpObs.mu.Unlock() + + dpObs.upgradeDetails.SetDownloadProgress(1, downloadRateBytesPerSecond) +} + +func (dpObs *detailsProgressObserver) ReportFailed(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRateBytesPerSecond float64, err error) { + dpObs.mu.Lock() + defer dpObs.mu.Unlock() + + dpObs.upgradeDetails.Fail(err) +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go new file mode 100644 index 00000000000..bb1d7ac1c87 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go @@ -0,0 +1,36 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package http + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/docker/go-units" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" +) + +func TestDetailsProgressObserver(t *testing.T) { + upgradeDetails := details.NewDetails("8.11.0", details.StateRequested, "") + detailsObs := newDetailsProgressObserver(upgradeDetails) + + detailsObs.Report("http://some/uri", 20*time.Second, 400*units.MiB, 500*units.MiB, 0.8, 4455) + require.Equal(t, details.StateDownloading, upgradeDetails.State) + require.Equal(t, 0.8, upgradeDetails.Metadata.DownloadPercent) + + detailsObs.ReportCompleted("http://some/uri", 30*time.Second, 3333) + require.Equal(t, details.StateDownloading, upgradeDetails.State) + require.Equal(t, 1.0, upgradeDetails.Metadata.DownloadPercent) + + err := errors.New("some download error") + detailsObs.ReportFailed("http://some/uri", 30*time.Second, 450*units.MiB, 500*units.MiB, 0.9, 1122, err) + require.Equal(t, details.StateFailed, upgradeDetails.State) + require.Equal(t, details.StateDownloading, upgradeDetails.Metadata.FailedState) + require.Equal(t, err.Error(), upgradeDetails.Metadata.ErrorMsg) +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go index c8f3405f404..66c8bd715e0 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -46,7 +47,8 @@ func TestVerify(t *testing.T) 
{ config.OperatingSystem = testCase.system config.Architecture = testCase.arch - testClient := NewDownloaderWithClient(log, config, elasticClient) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, elasticClient, upgradeDetails) artifact, err := testClient.Download(context.Background(), beatSpec, version) if err != nil { t.Fatal(err) diff --git a/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go index 78cef03e578..023c15a5272 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go @@ -11,13 +11,14 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/fs" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/http" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/snapshot" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" ) // NewDownloader creates a downloader which first checks local directory // and then fallbacks to remote if configured. -func NewDownloader(log *logger.Logger, config *artifact.Config) (download.Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { downloaders := make([]download.Downloader, 0, 3) downloaders = append(downloaders, fs.NewDownloader(config)) @@ -26,7 +27,7 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (download.Downlo // a snapshot version of fleet, for example. 
// try snapshot repo before official if release.Snapshot() { - snapDownloader, err := snapshot.NewDownloader(log, config, nil) + snapDownloader, err := snapshot.NewDownloader(log, config, nil, upgradeDetails) if err != nil { log.Error(err) } else { @@ -34,7 +35,7 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (download.Downlo } } - httpDownloader, err := http.NewDownloader(log, config) + httpDownloader, err := http.NewDownloader(log, config, upgradeDetails) if err != nil { return nil, err } diff --git a/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go index 51b16ee4372..ecf2497851c 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/http" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" agtversion "github.com/elastic/elastic-agent/pkg/version" @@ -32,13 +33,13 @@ type Downloader struct { // We need to pass the versionOverride separately from the config as // artifact.Config struct is part of agent configuration and a version // override makes no sense there -func NewDownloader(log *logger.Logger, config *artifact.Config, versionOverride *agtversion.ParsedSemVer) (download.Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config, versionOverride *agtversion.ParsedSemVer, upgradeDetails *details.Details) (download.Downloader, error) { cfg, err := 
snapshotConfig(config, versionOverride) if err != nil { return nil, fmt.Errorf("error creating snapshot config: %w", err) } - httpDownloader, err := http.NewDownloader(log, cfg) + httpDownloader, err := http.NewDownloader(log, cfg, upgradeDetails) if err != nil { return nil, fmt.Errorf("failed to create snapshot downloader: %w", err) } diff --git a/internal/pkg/agent/application/upgrade/details/details.go b/internal/pkg/agent/application/upgrade/details/details.go new file mode 100644 index 00000000000..028990aafcd --- /dev/null +++ b/internal/pkg/agent/application/upgrade/details/details.go @@ -0,0 +1,166 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package details + +import ( + "encoding/json" + "fmt" + "math" + "strings" + "sync" + "time" + + "github.com/docker/go-units" +) + +// downloadRate is a float64 that can be safely marshalled to JSON +// when the value is Infinity. The rate is always in bytes/second units. +type downloadRate float64 + +// Observer is a function that will be called with upgrade details +type Observer func(details *Details) + +// Details consists of details regarding an ongoing upgrade. +type Details struct { + TargetVersion string `json:"target_version"` + State State `json:"state"` + ActionID string `json:"action_id,omitempty"` + Metadata Metadata `json:"metadata"` + + observers []Observer + mu sync.Mutex +} + +// Metadata consists of metadata relating to a specific upgrade state +type Metadata struct { + ScheduledAt time.Time `json:"scheduled_at,omitempty"` + DownloadPercent float64 `json:"download_percent,omitempty"` + DownloadRate downloadRate `json:"download_rate,omitempty"` + + // FailedState is the state an upgrade was in if/when it failed. 
Use the + // Fail() method of Details to correctly record details when + // an upgrade fails. + FailedState State `json:"failed_state,omitempty"` + + // ErrorMsg is any error message encountered if/when an upgrade fails. Use + // the Fail() method of Details to correctly record details when + // an upgrade fails. + ErrorMsg string `json:"error_msg,omitempty"` +} + +func NewDetails(targetVersion string, initialState State, actionID string) *Details { + return &Details{ + TargetVersion: targetVersion, + State: initialState, + ActionID: actionID, + Metadata: Metadata{}, + observers: []Observer{}, + } +} + +// SetState is a convenience method to set the state of the upgrade and +// notify all observers. +func (d *Details) SetState(s State) { + d.mu.Lock() + defer d.mu.Unlock() + + d.State = s + d.notifyObservers() +} + +// SetDownloadProgress is a convenience method to set the download percent +// and rate when the upgrade is in UPG_DOWNLOADING state. +func (d *Details) SetDownloadProgress(percent, rateBytesPerSecond float64) { + d.mu.Lock() + defer d.mu.Unlock() + + d.Metadata.DownloadPercent = percent + d.Metadata.DownloadRate = downloadRate(rateBytesPerSecond) + d.notifyObservers() +} + +// Fail is a convenience method to set the state of the upgrade +// to StateFailed, set metadata associated with the failure, and +// notify all observers. +func (d *Details) Fail(err error) { + d.mu.Lock() + defer d.mu.Unlock() + + // Record the state the upgrade process was in right before it + // failed, but only do this if we haven't already transitioned the + // state to the StateFailed state; otherwise we'll just end up recording + // the state we failed from as StateFailed which is not useful. + if d.State != StateFailed { + d.Metadata.FailedState = d.State + } + + d.Metadata.ErrorMsg = err.Error() + d.State = StateFailed + d.notifyObservers() +} + +// RegisterObserver allows an interested consumer of Details to register +// themselves as an Observer.
The registered observer is immediately notified +// of the current upgrade details. +func (d *Details) RegisterObserver(observer Observer) { + d.mu.Lock() + defer d.mu.Unlock() + + d.observers = append(d.observers, observer) + d.notifyObserver(observer) +} + +func (d *Details) notifyObservers() { + for _, observer := range d.observers { + d.notifyObserver(observer) + } +} + +func (d *Details) notifyObserver(observer Observer) { + if d.State == StateCompleted { + observer(nil) + } else { + dCopy := Details{ + TargetVersion: d.TargetVersion, + State: d.State, + ActionID: d.ActionID, + Metadata: d.Metadata, + } + observer(&dCopy) + } +} + +func (dr *downloadRate) MarshalJSON() ([]byte, error) { + downloadRateBytesPerSecond := float64(*dr) + if math.IsInf(downloadRateBytesPerSecond, 0) { + return json.Marshal("+Inf bps") + } + + return json.Marshal( + fmt.Sprintf("%sps", units.HumanSizeWithPrecision(downloadRateBytesPerSecond, 2)), + ) +} + +func (dr *downloadRate) UnmarshalJSON(data []byte) error { + var downloadRateStr string + err := json.Unmarshal(data, &downloadRateStr) + if err != nil { + return err + } + + if downloadRateStr == "+Inf bps" { + *dr = downloadRate(math.Inf(1)) + return nil + } + + downloadRateStr = strings.TrimSuffix(downloadRateStr, "ps") + downloadRateBytesPerSecond, err := units.FromHumanSize(downloadRateStr) + if err != nil { + return err + } + + *dr = downloadRate(downloadRateBytesPerSecond) + return nil +} diff --git a/internal/pkg/agent/application/upgrade/details/details_test.go b/internal/pkg/agent/application/upgrade/details/details_test.go new file mode 100644 index 00000000000..88d239dfabf --- /dev/null +++ b/internal/pkg/agent/application/upgrade/details/details_test.go @@ -0,0 +1,95 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package details + +import ( + "encoding/json" + "errors" + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDetailsNew(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + require.Equal(t, "99.999.9999", det.TargetVersion) + require.Equal(t, "test_action_id", det.ActionID) + require.Equal(t, Metadata{}, det.Metadata) +} + +func TestDetailsSetState(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + + det.SetState(StateDownloading) + require.Equal(t, StateDownloading, det.State) +} + +func TestDetailsFail(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + + err := errors.New("test error") + det.Fail(err) + require.Equal(t, StateFailed, det.State) + require.Equal(t, StateRequested, det.Metadata.FailedState) + require.Equal(t, err.Error(), det.Metadata.ErrorMsg) +} + +func TestDetailsObserver(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + + var observedDetails *Details + obs := func(updatedDetails *Details) { observedDetails = updatedDetails } + + det.RegisterObserver(obs) + require.Len(t, det.observers, 1) + require.NotNil(t, observedDetails) + require.Equal(t, StateRequested, observedDetails.State) + + det.SetState(StateDownloading) + require.Equal(t, StateDownloading, det.State) + require.Equal(t, StateDownloading, observedDetails.State) + + det.SetState(StateCompleted) + require.Equal(t, StateCompleted, det.State) + require.Nil(t, observedDetails) +} + +func TestDetailsDownloadRateJSON(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + + // Normal (non-infinity) download rate + t.Run("non_infinity", func(t *testing.T) { + det.SetDownloadProgress(.8, 1794.7) + + data, err
:= json.Marshal(det) + require.NoError(t, err) + + var unmarshalledDetails Details + err = json.Unmarshal(data, &unmarshalledDetails) + require.NoError(t, err) + require.Equal(t, float64(1800), float64(unmarshalledDetails.Metadata.DownloadRate)) + require.Equal(t, .8, unmarshalledDetails.Metadata.DownloadPercent) + }) + + // Infinity download rate + t.Run("infinity", func(t *testing.T) { + det.SetDownloadProgress(0.99, math.Inf(1)) + + data, err := json.Marshal(det) + require.NoError(t, err) + + var unmarshalledDetails Details + err = json.Unmarshal(data, &unmarshalledDetails) + require.NoError(t, err) + require.Equal(t, math.Inf(1), float64(unmarshalledDetails.Metadata.DownloadRate)) + require.Equal(t, 0.99, unmarshalledDetails.Metadata.DownloadPercent) + }) + +} diff --git a/internal/pkg/agent/application/upgrade/details/state.go b/internal/pkg/agent/application/upgrade/details/state.go new file mode 100644 index 00000000000..19aaaae8a25 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/details/state.go @@ -0,0 +1,22 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package details + +type State string + +// The values of these State* constants should match those enumerated for +// upgrade_details.state in https://github.com/elastic/fleet-server/blob/main/model/openapi.yml +const ( + StateRequested State = "UPG_REQUESTED" + StateScheduled State = "UPG_SCHEDULED" + StateDownloading State = "UPG_DOWNLOADING" + StateExtracting State = "UPG_EXTRACTING" + StateReplacing State = "UPG_REPLACING" + StateRestarting State = "UPG_RESTARTING" + StateWatching State = "UPG_WATCHING" + StateRollback State = "UPG_ROLLBACK" + StateCompleted State = "UPG_COMPLETED" + StateFailed State = "UPG_FAILED" +) diff --git a/internal/pkg/agent/application/upgrade/step_download.go b/internal/pkg/agent/application/upgrade/step_download.go index cee0c1d75dc..d86a43a5a3b 100644 --- a/internal/pkg/agent/application/upgrade/step_download.go +++ b/internal/pkg/agent/application/upgrade/step_download.go @@ -12,6 +12,8 @@ import ( "strings" "time" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/cenkalti/backoff/v4" "go.elastic.co/apm" @@ -35,7 +37,7 @@ const ( fleetUpgradeFallbackPGPFormat = "/api/agents/upgrades/%d.%d.%d/pgp-public-key" ) -func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI string, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) { +func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI string, upgradeDetails *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) { span, ctx := apm.StartSpan(ctx, "downloadArtifact", "app.internal") defer func() { apm.CaptureError(ctx, err).Send() @@ -69,7 +71,7 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri return "", errors.New(err, fmt.Sprintf("failed to create download directory at %s", paths.Downloads())) } - path, err := u.downloadWithRetries(ctx, newDownloader, parsedVersion, 
&settings) + path, err := u.downloadWithRetries(ctx, newDownloader, parsedVersion, &settings, upgradeDetails) if err != nil { return "", errors.New(err, "failed download of agent binary") } @@ -121,20 +123,20 @@ func (u *Upgrader) appendFallbackPGP(targetVersion string, pgpBytes []string) [] return pgpBytes } -func newDownloader(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { +func newDownloader(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { if !version.IsSnapshot() { - return localremote.NewDownloader(log, settings) + return localremote.NewDownloader(log, settings, upgradeDetails) } // TODO since we know if it's a snapshot or not, shouldn't we add EITHER the snapshot downloader OR the release one ? // try snapshot repo before official - snapDownloader, err := snapshot.NewDownloader(log, settings, version) + snapDownloader, err := snapshot.NewDownloader(log, settings, version, upgradeDetails) if err != nil { return nil, err } - httpDownloader, err := http.NewDownloader(log, settings) + httpDownloader, err := http.NewDownloader(log, settings, upgradeDetails) if err != nil { return nil, err } @@ -169,9 +171,10 @@ func newVerifier(version *agtversion.ParsedSemVer, log *logger.Logger, settings func (u *Upgrader) downloadWithRetries( ctx context.Context, - downloaderCtor func(*agtversion.ParsedSemVer, *logger.Logger, *artifact.Config) (download.Downloader, error), + downloaderCtor func(*agtversion.ParsedSemVer, *logger.Logger, *artifact.Config, *details.Details) (download.Downloader, error), version *agtversion.ParsedSemVer, settings *artifact.Config, + upgradeDetails *details.Details, ) (string, error) { cancelCtx, cancel := context.WithTimeout(ctx, settings.Timeout) defer cancel() @@ -187,7 +190,7 @@ func (u *Upgrader) downloadWithRetries( attempt++ u.log.Infof("download attempt %d", attempt) - downloader, 
err := downloaderCtor(version, u.log, settings) + downloader, err := downloaderCtor(version, u.log, settings, upgradeDetails) if err != nil { return fmt.Errorf("unable to create fetcher: %w", err) } diff --git a/internal/pkg/agent/application/upgrade/step_download_test.go b/internal/pkg/agent/application/upgrade/step_download_test.go index 330a60f5288..dcdc4da7de8 100644 --- a/internal/pkg/agent/application/upgrade/step_download_test.go +++ b/internal/pkg/agent/application/upgrade/step_download_test.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/pkg/core/logger" agtversion "github.com/elastic/elastic-agent/pkg/version" @@ -84,14 +85,15 @@ func TestDownloadWithRetries(t *testing.T) { // Successful immediately (no retries) t.Run("successful_immediately", func(t *testing.T) { - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { return &mockDownloader{expectedDownloadPath, nil}, nil } u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, 
&settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -103,7 +105,7 @@ func TestDownloadWithRetries(t *testing.T) { // Downloader constructor failing on first attempt, but succeeding on second attempt (= first retry) t.Run("constructor_failure_once", func(t *testing.T) { attemptIdx := 0 - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { defer func() { attemptIdx++ }() @@ -125,7 +127,8 @@ func TestDownloadWithRetries(t *testing.T) { u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -139,7 +142,7 @@ func TestDownloadWithRetries(t *testing.T) { // Download failing on first attempt, but succeeding on second attempt (= first retry) t.Run("download_failure_once", func(t *testing.T) { attemptIdx := 0 - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { defer func() { attemptIdx++ }() @@ -161,7 +164,8 @@ func TestDownloadWithRetries(t *testing.T) { u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := 
agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -178,14 +182,15 @@ func TestDownloadWithRetries(t *testing.T) { testCaseSettings.Timeout = 200 * time.Millisecond testCaseSettings.RetrySleepInitDuration = 100 * time.Millisecond - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { return &mockDownloader{"", errors.New("download failed")}, nil } u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &testCaseSettings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &testCaseSettings, upgradeDetails) require.Equal(t, "context deadline exceeded", err.Error()) require.Equal(t, "", path) diff --git a/internal/pkg/agent/application/upgrade/upgrade.go b/internal/pkg/agent/application/upgrade/upgrade.go index eaf51ef7684..36276f239b6 100644 --- a/internal/pkg/agent/application/upgrade/upgrade.go +++ b/internal/pkg/agent/application/upgrade/upgrade.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" 
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/install" "github.com/elastic/elastic-agent/internal/pkg/config" @@ -28,7 +29,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker" fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client" "github.com/elastic/elastic-agent/internal/pkg/release" - "github.com/elastic/elastic-agent/pkg/control/v2/client" "github.com/elastic/elastic-agent/pkg/control/v2/cproto" "github.com/elastic/elastic-agent/pkg/core/logger" @@ -127,7 +127,7 @@ func (u *Upgrader) Upgradeable() bool { } // Upgrade upgrades running agent, function returns shutdown callback that must be called by reexec. -func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { +func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, det *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { u.log.Infow("Upgrading agent", "version", version, "source_uri", sourceURI) span, ctx := apm.StartSpan(ctx, "upgrade", "app.internal") defer span.End() @@ -137,17 +137,22 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string u.log.Errorw("Unable to clean downloads before update", "error.message", err, "downloads.path", paths.Downloads()) } + det.SetState(details.StateDownloading) + sourceURI = u.sourceURI(sourceURI) - archivePath, err := u.downloadArtifact(ctx, version, sourceURI, skipVerifyOverride, skipDefaultPgp, pgpBytes...) 
+ archivePath, err := u.downloadArtifact(ctx, version, sourceURI, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...) if err != nil { // Run the same pre-upgrade cleanup task to get rid of any newly downloaded files // This may have an issue if users are upgrading to the same version number. if dErr := cleanNonMatchingVersionsFromDownloads(u.log, u.agentInfo.Version()); dErr != nil { u.log.Errorw("Unable to remove file after verification failure", "error.message", dErr) } + return nil, err } + det.SetState(details.StateExtracting) + newHash, err := u.unpack(version, archivePath) if err != nil { return nil, err @@ -170,6 +175,8 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string return nil, errors.New(err, "failed to copy run directory") } + det.SetState(details.StateReplacing) + if err := ChangeSymlink(ctx, u.log, newHash); err != nil { u.log.Errorw("Rolling back: changing symlink failed", "error.message", err) rollbackInstall(ctx, u.log, newHash) @@ -182,6 +189,8 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string return nil, err } + det.SetState(details.StateWatching) + if err := InvokeWatcher(u.log); err != nil { u.log.Errorw("Rolling back: starting watcher failed", "error.message", err) rollbackInstall(ctx, u.log, newHash)