Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework runtime manager updates to block the coordinator less #3747

Merged
merged 10 commits into from
Nov 16, 2023
32 changes: 16 additions & 16 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,7 @@ type RuntimeManager interface {
Runner

// Update updates the current components model.
Update(model component.Model) error

// State returns the current components model state.
State() []runtime.ComponentComponentState
Update(model component.Model)

// PerformAction executes an action on a unit.
PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error)
Expand Down Expand Up @@ -241,17 +238,19 @@ type Coordinator struct {
// into the reported state before broadcasting -- State() will report
// agentclient.Failed if one of these is set, even if the underlying
// coordinator state is agentclient.Healthy.
runtimeMgrErr error // Currently unused
configMgrErr error
actionsErr error
varsMgrErr error
// Errors from the runtime manager report policy update failures and are
// stored in runtimeUpdateErr below.
configMgrErr error
actionsErr error
varsMgrErr error

// Errors resulting from different possible failure modes when setting a
// new policy. Right now there are three different stages where a policy
// update can fail:
// - in generateAST, converting the policy to an AST
// - in process, converting the AST and vars into a full component model
// - while sending the final component model to the runtime manager
// - while applying the final component model in the runtime manager
// (reported asynchronously via the runtime manager error channel)
//
// The plan is to improve our preprocessing so we can always detect
// failures immediately https://github.com/elastic/elastic-agent/issues/2887.
Expand Down Expand Up @@ -920,7 +919,13 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
return

case runtimeErr := <-c.managerChans.runtimeManagerError:
c.setRuntimeManagerError(runtimeErr)
// runtime manager errors report the result of a policy update.
// Coordinator transitions from starting to healthy when a policy update
// is successful.
c.setRuntimeUpdateError(runtimeErr)
if runtimeErr == nil {
c.setCoordinatorState(agentclient.Healthy, "Running")
}

case configErr := <-c.managerChans.configManagerError:
if c.isManaged {
Expand Down Expand Up @@ -1153,12 +1158,7 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {

c.logger.Info("Updating running component model")
c.logger.With("components", model.Components).Debug("Updating running component model")
err = c.runtimeMgr.Update(model)
c.setRuntimeUpdateError(err)
if err != nil {
return fmt.Errorf("updating runtime: %w", err)
}
c.setCoordinatorState(agentclient.Healthy, "Running")
c.runtimeMgr.Update(model)
return nil
}

Expand Down
17 changes: 3 additions & 14 deletions internal/pkg/agent/application/coordinator/coordinator_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func (c *Coordinator) SetUpgradeDetails(upgradeDetails *details.Details) {
c.upgradeDetailsChan <- upgradeDetails
}

// setRuntimeManagerError updates the error state for the runtime manager.
// setRuntimeUpdateError reports a failed policy update in the runtime manager.
// Called on the main Coordinator goroutine.
func (c *Coordinator) setRuntimeManagerError(err error) {
c.runtimeMgrErr = err
func (c *Coordinator) setRuntimeUpdateError(err error) {
c.runtimeUpdateErr = err
c.stateNeedsRefresh = true
}

Expand Down Expand Up @@ -107,14 +107,6 @@ func (c *Coordinator) setComponentGenError(err error) {
c.stateNeedsRefresh = true
}

// setRuntimeUpdateError updates the error state for sending a component model
// update to the runtime manager.
// Called on the main Coordinator goroutine.
func (c *Coordinator) setRuntimeUpdateError(err error) {
c.runtimeUpdateErr = err
c.stateNeedsRefresh = true
}

// setOverrideState is the internal helper to set the override state and
// set stateNeedsRefresh.
// Must be called on the main Coordinator goroutine.
Expand Down Expand Up @@ -201,9 +193,6 @@ func (c *Coordinator) generateReportableState() (s State) {
} else if c.runtimeUpdateErr != nil {
s.State = agentclient.Failed
s.Message = fmt.Sprintf("Runtime update failed: %s", c.runtimeUpdateErr.Error())
} else if c.runtimeMgrErr != nil {
s.State = agentclient.Failed
s.Message = fmt.Sprintf("Runtime manager: %s", c.runtimeMgrErr.Error())
} else if c.configMgrErr != nil {
s.State = agentclient.Failed
s.Message = fmt.Sprintf("Config manager: %s", c.configMgrErr.Error())
Expand Down
12 changes: 9 additions & 3 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,8 @@ func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) {
type fakeRuntimeManager struct {
state []runtime.ComponentComponentState
updateCallback func([]component.Component) error
result error
errChan chan error
}

func (r *fakeRuntimeManager) Run(ctx context.Context) error {
Expand All @@ -796,11 +798,15 @@ func (r *fakeRuntimeManager) Run(ctx context.Context) error {

func (r *fakeRuntimeManager) Errors() <-chan error { return nil }

func (r *fakeRuntimeManager) Update(model component.Model) error {
func (r *fakeRuntimeManager) Update(model component.Model) {
r.result = nil
if r.updateCallback != nil {
return r.updateCallback(model.Components)
r.result = r.updateCallback(model.Components)
}
if r.errChan != nil {
// If a reporting channel is set, send the result to it
r.errChan <- r.result
}
return nil
}

// State returns the current components model state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,13 +754,15 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
logger := logp.NewLogger("testing")

configChan := make(chan ConfigChange, 1)
updateErrChan := make(chan error, 1)

const errorStr = "update failed for testing reasons"
// Create a mocked runtime manager that always reports an error
runtimeManager := &fakeRuntimeManager{
updateCallback: func(comp []component.Component) error {
return fmt.Errorf(errorStr)
},
errChan: updateErrChan,
}

coord := &Coordinator{
Expand All @@ -769,6 +771,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
stateBroadcaster: broadcaster.New(State{}, 0, 0),
managerChans: managerChans{
configManagerUpdate: configChan,
// Give coordinator the same error channel we set on the runtime
// manager, so it receives the update result.
runtimeManagerError: updateErrChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
Expand All @@ -781,16 +786,19 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
configChan <- configChange
coord.runLoopIteration(ctx)

// Make sure the failure was reported to the config manager
assert.True(t, configChange.failed, "Config change should report failure if the runtime manager returns an error")
require.Error(t, configChange.err, "Config change should get an error if runtime manager update fails")
assert.Contains(t, configChange.err.Error(), errorStr)
// Make sure the config change was acknowledged to the config manager
// (the failure is not reported here since it happens asynchronously; it
// will appear in the coordinator state afterwards.)
assert.True(t, configChange.acked, "Config change should be acknowledged to the config manager")
assert.NoError(t, configChange.err, "Config change with async error should succeed")

// Make sure the error is saved in Coordinator.runtimeUpdateErr
// Now do another run loop iteration to let the update error propagate,
// and make sure it is reported correctly.
coord.runLoopIteration(ctx)
require.Error(t, coord.runtimeUpdateErr, "Runtime update failure should be saved in runtimeUpdateErr")
assert.Equal(t, errorStr, coord.runtimeUpdateErr.Error(), "runtimeUpdateErr should match the error reported by the runtime manager")

// Make sure the error is reported in Coordinator state.
// Make sure the error appears in the Coordinator state.
state := coord.State()
assert.Equal(t, agentclient.Failed, state.State, "Failed policy update should cause failed Coordinator")
assert.Contains(t, state.Message, errorStr, "Failed policy update should be reported in Coordinator state message")
Expand Down
Loading
Loading