From 135ea2bb6a60c8a02c76fcf6f170ae74d73e89da Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 21 Sep 2023 12:52:58 -0400 Subject: [PATCH] Code cleanup for command checkins (#3376) * Small cleanups * more cleanups --- pkg/component/runtime/command.go | 38 +++++++++----- pkg/component/runtime/manager.go | 16 ++---- pkg/component/runtime/runtime.go | 6 +++ pkg/component/runtime/state.go | 87 +++++++++++++++----------------- 4 files changed, 77 insertions(+), 70 deletions(-) diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index b68faa685d2..4facc93df3d 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -73,13 +73,31 @@ type commandRuntime struct { current component.Component monitor MonitoringManager - ch chan ComponentState + // ch is the reporting channel for the current state. When a policy change + // or client checkin affects the component state, its new value is sent + // here and handled by (componentRuntimeState).runLoop. + ch chan ComponentState + + // When the managed process closes, its termination status is sent on procCh + // by the watching goroutine, and handled by (*commandRuntime).Run. + procCh chan procState + + // compCh forwards new component metadata from the runtime manager to + // the command runtime. It is written by calls to (*commandRuntime).Update + // and read by the run loop in (*commandRuntime).Run. + compCh chan component.Component + + // The most recent mode received on actionCh. The mode will be either + // actionStart (indicating the process should be running, and should be + // created if it is not), or actionStop or actionTeardown (indicating that + // it should terminate). + actionState actionMode + + // actionState is changed by sending its new value on actionCh, where it is + // handled by (*commandRuntime).Run. actionCh chan actionMode - procCh chan procState - compCh chan component.Component - actionState actionMode - proc *process.Info + proc *process.Info state ComponentState lastCheckin time.Time @@ -204,14 +222,10 @@ func (c *commandRuntime) Run(ctx context.Context, comm Communicator) error { } else { // running and should be running now := time.Now().UTC() - if c.lastCheckin.IsZero() { - // never checked-in - c.missedCheckins++ - } else if now.Sub(c.lastCheckin) > checkinPeriod { - // missed check-in during required period - c.missedCheckins++ - } else if now.Sub(c.lastCheckin) <= checkinPeriod { + if now.Sub(c.lastCheckin) <= checkinPeriod { c.missedCheckins = 0 + } else { + c.missedCheckins++ } if c.missedCheckins == 0 { c.compState(client.UnitStateHealthy) diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index a799e899670..42824794aea 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -268,7 +268,6 @@ func (m *Manager) State() []ComponentComponentState { defer m.currentMx.RUnlock() states := make([]ComponentComponentState, 0, len(m.current)) for _, crs := range m.current { - crs.latestMx.RLock() var legacyPID string if crs.runtime != nil { if commandRuntime, ok := crs.runtime.(*commandRuntime); ok { @@ -282,10 +281,9 @@ func (m *Manager) State() []ComponentComponentState { } states = append(states, ComponentComponentState{ Component: crs.getCurrent(), - State: crs.latestState.Copy(), + State: crs.getLatest(), LegacyPID: legacyPID, }) - crs.latestMx.RUnlock() } return states } @@ -484,9 +482,7 @@ func (m *Manager) Subscribe(ctx context.Context, componentID string) *Subscripti comp, ok := m.current[componentID] m.currentMx.RUnlock() if ok { - comp.latestMx.RLock() - latestState := comp.latestState.Copy() - comp.latestMx.RUnlock() + latestState := comp.getLatest() go func() { select { case <-ctx.Done(): @@ -532,9 +528,7 @@ func (m *Manager) SubscribeAll(ctx context.Context) *SubscriptionAll { m.currentMx.RLock() latest := make([]ComponentComponentState, 0, len(m.current)) for _, comp := range m.current { - comp.latestMx.RLock() - latest = append(latest, ComponentComponentState{Component: comp.getCurrent(), State: comp.latestState.Copy()}) - comp.latestMx.RUnlock() + latest = append(latest, ComponentComponentState{Component: comp.getCurrent(), State: comp.getLatest()}) } m.currentMx.RUnlock() if len(latest) > 0 { @@ -759,9 +753,7 @@ func (m *Manager) waitForStopped(comp *componentRuntimeState) { timeoutCh := time.After(timeout) for { - comp.latestMx.RLock() - latestState := comp.latestState - comp.latestMx.RUnlock() + latestState := comp.getLatest() if latestState.State == client.UnitStateStopped { return } diff --git a/pkg/component/runtime/runtime.go b/pkg/component/runtime/runtime.go index a69aeea8d14..a04eec804bb 100644 --- a/pkg/component/runtime/runtime.go +++ b/pkg/component/runtime/runtime.go @@ -176,6 +176,12 @@ func (s *componentRuntimeState) setCurrent(current component.Component) { s.currCompMx.Unlock() } +func (s *componentRuntimeState) getLatest() ComponentState { + s.latestMx.RLock() + defer s.latestMx.RUnlock() + return s.latestState.Copy() +} + func (s *componentRuntimeState) start() error { return s.runtime.Start() } diff --git a/pkg/component/runtime/state.go b/pkg/component/runtime/state.go index cf52edd9bc6..37392b6d4bc 100644 --- a/pkg/component/runtime/state.go +++ b/pkg/component/runtime/state.go @@ -102,7 +102,10 @@ func newComponentState(comp *component.Component) (s ComponentState) { s.expectedFeaturesIdx = 1 s.expectedComponentIdx = 1 - s.syncComponent(comp) + // Merge initial component state. + s.syncExpected(comp) + s.syncUnits(comp) + return s } @@ -131,15 +134,6 @@ func (s *ComponentState) Copy() (c ComponentState) { return c } -func (s *ComponentState) syncComponent(comp *component.Component) bool { - changed := s.syncExpected(comp) - s.syncUnits(comp) - if changed { - return true - } - return s.unsettled() -} - func (s *ComponentState) syncExpected(comp *component.Component) bool { changed := false touched := make(map[ComponentUnitKey]bool) @@ -151,34 +145,34 @@ func (s *ComponentState) syncExpected(comp *component.Component) bool { } touched[key] = true - existing, ok := s.expectedUnits[key] + expected, ok := s.expectedUnits[key] if ok { - if existing.logLevel != unit.LogLevel { - existing.logLevel = unit.LogLevel + if expected.logLevel != unit.LogLevel { + expected.logLevel = unit.LogLevel changed = true } - if !gproto.Equal(existing.config, unit.Config) { - existing.config = unit.Config - existing.configStateIdx++ + if !gproto.Equal(expected.config, unit.Config) { + expected.config = unit.Config + expected.configStateIdx++ changed = true } } else { - existing.state = client.UnitStateHealthy - existing.logLevel = unit.LogLevel - existing.config = unit.Config - existing.configStateIdx = 1 + expected.state = client.UnitStateHealthy + expected.logLevel = unit.LogLevel + expected.config = unit.Config + expected.configStateIdx = 1 changed = true } - if !errors.Is(existing.err, unit.Err) { - existing.err = unit.Err - if existing.err != nil { - existing.state = client.UnitStateFailed + if !errors.Is(expected.err, unit.Err) { + expected.err = unit.Err + if expected.err != nil { + expected.state = client.UnitStateFailed } changed = true } - s.expectedUnits[key] = existing + s.expectedUnits[key] = expected } for key, unit := range s.expectedUnits { @@ -320,27 +314,28 @@ func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool { } for key, unit := range s.Units { - _, ok := touched[key] - if !ok { - unit.unitState = client.UnitStateStarting - unit.unitMessage = "" - unit.unitPayload = nil - unit.configStateIdx = 0 - if unit.err != nil { - errMsg := unit.err.Error() - if unit.State != client.UnitStateFailed || unit.Message != errMsg || diffPayload(unit.Payload, nil) { - changed = true - unit.State = client.UnitStateFailed - unit.Message = errMsg - unit.Payload = nil - } - } else if unit.State != client.UnitStateStarting && unit.State != client.UnitStateStopped { - if unit.State != client.UnitStateFailed || unit.Message != missingMsg || diffPayload(unit.Payload, nil) { - changed = true - unit.State = client.UnitStateFailed - unit.Message = missingMsg - unit.Payload = nil - } + // Look for units that weren't in the checkin. + if _, ok := touched[key]; ok { + continue + } + unit.unitState = client.UnitStateStarting + unit.unitMessage = "" + unit.unitPayload = nil + unit.configStateIdx = 0 + if unit.err != nil { + errMsg := unit.err.Error() + if unit.State != client.UnitStateFailed || unit.Message != errMsg || diffPayload(unit.Payload, nil) { + changed = true + unit.State = client.UnitStateFailed + unit.Message = errMsg + unit.Payload = nil + } + } else if unit.State != client.UnitStateStarting && unit.State != client.UnitStateStopped { + if unit.State != client.UnitStateFailed || unit.Message != missingMsg || diffPayload(unit.Payload, nil) { + changed = true + unit.State = client.UnitStateFailed + unit.Message = missingMsg + unit.Payload = nil } }