Skip to content

Commit

Permalink
Track upgrade details (#3527)
Browse files Browse the repository at this point in the history
* Remove context and handle cancellation internally instead

* More optimizations

* Add back context

* Adding FSM for upgrades

* Implementing TODO

* WIP

* WIP

* Reorganizing imports

* Running go mod tidy

* Resolve deadlock

* Add unit tests

* Fix type

* Renaming variable to avoid conflict with package name

* Handle failures in one place

* Set UPG_RESTARTING state

* Remove Fleet changes

* Add guard for action

* Immediately notify observer when registered

* Add UpgradeCompleted effect to observer doc

* Fix initialization

* Adding details progress observer and unit tests

* Fixing booboos introduced during conflict resolution

* Add unit test

* Add assertion on error

* Add comment on stateNeedsRefresh

* Add comment linking to Fleet Server OpenAPI spec for UPG_* values

* Use public accessor for setting upgrade details on coordinator to prevent data race

* Use buffered channel for upgradeDetailsChan in test so test can run in single goroutine

* Fixing unit test

* Add mutex to prevent data race

* Clarify assertion's intent

* Make copy of details before notifying observer with it.

* Add setter for setting download percent

* Remove unnecessary struct tags

* Change mutex type

* Document FailedState and ErrorMsg fields

* Track download rate as well

* Change data type of time field

* Rename struct to avoid stutter in naming

* Log upgrade details when they change

* Add nil guard

* Setting logger in test

* Use sentinel value for encoding +Inf download rate in JSON

* Fix up comment

* Set omitempty on failed_state and error_msg

* Add units to download rate
  • Loading branch information
ycombinator authored Oct 19, 2023
1 parent e43be2a commit 92acf08
Show file tree
Hide file tree
Showing 18 changed files with 547 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
Expand All @@ -35,7 +36,7 @@ func (u *mockUpgradeManager) Reload(rawConfig *config.Config) error {
return nil
}

func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
select {
case <-time.After(2 * time.Second):
u.msgChan <- "completed " + version
Expand Down
42 changes: 36 additions & 6 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
Expand Down Expand Up @@ -59,7 +61,7 @@ type UpgradeManager interface {
Reload(rawConfig *config.Config) error

// Upgrade upgrades running agent.
Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error)
Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error)

// Ack is used on startup to check if the agent has upgraded and needs to send an ack for the action
Ack(ctx context.Context, acker acker.Acker) error
Expand Down Expand Up @@ -192,8 +194,12 @@ type Coordinator struct {
// state should never be directly read or written outside the Coordinator
// goroutine. Callers who need to access or modify the state should use the
// public accessors like State(), SetLogLevel(), etc.
state State
stateBroadcaster *broadcaster.Broadcaster[State]
state State
stateBroadcaster *broadcaster.Broadcaster[State]

// If you get a race detector error while accessing this field, it probably
// means you're calling private Coordinator methods from outside the
// Coordinator goroutine.
stateNeedsRefresh bool

// overrideState is used during the update process to report the overall
Expand All @@ -204,6 +210,10 @@ type Coordinator struct {
// SetOverrideState helper to the Coordinator goroutine.
overrideStateChan chan *coordinatorOverrideState

// upgradeDetailsChan forwards upgrade details from the publicly accessible
// SetUpgradeDetails helper to the Coordinator goroutine.
upgradeDetailsChan chan *details.Details

// loglevelCh forwards log level changes from the public API (SetLogLevel)
// to the run loop in Coordinator's main goroutine.
logLevelCh chan logp.Level
Expand Down Expand Up @@ -326,8 +336,9 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
// synchronization in the subscriber API, just set the input buffer to 0.
stateBroadcaster: broadcaster.New(state, 64, 32),

logLevelCh: make(chan logp.Level),
overrideStateChan: make(chan *coordinatorOverrideState),
logLevelCh: make(chan logp.Level),
overrideStateChan: make(chan *coordinatorOverrideState),
upgradeDetailsChan: make(chan *details.Details),
}
// Setup communication channels for any non-nil components. This pattern
// lets us transparently accept nil managers / simulated events during
Expand Down Expand Up @@ -445,17 +456,33 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str

// override the overall state to upgrading until the re-execution is complete
c.SetOverrideState(agentclient.Upgrading, fmt.Sprintf("Upgrading to version %s", version))
cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, skipVerifyOverride, skipDefaultPgp, pgpBytes...)

// initialize upgrade details
actionID := ""
if action != nil {
actionID = action.ActionID
}
det := details.NewDetails(version, details.StateRequested, actionID)
det.RegisterObserver(c.SetUpgradeDetails)
det.RegisterObserver(c.logUpgradeDetails)

cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...)
if err != nil {
c.ClearOverrideState()
det.Fail(err)
return err
}
if cb != nil {
det.SetState(details.StateRestarting)
c.ReExec(cb)
}
return nil
}

