From 665674ab0b4bf9f2dca20027a30fd25bc3c19b9e Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 10 Nov 2023 17:59:43 -0500 Subject: [PATCH 1/7] Rework runtime manager updates to block the coordinator less --- .../application/coordinator/coordinator.go | 32 ++-- .../coordinator/coordinator_state.go | 17 +- .../coordinator/coordinator_test.go | 12 +- .../coordinator/coordinator_unit_test.go | 20 ++- pkg/component/runtime/manager.go | 147 ++++++++---------- pkg/component/runtime/manager_test.go | 142 +++++++++++------ 6 files changed, 197 insertions(+), 173 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index bce2188c0e7..6acfa08c57a 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -96,10 +96,7 @@ type RuntimeManager interface { Runner // Update updates the current components model. - Update(model component.Model) error - - // State returns the current components model state. - State() []runtime.ComponentComponentState + Update(model component.Model) // PerformAction executes an action on a unit. PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) @@ -237,17 +234,19 @@ type Coordinator struct { // into the reported state before broadcasting -- State() will report // agentclient.Failed if one of these is set, even if the underlying // coordinator state is agentclient.Healthy. - runtimeMgrErr error // Currently unused - configMgrErr error - actionsErr error - varsMgrErr error + // Errors from the runtime manager report policy update failures and are + // stored in runtimeUpdateErr below. + configMgrErr error + actionsErr error + varsMgrErr error // Errors resulting from different possible failure modes when setting a // new policy. Right now there are three different stages where a policy // update can fail: // - in generateAST, converting the policy to an AST // - in process, converting the AST and vars into a full component model - // - while sending the final component model to the runtime manager + // - while applying the final component model in the runtime manager + // (reported asynchronously via the runtime manager error channel) // // The plan is to improve our preprocessing so we can always detect // failures immediately https://github.com/elastic/elastic-agent/issues/2887. @@ -899,7 +898,13 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { return case runtimeErr := <-c.managerChans.runtimeManagerError: - c.setRuntimeManagerError(runtimeErr) + // runtime manager errors report the result of a policy update. + // Coordinator transitions from starting to healthy when a policy update + // is successful. + c.setRuntimeUpdateError(runtimeErr) + if runtimeErr == nil { + c.setCoordinatorState(agentclient.Healthy, "Running") + } case configErr := <-c.managerChans.configManagerError: if c.isManaged { @@ -1127,12 +1132,7 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) { c.logger.Info("Updating running component model") c.logger.With("components", model.Components).Debug("Updating running component model") - err = c.runtimeMgr.Update(model) - c.setRuntimeUpdateError(err) - if err != nil { - return fmt.Errorf("updating runtime: %w", err) - } - c.setCoordinatorState(agentclient.Healthy, "Running") + c.runtimeMgr.Update(model) return nil } diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index 6e645c3a06b..22394ee52e0 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -63,10 +63,10 @@ func (c *Coordinator) SetUpgradeDetails(upgradeDetails *details.Details) { c.upgradeDetailsChan <- upgradeDetails } -// setRuntimeManagerError updates the error state for the runtime manager. +// setRuntimeUpdateError reports a failed policy update in the runtime manager. // Called on the main Coordinator goroutine. -func (c *Coordinator) setRuntimeManagerError(err error) { - c.runtimeMgrErr = err +func (c *Coordinator) setRuntimeUpdateError(err error) { + c.runtimeUpdateErr = err c.stateNeedsRefresh = true } @@ -107,14 +107,6 @@ func (c *Coordinator) setComponentGenError(err error) { c.stateNeedsRefresh = true } -// setRuntimeUpdateError updates the error state for sending a component model -// update to the runtime manager. -// Called on the main Coordinator goroutine. -func (c *Coordinator) setRuntimeUpdateError(err error) { - c.runtimeUpdateErr = err - c.stateNeedsRefresh = true -} - // setOverrideState is the internal helper to set the override state and // set stateNeedsRefresh. // Must be called on the main Coordinator goroutine. @@ -201,9 +193,6 @@ func (c *Coordinator) generateReportableState() (s State) { } else if c.runtimeUpdateErr != nil { s.State = agentclient.Failed s.Message = fmt.Sprintf("Runtime update failed: %s", c.runtimeUpdateErr.Error()) - } else if c.runtimeMgrErr != nil { - s.State = agentclient.Failed - s.Message = fmt.Sprintf("Runtime manager: %s", c.runtimeMgrErr.Error()) } else if c.configMgrErr != nil { s.State = agentclient.Failed s.Message = fmt.Sprintf("Config manager: %s", c.configMgrErr.Error()) diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 074e5cf6e74..00004c42689 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -778,6 +778,8 @@ func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) { type fakeRuntimeManager struct { state []runtime.ComponentComponentState updateCallback func([]component.Component) error + result error + errChan chan error } func (r *fakeRuntimeManager) Run(ctx context.Context) error { @@ -787,11 +789,15 @@ func (r *fakeRuntimeManager) Run(ctx context.Context) error { func (r *fakeRuntimeManager) Errors() <-chan error { return nil } -func (r *fakeRuntimeManager) Update(model component.Model) error { +func (r *fakeRuntimeManager) Update(model component.Model) { + r.result = nil if r.updateCallback != nil { - return r.updateCallback(model.Components) + r.result = r.updateCallback(model.Components) + } + if r.errChan != nil { + // If a reporting channel is set, send the result to it + r.errChan <- r.result } - return nil } // State returns the current components model state. diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index fd4034f24c4..f645c24afb0 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -751,6 +751,7 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { logger := logp.NewLogger("testing") configChan := make(chan ConfigChange, 1) + updateErrChan := make(chan error, 1) const errorStr = "update failed for testing reasons" // Create a mocked runtime manager that always reports an error @@ -758,6 +759,7 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { updateCallback: func(comp []component.Component) error { return fmt.Errorf(errorStr) }, + errChan: updateErrChan, } coord := &Coordinator{ @@ -766,6 +768,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { stateBroadcaster: broadcaster.New(State{}, 0, 0), managerChans: managerChans{ configManagerUpdate: configChan, + // Give coordinator the same error channel we set on the runtime + // manager, so it receives the update result. + runtimeManagerError: updateErrChan, }, runtimeMgr: runtimeManager, vars: emptyVars(t), @@ -778,16 +783,19 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { configChan <- configChange coord.runLoopIteration(ctx) - // Make sure the failure was reported to the config manager - assert.True(t, configChange.failed, "Config change should report failure if the runtime manager returns an error") - require.Error(t, configChange.err, "Config change should get an error if runtime manager update fails") - assert.Contains(t, configChange.err.Error(), errorStr) + // Make sure the config change was acknowledged to the config manager + // (the failure is not reported here since it happens asynchronously; it + // will appear in the coordinator state afterwards.) + assert.True(t, configChange.acked, "Config change should be acknowledged to the config manager") + assert.NoError(t, configChange.err, "Config change with async error should succeed") - // Make sure the error is saved in Coordinator.runtimeUpdateErr + // Now do another run loop iteration to let the update error propagate, + // and make sure it is reported correctly. + coord.runLoopIteration(ctx) require.Error(t, coord.runtimeUpdateErr, "Runtime update failure should be saved in runtimeUpdateErr") assert.Equal(t, errorStr, coord.runtimeUpdateErr.Error(), "runtimeUpdateErr should match the error reported by the runtime manager") - // Make sure the error is reported in Coordinator state. + // Make sure the error appears in the Coordinator state. state := coord.State() assert.Equal(t, agentclient.Failed, state.State, "Failed policy update should cause failed Coordinator") assert.Contains(t, state.Message, errorStr, "Failed policy update should be reported in Coordinator state message") diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index 8462ac3c17e..d724017902a 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -93,16 +93,12 @@ type Manager struct { baseLogger *logger.Logger ca *authority.CertificateAuthority listenAddr string + listenPort int agentInfo *info.AgentInfo tracer *apm.Tracer monitor MonitoringManager grpcConfig *configuration.GRPCConfig - // netMx synchronizes the access to listener and server only - netMx sync.RWMutex - listener net.Listener - server *grpc.Server - // Set when the RPC server is ready to receive requests, for use by tests. serverReady *atomic.Bool @@ -110,6 +106,10 @@ type Manager struct { // only one call to update occurs at a time updateMx sync.Mutex + // updateChan forwards component model updates from the public Update method + // to the internal run loop. + updateChan chan component.Model + // currentMx protects access to the current map only currentMx sync.RWMutex current map[string]*componentRuntimeState @@ -123,10 +123,9 @@ type Manager struct { errCh chan error - // upon creation the Manager is neither running not shutting down, thus both - // flags are needed. - running atomic.Bool - shuttingDown atomic.Bool + // doneChan is closed when Manager is shutting down to signal that any + // pending requests should be canceled. + doneChan chan struct{} } // NewManager creates a new manager. @@ -153,10 +152,12 @@ func NewManager( current: make(map[string]*componentRuntimeState), shipperConns: make(map[string]*shipperConn), subscriptions: make(map[string][]*Subscription), + updateChan: make(chan component.Model), errCh: make(chan error), monitor: monitor, grpcConfig: grpcConfig, serverReady: atomic.NewBool(false), + doneChan: make(chan struct{}), } return m, nil } @@ -169,16 +170,11 @@ func NewManager( // // Blocks until the context is done. func (m *Manager) Run(ctx context.Context) error { - m.running.Store(true) - m.shuttingDown.Store(false) - - lis, err := net.Listen("tcp", m.listenAddr) + listener, err := net.Listen("tcp", m.listenAddr) if err != nil { return fmt.Errorf("error starting tcp listener for runtime manager: %w", err) } - m.netMx.Lock() - m.listener = lis - m.netMx.Unlock() + m.listenPort = listener.Addr().(*net.TCPAddr).Port certPool := x509.NewCertPool() if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok { @@ -205,87 +201,76 @@ func (m *Manager) Run(ctx context.Context) error { grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize), ) } - m.netMx.Lock() - m.server = server - m.netMx.Unlock() - proto.RegisterElasticAgentServer(m.server, m) + proto.RegisterElasticAgentServer(server, m) // start serving GRPC connections - var wg sync.WaitGroup - wg.Add(1) + var wgServer sync.WaitGroup + wgServer.Add(1) go func() { - defer wg.Done() - m.serverReady.Store(true) - for { - err := server.Serve(lis) - if err != nil { - m.logger.Errorf("control protocol failed: %s", err) - } - if ctx.Err() != nil { - // context has an error don't start again - return + defer wgServer.Done() + go m.serverLoop(ctx, listener, server) + }() + +RUNLOOP: + for ctx.Err() == nil { + select { + case <-ctx.Done(): + // Signal that we will not accept any more updates + break RUNLOOP + case model := <-m.updateChan: + // New component model. This is an external update coming from a + // policy change, so we set tearDown to true. + err := m.update(model, true) + + // When update is done, send its result back to the coordinator, unless we're shutting down. + select { + case m.errCh <- err: + case <-ctx.Done(): } } - }() + } + close(m.doneChan) - <-ctx.Done() - m.running.Store(false) - m.shuttingDown.Store(true) + // Notify components to shutdown and wait for their response m.shutdown() + // Close the rpc listener and wait for serverLoop to return + listener.Close() + wgServer.Wait() + + // Cancel any remaining connections server.Stop() - wg.Wait() - m.netMx.Lock() - m.listener = nil - m.server = nil - m.netMx.Unlock() return ctx.Err() } +func (m *Manager) serverLoop(ctx context.Context, listener net.Listener, server *grpc.Server) { + m.serverReady.Store(true) + for ctx.Err() == nil { + err := server.Serve(listener) + if err != nil { + m.logger.Errorf("control protocol listener failed: %s", err) + } + } +} + // Errors returns channel that errors are reported on. func (m *Manager) Errors() <-chan error { return m.errCh } -// Update updates the currComp state of the running components. +// Update forwards a new component model to Manager's run loop. +// When it has been processed, a result will be sent on Manager's +// error channel. // Called from the main Coordinator goroutine. // -// This returns as soon as possible, the work is performed in the background. -func (m *Manager) Update(model component.Model) error { - shuttingDown := m.shuttingDown.Load() - if shuttingDown { - // ignore any updates once shutdown started - return nil - } - // teardown is true because the public `Update` method would be coming directly from - // policy so if a component was removed it needs to be torn down. - return m.update(model, true) -} - -// State returns the current component states. -func (m *Manager) State() []ComponentComponentState { - m.currentMx.RLock() - defer m.currentMx.RUnlock() - states := make([]ComponentComponentState, 0, len(m.current)) - for _, crs := range m.current { - var legacyPID string - if crs.runtime != nil { - if commandRuntime, ok := crs.runtime.(*commandRuntime); ok { - if commandRuntime != nil { - procInfo := commandRuntime.proc - if procInfo != nil { - legacyPID = fmt.Sprint(commandRuntime.proc.PID) - } - } - } - } - states = append(states, ComponentComponentState{ - Component: crs.getCurrent(), - State: crs.getLatest(), - LegacyPID: legacyPID, - }) +// If calling from a test, you should read from errCh afterwards to avoid +// blocking Manager's main loop. +func (m *Manager) Update(model component.Model) { + select { + case m.updateChan <- model: + case <-m.doneChan: + // Manager is shutting down, ignore the udpate } - return states } // PerformAction executes an action on a unit. @@ -891,13 +876,7 @@ func (m *Manager) getRuntimeFromComponent(comp component.Component) *componentRu func (m *Manager) getListenAddr() string { addr := strings.SplitN(m.listenAddr, ":", 2) if len(addr) == 2 && addr[1] == "0" { - m.netMx.RLock() - lis := m.listener - m.netMx.RUnlock() - if lis != nil { - port := lis.Addr().(*net.TCPAddr).Port - return fmt.Sprintf("%s:%d", addr[0], port) - } + return fmt.Sprintf("%s:%d", addr[0], m.listenPort) } return m.listenAddr } diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go index bb83de6fd04..444f0724cc2 100644 --- a/pkg/component/runtime/manager_test.go +++ b/pkg/component/runtime/manager_test.go @@ -145,7 +145,8 @@ func TestManager_SimpleComponentErr(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -237,7 +238,8 @@ func TestManager_FakeInput_StartStop(t *testing.T) { subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) } else if unit.State == client.UnitStateHealthy { // remove the component which will stop it - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -260,7 +262,8 @@ func TestManager_FakeInput_StartStop(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -384,7 +387,8 @@ func TestManager_FakeInput_Features(t *testing.T) { Fqdn: &proto.FQDNFeature{Enabled: true}, } - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthIteration, err) @@ -439,7 +443,8 @@ func TestManager_FakeInput_Features(t *testing.T) { "message": "Fake Healthy", }) - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { t.Logf("error updating component state to health: %v", err) @@ -459,7 +464,8 @@ func TestManager_FakeInput_Features(t *testing.T) { defer drainErrChan(managerErrCh) defer drainErrChan(subscriptionErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) timeout := 30 * time.Second @@ -609,7 +615,8 @@ func TestManager_FakeInput_APM(t *testing.T) { comp.Component = &proto.Component{ ApmConfig: initialAPMConfig, } - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthIteration, err) @@ -653,7 +660,8 @@ func TestManager_FakeInput_APM(t *testing.T) { comp.Component = &proto.Component{ ApmConfig: modifiedAPMConfig, } - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthIteration, err) @@ -692,7 +700,8 @@ func TestManager_FakeInput_APM(t *testing.T) { comp.Component = &proto.Component{ ApmConfig: nil, } - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthIteration, err) @@ -741,7 +750,8 @@ func TestManager_FakeInput_APM(t *testing.T) { "message": "Fake Healthy", }) - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { t.Logf("error updating component state to health: %v", err) @@ -761,7 +771,8 @@ func TestManager_FakeInput_APM(t *testing.T) { defer drainErrChan(managerErrCh) defer drainErrChan(subscriptionErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) timeout := 30 * time.Second @@ -902,9 +913,10 @@ func TestManager_FakeInput_Limits(t *testing.T) { GoMaxProcs: 101, }, } - err := m.Update(component.Model{ + m.Update(component.Model{ Components: []component.Component{comp}, }) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthyIteration, err) @@ -917,9 +929,10 @@ func TestManager_FakeInput_Limits(t *testing.T) { assert.Equal(t, uint64(101), componentState.Component.Limits.GoMaxProcs) comp.Component = nil - err := m.Update(component.Model{ + m.Update(component.Model{ Components: []component.Component{comp}, }) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthyIteration, err) @@ -945,7 +958,8 @@ func TestManager_FakeInput_Limits(t *testing.T) { defer drainErrChan(managerErrCh) defer drainErrChan(subscriptionErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) timeout := 30 * time.Second @@ -1063,9 +1077,10 @@ func TestManager_FakeShipper_Limits(t *testing.T) { GoMaxProcs: 101, }, } - err := m.Update(component.Model{ + m.Update(component.Model{ Components: []component.Component{comp}, }) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthyIteration, err) @@ -1078,9 +1093,10 @@ func TestManager_FakeShipper_Limits(t *testing.T) { assert.Equal(t, uint64(101), componentState.Component.Limits.GoMaxProcs) comp.Component = nil - err := m.Update(component.Model{ + m.Update(component.Model{ Components: []component.Component{comp}, }) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthyIteration, err) @@ -1106,7 +1122,8 @@ func TestManager_FakeShipper_Limits(t *testing.T) { defer drainErrChan(managerErrCh) defer drainErrChan(subscriptionErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) timeout := 30 * time.Second @@ -1222,7 +1239,8 @@ func TestManager_FakeInput_BadUnitToGood(t *testing.T) { } unitBad = false - err := m.Update(component.Model{Components: []component.Component{updatedComp}}) + m.Update(component.Model{Components: []component.Component{updatedComp}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -1250,7 +1268,8 @@ func TestManager_FakeInput_BadUnitToGood(t *testing.T) { } } else if unit.State == client.UnitStateHealthy { // bad unit is now healthy; stop the component - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -1274,7 +1293,8 @@ func TestManager_FakeInput_BadUnitToGood(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -1365,7 +1385,8 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) { endTimer := time.NewTimer(30 * time.Second) defer endTimer.Stop() - err = m.Update(component.Model{Components: []component.Component{healthyComp}}) + m.Update(component.Model{Components: []component.Component{healthyComp}}) + err = <-m.errCh require.NoError(t, err) // nextState tracks the stage of the test. We expect the sequence @@ -1395,7 +1416,8 @@ LOOP: if unit.State == client.UnitStateHealthy { // good unit is healthy; now make it bad t.Logf("marking good-input as having a hard-error for config") - err := m.Update(component.Model{Components: []component.Component{unhealthyComp}}) + m.Update(component.Model{Components: []component.Component{unhealthyComp}}) + err := <-m.errCh require.NoError(t, err, "Component model update should succeed") // We next expect to transition to Failed @@ -1414,7 +1436,8 @@ LOOP: if unit.State == client.UnitStateFailed { // Reached the expected state, now send an empty component model // to stop everything. - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err := <-m.errCh require.NoError(t, err, "Component model update should succeed") nextState = client.UnitStateStopped } else { @@ -1523,7 +1546,8 @@ func TestManager_FakeInput_NoDeadlock(t *testing.T) { } i += 1 comp = updatedComp - err := m.Update(component.Model{Components: []component.Component{updatedComp}}) + m.Update(component.Model{Components: []component.Component{updatedComp}}) + err := <-m.errCh if err != nil { updatedErr <- err return @@ -1564,7 +1588,8 @@ LOOP: case <-endTimer.C: // no deadlock after timeout (all good stop the component) updatedCancel() - _ = m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + <-errCh // Don't care about the result of Update, just that it runs break LOOP case err := <-errCh: require.NoError(t, err) @@ -1656,7 +1681,8 @@ func TestManager_FakeInput_Configure(t *testing.T) { "state": int(client.UnitStateDegraded), "message": "Fake Degraded", }) - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -1679,7 +1705,8 @@ func TestManager_FakeInput_Configure(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -1797,7 +1824,8 @@ func TestManager_FakeInput_RemoveUnit(t *testing.T) { } else if unit1.State == client.UnitStateHealthy { // unit1 is healthy lets remove it from the component comp.Units = comp.Units[0:1] - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh if err != nil { subErrCh <- err } @@ -1832,7 +1860,8 @@ func TestManager_FakeInput_RemoveUnit(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -1956,7 +1985,8 @@ func TestManager_FakeInput_ActionState(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -2091,7 +2121,8 @@ func TestManager_FakeInput_Restarts(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -2208,7 +2239,8 @@ func TestManager_FakeInput_Restarts_ConfigKill(t *testing.T) { "message": "Fake Healthy", "kill": rp[1], }) - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -2233,7 +2265,8 @@ func TestManager_FakeInput_Restarts_ConfigKill(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(1 * time.Minute) @@ -2347,7 +2380,8 @@ func TestManager_FakeInput_KeepsRestarting(t *testing.T) { "message": fmt.Sprintf("Fake Healthy %d", lastStoppedCount), "kill_on_interval": true, }) - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -2375,7 +2409,8 @@ func TestManager_FakeInput_KeepsRestarting(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(1 * time.Minute) @@ -2493,7 +2528,8 @@ func TestManager_FakeInput_RestartsOnMissedCheckins(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -2610,7 +2646,8 @@ func TestManager_FakeInput_InvalidAction(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -2807,7 +2844,8 @@ func TestManager_FakeInput_MultiComponent(t *testing.T) { defer drainErrChan(subErrCh1) defer drainErrChan(subErrCh2) - err = m.Update(component.Model{Components: components}) + m.Update(component.Model{Components: components}) + err = <-m.errCh require.NoError(t, err) count := 0 @@ -2963,7 +3001,8 @@ func TestManager_FakeInput_LogLevel(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -3177,7 +3216,8 @@ func TestManager_FakeShipper(t *testing.T) { subErrCh <- err } else { // successful; turn it all off - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err = <-m.errCh if err != nil { subErrCh <- err } @@ -3206,7 +3246,8 @@ func TestManager_FakeShipper(t *testing.T) { subErrCh <- err } else { // successful; turn it all off - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -3241,7 +3282,8 @@ func TestManager_FakeShipper(t *testing.T) { subErrCh <- err } else { // successful; turn it all off - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -3266,7 +3308,8 @@ func TestManager_FakeShipper(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: comps}) + m.Update(component.Model{Components: comps}) + err = <-m.errCh require.NoError(t, err) timeout := 2 * time.Minute @@ -3458,11 +3501,8 @@ func TestManager_FakeInput_OutputChange(t *testing.T) { stateProgressionWG.Done() }() - // Wait manager start running, then check if any error happened - assert.Eventually(t, - func() bool { return m.running.Load() }, - 500*time.Millisecond, - 10*time.Millisecond) + err = waitForReady(waitCtx, m) + require.NoError(t, err, "Manager must finish initializing") select { case err := <-errCh: @@ -3471,7 +3511,8 @@ func TestManager_FakeInput_OutputChange(t *testing.T) { } time.Sleep(100 * time.Millisecond) - err = m.Update(component.Model{Components: components}) + m.Update(component.Model{Components: components}) + err = <-m.errCh require.NoError(t, err) updateSleep := 300 * time.Millisecond @@ -3480,7 +3521,8 @@ func TestManager_FakeInput_OutputChange(t *testing.T) { updateSleep = time.Second } time.Sleep(updateSleep) - err = m.Update(component.Model{Components: components2}) + m.Update(component.Model{Components: components2}) + err = <-m.errCh require.NoError(t, err) count := 0 From 830fbc61fd67dba9f32351887b9c2ffd85771dba Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 10 Nov 2023 18:48:37 -0500 Subject: [PATCH 2/7] remove updateMx, clean up connection error logs --- pkg/component/runtime/manager.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index d724017902a..d4cc2fb6f7b 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -102,10 +102,6 @@ type Manager struct { // Set when the RPC server is ready to receive requests, for use by tests. serverReady *atomic.Bool - // updateMx protects the call to update to ensure that - // only one call to update occurs at a time - updateMx sync.Mutex - // updateChan forwards component model updates from the public Update method // to the internal run loop. updateChan chan component.Model @@ -247,7 +243,10 @@ func (m *Manager) serverLoop(ctx context.Context, listener net.Listener, server m.serverReady.Store(true) for ctx.Err() == nil { err := server.Serve(listener) - if err != nil { + if err != nil && ctx.Err() == nil { + // Only log an error if we aren't shutting down, otherwise we'll spam + // the logs with "use of closed network connection" for a connection that + // was closed on purpose. m.logger.Errorf("control protocol listener failed: %s", err) } } @@ -643,13 +642,10 @@ func (m *Manager) Actions(server proto.ElasticAgent_ActionsServer) error { } // update updates the current state of the running components. +// It is only called by the main runtime manager goroutine in Manager.Run. // // This returns as soon as possible, work is performed in the background. func (m *Manager) update(model component.Model, teardown bool) error { - // ensure that only one `update` can occur at the same time - m.updateMx.Lock() - defer m.updateMx.Unlock() - // prepare the components to add consistent shipper connection information between // the connected components in the model err := m.connectShippers(model.Components) From 67288f9ef1b6dea3312dbdda94baed62c4b70f0f Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 13 Nov 2023 10:59:29 -0500 Subject: [PATCH 3/7] fix channel ref in test --- pkg/component/runtime/manager_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go index 444f0724cc2..51bb941bca6 100644 --- a/pkg/component/runtime/manager_test.go +++ b/pkg/component/runtime/manager_test.go @@ -1468,12 +1468,10 @@ LOOP: } func TestManager_FakeInput_NoDeadlock(t *testing.T) { - /* - NOTE: This is a long-running test that spams the runtime managers `Update` function to try and - trigger a deadlock. This test takes 2 minutes to run trying to re-produce issue: + // NOTE: This is a long-running test that spams the runtime managers `Update` function to try and + // trigger a deadlock. This test takes 2 minutes to run trying to re-produce issue: + // https://github.com/elastic/elastic-agent/issues/2691 - https://github.com/elastic/elastic-agent/issues/2691 - */ testPaths(t) ctx, cancel := context.WithCancel(context.Background()) @@ -1589,7 +1587,7 @@ LOOP: // no deadlock after timeout (all good stop the component) updatedCancel() m.Update(component.Model{Components: []component.Component{}}) - <-errCh // Don't care about the result of Update, just that it runs + <-m.errCh // Don't care about the result of Update, just that it runs break LOOP case err := <-errCh: require.NoError(t, err) From ecc3cff91f6bef4771f9bfd8983b9550bd7ef858 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 15 Nov 2023 13:37:49 -0500 Subject: [PATCH 4/7] Fix the update synchronization for real --- pkg/component/runtime/manager.go | 129 +++++++++++++++++++++++-------- 1 file changed, 95 insertions(+), 34 deletions(-) diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index d4cc2fb6f7b..e65d91cdc62 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -106,6 +106,24 @@ type Manager struct { // to the internal run loop. updateChan chan component.Model + // Component model update is run asynchronously and pings this channel when + // finished, so the runtime manager loop knows it's safe to advance to the + // next update without ever having to block on the result. + updateDoneChan chan struct{} + + // Next component model update that will be applied, in case we get one + // while a previous update is still in progress. If we get more than one, + // keep only the most recent. + // Only access from the main runtime manager goroutine. + nextUpdate *component.Model + + // Whether we're already waiting on the results of an update call. + // If this is true when the run loop finishes, we need to wait for the + // final update result before shutting down, otherwise the shutdown's + // update call will conflict. + // Only access from the main runtime manager goroutine. + updateInProgress bool + // currentMx protects access to the current map only currentMx sync.RWMutex current map[string]*componentRuntimeState @@ -139,21 +157,22 @@ func NewManager( return nil, err } m := &Manager{ - logger: logger, - baseLogger: baseLogger, - ca: ca, - listenAddr: listenAddr, - agentInfo: agentInfo, - tracer: tracer, - current: make(map[string]*componentRuntimeState), - shipperConns: make(map[string]*shipperConn), - subscriptions: make(map[string][]*Subscription), - updateChan: make(chan component.Model), - errCh: make(chan error), - monitor: monitor, - grpcConfig: grpcConfig, - serverReady: atomic.NewBool(false), - doneChan: make(chan struct{}), + logger: logger, + baseLogger: baseLogger, + ca: ca, + listenAddr: listenAddr, + agentInfo: agentInfo, + tracer: tracer, + current: make(map[string]*componentRuntimeState), + shipperConns: make(map[string]*shipperConn), + subscriptions: make(map[string][]*Subscription), + updateChan: make(chan component.Model), + updateDoneChan: make(chan struct{}), + errCh: make(chan error), + monitor: monitor, + grpcConfig: grpcConfig, + serverReady: atomic.NewBool(false), + doneChan: make(chan struct{}), } return m, nil } @@ -207,25 +226,9 @@ func (m *Manager) Run(ctx context.Context) error { go m.serverLoop(ctx, listener, server) }() -RUNLOOP: - for ctx.Err() == nil { - select { - case <-ctx.Done(): - // Signal that we will not accept any more updates - break RUNLOOP - case model := <-m.updateChan: - // New component model. This is an external update coming from a - // policy change, so we set tearDown to true. - err := m.update(model, true) - - // When update is done, send its result back to the coordinator, unless we're shutting down. - select { - case m.errCh <- err: - case <-ctx.Done(): - } - } - } - close(m.doneChan) + // Start the run loop, which continues on the main goroutine + // until the context is canceled. + m.runLoop(ctx) // Notify components to shutdown and wait for their response m.shutdown() @@ -239,6 +242,64 @@ RUNLOOP: return ctx.Err() } +// The main run loop for the runtime manager, whose responsibilities are: +// - Accept component model updates from the Coordinator +// - Apply those updates safely without ever blocking, because a block here +// propagates to a block in the Coordinator +// - Close doneChan when the loop ends, so the Coordinator knows not to send +// any more updates +func (m *Manager) runLoop(ctx context.Context) { +LOOP: + for ctx.Err() == nil { + select { + case <-ctx.Done(): + break LOOP + case model := <-m.updateChan: + // Mark the new model as the next update, overwriting any previously + // pending values. + m.nextUpdate = &model + case <-m.updateDoneChan: + // An update call has finished, we can initiate another when available. + m.updateInProgress = false + } + + // After each select call, check if there's a pending update that + // can be applied. + if m.nextUpdate != nil && !m.updateInProgress { + // There is a component model update available, apply it. + go func(model component.Model) { + // Run the update with tearDown set to true since this is coming + // from a user-initiated policy update + result := m.update(model, true) + + // When update is done, send its result back to the coordinator, + // unless we're shutting down. + select { + case m.errCh <- result: + case <-ctx.Done(): + } + // Signal the runtime manager that we're finished. Note that + // we don't select on ctx.Done() in this case because the runtime + // manager always reads the results of an update once initiated, + // even if it is shutting down. + m.updateDoneChan <- struct{}{} + }(*m.nextUpdate) + m.updateInProgress = true + m.nextUpdate = nil + } + } + // Signal that the run loop is ended to unblock any incoming messages. + close(m.doneChan) + + if m.updateInProgress { + // Wait for the existing update to finish before shutting down, + // otherwise the new update call closing everything will + // conflict. + <-m.updateDoneChan + m.updateInProgress = false + } +} + func (m *Manager) serverLoop(ctx context.Context, listener net.Listener, server *grpc.Server) { m.serverReady.Store(true) for ctx.Err() == nil { From 2ed71087608e62b41dfb3501b764f9d26c41a68a Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 15 Nov 2023 13:59:10 -0500 Subject: [PATCH 5/7] add some comments --- pkg/component/runtime/manager.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index e65d91cdc62..db532938a64 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -255,8 +255,8 @@ LOOP: case <-ctx.Done(): break LOOP case model := <-m.updateChan: - // Mark the new model as the next update, overwriting any previously - // pending values. + // We got a new component model from m.Update(), mark it as the + // next update to apply, overwriting any previous pending value. m.nextUpdate = &model case <-m.updateDoneChan: // An update call has finished, we can initiate another when available. @@ -289,6 +289,8 @@ LOOP: } } // Signal that the run loop is ended to unblock any incoming messages. + // We need to do this before waiting on the final update result, otherwise + // it might be stuck trying to send the result to errCh. close(m.doneChan) if m.updateInProgress { From 57503ff6a1a85d4f08e40ec1df16ded149526d97 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 16 Nov 2023 07:27:37 -0500 Subject: [PATCH 6/7] Update pkg/component/runtime/manager.go Co-authored-by: Shaunak Kashyap --- pkg/component/runtime/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index db532938a64..8ec37ac0a2a 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -331,7 +331,7 @@ func (m *Manager) Update(model component.Model) { select { case m.updateChan <- model: case <-m.doneChan: - // Manager is shutting down, ignore the udpate + // Manager is shutting down, ignore the update } } From 81bbba044b35bc8f89c4f1660a29c57c082b6e58 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 16 Nov 2023 07:27:50 -0500 Subject: [PATCH 7/7] Update pkg/component/runtime/manager.go Co-authored-by: Shaunak Kashyap --- pkg/component/runtime/manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index 8ec37ac0a2a..850349406d6 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -270,12 +270,12 @@ LOOP: go func(model component.Model) { // Run the update with tearDown set to true since this is coming // from a user-initiated policy update - result := m.update(model, true) + err := m.update(model, true) // When update is done, send its result back to the coordinator, // unless we're shutting down. select { - case m.errCh <- result: + case m.errCh <- err: case <-ctx.Done(): } // Signal the runtime manager that we're finished. Note that