From 92acf08344bd8d91eaf4de7dfce34527517307c1 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 19 Oct 2023 16:28:23 -0700 Subject: [PATCH 1/5] Track upgrade details (#3527) * Remove context and handle cancellation internally instead * More optimizations * Add back context * Adding FSM for upgrades * Implementing TODO * WIP * WIP * Reorganizing imports * Running go mod tidy * Resolve deadlock * Add unit tests * Fix type * Renaming variable to avoid conflict with package name * Handle failures in one place * Set UPG_RESTARTING state * Remove Fleet changes * Add guard for action * Immediately notify observer when registered * Add UpgradeCompleted effect to observer doc * Fix initialization * Adding details progress observer and unit tests * Fixing booboos introduced during conflict resolution * Add unit test * Add assertion on error * Add comment on stateNeedsRefresh * Add comment linking to Fleet Server OpenAPI spec for UPG_* values * Use public accessor for setting upgrade details on coordinator to prevent data race * Use buffered channel for upgradeDetailsChan in test so test can run in single goroutine * Fixing unit test * Add mutex to prevent data race * Clarify assertion's intent * Make copy of details before notifying observer with it. * Add setter for setting download percent * Remove unnecessary struct tags * Change mutex type * Document FailedState and ErrorMsg fields * Track download rate as well * Change data type of time field * Rename struct to avoid stutter in naming * Log upgrade details when they change * Add nil guard * Setting logger in test * Use sentinel value for encoding +Inf download rate in JSON * Fix up comment * Set omitempty on failed_state and error_msg * Add units to download rate --- .../handlers/handler_action_upgrade_test.go | 3 +- .../application/coordinator/coordinator.go | 42 ++++- .../coordinator/coordinator_state.go | 21 ++- .../coordinator/coordinator_test.go | 61 ++++++- .../coordinator/coordinator_unit_test.go | 12 +- .../artifact/download/http/downloader.go | 24 +-- .../artifact/download/http/downloader_test.go | 13 +- .../download/http/progress_observer.go | 35 ++++ .../download/http/progress_observer_test.go | 36 ++++ .../artifact/download/http/verifier_test.go | 4 +- .../download/localremote/downloader.go | 7 +- .../artifact/download/snapshot/downloader.go | 5 +- .../application/upgrade/details/details.go | 166 ++++++++++++++++++ .../upgrade/details/details_test.go | 95 ++++++++++ .../application/upgrade/details/state.go | 22 +++ .../application/upgrade/step_download.go | 19 +- .../application/upgrade/step_download_test.go | 21 ++- .../pkg/agent/application/upgrade/upgrade.go | 15 +- 18 files changed, 547 insertions(+), 54 deletions(-) create mode 100644 internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go create mode 100644 internal/pkg/agent/application/upgrade/details/details.go create mode 100644 internal/pkg/agent/application/upgrade/details/details_test.go create mode 100644 internal/pkg/agent/application/upgrade/details/state.go diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go index 17de63af699..b917b00b900 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" @@ -35,7 +36,7 @@ func (u *mockUpgradeManager) Reload(rawConfig *config.Config) error { return nil } -func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { +func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { select { case <-time.After(2 * time.Second): u.msgChan <- "completed " + version diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index a17d3bf6199..0da3546ffa8 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -17,8 +17,10 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/capabilities" @@ -59,7 +61,7 @@ type UpgradeManager interface { Reload(rawConfig *config.Config) error // Upgrade upgrades running agent. - Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) + Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) // Ack is used on startup to check if the agent has upgraded and needs to send an ack for the action Ack(ctx context.Context, acker acker.Acker) error @@ -192,8 +194,12 @@ type Coordinator struct { // state should never be directly read or written outside the Coordinator // goroutine. Callers who need to access or modify the state should use the // public accessors like State(), SetLogLevel(), etc. - state State - stateBroadcaster *broadcaster.Broadcaster[State] + state State + stateBroadcaster *broadcaster.Broadcaster[State] + + // If you get a race detector error while accessing this field, it probably + // means you're calling private Coordinator methods from outside the + // Coordinator goroutine. stateNeedsRefresh bool // overrideState is used during the update process to report the overall @@ -204,6 +210,10 @@ type Coordinator struct { // SetOverrideState helper to the Coordinator goroutine. overrideStateChan chan *coordinatorOverrideState + // upgradeDetailsChan forwards upgrade details from the publicly accessible + // SetUpgradeDetails helper to the Coordinator goroutine. + upgradeDetailsChan chan *details.Details + // loglevelCh forwards log level changes from the public API (SetLogLevel) // to the run loop in Coordinator's main goroutine. logLevelCh chan logp.Level @@ -326,8 +336,9 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. // synchronization in the subscriber API, just set the input buffer to 0. stateBroadcaster: broadcaster.New(state, 64, 32), - logLevelCh: make(chan logp.Level), - overrideStateChan: make(chan *coordinatorOverrideState), + logLevelCh: make(chan logp.Level), + overrideStateChan: make(chan *coordinatorOverrideState), + upgradeDetailsChan: make(chan *details.Details), } // Setup communication channels for any non-nil components. This pattern // lets us transparently accept nil managers / simulated events during @@ -445,17 +456,33 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str // override the overall state to upgrading until the re-execution is complete c.SetOverrideState(agentclient.Upgrading, fmt.Sprintf("Upgrading to version %s", version)) - cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, skipVerifyOverride, skipDefaultPgp, pgpBytes...) + + // initialize upgrade details + actionID := "" + if action != nil { + actionID = action.ActionID + } + det := details.NewDetails(version, details.StateRequested, actionID) + det.RegisterObserver(c.SetUpgradeDetails) + det.RegisterObserver(c.logUpgradeDetails) + + cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...) if err != nil { c.ClearOverrideState() + det.Fail(err) return err } if cb != nil { + det.SetState(details.StateRestarting) c.ReExec(cb) } return nil } +func (c *Coordinator) logUpgradeDetails(details *details.Details) { + c.logger.Infow("updated upgrade details", "upgrade_details", details) +} + // AckUpgrade is the method used on startup to ack a previously successful upgrade action. // Called from external goroutines. func (c *Coordinator) AckUpgrade(ctx context.Context, acker acker.Acker) error { @@ -878,6 +905,9 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case overrideState := <-c.overrideStateChan: c.setOverrideState(overrideState) + case upgradeDetails := <-c.upgradeDetailsChan: + c.setUpgradeDetails(upgradeDetails) + case componentState := <-c.managerChans.runtimeManagerUpdate: // New component change reported by the runtime manager via // Coordinator.watchRuntimeComponents(), merge it with the diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index f896f024733..6e645c3a06b 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -7,11 +7,13 @@ package coordinator import ( "fmt" - agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" - "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-libs/logp" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/component/runtime" + agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" ) // State provides the current state of the coordinator along with all the current states of components and units. @@ -30,6 +32,8 @@ type State struct { Components []runtime.ComponentComponentState `yaml:"components"` LogLevel logp.Level `yaml:"log_level"` + + UpgradeDetails *details.Details `yaml:"upgrade_details,omitempty"` } type coordinatorOverrideState struct { @@ -54,6 +58,11 @@ func (c *Coordinator) ClearOverrideState() { c.overrideStateChan <- nil } +// SetUpgradeDetails sets upgrade details. This is used during upgrades. +func (c *Coordinator) SetUpgradeDetails(upgradeDetails *details.Details) { + c.upgradeDetailsChan <- upgradeDetails +} + // setRuntimeManagerError updates the error state for the runtime manager. // Called on the main Coordinator goroutine. func (c *Coordinator) setRuntimeManagerError(err error) { @@ -114,6 +123,13 @@ func (c *Coordinator) setOverrideState(overrideState *coordinatorOverrideState) c.stateNeedsRefresh = true } +// setUpgradeDetails is the internal helper to set upgrade details and set stateNeedsRefresh. +// Must be called on the main Coordinator goroutine. +func (c *Coordinator) setUpgradeDetails(upgradeDetails *details.Details) { + c.state.UpgradeDetails = upgradeDetails + c.stateNeedsRefresh = true +} + // Forward the current state to the broadcaster and clear the stateNeedsRefresh // flag. Must be called on the main Coordinator goroutine. func (c *Coordinator) refreshState() { @@ -163,6 +179,7 @@ func (c *Coordinator) generateReportableState() (s State) { s.FleetState = c.state.FleetState s.FleetMessage = c.state.FleetMessage s.LogLevel = c.state.LogLevel + s.UpgradeDetails = c.state.UpgradeDetails s.Components = make([]runtime.ComponentComponentState, len(c.state.Components)) copy(s.Components, c.state.Components) diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index e84b43f182c..131b91b447a 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -15,6 +15,8 @@ import ( "testing" "time" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/stretchr/testify/assert" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -471,8 +473,50 @@ func TestCoordinator_Upgrade(t *testing.T) { require.NoError(t, err) } +func TestCoordinator_UpgradeDetails(t *testing.T) { + coordCh := make(chan error) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + expectedErr := errors.New("some upgrade error") + upgradeManager := &fakeUpgradeManager{ + upgradeable: true, + upgradeErr: expectedErr, + } + coord, cfgMgr, varsMgr := createCoordinator(t, ctx, WithUpgradeManager(upgradeManager)) + require.Nil(t, coord.state.UpgradeDetails) + go func() { + err := coord.Run(ctx) + if errors.Is(err, context.Canceled) { + // allowed error + err = nil + } + coordCh <- err + }() + + // no vars used by the config + varsMgr.Vars(ctx, []*transpiler.Vars{{}}) + + // no need for anything to really run + cfg, err := config.NewConfigFrom(nil) + require.NoError(t, err) + cfgMgr.Config(ctx, cfg) + + err = coord.Upgrade(ctx, "9.0.0", "", nil, true, false) + require.ErrorIs(t, expectedErr, err) + cancel() + + err = <-coordCh + require.NoError(t, err) + + require.Equal(t, details.StateFailed, coord.state.UpgradeDetails.State) + require.Equal(t, details.StateRequested, coord.state.UpgradeDetails.Metadata.FailedState) + require.Equal(t, expectedErr.Error(), coord.state.UpgradeDetails.Metadata.ErrorMsg) +} + type createCoordinatorOpts struct { - managed bool + managed bool + upgradeManager UpgradeManager } type CoordinatorOpt func(o *createCoordinatorOpts) @@ -483,6 +527,12 @@ func ManagedCoordinator(managed bool) CoordinatorOpt { } } +func WithUpgradeManager(upgradeManager UpgradeManager) CoordinatorOpt { + return func(o *createCoordinatorOpts) { + o.upgradeManager = upgradeManager + } +} + // createCoordinator creates a coordinator that using a fake config manager and a fake vars manager. // // The runtime specifications is set up to use both the fake component and fake shipper. @@ -527,7 +577,12 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt cfgMgr := newFakeConfigManager() varsMgr := newFakeVarsManager() - coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, &fakeUpgradeManager{}, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed) + upgradeManager := o.upgradeManager + if upgradeManager == nil { + upgradeManager = &fakeUpgradeManager{} + } + + coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed) return coord, cfgMgr, varsMgr } @@ -574,7 +629,7 @@ func (f *fakeUpgradeManager) Reload(cfg *config.Config) error { return nil } -func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { +func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { f.upgradeCalled = true if f.upgradeErr != nil { return nil, f.upgradeErr diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 805139f26e8..5752c100a41 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/pkg/component" @@ -811,6 +812,9 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) { // since a successful upgrade sets the override state twice. overrideStateChan := make(chan *coordinatorOverrideState, 2) + // similarly, upgradeDetailsChan is a buffered channel as well. + upgradeDetailsChan := make(chan *details.Details, 2) + // Create a manager that will allow upgrade attempts but return a failure // from Upgrade itself (success requires testing ReExec and we aren't // quite ready to do that yet). @@ -820,9 +824,11 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) { } coord := &Coordinator{ - stateBroadcaster: broadcaster.New(State{}, 0, 0), - overrideStateChan: overrideStateChan, - upgradeMgr: upgradeMgr, + stateBroadcaster: broadcaster.New(State{}, 0, 0), + overrideStateChan: overrideStateChan, + upgradeDetailsChan: upgradeDetailsChan, + upgradeMgr: upgradeMgr, + logger: logp.NewLogger("testing"), } // Call upgrade and make sure the upgrade manager receives an Upgrade call diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go index a02585e5938..50fc6849f21 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go @@ -20,6 +20,7 @@ import ( "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -43,13 +44,14 @@ const ( // Downloader is a downloader able to fetch artifacts from elastic.co web page. type Downloader struct { - log *logger.Logger - config *artifact.Config - client http.Client + log *logger.Logger + config *artifact.Config + client http.Client + upgradeDetails *details.Details } // NewDownloader creates and configures Elastic Downloader -func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config, upgradeDetails *details.Details) (*Downloader, error) { client, err := config.HTTPTransportSettings.Client( httpcommon.WithAPMHTTPInstrumentation(), httpcommon.WithKeepaliveSettings{Disable: false, IdleConnTimeout: 30 * time.Second}, @@ -59,15 +61,16 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, er } client.Transport = download.WithHeaders(client.Transport, download.Headers) - return NewDownloaderWithClient(log, config, *client), nil + return NewDownloaderWithClient(log, config, *client, upgradeDetails), nil } // NewDownloaderWithClient creates Elastic Downloader with specific client used -func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client) *Downloader { +func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client, upgradeDetails *details.Details) *Downloader { return &Downloader{ - log: log, - config: config, - client: client, + log: log, + config: config, + client: client, + upgradeDetails: upgradeDetails, } } @@ -206,7 +209,8 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f } loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout) - dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver) + detailsObserver := newDetailsProgressObserver(e.upgradeDetails) + dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver, detailsObserver) dp.Report(ctx) _, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp)) if err != nil { diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go index 4c341a2aa5e..94e3ce856e2 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/docker/go-units" @@ -56,7 +57,8 @@ func TestDownload(t *testing.T) { config.OperatingSystem = testCase.system config.Architecture = testCase.arch - testClient := NewDownloaderWithClient(log, config, elasticClient) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, elasticClient, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) if err != nil { t.Fatal(err) @@ -105,7 +107,8 @@ func TestDownloadBodyError(t *testing.T) { } log, obs := logger.NewTesting("downloader") - testClient := NewDownloaderWithClient(log, config, *client) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, *client, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) if err == nil { @@ -161,7 +164,8 @@ func TestDownloadLogProgressWithLength(t *testing.T) { } log, obs := logger.NewTesting("downloader") - testClient := NewDownloaderWithClient(log, config, *client) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, *client, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) require.NoError(t, err, "Download should not have errored") @@ -243,7 +247,8 @@ func TestDownloadLogProgressWithoutLength(t *testing.T) { } log, obs := logger.NewTesting("downloader") - testClient := NewDownloaderWithClient(log, config, *client) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, *client, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) require.NoError(t, err, "Download should not have errored") diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go index 4eef0682a50..ca024c53c88 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go @@ -5,10 +5,12 @@ package http import ( + "sync" "time" "github.com/docker/go-units" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -95,3 +97,36 @@ func (lpObs *loggingProgressObserver) ReportFailed(sourceURI string, timePast ti lpObs.log.Warnf(msg, args...) } } + +type detailsProgressObserver struct { + upgradeDetails *details.Details + mu sync.RWMutex +} + +func newDetailsProgressObserver(upgradeDetails *details.Details) *detailsProgressObserver { + upgradeDetails.SetState(details.StateDownloading) + return &detailsProgressObserver{ + upgradeDetails: upgradeDetails, + } +} + +func (dpObs *detailsProgressObserver) Report(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRateBytesPerSecond float64) { + dpObs.mu.Lock() + defer dpObs.mu.Unlock() + + dpObs.upgradeDetails.SetDownloadProgress(percentComplete, downloadRateBytesPerSecond) +} + +func (dpObs *detailsProgressObserver) ReportCompleted(sourceURI string, timePast time.Duration, downloadRateBytesPerSecond float64) { + dpObs.mu.Lock() + defer dpObs.mu.Unlock() + + dpObs.upgradeDetails.SetDownloadProgress(1, downloadRateBytesPerSecond) +} + +func (dpObs *detailsProgressObserver) ReportFailed(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRateBytesPerSecond float64, err error) { + dpObs.mu.Lock() + defer dpObs.mu.Unlock() + + dpObs.upgradeDetails.Fail(err) +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go new file mode 100644 index 00000000000..bb1d7ac1c87 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go @@ -0,0 +1,36 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/docker/go-units" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" +) + +func TestDetailsProgressObserver(t *testing.T) { + upgradeDetails := details.NewDetails("8.11.0", details.StateRequested, "") + detailsObs := newDetailsProgressObserver(upgradeDetails) + + detailsObs.Report("http://some/uri", 20*time.Second, 400*units.MiB, 500*units.MiB, 0.8, 4455) + require.Equal(t, details.StateDownloading, upgradeDetails.State) + require.Equal(t, 0.8, upgradeDetails.Metadata.DownloadPercent) + + detailsObs.ReportCompleted("http://some/uri", 30*time.Second, 3333) + require.Equal(t, details.StateDownloading, upgradeDetails.State) + require.Equal(t, 1.0, upgradeDetails.Metadata.DownloadPercent) + + err := errors.New("some download error") + detailsObs.ReportFailed("http://some/uri", 30*time.Second, 450*units.MiB, 500*units.MiB, 0.9, 1122, err) + require.Equal(t, details.StateFailed, upgradeDetails.State) + require.Equal(t, details.StateDownloading, upgradeDetails.Metadata.FailedState) + require.Equal(t, err.Error(), upgradeDetails.Metadata.ErrorMsg) +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go index c8f3405f404..66c8bd715e0 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -46,7 +47,8 @@ func TestVerify(t *testing.T) { config.OperatingSystem = testCase.system config.Architecture = testCase.arch - testClient := NewDownloaderWithClient(log, config, elasticClient) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, elasticClient, upgradeDetails) artifact, err := testClient.Download(context.Background(), beatSpec, version) if err != nil { t.Fatal(err) diff --git a/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go index 78cef03e578..023c15a5272 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go @@ -11,13 +11,14 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/fs" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/http" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/snapshot" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" ) // NewDownloader creates a downloader which first checks local directory // and then fallbacks to remote if configured. -func NewDownloader(log *logger.Logger, config *artifact.Config) (download.Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { downloaders := make([]download.Downloader, 0, 3) downloaders = append(downloaders, fs.NewDownloader(config)) @@ -26,7 +27,7 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (download.Downlo // a snapshot version of fleet, for example. // try snapshot repo before official if release.Snapshot() { - snapDownloader, err := snapshot.NewDownloader(log, config, nil) + snapDownloader, err := snapshot.NewDownloader(log, config, nil, upgradeDetails) if err != nil { log.Error(err) } else { @@ -34,7 +35,7 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (download.Downlo } } - httpDownloader, err := http.NewDownloader(log, config) + httpDownloader, err := http.NewDownloader(log, config, upgradeDetails) if err != nil { return nil, err } diff --git a/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go index 51b16ee4372..ecf2497851c 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/http" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" agtversion "github.com/elastic/elastic-agent/pkg/version" @@ -32,13 +33,13 @@ type Downloader struct { // We need to pass the versionOverride separately from the config as // artifact.Config struct is part of agent configuration and a version // override makes no sense there -func NewDownloader(log *logger.Logger, config *artifact.Config, versionOverride *agtversion.ParsedSemVer) (download.Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config, versionOverride *agtversion.ParsedSemVer, upgradeDetails *details.Details) (download.Downloader, error) { cfg, err := snapshotConfig(config, versionOverride) if err != nil { return nil, fmt.Errorf("error creating snapshot config: %w", err) } - httpDownloader, err := http.NewDownloader(log, cfg) + httpDownloader, err := http.NewDownloader(log, cfg, upgradeDetails) if err != nil { return nil, fmt.Errorf("failed to create snapshot downloader: %w", err) } diff --git a/internal/pkg/agent/application/upgrade/details/details.go b/internal/pkg/agent/application/upgrade/details/details.go new file mode 100644 index 00000000000..028990aafcd --- /dev/null +++ b/internal/pkg/agent/application/upgrade/details/details.go @@ -0,0 +1,166 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package details + +import ( + "encoding/json" + "fmt" + "math" + "strings" + "sync" + "time" + + "github.com/docker/go-units" +) + +// downloadRate is a float64 that can be safely marshalled to JSON +// when the value is Infinity. The rate is always in bytes/second units. +type downloadRate float64 + +// Observer is a function that will be called with upgrade details +type Observer func(details *Details) + +// Details consists of details regarding an ongoing upgrade. +type Details struct { + TargetVersion string `json:"target_version"` + State State `json:"state"` + ActionID string `json:"action_id,omitempty"` + Metadata Metadata `json:"metadata"` + + observers []Observer + mu sync.Mutex +} + +// Metadata consists of metadata relating to a specific upgrade state +type Metadata struct { + ScheduledAt time.Time `json:"scheduled_at,omitempty"` + DownloadPercent float64 `json:"download_percent,omitempty"` + DownloadRate downloadRate `json:"download_rate,omitempty"` + + // FailedState is the state an upgrade was in if/when it failed. Use the + // Fail() method of UpgradeDetails to correctly record details when + // an upgrade fails. + FailedState State `json:"failed_state,omitempty"` + + // ErrorMsg is any error message encountered if/when an upgrade fails. Use + // the Fail() method of UpgradeDetails to correctly record details when + // an upgrade fails. + ErrorMsg string `json:"error_msg,omitempty"` +} + +func NewDetails(targetVersion string, initialState State, actionID string) *Details { + return &Details{ + TargetVersion: targetVersion, + State: initialState, + ActionID: actionID, + Metadata: Metadata{}, + observers: []Observer{}, + } +} + +// SetState is a convenience method to set the state of the upgrade and +// notify all observers. +func (d *Details) SetState(s State) { + d.mu.Lock() + defer d.mu.Unlock() + + d.State = s + d.notifyObservers() +} + +// SetDownloadProgress is a convenience method to set the download percent +// when the upgrade is in UPG_DOWNLOADING state. +func (d *Details) SetDownloadProgress(percent, rateBytesPerSecond float64) { + d.mu.Lock() + defer d.mu.Unlock() + + d.Metadata.DownloadPercent = percent + d.Metadata.DownloadRate = downloadRate(rateBytesPerSecond) + d.notifyObservers() +} + +// Fail is a convenience method to set the state of the upgrade +// to StateFailed, set metadata associated with the failure, and +// notify all observers. +func (d *Details) Fail(err error) { + d.mu.Lock() + defer d.mu.Unlock() + + // Record the state the upgrade process was in right before it + // failed, but only do this if we haven't already transitioned the + // state to the StateFailed state; otherwise we'll just end up recording + // the state we failed from as StateFailed which is not useful. + if d.State != StateFailed { + d.Metadata.FailedState = d.State + } + + d.Metadata.ErrorMsg = err.Error() + d.State = StateFailed + d.notifyObservers() +} + +// RegisterObserver allows an interested consumer of Details to register +// themselves as an Observer. The registered observer is immediately notified +// of the current upgrade details. +func (d *Details) RegisterObserver(observer Observer) { + d.mu.Lock() + defer d.mu.Unlock() + + d.observers = append(d.observers, observer) + d.notifyObserver(observer) +} + +func (d *Details) notifyObservers() { + for _, observer := range d.observers { + d.notifyObserver(observer) + } +} + +func (d *Details) notifyObserver(observer Observer) { + if d.State == StateCompleted { + observer(nil) + } else { + dCopy := Details{ + TargetVersion: d.TargetVersion, + State: d.State, + ActionID: d.ActionID, + Metadata: d.Metadata, + } + observer(&dCopy) + } +} + +func (dr *downloadRate) MarshalJSON() ([]byte, error) { + downloadRateBytesPerSecond := float64(*dr) + if math.IsInf(downloadRateBytesPerSecond, 0) { + return json.Marshal("+Inf bps") + } + + return json.Marshal( + fmt.Sprintf("%sps", units.HumanSizeWithPrecision(downloadRateBytesPerSecond, 2)), + ) +} + +func (dr *downloadRate) UnmarshalJSON(data []byte) error { + var downloadRateStr string + err := json.Unmarshal(data, &downloadRateStr) + if err != nil { + return err + } + + if downloadRateStr == "+Inf bps" { + *dr = downloadRate(math.Inf(1)) + return nil + } + + downloadRateStr = strings.TrimSuffix(downloadRateStr, "ps") + downloadRateBytesPerSecond, err := units.FromHumanSize(downloadRateStr) + if err != nil { + return err + } + + *dr = downloadRate(downloadRateBytesPerSecond) + return nil +} diff --git a/internal/pkg/agent/application/upgrade/details/details_test.go b/internal/pkg/agent/application/upgrade/details/details_test.go new file mode 100644 index 00000000000..88d239dfabf --- /dev/null +++ b/internal/pkg/agent/application/upgrade/details/details_test.go @@ -0,0 +1,95 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package details + +import ( + "encoding/json" + "errors" + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDetailsNew(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + require.Equal(t, "99.999.9999", det.TargetVersion) + require.Equal(t, "test_action_id", det.ActionID) + require.Equal(t, Metadata{}, det.Metadata) +} + +func TestDetailsSetState(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + + det.SetState(StateDownloading) + require.Equal(t, StateDownloading, det.State) +} + +func TestDetailsFail(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + + err := errors.New("test error") + det.Fail(err) + require.Equal(t, StateFailed, det.State) + require.Equal(t, StateRequested, det.Metadata.FailedState) + require.Equal(t, err.Error(), det.Metadata.ErrorMsg) +} + +func TestDetailsObserver(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + + var observedDetails *Details + obs := func(updatedDetails *Details) { observedDetails = updatedDetails } + + det.RegisterObserver(obs) + require.Len(t, det.observers, 1) + require.NotNil(t, observedDetails) + require.Equal(t, StateRequested, observedDetails.State) + + det.SetState(StateDownloading) + require.Equal(t, StateDownloading, det.State) + require.Equal(t, StateDownloading, observedDetails.State) + + det.SetState(StateCompleted) + require.Equal(t, StateCompleted, det.State) + require.Nil(t, nil, observedDetails) +} + +func TestDetailsDownloadRateJSON(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + + // Normal (non-infinity) download rate + t.Run("non_infinity", func(t *testing.T) { + det.SetDownloadProgress(.8, 1794.7) + + data, err := json.Marshal(det) + require.NoError(t, err) + + var unmarshalledDetails Details + err = json.Unmarshal(data, &unmarshalledDetails) + require.NoError(t, err) + require.Equal(t, float64(1800), float64(unmarshalledDetails.Metadata.DownloadRate)) + require.Equal(t, .8, unmarshalledDetails.Metadata.DownloadPercent) + }) + + // Infinity download rate + t.Run("infinity", func(t *testing.T) { + det.SetDownloadProgress(0.99, math.Inf(1)) + + data, err := json.Marshal(det) + require.NoError(t, err) + + var unmarshalledDetails Details + err = json.Unmarshal(data, &unmarshalledDetails) + require.NoError(t, err) + require.Equal(t, math.Inf(1), float64(unmarshalledDetails.Metadata.DownloadRate)) + require.Equal(t, 0.99, unmarshalledDetails.Metadata.DownloadPercent) + }) + +} diff --git a/internal/pkg/agent/application/upgrade/details/state.go b/internal/pkg/agent/application/upgrade/details/state.go new file mode 100644 index 00000000000..19aaaae8a25 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/details/state.go @@ -0,0 +1,22 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package details + +type State string + +// The values of these State* constants should match those enumerated for +// upgrade_details.state in https://github.com/elastic/fleet-server/blob/main/model/openapi.yml +const ( + StateRequested State = "UPG_REQUESTED" + StateScheduled State = "UPG_SCHEDULED" + StateDownloading State = "UPG_DOWNLOADING" + StateExtracting State = "UPG_EXTRACTING" + StateReplacing State = "UPG_REPLACING" + StateRestarting State = "UPG_RESTARTING" + StateWatching State = "UPG_WATCHING" + StateRollback State = "UPG_ROLLBACK" + StateCompleted State = "UPG_COMPLETED" + StateFailed State = "UPG_FAILED" +) diff --git a/internal/pkg/agent/application/upgrade/step_download.go b/internal/pkg/agent/application/upgrade/step_download.go index cee0c1d75dc..d86a43a5a3b 100644 --- a/internal/pkg/agent/application/upgrade/step_download.go +++ b/internal/pkg/agent/application/upgrade/step_download.go @@ -12,6 +12,8 @@ import ( "strings" "time" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/cenkalti/backoff/v4" "go.elastic.co/apm" @@ -35,7 +37,7 @@ const ( fleetUpgradeFallbackPGPFormat = "/api/agents/upgrades/%d.%d.%d/pgp-public-key" ) -func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI string, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) { +func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI string, upgradeDetails *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) { span, ctx := apm.StartSpan(ctx, "downloadArtifact", "app.internal") defer func() { apm.CaptureError(ctx, err).Send() @@ -69,7 +71,7 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri return "", errors.New(err, fmt.Sprintf("failed to create download directory at %s", paths.Downloads())) } - path, err := u.downloadWithRetries(ctx, newDownloader, parsedVersion, &settings) + path, err := u.downloadWithRetries(ctx, newDownloader, parsedVersion, &settings, upgradeDetails) if err != nil { return "", errors.New(err, "failed download of agent binary") } @@ -121,20 +123,20 @@ func (u *Upgrader) appendFallbackPGP(targetVersion string, pgpBytes []string) [] return pgpBytes } -func newDownloader(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { +func newDownloader(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { if !version.IsSnapshot() { - return localremote.NewDownloader(log, settings) + return localremote.NewDownloader(log, settings, upgradeDetails) } // TODO since we know if it's a snapshot or not, shouldn't we add EITHER the snapshot downloader OR the release one ? // try snapshot repo before official - snapDownloader, err := snapshot.NewDownloader(log, settings, version) + snapDownloader, err := snapshot.NewDownloader(log, settings, version, upgradeDetails) if err != nil { return nil, err } - httpDownloader, err := http.NewDownloader(log, settings) + httpDownloader, err := http.NewDownloader(log, settings, upgradeDetails) if err != nil { return nil, err } @@ -169,9 +171,10 @@ func newVerifier(version *agtversion.ParsedSemVer, log *logger.Logger, settings func (u *Upgrader) downloadWithRetries( ctx context.Context, - downloaderCtor func(*agtversion.ParsedSemVer, *logger.Logger, *artifact.Config) (download.Downloader, error), + downloaderCtor func(*agtversion.ParsedSemVer, *logger.Logger, *artifact.Config, *details.Details) (download.Downloader, error), version *agtversion.ParsedSemVer, settings *artifact.Config, + upgradeDetails *details.Details, ) (string, error) { cancelCtx, cancel := context.WithTimeout(ctx, settings.Timeout) defer cancel() @@ -187,7 +190,7 @@ func (u *Upgrader) downloadWithRetries( attempt++ u.log.Infof("download attempt %d", attempt) - downloader, err := downloaderCtor(version, u.log, settings) + downloader, err := downloaderCtor(version, u.log, settings, upgradeDetails) if err != nil { return fmt.Errorf("unable to create fetcher: %w", err) } diff --git a/internal/pkg/agent/application/upgrade/step_download_test.go b/internal/pkg/agent/application/upgrade/step_download_test.go index 330a60f5288..dcdc4da7de8 100644 --- a/internal/pkg/agent/application/upgrade/step_download_test.go +++ b/internal/pkg/agent/application/upgrade/step_download_test.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/pkg/core/logger" agtversion "github.com/elastic/elastic-agent/pkg/version" @@ -84,14 +85,15 @@ func TestDownloadWithRetries(t *testing.T) { // Successful immediately (no retries) t.Run("successful_immediately", func(t *testing.T) { - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { return &mockDownloader{expectedDownloadPath, nil}, nil } u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -103,7 +105,7 @@ func TestDownloadWithRetries(t *testing.T) { // Downloader constructor failing on first attempt, but succeeding on second attempt (= first retry) t.Run("constructor_failure_once", func(t *testing.T) { attemptIdx := 0 - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { defer func() { attemptIdx++ }() @@ -125,7 +127,8 @@ func TestDownloadWithRetries(t *testing.T) { u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -139,7 +142,7 @@ func TestDownloadWithRetries(t *testing.T) { // Download failing on first attempt, but succeeding on second attempt (= first retry) t.Run("download_failure_once", func(t *testing.T) { attemptIdx := 0 - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { defer func() { attemptIdx++ }() @@ -161,7 +164,8 @@ func TestDownloadWithRetries(t *testing.T) { u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -178,14 +182,15 @@ func TestDownloadWithRetries(t *testing.T) { testCaseSettings.Timeout = 200 * time.Millisecond testCaseSettings.RetrySleepInitDuration = 100 * time.Millisecond - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { return &mockDownloader{"", errors.New("download failed")}, nil } u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &testCaseSettings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &testCaseSettings, upgradeDetails) require.Equal(t, "context deadline exceeded", err.Error()) require.Equal(t, "", path) diff --git a/internal/pkg/agent/application/upgrade/upgrade.go b/internal/pkg/agent/application/upgrade/upgrade.go index eaf51ef7684..36276f239b6 100644 --- a/internal/pkg/agent/application/upgrade/upgrade.go +++ b/internal/pkg/agent/application/upgrade/upgrade.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/install" "github.com/elastic/elastic-agent/internal/pkg/config" @@ -28,7 +29,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker" fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client" "github.com/elastic/elastic-agent/internal/pkg/release" - "github.com/elastic/elastic-agent/pkg/control/v2/client" "github.com/elastic/elastic-agent/pkg/control/v2/cproto" "github.com/elastic/elastic-agent/pkg/core/logger" @@ -127,7 +127,7 @@ func (u *Upgrader) Upgradeable() bool { } // Upgrade upgrades running agent, function returns shutdown callback that must be called by reexec. -func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { +func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, det *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { u.log.Infow("Upgrading agent", "version", version, "source_uri", sourceURI) span, ctx := apm.StartSpan(ctx, "upgrade", "app.internal") defer span.End() @@ -137,17 +137,22 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string u.log.Errorw("Unable to clean downloads before update", "error.message", err, "downloads.path", paths.Downloads()) } + det.SetState(details.StateDownloading) + sourceURI = u.sourceURI(sourceURI) - archivePath, err := u.downloadArtifact(ctx, version, sourceURI, skipVerifyOverride, skipDefaultPgp, pgpBytes...) + archivePath, err := u.downloadArtifact(ctx, version, sourceURI, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...) if err != nil { // Run the same pre-upgrade cleanup task to get rid of any newly downloaded files // This may have an issue if users are upgrading to the same version number. if dErr := cleanNonMatchingVersionsFromDownloads(u.log, u.agentInfo.Version()); dErr != nil { u.log.Errorw("Unable to remove file after verification failure", "error.message", dErr) } + return nil, err } + det.SetState(details.StateExtracting) + newHash, err := u.unpack(version, archivePath) if err != nil { return nil, err @@ -170,6 +175,8 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string return nil, errors.New(err, "failed to copy run directory") } + det.SetState(details.StateReplacing) + if err := ChangeSymlink(ctx, u.log, newHash); err != nil { u.log.Errorw("Rolling back: changing symlink failed", "error.message", err) rollbackInstall(ctx, u.log, newHash) @@ -182,6 +189,8 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string return nil, err } + det.SetState(details.StateWatching) + if err := InvokeWatcher(u.log); err != nil { u.log.Errorw("Rolling back: starting watcher failed", "error.message", err) rollbackInstall(ctx, u.log, newHash) From 3c9380f66b4ace6e201dabe890cdf373aa15e69e Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Mon, 23 Oct 2023 09:21:48 +0200 Subject: [PATCH 2/5] Update go grpc version to 1.58.3 (#3635) --- NOTICE.txt | 38 +++++++++++++++++++------------------- go.mod | 18 +++++++++--------- go.sum | 34 ++++++++++++++++++---------------- 3 files changed, 46 insertions(+), 44 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 3290e34d2e5..2eb7dcd2da0 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -6173,11 +6173,11 @@ THE SOFTWARE. -------------------------------------------------------------------------------- Dependency : golang.org/x/crypto -Version: v0.7.0 +Version: v0.11.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/golang.org/x/crypto@v0.7.0/LICENSE: +Contents of probable licence file $GOMODCACHE/golang.org/x/crypto@v0.11.0/LICENSE: Copyright (c) 2009 The Go Authors. All rights reserved. @@ -6321,11 +6321,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : golang.org/x/sys -Version: v0.9.0 +Version: v0.10.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/golang.org/x/sys@v0.9.0/LICENSE: +Contents of probable licence file $GOMODCACHE/golang.org/x/sys@v0.10.0/LICENSE: Copyright (c) 2009 The Go Authors. All rights reserved. @@ -6358,11 +6358,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : golang.org/x/text -Version: v0.9.0 +Version: v0.11.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/golang.org/x/text@v0.9.0/LICENSE: +Contents of probable licence file $GOMODCACHE/golang.org/x/text@v0.11.0/LICENSE: Copyright (c) 2009 The Go Authors. All rights reserved. @@ -6469,11 +6469,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : google.golang.org/grpc -Version: v1.53.0 +Version: v1.58.3 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/google.golang.org/grpc@v1.53.0/LICENSE: +Contents of probable licence file $GOMODCACHE/google.golang.org/grpc@v1.58.3/LICENSE: Apache License @@ -6681,11 +6681,11 @@ Contents of probable licence file $GOMODCACHE/google.golang.org/grpc@v1.53.0/LIC -------------------------------------------------------------------------------- Dependency : google.golang.org/protobuf -Version: v1.29.1 +Version: v1.31.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/google.golang.org/protobuf@v1.29.1/LICENSE: +Contents of probable licence file $GOMODCACHE/google.golang.org/protobuf@v1.31.0/LICENSE: Copyright (c) 2018 The Go Authors. All rights reserved. @@ -17494,11 +17494,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : golang.org/x/net -Version: v0.9.0 +Version: v0.12.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/golang.org/x/net@v0.9.0/LICENSE: +Contents of probable licence file $GOMODCACHE/golang.org/x/net@v0.12.0/LICENSE: Copyright (c) 2009 The Go Authors. All rights reserved. @@ -17531,11 +17531,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : golang.org/x/oauth2 -Version: v0.4.0 +Version: v0.10.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/golang.org/x/oauth2@v0.4.0/LICENSE: +Contents of probable licence file $GOMODCACHE/golang.org/x/oauth2@v0.10.0/LICENSE: Copyright (c) 2009 The Go Authors. All rights reserved. @@ -17568,11 +17568,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : golang.org/x/term -Version: v0.7.0 +Version: v0.10.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/golang.org/x/term@v0.7.0/LICENSE: +Contents of probable licence file $GOMODCACHE/golang.org/x/term@v0.10.0/LICENSE: Copyright (c) 2009 The Go Authors. All rights reserved. @@ -17816,12 +17816,12 @@ Contents of probable licence file $GOMODCACHE/google.golang.org/appengine@v1.6.7 -------------------------------------------------------------------------------- -Dependency : google.golang.org/genproto -Version: v0.0.0-20230110181048-76db0878b65f +Dependency : google.golang.org/genproto/googleapis/rpc +Version: v0.0.0-20230711160842-782d3b101e98 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/google.golang.org/genproto@v0.0.0-20230110181048-76db0878b65f/LICENSE: +Contents of probable licence file $GOMODCACHE/google.golang.org/genproto/googleapis/rpc@v0.0.0-20230711160842-782d3b101e98/LICENSE: Apache License diff --git a/go.mod b/go.mod index 48c55e7e5a7..4cab4536513 100644 --- a/go.mod +++ b/go.mod @@ -53,16 +53,16 @@ require ( go.elastic.co/ecszap v1.0.1 go.elastic.co/go-licence-detector v0.5.0 go.uber.org/zap v1.25.0 - golang.org/x/crypto v0.7.0 + golang.org/x/crypto v0.11.0 golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 golang.org/x/sync v0.3.0 - golang.org/x/sys v0.9.0 - golang.org/x/text v0.9.0 + golang.org/x/sys v0.10.0 + golang.org/x/text v0.11.0 golang.org/x/time v0.3.0 golang.org/x/tools v0.7.0 - google.golang.org/grpc v1.53.0 - google.golang.org/protobuf v1.29.1 + google.golang.org/grpc v1.58.3 + google.golang.org/protobuf v1.31.0 gopkg.in/ini.v1 v1.67.0 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 @@ -146,11 +146,11 @@ require ( go.elastic.co/fastjson v1.1.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.9.0 // indirect - golang.org/x/net v0.9.0 // indirect - golang.org/x/oauth2 v0.4.0 // indirect - golang.org/x/term v0.7.0 // indirect + golang.org/x/net v0.12.0 // indirect + golang.org/x/oauth2 v0.10.0 // indirect + golang.org/x/term v0.10.0 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect google.golang.org/grpc/examples v0.0.0-20220304170021-431ea809a767 // indirect gopkg.in/inf.v0 v0.9.1 // indirect howett.net/plist v1.0.0 // indirect diff --git a/go.sum b/go.sum index d176fb612e9..c44d94e86b4 100644 --- a/go.sum +++ b/go.sum @@ -1905,8 +1905,9 @@ golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4 golang.org/x/crypto v0.4.0/go.mod h1:3quD/ATkf6oY+rnes5c3ExXTbLc8mueNue5/DoinL80= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -2039,8 +2040,8 @@ golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= -golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2067,8 +2068,9 @@ golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094/go.mod h1:h4gKUeWbJ4rQPri golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= golang.org/x/oauth2 v0.0.0-20221006150949-b44042a4b9c1/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= -golang.org/x/oauth2 v0.4.0 h1:NF0gk8LVPg1Ml7SSbGyySuoxdsXitj7TvgvuRxIMc/M= golang.org/x/oauth2 v0.4.0/go.mod h1:RznEsdpjGAINPTOF0UH/t+xJ75L18YO3Ho6Pyn+uRec= +golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8= +golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -2237,8 +2239,8 @@ golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= -golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -2253,8 +2255,8 @@ golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= -golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ= -golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= +golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c= +golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2270,8 +2272,8 @@ golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -2558,8 +2560,8 @@ google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZV google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg= google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd/go.mod h1:cTsE614GARnxrLsqKREzmNYJACSWWpAWdNMwnD7c2BE= google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= -google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w= -google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -2605,8 +2607,8 @@ google.golang.org/grpc v1.50.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCD google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= -google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= -google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= +google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= +google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/examples v0.0.0-20220304170021-431ea809a767 h1:r16FSFCMhn7+LU8CzbtAIKppYeU6NUPJVdvXeIqVIq8= google.golang.org/grpc/examples v0.0.0-20220304170021-431ea809a767/go.mod h1:wKDg0brwMZpaizQ1i7IzYcJjH1TmbJudYdnQC9+J+LE= @@ -2625,8 +2627,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.29.1 h1:7QBf+IK2gx70Ap/hDsOmam3GE0v9HicjfEdAxE62UoM= -google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 3d7b3c14584eb517b5dafb155dfcc10dd6ade6d9 Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Mon, 23 Oct 2023 10:55:45 +0300 Subject: [PATCH 3/5] Fixing script to read ascii code (#3556) * Fixing script to read ascii code * Changing to read from version.go * Adding workspace to makefile * Adding shell on top of makefile --- deploy/kubernetes/Makefile | 8 +++++--- deploy/kubernetes/elastic-agent-managed-kubernetes.yaml | 2 +- .../kubernetes/elastic-agent-standalone-kubernetes.yaml | 4 ++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/deploy/kubernetes/Makefile b/deploy/kubernetes/Makefile index e4d8b474908..6247c9461e1 100644 --- a/deploy/kubernetes/Makefile +++ b/deploy/kubernetes/Makefile @@ -1,7 +1,9 @@ -ALL=elastic-agent-standalone elastic-agent-managed +SHELL := /bin/bash -BEAT_VERSION=$(shell grep '^:stack-version:' ../../version/docs/version.asciidoc | cut -c 17- ) -BRANCH_VERSION=$(shell grep '^:doc-branch:' ../../version/docs/version.asciidoc | cut -c 14- ) +ALL=elastic-agent-standalone elastic-agent-managed +WORKSPACE=$(shell git rev-parse --show-toplevel) +BEAT_VERSION=$(shell grep -oE '[0-9]+\.[0-9]+\.[0-9]+(\-[a-zA-Z]+[0-9]+)?' "${WORKSPACE}/version/version.go") +BRANCH_VERSION=$(shell cut -d. -f1-2 <<< '${BEAT_VERSION}') #variables needed for ci-create-kubernetes-templates-pull-request ELASTIC_AGENT_REPO=kibana diff --git a/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml b/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml index 4e9399ec6c6..44df212a629 100644 --- a/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml +++ b/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml @@ -30,7 +30,7 @@ spec: dnsPolicy: ClusterFirstWithHostNet containers: - name: elastic-agent - image: docker.elastic.co/beats/elastic-agent:8.8.1 + image: docker.elastic.co/beats/elastic-agent:8.12.0 env: # Set to 1 for enrollment into Fleet server. If not set, Elastic Agent is run in standalone mode - name: FLEET_ENROLL diff --git a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml index 244003075e0..125936a37b3 100644 --- a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml +++ b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml @@ -683,13 +683,13 @@ spec: # - -c # - >- # mkdir -p /etc/elastic-agent/inputs.d && - # wget -O - https://github.com/elastic/elastic-agent/archive/main.tar.gz | tar xz -C /etc/elastic-agent/inputs.d --strip=5 "elastic-agent-main/deploy/kubernetes/elastic-agent-standalone/templates.d" + # wget -O - https://github.com/elastic/elastic-agent/archive/8.12.tar.gz | tar xz -C /etc/elastic-agent/inputs.d --strip=5 "elastic-agent-8.12/deploy/kubernetes/elastic-agent-standalone/templates.d" # volumeMounts: # - name: external-inputs # mountPath: /etc/elastic-agent/inputs.d containers: - name: elastic-agent-standalone - image: docker.elastic.co/beats/elastic-agent:8.8.1 + image: docker.elastic.co/beats/elastic-agent:8.12.0 args: ["-c", "/etc/elastic-agent/agent.yml", "-e"] env: # The basic authentication username used to connect to Elasticsearch From 97d9c80fadf3bac7e18f94e86225b01a81121223 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paolo=20Chil=C3=A0?= Date: Mon, 23 Oct 2023 14:10:53 +0200 Subject: [PATCH 4/5] Fix double stop components (#3482) * Skip stopping already stopped components --- ...92-Prevent-multiple-stops-of-services.yaml | 32 +++++++++++++++++++ pkg/component/runtime/manager.go | 3 ++ pkg/component/runtime/runtime.go | 4 +++ pkg/component/runtime/service.go | 10 ++++++ 4 files changed, 49 insertions(+) create mode 100644 changelog/fragments/1695920792-Prevent-multiple-stops-of-services.yaml diff --git a/changelog/fragments/1695920792-Prevent-multiple-stops-of-services.yaml b/changelog/fragments/1695920792-Prevent-multiple-stops-of-services.yaml new file mode 100644 index 00000000000..e15f5d6e927 --- /dev/null +++ b/changelog/fragments/1695920792-Prevent-multiple-stops-of-services.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Prevent multiple attempts to stop an already stopped service + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; a word indicating the component this changeset affects. +component: runtime + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index 42824794aea..8462ac3c17e 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -705,6 +705,7 @@ func (m *Manager) update(model component.Model, teardown bool) error { var stoppedWg sync.WaitGroup stoppedWg.Add(len(stop)) for _, existing := range stop { + m.logger.Debugf("Stopping component %q", existing.id) _ = existing.stop(teardown, model.Signed) // stop is async, wait for operation to finish, // otherwise new instance may be started and components @@ -755,6 +756,7 @@ func (m *Manager) waitForStopped(comp *componentRuntimeState) { for { latestState := comp.getLatest() if latestState.State == client.UnitStateStopped { + m.logger.Debugf("component %q stopped.", compID) return } @@ -767,6 +769,7 @@ func (m *Manager) waitForStopped(comp *componentRuntimeState) { select { case <-timeoutCh: + m.logger.Errorf("timeout exceeded waiting for component %q to stop", compID) return case <-time.After(stopCheckRetryPeriod): } diff --git a/pkg/component/runtime/runtime.go b/pkg/component/runtime/runtime.go index a04eec804bb..feeee2c6d7e 100644 --- a/pkg/component/runtime/runtime.go +++ b/pkg/component/runtime/runtime.go @@ -187,6 +187,10 @@ func (s *componentRuntimeState) start() error { } func (s *componentRuntimeState) stop(teardown bool, signed *component.Signed) error { + if s.shuttingDown.Load() { + // already stopping + return nil + } s.shuttingDown.Store(true) if teardown { return s.runtime.Teardown(signed) diff --git a/pkg/component/runtime/service.go b/pkg/component/runtime/service.go index 2ea47e2105c..5f007924d7f 100644 --- a/pkg/component/runtime/service.go +++ b/pkg/component/runtime/service.go @@ -38,6 +38,7 @@ var ( type executeServiceCommandFunc func(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec) error // serviceRuntime provides the command runtime for running a component as a service. +// an instance of serviceRuntime is not reused: after being stopped, it cannot be started again. type serviceRuntime struct { comp component.Component log *logger.Logger @@ -124,6 +125,8 @@ func (s *serviceRuntime) Run(ctx context.Context, comm Communicator) (err error) lastCheckin time.Time missedCheckins int tearingDown bool + // flag that signals if we are already stopping + stopping bool ignoreCheckins bool ) @@ -136,6 +139,13 @@ func (s *serviceRuntime) Run(ctx context.Context, comm Communicator) (err error) defer cisStop() onStop := func(am actionMode) { + if stopping { + s.log.Debugf("service %s is already stopping: skipping...", s.name()) + return + } + // the flag is set once and never reset since the serviceRuntime object + // is not supposed to be reused once it's stopping + stopping = true // Stop check-in timer s.log.Debugf("stop check-in timer for %s service", s.name()) checkinTimer.Stop() From 9ae9a4c7e39afa2ab85aac12356dd440836a41cb Mon Sep 17 00:00:00 2001 From: Davide Girardi <1390902+girodav@users.noreply.github.com> Date: Mon, 23 Oct 2023 13:53:37 +0100 Subject: [PATCH 5/5] Add assetbeat as external component (#3503) --- .../1696319263-add-assetbeat-dependency.yaml | 32 +++++++++++++++++++ dev-tools/mage/manifest/manifest.go | 1 + magefile.go | 2 ++ 3 files changed, 35 insertions(+) create mode 100644 changelog/fragments/1696319263-add-assetbeat-dependency.yaml diff --git a/changelog/fragments/1696319263-add-assetbeat-dependency.yaml b/changelog/fragments/1696319263-add-assetbeat-dependency.yaml new file mode 100644 index 00000000000..ab428367953 --- /dev/null +++ b/changelog/fragments/1696319263-add-assetbeat-dependency.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add assetbeat among the external dependencies needed to package Elastic Agent + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/obs-infraobs-team/issues/1114 diff --git a/dev-tools/mage/manifest/manifest.go b/dev-tools/mage/manifest/manifest.go index 62f82eb2c87..a2c377d600c 100644 --- a/dev-tools/mage/manifest/manifest.go +++ b/dev-tools/mage/manifest/manifest.go @@ -87,6 +87,7 @@ func DownloadComponentsFromManifest(manifest string, platforms []string, platfor "beats": {"auditbeat", "filebeat", "heartbeat", "metricbeat", "osquerybeat", "packetbeat"}, "cloud-defend": {"cloud-defend"}, "cloudbeat": {"cloudbeat"}, + "assetbeat": {"assetbeat"}, "elastic-agent-shipper": {"elastic-agent-shipper"}, "endpoint-dev": {"endpoint-security"}, "fleet-server": {"fleet-server"}, diff --git a/magefile.go b/magefile.go index bf7b335cbe8..75b0f3ef93a 100644 --- a/magefile.go +++ b/magefile.go @@ -928,6 +928,7 @@ func packageAgent(platforms []string, packagingFn func()) { // https://artifacts-snapshot.elastic.co/endpoint-dev/latest/8.11.0-SNAPSHOT.json // https://artifacts-snapshot.elastic.co/fleet-server/latest/8.11.0-SNAPSHOT.json // https://artifacts-snapshot.elastic.co/prodfiler/latest/8.11.0-SNAPSHOT.json + // https://artifacts-snapshot.elastic.co/assetbeat/latest/8.11.0-SNAPSHOT.json externalBinaries := map[string]string{ "auditbeat": "beats", "filebeat": "beats", @@ -943,6 +944,7 @@ func packageAgent(platforms []string, packagingFn func()) { "pf-elastic-collector": "prodfiler", "pf-elastic-symbolizer": "prodfiler", "pf-host-agent": "prodfiler", + "assetbeat": "assetbeat", // only supporting linux/amd64 or linux/arm64 } // Only log fatal logs for logs produced using logrus. This is the global logger