func (c *Coordinator) logUpgradeDetails(details *details.Details) {
c.logger.Infow("updated upgrade details", "upgrade_details", details)
}

// AckUpgrade is the method used on startup to ack a previously successful upgrade action.
// Called from external goroutines.
func (c *Coordinator) AckUpgrade(ctx context.Context, acker acker.Acker) error {
Expand Down Expand Up @@ -878,6 +905,9 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
case overrideState := <-c.overrideStateChan:
c.setOverrideState(overrideState)

case upgradeDetails := <-c.upgradeDetailsChan:
c.setUpgradeDetails(upgradeDetails)

case componentState := <-c.managerChans.runtimeManagerUpdate:
// New component change reported by the runtime manager via
// Coordinator.watchRuntimeComponents(), merge it with the
Expand Down
21 changes: 19 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package coordinator
import (
"fmt"

agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"

"github.com/elastic/elastic-agent-client/v7/pkg/client"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/pkg/component/runtime"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
)

// State provides the current state of the coordinator along with all the current states of components and units.
Expand All @@ -30,6 +32,8 @@ type State struct {

Components []runtime.ComponentComponentState `yaml:"components"`
LogLevel logp.Level `yaml:"log_level"`

UpgradeDetails *details.Details `yaml:"upgrade_details,omitempty"`
}

type coordinatorOverrideState struct {
Expand All @@ -54,6 +58,11 @@ func (c *Coordinator) ClearOverrideState() {
c.overrideStateChan <- nil
}

// SetUpgradeDetails sets upgrade details. This is used during upgrades.
func (c *Coordinator) SetUpgradeDetails(upgradeDetails *details.Details) {
c.upgradeDetailsChan <- upgradeDetails
}

// setRuntimeManagerError updates the error state for the runtime manager.
// Called on the main Coordinator goroutine.
func (c *Coordinator) setRuntimeManagerError(err error) {
Expand Down Expand Up @@ -114,6 +123,13 @@ func (c *Coordinator) setOverrideState(overrideState *coordinatorOverrideState)
c.stateNeedsRefresh = true
}

// setUpgradeDetails is the internal helper to set upgrade details and set stateNeedsRefresh.
// Must be called on the main Coordinator goroutine.
func (c *Coordinator) setUpgradeDetails(upgradeDetails *details.Details) {
c.state.UpgradeDetails = upgradeDetails
c.stateNeedsRefresh = true
}

// Forward the current state to the broadcaster and clear the stateNeedsRefresh
// flag. Must be called on the main Coordinator goroutine.
func (c *Coordinator) refreshState() {
Expand Down Expand Up @@ -163,6 +179,7 @@ func (c *Coordinator) generateReportableState() (s State) {
s.FleetState = c.state.FleetState
s.FleetMessage = c.state.FleetMessage
s.LogLevel = c.state.LogLevel
s.UpgradeDetails = c.state.UpgradeDetails
s.Components = make([]runtime.ComponentComponentState, len(c.state.Components))
copy(s.Components, c.state.Components)

Expand Down
61 changes: 58 additions & 3 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"testing"
"time"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
Expand Down Expand Up @@ -471,8 +473,50 @@ func TestCoordinator_Upgrade(t *testing.T) {
require.NoError(t, err)
}

func TestCoordinator_UpgradeDetails(t *testing.T) {
coordCh := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

expectedErr := errors.New("some upgrade error")
upgradeManager := &fakeUpgradeManager{
upgradeable: true,
upgradeErr: expectedErr,
}
coord, cfgMgr, varsMgr := createCoordinator(t, ctx, WithUpgradeManager(upgradeManager))
require.Nil(t, coord.state.UpgradeDetails)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
// allowed error
err = nil
}
coordCh <- err
}()

// no vars used by the config
varsMgr.Vars(ctx, []*transpiler.Vars{{}})

// no need for anything to really run
cfg, err := config.NewConfigFrom(nil)
require.NoError(t, err)
cfgMgr.Config(ctx, cfg)

err = coord.Upgrade(ctx, "9.0.0", "", nil, true, false)
require.ErrorIs(t, expectedErr, err)
cancel()

err = <-coordCh
require.NoError(t, err)

require.Equal(t, details.StateFailed, coord.state.UpgradeDetails.State)
require.Equal(t, details.StateRequested, coord.state.UpgradeDetails.Metadata.FailedState)
require.Equal(t, expectedErr.Error(), coord.state.UpgradeDetails.Metadata.ErrorMsg)
}

type createCoordinatorOpts struct {
managed bool
managed bool
upgradeManager UpgradeManager
}

type CoordinatorOpt func(o *createCoordinatorOpts)
Expand All @@ -483,6 +527,12 @@ func ManagedCoordinator(managed bool) CoordinatorOpt {
}
}

func WithUpgradeManager(upgradeManager UpgradeManager) CoordinatorOpt {
return func(o *createCoordinatorOpts) {
o.upgradeManager = upgradeManager
}
}

// createCoordinator creates a coordinator that using a fake config manager and a fake vars manager.
//
// The runtime specifications is set up to use both the fake component and fake shipper.
Expand Down Expand Up @@ -527,7 +577,12 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt
cfgMgr := newFakeConfigManager()
varsMgr := newFakeVarsManager()

coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, &fakeUpgradeManager{}, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed)
upgradeManager := o.upgradeManager
if upgradeManager == nil {
upgradeManager = &fakeUpgradeManager{}
}

coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed)
return coord, cfgMgr, varsMgr
}

Expand Down Expand Up @@ -574,7 +629,7 @@ func (f *fakeUpgradeManager) Reload(cfg *config.Config) error {
return nil
}

func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
f.upgradeCalled = true
if f.upgradeErr != nil {
return nil, f.upgradeErr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/pkg/component"
Expand Down Expand Up @@ -811,6 +812,9 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) {
// since a successful upgrade sets the override state twice.
overrideStateChan := make(chan *coordinatorOverrideState, 2)

// similarly, upgradeDetailsChan is a buffered channel as well.
upgradeDetailsChan := make(chan *details.Details, 2)

// Create a manager that will allow upgrade attempts but return a failure
// from Upgrade itself (success requires testing ReExec and we aren't
// quite ready to do that yet).
Expand All @@ -820,9 +824,11 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) {
}

coord := &Coordinator{
stateBroadcaster: broadcaster.New(State{}, 0, 0),
overrideStateChan: overrideStateChan,
upgradeMgr: upgradeMgr,
stateBroadcaster: broadcaster.New(State{}, 0, 0),
overrideStateChan: overrideStateChan,
upgradeDetailsChan: upgradeDetailsChan,
upgradeMgr: upgradeMgr,
logger: logp.NewLogger("testing"),
}

// Call upgrade and make sure the upgrade manager receives an Upgrade call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
Expand All @@ -43,13 +44,14 @@ const (

// Downloader is a downloader able to fetch artifacts from elastic.co web page.
type Downloader struct {
log *logger.Logger
config *artifact.Config
client http.Client
log *logger.Logger
config *artifact.Config
client http.Client
upgradeDetails *details.Details
}

// NewDownloader creates and configures Elastic Downloader
func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, error) {
func NewDownloader(log *logger.Logger, config *artifact.Config, upgradeDetails *details.Details) (*Downloader, error) {
client, err := config.HTTPTransportSettings.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: false, IdleConnTimeout: 30 * time.Second},
Expand All @@ -59,15 +61,16 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, er
}

client.Transport = download.WithHeaders(client.Transport, download.Headers)
return NewDownloaderWithClient(log, config, *client), nil
return NewDownloaderWithClient(log, config, *client, upgradeDetails), nil
}

// NewDownloaderWithClient creates Elastic Downloader with specific client used
func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client) *Downloader {
func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client, upgradeDetails *details.Details) *Downloader {
return &Downloader{
log: log,
config: config,
client: client,
log: log,
config: config,
client: client,
upgradeDetails: upgradeDetails,
}
}

Expand Down Expand Up @@ -206,7 +209,8 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
}

loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout)
dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver)
detailsObserver := newDetailsProgressObserver(e.upgradeDetails)
dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver, detailsObserver)
dp.Report(ctx)
_, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp))
if err != nil {
Expand Down
Loading

0 comments on commit 92acf08

Please sign in to comment.